class: center, middle

# Akka Workshop

Jan Ypma

jyp@tradeshift.com

.smallright[

This presentation: http://jypma.github.io/20161025-ts-workshop

]

---
# Today's agenda

- Four Java concurrency models .small[[9:30]]
- Exercises
  1. Create a chat web application using akka .small[[10:30]]
  2. Add persistence so we don't lose data all the time .small[[12:00]]
  3. Add scalability and integration .small[[14:00]]
  4. Streaming integration .small[[15:30]]

### First things first

```php
mkdir workshop-1
cd workshop-1
wget "https://jypma.github.io/20161025-ts-workshop/workshop-1-akka.tgz"
tar xzvf workshop-1-akka.tgz
./sbt compile   # or just "sbt compile" if installed
```

- This presentation: http://jypma.github.io/20161025-ts-workshop
- Akka guide: [http://doc.akka.io/docs/akka/2.4/java.html](http://doc.akka.io/docs/akka/2.4/java.html)
- Akka JavaDoc: [http://doc.akka.io/japi/akka/2.4/](http://doc.akka.io/japi/akka/2.4/)
- IntelliJ user? Install **Scala** and **SBT** plugins

---
class: center, middle

# Four Java concurrency models

---
# Parallelism

- Actually run several units of work simultaneously

--

# Concurrency

- Logically run several units of work, seemingly in parallel

---
# 1. Threads and mutexes

- **Thread**: a logical thread of execution, scheduled by the OS scheduler
- All threads share one memory space
- **Mutex**: a _mutually exclusive_ flag that prevents threads from accessing shared resources at the same time

--

Advantages

- Close to how the hardware actually works
- The only model generally taught in school
- Blocking on I/O naturally reads as "do this, then this"

Disadvantages

- Expensive (~1MB of stack per thread)
- Slow to create/destroy
- Deadlock
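To make the model concrete, a minimal sketch (illustrative only, not part of the exercises) of two threads sharing a counter guarded by a mutex:

```
// Illustrative: two threads incrementing a shared counter, serialized by a lock.
public class Counter {
    private final Object lock = new Object();
    private int count = 0;

    public void increment() {
        synchronized (lock) {   // only one thread at a time may enter
            count++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Counter c = new Counter();
        Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) c.increment(); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) c.increment(); });
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(c.count);  // reliably 2000, thanks to the lock
    }
}
```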
---
# 2. Futures and callbacks

- A **Future** is a handle to a calculation which _eventually_ yields a result
- Scala, Finagle, Q.js, ...: _Future_ / _Promise_
- Java 8: `CompletionStage<T>` / `CompletableFuture<T>`
- Can be chained, causing further processing (and further futures) after a stage completes
- Can be merged, waiting for several futures' results before commencing (example below)

```
CompletionStage<Done> done = http.request(GET("/tweets"))
    .thenApply(response -> response.getBody())
    .thenApply(body -> parseJSON(body))
    .thenCompose(tweets -> http.request(PUT("/subscribe?topic=" + tweets.getMostPopular())))
    .thenApply(response -> Done.getInstance())
    .whenComplete((r,x) -> {
        if (x != null) {
            log.error("Well something went wrong", x);
        }
    });
```
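Merging looks similar; a small sketch (the `countTweets`/`countUsers` helpers are illustrative, not part of the workshop code):

```
// Illustrative only: combine the results of two independent stages.
CompletionStage<Integer> tweetCount = http.request(GET("/tweets")).thenApply(r -> countTweets(r));
CompletionStage<Integer> userCount  = http.request(GET("/users")).thenApply(r -> countUsers(r));

CompletionStage<Integer> total = tweetCount.thenCombine(userCount, (t, u) -> t + u);
```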
---
# 2. Futures and callbacks

Advantages

- Simple to grasp

Disadvantages

- only one value: no obvious way to stream data
- "callback hell"
- brittle error handling
- not (easily) cancellable
- still needs locking for shared resources (unless using a single thread)

---
# Cores and caches

- Latency Comparison Numbers

```text
L1 cache reference                            0.5 ns
Branch mispredict                               5 ns
L2 cache reference                              7 ns
Mutex lock/unlock                              25 ns
Main memory reference                         100 ns
Compress 1K bytes with Zippy                3,000 ns
*Context switch                            10,000 ns (if working set in L2)
Send 1K bytes over 1 Gbps network          10,000 ns
*Context switch                          >100,000 ns (bigger working set)
Read 4K randomly from SSD*                150,000 ns
Read 1 MB sequentially from memory        250,000 ns
Round trip within same datacenter         500,000 ns
Read 1 MB sequentially from SSD*        1,000,000 ns
Disk seek                              10,000,000 ns
Read 1 MB sequentially from disk       20,000,000 ns
Send packet CA->Netherlands->CA       150,000,000 ns
```

- Keep working on the same data for as long as possible

---
# 3. The actor model

- An **Actor** is an entity that responds only to messages, by
  - sending messages to other actors
  - creating other (child) actors
  - adjusting its behaviour
- *Akka* is a toolkit for writing actors in Java
  - An actor is a normal Java class that extends `UntypedActor` or `AbstractActor`
  - A **Message** is an immutable, serializable Java class
- A parent actor is the _supervisor_ of its child actors. On child actor failure, the parent decides what to do:
  - Restart the child
  - Stop the child
  - Escalate
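Those supervision decisions can be expressed in code; a minimal sketch of a parent's policy (the exception types are illustrative; uses `akka.actor.OneForOneStrategy`, `akka.japi.pf.DeciderBuilder` and `scala.concurrent.duration.Duration`):

```
// Illustrative sketch, placed inside the parent actor (Akka 2.4 Java API).
@Override
public SupervisorStrategy supervisorStrategy() {
    return new OneForOneStrategy(10, Duration.create("1 minute"),
        DeciderBuilder
            .match(ArithmeticException.class, e -> SupervisorStrategy.resume())
            .match(IllegalArgumentException.class, e -> SupervisorStrategy.restart())
            .matchAny(e -> SupervisorStrategy.escalate())
            .build());
}
```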
---
# Actor ping pong

```
public class PongActor extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof String) {
            System.out.println("In PongActor - received message: " + message);
            getSender().tell("pong", getSelf());
        }
    }
}
```

---
# Actor ping pong

```
public class Initialize {}

public class PingActor extends AbstractActor {
    private int counter = 0;
    private ActorRef pongActor = getContext().actorOf(
        Props.create(PongActor.class), "pongActor");

    {
        receive(ReceiveBuilder
            .match(Initialize.class, msg -> {
                System.out.println("In PingActor - starting ping-pong");
                pongActor.tell("ping", getSelf());
            })
            .match(String.class, msg -> {
                System.out.println("In PingActor - received message: " + msg);
                counter += 1;
                if (counter == 3) {
                    getContext().system().shutdown();
                } else {
                    getSender().tell("ping", getSelf());
                }
            })
            .build());
    }
}
```

---
# Actor ping pong

```
public static void main(String[] args) {
    ActorSystem system = ActorSystem.create();
    ActorRef pingActor = system.actorOf(Props.create(PingActor.class));
    pingActor.tell(new Initialize(), ActorRef.noSender());
}
```

Output:

```
In PingActor - starting ping-pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
```

---
# The actor model

Advantages

- A share-nothing approach comes naturally
- Straightforward unit testing
- Much more lightweight (100s of bytes) and performant (ForkJoinPool) than threads
- Trivially extends to a distributed system
- Ability to "just let it crash" in isolated areas

Disadvantages

- No obvious way to stream data

---
# Backpressure

- Imagine an HTTP request with a very long response, which reads 1,000,000 rows from a DB

*Using Threads*

--

- Just block while reading (waits for more data), and block while writing (TCP send buffer full)

*Using Actors*

- Let's have a "Request Actor" and a "Database Actor"

1. Request Actor messages Database Actor to request the rows
2. Database Actor sends back 1,000,000 messages as fast as it can?

--

- Needs some sort of `Ack` protocol between Request Actor and Database Actor, or batching

*Streams*

- What if we could combine the advantages of asynchronous messaging, and add in backpressure?

---
# 4. Akka Streams

- A `Graph` is a *blueprint* for a closed, finite network of *stages*, which communicate by streaming elements
- A `GraphStage<S>` is one processing stage within a graph, taking elements in through zero or more *Inlets*, and emitting through *Outlets*
- It's completely up to the stage when and how to respond to arriving elements
- All built-in graph stages embrace _backpressure_ and _bounded processing_

.floatright[![Types](graphtypes.svg)]

#### Mostly used graph stages

- `Source<T, M>` has one outlet of type `T`
- `Sink<T, M>` has one inlet of type `T`
- `Flow<A, B, M>` has one inlet of type `A` and one outlet of type `B`
- `RunnableGraph<M>` has no inlets or outlets
---
# Hello, streams

![Graph](hellostreams.svg)

```
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);

final Source<Integer, NotUsed> numbers = Source.range(1, 100);
final Sink<Integer, CompletionStage<Done>> print = Sink.foreach(i -> System.out.println(i));

final CompletionStage<Done> done = numbers.runWith(print, materializer);
// Output:
// 1
// 2
// ...
```

---
# Time-based processing

```
final Source<Integer, NotUsed> numbers = Source.range(1, 100000000);
final Sink<Integer, CompletionStage<Done>> print = Sink.foreach(i -> System.out.println(i));

final CompletionStage<Done> done = numbers
    .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
    .runWith(print, materializer);
```

- This does what you expect: print one message per second
- No `OutOfMemoryError`, akka buffers only as needed: _backpressure_
---
# Example Sources

- Materialize as `ActorRef`

```
Source<String, ActorRef> s = Source.actorRef(10, OverflowStrategy.fail());
```

- Materialize as a reactive `Subscriber<T>`

```
Source<String, Subscriber<String>> s = Source.asSubscriber();
```

- Read from a reactive `Publisher<String> p`

```
Source<String, NotUsed> s = Source.fromPublisher(p);
```

- Emit the same element regularly

```
Source<String, Cancellable> s = Source.tick(duration, duration, element);
```

---
# Example Sinks

- Send to an `ActorRef`

```
Sink<String, NotUsed> s = Sink.actorRef(target, "done");
```

- Materialize as a reactive `Publisher<T>`

```
Sink<String, Publisher<String>> s = Sink.asPublisher(WITH_FANOUT);
```

- Materialize into a `java.util.List` of all elements

```
Sink<String, CompletionStage<List<String>>> s = Sink.seq();
```
---
# Example source and flow operators

- Send `Source<String, M> src` to an additional `Sink<String, ?> sink`

```
Source<String, M> s = src.alsoTo(sink);
```

- Process batches of concatenated strings, but only if coming in too fast

```
Source<String, M> s = src.batchWeighted(1000, s -> s.length(), s -> s, (s1,s2) -> s1 + s2);
```

- Process 1 second's worth of elements at a time, but at most 100

```
Source<List<String>, M> s = src.groupedWithin(100, Duration.create(1, SECONDS));
```

- Invoke a `CompletionStage` for each element, and resume with the results in order

```
CompletionStage<Integer> process(String s) { ... }

Source<Integer, M> s = src.mapAsync(4, this::process);
```

---
# Comparison of the four models

![Table](table.svg)

---
class: center, middle

# Exercises

---
# Exercise project structure

- Each tgz contains a multi-module SBT project
- All but one of the modules will be empty
  - That's the module you'll be working on
- Each module is a chat web application, running on `http://localhost:8173`

### SBT primer

- Run a project: `sbt "project example-1-akka" "run arg1 arg2"`
- **Eclipse** (4.6): Create eclipse project files using `sbt eclipse`, then import
- **IntelliJ** (16.2.5): Install the Scala and SBT plugins, import (download sources AND javadoc)
- Fallback: `sbt makePom`, and a Maven `pom` will magically appear in `target/`
- Why SBT?
  - Solid support for protobuf, docker
  - We got tired of writing xml

---
# Exercise 1: Actors

First:

- Download the [skeleton](workshop-1-akka.tgz) for exercise 1 and unpack it
- Make your changes in the `example-1-akka` directory
- Run: `sbt "project example-1-akka" run`
- Create an implementation of `ConversationProtocol` using plain actors
- Use `akka.pattern.PatternsCS.ask` as a bridge to your actors (see the sketch at the bottom of this slide)

--

And then, if you're bored:

- Change `RootRoute` so it reads from files and the HTML becomes hot-reloadable
- Add web socket (or hanging GET) support to client and server using akka streams
- Add authentication
- Make magic numbers configurable using akka's [config library](https://github.com/typesafehub/config)
- Make the client beautiful
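The `ask` bridge returns a `CompletionStage` that your HTTP route can complete; a minimal sketch (`conversationsActor` and `GetMessageList` stand in for whatever you define in the exercise):

```
// Illustrative sketch: bridging from a route into an actor with ask.
CompletionStage<List<String>> messages =
    PatternsCS.ask(conversationsActor, new GetMessageList(), 1000)  // timeout in ms
        .thenApply(reply -> (List<String>) reply);
```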
---
# Exercise 1: Tips

- Have a single parent actor that spawns conversations as child actors
  - This gives an easy "one child actor per conversation"
- Use `getContext().actorOf(props, name)` to name a child actor
- Use `getContext().getChild(name)` to check if a child actor already exists
- The parent can serve as a router proxy
  - Use `childActorRef.forward(msg, getContext())` to forward an incoming message to a child. The child will process the message as if the original sender sent it.
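A minimal sketch of such a parent (the `ConversationMsg` type and its `getConversationId()` are assumptions, not part of the provided skeleton):

```
// Illustrative sketch: one child per conversation, created on demand, messages forwarded.
public class ConversationsActor extends AbstractActor {
    {
        receive(ReceiveBuilder
            .match(ConversationMsg.class, msg -> {
                String name = msg.getConversationId();
                ActorRef child = getContext().getChild(name);
                if (child == null) {
                    child = getContext().actorOf(Props.create(ConversationActor.class), name);
                }
                child.forward(msg, getContext());
            })
            .build());
    }
}
```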
---
# Exercise 1: Solution

---
class: center, middle

# Persistence & event sourcing

---
# Event sourcing

.center[![Event sourcing](eventsource.svg)]

- The system considers an append-only **Event journal** the only source of truth
- An **Aggregate** is one unit of information to which (and only to which) an event atomically applies
- Events have a guaranteed order, but only within an aggregate

---
# Event sourcing

- Nice scalability properties
  - Each aggregate can process changes independently
  - All information that spans >1 aggregate is materialized using *event listeners*
- Traditionally only applied inside a system
  - Synchronous APIs only ("_get customer history_")
  - Why not expose the event stream itself?
    - Eventual consistency
    - Latency implications
    - Security implications

---
# Akka persistence

- Framework to do event sourcing using actors
- Persistence plugins for levelDB, cassandra, kafka, ...
- Each `PersistentActor` has a `String` identifier, under which events are stored

```
public class ChatActor extends AbstractPersistentActor {
    private final List<String> messages = new ArrayList<>();

    private void postMessage(String msg) {
        persist(msg, evt -> {
            messages.add(msg);
            sender().tell(Done.getInstance(), self());
        });
    }

    private void getMessageList() {
        sender().tell(new ArrayList<>(messages), self());
    }

    // ...
}
```

---
# Akka persistence

```
public class ChatActor extends AbstractPersistentActor {
    private final List<String> messages = new ArrayList<>();

    private void postMessage(String msg) { /* ... */ }
    private void getMessageList() { /* ... */ }

    @Override public String persistenceId() {
        return "chat-1";
    }

    @Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
        return ReceiveBuilder
            .match(String.class, messages::add)
            .build();
    }

    @Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
        return ReceiveBuilder
            .matchEquals("/list", msg -> getMessageList())
            .match(String.class, this::postMessage)
            .build();
    }
}
```

---
# A bit of refactoring

- Introduce a single base class for all commands

```
public abstract class ChatCommand {}

public class GetMessageList extends ChatCommand {}

public class PostMessage extends ChatCommand {
    private final String message;

    public PostMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
```

---
# A bit of refactoring

- Introduce a single base class for all events

```
public abstract class ChatEvent {}

public class MessagePosted extends ChatEvent {
    private final String message;

    public MessagePosted(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
```

---
# ts-reaktive: Independent state class

```
public class ChatState extends AbstractState<ChatEvent, ChatState> {
    public static final ChatState EMPTY = new ChatState(Vector.empty());

    private final Seq<String> messages;

    @Override public ChatState apply(ChatEvent event) {
        if (event instanceof MessagePosted) {
            return new ChatState(messages.append(
                MessagePosted.class.cast(event).getMessage()));
        } else {
            return this;
        }
    }
}
```

- Plain old Java class, hence easily unit testable (compared to an actor)
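For instance, a test can apply events directly, with no actor machinery; a minimal sketch (assuming JUnit, and assuming `ChatState` exposes its messages via a getter, which isn't shown above):

```
// Illustrative JUnit sketch: apply events and inspect the resulting state.
@Test
public void appliesMessagePostedEvents() {
    ChatState state = ChatState.EMPTY
        .apply(new MessagePosted("hello"))
        .apply(new MessagePosted("world"));

    assertEquals(2, state.getMessages().size());  // getMessages() assumed to exist
}
```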
---
# ts-reaktive: Stateful persistent actor

```
public class ChatActor extends AbstractStatefulPersistentActor<
        ChatCommand, ChatEvent, ChatState> {

    public static abstract class Handler extends AbstractCommandHandler<
        ChatCommand, ChatEvent, ChatState> { }

    @Override protected ChatState initialState() {
        return ChatState.EMPTY;
    }

    @Override protected PartialFunction<ChatCommand, Handler> applyCommand() {
        return new PFBuilder<ChatCommand, Handler>()
            .match(ChatCommand.GetMessageList.class, cmd ->
                new GetMessageListHandler(getState(), cmd))
            .match(ChatCommand.PostMessage.class, cmd ->
                new PostMessageHandler(getState(), cmd))
            .build();
    }
}
```

- Business logic pushed out to `ChatState` and `*Handler`

---
# Command handler example

```
public class PostMessageHandler extends ChatActor.Handler {
    @Override public Seq<ChatEvent> getEventsToEmit() {
        return Vector.of(new ChatEvent.MessagePosted(
            ChatCommand.PostMessage.class.cast(cmd).getMessage()));
    }

    @Override public Object getReply(Seq<ChatEvent> emittedEvents, long lastSequenceNr) {
        return Done.getInstance();
    }
}
```

- Plain old Java class, hence easily unit testable (compared to an actor)
- Behaviour of different commands separated into different classes

---
# Exercise 2: Persistence and type safety

First:

- Either resume from part 1, or get a fresh [skeleton](workshop-2-persistence.tgz) for exercise 2 and unpack it
- Make your changes in the `example-2-persistence` directory
- Run: `sbt "project example-2-persistence" run`
- Define your command, event, and state types
- Introduce persistence by making `ChatActor` extend `AbstractStatefulPersistentActor`
- Use the [LevelDB persistence plugin](http://doc.akka.io/docs/akka/current/java/persistence.html#local-leveldb-journal-java)

--

And then, if you're bored:

- Other things you didn't get to do in part 1 ;-)

---
# Exercise 2: Tips

- Declare a `Handler` base class for your actor, for concrete handlers to extend:

```
static abstract class Handler extends AbstractCommandHandler<
    ChatCommand, ChatEvent, ChatState> { /* ... */ }
```

- Use `new PFBuilder<ChatCommand, Handler>().match(...).build()` to make the `PartialFunction` for `applyCommand()`.
- Save events by calling `persist(event, e -> { ...done... })`
- You need to have an `application.conf` resource with akka config

```
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
```

- The SBT project already has LevelDB on the classpath

---
# Exercise 2: Solution

---
class: center, middle

# Clustering

---
# Immutability

- Command, event and state classes are best kept immutable
- Challenge to use the built-in `Map`, `List` and `Set`
  * Interfaces declare and embrace mutability
  * Implementing data structures are not [_persistent_](https://en.wikipedia.org/wiki/Persistent_data_structure)
- Serializable DTO's: [Google Protobuf](https://developers.google.com/protocol-buffers/)
  * Cross-platform IDL, Java code generator creates immutable classes
  * Very efficient binary representation and nice versioning
- Everything else: [`javaslang`](http://www.javaslang.io/)
  * Immutable, persistent collection classes focused on Java 8
  * Rich lambda-based combinator DSL

```
Seq<Integer> ints = Vector.empty();
ints = ints.append(42);
```
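To illustrate the combinator DSL a bit further, a small sketch (values are made up) using javaslang's immutable `Map` and `Option`:

```
// Illustrative only: every "mutation" returns a new collection, the original is untouched.
Map<String, Integer> counts = HashMap.of("alice", 1);
Map<String, Integer> updated = counts.put("bob", 2);

Option<Integer> bob = updated.get("bob");                // Some(2)
Seq<String> names = updated.keySet().toList().sorted();  // List("alice", "bob")
int total = updated.values().sum().intValue();           // 3
```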
---
# Akka remoting and clustering

- Transparently lets actors communicate between systems
- An `ActorRef` can point to a remote actor
- Messages must be serializable (using configurable mechanisms)

```
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
}
```

---
# Akka cluster sharding

- Dynamically distributes a group of actors across an akka cluster
- A `MessageExtractor` informs cluster sharding where a particular message should go

```
public abstract class ChatCommand {
    UUID conversation;
}

class ChatMessageExtractor implements MessageExtractor {
    private final int numberOfShards = 256;

    @Override public String entityId(Object command) {
        return ChatCommand.class.cast(command).conversation.toString();
    }

    @Override public String shardId(Object command) {
        return String.valueOf(entityId(command).hashCode() % numberOfShards);
    }

    @Override public Object entityMessage(Object command) {
        return command;
    }
}
```

---
# Akka cluster sharding

- A `ShardRegion` proxy sits between the client and the real (remote) persistent actor
- Persistent actor names will be their persistence id

```
public class ChatActor extends AbstractPersistentActor {
    // ...
    @Override public String persistenceId() {
        return getSelf().path().name();
    }
}

ActorRef proxy = ClusterSharding.get(system).start(
    "conversations",
    Props.create(ChatActor.class),
    ClusterShardingSettings.create(system),
    new MyMessageExtractor());

proxy.tell(new ChatCommand.PostMessage(
    UUID.fromString("67c67d28-4719-4bf9-bfe6-3944ed961a60"), "hello!"),
    ActorRef.noSender());
```

---
# ts-reaktive: Clustering actor support

```
public class PersistentActorSharding<C> {
    PersistentActorSharding(Props props, String persistenceIdPrefix,
        Function<C, String> entityIdForCommand, int numberOfShards) { ... }

    public ActorRef shardRegion(ActorSystem system) { ... }
}

public static final PersistentActorSharding<ConversationCommand> sharding =
    PersistentActorSharding.of(Props.create(ChatActor.class),
        "conversation", ConversationCommand::getConversationId);
```

- `PersistentActorSharding` implements sensible sharding defaults
  - Fixed shard allocation per entityId hashcode
  - No message unwrapping (transparent proxy)

---
# Exercise 3: Clustering

- Either resume from part 2, or get a fresh [skeleton](workshop-3-cluster.tgz) for exercise 3 and unpack it
- Make your changes in the `example-3-cluster` directory
- Run: `sbt "project example-3-cluster" run`
- Refactor `ConversationActor` to be an `AbstractStatefulPersistentActor`
- Add [clustering support](http://doc.akka.io/docs/akka/current/java/cluster-usage.html)
- Switch to the [Cassandra persistence plugin](https://github.com/akka/akka-persistence-cassandra)

--

And then, if you're bored:

- Add validation to incoming messages by implementing `AbstractCommandHandler.getValidationError`
- Define command and event classes using Google Protobuf (instead of `Serializable`), in `src/main/protobuf`
- Serve the conversation contents as an akka stream using `ts-reaktive-marshal`

---
# Exercise 3: Tips

- Use docker to start a cassandra server real quick:

```
docker run -d --name cass -p 9042:9042 cassandra:3.7
```

- The name in your `ActorSystem.create("myName", ...)` must match the configured seed nodes

```
seed-nodes = [
  "akka.tcp://myName@127.0.0.1:2551",
  "akka.tcp://myName@127.0.0.1:2552"]
```

- Tell SBT to run your cluster members at a particular port:

```
config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + (2551 + Integer.parseInt(args[0])));
system = ActorSystem.create("myName", config.withFallback(ConfigFactory.load()));
```

and then `sbt "project example-3-cluster" "run 1"`

---
# Exercise 3: Solution

---
class: center, middle

# Streaming
---
# Stream materialization

- A _Graph_ is only a blueprint: nothing runs until it's given to a _materializer_, typically `ActorMaterializer`
- All graph stages are generic in their materialized type `M`
- A graph can be materialized (`run`, `runWith`) more than once

```
class Source<T, M> {
    // A graph which materializes into this source's M (ignoring the sink's M2)
    public <M2> RunnableGraph<M> to(Sink<T, M2> sink);

    // Materializes, and returns the M2 of the sink (ignoring this source's M)
    public <M2> M2 runWith(Sink<T, M2> sink, Materializer m) { ... }

    // A graph which materializes into the result of applying [combine] to
    // this source's M and the sink's M2
    public <M2, M3> RunnableGraph<M3> toMat(Sink<T, M2> sink, Function2<M, M2, M3> combine);
}

class RunnableGraph<M> {
    public M run(Materializer m);
}
```
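For example, the same blueprint can be run twice, and `toMat` lets you pick which materialized value(s) to keep; a small sketch reusing the `numbers` and `print` stages from the earlier slides:

```
// Each run materializes a fresh stream, and a fresh CompletionStage<Done>.
CompletionStage<Done> first  = numbers.runWith(print, materializer);
CompletionStage<Done> second = numbers.runWith(print, materializer);

// Keep both the source's and the sink's materialized values.
Pair<NotUsed, CompletionStage<Done>> both =
    numbers.toMat(print, Keep.both()).run(materializer);
```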
---
# Reusable pieces

- `Source`, `Sink` and `Flow` are all normal, immutable objects, so they're ideal to be constructed in reusable factory methods:

```
public Sink<String, CompletionStage<IOResult>> lineSink(String filename) {
    Sink<ByteString, CompletionStage<IOResult>> file = FileIO.toPath(Paths.get(filename));

    // Let's start with some strings
    return Flow.of(String.class)  // Flow<String, String, NotUsed>
        // Convert them into bytes (UTF-8), adding a newline
        // We now have a Flow<String, ByteString, NotUsed>
        .map(s -> ByteString.fromString(s + "\n"))
        // Send them into a file, and we want the IOResult of the
        // FileIO sink as materialized value of our own sink
        .toMat(file, Keep.right());
}

numbers.map(i -> i.toString()).runWith(lineSink("numbers.txt"), materializer);
```
---
# Time-based processing

```
final Source<Integer, NotUsed> numbers = Source.range(1, 100000000);
final Sink<Integer, CompletionStage<Done>> print = Sink.foreach(i -> System.out.println(i));

final CompletionStage<Done> done = numbers
    .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
    .runWith(print, materializer);
```

- This does what you expect: print one message per second
- No `OutOfMemoryError`, akka buffers only as needed: _backpressure_

---
# Akka persistence query

- Query part of (or the whole) event journal in various ways
- Each query type is defined by a Java interface
- We'll be using `EventsByTagQuery`: all events, across aggregates, since a given timestamp

```
interface EventsByTagQuery {
    Source<EventEnvelope, NotUsed> eventsByTag(String tag, Long offset);
}

class EventEnvelope {
    Long offset;
    String persistenceId;
    Long sequenceNr;
    Object event;
}
```

- The source will return historic events, catching up to real time
- Lookup is journal-dependent. For cassandra:

```
CassandraReadJournal journal = PersistenceQuery.get(system)
    .getReadJournalFor(CassandraReadJournal.class, CassandraReadJournal.Identifier());
```
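Putting the two together, a minimal sketch of running such a query (the tag name `"chat"` and the handling lambda are illustrative):

```
// Illustrative: stream all events tagged "chat", historic first, then live.
journal.eventsByTag("chat", 0L)
    .map(envelope -> envelope.event())
    .runForeach(event -> System.out.println("saw event: " + event), materializer);
```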
---
# Exercise 4: Streaming

- Either resume from part 3, or get a fresh [skeleton](workshop-4-streams.tgz) for exercise 4 and unpack it
- Make your changes in the `example-4-streams` directory
- Run: `sbt "project example-4-streams" run`
- Create an independent chat bot that is listening in on conversations
  - Implement it by running a continuous persistence query
  - Have the bot respond whenever anybody types "`/bot`" in a conversation

--

And then, if you're bored:

- Make sure the chat bot only starts once in the cluster
  - For this, investigate [ClusterSingleton](http://doc.akka.io/docs/akka/2.4.11/java/cluster-singleton.html)
- Make sure the stream restarts if interrupted, e.g. due to a cassandra restart
  - For this, investigate the [BackoffSupervisor](http://doc.akka.io/docs/akka/2.4.11/general/supervision.html#Delayed_restarts_with_the_BackoffSupervisor_pattern)

---
# Exercise 4: Tips

- Add the following config to have the cassandra persistence query be more real-time:

```
cassandra-journal {
  pubsub-minimum-interval = 100ms
}

cassandra-query-journal {
  eventual-consistency-delay = 0s
  refresh-interval = 1s
}
```

---
# Optional extra topics

1. Akka HTTP routing DSL
2. ts-reaktive Marshalling DSL

---
# Wrap-up

- Solutions to all exercises: [https://github.com/jypma/ts-reaktive-examples](https://github.com/jypma/ts-reaktive-examples)
- Want to get involved?
  - Get: [http://akka.io/](http://akka.io/)
  - Read: [http://doc.akka.io/docs/akka/current/java.html](http://doc.akka.io/docs/akka/current/java.html)
  - Chat: `#akka-workshop` on Slack, or [https://gitter.im/akka/akka](https://gitter.im/akka/akka)
  - Hack: [https://github.com/Tradeshift/ts-reaktive/](https://github.com/Tradeshift/ts-reaktive/) and [https://github.com/jypma/ts-reaktive-examples/](https://github.com/jypma/ts-reaktive-examples/)