Solving complex state updates using Kafka CQRS

Business case: manage intraday liquidity and monitor the associated risks in near real time.

The primary goals of intraday liquidity management and monitoring are as follows.

  • Monitor and measure expected daily gross liquidity inflows and outflows.

  • Monitor intraday liquidity exposures against the available liquidity sources and raise alerts.

  • Manage and mobilize sufficient liquidity funding sources to meet the liquidity demands and avoid shortfalls.

I used a mock business case based on references (1) and (2).

Here is the architecture, which uses Kafka event streaming technology. It combines a simple event sourcing implementation with complex event processing built on the Kafka Streams API.

In this example implementation, we address the first and third bullet points. Alerting and monitoring will be covered in a later part.

Let's look at each layer in more detail.

  • Payment Transaction Application: Let us simulate this component using "PaymentTransactionsProducer.java". It produces transactions to the Kafka topic "payment-item-transactions"; these represent the upstream transactions.

Here are the attributes that are packed as JSON. This simulates the intraday payment flow.

        transaction.put("participant", name);
        transaction.put("transtype", transtype);
        transaction.put("amount", amount);
        transaction.put("crdr", rndcrdr);
        transaction.put("time", now.toString());
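To make the payload shape concrete, here is a minimal, library-free sketch of how such a record could be assembled and rendered. It is only an illustration: the class `TransactionSketch` and its `toJson` helper are hypothetical names, and the real producer presumably relies on a JSON library rather than hand-built strings.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the producer's payload construction (no Kafka or JSON library).
class TransactionSketch {

    // Collects the five attributes in the same order as the put() calls in the post.
    static Map<String, Object> transaction(String name, String transtype,
                                           int amount, int crdr, Instant now) {
        Map<String, Object> transaction = new LinkedHashMap<>();
        transaction.put("participant", name);
        transaction.put("transtype", transtype);
        transaction.put("amount", amount);
        transaction.put("crdr", crdr);
        transaction.put("time", now.toString());
        return transaction;
    }

    // Naive JSON rendering: numbers stay bare, everything else is quoted.
    // It does not escape special characters, so it only suits simple values like these.
    static String toJson(Map<String, Object> fields) {
        return fields.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + render(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    private static String render(Object value) {
        return value instanceof Number ? value.toString() : "\"" + value + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> t = transaction("Participant-id-001", "pay", 58, 1, Instant.now());
        System.out.println(toJson(t));
    }
}
```

A `LinkedHashMap` is used so the attributes keep their insertion order, which matches the field order seen in the sample records.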
  • Kafka Streaming Application: PartPayCollate.java ingests data from the topic "payment-item-transactions" in real time and computes the aggregate balance using a KTable. The liquidity is calculated in near real time and written to a new topic, "participant-collate-log". The state is maintained in memory; however, it can be flushed to non-volatile storage and distributed across the cluster. This is an advanced topic and will be covered in a later part.

The above program calculates, in real time, the funds available to a particular participant or institution. It could run on a central bank server or at any financial institution whose primary role is liquidity management.

        // Building Materialized view: one running balance record per key
        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
                .aggregate(
                        () -> initialBalance,                                             // initializer: starting state for a new key
                        (key, transaction, balance) -> newBalance(transaction, balance),  // aggregator: fold each transaction into the state
                        Materialized.with(Serdes.String(), jsonSerde)                     // serdes for the backing state store
                );
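The `newBalance` helper is not shown in the snippet above. As a rough, Kafka-free sketch of the fold that `aggregate` performs for a single key, the code below replays a few of the sample "participant-collate-log" records shown later in this post. It assumes that a "CR" entry adds to the balance and a "DR" entry subtracts from it, which is consistent with that sample output; the names `BalanceFold` and `Txn` are hypothetical and the actual helper in the repo may differ.

```java
import java.util.List;

// Hypothetical, Kafka-free sketch of the per-key fold performed by aggregate().
class BalanceFold {

    // Simplified stand-in for the JSON transaction record.
    static final class Txn {
        final long amount;
        final String crdr;

        Txn(long amount, String crdr) {
            this.amount = amount;
            this.crdr = crdr;
        }
    }

    // Assumption: a credit ("CR") raises the balance and a debit ("DR") lowers it.
    static long newBalance(Txn txn, long balance) {
        return "CR".equals(txn.crdr) ? balance + txn.amount : balance - txn.amount;
    }

    // Applies the transactions in arrival order, as aggregate() would for one key.
    static long fold(long initialBalance, List<Txn> txns) {
        long balance = initialBalance;
        for (Txn txn : txns) {
            balance = newBalance(txn, balance);
        }
        return balance;
    }

    public static void main(String[] args) {
        // Amounts and CR/DR flags taken from sample records 2946 to 2951,
        // starting from the balance of 10603 reported by record 2945.
        List<Txn> txns = List.of(
                new Txn(51, "CR"), new Txn(51, "DR"), new Txn(32, "DR"),
                new Txn(49, "CR"), new Txn(27, "CR"), new Txn(38, "DR"));
        System.out.println(fold(10603, txns)); // prints 10609
    }
}
```

The result, 10609, matches the balance reported by the last sample record, which suggests the materialized view is a plain running sum over the signed amounts.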
  • While transactions are ongoing, the liquidity funding sources may top up or draw down the funding. These changes are simulated by "FundingDefunding.java".

Here are some sample outputs from running the programs. After running PaymentTransactionsProducer for a few seconds (stop it with Ctrl+C), you will find the transaction log under the topic "payment-item-transactions".

{"participant":"Participant-id-001","transtype":"pay","amount":58,"crdr":1,"time":"2020-11-29T14:03:05.326945Z"}
{"participant":"Participant-id-001","transtype":"pay","amount":83,"crdr":0,"time":"2020-11-29T14:03:05.329527Z"}
{"participant":"Participant-id-001","transtype":"pay","amount":8,"crdr":0,"time":"2020-11-29T14:03:05.332109Z"}
{"participant":"Participant-id-001","transtype":"pay","amount":91,"crdr":0,"time":"2020-11-29T14:03:05.334691Z"}
{"participant":"Participant-id-001","transtype":"pay","amount":3,"crdr":1,"time":"2020-11-29T14:03:05.337173Z"}
{"participant":"Participant-id-001","transtype":"pay","amount":30,"crdr":1,"time":"2020-11-29T14:03:05.339751Z"}

Here is the KTable output streamed to the topic "participant-collate-log".

{"count":2945,"transtype":"pay","amount":"64","balance":10603,"crdr":"DR","time":"2020-11-29T14:03:04.309Z","processtime":"2020-11-29T14:03:04.311840Z"}
{"count":2946,"transtype":"pay","amount":"51","balance":10654,"crdr":"CR","time":"2020-11-29T14:03:04.311Z","processtime":"2020-11-29T14:03:04.314097Z"}
{"count":2947,"transtype":"pay","amount":"51","balance":10603,"crdr":"DR","time":"2020-11-29T14:03:04.314Z","processtime":"2020-11-29T14:03:04.316319Z"}
{"count":2948,"transtype":"pay","amount":"32","balance":10571,"crdr":"DR","time":"2020-11-29T14:03:04.316Z","processtime":"2020-11-29T14:03:04.318416Z"}
{"count":2949,"transtype":"pay","amount":"49","balance":10620,"crdr":"CR","time":"2020-11-29T14:03:04.318Z","processtime":"2020-11-29T14:03:04.320816Z"}
{"count":2950,"transtype":"pay","amount":"27","balance":10647,"crdr":"CR","time":"2020-11-29T14:03:04.320Z","processtime":"2020-11-29T14:03:04.322608Z"}
{"count":2951,"transtype":"pay","amount":"38","balance":10609,"crdr":"DR","time":"2020-11-29T14:03:04.322Z","processtime":"2020-11-29T14:03:04.325438Z"}

If you look closely at the processing time, it is very close to real time. For example, take the last record:

{"count":2951,"transtype":"pay","amount":"38","balance":10609,"crdr":"DR","time":"2020-11-29T14:03:04.322Z","processtime":"2020-11-29T14:03:04.325438Z"}

The transaction time is "2020-11-29T14:03:04.322Z" and the processed time is "2020-11-29T14:03:04.325438Z", a difference of about 3.4 milliseconds.
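The gap between the two timestamps can be checked with a few lines of standard `java.time` code. This is just a small sketch; the class name `LatencyCheck` and the method `lagMicros` are hypothetical and not part of the repo.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for measuring event-to-processing lag from ISO-8601 timestamps.
class LatencyCheck {

    // Microseconds elapsed between the event timestamp and the processing timestamp.
    static long lagMicros(String eventTime, String processTime) {
        Duration lag = Duration.between(Instant.parse(eventTime), Instant.parse(processTime));
        return lag.toNanos() / 1_000;
    }

    public static void main(String[] args) {
        // Timestamps copied from the last sample record (count 2951).
        System.out.println(lagMicros("2020-11-29T14:03:04.322Z",
                                     "2020-11-29T14:03:04.325438Z")); // prints 3438
    }
}
```

The result is 3438 microseconds, so the record was processed roughly 3.4 ms after it was produced. Note that both timestamps come from application clocks, so the figure is only meaningful when the producer and the streams application run on the same machine or on well-synchronized hosts.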

Conclusion:

The above example shows how effectively the Kafka Streams API can be used to perform complex business operations. The way the Kafka Streams API handles the CQRS pattern simplifies the developer's work. There are some architectural considerations that need to be taken care of when scaling this on a distributed cluster. I will cover these in my next blog post. Happy learning :)

Here is the GitHub repo: https://github.com/livespotty/kafkastreams-cqrs. You can try running it on your machine.

References:
