Simon Redfern edited this page May 20, 2016 · 15 revisions

Kafka Connector

The Kafka connector provides the following advantages when connecting OBP to a core banking system.

  • It provides a logging layer for non-repudiation, e.g. a record that OBP sent this request to the core banking system at this time. The messages can be consumed into another (offsite) storage system.

  • It provides a layer for real-time analytics and fraud monitoring, e.g. Apache Spark or other tools could monitor the queue and look for irregularities.

  • It provides a separation from the core OBP API, so it's easier to use the OBP API develop branch when using the Kafka connector.

  • Connector code (south of the Kafka queue) is not restricted to a JVM language (Scala, Java, Clojure etc.). You can use any language that speaks Kafka, e.g. Python, Go or C#.

    OBP (Scala) | North side (any JVM language) | Kafka | South side (any language) | Core Banking
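To make the south side concrete, here is a minimal Python sketch of the part of a south-side connector that turns one serialised request into one serialised response. It is an illustration under stated assumptions, not the OBP message format: the envelope keys (`action`, `bank_id`, `error`), the `get_bank` action name, the `handle_request` function and the in-memory `BANKS` table are all hypothetical, and JSON is assumed as the encoding. The Kafka transport itself is left out; a real connector would read requests from and write responses to the queue with a Kafka client library.

```python
import json

# Hypothetical in-memory stand-in for the core banking system.
# The field names mirror KafkaInboundBank; the values are invented.
BANKS = {
    "example-bank": {
        "id": "example-bank",
        "short_name": "Example",
        "full_name": "Example Bank Ltd",
        "logo": "",
        "website": "https://bank.example",
    }
}


def handle_request(raw_request: str) -> str:
    """Turn one serialised request into one serialised response.

    The envelope keys ("action", "bank_id", "error") are assumptions made
    for this sketch; they are not taken from the OBP message format.
    """
    request = json.loads(raw_request)
    if request.get("action") == "get_bank":
        bank = BANKS.get(request.get("bank_id"))
        if bank is not None:
            return json.dumps(bank)
        return json.dumps({"error": "bank not found"})
    return json.dumps({"error": "unknown action"})


if __name__ == "__main__":
    request = json.dumps({"action": "get_bank", "bank_id": "example-bank"})
    print(handle_request(request))
```

Keeping the handler a pure string-to-string function means it can be tested without a running Kafka broker, and the same logic could be ported to Go, C# or any other language that speaks Kafka.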

Status

The Kafka Connector is under rapid development. TESOBE is using it as the preferred method to connect to core banking systems (with south side code so far in Go, Scala and Python).

Quick Try

Run the OBP Kafka Docker Image: https://hub.docker.com/r/openbankproject/obp-full-kafka/

Run a south side Python client (this is Apache licensed, so you can use it as a starting point for a Proof of Concept): https://github.com/OpenBankProject/OBP-Kafka-Python

Data quick look

The following KafkaInbound* classes give an indication of the basic data that is requested over the Kafka queue. These classes are serialised before sending over the Kafka queue. For the current status see the bottom of this file: https://github.com/OpenBankProject/OBP-API/blob/develop/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala

  case class KafkaInboundBank(
                              id : String,
                              short_name : String,
                              full_name : String,
                              logo : String,
                              website : String)


  /** Bank Branches
    *
    * @param id Uniquely identifies the Branch within the Bank. SHOULD be url friendly (no spaces etc.) Used in URLs
    * @param bank_id MUST match bank_id in Banks
    * @param name Informal name for the Branch
    * @param address Address
    * @param location Geolocation
    * @param meta Meta information including the license this information is published under
    * @param lobby Info about when the lobby doors are open
    * @param driveUp Info about when automated facilities are open e.g. cash point machine
    */
  case class KafkaInboundBranch(
                                 id : String,
                                 bank_id: String,
                                 name : String,
                                 address : KafkaInboundAddress,
                                 location : KafkaInboundLocation,
                                 meta : KafkaInboundMeta,
                                 lobby : Option[KafkaInboundLobby],
                                 driveUp : Option[KafkaInboundDriveUp])

  case class KafkaInboundLicense(
                                 id : String,
                                 name : String)

  case class KafkaInboundMeta(
                              license : KafkaInboundLicense)

  case class KafkaInboundLobby(
                               hours : String)

  case class KafkaInboundDriveUp(
                                 hours : String)

  /**
    *
    * @param line_1 Line 1 of Address
    * @param line_2 Line 2 of Address
    * @param line_3 Line 3 of Address
    * @param city City
    * @param county County i.e. Division of State
    * @param state State i.e. Division of Country
    * @param post_code Post Code or Zip Code
    * @param country_code 2 letter country code: ISO 3166-1 alpha-2
    */
  case class KafkaInboundAddress(
                                 line_1 : String,
                                 line_2 : String,
                                 line_3 : String,
                                 city : String,
                                 county : String, // Division of State
                                 state : String, // Division of Country
                                 post_code : String,
                                 country_code: String)

  case class KafkaInboundLocation(
                                  latitude : Double,
                                  longitude : Double)

  case class KafkaInboundUser(
                              email : String,
                              password : String,
                              display_name : String)

  case class KafkaInboundValidatedUser(
                                       email : String,
                                       display_name : String)

  case class KafkaInboundAccount(
                                  id : String,
                                  bank : String,
                                  label : String,
                                  number : String,
                                  `type` : String,
                                  balance : KafkaInboundBalance,
                                  IBAN : String,
                                  owners : List[String],
                                  generate_public_view : Boolean,
                                  generate_accountants_view : Boolean,
                                  generate_auditors_view : Boolean)

  case class KafkaInboundBalance(
                                 currency : String,
                                 amount : String)

  case class KafkaInboundTransaction(
                                      id : String,
                                      this_account : KafkaInboundAccountId,
                                      counterparty : Option[KafkaInboundTransactionCounterparty],
                                      details : KafkaInboundTransactionDetails)

  case class KafkaInboundTransactionCounterparty(
                                           name : Option[String],  // Also known as Label
                                           account_number : Option[String])

  case class KafkaInboundAccountId(
                                   id : String,
                                   bank : String)

  case class KafkaInboundTransactionDetails(
                                        `type` : String,
                                        description : String,
                                        posted : String,
                                        completed : String,
                                        new_balance : String,
                                        value : String)


  case class KafkaInboundAtm(
                              id : String,
                              bank_id: String,
                              name : String,
                              address : KafkaInboundAddress,
                              location : KafkaInboundLocation,
                              meta : KafkaInboundMeta
                           )


  case class KafkaInboundProduct(
                                 bank_id : String,
                                 code: String,
                                 name : String,
                                 category : String,
                                 family : String,
                                 super_family : String,
                                 more_info_url : String,
                                 meta : KafkaInboundMeta
                               )


  case class KafkaInboundCrmEvent(
                                   id : String, // crmEventId
                                   bank_id : String,
                                   customer: KafkaInboundCustomer,
                                   category : String,
                                   detail : String,
                                   channel : String,
                                   actual_date: String
                                 )

  case class KafkaInboundCustomer(
                                   name: String,
                                   number : String // customer number
                                 )
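As a rough illustration of what a south-side client might produce for one of these classes, the Python sketch below mirrors the fields of KafkaInboundAccount and KafkaInboundBalance and serialises an instance. The field names come from the case classes above; everything else is an assumption for this example: the Python class names `InboundAccount` and `InboundBalance`, the `to_json` helper, the sample values, and the choice of JSON as the wire encoding (the page only says the classes are serialised).

```python
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class InboundBalance:
    currency: str
    amount: str  # a string, as in KafkaInboundBalance


@dataclass
class InboundAccount:
    id: str
    bank: str
    label: str
    number: str
    type: str
    balance: InboundBalance
    IBAN: str
    owners: List[str]
    generate_public_view: bool
    generate_accountants_view: bool
    generate_auditors_view: bool


def to_json(account: InboundAccount) -> str:
    # asdict() recurses into the nested balance dataclass, so the
    # result nests "balance" the same way the case classes do.
    return json.dumps(asdict(account))


if __name__ == "__main__":
    # All values below are invented sample data.
    account = InboundAccount(
        id="acc-1",
        bank="example-bank",
        label="Main account",
        number="12345",
        type="current",
        balance=InboundBalance(currency="EUR", amount="100.00"),
        IBAN="",
        owners=["alice"],
        generate_public_view=False,
        generate_accountants_view=False,
        generate_auditors_view=False,
    )
    print(to_json(account))
```

Note that the amount stays a string, matching the Scala definition, which avoids floating-point rounding of monetary values on either side of the queue.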