Skip to content

Commit

Permalink
Merge pull request #2443 from hongwei1/develop
Browse files Browse the repository at this point in the history
feature/added the ssl to rabbitMq connector
  • Loading branch information
simonredfern authored Nov 13, 2024
2 parents 9f43f61 + 178f2eb commit a97df0c
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 27 deletions.
7 changes: 7 additions & 0 deletions obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ jwt.use.ssl=false
## Enable SSL for kafka, if set to true must set paths for the keystore locations
#kafka.use.ssl=true

## Enable SSL for rabbitmq, if set to true must set paths for the keystore locations
#rabbitmq.use.ssl=false

# Paths to the SSL keystore files - has to be jks
#keystore.path=/path/to/api.keystore.jks
#keystore password
Expand Down Expand Up @@ -835,6 +838,10 @@ featured_apis=elasticSearchWarehouseV300
# rabbitmq_connector.port=5672
# rabbitmq_connector.username=obp
# rabbitmq_connector.password=obp
# rabbitmq_connector.virtual_host=/
# -- RabbitMQ Adapter --------------------------------------------
#rabbitmq.adapter.enabled=false



# -- Scopes -----------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions obp-api/src/main/scala/bootstrap/liftweb/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ class Boot extends MdcLoggable {
}
}

// start RabbitMq Adatper
if (APIUtil.getPropsAsBoolValue("rabbitmq.adapter.enabled", false)) {
code.bankconnectors.rabbitmq.Adapter.startRabbitMqAdapter.main(Array(""))
}


// Database query timeout
// APIUtil.getPropsValue("database_query_timeout_in_seconds").map { timeoutInSeconds =>
// tryo(timeoutInSeconds.toInt).isDefined match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ object AdapterStubBuilder {
println("===================")

val path = new File(getClass.getResource("").toURI.toString.replaceFirst("target/.*", "").replace("file:", ""),
"src/main/scala/code/bankconnectors/rabbitmq/Adapter/RPCServer.scala")
"src/main/scala/code/bankconnectors/rabbitmq/Adapter/MockedRabbitMqAdapter.scala")
val source = FileUtils.readFileToString(path, "utf-8")
val start = "//---------------- dynamic start -------------------please don't modify this line"
val end = "//---------------- dynamic end ---------------------please don't modify this line"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package code.bankconnectors.rabbitmq.Adapter

import bootstrap.liftweb.ToSchemify
import code.api.util.APIUtil
import code.bankconnectors.rabbitmq.RabbitMQUtils
import com.openbankproject.commons.ExecutionContext.Implicits.global
import com.openbankproject.commons.dto._
import com.openbankproject.commons.model._
Expand All @@ -14,10 +15,11 @@ import net.liftweb.mapper.Schemifier

import scala.concurrent.Future
import com.openbankproject.commons.ExecutionContext.Implicits.global

import code.bankconnectors.rabbitmq.RabbitMQUtils._
import java.util.Date
import code.util.Helper.MdcLoggable

