Mulugeta Ayalew edited this page Nov 16, 2017 · 15 revisions

Kafka Connector

The Kafka connector provides the following advantages when connecting OBP to a core banking system.

  • It provides a logging layer for non-repudiation: a record that OBP sent a given request to the core banking system at a given time. The messages can also be consumed into another (offsite) storage system.

  • It provides a layer for real-time analytics and fraud monitoring. For example, Apache Spark or other tools could monitor the queue and look for irregularities.

  • It separates connector code from the core OBP API, which makes it easier to use the OBP API develop branch when using the Kafka connector.

  • Connector code (south of the Kafka queue) is not restricted to a JVM language (Scala, Java, Clojure etc.). You can use any language that speaks Kafka, e.g. Python, Go or C#.

         OBP
          |
      North side connector (Scala or any JVM language)
        Kafka
      South side connector (Any language)
          | 
      Core Banking
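
To make the diagram concrete, here is a minimal sketch of one request travelling from the north side to the south side and back. It uses in-memory queues as stand-ins for Kafka topics, so it runs without a broker. The queue names, the `get_bank` action and the `request_id` / `action` / `params` / `data` message fields are assumptions made for illustration; they are not taken from the OBP source.

```python
import json
import queue

# In-memory stand-ins for two Kafka topics (hypothetical, not the real topic names).
request_queue = queue.Queue()
response_queue = queue.Queue()

# Made-up core banking data, keyed by bank id.
CORE_BANKING_BANKS = {
    "bank-1": {
        "id": "bank-1",
        "short_name": "Example",
        "full_name": "Example Bank",
        "logo": "",
        "website": "https://example.com",
    },
}


def north_side_send(request_id, action, params):
    """North side: serialise a request to JSON and put it on the queue."""
    message = {"request_id": request_id, "action": action, "params": params}
    request_queue.put(json.dumps(message))


def south_side_handle_one():
    """South side: take one request, look it up in core banking, publish the reply."""
    request = json.loads(request_queue.get_nowait())
    if request["action"] == "get_bank":
        data = CORE_BANKING_BANKS.get(request["params"]["bank_id"])
    else:
        data = None  # unknown action: reply with no data
    reply_message = {"request_id": request["request_id"], "data": data}
    response_queue.put(json.dumps(reply_message))


north_side_send("req-1", "get_bank", {"bank_id": "bank-1"})
south_side_handle_one()
reply = json.loads(response_queue.get_nowait())
print(reply["request_id"], reply["data"]["short_name"])
```

Because both sides only exchange JSON strings, the south side function could be rewritten in any language with a Kafka client without touching the north side.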
    

Status

The Kafka Connector is under rapid development. TESOBE uses it as the preferred method of connecting to core banking systems (with south side code so far in Go, Scala and Python).

Try using Docker

Run the OBP Kafka Docker Image: https://hub.docker.com/r/openbankproject/obp-full-kafka/

Then run an example Python script on the south side to respond to requests on the queue: https://github.com/OpenBankProject/OBP-Kafka-Python

Tools

We found the following third-party tools useful for managing a Kafka cluster through a web interface.

Message / Data requirements

The following KafkaInbound* classes give an indication of the data that is requested over the Kafka queue. These classes are currently serialised to JSON for the queue. For the current status, use the force and read the source: https://github.com/OpenBankProject/OBP-API/blob/develop/src/main/scala/code/bankconnectors/KafkaMappedConnector.scala

Bank:

  case class KafkaInboundBank(
                              id : String,
                              short_name : String,
                              full_name : String,
                              logo : String,
                              website : String)
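
As an illustration of what a south side connector might put on the queue for a bank, the sketch below builds a JSON object whose keys mirror the fields of KafkaInboundBank. The helper name `encode_bank` and all values are made up, and the sketch assumes the JSON keys match the case class field names one to one.

```python
import json

# Field names copied from KafkaInboundBank; every field is a String.
BANK_FIELDS = ("id", "short_name", "full_name", "logo", "website")


def encode_bank(record):
    """Return a JSON string with exactly the KafkaInboundBank fields."""
    for field in BANK_FIELDS:
        if not isinstance(record.get(field), str):
            raise ValueError(f"bank field {field!r} must be a string")
    return json.dumps({field: record[field] for field in BANK_FIELDS})


# Made-up example values.
message = encode_bank({
    "id": "example-bank-1",
    "short_name": "Example Bank",
    "full_name": "Example Bank of Testing Ltd",
    "logo": "https://example.com/logo.png",
    "website": "https://example.com",
})
print(message)
```

Rejecting a missing or non-string field on the south side keeps malformed records off the queue instead of leaving the north side to fail while deserialising.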

User:

  case class KafkaInboundValidatedUser(
                                       email : String,
                                       display_name : String)

Account and Transaction related:

  case class KafkaInboundAccount(
                                  id : String,
                                  bank : String,
                                  label : String,
                                  number : String,
                                  `type` : String, // backticks required: type is a reserved word in Scala
                                  balance : KafkaInboundBalance,
                                  IBAN : String,
                                  owners : List[String],
                                  generate_public_view : Boolean,
                                  generate_accountants_view : Boolean,
                                  generate_auditors_view : Boolean)

  case class KafkaInboundBalance(
                                 currency : String,
                                 amount : String)
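
Note that the balance amount is a String, not a number. A south side connector can take advantage of this by working with decimal values and only converting to text at the edge, which avoids binary floating point rounding. The sketch below assumes the JSON keys match the case class field names; the helper `make_balance` and all values are hypothetical.

```python
import json
from decimal import Decimal


def make_balance(currency, amount):
    """Build a KafkaInboundBalance-shaped dict; the amount is sent as a string."""
    if not isinstance(amount, Decimal):
        raise TypeError("amount must be a Decimal, not a float or int")
    # format(..., "f") gives plain fixed-point notation, never an exponent.
    return {"currency": currency, "amount": format(amount, "f")}


# Made-up account; keys mirror KafkaInboundAccount.
account = {
    "id": "acc-1",
    "bank": "bank-1",
    "label": "Current account",
    "number": "12345678",
    "type": "CURRENT",
    "balance": make_balance("EUR", Decimal("1234.50")),
    "IBAN": "",
    "owners": ["alice"],
    "generate_public_view": False,
    "generate_accountants_view": True,
    "generate_auditors_view": True,
}

message = json.dumps(account)

# The reader converts the string straight back to a Decimal.
decoded_amount = Decimal(json.loads(message)["balance"]["amount"])
print(decoded_amount)
```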

  case class KafkaInboundTransaction(
                                      id : String,
                                      this_account : KafkaInboundAccountId,
                                      counterparty : Option[KafkaInboundTransactionCounterparty],
                                      details : KafkaInboundTransactionDetails)

  case class KafkaInboundTransactionCounterparty(
                                           name : Option[String],  // Also known as Label
                                           account_number : Option[String])

  case class KafkaInboundAccountId(
                                   id : String,
                                   bank : String)



  case class KafkaInboundTransactionDetails(
                                        `type` : String, // backticks required: type is a reserved word in Scala
                                        description : String,
                                        posted : String,
                                        completed : String,
                                        new_balance : String,
                                        value : String)
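
The counterparty of a transaction is an Option, and so are its name and account number. How an absent value appears in the JSON depends on the serialiser: the key may be left out or set to null. The sketch below is one way for code that reads these messages to tolerate both; it is an assumption about the wire format, not a statement of what OBP emits. The helper name, the date format and all values are made up.

```python
import json


def counterparty_name(transaction_json):
    """Return the counterparty name, or None if no counterparty or name was sent."""
    transaction = json.loads(transaction_json)
    # Treat a missing key and an explicit null the same way.
    counterparty = transaction.get("counterparty") or {}
    return counterparty.get("name")


# Made-up transaction; keys mirror KafkaInboundTransaction and its nested classes.
base = {
    "id": "tx-1",
    "this_account": {"id": "acc-1", "bank": "bank-1"},
    "details": {
        "type": "TRANSFER",
        "description": "Rent",
        "posted": "2017-11-16T00:00:00Z",  # date format is an assumption
        "completed": "2017-11-16T00:00:00Z",
        "new_balance": "1134.50",
        "value": "-100.00",
    },
}

with_counterparty = json.dumps(
    {**base, "counterparty": {"name": "Bob", "account_number": "87654321"}}
)
null_counterparty = json.dumps({**base, "counterparty": None})
no_counterparty = json.dumps(base)

for message in (with_counterparty, null_counterparty, no_counterparty):
    print(counterparty_name(message))
```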

Customer (basic info only here):

  case class KafkaInboundCustomer(
                                   name: String,
                                   number : String // customer number
                                 )

Customer related:

  case class KafkaInboundCrmEvent(
                                   id : String, // crmEventId
                                   bank_id : String,
                                   customer: KafkaInboundCustomer,
                                   category : String,
                                   detail : String,
                                   channel : String,
                                   actual_date: String
                                 )

Bank Open Data:

  case class KafkaInboundAtm(
                              id : String,
                              bank_id: String,
                              name : String,
                              address : KafkaInboundAddress,
                              location : KafkaInboundLocation,
                              meta : KafkaInboundMeta
                           )


  case class KafkaInboundProduct(
                                 bank_id : String,
                                 code: String,
                                 name : String,
                                 category : String,
                                 family : String,
                                 super_family : String,
                                 more_info_url : String,
                                 meta : KafkaInboundMeta
                               )


  /** Bank Branches
    *
    * @param id Uniquely identifies the Branch within the Bank. SHOULD be url friendly (no spaces etc.) Used in URLs
    * @param bank_id MUST match bank_id in Banks
    * @param name Informal name for the Branch
    * @param address Address
    * @param location Geolocation
    * @param meta Meta information including the license this information is published under
    * @param lobby Info about when the lobby doors are open
    * @param driveUp Info about when automated facilities are open e.g. cash point machine
    */
  case class KafkaInboundBranch(
                                 id : String,
                                 bank_id: String,
                                 name : String,
                                 address : KafkaInboundAddress,
                                 location : KafkaInboundLocation,
                                 meta : KafkaInboundMeta,
                                 lobby : Option[KafkaInboundLobby],
                                 driveUp : Option[KafkaInboundDriveUp])
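
The two constraints in the comment above (a URL-friendly id, and a bank_id that matches a bank) can be checked on the south side before a branch is published. The sketch below is one possible check. It interprets "url friendly" as the RFC 3986 unreserved characters, which is an assumption; the function name and data are hypothetical.

```python
import re

# RFC 3986 "unreserved" characters: letters, digits, "-", ".", "_" and "~".
URL_FRIENDLY = re.compile(r"[A-Za-z0-9._~-]+")


def branch_problems(branch, known_bank_ids):
    """Return a list of human-readable problems; an empty list means the branch passes."""
    problems = []
    if not URL_FRIENDLY.fullmatch(branch.get("id", "")):
        problems.append("id is not URL friendly")
    if branch.get("bank_id") not in known_bank_ids:
        problems.append("bank_id does not match a known bank")
    return problems


known_banks = {"bank-1"}
good_branch = {"id": "main-street", "bank_id": "bank-1", "name": "Main Street"}
bad_branch = {"id": "main street", "bank_id": "bank-9", "name": "Main Street"}

print(branch_problems(good_branch, known_banks))
print(branch_problems(bad_branch, known_banks))
```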

  case class KafkaInboundLicense(
                                 id : String,
                                 name : String)

  case class KafkaInboundMeta(
                              license : KafkaInboundLicense)

  case class KafkaInboundLobby(
                               hours : String)

  case class KafkaInboundDriveUp(
                                 hours : String)

  /**
    *
    * @param line_1 Line 1 of Address
    * @param line_2 Line 2 of Address
    * @param line_3 Line 3 of Address
    * @param city City
    * @param county County i.e. Division of State
    * @param state State i.e. Division of Country
    * @param post_code Post Code or Zip Code
    * @param country_code 2 letter country code: ISO 3166-1 alpha-2
    */
  case class KafkaInboundAddress(
                                 line_1 : String,
                                 line_2 : String,
                                 line_3 : String,
                                 city : String,
                                 county : String, // Division of State
                                 state : String, // Division of Country
                                 post_code : String,
                                 country_code: String)

  case class KafkaInboundLocation(
                                  latitude : Double,
                                  longitude : Double)
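
Address and location data can be sanity-checked before it is sent. The sketch below checks only the shape of the country code (two uppercase ASCII letters, as in ISO 3166-1 alpha-2) and the valid ranges for latitude and longitude. It does not verify that the code is actually assigned to a country. The function names and values are hypothetical.

```python
def address_problems(address):
    """Check that country_code looks like an ISO 3166-1 alpha-2 code."""
    code = address.get("country_code", "")
    looks_valid = (
        len(code) == 2 and code.isascii() and code.isalpha() and code.isupper()
    )
    return [] if looks_valid else ["country_code must be 2 uppercase letters"]


def location_problems(location):
    """Check that latitude and longitude are within their valid ranges."""
    problems = []
    if not -90.0 <= location["latitude"] <= 90.0:
        problems.append("latitude must be between -90 and 90")
    if not -180.0 <= location["longitude"] <= 180.0:
        problems.append("longitude must be between -180 and 180")
    return problems


# Made-up values; keys mirror KafkaInboundAddress and KafkaInboundLocation.
address = {
    "line_1": "1 Example Street",
    "line_2": "",
    "line_3": "",
    "city": "Berlin",
    "county": "",
    "state": "Berlin",
    "post_code": "10115",
    "country_code": "DE",
}
location = {"latitude": 52.53, "longitude": 13.38}

print(address_problems(address), location_problems(location))
```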