class: center, middle

# Reactive marshalling

Playing with XML, JSON and CSV made fun again

Jan Ypma

jyp@tradeshift.com

---

# Agenda

- XML, I know this!
- Introduction to Akka streams
- Structured data as streams
- Document processing use case
- Other features

---

# About

.center[![Tradeshift](ts.jpg)]

- Platform for business interactions
- Procurement, ordering, invoicing, payment, marketplace, ...
- Builds on **UBL** (Universal Business Language) for open document exchange
- [http://docs.oasis-open.org/ubl/UBL-2.1.html](http://docs.oasis-open.org/ubl/UBL-2.1.html)

---

# Problem statement

- Tradeshift processes XML documents of > 100MB in size
- Several of these per second, at an increasing rate
- Only a small part of that 100MB is semantically interesting

```xml
IVAD1dOYeBI5rdxktUgQvdn/NL8idKirI35p2DhD[...]
[...many more attachments with base64 crap...>
Cats
120
```

- We historically use JAXB-generated DTOs to process these
- Can someone spot a problem with this approach?

---

# I need to do XML, I know this!

```
package com.service.data;

class Person {
    private final String firstName, lastName;
    private final List<Address> address;

    public Person(String firstName, String lastName, Iterable<Address> address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.address = Collections.unmodifiableList(new ArrayList<>(address));
    }

    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public List<Address> getAddress() { return address; }
}
```

---

# I need to do XML, I know this!

```
*@XmlRootElement(namespace = API, name = "Person")
class Person {
*   @XmlAttribute(name = "firstName")
    private final String firstName;

*   @XmlAttribute(name = "lastName")
    private final String lastName;

*   @XmlElement(namespace = Constants.API, name = "address")
    private final List<Address> address;

*   private Person() {}

    public Person(String firstName, String lastName, Iterable<Address> address) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.address = Collections.unmodifiableList(new ArrayList<>(address));
    }

    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public List<Address> getAddress() { return address; }
}
```

---

# I need to do XML, I know this!

```
@XmlRootElement(namespace = API, name = "Person")
class Person {
    @XmlAttribute(name = "firstName")
    private final String firstName;

    @XmlAttribute(name = "lastName")
    private final String lastName;

    @XmlElement(namespace = Constants.API, name = "address")
    private final List<Address> address;

    private Person() {}

    public Person(String firstName, String lastName, Iterable<Address> address) {
        this.firstName = firstName;
        this.lastName = lastName;
*       this.address = new ArrayList<>(address);
    }

    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
*   public List<Address> getAddress() { return Collections.unmodifiableList(address); }
}
```

---

# I need to do XML, I know this!

- A short while later...

```
@XmlRootElement(namespace = API, name = "Person")
*@XmlAccessorType(XmlAccessType.NONE)
class Person {
    /* ... */

*   public String getName() { return getFirstName() + " " + getLastName(); }
}
```

---

# I need to do XML, I know this!

- A little later yet...

```
class Person {
    /* ... */
    private final LocalDate birthday;

    @XmlAttribute(name = "birthday")
    public LocalDate getBirthday() { return birthday; }
}
```

- Oh I'd actually like

```xml
<birthday>
  <year>1981</year>
  <month>11</month>
  <day>1</day>
</birthday>
```

- Anyone?

--

.small[
- Declare `@XmlJavaTypeAdapter` on every usage
- Create an `XmlAdapter` and have it cast to DOM `Element`
]

---

# I need to do XML, I know this!

- Even later...

```
@XmlRootElement(namespace = API_V2, name = "Person")
class PersonV2 {
    // lots of extra fields added here,
    // slightly incompatible with Person

    public Person toPerson() {
        return new Person( /* ... */ );
    }
}
```

- I'd like to accept either a `Person` or a `PersonV2` document in my JAX-RS API
- Anyone?

---

# Problems with data binding through annotations

- Limited discoverability
- Tightly coupled
  - Polluting service data classes with REST API logic
  - Must change the data classes (or write clumsy XML or binding code)
  - Must always deal with both read and write concerns
  - Code structure must follow XML or JSON object structure
- Limited reuse
  - Data classes can only be annotated once
  - Hard or impossible to use any collections besides `java.util.ArrayList`
  - Duplication: namespace declarations, collection handling, `XmlAccessorType`, ...
- Same concerns exist, to differing extents, in Jackson and its annotations

---

# Data binding through code

- What if I could just say "_Put the first name into firstName, the last into lastName, and the address into some tags_"

```
import static com.tradeshift.reaktive.xml.XMLProtocol.*;

WriteProtocol<XMLEvent, Person> proto =
    tag(qname(API, "person"),
        Person::getFirstName, attribute(qname("firstName")),
        Person::getLastName, attribute(qname("lastName")),
        Person::getAddress, iterable(
            tag(qname(API, "address"),
                body
            )
        )
    );
```

- which can then be used e.g. with StAX: `String xml = stax.writeAsString(person, proto.writer());`

---

# Read and write protocol

- Just add another lambda to allow reading as well

```
Protocol<XMLEvent, Person> proto =
    tag(qname(API, "person"),
        attribute(qname("firstName")),
        attribute(qname("lastName")),
*       arrayList(
            tag(qname(API, "address"),
                body
            )
        ),
*       (f, l, a) -> new Person(f, l, a),
        Person::getFirstName,
        Person::getLastName,
        Person::getAddress
    );
```

- reading with StAX: `Person p = stax.parse("<person>...</person>", proto.reader()).findFirst().get();`

---

# Our birthday example

```
Protocol<XMLEvent, LocalDate>
date(QName tagName) {
    return tag(tagName,
        tag(qname("year"), body.as(INTEGER)),
        tag(qname("month"), body.as(INTEGER)),
        tag(qname("day"), body.as(INTEGER)),
        LocalDate::of,
        LocalDate::getYear,
        LocalDate::getMonthValue,
        LocalDate::getDayOfMonth
    );
}

Protocol<XMLEvent, Person> proto =
    tag(qname(API, "person"),
        attribute(qname("firstName")),
        attribute(qname("lastName")),
        arrayList(
            tag(qname(API, "address"),
                body
            )
        ),
*       date(qname("birthday")),
        Person::new,
        /* ... */
    );
```

- But why limit ourselves to StAX or Jackson's thread-blocking model?

---

# Introduction to Akka streams

- The challenge:
  - We all know how to `while (inputStream.read(buf) > 0) { ... }`
  - We _also_ all know that blocking on a stream is a waste of thread resources
  - Take the existing streaming concept, make it asynchronous, add in some type safety
- **Akka streams**
  - An asynchronous graph processing framework
  - `Source<T, M>` emits elements of type T
  - `Flow<T, U, M>` consumes elements of type T, emits elements of type U
  - `Sink<T, M>` consumes elements of type T

.mermaid[graph LR
source -->|T| flow
flow -->|U| sink
]

- Materialization
  - The streams API creates a _blueprint_ for streams
  - Materialization actually runs a stream, returning an instance of `M`

---

# Introduction to Akka streams

- Print a number once every second
  - without having 100000000 `Integer` instances in memory

```
Flow<Integer, Integer, NotUsed>
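delay =
    Flow.<Integer>create().throttle(1, Duration.of("1 second"), 1, ThrottleMode.shaping());
Sink<Integer, CompletionStage<Done>> print =
    Sink.foreach(i -> System.out.println(i));

// toMat(..., Keep.right()) keeps the sink's materialized value, so run() returns it
Source.range(0, 100000000)         // Source<Integer, NotUsed>
      .via(delay)                  // Source<Integer, NotUsed>
      .toMat(print, Keep.right())  // RunnableGraph<CompletionStage<Done>>
      .run(materializer);          // CompletionStage<Done>
```

.mermaid[graph LR
range -->|Integer| delay
delay -->|Integer| print
]

---

# Introduction to Akka streams

```
Flow<Integer, Integer, NotUsed>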
delay =
    Flow.<Integer>create().throttle(1, Duration.of("1 second"), 1, ThrottleMode.shaping());
Sink<Integer, CompletionStage<Done>> print =
    Sink.foreach(i -> System.out.println(i));
*Flow<Integer, ByteString, NotUsed> toBytes = Flow.fromFunction(i ->
*    ByteString.fromArray(String.valueOf(i).getBytes()));
Sink<ByteString, CompletionStage<IOResult>> file =
    FileIO.toPath(Paths.get("out.txt"));

Source.range(0, 10000)            // Source<Integer, NotUsed>
      .via(delay)                 // Source<Integer, NotUsed>
*     .alsoTo(print)              // Source<Integer, NotUsed>
*     .via(toBytes)               // Source<ByteString, NotUsed>
*     .toMat(file, Keep.right())  // RunnableGraph<CompletionStage<IOResult>>
      .run(materializer);         // CompletionStage<IOResult>
```

.mermaid[graph LR
range -->|Integer| delay
delay -->|Integer| print
delay -->|Integer| toBytes
toBytes -->|ByteString| file
]

---

# Structured data as streams

- Let's try to represent XML, JSON or CSV as a stream
- Not entirely new: `@see org.xml.sax.DocumentHandler` (SAX 1)
- Then came SAX2, and more recently StAX: [`javax.xml.stream.events.XMLEvent`](http://docs.oracle.com/javaee/5/api/javax/xml/stream/events/XMLEvent.html)
- No equivalent data type for JSON or CSV
- Introduce types to indicate parser events
  - JSON: `JSONEvent` (start object, start array, declare field, string value, ...)
  - XML: `XMLEvent` (start element, attribute, text, ...)
  - CSV: `CSVEvent` (start line, start column, text, ...)
- Now we can have parsers and generators:
  - Akka defines an immutable byte array as `ByteString`

```
T         | Flow<ByteString, T, ?>  | Flow<T, ByteString, ?>
=========================================================================
XMLEvent  | AaltoReader.instance    | StaxWriter.instance
JSONEvent | ActsonReader.instance   | JacksonWriter.instance
CSVEvent  | CSVParser.instance      | CSVWriter.instance
```

---

# Tying in marshalling and unmarshalling

- Use protocols as an Akka stream stage
- Turn a `Protocol<E, T>` into a `Flow<E, T, ?>` _(unmarshal)_ or a `Flow<T, E, ?>` _(marshal)_

```
Protocol<XMLEvent, Person>
proto = /* ... */;

Flow<ByteString, XMLEvent, ?> parse = AaltoReader.instance;
Flow<XMLEvent, ByteString, ?> write = StaxWriter.instance;

Flow<XMLEvent, Person, ?> unmarshal = ProtocolReader.of(proto);
Flow<Person, XMLEvent, ?> marshal = ProtocolWriter.of(proto);

Flow<Person, Person, ?> handle = Flow.<Person>create()
    .map(p -> p.withName(p.getName().toUpperCase()));
```

.mermaid[graph LR
in -->|ByteString| parse
parse -->|XMLEvent| unmarshal
unmarshal -->|Person| handle
handle -->|Person| marshal
marshal -->|XMLEvent| write
write -->|ByteString| out
]

---

# Implementation progress

- Format-specific combinators for structural features of [XML](https://github.com/Tradeshift/ts-reaktive/blob/master/ts-reaktive-marshal/src/main/java/com/tradeshift/reaktive/xml/XMLProtocol.java), [JSON](https://github.com/Tradeshift/ts-reaktive/blob/master/ts-reaktive-marshal/src/main/java/com/tradeshift/reaktive/json/JSONProtocol.java) and [CSV](https://github.com/Tradeshift/ts-reaktive/blob/master/ts-reaktive-csv/src/main/java/com/tradeshift/reaktive/csv/marshal/CsvProtocol.java)
- Format-agnostic combinators
  - Java collection bindings (`ArrayList`, `Optional`, ...)
  - Javaslang collection bindings (`Seq`, `Option`, ...)
  - Scala bindings (in development)
- Selectively process parts of a stream: `ProtocolFilter`

```
public static
Flow
filter(ReadProtocol
selector, Graph
, ?> target)
```

- Validate against an XSD schema: `SchemaValidatorFlow`

---

# A few complete examples

- Convert XML properties file from `<property name="foo" value="bar"/>` to `<foo>bar</foo>`

```
import static com.tradeshift.reaktive.xml.XMLProtocol.*;

ReadProtocol<XMLEvent, Tuple2<String,String>> in_proto =
    anyTag(                     // Allow any root tag
        tag(qname("property"),  // Emit tuple per <property>
            attribute("name"),
            attribute("value")
        )
    );

WriteProtocol<XMLEvent, Tuple2<String,String>> out_proto = anyTagWithBody.asLocalName();

Source<ByteString, CompletionStage<IOResult>> in =
    StreamConverters.fromInputStream(() -> System.in);
Source<XMLEvent, CompletionStage<IOResult>> in_xml =
    in.via(AaltoReader.instance);
Source<Tuple2<String,String>, CompletionStage<IOResult>> elements =
    in_xml.via(ProtocolReader.of(in_proto));
Source<XMLEvent, CompletionStage<IOResult>> out_xml =
    elements.via(ProtocolWriter.of(out_proto));
Source<ByteString, CompletionStage<IOResult>> out_bytes =
    out_xml.via(StaxWriter.flow());
Sink<ByteString, CompletionStage<IOResult>> out =
    StreamConverters.fromOutputStream(() -> System.out);

out_bytes.runWith(out, materializer).thenAccept(ioResult -> {
    system.terminate();
});
```

---

# A few complete examples

- Convert XML properties file from `<property name="foo" value="bar"/>` to `<foo>bar</foo>`

```
StreamConverters
    .fromInputStream(() -> System.in)  // Source<ByteString, CompletionStage<IOResult>>
    .via(AaltoReader.instance)         // Source<XMLEvent, CompletionStage<IOResult>>
    .via(ProtocolReader.of(            // Source<Tuple2<String,String>,
        anyTag(                        //        CompletionStage<IOResult>>
            tag(qname("property"),
                attribute("name"),
                attribute("value")
            )
        )
    ))
    .via(ProtocolWriter.of(anyTagWithBody.asLocalName()))
                                       // Source<XMLEvent, CompletionStage<IOResult>>
    .via(StaxWriter.flow())            // Source<ByteString, CompletionStage<IOResult>>
    .to(StreamConverters.fromOutputStream(() -> System.out))
                                       // RunnableGraph<CompletionStage<IOResult>>
    .run(materializer).thenAccept(ioResult -> {
        system.terminate();
    });
```

---

# A few complete examples

- Find cheapest offer in battle.net JSON auctions

```json
{
"realm":{"name":"Aegwynn","slug":"aegwynn"},
"alliance":{"auctions":[
  {"auc":234421082,"item":3821,"owner":"???","ownerRealm":"???",
   "bid":84500,"buyout":90000,"quantity":1,"timeLeft":"VERY_LONG","rand":0,"seed":385088896},
  {"auc":234242934,"item":72093,"owner":"???","ownerRealm":"???",
   "bid":399999,"buyout":399999,"quantity":20,"timeLeft":"VERY_LONG","rand":0,"seed":517959808},
  {"auc":234278691,"item":74863,"owner":"???","ownerRealm":"???",
   "bid":13750,"buyout":15000,"quantity":1,"timeLeft":"VERY_LONG","rand":0,"seed":1789946496},
  [...many more megabytes...]
```

---

# A few complete examples

- Find cheapest offer in battle.net JSON auctions

```
import static com.tradeshift.reaktive.json.JSONProtocol.*;

public static class Auction {
    public final long id;
    public final long item;
    public final long bid;

    public Auction(long id, long item, long bid) {
        this.id = id;
        this.item = item;
        this.bid = bid;
    }

    public static final ReadProtocol<JSONEvent, Auction>
    proto =
        object(
            field("auc", longValue),
            field("item", longValue),
            field("bid", longValue),
            Auction::new
        );
}
```

---

# A few complete examples

- Find cheapest offer in battle.net JSON auctions

```
Map<Long, Auction> cheapest = new HashMap<>();

FileIO.fromFile(new File("auctions.json"))
    .via(ActsonReader.instance)
    .via(ProtocolReader.of(
        object(
            field("alliance",
                object(
                    field("auctions",
                        array(Auction.proto)
                    )
                )
            )
        )
    ))
    .runWith(Sink.foreach(auction -> {
        // Keep only the lowest bid seen so far for each item
        if (!cheapest.containsKey(auction.item) ||
             cheapest.get(auction.item).bid > auction.bid) {
            cheapest.put(auction.item, auction);
        }
    }), materializer).thenAccept(done -> {
        // TODO: output the cheapest alternatives as JSON?
        cheapest.forEach((id, auction) ->
            System.out.println(id + " -> " + auction.bid));
        system.terminate();
    });
```

---

# Huge XML document processing

```xml
IVAD1dOYeBI5rdxktUgQvdn/NL8idKirI35p2DhD[...]
[...many more attachments with base64 crap...>
Cats
120
```

- Extract each attachment into an S3 key
- Replace each attachment's content with just a reference URI
- Store the resulting root XML document into S3 as well

---

# Huge XML document processing

.mermaid[graph LR
in(ByteString)
xml(XMLEvent)
in --> xml
split(Split on attachment)
xml --> split
att1(Attachment 1)
split --> att1
b1(Base64 decode)
s31(Upload to S3)
ref1(Replace with ref)
att1 --> b1
b1 --> s31
att1 --> ref1
att2(Attachment 2)
s32(Upload to S3)
ref2(Replace with ref)
b2(Base64 decode)
att2 --> b2
b2 --> s32
att2 --> ref2
split --> att2
body(Body)
split --> body
join(concat)
ref1 --> join
ref2 --> join
body --> join
out(ByteString)
join --> out
outs3(Upload to S3)
out --> outs3
]

- Proof of concept implements the graph above

--

- Able to process 1000 concurrent requests of 100MB each
  - At a rate of a few seconds per document (averaged)
  - On a 256MB heap
  - With 8 threads

---

# Wrap-up

- Nice properties
  - Compile-time traceability
  - Composable
  - Re-use, even across XML, JSON and CSV
  - Extensible through simple interfaces
  - No magic (no annotations / reflection / AOP)
  - Testable
- Want to get involved?
  - Get: [https://github.com/Tradeshift/ts-reaktive/](https://github.com/Tradeshift/ts-reaktive/) (`ts-reaktive-marshal-*` modules)
  - Talk: [https://github.com/akka/alpakka/pull/320](https://github.com/akka/alpakka/pull/320)
  - Chat: [https://gitter.im/akka/akka](https://gitter.im/akka/akka)

---

class: center, middle

# Extra slides

---

# Cores and caches

- Latency Comparison Numbers

```text
 L1 cache reference                           0.5 ns
 Branch mispredict                            5   ns
 L2 cache reference                           7   ns
 Mutex lock/unlock                           25   ns
 Main memory reference                      100   ns
 Compress 1K bytes with Zippy             3,000   ns
*Context switch                          10,000   ns  (if working set in L2)
 Send 1K bytes over 1 Gbps network       10,000   ns
*Context switch                        >100,000   ns  (bigger working set)
 Read 4K randomly from SSD*             150,000   ns
 Read 1 MB sequentially from memory     250,000   ns
 Round trip within same datacenter      500,000   ns
 Read 1 MB sequentially from SSD*     1,000,000   ns
 Disk seek                           10,000,000   ns
 Read 1 MB sequentially from disk    20,000,000   ns
 Send packet CA->Netherlands->CA    150,000,000   ns
```

- Keep working on the same data for as long as possible
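
---

# Events as a stream, JDK only

A minimal sketch of the "structured data as streams" idea, assuming nothing beyond the JDK's blocking StAX API (no ts-reaktive, no Akka). `StaxEventSketch` and `textOf` are hypothetical names used only for illustration. It walks the `XMLEvent`s one at a time and keeps only the text of the requested elements, so the uninteresting parts of a document are never bound to objects.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent;

// Hypothetical illustration class, not part of ts-reaktive
class StaxEventSketch {

    /** Collects the text of every element with the given local name, one event at a time. */
    static List<String> textOf(String xml, String localName) {
        List<String> found = new ArrayList<>();
        try {
            XMLEventReader events = XMLInputFactory.newInstance()
                .createXMLEventReader(new StringReader(xml));
            StringBuilder text = null; // non-null only while inside a matching element
            while (events.hasNext()) {
                XMLEvent event = events.nextEvent();
                if (event.isStartElement()
                        && event.asStartElement().getName().getLocalPart().equals(localName)) {
                    text = new StringBuilder();
                } else if (event.isCharacters() && text != null) {
                    // Text may arrive in several chunks, so accumulate it
                    text.append(event.asCharacters().getData());
                } else if (event.isEndElement() && text != null
                        && event.asEndElement().getName().getLocalPart().equals(localName)) {
                    found.add(text.toString());
                    text = null;
                }
            }
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Malformed XML", e);
        }
        return found;
    }

    public static void main(String[] args) {
        // Made-up sample document: a large blob we skip and a small part we care about
        String xml = "<doc><blob>AAAA</blob><name>Cats</name><price>120</price></doc>";
        System.out.println(textOf(xml, "name"));
    }
}
```

This version still blocks the calling thread while reading; the Akka stream stages in this deck apply the same event-by-event shape asynchronously.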