class ServerCallback(val ch: Channel) extends DeliverCallback {
class ServerCallback(val ch: Channel) extends DeliverCallback with MdcLoggable{

private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats

Expand All @@ -32,7 +34,7 @@ class ServerCallback(val ch: Channel) extends DeliverCallback {
.messageId(obpMessageId)
.build
val message = new String(delivery.getBody, "UTF-8")
println(s"Request: OutBound message from OBP: methodId($obpMessageId) : message is $message ")
logger.debug(s"Request: OutBound message from OBP: methodId($obpMessageId) : message is $message ")

try {
val responseToOBP = if (obpMessageId.contains("obp_get_banks")) {
Expand Down Expand Up @@ -3052,10 +3054,10 @@ class ServerCallback(val ch: Channel) extends DeliverCallback {
}

response = responseToOBP.map(a => write(a)).map("" + _)
response.map(res => println(s"Response: inBound message to OBP: process($obpMessageId) : message is $res "))
response.map(res => logger.debug(s"Response: inBound message to OBP: process($obpMessageId) : message is $res "))
response
} catch {
case e: Throwable => println("Unknown exception: " + e.toString)
case e: Throwable => logger.error("Unknown exception: " + e.toString)

} finally {
response.map(res => ch.basicPublish("", delivery.getProperties.getReplyTo, replyProps, res.getBytes("UTF-8")))
Expand All @@ -3065,14 +3067,13 @@ class ServerCallback(val ch: Channel) extends DeliverCallback {

}

object RPCServer extends App {
/**
* This is only for testing, not ready for production.
* Still in processing.
*/
object MockedRabbitMqAdapter extends App with MdcLoggable{
private val RPC_QUEUE_NAME = "obp_rpc_queue"
// lazy initial RabbitMQ connection
val host = APIUtil.getPropsValue("rabbitmq_connector.host").openOrThrowException("mandatory property rabbitmq_connector.host is missing!")
val port = APIUtil.getPropsAsIntValue("rabbitmq_connector.port").openOrThrowException("mandatory property rabbitmq_connector.port is missing!")
// val username = APIUtil.getPropsValue("rabbitmq_connector.username").openOrThrowException("mandatory property rabbitmq_connector.username is missing!")
// val password = APIUtil.getPropsValue("rabbitmq_connector.password").openOrThrowException("mandatory property rabbitmq_connector.password is missing!")


DB.defineConnectionManager(net.liftweb.util.DefaultConnectionIdentifier, APIUtil.vendor)
Schemifier.schemify(true, Schemifier.infoF _, ToSchemify.models: _*)

Expand All @@ -3083,26 +3084,51 @@ object RPCServer extends App {
val factory = new ConnectionFactory()
factory.setHost(host)
factory.setPort(port)
factory.setUsername("server")
factory.setPassword("server")
factory.setUsername(username)
factory.setPassword(password)
factory.setVirtualHost(virtualHost)
if (APIUtil.getPropsAsBoolValue("rabbitmq.use.ssl", false)){
factory.useSslProtocol(RabbitMQUtils.createSSLContext(
keystorePath,
keystorePassword,
truststorePath,
truststorePassword
))
}

connection = factory.newConnection()
channel = connection.createChannel()
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null)
channel.basicQos(1)
// stop after one consumed message since this is example code
val serverCallback = new ServerCallback(channel)
channel basicConsume(RPC_QUEUE_NAME, false, serverCallback, _ => {})
println("Start awaiting OBP Connector Requests:")
logger.info("Start awaiting OBP Connector Requests:")
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
try {
// connection.close()
// connection.close() //this is a tempreory solution, we keep this connection open to wait for messages
} catch {
case e: Exception => println(s"unknown Exception:$e")
case e: Exception => logger.error(s"unknown Exception:$e")
}
}
}

}

/**
* This adapter is only for testing poplors, not ready for the production
*/
object startRabbitMqAdapter {
def main(args: Array[String]): Unit = {
val thread = new Thread(new Runnable {
override def run(): Unit = {
MockedRabbitMqAdapter.main(Array.empty)
}
})
thread.start()
thread.join()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,26 @@ import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig
import org.apache.commons.pool2.BasePooledObjectFactory
import org.apache.commons.pool2.PooledObject
import org.apache.commons.pool2.impl.DefaultPooledObject
import code.bankconnectors.rabbitmq.RabbitMQUtils._

// Factory to create RabbitMQ connections
class RabbitMQConnectionFactory extends BasePooledObjectFactory[Connection] {

// lazy initial RabbitMQ connection
val host = APIUtil.getPropsValue("rabbitmq_connector.host").openOrThrowException("mandatory property rabbitmq_connector.host is missing!")
val port = APIUtil.getPropsAsIntValue("rabbitmq_connector.port").openOrThrowException("mandatory property rabbitmq_connector.port is missing!")
val username = APIUtil.getPropsValue("rabbitmq_connector.username").openOrThrowException("mandatory property rabbitmq_connector.username is missing!")
val password = APIUtil.getPropsValue("rabbitmq_connector.password").openOrThrowException("mandatory property rabbitmq_connector.password is missing!")


private val factory = new ConnectionFactory()
factory.setHost(host)
factory.setPort(port)
factory.setUsername(username)
factory.setPassword(password)
factory.setVirtualHost(virtualHost)
if (APIUtil.getPropsAsBoolValue("rabbitmq.use.ssl", false)){
factory.useSslProtocol(RabbitMQUtils.createSSLContext(
keystorePath,
keystorePassword,
truststorePath,
truststorePassword
))
}


// Create a new RabbitMQ connection
override def create(): Connection = factory.newConnection()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package code.bankconnectors.rabbitmq
import code.api.util.ErrorMessages.AdapterUnknownError
import code.bankconnectors.Connector
import code.util.Helper.MdcLoggable
import code.api.util.APIUtil
import com.openbankproject.commons.model.TopicTrait
import net.liftweb.common.{Box, Empty, Failure, Full}
import net.liftweb.json.Serialization.write
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._

import java.util
import java.util.UUID
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import java.io.FileInputStream
import java.security.KeyStore
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}

Expand All @@ -21,6 +24,17 @@ import scala.concurrent.{Future, Promise}
*/
object RabbitMQUtils extends MdcLoggable{

val host = APIUtil.getPropsValue("rabbitmq_connector.host").openOrThrowException("mandatory property rabbitmq_connector.host is missing!")
val port = APIUtil.getPropsAsIntValue("rabbitmq_connector.port").openOrThrowException("mandatory property rabbitmq_connector.port is missing!")
val username = APIUtil.getPropsValue("rabbitmq_connector.username").openOrThrowException("mandatory property rabbitmq_connector.username is missing!")
val password = APIUtil.getPropsValue("rabbitmq_connector.password").openOrThrowException("mandatory property rabbitmq_connector.password is missing!")
val virtualHost = APIUtil.getPropsValue("rabbitmq_connector.virtual_host").openOrThrowException("mandatory property rabbitmq_connector.virtual_host is missing!")

val keystorePath = APIUtil.getPropsValue("keystore.path").getOrElse("")
val keystorePassword = APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd)
val truststorePath = APIUtil.getPropsValue("truststore.path").getOrElse("")
val truststorePassword = APIUtil.getPropsValue("keystore.password").getOrElse(APIUtil.initPasswd)

private implicit val formats = code.api.util.CustomJsonFormats.nullTolerateFormats

val requestQueueName: String = "obp_rpc_queue"
Expand Down Expand Up @@ -107,4 +121,36 @@ object RabbitMQUtils extends MdcLoggable{
rabbitResponseJsonFuture.map(rabbitResponseJsonString =>logger.debug(s"${RabbitMQConnector_vOct2024.toString} inBoundJson: $messageId = $rabbitResponseJsonString" ))
rabbitResponseJsonFuture.map(rabbitResponseJsonString =>Connector.extractAdapterResponse[T](rabbitResponseJsonString, Empty))
}

def createSSLContext(
keystorePath: String,
keystorePassword: String,
truststorePath: String,
truststorePassword: String
): SSLContext = {
// Load client keystore
val keyStore = KeyStore.getInstance("jks")
val keystoreFile = new FileInputStream(keystorePath)
keyStore.load(keystoreFile, keystorePassword.toCharArray)
keystoreFile.close()
// Set up KeyManagerFactory for client certificates
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
kmf.init(keyStore, keystorePassword.toCharArray)

// Load truststore for CA certificates
val trustStore = KeyStore.getInstance("jks")
val truststoreFile = new FileInputStream(truststorePath)
trustStore.load(truststoreFile, truststorePassword.toCharArray)
truststoreFile.close()

// Set up TrustManagerFactory for CA certificates
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(trustStore)

// Initialize SSLContext
val sslContext = SSLContext.getInstance("TLSv1.3")
sslContext.init(kmf.getKeyManagers, tmf.getTrustManagers, null)
sslContext
}

}
6 changes: 6 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
### Most recent changes at top of file
```
Date Commit Action
12/11/2024 d2e711b4 Added props rabbitmq_connector.virtual_host, default is /.
If you need to set it, please make sure you already add the virtual_host to the rabbitmq and grant the access to the user:
eg: run `rabbitmqctl add_vhost /obp/` => create the `/obp/`
and run `rabbitmqctl set_permissions -p /obp/ obp ".*" ".*" ".*"` => grant user `obp` the access permissions.
12/11/2024 d2e711b4 Added props rabbitmq.adapter.enabled, default is false
12/11/2024 a5253b4e Added props rabbitmq.use.ssl, default is false
30/10/2024 e69161b6 set V121, V130 and V200 status to DEPRECATED
29/10/2024 c83032f0 added the props for RabbitMq connector:
Added props rabbitmq_connector.host=localhost
Expand Down

0 comments on commit a97df0c

Please sign in to comment.