Realtime Stream Processing with Coroutines

Rizo Isrof speaking at The Realtime Guild in April, 2017

About this talk

This talk introduces coroutines – a powerful control flow mechanism for cooperative multitasking. They can be used to elegantly solve real-world problems, from parsing to UNIX scripting, and they find application in modern programming languages embodied as generators, goroutines and streams. What makes coroutines different from threads and processes? How are they implemented? When and why should you use them? To answer these questions we will discuss the implementation of a functional stream processing library and analyse the trade-offs involved.


Transcript


Thanks for introducing me, Jim, and welcome. I hope you will enjoy this talk about coroutines, and how to build streams with coroutines. Coroutines are essentially a cooperative model for multitasking, which means multiple coroutines work together, and I really want this presentation to be cooperative too, so if you have any questions, do ask me; if something is not clear, just interrupt me and let me know. So, I will start the introduction to coroutines with this quote from Melvin Conway, who invented coroutines: "Any organisation that designs a system will produce a design whose structure is a copy of the organization's communication structure." It's interesting how he emphasises communication and communication patterns as a design methodology for computer systems, and I think it's a very interesting fact, especially because coroutines are units of work, but we use them to create these complex communication systems. So, as opposed to processes, which are essentially containers for work — we start them, we make them do some work — with coroutines we actually have these complex, sometimes quite elaborate networks of coroutines talking together and trying to solve the problem cooperatively.

So, this talk is structured in two parts, and we will have a small break in between. The first part will be mainly about the history of coroutines: how they appeared, what kind of problems people solved with them, and I will also compare them to other units of work, like processes, threads and goroutines for example. I will also show you a few examples, just to introduce you to the concept and show you what you can actually do with coroutines. Then, in the second part, I will try to define an abstract model for coroutines and explain how they can be defined in a more mathematical way. And finally, using this knowledge, we will try to build an actual stream processing library.

So, this is the beginning of Part I, and everything began with COBOL. Conway was building compilers for COBOL, and most of the compilers at that time were very, very slow; they parsed the input, the source program, in multiple stages. As you probably know, compilers have very well defined stages where they perform some lexical analysis, they parse, create syntax trees and so on. What he built was the first one-pass COBOL compiler, and what does that mean? It means that you don't have to traverse your input data or intermediate data multiple times, and as a result you get a more efficient compiler. Before that, each step in this diagram, which represents the design of this compiler, would run and produce some intermediate result; then the next stage would need to run with that result as input and again produce some new intermediate result, and it was very inefficient. So, by using coroutines he was able to separate the logic and transform the entire computation into a streaming model. In this case, for example, A is a coroutine, and B is a coroutine — all letters here are coroutines — and when parsing began, once one stage had enough data to send to the next one, the next one would immediately start and do its work, and once that stage was done, it would pass the work to the next one. So essentially, the work was completely separated and continuously flowing through this diagram, instead of stopping at each stage, and of course it was just more efficient.
So, this quote is from the original paper that introduced this design, and Melvin compares coroutines to functions, to subroutines, and he says that with coroutines you end up having this system where no-one is in charge. Functions have this strange relation with each other: you have the caller and the callee, so if a function calls another one, it then waits, and it's kind of in charge of starting that computation. But with coroutines it's fundamentally different: coroutines just suspend their work and they can delegate the work to another coroutine, and it also means that they all exist on the same level, so they work together essentially as a team and favour this communication to solve the problem. And the result was just amazingly fast performance — it was able to parse 100 cards per minute, which is really surprising. Yeah, so another attempt to define coroutines was made by Donald Knuth, in his extensive writings, and he essentially treated functions as a special kind of coroutine: he said that all functions are a subset of coroutines, which also means that we use coroutines implicitly, probably every day, when we write code. The main difference between functions and coroutines is that functions have this temporal relation between each other: if function f calls function g, then g is allowed to run for some time, it produces some result, and control goes back to f, and then f continues its work, and g just disappears completely — it is lost, all the memory it allocated on the stack is cleaned up and the frame pointer is moved back — and then, if f decides to call g again, the entire process starts from scratch, essentially. But with coroutines what we can observe is that f calls g, but g was probably already called before, so its computation resumes, it does some work, and then it can call f again, and f does some work — so they work together. And an interesting thing here is that they don't work at the same time: they allow each other to do their work, and just store their computation state while they are suspended. Is everything clear? Cool.

Now, how do these things compare to something more popular, let's say threads or processes, and what's the difference between a process and a function? Both are just simply some work that we want the computer to perform for us, right, but the way processes and functions work is fundamentally different, and the costs of course are very different too. Every single time we start a process, our operating system, the kernel, needs to allocate some data structures, it needs to perform some scheduling, verify if there are enough resources — it essentially coordinates the execution of these different processes, and it also decides for the process if it's going to wait for something, if it's blocked, if it needs to be terminated, or if it has the freedom to run and do some useful work. We also have these references to allocated devices, open socket handles for example, and so on, and the most costly part of managing processes is managing their memory. This is a representation of the process memory layout: there is some reserved space for the kernel utilities, like scheduling state and registers and things like that, then we have the stack, which is essentially the working space for functions — every time we run a function, it goes on the stack, it allocates the space for the parameters, but then, when the function is done, the stack is cleaned up — and we also have the heap, some static data, the text segment and so on.
The interesting thing about the stack is that it's fixed size, which means that it does grow but it has this very well defined limit, so when we get a stack overflow it means that we essentially used the stack too much — we probably used a recursive function or something like that — and the stack just hit its limit. Now, having this baseline for processes, let's try to go from there towards coroutines, and maybe functions, and see how they compare. So, I introduced these five different criteria, and processes of course are very heavyweight: their stack usually goes up to eight megabytes. It can of course be changed — the operating system lets you provide different parameters — but usually the reserved space for the stack is eight megabytes, which is a lot of space. And, as I said, they are managed by the operating system, which means that there's some additional management cost involved, it's not just like calling a function, and the operating system also decides if this unit of work has the resources available to run or not, so it's a pre-emptive form of scheduling in this case. Each process has very well defined rules and its own separate, individual address space, so two processes cannot just access the memory of one another, and these restrictions of course allow them to be fully parallelizable, which means that you can have multiple processes running at the same time on multiple cores.

Threads, I think, were invented just to make processes more lightweight and performant. For each thread we usually allocate about two megabytes of data for the stack, and they are also OS managed and also pre-emptive, so they behave very similarly to processes, the only difference being that they share the memory space of the process in which they live, so multiple threads can access the same memory, which is very useful, but also very problematic, as you probably know. One interesting thing about threads is that they are not portable enough, so Java, for example — the JVM — introduced the notion of green threads, which abstract away the entire operating system and allow the creation of a similar concept at the user level, completely avoiding kernel calls. So they are more lightweight in a sense — they just occupy 64 kilobytes of data, or more, since their stack can grow dynamically — and they are not OS managed, but they are pre-emptive, so the JVM still decides who's in charge and how the threads will be executed. They, again, share the address space of the container, which is the process or an actual physical thread, and, given these restrictions, it also means that you can't have two green threads running at the same time, because they are already contained in one of these bigger containers. And there's another interesting model, which was introduced by the Go programming language, and it's probably the closest model to coroutines that exists that is at the same time parallel. If we look at this table, we can see that the stack that goroutines allocate is very, very small, but it can also grow: the allocation of the stack actually happens in two separate phases, there is some management on the actual process stack, but then the stack can grow and can be allocated on the heap, so it's dynamically managed memory in this case — it can grow, and it can shrink if needed.
So they are managed by the Go runtime, and not the operating system, and they are not pre-emptive, which is a huge difference compared to the other models. It also means that if a goroutine is running, there's essentially nothing that can stop it — well, the operating system actually can, because it has special privileges, as we know — but the Go runtime can't just decide to stop a goroutine. And yeah, just like green threads and threads, they share the process memory, and they are parallelizable: they are parallel because goroutines can be scheduled to run on different physical OS threads, and this gives them this property. Now, coroutines: well, it's essentially an abstraction that you can build for yourself, which means, if you need a stack, then go ahead and allocate some space for a stack; you can use the existing stack and kind of segment it into different parts and say that this part here is for coroutine A and this one is for another, and so on. If you want, you can allocate some space dynamically and reuse it, so it's really flexible, but at the same time, of course, probably hard to get right. They're not OS managed: you essentially start coroutines and they do their work, they can decide to stop, they can decide to start other coroutines. They are not pre-emptively scheduled either, so, again, this gives them more freedom, and they also share the private space of the specific process in which they are contained. And they are not parallel — their execution never overlaps, you cannot have two coroutines running at exactly the same time — which also eliminates a huge number of the concurrency bugs that are present in these other models, because a coroutine just suspends and then another one starts; you never have actual concurrent access to your memory.

Now, let's try to play with these ideas and let's try to build something maybe useful, I don't know. So, let's look at this horrible example in C and let's try to guess what it will return. It's a form of a suspendable computation — what do you think will be the output of the first line here? Is this talk for programmers or not? It's just a switch and a for loop, there's nothing fancy, just one function. - [Audience Member] Zero. - Zero, zero is one potential answer. OK, yes, it is zero. What about the next execution? So let's walk through it. We call this coro function, and we have these two static variables, i and s — s is for state in this case — and it starts at zero, which means that when our switch checks the value of s, it will of course be zero the first time we run it. This also means that this for loop will be executed immediately, and i will be set to zero, and then we check if (!s) — s is essentially a flag that records whether the state was changed or not. The first time this piece of code gets to this point, s will be changed to one, and it's completely ignored afterwards, so we return i, and at this point i is of course zero, as we're seeing here. But then there's this strange case label — so, what's it for?
The first execution will completely ignore it, but the next time we get there, we have our statically initialised variables — and let's not forget that static variables live in this part of memory, which means they are not cleaned up: once you declare something static it will be there forever; of course you can change its value, but the memory is not reclaimed like it is with the stack — so it essentially allows our function to create this little loophole: it can place things there, and the next execution will be able to retrieve the previous value from the static memory. So, we go back to the switch, and the initialisation is not performed the second time. Is s zero? No, it's not, so we go to case one, which leads us essentially nowhere, but there's a loop, which means that we will go back to our for, and i will be incremented, and the if check will be ignored, and then we just return the value — at this point it will be one, and the next time it will be three. - [Audience Member] Two. - Yeah, sorry, two. This is horrible, of course; it was just an example of how you can kind of overcome the limitations of regular functions and create an illusion of suspendable computations.

Let's try something completely different, which is still horrible. The syntax is prettier, right? I actually renamed the function to count, just to show that it counts numbers, and I show you the output here, and what it does is very similar to the previous example, except it uses gotos, which are a very popular practice, and, yeah. So, this example just shows that we can create an illusion of suspendable computations in C if we really want to, and we can even make it pretty and almost similar to, let's say, Go or something. Should we do that? Well, I don't advise it, so please don't include this implementation of coroutines, at least, in your code.

So, how are coroutines actually implemented in C or in other languages? Usually people rely on more low-level APIs that interact directly with the kernel and the memory and the stack and so on; essentially, they manipulate the way the execution of the code happens, so that they can get the suspendable behaviour that coroutines provide. In this case, this is the entire API included in the setjmp header for C, and what it allows us to do is essentially just control time. How is that possible? The setjmp function is able to save an instance of the execution world of our program into this jmp buffer, and it returns, potentially, eventually, a value that can come from the future. Then we have another function called longjmp, which takes the previously saved state of the world, and takes some value, and just goes back in time and makes your code jump and start the same logic again. So, the first time setjmp is called it will always return zero, as it is the first execution, but the next time it is triggered by a longjmp, it will return something else, and this something else is the value that we provide here. So, yeah, that's how you control time in C. What actually happens is that this jmp buffer contains the values of the registers used by the program, and also the frame pointer — it stores those references — and when you call longjmp, it just replaces the existing values, actually erases them, because it writes on top of them, and changes the state of your program.
So yeah, another simple problem, and the question is the same: what happens when you run it? We have setjmp, which takes that world variable and saves the present state into it; when we run this function, it will save the state but also return zero the first time, because it is the first execution, and the code just continues. Then we have this apparently redundant check — was the value from the future triggered? In this case it won't be, because it will be zero — and we just call the function present. And present just calls future, and future prints two to the screen and then calls longjmp, which will trigger this time change, and it also passes the value 42 to the previous execution of setjmp. So, at this stage, what happens is that time changes, the execution of your program goes back here, and setjmp will actually return 42, and everything else is completely ignored. Your printf one, your printf three — none of that exists anymore; that code is never reached, it's essentially dead code. And the output is two and 42 — 42 because we got something here that's not zero, so we just print the value, and that value came from the future.

OK, so here's another form of coroutines, probably very familiar to many of you: generators — lazy sequences, functions that can suspend, produce some values in between, and produce values on demand when you ask for them. In this case we can see our count function implemented in Python, and we can also define the Fibonacci sequence. Essentially the idea here is that yield will produce the value that a contains, in this case, and everything else will be suspended for a while, at least until this function gets called again by some other function or coroutine, and this also means that it will store the previous state and just continue the execution. The way you get the values from this function is just by saying generator.next, and it will give you the next value. And if, for instance, you had some terminating condition, and your count or any other producer decided to stop, then you would get the StopIteration exception. My goal for this talk is to implement this model in a more efficient and elegant way from scratch, so we will be implementing generators, and we will be implementing a form of consumers that are also suspendable, but all of this will be done in the second part.

In this second part I will be comparing coroutines to another very popular model, which is pipes — Unix pipes. Most of you probably know how to use them; you use them daily to perform some basic data analysis, to check logs and things like that, and it's interesting how this very, very pragmatic model that we use daily is a reflection of coroutines. I will try to dissect pipes, I will try to show you how they are defined in a more mathematical sense and how we can build them in a functional programming language, and, on top of pipes, create some useful stream processing utilities. The language I will use is OCaml, and if you are not familiar with the syntax and the language I can explain a few details, but the important bits are the data structures and the way they compose, so you don't have to understand all the details. Now, pipes: this is a pipe; it can be any random command, and it's not useful if you don't provide it any input or output.
So, pipes are essentially contained computations: they get something, then produce something. They can be seen as functions, but the difference is that in this example here you actually compose them — you have multiple pipes and they work together. So, what can happen if one of these things fails? For example, let's say there's no such file, there are no text files: word count would simply fail, and sort and tail would not be called at all, right? Or, if something happens in the middle, then the failing pipe will also communicate this failure to the other pipes. So, yeah, pipes can fail, and if a pipe fails, then the entire computation is down — there's a form of short-circuiting mechanism implemented in them. But then again, just one single pipe is not useful; you need to connect them. You take another pipe, and potentially that pipe will produce some value, and the following pipe will take that value, do something with it, produce an output and feed it into another pipe. So, yeah, this is how pipes work in a more theoretical sense; it's very clear, it's very simple — just boxes and arrows and stuff in between. Usually, when we speak about pipes, we say that the pipes that produce values are upstream pipes, and the ones that consume values are downstream pipes, so the flow of data goes from left to right, from upstream to downstream. Let's try to define a type that will reflect this behaviour — this producing of values and this consuming of values — and, again, this circuit-breaking behaviour too.

So, this is a type definition in OCaml, and I will be using a union type, essentially. This thing here called Yield is a constructor: it takes a string — and the star is for pairs — so it takes a string and another pipe, and creates the resulting pipe. It's useful when you want to say that this pipe here is producing some value b, and this value b in this case is a string, but at the same time, what happens next? Next comes the next pipe. So we try to look at the abstraction in a more sequential way: we have this pipe, we've produced a value, and the following state of this particular pipe is contained here, which can of course be another Yield, so we can yield a, b, c, d, whatever. And, on the opposite side, we have this Await constructor that is waiting for some input, in this case a string. It gets this value a, does something in between, and then returns the next pipe, and we don't know yet what that will be: it can be something that takes a value and then produces another value, for example, or it can be a function that receives an argument like an integer and then increments it, and so on. And again, at any point it can just stop — it can be Ready. It doesn't mean that it's a failure; it can just be a return status: I'm done, I'm finished, I don't need any more input, for example. So when we do head in the shell, it will eventually terminate; it stops when it gets the desired number of lines.
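The slide itself isn't reproduced in the transcript, but a literal reading of that description gives roughly this shape — a sketch, not the speaker's exact code:

```ocaml
(* A pipe over strings: it either yields a string and continues as the
   next pipe, awaits a string and then decides what to do next, or is done. *)
type pipe =
  | Yield of string * pipe        (* produce a value, then behave as the next pipe *)
  | Await of (string -> pipe)     (* wait for a value, then continue *)
  | Ready                         (* finished: no more input needed, no more output *)
```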
And with this type we can define a more generic type. Before, we were using just strings and pipes and integers for the return value of the entire pipe, or the entire composition of pipes, but it's of course more useful in programming when you have generic types, and in this case these things here, there and there are just type arguments that we provide to our pipe type: a is for input, b is for output and r is the Ready value, the result value. We can also define a few helper types, like producers — producers are pipes that are only allowed to yield — and we also have consumers, which essentially just await for input; there is no b here, so void essentially just blocks that side of the pipe. Using the compiler's type checker, this makes sure that if we define something to be a producer, it can't await, it can just yield, and the same goes for consumers in the opposite direction. The final type we define is a pipeline, and a pipeline is the entire composition of these intermediate pipes. You can think of it, looking at this type definition, as something that is not waiting for any input and is not producing any output; it just has the result value, and we need to ask it to give us this result value. So, this is the pipeline.

Now I'm going to introduce a few helper functions, just to make sure that we can actually play with these types. I think the type describes reasonably well our previous attempt at modelling pipes — we can yield, produce values, we can await, we can return — but it's of course not enough: we still need to compose pipes, we need to make them work together. In this slide here, return is used to essentially place some value into a pipe as a Ready value, so it will take the argument x and just say: here's the pipe that returns Ready. It's useful for many different things, as I will show in the following examples. The second function is called bind, or then, and it's useful for extracting the final value from a pipe: since we wrap our final result with this Ready constructor, we need a way of actually extracting that value and potentially applying some transformation to it. So in this example we just created a pipe foo that has 42, and bar will try to extract the value from foo, which will be 42, and it goes here, and then you essentially return a new pipe, a Ready pipe, that will contain 43. The syntax is simple: fun is just for anonymous functions, let defines functions or variables, and match here matches on the structure of, in this case, the first argument and returns the value on the right-hand side. You don't have to understand all the details; it's just useful to go through them to show how simple they are — there's nothing special happening here, just a few transformations based on the structure of our types. Now, we can also make our life easier by saying that we have these functions — empty, yield and await — as we are used to having in, let's say, Python or other languages that provide coroutines: you usually don't use this uppercase Yield constructor directly, you use the helper function yield that will place the value in its right place, and the same goes for await and empty. As a result of these few slides, we can already build streams — in this case we're building a stream that yields x, y and z, and it just sits there: the values are there, we're not doing anything with them, they are just waiting to be computed.
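Since the slides aren't shown, here is a sketch of the generic type and the helpers just described. One deviation from the description above: the continuation of Yield is wrapped in `unit ->` so that the infinite producers used shortly can be expressed in strict OCaml; the operator name `>>=` for bind is my choice.

```ocaml
(* 'a is the input type, 'b the output type, 'r the final result.
   The rest of the pipe after a Yield is thunked so it is built lazily. *)
type ('a, 'b, 'r) pipe =
  | Yield of 'b * (unit -> ('a, 'b, 'r) pipe)
  | Await of ('a -> ('a, 'b, 'r) pipe)
  | Ready of 'r

type void = |                                   (* no values: blocks one side of a pipe *)

type ('b, 'r) producer = (void, 'b, 'r) pipe    (* can only yield *)
type ('a, 'r) consumer = ('a, void, 'r) pipe    (* can only await *)
type 'r pipeline = (void, void, 'r) pipe        (* neither: just a result to ask for *)

(* Place a value into a pipe as its Ready result. *)
let return x = Ready x

(* bind / "then": run [p] to completion and feed its Ready value to [f]. *)
let rec ( >>= ) p f =
  match p with
  | Ready r         -> f r
  | Yield (b, next) -> Yield (b, fun () -> next () >>= f)
  | Await k         -> Await (fun a -> k a >>= f)

(* Lowercase helpers, so we never use the constructors directly. *)
let empty   = Ready ()
let yield b = Yield (b, fun () -> Ready ())
let await   = Await (fun a -> Ready a)

(* The stream from the slide: it yields "x", "y" and "z" and just sits there. *)
let xyz : (string, unit) producer =
  yield "x" >>= fun () -> yield "y" >>= fun () -> yield "z"
```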
And increment — increment is a very simple pipe that awaits for something, gets a number and yields that number incremented. It's just an encapsulated form of a computation, which is self-explanatory I think, and it can be connected, let's say, with another pipe, and it will always just increment the number and pass it over, so it's like this tiny machine that is highly specialised in incrementing numbers. Now, let's go back to the Python example that I showed you during the first part of the talk. I explained how it works, and my question is: is what we have enough to build this thing here? And the answer is yes — here's how you do it in OCaml, with the abstractions that we just created. We don't use while loops; in functional languages we usually use recursion to represent sequential computations. So in this case, for example, we defined a function called count, which has this helper loop function that starts with zero, yields the zero, and continues to loop, incrementing the number. It's incrementally and lazily composing this thing — because otherwise, if I just called count, it would stack overflow, since we're not checking any stopping conditions here. The reason why it suspends is that this computation, following the definition we had before, is lazy in nature: it will just produce one value and stop, and if there is a consumer that needs more values, then it will continue the loop. The same happens for the Fibonacci sequence.

Well, in real life we don't usually work with Fibonacci sequences, do we? We work with files, we work with lists, we work with other data structures. So in this case, for example, we're defining this list producer that takes a linked list and, based on its content, will yield the elements, and when the list is exhausted it's empty — and empty is just Ready. Let me again explain the syntax: match input — input in this case is a list — and the double colon here is used to split off the head of the list, so we take the head, and xs will be the remaining linked list without the head; we yield the head and we compose this computation with the same function recursively, so that we can keep yielding until the list is empty, at which point we just say empty. Another useful producer is this file function, and, as we can see, it's a producer, so it will give us strings and eventually terminate with some r — and this r here is defined by the entire computation, not just by this small function, so if we compose multiple pipes together then that particular r will be specialised to be an int or a string or a list or something completely different. As I mentioned, producers are lazy, which means that this function will not read the entire content of the file into memory; it will just read one single line and pass it over as a yield. So, again, we use a recursive loop function: we open the input file, which gives us a channel, we read a line from that channel, yield it and continue to loop, and if the End_of_file exception happens, then we just return and close the input channel. This is important because, once our file is exhausted, we don't want to keep the open handle for that file, and our producer will actually clean up that resource for us — we don't have to think about that.
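A sketch of these producers on top of the type above — again not the slide code; the names increment, count, fib, of_list and of_file are my own, and the file producer's error handling is only approximate:

```ocaml
(* A tiny machine specialised in incrementing: await a number, yield it
   incremented, and loop. *)
let rec increment () = await >>= fun n -> yield (n + 1) >>= increment

(* Count upwards forever. Each step yields one number and only builds the
   next step when a consumer pulls on the pipe, so it does not blow up. *)
let count : (int, unit) producer =
  let rec loop n = yield n >>= fun () -> loop (n + 1) in
  loop 0

(* The Fibonacci sequence as an infinite producer. *)
let fib : (int, unit) producer =
  let rec loop a b = yield a >>= fun () -> loop b (a + b) in
  loop 0 1

(* Yield the elements of a regular list, then finish. *)
let rec of_list = function
  | []      -> empty
  | x :: xs -> yield x >>= fun () -> of_list xs

(* Read a file line by line, lazily, closing the channel once the file is
   exhausted so the handle is not kept open. *)
let of_file path =
  let chan = open_in path in
  let rec loop () =
    match input_line chan with
    | line                  -> yield line >>= loop
    | exception End_of_file -> close_in chan; empty
  in
  loop ()
```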
Now, I showed you in the previous examples — somewhere here, yeah — how you can extract values from generators. We define this next function, and I'm not sure if people who use Python actually know how this next function is implemented — unless, of course, you are involved in the design of the standard library or the language itself — so it seems to be quite an opaque function that just gets us the next value, but in our case we can see that it's really simple and easy to understand. Next is defined as: we take the pipe as an argument and we check its state. If it's Ready, then, well, you don't have anything — if it's ready there is nothing to produce. If it's yielding a, then let's just take that a and send it to whoever requested it, and we also send the rest — rest is the next pipe, the next state of the pipe, which we can also check for more output. So the state handling in this case is explicit, while in the Python example it was hidden by objects and changed internally and so on; in this case it's absolutely explicit. And if we are working with pipes that can both yield and await, how can you get something from a pipe that is blocked waiting for some input? You can't, and that's why you just fail, saying that the pipe requires more input — you can't get any output from it.

Now, we defined a lot of different things — let's review. We defined the monadic composition that allows us to write these loops, compose the streams and produce more values, and we also defined functions to read from files. I will eventually implement functions that await for input and act as transformers or consumers of input, and essentially reduce the entire stream into a meaningful result for us, but before that we need to think about the fundamental property that was implicitly present here, in this slide. How do you compose pipes, how do you connect them together? In the shell example here, the pipe operator is used, and the pipe operator actually checks the state of the individual components and acts accordingly, based on that verification. We need a similar feature, so let's go and implement it. Yeah, so here's the function that I called connect, which will take the upstream pipe — the pipe that is producing something — and the downstream pipe, which is waiting for something, or potentially producing something too, but producing it for pipes even further downstream. To connect two pipes, we first of all need to check if our downstream pipe is ready, because if our last pipe is ready, we aren't actually interested in knowing what's happening before that — we already have the value, and if we have the value we just return it and say: it's ready. But what if our downstream pipe is producing something? We're trying to connect two pipes, and one of them is just producing values. Well, let it produce the value: we say yield b, which was contained in the constructor, and then we connect the upstream pipe, this one here, with the next state of this one, which essentially just keeps pushing the output of these two pipes into the next stage.
The next case: our downstream pipe is waiting for something and our upstream pipe is producing something — seems like a perfect match — so we just take the value b that the upstream is producing and place it into this continuation function, and then we connect the next upstream with the result of this function. Again, we just took the value from one, placed it into the other, checked the result here and connected the entire computation again. At this point our upstream pipe is waiting for something, and this means that if we have a very complex computation here, that computation is just blocked — blocked because this pipe here is not producing any meaningful output. So what can we do? Remember that these coroutines, these pipes, are not pre-emptive, so we can't just say stop, or please do give us some output — no, that's not how it works. You fully trust your pipes, and if they decide to give you output, then you act on that output. So in this case we return another await, and once this pipe is ready, it will give us the value, and only after that can we try to connect these two pipes together and, of course, continue the computation. And finally, if our producer, our upstream pipe, has finished — for example, we were reading from a file and the file became empty — then, well, it's ready, it's done, so we don't need to continue composing pipes. We also define these two nice operators to connect pipes, just to resemble the pipe operator that we saw in the Unix example.

Another interesting property of the model I just introduced is that simple things are simple. Like cat, the program that we use very frequently: it just waits for some input and gives some output — it's awaiting for some input and it's yielding some output — and the interesting thing here is that the next stage of this pipe is another cat, so it's a loop that just reads and writes the same value, the received value, over and over again. And another implicit property of pipes and shell compositions is that they obey some very well defined rules. For example, if you compose cat with another program — let's say cat and head or whatever — it really doesn't change the semantics of the computation, so you still get the original pipe here, and the order in which you perform this composition doesn't matter either; this connect function obeys these rules too. And the second rule here is the associativity rule, which means that the order in which you compose multiple pipes also doesn't matter: you can compose pipe1 and pipe2 first and then pipe3, or pipe2 and pipe3 first and then pipe1. And why is this useful? It's useful because it's just easier to reason about your system this way — if this doesn't hold in your actual production code, there's something very wrong about it, because if you compose this and then this, and it's not equal to the other composition, then there's just something undefined happening there. This connect function is not special: we just took two arguments and decided, based on the structure of the pipes, how we want to produce the final result. It also means that I can take a list of pipes as an argument and then, based on that, perform some scheduling — implicit scheduling.
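Here is a sketch of next and connect as described; the operator names `>->` and `<-<` and the exact case ordering are my own, and this version assumes the upstream and downstream pipes share the same result type.

```ocaml
(* Pull one element out of a pipe -- Python's generator.next() made explicit. *)
let next pipe =
  match pipe with
  | Ready _         -> None                      (* nothing left to produce *)
  | Yield (a, rest) -> Some (a, rest ())         (* a value, plus the rest of the pipe *)
  | Await _         -> failwith "pipe requires more input"

(* Connect an upstream pipe to a downstream pipe. The downstream drives the
   computation and pulls values from the upstream only on demand. *)
let rec connect upstream downstream =
  match downstream with
  | Ready r         -> Ready r                   (* downstream is done: we are done *)
  | Yield (b, rest) ->                           (* downstream produces: pass it on *)
    Yield (b, fun () -> connect upstream (rest ()))
  | Await k ->                                   (* downstream needs a value... *)
    (match upstream with
     | Yield (b, rest) -> connect (rest ()) (k b)                   (* ...feed it in *)
     | Await k'        -> Await (fun a -> connect (k' a) downstream) (* ...ask further up *)
     | Ready r         -> Ready r)                                   (* ...upstream is done *)

(* Two operators that resemble the shell's "|". *)
let ( >-> ) upstream downstream = connect upstream downstream
let ( <-< ) downstream upstream = connect upstream downstream

(* cat: await a value, yield it back unchanged, and loop. *)
let rec cat () = await >>= fun a -> yield a >>= cat
```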
And more interesting modelling is possible with pipes, because you can even take, let's say, two pipes, connect them together and merge their outputs into one single pipe, or multiplex one pipe into two different flows — it really depends on the connection function you use, the composition function you use, and the way you handle these structures. Now that we know how to connect pipes, let's see how we can transform their content, and let's define the traditional map, filter and take combinators. Map is a recursive function — that rec there lets you know the function is recursive — and it will return a pipe with all generic arguments. It awaits for something, it gets this a value, and it applies f to this value before yielding the result. After that it just continues, in a loop; in these computations, closing the loop is very important, because you keep awaiting for more input and keep producing more output. Filter — there's a slight change in the type: as we can see, the input type equals the output type in this example, and that's because filter doesn't really perform any changes to the input, it just filters out some of the elements, but their type is not changed. So, we wait for something, we get this a, we check if a matches the predicate function that we passed, and then we yield it if it does, and we just call filter again if it doesn't, ignoring the a value completely. Take is also a very interesting example, and again it preserves the type — input and output are equal. What it does — let's take a look at this part here — is it waits for something, it gets this input value, it yields this input value, and it continues the computation with n decremented by one, and once n gets to zero, we stop. This means that this pipe counts every single element that it gets, and once it gets the desired number of elements it will stop — producing empty, it will be ready, and it will stop the entire computation. Finally, consumers: consumers take pipes — entire pipelines, actually — as arguments, and that's the fundamental difference they have from the other combinators. In this case, for example, if we want to get the head of a stream — just pick one element from it — what we do is call the next function, and, going back, next just returns a, if the pipe is yielding, together with the rest of the computation, and fails of course if it's not yielding. This means essentially that we took the yielded value from the pipe and returned it as an optional value in this case, and we don't really care about the rest of the pipe, because we just want the head — and of course if it's not there then there's no head; an empty pipe has no content. We can also define reduce, or fold left, for our producers, or pipes. The interesting thing here is that we only take a producer as an argument, but essentially our composition allows us to treat multiple pipes as one single pipe — the return type of the connect function is itself another pipe, so we take an arbitrarily large number of pipes and say: here's another pipe, and it will be receiving the input of the first one and producing the output of the last one, doing all the transformations and filtering that happen in between. Yeah, so reduce will be able to take that entire stream of data that we have and keep calling next to get each element.
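Sketches of these three combinators and the head consumer, still my reconstruction rather than the slides (reduce and collect follow a bit further down):

```ocaml
(* Apply f to every value flowing through the pipe. *)
let rec map f =
  await >>= fun a ->
  yield (f a) >>= fun () ->
  map f

(* Keep only the values that satisfy the predicate; the element type is unchanged. *)
let rec filter pred =
  await >>= fun a ->
  if pred a
  then yield a >>= fun () -> filter pred
  else filter pred

(* Let n values through, then terminate the whole pipeline. *)
let rec take n =
  if n = 0 then empty
  else await >>= fun a -> yield a >>= fun () -> take (n - 1)

(* A consumer: pull just the first element of a producer, if there is one. *)
let head producer =
  match next producer with
  | None        -> None
  | Some (a, _) -> Some a
```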
Reduce also gets this accumulator value, an initial accumulator, and applies the reducing function — the function that takes the accumulator, takes the next value, and produces the next state of the accumulator. One way to define this collect function, which just takes this entire producer pipeline and gives us a list, is explicitly, like writing this content here, but let's ignore that and take a look at this definition here, which uses reduce. Here we abstract it all away — the pipes and types and so on — and just say that we want to reduce our producer with a function that takes a result and the next element, some element. What we do, using this cons operator — which takes a list and an element and creates a new linked list with x at its head — and a result that initially is just an empty list, is this: our reduce function gets this empty accumulator, it checks for the next element, it calls our function, so r will be our empty list and we place this x inside, and we keep adding elements until the reduce function decides that there is no more output and it just returns the list. But the problem is that we have inverted the elements: the stream produces its results in order, and our linked list keeps prepending them, so the first element produced is now the last element of the list — so of course we need to reverse the list. Is everything clear? Cool.

Now just a few more examples. Yeah, in this case we just used our count function, which counts to infinity; take will strictly limit the number of elements to a million, we filter all the odd elements and sum them up, and get this huge number. We can also, of course, apply some arbitrary computations in between before taking the elements, and the interesting thing here is that if I omit this part here, collect, nothing will happen — absolutely nothing. I have infinity here, but no computation will be performed at all, because, well, I didn't ask for anything; collect will pull the elements from the stream, and take will say I'm ready once I have five elements, and, yeah, you get this list as a result. Of course, I have this example that parses some logs — there's a mistake there, ignore the drop. This example just parses a log file: it searches for all root entries, takes the first 100 and then just prints them. I didn't define grep, for example, and I didn't define each, but I assure you it's really simple to do so, because you essentially just await for something, you check some condition, and if it matches you yield, and if it doesn't you do something else. My point here is that all these primitive functions, all these operations, these small coroutines, can be defined for anything: you can have coroutines reading from ZeroMQ sockets, you can have coroutines writing to database handles, whatever, and all these rules, all these functions that we defined will still work for those computations. Questions? Cool, so this is the end of my talk, and just to conclude, I showed you an implementation of a very simple and extensible model for coroutines. We essentially analysed the nature of coroutines — what is a coroutine? It's just something that suspends, and can yield and await — and we used that definition to build this core type for coroutines.
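To tie the library part together, here is a sketch of reduce and collect along with the counting example described above; the function name sum and the `>->` operator are my own, and it assumes the filter keeps the odd elements.

```ocaml
(* Fold over everything a producer yields: keep pulling with [next] and
   threading an accumulator through until the pipe is done. *)
let rec reduce f acc producer =
  match next producer with
  | None           -> acc
  | Some (x, rest) -> reduce f (f acc x) rest

(* Collect all produced elements into a list. The fold prepends, so the
   accumulated list has to be reversed at the end. *)
let collect producer = List.rev (reduce (fun r x -> x :: r) [] producer)

let sum producer = reduce ( + ) 0 producer

(* Count upwards, keep a million elements, keep the odd ones, sum them. *)
let huge = sum (count >-> take 1_000_000 >-> filter (fun n -> n mod 2 = 1))
(* huge = 250_000_000_000 *)

(* Nothing runs until a consumer such as collect actually asks for values. *)
let first_five = collect (count >-> take 5)      (* [0; 1; 2; 3; 4] *)
```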
An interesting property of this implementation is that pipes and coroutines are first-class: you can take them, you can place them in lists, you can connect them together, you can build complex networks, you can have multiplexing, whatever, and it will still work, because they are not special — they are just functions and types. And, as I mentioned, they are also completely generic, and you can use them with data structures, sockets, files and so on. One property that I mentioned initially about the file producer is that it promptly finalises its resources, so once the file is empty, or once the socket is closed, for example, or once the database connection is lost, you can actually close the resources that you were using, and the rest of the computation doesn't have to worry about that — you will still be able, in this case, to perform arbitrary computations, and that file will be closed. And more complex communication patterns are possible — the limit is literally your imagination and, of course, the requirements of your system: you can define custom connection functions, you can perform more sophisticated safety checks, and so on. And yeah, this is the end.

- Yes, that's true. You can define different composition functions that operate specifically for push or for pull, but in this case, it's just whoever does the first thing: if you push then it's push, if you pull then it's pull. These two arguments need to be inverted — the downstream goes here and the upstream goes here — thanks for pointing that out.

- Yeah, so the question is essentially: what is the difference between cooperative and pre-emptive scheduling, or units of work as I call them, and what kind of trade-offs exist in choosing one or the other — is that correct? - Yeah. - OK, so I think it depends on the nature of the work that you actually do inside your application. Pre-emptive scheduling is useful at the low level: the operating system needs to do pre-emptive scheduling, because otherwise it would be complete chaos if it just said, hey, process A, go do something and when you're done, let me know. That's not how you write operating systems, right? So, if it's something very low level, like processes or threads, then you absolutely need pre-emptive scheduling, because, well, you need to impose order. But in this case, these things here are used for high-level, abstract, application-specific computations, which are abstracted from that low-level world, and it also means that if you're, for example, parsing some HTTP request with one of these coroutines, do you actually want to suspend that computation to do something else, as a programmer? Probably not, and if the runtime of the language, as in Go for example, handles all this scheduling for you, then why not just let it do its work, right? So, high-level functions and coroutines are just better suited to non-pre-emptive scheduling.

- So, even in these simple examples, let's say that I forget take — I completely forgot about it — my reduce function will never stop, because it's trying to sum all the odd numbers, forever. And what happens here, if I forget take? Well, I'll allocate too much memory, because I will be trying to create a linked list of infinite size, right, because count never stops.
So it does mean that you really need to be careful about these things, but then if you have a for loop and you forget to check the termination condition, you have the same problem right? So, I think that, yes you need to be careful, and yes the fact that they have probably too much freedom, requires you to have this special care, but at the same time, it's intentionally very flexible. No more questions, OK. Well thank you, thanks again.