Scalaz 8 IO vs Akka
What’s special about actors
Before we dive into technical comparisons, let’s first write down what is so special about actors, which might hint at the use-cases which are most naturally implemented with a “raw” actor, instead of e.g. using streams.
First of all, an actor encapsulates and manages state (stateless actors are considered an anti-pattern). Access to the state is guaranteed to be serialized, so that the state is always accessed and changed by a single thread. An actor provides a “safe haven” for the data it manages.
Secondly, actors define a way to communicate between concurrently running processes, via message passing. Each actor is associated with a mailbox, to which incoming messages are enqueued and processed by the actor one-by-one in a first-in-first-out fashion. In other words, there’s a queue in front of each actor.
Finally, actors provide a way to manage errors. Not all errors have to be handled inside an actor; instead, errors can (and should!) be propagated to parent actors. The parent actor might have more context and decide what’s the best course of action: re-creating the child actor, stopping it, escalating, etc. These are known as supervisor hierarchies.
Action plan
In three installments, we’ll go over some use-cases for actors and see if, and potentially how, they can be implemented using Monix / ZIO. This part covers the basics and state encapsulation. The next part will cover communication, and the final one error handling.
The goal is to keep the discussion close to “real life”, so each example is backed by runnable code, available on GitHub, implemented using all four approaches. The crucial code snippets are embedded in the article, but to test things interactively, it’s always useful to simply browse the code.
Rate limiter
Let’s start with an example which requires protected access to some non-trivial state. The goal will be to implement a rate limiter: we want to run a side-effecting computation (e.g. sending an HTTP request) so that in any `perMillis` time window, at most `maxRuns` computations are started. (Note that we only count the start of the computation, not when the computation finishes; due to e.g. network latencies the target system might at times see a slightly different rate of requests.)
All of the implementations will use the same data structure to store a queue of waiting computations and calculate when they can be run. The `RateLimiterQueue` case class includes:
- `waiting`: a FIFO queue of computations waiting until the rate limit allows them to be started
- `lastTimestamps`: a list of timestamps at which computations in the current time window (which is `perMillis` wide) have been started
Apart from enqueueing a new request, we can compute which requests (if any) should be run given the current timestamp. The result of the `def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F])` method is:
- a list of tasks, where each task can be either running a computation, or scheduling an invocation of `run` in the future (when the rate limit will allow computations to be started)
- as the `RateLimiterQueue` is immutable, an updated copy of the data structure, with modified `lastTimestamps` and `waiting` queues
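The full implementation is available on GitHub; as a rough sketch (a reconstruction for illustration, with names following the article, not the exact repository code), the data structure might look like this:

```scala
import scala.collection.immutable.Queue

// Tasks the rate limiter can emit: either run a computation now, or
// schedule a re-check of the queue after the given delay.
sealed trait RateLimiterTask[F]
case class Run[F](run: F) extends RateLimiterTask[F]
case class RunAfter[F](millis: Long) extends RateLimiterTask[F]

case class RateLimiterQueue[F](maxRuns: Int,
                               perMillis: Long,
                               lastTimestamps: Queue[Long] = Queue.empty[Long],
                               waiting: Queue[F] = Queue.empty[F],
                               scheduled: Boolean = false) {

  // add a new computation to the waiting queue
  def enqueue(f: F): RateLimiterQueue[F] = copy(waiting = waiting.enqueue(f))

  // clear the "re-check already scheduled" flag
  def notScheduled: RateLimiterQueue[F] = copy(scheduled = false)

  def run(now: Long): (List[RateLimiterTask[F]], RateLimiterQueue[F]) = {
    // keep only the timestamps which fall into the current time window
    val inWindow = lastTimestamps.filter(_ >= now - perMillis)
    if (inWindow.size < maxRuns && waiting.nonEmpty) {
      // the rate limit allows another computation: emit it and check again
      val (io, remaining) = waiting.dequeue
      val (tasks, queue) =
        copy(lastTimestamps = inWindow.enqueue(now), waiting = remaining).run(now)
      (Run(io) :: tasks, queue)
    } else if (waiting.nonEmpty && !scheduled) {
      // rate-limited: schedule a re-check when the oldest timestamp expires
      (List(RunAfter(inWindow.head + perMillis - now)),
        copy(lastTimestamps = inWindow, scheduled = true))
    } else {
      (Nil, copy(lastTimestamps = inWindow))
    }
  }
}
```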
Using Akka
First, let’s use the above `RateLimiterQueue` data structure to implement a rate limiter using pure, “traditional” Akka. As we are in the Akka ecosystem, computations will be represented as `Future` values. However, as a `Future` is a computation that is already running (eagerly), we have to make it lazy and make sure that it’s only constructed once the rate limiter allows it to be run.
This also requires attention from the user of the code, so that instead of passing an already created `Future` instance (which is a running computation), only blocks which lazily create a `Future` are used as arguments.
The actor will accept two messages:
- `LazyFuture` — for scheduling a computation
- `ScheduledRunQueue` — for scheduled invocations of `run`
The state of the actor consists of a single mutable variable, which holds the current rate limiter queue. Hence, even though `RateLimiterQueue` itself is immutable, the reference to the current queue is mutable, and will change over time. Since the state is encapsulated within an actor, this is a safe thing to do.
The code of the actor is quite straightforward: upon receiving a message, a computation is enqueued or the `scheduled` flag cleared, and a check for computations to run is invoked.
Note that this “traditional” Akka actor doesn’t specify anywhere what type of messages it accepts; it’s possible to send it any message, at the risk of the message not being handled and discarded. The knowledge of what messages an actor accepts must be passed in another way, through documentation or an implied protocol.
Now that we have an actor, we still need a way to create the actor instance and schedule computations.
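A sketch, consistent with the description that follows (the `Props`-based creation and helper names are assumptions):

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import scala.concurrent.{ExecutionContext, Future, Promise}

object AkkaRateLimiter {
  // create the actor given an ActorSystem and wrap it in a convenient interface
  def create(maxRuns: Int, perMillis: Long)(implicit system: ActorSystem): AkkaRateLimiter =
    new AkkaRateLimiter(system.actorOf(Props(new RateLimiterActor(maxRuns, perMillis))))
}

class AkkaRateLimiter(rateLimiterActor: ActorRef) {
  // f is a by-name parameter: the Future is only created when the block is forced
  def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    // the rate limiter first runs f, then completes p with its result (andThen)
    rateLimiterActor ! LazyFuture(() => f.andThen { case r => p.complete(r) })
    p.future
  }
}
```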
The rate limiter can be created given an `ActorSystem`. The created `ActorRef` is wrapped in an `AkkaRateLimiter` instance, with a convenient interface for running a rate-limited computation. `runLimited` uses a by-name — and hence lazily computed — `Future` parameter.
The result of the `runLimited` method has to be a (running) `Future`, which will be completed only once the rate limiter allows the computation to start and complete. That’s why we create an intermediate `Promise`; the computation that will be run by the rate limiter first runs `f`, and then completes `p` with its result (the operations are sequenced using `andThen`).

Using Akka Typed
If we were to write the akka-actor solution 5 years ago, the code would be more or less the same. But things have changed since then; it’s about time to address the lack of type safety in actors.
There were several attempts to do so, but akka-typed seems to finally have a chance to mature to a stable solution.
The first crucial difference when using akka-typed is that we don’t write actor code, instead we define the actor’s behavior. The behavior is a specification, written in Scala, of how the actor should behave — how it should handle incoming messages.
The result of message-receiving code should be new actor behavior, specifying what to do next. This takes the `become` construct known from “traditional” actors to a new level, making it a basic building block.
The actor’s behavior can be parametrized with the actor’s state. This way we can avoid using mutable actor state (variables) altogether, while keeping the most crucial actor feature: serializing and protecting access to the actor data. After receiving the message, the state can be changed, and new behavior — parametrized with the new state — returned.
The second difference is that each behavior has a type parameter, specifying the type of messages that the actor can handle. In our case, there are two such messages — exactly the same as before — but we need to introduce a common parent trait.
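For example (a sketch, with names following the article):

```scala
import scala.concurrent.Future

// The typed message protocol: a common parent trait for both messages.
sealed trait RateLimiterMsg
case class LazyFuture(t: () => Future[Any]) extends RateLimiterMsg
case object ScheduledRunQueue extends RateLimiterMsg
```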
The core logic of the actor doesn’t diverge much from the “traditional” approach. The main difference is that we are calling the same behavior-creating method (`rateLimit`) recursively, after each handled message modifying the `RateLimiterQueue` and passing it to subsequent invocations.
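A sketch of the behavior, assuming the protocol above and the `RateLimiterQueue` sketch from before (`Behaviors.receiveMessage` and the keyed `startSingleTimer` are from the current Akka Typed API; the article’s Akka version may have used different factory names):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import scala.concurrent.duration._

object RateLimiterBehavior {
  def rateLimit(timer: TimerScheduler[RateLimiterMsg],
                data: RateLimiterQueue[LazyFuture]): Behavior[RateLimiterMsg] =
    Behaviors.receiveMessage[RateLimiterMsg] {
      case lf: LazyFuture    => runQueue(timer, data.enqueue(lf))
      case ScheduledRunQueue => runQueue(timer, data.notScheduled)
    }

  private def runQueue(timer: TimerScheduler[RateLimiterMsg],
                       data: RateLimiterQueue[LazyFuture]): Behavior[RateLimiterMsg] = {
    val (tasks, data2) = data.run(System.currentTimeMillis())
    tasks.foreach {
      case Run(LazyFuture(t)) => t() // force the lazy future
      case RunAfter(millis) =>
        // the timer can only schedule messages for this actor
        timer.startSingleTimer("runQueue", ScheduledRunQueue, millis.millis)
    }
    // the new behavior is parametrized with the updated state
    rateLimit(timer, data2)
  }
}
```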
As before, in the `runQueue` method, tasks obtained from the `RateLimiterQueue` are run in a side-effecting fashion: either forcing the lazy `Future`s or scheduling a timer. Note that the timer comes from a special behavior factory method (see `Behaviors.withTimers` below), and is also parametrized with the type of messages that can be sent. The timer is always bound to a specific actor and can schedule messages for this actor only.
But having a behavior is not enough; it’s just a description. We can create loads of behavior instances, and nothing will happen; we still need to create an actor.
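A sketch of the bootstrap and the user-facing wrapper, assuming the behavior sketch above:

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{ExecutionContext, Future, Promise}

object AkkaTypedRateLimiter {
  def create(maxRuns: Int, perMillis: Long): AkkaTypedRateLimiter = {
    val behavior = Behaviors.withTimers[RateLimiterMsg] { timer =>
      RateLimiterBehavior.rateLimit(timer, RateLimiterQueue[LazyFuture](maxRuns, perMillis))
    }
    // the behavior becomes the single top-level actor of a new typed ActorSystem
    new AkkaTypedRateLimiter(ActorSystem(behavior, "rate-limiter"))
  }
}

class AkkaTypedRateLimiter(system: ActorSystem[RateLimiterMsg]) {
  def runLimited[T](f: => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    // the typed tell (!) accepts only RateLimiterMsg values, not Any
    system ! LazyFuture(() => f.andThen { case r => p.complete(r) })
    p.future
  }
}
```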
To create an actor, we either need to create a new typed `ActorSystem` (as is the case here), or another typed actor has to spawn a (typed) child actor. This is different from traditional Akka, where an actor system could be used to create any number of actors. Here, there can only be one top-level actor.
In “real life” the actor would probably be created from a parent actor, but for demo purposes in `create` we are creating a new `ActorSystem`, passing the initial behavior and a name. Once the actor is created, we get a typed `ActorRef` (an `ActorSystem` is also an `ActorRef`), which — as everything — is parametrized with the type of messages that can be sent to the actor. That way, the `!` (`tell`) method no longer accepts `Any`, but instead only messages which can be handled by our actor.
We are also using the same way of creating a `Promise` and from it a `Future` which will be completed once the rate-limited computation is done.

Using Monix
Let’s depart the comfortable world of actors and move to slightly less known territory, but one that is very rapidly evolving. We’ll start with Monix. The basic construct is the `Task` data type, which represents a description of a computation. Unlike a `Future`, which represents a running computation, a value of type `Task` doesn’t do anything by itself, in a similar fashion to how creating a value of type `Behavior` didn’t create the actor. However, when run, a value of type `Task[A]` will produce a single value of type `A`, or an error.
The important part here is that the `Task` can be run multiple times (or not at all); each run will be independent and will run the side effects as described by the construction of that particular `Task`. A convenient analogy is to think about a `Task` as a lazy `Future`. (In the code above we already had a very simple `LazyFuture` implementation, as a wrapper for `() => Future[T]`; `Task` is in reality much more complex and optimized for composition. But as the example shows, such a construct is useful even in the Akka world!)
The various methods on the `Task` companion object and the methods on `Task` instances allow describing complex processes running concurrently. How can we use them to create our rate-limiter example?

Note that our goal isn’t to imitate actors using Monix or ZIO. Instead, we want to solve the same problem that is being solved by the actor implementation, using the most natural approach given by a different set of tools.
We definitely need a process running in the background which:
- accepts new computations to be run
- queues them
- when the rate limit allows, runs as many as possible.
Communicating using asynchronous message passing works great for concurrent processes, so let’s stick to that.
One of the tools available in Monix is `MVar`, which is a container for a single value. That value can be accessed through `take` and `put` operations. Taking a value empties the `MVar` if it’s full, and blocks/suspends (asynchronously, without blocking threads) otherwise. Putting a value works the other way round: it blocks if the `MVar` is full, and stores the value otherwise.
This makes `MVar` an asynchronous blocking queue of size 1 (with simple back-pressure, as the operations are blocking), but that’s sufficient for our use case. We’ll use an `MVar` to communicate with a background process which manages which computations should be run and when. The messages passed through the `MVar` will be analogous to the ones used in the Akka implementations.
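A sketch of the protocol (the `Schedule` name is taken from the article’s later description):

```scala
import monix.eval.Task

// Messages passed through the MVar; names follow the article.
sealed trait RateLimiterMsg
case class Schedule(t: Task[Unit]) extends RateLimiterMsg
case object ScheduledRunQueue extends RateLimiterMsg
```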
Note that to pass a computation to the rate limiter, we are using a `Task` instead of a `Future`, as we’ve moved from Akka-land to Monix-land. As the `Task` is already lazy, it’s enough to simply store the reference passed by the user. It’s also safer: there’s no danger of prematurely running the computation by accident, as was the case with `Future`.

That safety property is a consequence of the often-cited referential transparency, meaning (in this context) that it doesn’t matter (putting memory allocation considerations aside) if we use an already created value as a function argument, or if the value is created as part of the invocation, or if the value is created lazily when needed by the function.
The rate limiter logic is again a recursive function, parametrized with the current `RateLimiterQueue` instance, similar to the akka-typed implementation. Additionally, we also have a reference to a queue: `MVar[RateLimiterMsg]`, which will be used to communicate with the outside world. (Note that queue is used twice here in different meanings: once as the queue of rate-limited computations encapsulated by `RateLimiterQueue`, and once as the 1-element queue of incoming messages which orchestrates the whole process.)
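A sketch of this process; the numbered comments correspond to the step-by-step description that follows. Note that `MVar` and `.fork` follow the Monix version used in the article; in current Monix the equivalents are `monix.catnap.MVar` and `Task#start`:

```scala
import monix.eval.{MVar, Task}
import scala.concurrent.duration._

object MonixRateLimiterProcess {
  def runQueue(data: RateLimiterQueue[Task[Unit]],
               queue: MVar[RateLimiterMsg]): Task[Unit] =
    queue.take // (1) take a message from the MVar
      .map {
        // (2) update the rate limiter data structure
        case Schedule(t)       => data.enqueue(t)
        case ScheduledRunQueue => data.notScheduled
      }
      .map(_.run(System.currentTimeMillis())) // (3) compute the tasks to run
      .flatMap { case (tasks, data2) =>
        Task
          .sequence(tasks.map { // (6) sequence all forked tasks into one
            // (4) convert each rate-limiter-task into a Monix Task ...
            case Run(t) => t.fork // (5) ... and fork it into a fiber
            case RunAfter(millis) =>
              Task.sleep(millis.millis).flatMap(_ => queue.put(ScheduledRunQueue)).fork
          })
          .map(_ => data2) // forget the fiber instances
      }
      .flatMap(d => runQueue(d, queue)) // (7) recursively handle the next message
}
```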
Going step-by-step: (1) first we `take` a message from the `MVar`, and (2) change our rate limiter data structure accordingly (enqueueing a new computation or clearing the `scheduled` flag). Then, (3) we `run` the rate limiter queue, obtaining a list of tasks and a new `RateLimiterQueue` instance.
Now we have to execute the side-effects appropriate for each task (4). However, we can’t simply “execute” the side-effects — in the Monix/ZIO world that’s forbidden! Instead, once again we create a description of how these side-effects should be run — somewhere far off in the future, when the interpreter for the `Task` is actually run. This description is created by composing smaller descriptions into one big value, which describes the whole process.
Each rate-limiter-task is converted into a Monix-`Task` (it seems that name clashes are inevitable, sorry!): a rate-limiter-`Run` task is simply unwrapped into a Monix-`Task` (remember that we were storing the `Task` that the user submitted, which is already lazy), and a rate-limiter-`RunAfter` task is converted to a Monix-`Task` which sleeps for the given amount of time, and then puts a `ScheduledRunQueue` message on the queue. Each rate-limiter-task converted to a Monix-`Task` is then forked into a fiber (5) using `Task.fork`.
What’s a fiber? It’s easiest to think about it as a lightweight thread. Given a `compute: Task[A]`, `compute.fork` results in a `Task[Fiber[A]]`: a description of a `Task` which, when run (and only then!):
- will start running the `compute` task asynchronously, in the background
- will return a `Fiber[A]` to the parent (forking) process, which can be used to wait for the background computation to complete (using the `join: Task[A]` method), or cancel it (using the `cancel: Task[Unit]` method).
Because fibers can sleep or wait for external resources, a small number of threads can be used to run a large number of fibers.
Doesn’t that sound familiar? Yes — there’s a strong similarity with actors. Actors are also lightweight processes, and a large number of actors can be run on a small number of threads. Both fibers and actors can wait without blocking the thread on which they are running. However, unlike an actor, where the process is coupled with a queue and state, in Monix these are separate: creating a lightweight process is just another combinator (`fork`), and state/queues can (and have to) be managed independently.
Coming back to the code: each rate-limiter-task converted to a Monix-`Task` is forked (5) so that it runs in the background and doesn’t block the process which consumes messages from the `queue`. Finally, we sequence (6) the list of all such forked computations into one big computation (converting a `List[Task[Fiber]]` into a `Task[List[Fiber]]` — the same thing that `Future.sequence` does), forget the results (we don’t need the fiber instances here) and (7) recursively call the `runQueue` method, using the updated `RateLimiterQueue` data structure.
Keep in mind that all we are doing here is creating process descriptions. Nothing happens yet — no recursion, forking, etc. We only get back a data structure which describes what should happen when the task is executed.
What about bootstrapping the whole process, and providing a nice interface for the user?
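A sketch, assuming the `runQueue` and protocol sketches above (`MVar.empty` returning a `Task` follows the Monix version used in the article):

```scala
import monix.eval.{MVar, Task}

object MonixRateLimiter {
  def create(maxRuns: Int, perMillis: Long): Task[MonixRateLimiter] =
    for {
      queue <- MVar.empty[RateLimiterMsg] // allocating the MVar is itself a Task
      _     <- MonixRateLimiterProcess
                 .runQueue(RateLimiterQueue[Task[Unit]](maxRuns, perMillis), queue)
                 .fork // run the rate-limiting process in a background fiber
    } yield new MonixRateLimiter(queue)
}

class MonixRateLimiter(queue: MVar[RateLimiterMsg]) {
  def runLimited[T](f: Task[T]): Task[T] =
    for {
      mv <- MVar.empty[T] // used like a Promise: it will hold the result
      _  <- queue.put(Schedule(f.flatMap(mv.put))) // first run f, then fill mv
      r  <- mv.take // suspends (asynchronously) until the computation is done
    } yield r
}
```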
Creating an `MVar` is itself a side-effecting computation (as we need to allocate the reference), and it results in a `Task`. Once we have the `queue` instance in the `create` method, we can create the `Task` which describes running the rate-limiting process, and fork it into a background fiber.
The instance that is being returned to the user, `MonixRateLimiter`, upon receiving a `Task` to be rate-limited creates an `MVar`, but for another purpose — here it’s used similarly to a `scala.concurrent.Promise`. That `MVar` instance (`mv`) will be filled with the result of the computation, once it is run by the rate limiter. Note that the task that is passed to the rate limiter contains that logic (`f.flatMap(mv.put)`): it is a description of a computation which first runs `f`, and then puts the result into `mv`. The task that is being returned — `mv.take` — will block (again, asynchronously) until the computation is done.
However, so far all that we’ve been doing is creating descriptions. When do we actually run things? As late as possible! The general goal should be to create, using composition, larger and larger descriptions of the logic of our program.
But in the end, be it in tests or the `main` method of our application, we need to run the side effects. That is possible using the `run*` methods on `Task`, such as `Task.runSync` or `Task.runAsync` — which run the described computations and block, or return a `Future`. In our case, these methods are used in the tests.

Using ZIO
Finally, let’s move to an implementation using ZIO’s `IO`. If you look at the code, you’ll notice that it’s surprisingly similar to the Monix version. And that’s not a coincidence. While Monix has been around for some time now, the relatively recent development of ZIO brought a fresh set of ideas to the table, and the two projects started competing on both the performance and functionality side, resulting in implementations that are distinct in a couple of key areas, but overall quite similar.
Despite the scalaz origins, ZIO, as a stand-alone project, can be used with Scalaz 7, Scalaz 8, Cats, or even without any of them. In fact, in the example project, just for the fun of it, we’re using ZIO with Cats to get convenient methods such as `sequence_`.
The basic idea behind an `IO` value is the same as in Monix: it’s a description of a computation which, when run, can yield a value or throw an error. However, there’s a significant difference: while Monix’s `Task` takes a single type parameter, here we have two: `IO[E, A]`. The first type parameter specifies the type of errors which the computation can return. The second — the type of the value that will be produced. If that sounds similar to an `Either` — it is: it’s as if you stacked an `Either` on top of a `Task`, but without the performance and memory penalties. A blog post by John De Goes explores this topic further.
Of course, it’s not possible to guarantee that the code inside an `IO` won’t throw an arbitrary exception — but the idea behind the design of `IO` is to divide errors into recoverable errors, the ones that are expected and which are of type `E`, and programming defects or catastrophic errors, which are not recoverable, and are caused either by a programming error or a VM problem, such as running out of memory. Again, there’s a John De Goes blog post diving into that design.
This gives us an additional possibility to enhance type safety. Not only can we describe what type of value will be produced, but also which errors are expected, but not handled. For example, a value of type `IO[Nothing, A]` specifies that this is a description of a computation which, when run, can’t result in errors, and will produce a single `A` value. There is no way to convey this information through a `Task` or a `Future`.
Otherwise, for the purposes of this example, the concepts are analogous to those in Monix. In ZIO we can also use the `fork` method to run a computation in the background, obtaining a `Fiber` (which also has two type parameters). A minor annoyance is that sometimes the type inferencer can’t infer the correct error type, and it needs to be given explicitly — especially for methods which are polymorphic in the error type (e.g. `IO.sleep` can produce an `IO` which can result in any type of error). Scala is even more reluctant to infer `Nothing`, causing some additional pain when this “no-error” type is used.
Some of the method names are different, but otherwise the implementation of `runQueue` is almost the same.
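A sketch, following the early scalaz-zio API described in the article (`IOQueue`, `take`/`offer`, `IO.sleep`, cats interop for `sequence_`); later ZIO versions renamed most of these. The message protocol mirrors the Monix one, with `Schedule` holding an `IO[Nothing, Unit]`:

```scala
import scalaz.zio.{IO, IOQueue}
import scala.concurrent.duration._
import cats.implicits._ // for sequence_, via the cats interop mentioned earlier

object ZioRateLimiterProcess {
  def runQueue(data: RateLimiterQueue[IO[Nothing, Unit]],
               queue: IOQueue[RateLimiterMsg]): IO[Nothing, Unit] =
    queue.take
      .map {
        case Schedule(t)       => data.enqueue(t)
        case ScheduledRunQueue => data.notScheduled
      }
      .map(_.run(System.currentTimeMillis()))
      .flatMap { case (tasks, data2) =>
        tasks
          .map {
            case Run(t) => t
            case RunAfter(millis) =>
              // the error type of sleep has to be given explicitly (Nothing)
              IO.sleep[Nothing](millis.millis).flatMap(_ => queue.offer(ScheduledRunQueue))
          }
          .map(_.fork)   // fork each task into a fiber
          .sequence_     // sequence all forked tasks into one description
          .flatMap(_ => runQueue(data2, queue))
      }
}
```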
The thing that is different is that there’s no construct analogous to `MVar` in ZIO. Instead, to communicate with the outside world, there’s a back-pressured `IOQueue`, which is a bounded queue backed by `IO`. We can enqueue messages using the `offer` method (for example, above we enqueue `ScheduledRunQueue` instances) and dequeue using the `take` method.
What about creating the rate limiter and providing the user with an interface? Again, things are quite similar to before.
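A sketch, with the same caveats about the early API; the `Promise` method names (`complete`, `error`, `get`), the `IOQueue.bounded` factory and the capacity of 32 are best-effort assumptions:

```scala
import scalaz.zio.{IO, IOQueue, Promise}

object ZioRateLimiter {
  def create(maxRuns: Int, perMillis: Long): IO[Nothing, ZioRateLimiter] =
    for {
      queue <- IOQueue.bounded[RateLimiterMsg](32)
      _     <- ZioRateLimiterProcess
                 .runQueue(RateLimiterQueue[IO[Nothing, Unit]](maxRuns, perMillis), queue)
                 .fork // run the rate-limiting process in a background fiber
    } yield new ZioRateLimiter(queue)
}

class ZioRateLimiter(queue: IOQueue[RateLimiterMsg]) {
  def runLimited[E, T](f: IO[E, T]): IO[E, T] =
    for {
      p <- Promise.make[E, T] // a ZIO Promise, backed by IO
      _ <- queue.offer(Schedule(
             f.flatMap(p.complete) // on success, complete the promise
              .catchAll(p.error)   // on failure, fail it
              .map(_ => ())))
      r <- p.get // suspends until the promise is completed
    } yield r
}
```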
One important difference is again related to the absence of `MVar`. Instead, we have at our disposal a (ZIO) `Promise`, a concept known from the `scala.concurrent.Future` world, but this time backed by `IO`. Again, as all of the computations are lazy, we can create a description of a computation which will first run the given `IO` (`f: IO[E, T]`), and then complete the `p: Promise` using it. Once that is ready, we enqueue a `Schedule` message, which will then be taken off the queue by the background process.
Another difference compared to the Monix version is that we have to make a hard choice and decide what the capacity of the queue is going to be. This can be as low as 1 (effectively making the `IOQueue` an `MVar`), or a higher value, if we suspect that this might improve performance.
At “the end of the world” — which might be our `main` method or tests — we’ll need to run the computations, executing the side-effects. There are no such methods on `IO` directly; instead, we need to create an object which mixes in the `RTS` trait. This gives us access to `def unsafePerformIO[E, A](io: IO[E, A]): A` and `unsafePerformIOAsync`, and a couple of other variants.
John De Goes, the author of ZIO, also gave a more high-level talk on how `IO` can replace Akka’s actors at Scalar 2018. You’ll probably notice that in his example, when modelling an actor, he manages the actor’s internal state explicitly, using an `IORef`. While that is a possibility, here we don’t even need that, as it’s easier to just use recursion.