class: center, middle # Disruptive Development using Reactive Event-Sourced Systems in Scala and Java Jan Ypma jyp@tradeshift.com --- # Agenda - A bit of Tradeshift history - Typical sources of failures - Concurrency models - Event sourcing - Implementation - Real-world use cases --- # Tradeshift in 2011 .center[![Tradeshift system](tsold.svg)] - Electronic network to connect business - Two major software components - _the frontend_ - _the backend_ - <10 developers --- # Tradeshift in 2017 .center[![Tradeshift system](tsnew.svg)] - 30 deployed components (and growing) - 250 developers - 250.000 LoC in _just the backend_ --- # Scaling of Tradeshift's systems - Moore's law applies to AWS - Single point of _not quite failing often enough_ - 2016 directive: _All new components must be clustered_ - Yeah, what about the 30-ish existing ones? - New architecture is needed --- # Scaling of Tradehift's development process - 2011: _"We're a Java shop"_ -- - 2017: Not really, at least not anymore - Groovy and grails - Scala - Python, Go, Ruby - Crazy javascript people - But still, mostly Java - Empower teams to pick their own language and frameworks - but interact through well-described standards and APIs (REST) --- # Typical sources of failures .center[![Tradeshift flow](tsflow.svg)] - We're down! - We overloaded the database - which caused the backend to respond slowly - which caused the frontend to respond slowly - which caused our users' web browsers to respond slowly - which caused our users to reload their page - `GOTO 10` -- - What, we're not that special? - _Let it crash_ [2003, Amstrong] - Micro-services [2005, Rodgers] - The Reactive Manifesto [2013, BonĂ©r, Farley, Kuhn, Thompson] - Self-contained systems [2015, [scs-architecture.org](http://scs-architecture.org)] --- # Self-contained systems .floatright[![Async comm](async.svg)] - No outgoing calls while handling an incoming request - (except to our own databases) - All inter-service communication must be **asynchronous** - This implies data replication - No single points of failure - System must be clustered - Design must trivially scale to 10x expected load --- # The actor model .floatright[![Actors](actors.svg)] - **Actor** is an entity that responds to messages by - sending _messages_ to any other actors - creating child actors - performing side-effects - A parent actor is the **Supervisor** of its child actors. On child actor failure, parent decides what to do: - Restart child (and grandchildren) - Stop child (and grandchildren) - Escalate - **Akka** is a toolkit for writing actors on the JVM - _Java_: Actor is a normal class that extends `UntypedActor` or `AbstractActor` - _Scala_: Actor is a normal class that implements the `Actor` trait - **Message** is an immutable, serializable, Java class (`case class`) --- # Reactive streams - Imagine an HTTP request with a very long response, that reads 1.000.000 rows from a DB *Using Threads* -- - Just block while reading (waits for more data), and block while writing (TCP send buffer full) *Using Actors* - Let's have a "Request Actor" and a "Database Actor" 1. Request Actor messages Database Actor to requests the rows 2. Database Actor sends back 1.000.000 messages as fast as it can? -- - Needs some sort of `Ack` protocol between Request Actor and Database Actor, or batching *Using Reactive streams* - What if we could combine the advantages of asynchronous messaging and add in backpressure? ![Streams - terms](stream-terms.svg) --- # Concurrency model summary - **Threads and locks** .good[→] Most of us know this .good[→] Obvious way to stream data .bad[→] Limited scalability (max threads) and performance (context switch) .bad[→] Deadlocks .bad[→] Most of us _think_ we know this - **Futures / Promises** .good[→] Easy to grasp .good[→] Solves thread scalability .bad[→] Still need locks for shared data access, or limit to single thread .bad[→] Can't stream data - **Actors** .good[→] Easier to reach both high scalability and performance .good[→] Handle shared data without locks .bad[→] Learning curve .bad[→] Can't stream data - **Reactive streams** .good[→] Builds on actors for scalability and performance .good[→] Easier to grasp than full-on actors .good[→] Streaming --- # Event sourcing .center[![Event sourcing](eventsource.svg)] - System considers an append-only **Event journal** the only source of truth - **Aggregate** is one unit of information to which (and only which) an event atomically applies - Events have a guaranteed order, but only within an aggregate --- # Event sourcing - Nice scalability properties - Each aggregate can process changes independently - All information that spans >1 aggregate is materialized asynchronously into secundary data stores .center[![Event sourcing - events](eventsource-events.svg)] - Traditionally only applied inside a system - Synchronous APIs ("_get customer history_") - Why not expose event stream itself? - Eventual consistency - Latency implications - Security implications --- class: center, middle # Implementation --- # Actor ping pong ``` package akka; trait Actor { def sender: ActorRef = ??? // this will return the sender of the current message "receive" is handling implicit def self: ActorRef = ??? // this is a reference to the current actor instance def context: ActorContext = ??? // interact with the actor system, create children, etc. def receive: PartialFunction[Any, Unit] // handles received messages for this actor } trait ActorRef { def !(msg: Any)(implicit sender: ActorRef): Unit // Sends a message to this actor } ``` ``` class PongActor extends Actor { override def receive = { case msg:String => sender ! "pong" } } ``` --- # Actor ping pong ``` case object DoIt class PingActor extends Actor { val pongActor: ActorRef = context.actorOf(Props[PongActor], "pongActor") var counter = 0 override def receive = { case DoIt => pongActor ! "ping" case msg:String => counter += 1 if (counter == 3) { context.system.shutdown() } else { sender ! "ping" } } } ``` --- # Actor ping ping ``` object PingPong extends App { val system = ActorSystem() val pingActor = system.actorOf(Props[PingActor]) pingActor ! DoIt } ``` Output: ``` In PingActor - starting ping-pong In PongActor - received message: ping In PingActor - received message: pong In PongActor - received message: ping In PingActor - received message: pong In PongActor - received message: ping In PingActor - received message: pong ``` --- # Akka persistence - Framework to do event sourcing using actors - Persistence plugins for levelDB, cassandra, kafka, ... - Each `PersistentActor` has a `String` identifier, under which events are stored ``` class ChatActor extends PersistentActor { override def persistenceId = "chat-1" var messages: Seq[String] = Seq.empty override def receiveCommand = { case "/list" => sender ! messages case msg: String => persist(msg) { evt => messages :+= msg sender ! Done } } override def receiveRecover = { case msg: String => messages :+= msg } } ``` --- # Akka remoting and clustering - Transparently lets actors communicate between systems - `ActorRef` can point to a remote actor - Messages must be serializable (using configurable mechanisms) ``` akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2552 } } cluster { seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:2551", "akka.tcp://ClusterSystem@127.0.0.1:2552"] } } ``` --- # Akka cluster sharding - Dynamically distributes a group of actors across an akka cluster - Cluster sharding needs to know where a particular message should go ``` case class ChatMessage(chatId: UUID, msg: String) val extractEntityId: ShardRegion.ExtractEntityId = { case msg:ChatMessage => (msg.chatId.toString, msg) } val extractShardId: ShardRegion.ExtractShardId = { case msg:ChatMessage => (msg.chatId.getLeastSignificantBits % 100).toString } ``` --- # Akka cluster sharding - `ShardRegion` proxy sits between client and real (remote) persistent actor - Persistent actor names will be their persistence id ``` class ChatActor extends PersistentActor { // ... override def persistenceId = self.path.name } val proxy: ActorRef = ClusterSharding(system).start( "conversations", Props[ChatActor], ClusterShardingSettings(system), extractEntityId, extractShardId) proxy ! ChatMessage(UUID.fromString("67c67d28-4719-4bf9-bfe6-3944ed961a60"), "hello!")) ``` --- # Akka streams .center[![Streams - terms](stream-terms.svg)] - `Graph` is a *blueprint* for a closed, finite network of *stages*, which communicate by streaming elements - `GraphStage[S extends Shape]` is one processing stage within a graph, taking elements in through zero or more *Inlets*, and emitting through *Outlets* - It's completely up to the stage when and how to respond to arriving elements - All built-in graph stages embrace _backpressure_ and _bounded processing_ #### Mostly used graph stages - `Source[T, M]` has one outlet of type `T` - `Sink[T, M]` has one inlet of type `T` - `Flow[A, B, M]` has one inlet of type `A` and one outlet of type `B` - `RunnableGraph[M]` has no inlets or outlets --- # Hello, streams ``` val system = ActorSystem() val materializer = ActorMaterializer(system) val numbers: Source[Int, NotUsed] = Source.range(1, 100) val format: Flow[Int, String, NotUsed] = Flow[Int].map(i => i.toString) val print: Sink[String, Future[Done]] = Sink.foreach(i => System.out.println(i)) val done: Future[Done] = numbers.via(format).to(print).run(materializer) // Output: // 1 // 2 // ... ``` --- # Event journal over HTTP - We wanted easy consumption of the event log by other systems - HTTP chunked encoding (1 chunk per event), without completion ```php GET /events?since=1473689920000 HTTP/1.1 200 OK Content-Type: application/protobuf Transfer-Encoding: chunked 14 (14 bytes with the first protobuf event) 11 (11 bytes with the second protobuf event) (TCP stream stalls here) ``` - Additional events can arrive in real time - Ideal to use with akka streams --- # Putting it all together - 2015: Let's try it out first - 2016: _Collaboration_ in production - Real-time text message exchange between employees - Text interface to automated travel agent - 2017: Migrating _Network_ - Global Tradeshift network graph - Stuff that works well for us: [https://github.com/Tradeshift/ts-reaktive/](https://github.com/Tradeshift/ts-reaktive/) - `ts-reaktive-actors`: Persistent actor Java base classes with reasonable defaults, and HTTP API for event journal - `ts-reaktive-marshal`: Non-blocking streaming marshalling framework for akka streams - `ts-reaktive-replication`: Master-slave replication across data centers for persistent actors --- # Use case: collaboration .center[![Collaboration architecture](collaboration.svg)] - _Content_ backend has API to add messages to conversations - Messages go into the event journal - Journal is queryable over HTTP - _Presentation_ backend listens to event stream - Materializes into views that are UI dependent - Can combine other sources as well - Browser talks to both Presentation and Content backends - Web socket stream informs browser of incoming messages --- # Use case: Business network routing - Legacy backend system - >500.000 company accounts with business identifiers - Currently in a relational database - Updated in-place - Migration strategy 1. Introduce event journal alongside existing data 2. Write event sourced new identity microservice 3. Start continuous import of events from the old system 4. Swap over .center[![Identity](identity.svg)] --- # Wrap-up - Scalable systems: check - Scalable development: check - We're not quite there yet - Akka, cassandra and the reaktive combo in active development - Attitude of _I've done Spring for 10+ years successfully, why would I learn this_ - The proof is in more pudding - Want to get involved? - Get: [http://akka.io/](http://akka.io/) - Read: [http://doc.akka.io/docs/akka/current/java.html](http://doc.akka.io/docs/akka/current/java.html) - Chat: [https://gitter.im/akka/akka](https://gitter.im/akka/akka) - Hack: [https://github.com/Tradeshift/ts-reaktive/](https://github.com/Tradeshift/ts-reaktive/) and [https://github.com/jypma/ts-reaktive-examples/](https://github.com/jypma/ts-reaktive-examples/) --- class: center, middle # Extra slides --- # Data binding: a solved problem? - Who remembers this? ``` @XmlRootElement class Person { private final String firstName, lastName; private final List
address; public Person (String firstName, String lastName, Iterable
address) { this.firstName = firstName; this.lastName = lastName; this.address = Collections.unmodifiableList(new ArrayList(address)); } public String getFirstName() { return firstName; } public String getLastName() { return lastName; } public List
getAddress() { return address; } } ``` --- # Data binding: a solved problem? - Who remembers this? ``` *@XmlRootElement(namespace = Constants.API, name = "Person") *@XmlAccessorType(XmlAccessType.NONE) class Person { private final String firstName, lastName; private final List
address; public Person (String firstName, String lastName, Iterable
address) { this.firstName = firstName; this.lastName = lastName; this.address = Collections.unmodifiableList(new ArrayList(address)); } * @XmlAttribute("firstName") public String getFirstName() { return firstName; } * @XmlAttribute("lastName") public String getLastName() { return lastName; } * @XmlElement(namespace = Constants.API, "Address") public List
getAddress() { return address; } } ``` --- # Data binding: a solved problem? - Scala people have it a bit nicer ``` @XmlRootElement(namespace = API, name = "Person") @XmlAccessorType(XmlAccessType.FIELD) case class Person(firstName: String, lastName: String, address: Seq[String]) ``` - or do they? .small[ ([scalaxb](http://scalaxb.org/) solves some of this for XML) ] - Other challenges - JSON and XML on one API, both should look nice - Huge requests - Maintainability (reflection, AOP, ...) - Discoverability (what annotations to attach where) - Type and data represenation are linked - Express request-only or response-only classes --- # Data binding through code ``` WriteProtocol
proto = tag(qname(API, "person"), Person::getFirstName, attribute(qname("firstName")), Person::getLastName, attribute(qname("lastName")), Person::getAddress, iterable( tag(qname(API, "Address"), body ) ) ); ``` - Put the first name into `attribute` `firstName`, the last into `lastName`, and the address into the `body` of nested `Address` tags. - which can then be used e.g. with STaX: `String xml = stax.writeAsString(person, proto.writer());` - or with akka: `Flow
f = ProtocolReader.of(proto);` --- # Getting creative - Extract and write _any_ tag and have it represented as a string ``` Protocol
p = XMLProtocol.anyTag ``` - Extract and write any tag in the default namespace and its body into a `Tuple2` ([javaslang.io](http://www.javaslang.io)) ``` Protocol
> p = XMLProtocol.anyTag.asLocalName() ``` - Represent the `
value1
more
` pattern as a normal `Map` ``` Protocol
> proto = tag(qname("props"), hashMap( anyTagWithBody.asLocalName() )); ``` - `Map
` of _all nested tag names and their 'value' attribute_ - `Map
` of _all of this tag's attributes_ --- # Reactive marshalling - Fully streaming - No reflection / AOP / annotations / ... - DSLs for XML, JSON and CSV, but a unified base API - Variants for Java and (coming up) Scala - To be added to akka's [alpakka](https://github.com/akka/alpakka) integration repository - For now, check [ts-reaktive-marshal](https://github.com/Tradeshift/ts-reaktive/tree/master/ts-reaktive-marshal) on Github