CompletableReactor framework makes it easier to create business flows that have concurrently running parts and complex execution branching.
CompletableReactor provides DSL-like builder-style API to describe business flows and visualization plugins that parse code and displays it as an execution graph. CompletableReactor follows code-first approach when developer writes code and IDE visualizes it in plugin window and provides ability to navigate from graph to code and backward.
Framework built on top of Fork Join Pool and CompletableFuture API.
Different JVM based languages use extension to support more suitable DSL-like API.
Currently supported DSLs are for Java
and Kotlin
languages.
Motivation
CompletableFuture API
with ForkJoinPool
provides ability to write asynchronous code. But sometimes business logic not as straight forward as we would like.
This leads to complex thenApply
/thenCompose
CompletableFuture chains that hard to read and maintain. Kotlin
suspend
methods and coroutines
slightly simplifies sequential chains, but they are still fail to clarify complexity of
unobvious conditional branching with scenarios executed in parallel.
Complex business logic with lots of branching and concurrently executing parts hard to describe using regular coding approach without proper visualization. One of ways to represent that branching is a graph with nodes and transitions. CompletableReactor API tries to keep API as simple and possible, saves benefits of statically typed languages and in same time provides ability for fast code parsing and visualization the execution flow.
Project goals are:
-
Tool that can visualize complex concurrent code in simple and self-documented manner and can provide search and navigation capabilities.
-
Code first approach: write concurrent code and then visualize/navigate through it in your favorite IDE
-
Concise easy to learn DSL that enables fast source code parsing for visualization purpose
-
Tracing, profiling, performance monitoring, detailed information in exceptional cases out of the box.
-
Performance and lightweight
Getting Started
Add CompletableReactor dependencies into your project.
Install plugin into your IDE or standalone viewer
Write simple graph application
//
// In given example flight ticket purchase process implemented as a reactor graph.
// Payload contains request, response and intermediate data for computation.
//
@Data
class BuyFightTicketPayload {
@Data
@Accessors(chain = true)
class Request {
String destination;
String name;
Integer age;
}
@Data
class IntermediateData {
BigDecimal price;
}
@Data
class Response {
String operationResult;
}
final Request request = new Request();
final IntermediateData intermediateData = new IntermediateData();
final Response response = new Response();
}
//
// All graph classes extends base Graph<Payload>
//
public class BuyFlightTicketGraph extends Graph<BuyFightTicketPayload> {
//
// During execution graph uses external async services as a building blocks
// to create complex business process.
//
SalesDepartment salesDepartment = new SalesDepartment();
Bank bank = new Bank();
EmailClient emailClient = new EmailClient();
FlightPlanner flightPlanner = new FlightPlanner();
//
// Enum values identifies transitions in graph
//
enum Flow {
DENY_PURCHASE,
SEAT_RESERVED,
SUCCESS_WITHDRAW
}
//
// Vertex represent step in business process.
// Vertex encapsulates async method invocation and merging result of invocation
// into payload.
//
Vertex askForPrice =
handler(
payload -> salesDepartment.calculateCurrentPrice(payload.request.destination)
).withMerger((payload, currentPrice) -> {
payload.intermediateData.price = currentPrice;
});
Vertex reserveSeat =
handler(
payload -> flightPlanner.reserveSeat()
).withRoutingMerger((payload, isSeatReserved) -> {
if (!isSeatReserved) {
payload.response.operationResult = "Seat reservation failed";
return Flow.DENY_PURCHASE;
} else {
return Flow.SEAT_RESERVED;
}
});
Vertex withdrawMoney =
handler(
// Withdraw money from user account to purchase flight ticket
payload -> bank.withdrawMoney(payload.intermediateData.price)
).withRoutingMerger(
//# Is withdraw successful?
(payload, withdrawSuccessful) -> {
if (withdrawSuccessful) {
payload.response.operationResult = "Successful purchase for " +
payload.intermediateData.price;
return Flow.SUCCESS_WITHDRAW;
} else {
payload.response.operationResult = "Money withdraw failed";
return Flow.DENY_PURCHASE;
}
});
Vertex sendDenyEmail =
handler(
payload -> emailClient.sendEmail("Sorry, can not purchase a ticket.")
).withoutMerger();
Vertex sendSuccessEmail =
handler(
payload -> emailClient.sendEmail("Congratulations, you have purchased a ticket.")
).withoutMerger();
//
// To build graph we join vertices with transitions.
// Some transitions could be conditional.
//
{
payload()
.handleBy(askForPrice)
.handleBy(reserveSeat);
reserveSeat
.on(Flow.DENY_PURCHASE).complete()
.on(Flow.SEAT_RESERVED).mergeBy(askForPrice);
askForPrice
.onAny().handleBy(withdrawMoney);
withdrawMoney
.on(Flow.SUCCESS_WITHDRAW).handleBy(sendSuccessEmail)
.on(Flow.DENY_PURCHASE).handleBy(sendDenyEmail);
sendSuccessEmail
.onAny().complete();
sendDenyEmail
.onAny().complete();
//
// coordinates updated automatically by IDE plugin
//
coordinates()
.pd(89, -126)
.vx(askForPrice, 219, -60, 205, 45)
.vx(reserveSeat, 18, -60, 90, 12)
.vx(sendDenyEmail, 6, 304, 54, 368)
.vx(sendSuccessEmail, 248, 309, 312, 369)
.vx(withdrawMoney, 116, 145, 113, 218)
.ct(reserveSeat, -67, 66)
.ct(sendDenyEmail, 47, 420)
.ct(sendSuccessEmail, 308, 419);
}
}
//
// Single instance of completable reactor created for application.
// Graph registered withing reactor.
// Payload submitted to reactor and received as a result of computation.
//
public static void main(String[] args) {
CompletableReactor completableReactor = new CompletableReactor(new AggregatingProfiler());
completableReactor.registerGraph(new BuyFlightTicketGraph());
BuyFightTicketPayload payload = new BuyFightTicketPayload();
payload.request
.setAge(30)
.setName("John Smith")
.setDestination("New York");
CompletableFuture<BuyFightTicketPayload> future = completableReactor.submit(payload).getResultFuture();
BuyFightTicketPayload resultPayload = future.join();
System.out.println("Result: " + resultPayload.response.operationResult);
}
//
// Simple implementation of external async services
// that used by the graph.
//
class SalesDepartment {
CompletableFuture<BigDecimal> calculateCurrentPrice(String destination) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("SalesDepartment: calculate current price for " + destination);
return BigDecimal.valueOf(12.0);
});
}
}
class Bank {
CompletableFuture<Boolean> withdrawMoney(BigDecimal amount) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Bank: withdraw money: " + amount);
return true;
});
}
}
class EmailClient {
CompletableFuture<Void> sendEmail(String message) {
return CompletableFuture.runAsync(() -> System.out.println("EmailClient: " + message));
}
}
class FlightPlanner {
CompletableFuture<Boolean> reserveSeat() {
return CompletableFuture.supplyAsync(() -> {
System.out.println("FlightPlanner: reserve seat");
return true;
});
}
}
Application output:
SalesDepartment: calculate current price for New York
FlightPlanner: reserve seat
Bank: withdraw money: 12.0
EmailClient: Congratulations, you have purchased a ticket.
Result: Successful purchase for 12.0
View visual representation of the Graph
-
Display visual graph directly from code by
Ctrl+R
shortcut(Tools→ReactorGraph)
-
Navigate from graph to code by double-clicking on the nodes
-
Read graph description in nodes menu generated directly from code comments
Monitor graph execution
-
Monitor graph and individual vertex performance
-
Trace how graph execute through individual vertex by enabling tracing for particular payloads
-
Check reactor reports about vertices that executed too long, did not complete on timeout or hang
Completable Reactor Concept
Describes concept behind CompletableReactor. Explains how we can decompose concurrent conditional execution into base graph components. Shows how reactor components could be described in code and how they visualized.
To describe and idea behind CompletableReactor let’s start with simple model and then evolve it step by step.
Simple sequential model
This model consists of two base elements: Payload and Vertex. Payload is a simple POJO. Vertex is active element of model that takes Payload, execute business logic based on that Payload. Then vertex execute computation and stores computation result inside this Payload. Then vertex passes this Payload to the next Vertex. All Vertices linked in chain one after another. Modification of Payload is optional for Vertex.
In given example we have two vertices and single payload object. Payload consists of two fields: x and result. MultiplyVertex reads x from payload, multiplies it by 2 and stores computation result in result field. StdoutVertex does not modify incoming payload and simply prints result field to stdout.
Sequential asynchronous handler-merger model
Now lets make our vertices work asynchronously.
We will split computation logic of vertices in two parts.
First one, called Handler
, will read input data from payload and perform computation.
Second one, called Merger
, will store computation result inside payload.
Handler will be asynchronous function and Merger will be synchronous.
Here is an example how to split MultiplyVertex logic to handler and merger functions: handler will read
argument x from payload, then asynchronously perform multiplication and return CompletableFuture of this operation.
Merger will get result of handler function as an argument and will store it inside payload in result filed.
CompletableFuture<Integer> handle(Payload payload){ return CompletableFuture.supplyAsync(()->{ return payload.getX() * 2; }); } void merge(Payload paylaod, int result){ paylaod.setResult(result); }
Visual representation:
Visual representation of such computation contains several items:
-
Handler bar that represent asynchronous computation of handler function
-
Merger circle that represent synchronous computation of merger function
-
Transition between handler and merger that carries two objects: payload and computation result of handler function.
-
Transition that goes into Handler and carries single object - Payload.
-
Transition that goes out of Merger and carries single object - Payload.
-
Payload object that goes into Handler, then to Merger then out of Merger.
Now we can simplify visual notation and hide implicit Payload and handler function result.
Parallel handler-merger model
Lets update Merger and allow it to have several outgoing transitions. When merger function completes merging process it will send reference to same Payload instance through all of outgoing transitions in parallel.
In given example origin Payload and vertex Result goes from Handler to Merger. Then Merger modifies Payload and puts result inside it. Then Merger sends this Payload instance that contains result 20 to all outgoing transitions.
Now we need another feature in Merger - an ability to join two incoming transitions into single outgoing. When Merger have two incoming transitions: one from Vertex that carries Payload with handler Result and another with Payload only, Merger will chose Result from first transition and will merge it to Payload that it received from second transition. For activation Merger has to wait for both incoming transitions.
In given illustration there are two incoming transitions into Merger. First incoming transition carries Payload and computation result of Handler2 - value 42. Second incoming transition carries ony Payload. Merger takes reference to Payload and Result from first transition. Merges them together by merger function and passes Payload to outgoing transition.
Now we are ready to make big step to parallel execution.
As an example lets implement purchasing process.
Suppose that customer with identifier userId
wants to buy product with identifier item
.
During purchase process we have to reserve money on users account and reserve product in our storage.
BillingService
will provide asynchronous method to reserve money. And StorageFacility will provide asynchronous method to reserve product in the storage.
We can require reservation in parallel in both services and then if both services successfully performed reservation we can return answer to the user which purchase accomplished successfully.
Plain class Purchase
will serve as payload object in our model.
class Purchase{ Long userId; Long item; Boolean moneyReserved; Boolean productReserved; Boolean result; }
userId
and item
will identify customer and requested product. moneyReserved
and productReserved
fields will keep information about reservation status returned from BillingService and StorageFacility.
Lets visualize execution graph and discuss execution steps in detail.
As a first step we create Purchase payload and populate userid
and item.
Suppose that first Merger at the top simply sends this Payload into two places in parallel:
to BillingService Vertex and StorageFacility Vertex.
StorageFacility runs reservation logic and send via outgoing transitions two objects:
origin Purchase payload and product reservation status.
BillingService runs reservation logic and send via outgoing transition two objects:
origin Purchase payload and money reservation status.
Left Merger that belongs to BillingService Vertex waits for BillingService to complete.
After that Merger takes money reservation status and puts in into Purchase payload
(by using it’s merger function) and sends this Payload through outgoing transition to Merger
on right side of the graph.
//BillingService Vertex Merger void merge(Purchase paylaod, BillingServiceStatus billingStatus){ if(billingStatus == RESERVED){ paylaod.setMoneyReserved(true); } }
Right Merger that belongs to StorageFacility Vertex waits two incoming transitions: with StorageFacility
result and with Payload from BillingService.
Then it takes StorageFacility product reservation status and puts it into Payload that came from BillingService processor.
This payload already contains information about BillingService operation status. After that Merger checks both fields: Purchase.moneyReserved
and Purchase.productReserved
.
If both fields are true, Merger sets Purchase.result
to true. This means that purchase accomplished
successfully.
This process is done by using merge function of StorageFacility MergePoint.
//StorageFacility Merger void merge(Purchase paylaod, StorageFacilityStatus storageStatus){ if(storageStatus == RESERVED){ paylaod.setProductReserved(true); } if(paylaod.getMoneyReserved() && payload.getProductReserved()){ payload.setResult(true); } else { payload.setResult(false); } }
Then StorageFacility Merger sends Payload with BillingService and StorageFacility results through outgoing transition at the end of the given graph.
Now we can simplify visualisation:
Handler-merger model with conditional transitions
We almost there. Fasten seat belts. Lets enrich our Merger with last feature: conditional transitions:
Each outgoing transitions now have enum value associated with it. In graph this enum value illustrated as text label near arrows. Merger function signature is changed too. Now merger should return enum that will control which outgoing transition will be activated and which is not.
Enum merge(Payload payload, HandlerResult handlerResult) {...}
When Merger is activated by incoming transition it evaluates merger function and checks merger result. After that Merger marks all outgoing transitions which associated enum values does not match merger function result as dead and deactivates them. Also Merger activates all outgoing transitions which associated with enum values that matches merger function result. If there are two or more outgoing transitions that matches enum function result then all of them activates and runs in parallel. In given illustration we can control how graph will execute. If Merger function return FIRST then two outgoing transitions will run in parallel. If Merger function return SECOND then only single transition will run.
It is important to mention that when at least one of incoming transition of Merger is marked as dead then Merger itself marked as dead, it does not execute and simply marks all outgoing transitions as dead too. When Handler have several incoming transitions and one of them is marked as dead then there is nothing happens with Handler. Only if all incoming transitions marked as dead Handler dies too and marks outgoing transition as dead. In other worlds Handler waits result of all incoming transitions. If all incoming transitions dies - Handler dies. If single incoming transition survives - Handler executes. If there are more that one active incoming transitions survives - Handlers rise an error. This is inconsistent graph configuration. Merger in same manner waits result of all incoming transitions. But if any of incoming transitions dies - Merger dies too. To execute Merger should have all incoming transitions to be alive.
In given example if Merger returns FIRST then:
-
Vertex1 and Vertex2 execute in parallel, Vertex3 dies.
-
Then runs Merger1 and Merger3 dies.
-
Then runs Merger2
-
Then Vertex4 executes.
-
Then runs Merger4
If Merger returns SECOND then:
-
Vertex1 and Vertex2 dies. Vertex3 executes.
-
Then runs Merger3 and Merger1 with Merger2 dies.
-
Then Vertex4 executes.
-
Then runs Merger4.
StartPoint and EndPoints
The difficult part is over. Now let discuss how to start execution and how to stop it. StartPoint specify position where Payload start it trip over graph. There is only one StartPoint for each graph. EndPoints defines places where execution of the graph is stops and current Payload is returned as a graph computation result.
In given illustration:
-
Graph execution starts at StartPoint.
-
Then Payload goes to Vertex1 and Vertex2 in parallel.
-
Then Merger1 executes.
-
Then Merger2 executes.
-
Then if Merger2 returns FIRST graph execution stops at left EndPoint
-
Otherwise if Merger2 returns SECOND execution continues
-
Vertex3 with Merger3 runs sequentially and after that graph execution stops ant bottom EndPoint
Completable Reactor Runtime
Completable Reactor Runtime enriches Handler-Merger execution concept with new components and determines its execution on JVM.
Payload and StartPoint
Payload is a plain old java object that encapsulates request, response and intermediate computation data required for request processing. CompletableReactor receives payload as an argument. Executes business flows, modifies payload during execution and returns it as a result.
class Purchase{ //request parameters //provided by user during graph launching Long userId; Long productId; //intermediate data, used to pass information //from one vertex to another vertex Boolean moneyReservationStatus; Boolean productReservationStatus; //results //that user receives when graph execution is complete Boolean purchaseStatus; }
We can add little structure to payload to clarify purpose of each field and make our graph code more readable:
class Purchase{ Request request; Intermediate intermediate; Response response; Purchase(Long userId, Long productId){ this.request = new Request(userId, productId); } static class Request{ Long userId; Long productId; Request(Long userId, Long productId){ this.userId = userId; this.productId = productId; } } static class Intermediate{ Boolean moneyReservationStatus; Boolean productReservationStatus; } static class Response{ Boolean purchaseStatus; } }
To launch graph user should create Payload instance and submit it to reactor:
//launch graph Execution<Purchase> execution = completableReactor.submit(new Purchase(107, 42)); //wait for graph to complete Purchase purchase = execution.getResultFuture().get(); //check result System.out.print( purchase.getResponse().getPurchaseStatus() );
Start point visualization:
To attach vertices to StartPoint you can use payload()
builder method of graph.
class PurchaseGraph extends Graph<Purchase>{ Vertex reserveMoney = ... Vertex reserveProduct = ... { payload() .handleBy(reserveMoney) .handleBy(reserveProduct); } }
Vertices
Vertex is a visual graph item that represent a handler and optional merger invocation. Vertex encapsulates coupled business logic that can be reused in different branches. There are three types of vertices:
-
Handlers with or without Mergers
Vertex vx = handler(...).withMerger(...); Vertex vx = handler(...).withRoutingMerger(...); Vertex vx = handler(...).withoutMerger();
-
Subgraphs
Vertex vx = subgraph(...).withMerger(...); Vertex vx = subgraph(...).withRoutingMerger(...); Vertex vx = subgraph(...).withoutMerger();
-
Routers and Mutators
Vertex vx = router(...); Vertex vx = mutator(...);
Handlers and Mergers
Handler is an asynchronous function that takes information from Payload and returns computation result.
Handler implements asynchronous part of business logic of the execution flow.
It could be reused in different flows branches several time.
Handler MUST NOT
change Payload because same instance of Payload passed to other handlers in parallel.
Is is ok to concurrently read payload from several handlers but not to modify payload.
The only point of payload modification is Merger.
Vertex reserveMoney = handler( // Reserve required amount of money for the purchase in the bank payload -> bank.reserve(payload.getAccountId(), payload.getAmount()) ) ...;
Merger is a synchronous function that takes Handlers computation result and uses it to update Payload. It is safe to modify payload within merger.
There are several types of Mergers: RoutingMerger, Merger and EmptyMerger.
With Routing Merger
Routing Merger can change Payload and should always return enum value that defines outgoing transition. Graph will continue to execute by this transition returned from RoutingMerger function. In given example we send request to bank. Then we saves reservation result in payload and return transition value depending on reservation result. Graph will continue to execute by transition that will be selected based on enum value returned by RoutingMerger.
Vertex reserveMoney = handler( // Reserve required amount of money for the purchase in the bank payload -> bank.reserve(payload.getAccountId(), payload.getAmount()) ).withRoutingMerger((payload, result) -> { if(result.getStatus() == OK){ payload.moneyReserved = true; return RESERVED; } if(result.getStatus() == NO_MONEY){ payload.moneyReserved = false; payload.reservateionFailReason = NO_MONEY; return NO_MONEY; } else { payload.moneyReserved = false; payload.reservateionFailReason = OTHER_REASON; return OTHER_REASON; } }); }
With Merger
If the only thing you need is to modify Payload and unconditionally continue to execute graph by the next Vertex, then you need to use Merger. Merger works in a same way as RoutingMerger but it is not required for Merger to return any value. In given example we send notification and save it’s result in payload. But graph execution path does not depend on notification result.
Vertex sendNotification = handler( payload -> notificatior.sendNotification( payload.getUserId(), "You are purchasing " + payload.getServiceTitle()) ).withMerger((payload, result) -> { if(result.getStatus() == OK){ payload.isNotificationSent = true; } }); }
Without Merger
Last option with mergers is not to use them at all and not to wait for async operation completeness. Handler will be executed in parallel with other vertices but will not trigger any outgoing transitions because such vertex does not have any.
Vertex fireStatisticEvent = handler( payload -> statistic.fireEvent( new PurchaseEvent( payload.getUserId(), payload.getServiceId()) ) ).withoutMerger(); } ... businessAction .onAny().handleBy(fireStatisticEvent); //no outgoing transitions for fireStatisticEvent
Connect vertices into graph
No we can connect described vertices into graph.
{ payload() .handleBy(reserveMoney) .handleBy(sendNotification) .handleBy(fireStatisticEvent); sendNotification .onAny().mergeBy(reserveMoney); reserveMoney .on(RESERVED).handleBy(trySendEmail) .on(NO_MONEY).complete() .on(OTHER_REASON).complete(); trySendEmail .onAny().complete() }
Transition
Transition is a Enum instance that represent jumps between graph items during flow execution.
RoutingMerger returns instance of Enum.
Outgoing transition will be activated according to this value.
If merger returns status PLAN_B, then all outgoing transitions with condition status PLAN_B
will be activated and all transitions without PLAN_B status will be deactivated
or marked as dead transition.
Unconditional transitions onAny
will be always activated regardless of the Routing Merger result.
EmptyMerger and Merger work as a RoutingMerger that returns internal default merge status that
could participate only in onAny
transitions.
enum MyTransitions{ ONE_WAY, ANOTHER_WAY } { vx1 .on(ON_WAY).handleBy(vx2) .on(ANOTHER_WAY).handleBy(vx3); vx2 .onAny().handleBy(vx4); }
-
handleBy
transition connect Merers, Routers and Mutators to Handlers, Subgraphs, Routers and Mutators. -
mergeBy
transition connect Merers, Routers and Mutators to Mergers.
EndPoint
EndPoint is a visual graph item that indicates end of flow execution. When graph execution reaches EndPoint then all transitions marked as terminated and CompletableReactor immediately returns graph result. The only exception is Handlers and Subgraphs without mergers. They will continue to execute but their execution result will be ignored.
Execution<Purchase> execution = reactor.submit(new Purchase(100, 42)); //chain execution future completes when all vertices, including //Handlers and Subgraphs without mergers are complete and graph reached EndPoint. execution.getChainExecutionFuture() //chain result future completes when graph reaches EndPoint execution.getResultFutureFuture()
Yuy can attach EndPoint to any of transition withing reactor graph.
vx3.onAny().complete()
Subgraph
Subgraph - is a Vertex that allows to invoke one graph from another. It simply submits constructed child Payload to CompletableReactor. Subgraph has implicit Handler that takes Payload as argument and returns Payload as computation result. Subgraphs allows us to reuse graphs.
Vertex bonusPurchase = subgraph( Purchase.class, paylaod -> new Purchase(payload.userId, payload.bonusServiceId) ).withMerger(...)
Routers and Mutators
-
Router is a synchronous function that works as RoutingMerger. It can modify payload. Outgoing transition activated depending ot Router result.
-
Mutator works as Merger. It can only modify payload and does not return any value.
Vertex vx1 = router( paylaod -> { payload.data = ...; return ONE_WAY; }); Vertex vx2 = mutator( paylaod -> { payload.data = ...; });
Merger decision
If all of outgoing transitions from Merger have distinct statuses then flow will continue to execute only one of transitions.
If Merger will return FAIL status then execution completes immediately. In case of FIRST status flow will continue and Handler2 will be invoked. In case of SECOND status - Handler3 will be invoked. In given example there could be three options:
-
Payload → Handler1 → Merger1 → End (if merger returns FAIL status)
-
Payload → Handler1 → Merger1 → Handler2 → Merger2 → End (if merger returns FIRST status)
-
Payload → Handler1 → Merger1 → Handler3 → Merger3 → End (if merger returns SECOND status)
At first payload goes to Handler1. Then framework waits when CompletableFuture returned by Handler1 completes. After that Merger will be invoked and Payload with Handler1 result will be passed to this merger. Merger will check Handler1 result, modifies Payload if needed and returns one of statuses:
-
FAIL
-
FIRST
-
SECOND.
After that execution will continue along with one of transitions:
-
FAIL leads to END
-
FIRST - to Handler1
-
SECOND - to Handler3.
Parallel execution
If two transitions have same condition Status then flow will continue execution in parallel and both transitions will be activated.
In given example there could be three options:
-
merger1 returns FAIL status:
Payload → Handler1 → Merger1 → End -
merger1 returns OK, merger3 returns FAIL:
Payload → Handler1 → Merger1 → Handler2, Handler3 → Merger3 → End -
merger1 returns OK, merger3 returns OK:
Payload → Handler1 → Merger1 → Handler2, Handler3 → Merger3 → Merger 2 → continue
CompletableReactor will wait until result of Handler1 is ready. Then it sends Handler1 result to Merger1. Merger1 analyzes Handler1 result, mutates Payload instance if needed and then returns merge status. If this status is FAIL then flow execution stops. If this status is OK then framework launches Handler2 and Handler3 in parallel. Merger2 have two incoming transitions. It will wait until all of them is complete and only then will launch merging function. Suppose Handler2 will finish first. Merger2 will have to wait for second incoming transition from Merger3. When Handler3 result is complete, Merger3 will be executed. If Merger3 returns FAIL then all flows stops. If Merger3 returns OK then transition from Merger3 to Merger2 activates. Then Merger2 executes. This will leads us to deterministic order of Payload modification and Processors execution. Handlers could be launched in parallel and provided with same reference to Payload. But no two mergers can runs concurrently in this configuration of the graph. Payload always modified by mergers graph sequentially. In general sequential modification of payload is not necessary. We can build graph with two concurrently running Mergers. In that case payload will be modified by these Mergers concurrently.
Detached Handler without Merger
Some times we are not interesting in computation result of Handler. We simply need to invoke async method and does not care about it’s result. It could be Notification service that sends message to external system and current flow business is not depend on notification result.
We passing Payload to Hanlder6 by reference. In that case there could be concurrent reading data by Handler6 and data reading or modification in one of Vertices that will be triggered after Merger2 is complete. To exclude problems with concurrency Handler6 should only read and use data that will not be modified by last part of the graph. Or Handler6 could simply copy data and continue to work with local copy. Merger2 will launch Handler6 function first, and only after receiving CompletableFuture of computation result of Hanlder6 Merger2 will trigger outgoing transition to Merger1. That behaviour will secure Handler 6 invocation and allows it to safely read payload without expecting of concurrent modification of payload by Merger1.
Vertex handler6 = handler( payload -> { // It is safe to read data from payload even if they are going to be modified by Merger1 // Merger1 will be invoked after this method returns CompletableFuture result. MutableData data = payload.getMutableData().clone() CompletableFuture<ServiceResult> result = asyncService.doAction(data); // After we return result Merger1 will be invoked. // But asyncService will continue to run with it's own copy of MutableData. return result; } ).withoutMerger(); }
We can avoid this difficulties if we will follow simple rule: Populate Payload with new data and handlers results and do not modify this results if you are using them in detached handlers.
Vertex handler6 = handler( // It is safe to pass data without copying if we know That // now vertices after current one in the graph will not try to modify this data. // Or even better: when Data is simply immutable. payload -> asyncService.doAction(paylaod.getData()); ).withoutMerger(); }
Detached Handler with Merger
Some times we want to launch Handler execution in parallel with main flow and we interested in result. In that case we also have to pass copy of Payload data to async function of Handler6 or use immutable arguments to prevent concurrent reading by Handler6 async function and data modification by other Vertices of the graph like we did in Detached Handler without Merger scenario. Also we have to use Merger to bring Handler6 result back to main flow.
Conditional execution
-
Handlers, Routers and Mutators works as logical OR
||
. Vertex waits for all incoming transition to complete. If some transitions marked as dead and there is at least one alive transition - vertex will run and send paylaod through outgoing transitions. Otherwise if all transitions are dead - vertex will not run. -
Mergers works as logical AND
&&
. Vertex waits for all incoming transition to complete. If at least one transition mareked as daed - vertex will not run. All outgoing transitions will be marked as dead. Otherwise if all incoming transition are alive - then vertex will run and send payload through outgoing transitions.
Lets discuss an example where we have two cases in our graph. First one: when we execute handler A, handler B and handler C in parallel. Second one: when we execute handler A, handler B and we do not need to run C at all. We can implement this behaviour through several approaches.
-
Simple approach with concurrent Mergers
-
Use conditional transition for vertex
C
. Mergers ofB
andC
executed concurrently.
-
-
Complex approach with sequential Mergers
-
Use conditional transition with optional handling and merging
-
Use conditional transition with vertex cloning
-
Always run vertex C. Use if-else logic inside handler
C
and simply do nothing in handler itself.
-
Lets discuss second example with parallel execution and conditions. Suppose that merger B depends on result of computation of A. Launching D depends on result of computation of A. It will be executed or not. In given configuration A and B launched in parallel. Merger B will wait for completion of merger A. D will execute only if merger of A return status REQUIRED_D.
-
If A returns REQUIRED_D
-
Merger B executes.
-
Vertex D executes.
-
Vertex C waits for all incoming transitions to complete.
-
Vertex C executes.
-
-
If A returns other status, e.g. NOT_REQUIRED_D
-
Merger B executes.
-
Vertex D does not execute. Outgoing transition from D marked as Dead.
-
Vertex C waits for all incoming transitions to complete.
-
Vertex C see that transition from D is dead. But there are active transition from B. Vertex C executes.
-
Vertex template function
It is possible to use a function as a vertex builder to create vertices with similar functionality. Builder function can have arguments and must be defined in same source file as graph itself.
Instead of creating two similar logging vertices buy copy-pasting code:
val successLog = handler( paylaod -> { if(paylaod.getAction().isTraceable()){ logEvent("success", payload.getAction(), paylaod.getUserInfo()) } } ).withoutMerger(); val failureLog = handler( paylaod -> { if(paylaod.getAction().isTraceable()){ logEvent("failure", payload.getAction(), paylaod.getUserInfo()) } } ).withoutMerger(); ... .on(SUCCESS).handleBy(successLog) .on(FAILURE).handleBy(failureLog)
you can use vertex template function instead:
Vertex loggingVertex(String message){ return handler( paylaod -> { if(paylaod.getAction().isTraceable()){ logEvent(message, payload.getAction(), paylaod.getUserInfo()) } } ).withoutMerger(); } val successLog = loggingVertex("success"); val failureLog = loggingVertex("failure"); ... .on(SUCCESS).handleBy(successLog) .on(FAILURE).handleBy(failureLog)
Validation
It is possible to configure graph in a wrong way. For example we can forgot to specify endpoints for one of branches that could lead to situation when graph execution could not complete at all. To prevent this, during graph building process CompletableReactor apply validation procedures on Graph instance. Validators checks that graph is consistent, have correct configuration of nodes and transitions.
Metrics
Each graph and each vertex within graph is being profiled by aggregating profiler
All metrics names have common prefix, by default it is
ru.fix.completable.reactor.runtime.*
for example
ru.fix.completable.reactor.runtime.exc.AvailableSubscriptionsPayload.callsCountSum
There are four types of metrics:
-
pld.payload-name.* - span between two point of time: when we submitted payload into reactor and when result is completed.
-
exc.payload-name.* - some graphs contain detached nodes (see Detached Handler without Merger). In such cases graph execution is continues even after result completion. exc metric shows when whole graph execution is completes including such detached vertices.
-
hndl.vertex-name.* - async handler operation execution time span
-
mrg.vertex_name.* - sync merger, mutator or router operations time span
Each metric measure how many times vertex was invoked, how long it took to execute, etc. Check out details in aggregating-profiler metrics summary.
Intellij Idea Plugin
Completable Reactor Intellij Idea plugin provides graph visualization and source code navigation within IDE. You can jump to code using double click on graph item or by using context menu.
Best Practice and Code Convention
Paragraph explains handy rules that will keep your code clear.
Give your payloads and graphs similar names
It is good to follow same naming convention in the project.
This way it would be easy to find payloads that could be submitted to reactor
and graphs that implements their execution flows.
We can use ActionNamePayload
and ActionNameGraph
template for names.
Bad:
class BuyData {...} class PurchaseConfiguration extends Graph<BuyData> {...} class SubscriptonAction {...} class SubscriptonGraphDeclaration extends Graph<SubscriptonAction> {...}
Good:
class PurchasePayload {...} class PurchaseGraph extends Graph<PurchasePayload> {...} class SubscriptonPayload {...} class SubscriptonGraph extends Graph<SubscriptonPayload.> {...}
Give your payloads and graphs names that represent actions that graph implies
It is better to give graph a name that represent action.
Bad:
class SmsSenderPayload {...}
Good:
class SendSmsPayload {...}
Treat vertex declaration as a function
Do not wrap your logic into function if the only purpose of the function is to be invoked inside handler/merger block of a Vertex. If you are not reusing this function in other places - simply put code inside handler/merger lambda directly and give the Vertex same name as you would give to the function.
Bad:
public class PurchaseGraph extends Graph<PurchasePayload> { Vertex writeUserLog = handler( //# log ussd purchase payload -> writeUserLog(payload) ).withMerger((payload, result) -> { ... }); Vertex fireSubscribeAction = handler( //# fire purchase statistic event payload -> fireSubscribeAction(payload) ).withoutMerger(); private CompletableFuture<Reullt> writeUserLog(PurchasePayload p) { <purchase logging code> } private CompletableFuture<Void> fireSubscribeAction(PurchasePayload payload) { <statistc event firing code> } }
Good:
public class PurchaseGraph extends Graph<PurchasePayload> { Vertex writeUserLog = handler( //# log ussd purchase payload -> { <purchase logging code> } ).withMerger((payload, result) -> { ... }); Vertex fireSubscribeAction = handler( //# fire purchase statistic event payload -> { <statistc event firing code> } ).withoutMerger(); }
Give vertex a name as if it is a function
Vertex usually implies business logic that should be executed as a part of use case scenario. It would be better to give vertex a name that describes what action this vertex implements.
Bad:
Vertex userLogWriter = ...; Vertex statisticProcessor = ...;
Good:
Vertex writePurchaseToUserLog = ...; Vertex firePurchaseEventToStatistics = ...;
Do not decouple async request and result validation into two separate vertices over need
Vertex consist of two parts: handler and merger. Handler makes a request. Merger saves result in context and makes a decision where to go next. Most common pattern is to fetch data from remote store, validate it and move to another step. Or invoke remote service, check result of invocation and go to the next vertex depending on result.
The purpose of the Graph is to give hi level visual representation of the business flow. If you start to decouple invocation of remote service in one vertex and validation and routing into another - this will lead to overcomplicated visual schema with lots of vertices. That breaks main purpose of the graph - hi level visualization of flow.
In given example we have to actions:
-
fetchPayment - loads data from store
-
fetchStatusFromAbs - requests data from remote service
Bad: invocation and routing splitted into two separate vertices
Good: invocation and routing done in single vertex
This approach is applicable to the situation when decision of routing is tightly coupled with data fetched from remote store or result of service invocation.
If you are making several request and routing is based on many conditions and depends on several statuses from context - then it could be a good idea to have distinct routing point in a graph that do not invokes anything and simply make a decision and route execution to one of outgoing directions.
In Spring project declare your graph as a Bean
You can use different approaches in registering graphs in you application. Main thing is to be consistent about it. We suggest to use an approach when you declare graph as a Bean
public class MyGraph extends Graph<MyPayload>{ @Autowired MyService myService; Vertex doStuff = handler( paylaod -> myService.doSomethingUsefull() ).withMerger(...); } @Configuration public class ApplicationConfiguration{ @Bean public MyGraph muGraph(){ return new MyGraph(); } ... @Autowired Collection<Graph> applicationGraphs; @Bean public CompletableReactor completableReactor(){ CompletableReactor completableReactor = new CompletableReactor() applicationGraphs.forEach(completableReactor::registerGraph) return completableReactor; } }
Optional, lateinit vars for Graph context variable
Nullable types T?
or Optional<T>
force us to check value for null before we can use it.
This is very handy for function arguments, class states, etc.
But lets discuss situation with graphs.
We usually desing graphs in a way so upper vertices initialize data and bottom vertices uses them.
Upper vertex1 initialize data
with value and vertex2 reads data
.
In that case our graph design imply that data will be ready for reading in vertex2.
Suppose that data is not ready : vertex2 from our example have only one case: if data
is null - rise an exception.
But Kotlin lateinit or Nullable data
will rise NPE for this case for us.
So if our graph design imply that data should be ready before reading by bottom vertices - we should use nullable or lateinit vars.
What if our logic have two cases: in one we use data, and in another - do not. Best way to solve this case is to try to implement it thought graph design and have two branches of vertices. This way we can continue to use lateinit vars/nullable values inside payload.
But some times it is better not to increase complexity by adding new branches to graph
and instead to use T?/Optional<T>
data in graph payload.
Then we simply implement if (data == null/isPresent) A else B
logic inside vertex without any exceptions.
Code formatting
(1) Separate vertex field declaration and handler block with new line '\n'
(2) Separate handler block and merger block with new line '\n'
(3) Separate documentation comment with new line '\n'
public class PurchaseGraph extends Graph<PurchasePayload> { Vertex writeUserLog = //(1) handler( //# log ussd purchase details //(3) payload -> { <purchase logging code> } ).withMerger((payload, result) -> { //(2) ... }); Vertex fireSubscribeAction = handler( payload -> { //(3) <statistc event firing code> } ).withoutMerger(); }
Source guidebook
Completable Reactor implementation details
Graph representation
There are four scopes of classes that express Graph:
-
DSL - public language specific API that developers use to build Graphs.
ru.fix.completable.reactor.graph.Vertex + ru.fix.completable.reactor.graph.Graph class MyGraph: Graph<MyPayload>{ ... val myVertex = handler{...}.withMerger{...} ... myVertex.on(...).handleBy(...) ... }
-
Runtime - stores graph configuration in runtime. CompletabelReactor executes DSL and creates runtime graph instances during application initialization.
ru.fix.completable.reactor.graph.runtime.RuntimeGraph + ru.fix.completable.reactor.graph.runtime.RuntimeVertex
-
Execution - for each Payload submission CompletableReactor reads Runtime graph description `RuntimeGraph`and builds Execution representation. Execution representation of the graph is a brunch of CompletableFutures linked with each other.
ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.ProcessingVertex
-
Compile - functionality that represent Graph during visualization. Parser analyzes source code of Graph and builds graph model. This model is displayed by viewer as visual representation.
ru.fix.completable.reactor.parser.java.JavaSourceParser ru.fix.completable.reactor.model.CompileTimeGraph
Execution Process
Each invocation of Completable Reactor triggers execution process that consists of three parts.
-
Read graph runtime description
RuntimeGraph/RuntimeVertices
-
Build CompletableFuture chain that implements given Graph
-
Execute CompletableFuture chain
ru.fix.completable.reactor.runtime.execution.ExecutionBuilder
is responsible for building CompletableFuture chain. Intermediate representation of CompletableFuture chain is a graph of Processing Vertices. Processing Vertex Graph is a temporal structure that stores CompletableFutures. Each ProcessingVertex (pvx) in Processing Vertex Graph is linked with RuntimeVertex (vx). Tree of Runtime Vertices (vx) is build by DSL and represents graph structure. Tree of runtime graph is immutable. Tree of Processing Vertexes (pvx) represents execution graph. Runtime builds execution graph each time when payload submitted to ReactorGraph.
Processing Vertex keeps references to CompletableFutures. CompletableFutures represent incoming or outgoing transitions and result of computation particular elements of a vertex: handlers.
All types of Vertices in runtime represents as a Processing Vertex with handler and merger. Router is a Processing Vertex with empty merger. Mutator is a router with no result.
How to build
There is no task that builds Intellij Idea plugin.
Use Intellij Idea IDE UI to create plugin release.
Intellij plugin located at completable-reactor-plugin-idea
directory.
In order to open plugin module into Intellij Idea you have to manually add line
<module fileurl="file://$PROJECT_DIR$/completable-reactor-plugin-idea/completable-reactor-plugin-idea.iml" filepath="$PROJECT_DIR$/completable-reactor-plugin-idea/completable-reactor-plugin-idea.iml" />
into .idea/modules.xml
configuration file.
Generate documentation
Documentation source located at
asciidoc/*
Compiled documentation stored at
docs/*
Compiled documentation stored in git and served as a static content. To compile documentation run
gradle asciidoctor
Well known problems
JavaFx application failed to load canberra-gtk-module.
Application starts with error message:
Gtk-Message: ... Failed to load module "canberra-gtk-module"
Fix:
sudo apt-get install libcanberra-gtk-module