Previous: Automation overview.
The automation is built using Java bindings to the low-level gRPC server API. This API is event-driven: it delivers notifications of contract creation, exercises, and archives as an event stream. It is implemented using RxJava reactive streams.
Each of the automation functions (Trading party automation, Automated market setup, and Automated user simulation) is implemented as a Java class, all derived from the common base class EventProcessor
.
A top-level class, BondTradingMain
, acts as the main class for any of these. It accepts command line arguments, which determine what part of the automation to execute, and on behalf of which party.
A full application consists of multiple processes - one for each party and automation function. Each of these processes can be considered a bot that performs autonomous actions on behalf of the party.
BondTradingMain.java implements a main() method that:
- Creates an instance of itself and calls its
run()
method. - Waits in a sleep loop for a termination indication from other threads.
- Terminates itself with a provided exit code.
The run() method implements the bulk of the required set-up. It:
- Connects to the ledger, creating an instance of a gRPC
ManagedChannel
. - Fetches the
ledgerId
and the primarypackageId
. - Creates a processor object (one of MarketSetupProcessor, TradeInjector or TradingPartyProcessor), passing in the channel, ledgerId, packageId, and specified party to the constructor.
The initial command-line argument determines the class of the processor, and through that the function of the created bot.
Each of the processor classes performs a separate function, but have a lot of behavior in common. This is captured by using a common superclass, EventProcessor
.
EventProcessor.java is an abstract class, responsible for:
- creating a command submission service
- creating services that provide transaction and command completion events
- connecting these to RxJava streams
- tracking in-flight (pending) commands, and handling their completion
- unpacking transactions into events and dispatching them for processing
The service classes for the transaction, completion and submission services are set up in a similar manner, on processor construction. Each service is represented by a gRPC service class, which provides a factory method that takes the channel as an argument.
These service instances are stored in instance variables in the processor.
Once these services are established, the run() method (called by BondTradingMain to start the processor):
Creates RxJava stream observers on these endpoints by creating a request and submitting it to the service.
This is done by the methods setupTransactionService() and setupCompletionService().
For each service, builds a service request (
GetTransactionsRequest
for transactions, andCompletionStreamRequest
for completions) and aStreamObserver
for the type of event expected.Provides these requests to the service via a creation method (
getTransactions()
orcompletionStream()
) which connects the Rxjava observer to the event stream and makes it live.
As a RxJava stream observer, the StreamObserver
instance implements onNext()
and onError()
methods, and it is these that receive and handle each event. For transaction events, each transaction is handled by the method processTransaction
, and for completions, each event is handled by processCompletion
. These are described below.
Once this is done, the onNext()
methods handle all processing, scheduled by the underlying RxJava and gRPC implementation using a thread provided by a Java Executor
.
The method submitCommands() handles all command submissions, and does so by calling the submission service directly.
This method takes a list of commands, packages them up as a SubmitRequest
, along with required submission information such as ledger effective time, command, workflow, application, and ledger IDs, as well as submitting party, and then submits them to the ledger.
The submission is recorded by creating a CompletionRecord
and storing it in a pendingCommands
map, indexed by command ID.
The StreamObservers
created by the run
method dispatch events by calling event handling methods from their onNext
methods. The method processTransaction handles transaction events, and the method processCompletion handles command completion events.
processTransaction
splits each transaction into its individual events, and calls processEvent() on each of these. This method dispatches on the event type - one of create
or archive
. For each of these, a corresponding abstract method is called (processCreatedEvent
, processExerciseEvent
or processArchivedEvent
).
Each of these methods is expected to return a list of commands that should be executed in response to handling a given event. When the event handler returns, EventProcessor
takes any returned commands and automatically submits them.
This is the central mechanism though which ledger changes are processed. Different types of event processors are built by sub-classing EventProcessor
, and implementing the above abstract methods. Reactive automation can be implemented by returning new commands to be executed in response. State-based automation is realized by updating an active contract store or more general operational data store according to these events.
Completion handling is done by the method processCompletion in an analogous manner, in conjunction with the pending CompletionRecord stored by command submission. Each completion event contains the relevant commandId, and this is used to retrieve the corresponding CompletionRecord
. Depending on the completion status code, either a successful or error completion is indicated by calling processCompletionError or processCompletionSuccess. EventProcessor
provides empty implementations of these, and allows sub-classes to override if required.
MarketSetupProcessor.java is responsible for the initial Automated market setup. It reads the assets to be issued from a .CSV file, which is provided as a command line argument.
Since the majority of the setup is coded in DAML (MarketSetupJob), all the processor needs to do is to create an instance of this template, wait for its creation, and then exercise the Process
choice to complete the setup. This is done by implementing the processCreateEvent() method and listening for a create
event of a MarketSetupJob
.
The run() implementation starts the process with a single creation of an instance of MarketSetupJob
. Creation of the contract requires cash and bonds data in the form of MarketSetupEntry. Most of the logic in the MarketSetupProcessor
class is concerned with mapping the tabular asset data from the CSV file to the required MarketSetupEntry
objects.
Finally, the processor overrides processCompletionSuccess and processCompletionError to detect the completion of the Process
exercise. When this occurs, trace information is output and the process terminates.
TradeInjector.java defines a processor that is responsible for injecting trade requests into the market to simulate the actions of a party as described in Automated user simulation. The trades are instances of the DvpProposal template, and are again created from a .CSV file provided as a command line argument. The submitting party is also defined by a command line argument.
Trade injection cannot happen until market participants are established, so the processor waits for an appropriate signal that this is done. Since injection is done on a per-party basis, this must be tied to the setup for a given party. TradeInjector
does this by overriding processCreateEvent and waiting for the creation of a SettlementProcessor
contract for the party in question.
Once the creation is detected, the .CSV file is read, and streamed through a pipeline that selects records for the submitting party, converts them into a creation command for a DvpProposal
, and submits them to the ledger. The processor terminates once the end of the file is reached. To slow this process down, a delay is inserted into this pipeline by having the thread sleep before each command is submitted.
TradingPartyProcessor defines a processor responsible for performing all actions described in Trading party automation. The reactive automation of accepting transfer and trade proposals is implemented by listening to the respective create
events in processCreateEvent and returning corresponding Accept
exercise commands for cash and bond transfers and Dvp proposals.
For the state-based asset/trade matching logic, a simple ACS is implemented, consisting of:
- Queues of cash assets, indexed by the primary ID - the Currency
- Queues of bonds assets, indexed by the primary ID - the ISIN.
- Queues of DvPs awaiting cash allocation - instances of the template
Dvp.Dvp
that the acting party is the buyer on, indexed by the bond asset ID (the Currency) - Queues of DvPs awaiting settlement - instances of the template
Dvp.DvpAllocated
that the acting party is the seller on, indexed by the bond asset ID (the ISIN)
This state is held and maintained by the static class SettlementState. A single instance of SettlementState
is held by the processor, and contains four data structures, one for each of the four items above. Assets and trades are represented by two other static classes, Asset and Dvp.
This state is kept in sync with the ledger by tracking create and archive events of the respective contract types in processCreateEvent() and processArchiveEvent.
To run the matching logic, the processor overrides the submitCommands method of EventProcessor
. This is called at the end of processTransaction
so at this point, the ACS will have been updated with all new create
and archive
events.
The state kept in this way is enough to implement the settlement algorithm described in Trading party automation. The method runSettlement() implements this, and runs the algorithm for all active currencies and ISINs. Since the core of both allocation and settlement is implemented in DAML by the template SettlementProcessor
, the task of runSettlement
is merely to generate lists of assets and trades for one of the two choices on SettlementProcessor
: AllocateCash or SettleMany.
Two corresponding methods on SettlementState
, allocateCash() and allocateBonds, generate these lists. Since the matching is essentially the same for cash and bonds, they are both implemented by calling a single method matchAssets, parameterized for allocation or settlement.
matchAssets
takes matching queues of assets and trades, and looks for the longest initial segment of the queue of trades that the list of assets can allocate or settle. It then matches that with the shortest initial segment of assets that is sufficient. The resulting lists are removed from the state and returned in a helper class, MatchResults. An exercise command on a SettlementProcessor
is created by settlementCommandFor() from these lists, and added to the list of commands originally received by submitCommands. The final call to super.submitCommands
actually submits them to the ledger.