class: center, middle

# Event sourcing at scale

Jan Ypma

jyp@tradeshift.com

.smallright[
This presentation: http://jypma.github.io/20160914-jdkio-architecture
]

---

# Agenda

- A bit of Tradeshift history
- Typical sources of failures
- Event sourcing
- Actors, Akka, and Clustering
- Example use case: collaboration

---

# Tradeshift in 2011

.center[![Tradeshift system](tsold.svg)]

- Two major software components
  - _the frontend_
  - _the backend_
- <10 developers

---

# Tradeshift in 2016

.center[![Tradeshift system](tsnew.svg)]

- 30 deployed components (and growing)
- 150 developers
- 250,000 LoC in _the backend_

---

# Scaling of Tradeshift's systems

- Moore's law applies to AWS
- Single point of _not quite failing often enough_
- 2016 directive: _All new components must be clustered_
  - Yeah, what about the 30-ish existing ones?
- New architecture is needed

---

# Scaling of Tradeshift's development process

- 2011: _"We're a Java shop"_

--

- 2016: Not really, at least not anymore
  - Groovy and grails
  - Python, go, ruby for infrastructure
  - Crazy javascript people
  - Scala
  - But still, mostly Java
- Empower teams to pick their own language and frameworks

---

# Typical sources of failures

.center[![Tradeshift flow](tsflow.svg)]

- We're down!
  - We overloaded the database
  - which caused the backend to respond slowly
  - which caused the frontend to respond slowly
  - which caused our users' web browsers to respond slowly
  - which caused our users to reload their page
  - `GOTO 10`

--

- Enter the buzzwords
  - Let it crash [2003, Armstrong]
  - Micro-services [2005, Rodgers]
  - Self-contained systems [2015, [scs-architecture.org](http://scs-architecture.org)]

---

# Self-contained systems

.floatright[![Async comm](async.svg)]

- No outgoing calls while handling an incoming request
  - (except to our own databases)
- All inter-service communication must be **asynchronous**
  - This implies data replication
- No single points of failure
  - System must be clustered
- Design must trivially scale to 10x the expected load

---

# Event sourcing

.center[![Event sourcing](eventsource.svg)]

- System considers an append-only **Event journal** the only source of truth
- An **Aggregate** is one unit of information to which (and only which) an event atomically applies
- Events have a guaranteed order, but only within an aggregate

---

# Event sourcing

- Nice scalability properties
  - Each aggregate can process changes independently
  - All information that spans >1 aggregate is materialized using *event listeners*
- Traditionally only applied inside a system
  - Synchronous APIs only ("_get customer history_")
  - Why not expose the event stream itself?
      - Eventual consistency
      - Latency implications
      - Security implications
- A toy example of the journal/replay idea follows on the next slide
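
---

# Event sourcing: a toy example

- A minimal, non-Akka sketch of the journal idea; all class and event names are invented for this slide

```
public class Conversation {
    // The append-only journal: the only source of truth for this aggregate
    private final List<String> journal  = new ArrayList<>();
    // Materialized state, derived purely from the journal
    private final List<String> messages = new ArrayList<>();

    // Handle a command: validate it, then append an event
    public void postMessage(String text) {
        if (text == null || text.isEmpty()) throw new IllegalArgumentException();
        append("MessagePosted:" + text);
    }

    private void append(String event) {
        journal.add(event);   // only ever appended, never mutated or deleted
        apply(event);
    }

    // Replaying all events in order reconstructs the exact same state
    private void apply(String event) {
        if (event.startsWith("MessagePosted:")) {
            messages.add(event.substring("MessagePosted:".length()));
        }
    }
}
```

- Replaying the journal from the start always rebuilds the same `messages` list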

---

class: center, middle

# Implementation

---

# The actor model

- An **Actor** is an entity that responds only to messages, by
  - sending messages to other actors
  - creating other (child) actors
  - adjusting its behaviour
- *Akka* is a toolkit for writing actors in Java
  - An actor is a normal Java class that extends `UntypedActor` or `AbstractActor`
  - A **Message** is an immutable, serializable, Java class
- The parent actor is the _supervisor_ of its child actors. On child actor failure, the parent decides what to do:
  - Restart child
  - Stop child
  - Escalate (a sketch follows on the next slide)
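
---

# Supervision sketch

- A minimal sketch of expressing those three choices with Akka's standard `SupervisorStrategy` API; `WorkerActor` is an invented child for this slide

```
public class Supervisor extends AbstractActor {
    private final ActorRef child = getContext().actorOf(
        Props.create(WorkerActor.class), "child");

    {
        // Forward all messages to the child
        receive(ReceiveBuilder
            .matchAny(msg -> child.forward(msg, getContext()))
            .build());
    }

    // Invoked when a child throws: map the failure to a directive
    @Override public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(10, Duration.create("1 minute"),
            DeciderBuilder
                .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
                .match(Exception.class, e -> SupervisorStrategy.restart())
                .matchAny(o -> SupervisorStrategy.escalate())
                .build());
    }
}
```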

---

# Actor ping pong

```
public class PongActor extends UntypedActor {
    public void onReceive(Object message) {
        if (message instanceof String) {
            System.out.println("In PongActor - received message: " + message);
            getSender().tell("pong", getSelf());
        }
    }
}
```

---

# Actor ping pong

```
public class Initialize {}

public class PingActor extends AbstractActor {
    private int counter = 0;
    private ActorRef pongActor = getContext().actorOf(
        Props.create(PongActor.class), "pongActor");

    {
        receive(ReceiveBuilder
            .match(Initialize.class, msg -> {
                System.out.println("In PingActor - starting ping-pong");
                pongActor.tell("ping", getSelf());
            })
            .match(String.class, msg -> {
                System.out.println("In PingActor - received message: " + msg);
                counter += 1;
                if (counter == 3) {
                    getContext().system().terminate();
                } else {
                    getSender().tell("ping", getSelf());
                }
            })
            .build());
    }
}
```

---

# Actor ping pong

```
public static void main(String... args) {
    ActorSystem system = ActorSystem.create();
    ActorRef pingActor = system.actorOf(Props.create(PingActor.class));
    pingActor.tell(new Initialize(), ActorRef.noSender());
}
```

Output:

```
In PingActor - starting ping-pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
```

---

# Akka persistence

- Framework to do event sourcing using actors
- Persistence plugins for levelDB, cassandra, kafka, ...
- Each `PersistentActor` has a `String` identifier, under which events are stored (usage sketch on the next slide)

```
public class ChatActor extends AbstractPersistentActor {
    private final List<String> messages = new ArrayList<>();

    @Override public String persistenceId() {
        return "chat-1";
    }

    @Override public void receiveRecover(Object msg) {
        if (msg instanceof String) {
            messages.add((String) msg);
        }
    }

    @Override public void receiveCommand(Object msg) {
        if (msg.equals("/list")) {
            sender().tell(new ArrayList<>(messages), self());
        } else if (msg instanceof String) {
            persist(msg, evt -> messages.add((String) evt));
        }
    }
}
```
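
---

# Talking to a persistent actor

- A hypothetical client session against `ChatActor`; `Patterns.ask` wraps the reply in a `Future`, and the blocking `Await` is only for slide brevity

```
ActorRef chat = system.actorOf(Props.create(ChatActor.class), "chat-1");
chat.tell("hello", ActorRef.noSender());
chat.tell("world", ActorRef.noSender());

// ask() sends "/list" and completes the future with the reply (1s timeout)
Future<Object> reply = Patterns.ask(chat, "/list", 1000);
System.out.println(Await.result(reply, Duration.create(1, TimeUnit.SECONDS)));
// -> [hello, world]   (also after a restart: state is replayed from the journal)
```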

---

# Akka remoting and clustering

- Transparently lets actors communicate between systems
  - An `ActorRef` can point to a remote actor (lookup sketch on the next slide)
  - Messages must be serializable (using configurable mechanisms)

```
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
}
```
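
---

# Sending to a remote actor

- A sketch of reaching an actor on the other node by path; `actorSelection` is the standard Akka API, but the actor path here is invented for this slide

```
ActorSelection remote = system.actorSelection(
    "akka.tcp://ClusterSystem@127.0.0.1:2552/user/pongActor");

remote.tell("ping", ActorRef.noSender());

// Or resolve the selection into a real ActorRef first
Future<ActorRef> pong = remote.resolveOne(new Timeout(1, TimeUnit.SECONDS));
```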

---

# Akka cluster sharding

- Dynamically distributes a group of actors across an akka cluster
- A `MessageExtractor` informs cluster sharding where a particular message should go

```
class ChatMessage {
    UUID conversation;
    String msg;

    ChatMessage(UUID conversation, String msg) {
        this.conversation = conversation;
        this.msg = msg;
    }
}

class MyMessageExtractor implements MessageExtractor {
    private final int numberOfShards = 256;

    @Override public String entityId(Object command) {
        return ChatMessage.class.cast(command).conversation.toString();
    }

    @Override public String shardId(Object command) {
        // floorMod: hashCode() can be negative, shard ids must not be
        return String.valueOf(
            Math.floorMod(entityId(command).hashCode(), numberOfShards));
    }

    @Override public Object entityMessage(Object command) {
        return ChatMessage.class.cast(command).msg;
    }
}
```

---

# Akka cluster sharding

- A `ShardRegion` proxy sits between the client and the real (remote) persistent actor
- Persistent actor names will be their persistence id

```
public class ChatActor extends AbstractPersistentActor {
    // ...
    @Override public String persistenceId() {
        return getSelf().path().name();
    }
}

ActorRef proxy = ClusterSharding.get(system).start(
    "conversations",
    Props.create(ChatActor.class),
    ClusterShardingSettings.create(system),
    new MyMessageExtractor());

proxy.tell(new ChatMessage(
    UUID.fromString("67c67d28-4719-4bf9-bfe6-3944ed961a60"),
    "hello!"), ActorRef.noSender());
```

---

# Putting it all together

- 2015: Let's try it out first
- 2016: _Collaboration_ in production
  - Real-time text message exchange between employees
  - Text interface to automated travel agent
  - In development: documents, FTP
- Stuff that works well for us: [https://github.com/Tradeshift/ts-reaktive/](https://github.com/Tradeshift/ts-reaktive/)
  - `ts-reaktive-actors`: Persistent actor base classes with reasonable defaults, and an HTTP API for the event journal
  - `ts-reaktive-marshal`: Non-blocking streaming marshalling framework for akka streams
  - `ts-reaktive-replication`: Master-slave replication across data centers for persistent actors

---

# A bit of refactoring

- Introduce a single base class for all commands

```
public abstract class ConversationCommand {}

public class GetMessageList extends ConversationCommand {}

public class PostMessage extends ConversationCommand {
    private final String message;

    public PostMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
```

---

# A bit of refactoring

- Introduce a single base class for all events

```
public abstract class ConversationEvent {}

public class MessagePosted extends ConversationEvent {
    private final String message;

    public MessagePosted(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
```

---

# Stateful persistent actor

```
public class ConversationActor extends AbstractStatefulPersistentActor<
        ConversationCommand, ConversationEvent, ConversationState> {

    public static abstract class Handler extends AbstractCommandHandler<
        ConversationCommand, ConversationEvent, ConversationState> { }

    @Override protected ConversationState initialState() {
        return ConversationState.EMPTY;
    }

    @Override protected PartialFunction<ConversationCommand, Handler> applyCommand() {
        return new PFBuilder<ConversationCommand, Handler>()
            .match(ConversationCommand.GetMessageList.class, cmd ->
                new GetMessageListHandler(getState(), cmd))
            .match(ConversationCommand.PostMessage.class, cmd ->
                new PostMessageHandler(getState(), cmd))
            .build();
    }
}
```

- Functionality pushed out to `ConversationState` and the `*Handler` classes

---

# Independent state class

```
public class ConversationState extends AbstractState<
        ConversationEvent, ConversationState> {

    public static final ConversationState EMPTY =
        new ConversationState(Vector.empty());

    private final Seq<String> messages;

    @Override public ConversationState apply(ConversationEvent event) {
        if (event instanceof MessagePosted) {
            return new ConversationState(messages.append(
                MessagePosted.class.cast(event).getMessage()));
        } else {
            return this;
        }
    }
}
```

- Plain old Java class, hence easily unit testable (compared to an actor)

---

# Command handler example

```
public class PostMessageHandler extends ConversationActor.Handler {
    @Override public Seq<ConversationEvent> getEventsToEmit() {
        return Vector.of(new ConversationEvent.MessagePosted(
            ConversationCommand.PostMessage.class.cast(cmd).getMessage()));
    }

    @Override public Object getReply(Seq<ConversationEvent> emittedEvents,
                                     long lastSequenceNr) {
        return Done.getInstance();
    }
}
```

- Plain old Java class, hence easily unit testable (compared to an actor)
- Behaviour of different commands separated into different classes

---

# Event journal over HTTP

- We wanted easy consumption of the event log by other systems
- HTTP chunked encoding (1 chunk per event), without completion

```
GET /events?since=1473689920000

200 OK
Content-Type: application/protobuf
Transfer-Encoding: chunked

14
(14 bytes with the first protobuf event)
11
(11 bytes with the second protobuf event)
(TCP stream stalls here)
```

- Additional events can arrive in real time (consumer sketch on the next slide)
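
---

# Consuming the journal over HTTP

- A sketch of a consumer using Akka HTTP against the endpoint above; the host name is invented, and `parseProtobufEvent` and `updateView` are hypothetical helpers

```
Http.get(system)
    .singleRequest(HttpRequest.create(
        "http://content/events?since=1473689920000"), materializer)
    .thenAccept(response ->
        response.entity().getDataBytes()         // Source<ByteString, Object>
            .map(bytes -> parseProtobufEvent(bytes))
            .runForeach(event -> updateView(event), materializer));
```

- The stream never completes: new events keep arriving as further chunks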

---

# Use case: collaboration

.center[![Collaboration architecture](collaboration.svg)]

- The _Content_ backend has an API to add messages to conversations
  - Messages go into the event journal
  - The journal is queryable over HTTP
- The _Presentation_ backend listens to the event stream
  - Materializes it into views that are UI dependent
  - Can combine other sources as well
- The browser talks to both the Presentation and Content backends
  - A web socket stream informs the browser of incoming messages

---

# Wrap-up

- Scalable systems: check
- Scalable development: check
- We're not quite there yet
  - Akka, Cassandra and the reaktive combo still under refactoring
  - Attitude of _"I've done Spring for 10+ years successfully, why would I learn this?"_
  - The proof is in more pudding
- Want to get involved?
  - Get: [http://akka.io/](http://akka.io/)
  - Read: [http://doc.akka.io/docs/akka/current/java.html](http://doc.akka.io/docs/akka/current/java.html)
  - Chat: [https://gitter.im/akka/akka](https://gitter.im/akka/akka)
  - Hack: [https://github.com/Tradeshift/ts-reaktive/](https://github.com/Tradeshift/ts-reaktive/)

---

class: center, middle

# Reactive workshop

## Tomorrow, 13:00, Room Holberg

You can do all this yourself!

---

class: center, middle

# Extra slides

---

# Introducing Akka Streams

- A `Graph` is a *blueprint* for a closed, finite network of *stages*, which communicate by streaming elements
- A `GraphStage<S>` is one processing stage within a graph, taking elements in through zero or more *Inlets*, and emitting through *Outlets*
  - It's completely up to the stage when and how to respond to arriving elements
- All built-in graph stages embrace _backpressure_ and _bounded processing_

#### Mostly used graph stages

- `Source<T, M>` has one outlet of type `T`
- `Sink<T, M>` has one inlet of type `T`
- `Flow<A, B, M>` has one inlet of type `A` and one outlet of type `B`
- `RunnableGraph<M>` has no inlets or outlets

#### Reactive streams

- Akka Streams is a reactive streams implementation (just like `RxJava` and others)
- You typically don't interact in terms of _publisher_ and _subscriber_ directly

---

# Hello, streams

```
final ActorSystem system = ActorSystem.create("QuickStart");
final Materializer materializer = ActorMaterializer.create(system);

final Source<Integer, NotUsed> numbers = Source.range(1, 100);
final Sink<Integer, CompletionStage<Done>> print =
    Sink.foreach(i -> System.out.println(i));

final CompletionStage<Done> done = numbers.runWith(print, materializer);

// Output:
// 1
// 2
// ...
```

---

# Stream materialization

- A _graph_ is only a blueprint: nothing runs until it's given to a _materializer_, typically an `ActorMaterializer`
- All graph stages are generic in their materialized type `M`
- A graph can be materialized (`run`, `runWith`) more than once (a `Keep.both()` sketch follows on the next slide)

```
class Source<T, M> {
    // A graph which materializes into the M2 of the sink (ignoring this source's M)
    public <M2> RunnableGraph<M2> to(Sink<T, M2> sink);

    // Materializes, and returns the M2 of the sink (ignoring this source's M)
    public <M2> M2 runWith(Sink<T, M2> sink, Materializer m) { ... }

    // A graph which materializes into the result of applying [combine] to
    // this source's M and the sink's M2
    public <M2, M3> RunnableGraph<M3> toMat(Sink<T, M2> sink,
                                            Function2<M, M2, M3> combine);
}

class RunnableGraph<M> {
    public M run(Materializer m);
}
```
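
---

# Stream materialization

- A minimal `Keep.both()` sketch: keep both materialized values, here an `ActorRef` that feeds the stream and the sink's completion (assumes `materializer` from earlier)

```
Source<String, ActorRef> src = Source.actorRef(10, OverflowStrategy.fail());
Sink<String, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> graph =
    src.toMat(sink, Keep.both());

Pair<ActorRef, CompletionStage<Done>> both = graph.run(materializer);
both.first().tell("hello", ActorRef.noSender());   // feed the stream
both.second().thenRun(() -> System.out.println("stream completed"));
```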

---

# Reusable pieces

- `Source`, `Sink` and `Flow` are all normal, immutable objects, so they're ideal to be constructed in reusable factory methods:

```
public Sink<String, CompletionStage<IOResult>> lineSink(String filename) {
    Sink<ByteString, CompletionStage<IOResult>> file =
        FileIO.toPath(Paths.get(filename));

    // Let's start with some strings
    return Flow.of(String.class)   // Flow<String, String, NotUsed>
        // Convert them into bytes (UTF-8), adding a newline
        // We now have a Flow<String, ByteString, NotUsed>
        .map(s -> ByteString.fromString(s + "\n"))
        // Send them into a file, and we want the IOResult of the
        // FileIO sink as materialized value of our own sink
        .toMat(file, Keep.right());
}

numbers.map(String::valueOf).runWith(lineSink("numbers.txt"), materializer);
```

---

# Time-based processing

```
final Source<Integer, NotUsed> numbers = Source.range(1, 100000000);
final Sink<Integer, CompletionStage<Done>> print =
    Sink.foreach(i -> System.out.println(i));

final CompletionStage<Done> done = numbers
    .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1,
              ThrottleMode.shaping())
    .runWith(print, materializer);
```

- This does what you expect: print one message per second
- No `OutOfMemoryError`, akka buffers only as needed: _backpressure_

---

# Example Sources

- Materialize as an `ActorRef`

```
Source<T, ActorRef> s = Source.actorRef(10, OverflowStrategy.fail());
```

- Materialize as a reactive `Subscriber<T>`

```
Source<T, Subscriber<T>> s = Source.asSubscriber();
```

- Read from a reactive `Publisher<T> p`

```
Source<T, NotUsed> s = Source.fromPublisher(p);
```

- Emit the same element regularly

```
Source<T, Cancellable> s = Source.tick(duration, duration, element);
```

---

# Example Sinks

- Send to an `ActorRef`

```
Sink<T, NotUsed> s = Sink.actorRef(target, "done");
```

- Materialize as a reactive `Publisher<T>`

```
Sink<T, Publisher<T>> s = Sink.asPublisher(WITH_FANOUT);
```

- Materialize into a `java.util.List` of all elements

```
Sink<T, CompletionStage<List<T>>> s = Sink.seq();
```

- These pieces compose; a combined sketch follows on the next slide
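
---

# Example: wiring a source to a sink

- A sketch gluing the examples above together; `target` is assumed to be an existing `ActorRef`

```
Source.tick(Duration.create(0, TimeUnit.SECONDS),
            Duration.create(1, TimeUnit.SECONDS), "tick")
    .take(5)   // complete the stream after 5 elements
    .runWith(Sink.actorRef(target, "done"), materializer);

// target receives "tick" five times, once per second, then "done"
```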

---

# Example source and flow operators

- Send a `Source<T, M> src` to an additional `Sink<T, ?> sink`

```
Source<T, M> s = src.alsoTo(sink);
```

- Process batches of concatenated strings, but only if they are coming in too fast

```
Source<String, M> s = src.batchWeighted(1000,
    s -> (long) s.length(), s -> s, (s1, s2) -> s1 + s2);
```

- Process 1 second's worth of elements at a time, but at most 100

```
Source<List<String>, M> s = src.groupedWithin(100, Duration.create(1, SECONDS));
```

- Invoke a `CompletionStage` for each element, and resume with the results in order

```
CompletionStage<Integer> process(String s) { ... }

Source<Integer, M> s = src.mapAsync(4, this::process);   // parallelism of 4
```