class: center, middle

# Reactive workshop

Jan Ypma

jyp@tradeshift.com

.smallright[
This presentation: http://jypma.github.io/20160915-jdkio-workshop
]

---

# Agenda

- Quick technology revamp .small[[0:30]]
- Exercises
  1. Create a chat web application using Akka .small[[0:45]]
  2. Add persistence so we don't lose data all the time .small[[0:45]]
  3. Add scalability and integration .small[[0:45]]
- Profit!

### First things first

```bash
mkdir workshop-1
cd workshop-1
wget "https://jypma.github.io/20160915-jdkio-workshop/workshop-1-akka.tgz"
tar xzvf workshop-1-akka.tgz
./sbt compile   # or just "sbt compile" if installed
```

- This presentation: http://jypma.github.io/20160915-jdkio-workshop
- Akka guide: [http://doc.akka.io/docs/akka/2.4/java.html](http://doc.akka.io/docs/akka/2.4/java.html)
- Akka JavaDoc: [http://doc.akka.io/japi/akka/2.4/](http://doc.akka.io/japi/akka/2.4/)

---

# The actor model

- An **actor** is an entity that responds only to messages, by
  - sending messages to other actors
  - creating other (child) actors
  - adjusting its behaviour
- *Akka* is a toolkit for writing actors in Java
  - An actor is a normal Java class that extends `UntypedActor` or `AbstractActor`
  - A **message** is an immutable, serializable Java class
- A parent actor is the _supervisor_ of its child actors.
  On child actor failure, the parent decides what to do:
  - Restart child
  - Stop child
  - Escalate

---

# Actor ping pong

```
public class PongActor extends UntypedActor {
    @Override
    public void onReceive(Object message) {
        if (message instanceof String) {
            System.out.println("In PongActor - received message: " + message);
            // Reply to whoever sent the message
            getSender().tell("pong", getSelf());
        }
    }
}
```

---

# Actor ping pong

```
public class Initialize {}

public class PingActor extends AbstractActor {
    private int counter = 0;
    private ActorRef pongActor = getContext().actorOf(
        Props.create(PongActor.class), "pongActor");

    {
        receive(ReceiveBuilder
            .match(Initialize.class, msg -> {
                System.out.println("In PingActor - starting ping-pong");
                pongActor.tell("ping", getSelf());
            })
            .match(String.class, msg -> {
                System.out.println("In PingActor - received message: " + msg);
                counter += 1;
                if (counter == 3) {
                    // Three pongs received: stop the whole actor system
                    getContext().system().shutdown();
                } else {
                    getSender().tell("ping", getSelf());
                }
            })
            .build());
    }
}
```

---

# Actor ping pong

```
public static void main(String[] args) {
    ActorSystem system = ActorSystem.create();
    ActorRef pingActor = system.actorOf(Props.create(PingActor.class));
    // Sent from outside any actor, so there is no sender
    pingActor.tell(new Initialize(), ActorRef.noSender());
}
```

Output:

```
In PingActor - starting ping-pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
```

---

# Akka persistence

- Framework to do event sourcing using actors
- Persistence plugins for LevelDB, Cassandra, Kafka, ...
- Each `PersistentActor` has a `String` identifier, under which events are stored

```
public class ChatActor extends AbstractPersistentActor {
    private final List<String> messages = new ArrayList<>();

    @Override public String persistenceId() { return "chat-1"; }

    // Replay: rebuild the state from stored events
    @Override public void receiveRecover(Object msg) {
        if (msg instanceof String) {
            messages.add((String) msg);
        }
    }

    @Override public void receiveCommand(Object msg) {
        if (msg.equals("/list")) {
            sender().tell(new ArrayList<>(messages), self());
        } else if (msg instanceof String) {
            // Update the state only once the event has been stored
            persist((String) msg, evt -> messages.add(evt));
        }
    }
}
```

---

# Akka remoting and clustering

- Transparently lets actors communicate between systems
- `ActorRef` can point to a remote actor
- Messages must be serializable (using configurable mechanisms)

```
akka {
  actor {
    # the cluster provider builds on remoting, so it covers both
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
}
```

---

# Akka cluster sharding

- Dynamically distributes a group of actors across an Akka cluster
- `MessageExtractor` informs cluster sharding where a particular message should go

```
class ChatMessage {
    UUID conversation;
    String msg;
}

class MyMessageExtractor implements MessageExtractor {
    private final int numberOfShards = 256;

    @Override public String entityId(Object command) {
        return ChatMessage.class.cast(command).conversation.toString();
    }

    @Override public String shardId(Object command) {
        // floorMod keeps the result in 0..255, even for negative hash codes
        return String.valueOf(
            Math.floorMod(entityId(command).hashCode(), numberOfShards));
    }

    @Override public Object entityMessage(Object command) {
        return ChatMessage.class.cast(command).msg;
    }
}
```

---

# Akka cluster sharding

- `ShardRegion` proxy sits between client and real (remote) persistent actor
- Persistent actor names will be their persistence id

```
public class ChatActor extends AbstractPersistentActor {
    // ...
    @Override public String persistenceId() { return getSelf().path().name(); }
}

ActorRef proxy = ClusterSharding.get(system).start(
    "conversations",
    Props.create(ChatActor.class),
    ClusterShardingSettings.create(system),
    new MyMessageExtractor());

// Sent from outside any actor, so there is no sender
proxy.tell(new ChatMessage(
    UUID.fromString("67c67d28-4719-4bf9-bfe6-3944ed961a60"), "hello!"),
    ActorRef.noSender());
```

---

# ts-reaktive: Stateful persistent actor

```
public class ConversationActor extends AbstractStatefulPersistentActor<
    ConversationCommand, ConversationEvent, ConversationState> {

    public static abstract class Handler extends AbstractCommandHandler<
        ConversationCommand, ConversationEvent, ConversationState> { }

    @Override protected ConversationState initialState() {
        return ConversationState.EMPTY;
    }

    // Picks the handler that deals with an incoming command
    @Override
    protected PartialFunction<ConversationCommand, Handler> applyCommand() {
        return new PFBuilder<ConversationCommand, Handler>()
            .match(ConversationCommand.GetMessageList.class, cmd ->
                new GetMessageListHandler(getState(), cmd))
            .match(ConversationCommand.PostMessage.class, cmd ->
                new PostMessageHandler(getState(), cmd))
            .build();
    }
}
```

- Functionality pushed out to `ConversationState` and `*Handler`

---

# Independent state class

```
public class ConversationState extends AbstractState<
    ConversationEvent, ConversationState> {

    public static final ConversationState EMPTY =
        new ConversationState(Vector.empty());

    private final Seq<String> messages;

    private ConversationState(Seq<String> messages) {
        this.messages = messages;
    }

    // Returns a new state; the current one is never modified
    @Override
    public ConversationState apply(ConversationEvent event) {
        if (event instanceof MessagePosted) {
            return new ConversationState(messages.append(
                MessagePosted.class.cast(event).getMessage()));
        } else {
            return this;
        }
    }
}
```

- Plain old Java class, hence easily unit testable (compared to actor)

---

# Command handler example

```
public class PostMessageHandler extends ConversationActor.Handler {
    @Override
    public Seq<ConversationEvent> getEventsToEmit() {
        return Vector.of(new ConversationEvent.MessagePosted(
            ConversationCommand.PostMessage.class.cast(cmd).getMessage()));
    }

    @Override
    public Object getReply(Seq<ConversationEvent>
            emittedEvents, long lastSequenceNr) {
        return Done.getInstance();
    }
}
```

- Plain old Java class, hence easily unit testable (compared to actor)
- Behaviour of different commands separated into different classes

---

# Introducing Akka Streams

- `Graph` is a *blueprint* for a closed, finite network of *stages*, which communicate by streaming elements
- `GraphStage<S extends Shape>` is one processing stage within a graph, taking elements in through zero or more *Inlets*, and emitting through *Outlets*
- It's completely up to the stage when and how to respond to arriving elements
- All built-in graph stages embrace _backpressure_ and _bounded processing_

#### Commonly used graph stages

- `Source<T, M>` has one outlet of type `T`
- `Sink<T, M>` has one inlet of type `T`
- `Flow<A, B, M>` has one inlet of type `A` and one outlet of type `B`
- `RunnableGraph<M>` has no inlets or outlets, and yields a materialized value of type `M` when run

#### Reactive streams

- Akka Streams is a Reactive Streams implementation (just like `RxJava` and others)
- You typically don't interact in terms of _publisher_ and _subscriber_ directly

---

# Hello, streams

```
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);

final Source<Integer, NotUsed> numbers = Source.range(1, 100);

final Sink<Integer, CompletionStage<Done>> print =
    Sink.foreach(i -> System.out.println(i));

// Nothing runs until the blueprint is materialized here
final CompletionStage<Done> done = numbers.runWith(print, materializer);

// Output:
// 1
// 2
// ...
```

---

# Exercise project structure

- Each tgz contains a multi-module SBT project
  - All but one of the modules will be empty
  - The non-empty module is the one you'll be working on

### SBT primer

- Run a project: `sbt "project example-1-akka" "run arg1 arg2"`
- Create Eclipse project files: `sbt eclipse`
- Import into IntelliJ: just import, but set Java to level 8
- Why SBT?
  - Solid support for protobuf, docker
  - We got tired of writing XML

---

# Exercise 1: Actors

First:

- Download the [skeleton](workshop-1-akka.tgz) for exercise 1 and unpack it
- Make your changes in the `example-1-akka` directory
- Create an implementation of `ConversationProtocol` using plain actors
- Use `akka.pattern.PatternsCS.ask` as a bridge to your actors

And then, if you're bored:

--

- _akka-http_: Change `RootRoute` so it reads from files and the HTML becomes hot-reloadable
- _akka-http_, _akka-streams_: Add web socket (or hanging GET) support to client and server using Akka streams
- _akka-http_: Add authentication
- _typesafe-config_: Make magic numbers configurable using Akka's [config library](https://github.com/typesafehub/config)
- _js_: Make the client beautiful

---

# Exercise 1: Tips

- Have a single parent actor that spawns conversations as child actors
  - This gives an easy "one child actor per conversation"
  - Use `getContext().actorOf(props, name)` to name a child actor
  - Use `getContext().getChild(name)` to check if a child actor already exists (it returns `null` when there is none)
- The parent can serve as a router proxy
  - Use `childActorRef.forward(msg, getContext())` to forward an incoming message to a child. The child will process the message as if the original sender sent it.
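
The routing idea in these tips is a get-or-create lookup keyed by conversation name, followed by a hand-off. Below is a minimal sketch of that logic in plain Java, deliberately without Akka so that it runs standalone. `Conversation` and `ConversationRouter` are hypothetical stand-ins for the child and parent actors, and the `Map` plays the role of the actor context's child registry; none of these names come from the workshop skeleton.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a child actor: holds the messages of one conversation. */
class Conversation {
    private final List<String> messages = new ArrayList<>();

    void receive(String message) {
        messages.add(message);
    }

    List<String> list() {
        return new ArrayList<>(messages); // defensive copy, never expose internal state
    }
}

/** Hypothetical stand-in for the parent actor that routes by conversation name. */
class ConversationRouter {
    // Plays the role of the actor context's registry of named children
    private final Map<String, Conversation> children = new HashMap<>();

    /** Looks up the child for this name, creates it on first use, then hands the message over. */
    void route(String conversationName, String message) {
        Conversation child = children.get(conversationName); // ~ getContext().getChild(name)
        if (child == null) {
            child = new Conversation();                       // ~ getContext().actorOf(props, name)
            children.put(conversationName, child);
        }
        child.receive(message);                               // ~ child.forward(msg, getContext())
    }

    List<String> messagesOf(String conversationName) {
        Conversation child = children.get(conversationName);
        return child == null ? new ArrayList<>() : child.list();
    }

    int conversationCount() {
        return children.size();
    }
}

class RouterSketch {
    public static void main(String[] args) {
        ConversationRouter router = new ConversationRouter();
        router.route("general", "hello");
        router.route("general", "anyone here?");
        router.route("random", "first!");
        System.out.println(router.messagesOf("general"));
        System.out.println(router.conversationCount());
    }
}
```

In the actual exercise the parent is an actor, so the lookup and hand-off happen inside its message handler, and using `forward` rather than `tell` lets the child reply straight to the original sender.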
---

# Exercise 1: Solution

---

# Exercise 2: Persistence

First:

- Either resume from part 1, or get a fresh [skeleton](workshop-2-persistence.tgz) for exercise 2 and unpack it
```
wget https://jypma.github.io/20160915-jdkio-workshop/workshop-2-persistence.tgz
```
- Make your changes in the `example-2-persistence` directory
- Introduce persistence by making `ConversationActor` extend `AbstractPersistentActor`
- Use the [LevelDB persistence plugin](http://doc.akka.io/docs/akka/current/java/persistence.html#local-leveldb-journal-java)

And then, if you're bored:

--

- Other things you didn't get to do in part 1 ;-)

---

# Exercise 2: Tips

- Save events by calling `persist(event, e -> { ...done... })`
- You need to have an `application.conf` resource with Akka config
```
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
```
- Remember to update your actor state both in `receiveCommand` _AND_ `receiveRecover`

---

# Exercise 2: Solution

---

# Exercise 3: Clustering & Type safety

- Either resume from part 2, or get a fresh [skeleton](workshop-3-reaktive.tgz) for exercise 3 and unpack it
```
wget https://jypma.github.io/20160915-jdkio-workshop/workshop-3-reaktive.tgz
```
- Make your changes in the `example-3-reaktive` directory
- Refactor `ConversationActor` to be an `AbstractStatefulPersistentActor`
- Add [clustering support](http://doc.akka.io/docs/akka/current/java/cluster-usage.html)
- Switch to the [Cassandra persistence plugin](https://github.com/akka/akka-persistence-cassandra)

And then, if you're bored:

--

- Add an event journal API using `EventRoute`, adding Akka [Persistence Query](http://doc.akka.io/docs/akka/current/java/persistence-query.html) for Cassandra
- Serve the conversation contents as an Akka stream using `ts-reaktive-marshal`
- Add validation to incoming messages by implementing `AbstractCommandHandler.getValidationError`
- Define command and event classes using Google Protobuf (instead of `Serializable`), in `src/main/protobuf`

---

# Exercise 3: Tips

- Use docker to start a Cassandra server real quick
```
docker run -d --name jdkio_cass -p 9042:9042 cassandra:3.7
```
- Or alternatively, just run it directly
```
wget \
  http://mirrors.dotsrc.org/apache/cassandra/3.7/apache-cassandra-3.7-bin.tar.gz
tar xzvf apache-cassandra-3.7-bin.tar.gz
cd apache-cassandra-3.7
bin/cassandra
```
- The name you pass to `ActorSystem.create("myName")` must match the system name in the configured seed nodes
```
seed-nodes = [
  "akka.tcp://myName@127.0.0.1:2551",
  "akka.tcp://myName@127.0.0.1:2552"]
```

---

# Exercise 3: Solution

---

# Wrap-up

- Solutions to all exercises: [https://github.com/jypma/ts-reaktive-examples](https://github.com/jypma/ts-reaktive-examples)
- Questions, comments?
  - jyp@tradeshift.com
  - `https://gitter.im/akka/akka` (`@jypma`)