class: center, middle # Akka and Concurrency Models Jan Ypma jyp@tradeshift.com --- # Agenda - Threads and locks - Futures and callbacks - Actor model - Akka streams - Akka HTTP --- # Parallellism - Actually run several units of work simultaneously -- # Concurrency - Logically run several units of work, seemingly in parallel --- # Threads and mutexes - **Thread** : logical thread of execution, scheduled by OS scheduler - All threads share one memory space - **Mutex** : _mutually exclusive_ flag that prevents threads from accessing shared resources -- Advantages - Close to how the hardware actually works - The only model generally tought in school - Blocking on I/O natually equivalent to "this, then this" Disadvantages - Expensive (~1M stack) - Slow to create/destroy - Deadlock --- # Futures and callbacks - **Future** is a handle to a calculation which _eventually_ yields a result - Scala, Finagle, Q.js, ... : _Future_ / _Promise_ - Java 8: `CompletionStage
` / `CompletableFuture
` - Can be chained, causing further processing (and further futures) after a stage completes - Can be merged, waiting for several futures' results before commencing ``` CompletionStage
= http.request(GET("/tweets")) .thenApply(response -> response.getBody()) .thenApply(body -> parseJSON(body)) .thenCompose(tweets -> http.request(PUT("/subscribe?topic=" + tweets.getMostPopular()))) .thenApply(response -> Done.getInstance()) .whenComplete((r,x) -> { if (x != null) { log.error("Well something went wrong", x); } }); ``` --- # Futures and callbacks Advantages - Simple to grasp Disadvantages - only one value: no obvious way to stream data - "callback hell" - brittle error handling - not (easily) cancellable - still needs locking for shared resources (unless using a single thread) --- # Cores and caches - Latency Comparison Numbers ```text L1 cache reference 0.5 ns Branch mispredict 5 ns L2 cache reference 7 ns Mutex lock/unlock 25 ns Main memory reference 100 ns Compress 1K bytes with Zippy 3,000 ns *Context switch 10,000 ns (if working set in L2) Send 1K bytes over 1 Gbps network 10,000 ns *Context switch >100,000 ns (bigger working set) Read 4K randomly from SSD* 150,000 ns Read 1 MB sequentially from memory 250,000 ns Round trip within same datacenter 500,000 ns Read 1 MB sequentially from SSD* 1,000,000 ns Disk seek 10,000,000 ns Read 1 MB sequentially from disk 20,000,000 ns Send packet CA->Netherlands->CA 150,000,000 ns ``` - Keep working on the same data for as long as possible --- # The actor model - **Actor** is an entity that responds only to messages by - sending messages to other actors - creating other, child, actors - adjusting its behaviour - *Akka* is a toolkit for writing actors in Java - Actor is a normal Java class that extends `UntypedActor` or `AbstractActor` - Native network transparency, clustering, event sourcing, ... - **Message** is an immutable, serializable, Java class - Parent actor is the _supervisor_ of its child actors. On child actor failure, parent decides what to do: - Restart child - Stop child - Escalate --- # Actor ping pong ```java public class PingActor extends AbstractActor { private int counter = 0; private ActorRef pongActor = getContext().actorOf( Props.create(PongActor.class), "pongActor"); { receive(ReceiveBuilder .match(Initialize.class, msg -> { log.info("In PingActor - starting ping-pong"); pongActor.tell("ping", getSelf()); }) .match(String.class, msg -> { log.info("In PingActor - received message: {}", message); counter += 1; if (counter == 3) { getContext().system().shutdown(); } else { getSender().tell("ping", getSelf()); } }) .build()); } } ``` --- # Actor ping pong ```java public class PongActor extends UntypedActor { public void onReceive(Object message) { if (message instanceof String) { log.info("In PongActor - received message: {}", message); getSender().tell("pong", getSelf()); } } } ``` --- # The actor model Advantages - Share-nothing approach comes natural - Straightforward unit testing - Much more lightweight (100s of bytes) and performant (ForkJoinPool) than threads - Trivially extends to a distributed system - Ability to "just let it crash" in isolated areas Disadvantages - No obvious way to stream data --- # Backpressure - Imagine an HTTP request with a very long response, that reads 1.000.000 rows from a DB *Using Threads* -- - Just block while reading (waits for more data), and block while writing (TCP send buffer full) *Using Actors* - Let's have a "Request Actor" and a "Database Actor" 1. Request Actor messages Database Actor to requests the rows 2. Database Actor sends back 1.000.000 messages as fast as it can? -- - Needs some sort of `Ack` protocol between Request Actor and Database Actor, or batching *Streams* - What if we could combine the advantages of asynchronous messaging and add in backpressure? --- # Introducing Akka Streams - `Graph` is a *blueprint* for a closed, finite network of *stages*, which communicate by streaming elements - `GraphStage
` is one processing stage within a graph, taking elements in through zero or more *Inlets*, and emitting through *Outlets* - It's completely up to the stage when and how to respond to arriving elements - All built-in graph stages embrace _backpressure_ and _bounded processing_ #### Mostly used graph stages - `Source
` has one outlet of type `T` - `Sink
` has one inlet of type `T` - `Flow
` has one inlet of type `A` and one outlet of type `B` - `RunnableGraph
` has no inlets or outlets #### Reactive streams - Akka is a reactive streams implementation (just like `RxJava` and others) - You typically don't interact in terms of _publisher_ and _subscriber_ directly --- # Hello, streams ``` final ActorSystem system = ActorSystem.create("QuickStart"); final Materializer materializer = ActorMaterializer.create(system); final Source
numbers = Source.range(1, 100); final Sink
> print = Sink.foreach(i -> System.out.println(i)); final CompletionStage
done = numbers.runWith(print, materializer); // Output: // 1 // 2 // ... ``` --- # Stream materialization - _Graph_ is only a blueprint: nothing runs until it's given to a _materializer_, typically `ActorMaterializer` - All graph stages are generic in their materialized type `M` - Graph can be materialized (`run`, `runWith`) more than once ``` class Source
{ // A graph which materializes into the M2 of the sink (ignoring source's M) public RunnableGraph
to(Sink
sink); // Materializes, and returns the M of the sink (ignoring this source's M) public
M2 runWith(Sink
sink, Materializer m) { ... } // A graph which materializes into the result of applying [combine] to // this source's M and the sink's M2 public
RunnableGraph
toMat(Sink
sink, Function2
combine); } class RunnableGraph
{ public M run(Materializer m); } ``` --- # Reusable pieces - `Source`, `Sink` and `Flow` are all normal, immutable objects, so they're ideal to be constructed in reusable factory methods: ``` public Sink
> lineSink(String filename) { Sink
> file = FileIO.toPath(Paths.get(filename); // Let's start with some strings return Flow.of(String.class) // Flow
// Convert them into bytes (UTF-8), adding a newline // We now have a Flow
.map(s -> ByteString.fromString(s + "\n")) // Send them into a file, and we want the IOResult of the // FileIO sink as materialized value of our own sink .toMat(file), Keep.right()); } numbers.runWith(lineSink("numbers.txt"), materializer); ``` --- # Time-based processing ``` final Source
numbers = Source.range(1, 100000000); final Sink
> print = Sink.foreach(i -> System.out.println(i)); final CompletionStage
done = numbers .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) .runWith(print, materializer); ``` - This does what you expect: print one message per second - No `OutOfMemoryError`, akka buffers only as needed: _backpressure_ --- # Example Sources - Materialize as `ActorRef` ``` Source
s = Source.actorRef(10, OverflowStrategy.fail()); ``` - Materialize as reactive `Subscriber
` ``` Source
> s = Source.asSubscriber(); ``` - Read from a reactive `Publisher
p` ``` Source
s = Source.fromPublisher(p); ``` - Emit the same element regularly ``` Source
s = Source.tick(duration, duration, element); ``` --- # Example Sinks - Send to `ActorRef` ``` Sink
s = Sink.actorRef(target, "done"); ``` - Materialize as reactive `Publisher
` ``` Sink
> s = Sink.asPublisher(WITH_FANOUT); ``` - Materialize into a `java.util.List` of all elements ``` Sink
> s = Sink.seq(); ``` --- # Example source and flow operators - Send `Source
src` to an additional `Sink
sink` ``` Source
s = src.alsoTo(sink); ``` - Process batches of concatenated strings, but only if coming in too fast ``` Source
s = src.batchWeighted(1000, s -> s.length(), s -> s, (s1,s2) -> s1 + s2); ``` - Process 1 seconds' worth of elements at a time, but at most 100 ``` Source
, M> s = src.groupedWithin(100, Duration.create(1, SECONDS)); ``` - Invoke a `CompletionStage` for each element, and resume with the results in order ``` CompletionStage
process(String s) { ... } Source
s = src.mapAsync(this::process); ``` --- # Comparison of the four models ![Table](table.svg) --- # Akka HTTP (server) ``` final Route route = get(() -> path("/", () -> complete("Hello, world!") ) ); final Flow
handler = route.flow(system, materializer); final CompletionStage
binding = Http.get(system).bindAndHandle( handler, ConnectHttp.toHost("127.0.0.1", 8080), materializer); ``` - `Route` contains asynchronous processing of an HTTP request - Delegates to nested inner routes - Convertable to a `Flow
` in order to become a real HTTP server --- # Directives - The `get`, `path`, `complete` methods are called _directives_ - *Directive* is a method that returns a `Route` ``` /** Invokes the inner route if the request has GET as method */ public Route get(Supplier
inner); /** Invokes the inner route if the remaining path exactly is [p] */ public Route path(String p, Supplier
inner); /** Invokes the inner route if the first path segment is [segment] */ public Route pathPrefix(String segment, Supplier
inner); /** Invokes the function with the next path segment */ public Route pathPrefix(PathMatchers.segment(), Function
inner); /** Completes the request with 200 OK and the given body as UTF-8 */ public Route complete(String body); ``` - Invoking the directive method doesn't actually handle any HTTP requests (all directives are just factories for `Route` instances) --- # Marshalling - Unmarshalling a request body into a Jackson-annotated class ``` Route example = post(() -> entity(Jackson.unmarshaller(Person.class), person -> complete("Hello, " + person.getName()) ) ); ``` - Marshalling a Jackson-annotated class into a response body ``` Route example = get(() -> completeOK(thePerson, Jackson.marshaller()) ); ``` --- # Query and path parameters - Extracting several parameters for `?q=foo&q=bar&limit=100` ``` Route example = parameterOptional(INTEGER, "limit", limit -> // type Optional
parameterList("q", q -> // type List
complete("You gave q " + q + " and limit " + limit) ) ); ``` - Extracting a UUID path parameter for `/documents/{uuid}/summary` ``` Route example = pathPrefix("documents", () -> pathPrefix(UUIDSegment, uuid -> // type UUID path("summary", () -> complete("You hit document " + uuid) ) ) ); ``` --- class: center, middle # No annotations, no reflection, no AOP # NO MAGIC! --- # Other directives - Complete route when a `CompletionStage f` succeeds ``` Route r = onSuccess(f, result -> complete("result was " + result)); ``` - Handle incoming web socket request ``` Flow
handler = ... Route r = handleWebSocketMessages(handler); ``` # Important other topics - Exception handling - Rejections and route matching --- # Onwards - Current topics of development - Performance optimizations - Stabilization of akka http - Non-blocking marshalling - Want to get involved? - Get: [http://akka.io/](http://akka.io/) - Read: [http://doc.akka.io/docs/akka/current/java.html](http://doc.akka.io/docs/akka/current/java.html) - Chat: [https://gitter.im/akka/akka](https://gitter.im/akka/akka) - Hack: [https://github.com/akka/akka/issues?q=label:community](https://github.com/akka/akka/issues?q=label:community)