Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature/multi data source #843

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class PubSubClient(
"Status code: ${throwable.statusCode.code}\n" +
"Retrying: ${throwable.isRetryable}")
} else {
logger.error("Error sending CCR Request to PubSub. messageId: $messageId")
logger.error("Error sending CCR Request to PubSub topic ${publisher.topicName}. messageId: $messageId")
}
}

Expand Down
87 changes: 74 additions & 13 deletions ocsgw/src/main/java/org/ostelco/ocsgw/OcsServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import org.ostelco.diameter.getLogger
import org.ostelco.diameter.model.ReAuthRequestType
import org.ostelco.diameter.model.SessionContext
import org.ostelco.ocsgw.datasource.DataSource
import org.ostelco.ocsgw.datasource.DataSourceOperations
import org.ostelco.ocsgw.datasource.DataSourceType
import org.ostelco.ocsgw.datasource.DataSourceType.Local
import org.ostelco.ocsgw.datasource.DataSourceType.Proxy
import org.ostelco.ocsgw.datasource.DataSourceType.PubSub
import org.ostelco.ocsgw.datasource.DataSourceType.gRPC
import org.ostelco.ocsgw.datasource.SecondaryDataSourceType
import org.ostelco.ocsgw.datasource.DataSourceType.Multi
import org.ostelco.ocsgw.datasource.local.LocalDataSource
import org.ostelco.ocsgw.datasource.multi.MultiDataSource
import org.ostelco.ocsgw.datasource.protobuf.GrpcDataSource
import org.ostelco.ocsgw.datasource.protobuf.ProtobufDataSource
import org.ostelco.ocsgw.datasource.protobuf.PubSubDataSource
Expand Down Expand Up @@ -126,18 +129,16 @@ object OcsServer {

this.defaultRequestedServiceUnit = appConfig.defaultRequestedServiceUnit

val protobufDataSource = ProtobufDataSource()
source = setupDataSource(appConfig.dataStoreType, appConfig)
source?.init(DataSourceOperations.creditControlAndActivate)
}

source = when (appConfig.dataStoreType) {
private fun setupDataSource(dataSourceType: DataSourceType, appConfig: AppConfig) : DataSource {
val protobufDataSource = ProtobufDataSource()
return when (dataSourceType) {
Proxy -> {
logger.info("Using ProxyDataSource")
val secondary = when (appConfig.secondaryDataStoreType) {
SecondaryDataSourceType.PubSub -> getPubSubDataSource(protobufDataSource, appConfig)
SecondaryDataSourceType.gRPC -> getGrpcDataSource(protobufDataSource, appConfig)
else -> getPubSubDataSource(protobufDataSource, appConfig)
}
secondary.init()
ProxyDataSource(secondary)
setupProxyDataSource(protobufDataSource, appConfig)
}
Local -> {
logger.info("Using LocalDataSource")
Expand All @@ -151,12 +152,72 @@ object OcsServer {
logger.info("Using GrpcDataSource")
getGrpcDataSource(protobufDataSource, appConfig)
}
Multi -> {
logger.info("Using MultiDataSource")
setupMultiDataSource(appConfig)
}
}
}

private fun setupProxyDataSource(protobufDataSource: ProtobufDataSource, appConfig: AppConfig) : DataSource {
val secondary = when (appConfig.secondaryDataSourceType) {
PubSub -> {
logger.info("SecondaryDataStore set to PubSub")
getPubSubDataSource(protobufDataSource, appConfig)
}
gRPC -> {
logger.info("SecondaryDataStore set to gRPC")
getGrpcDataSource(protobufDataSource, appConfig)
}
else -> {
logger.warn("Unknown DataStoreType {}", appConfig.dataStoreType)
LocalDataSource()
logger.info("Default SecondaryDataSource PubSub")
getPubSubDataSource(protobufDataSource, appConfig)
}
}
source?.init()
secondary.init(DataSourceOperations.creditControlAndActivate)
return ProxyDataSource(secondary)
}

private fun setupMultiDataSource(appConfig: AppConfig) : DataSource {
logger.info("Setting up InitDataSource")
val initDataSource = setupDataSource(appConfig.multiInitDataSourceType, appConfig)
logger.info("Setting up UpdateDataSource")
val updateDataSource = setupUpdateDataSource(appConfig, initDataSource)
logger.info("Setting up TerminateDataSource")
val terminateDataSource : DataSource
terminateDataSource = setupTerminateDataSource(appConfig, initDataSource, updateDataSource)
logger.info("Setting up ActivateDataSource")
val activateDataSouce = setupActivateDataSource(appConfig, initDataSource, updateDataSource, terminateDataSource)
return MultiDataSource(initDataSource, updateDataSource, terminateDataSource, activateDataSouce)
}

private fun setupUpdateDataSource(appConfig: AppConfig, initDataSource: DataSource) : DataSource {
return when {
appConfig.multiInitDataSourceType == appConfig.multiUpdateDataSourceType -> initDataSource
else -> setupDataSource(appConfig.multiUpdateDataSourceType, appConfig)
}
}

private fun setupTerminateDataSource(appConfig: AppConfig,
initDataSource: DataSource,
updateDataSource: DataSource): DataSource {
return when {
appConfig.multiInitDataSourceType == appConfig.multiTerminateDataSourceType -> initDataSource
appConfig.multiUpdateDataSourceType == appConfig.multiTerminateDataSourceType -> updateDataSource
else -> setupDataSource(appConfig.multiTerminateDataSourceType, appConfig)
}
}

private fun setupActivateDataSource(appConfig: AppConfig,
initDataSource: DataSource,
updateDataSource: DataSource,
terminateDataSource: DataSource): DataSource {
return when {
appConfig.multiInitDataSourceType == appConfig.multiActivateDataSourceType -> initDataSource
appConfig.multiUpdateDataSourceType == appConfig.multiActivateDataSourceType -> updateDataSource
appConfig.multiTerminateDataSourceType == appConfig.multiActivateDataSourceType -> terminateDataSource
else -> setupDataSource(appConfig.multiActivateDataSourceType, appConfig)
}
}

private fun getGrpcDataSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public interface DataSource {
/**
* Initiates the datasource
*/
void init();
void init(DataSourceOperations operations);

/**
* Forward a new initial/update/terminate request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ enum class DataSourceType {
Local,
gRPC,
PubSub,
Proxy
Proxy,
Multi
}

enum class SecondaryDataSourceType {
gRPC,
PubSub
enum class DataSourceOperations {
activate,
creditControl,
creditControlAndActivate
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.ostelco.ocs.api.CreditControlRequestType;
import org.ostelco.ocsgw.OcsServer;
import org.ostelco.ocsgw.datasource.DataSource;
import org.ostelco.ocsgw.datasource.DataSourceOperations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,7 +28,7 @@ public class LocalDataSource implements DataSource {
private static final Logger LOG = LoggerFactory.getLogger(LocalDataSource.class);

@Override
public void init() {
public void init(DataSourceOperations operations) {
// No init needed
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.ostelco.ocsgw.datasource.multi;

import org.ostelco.diameter.CreditControlContext;
import org.ostelco.ocs.api.CreditControlRequestType;
import org.ostelco.ocsgw.datasource.DataSource;
import org.ostelco.ocsgw.datasource.DataSourceOperations;

/**
* Proxy DataSource is a combination of multiple DataSources
*
* With this approach the CCR-I, CCR-U and CCR-T can use different DataSources
*/
public class MultiDataSource implements DataSource {

private DataSource initDatasource = null;
private DataSource updateDatasource = null;
private DataSource terminateDatasource = null;
private DataSource activateDatasource = null;

public MultiDataSource(DataSource initDatasource,
DataSource updateDatasource,
DataSource terminateDatasource,
DataSource activateDataSource) {
this.initDatasource = initDatasource;
this.updateDatasource = updateDatasource;
this.terminateDatasource = terminateDatasource;
this.activateDatasource = activateDataSource;
}

@Override
public void init(DataSourceOperations dataSourceOperations) {
initDatasource.init(DataSourceOperations.creditControl);
updateDatasource.init(DataSourceOperations.creditControl);
terminateDatasource.init(DataSourceOperations.creditControl);
activateDatasource.init(DataSourceOperations.activate);
}

@Override
public void handleRequest(CreditControlContext context) {

if (context.getOriginalCreditControlRequest().getRequestTypeAVPValue()
== CreditControlRequestType.INITIAL_REQUEST.getNumber()) {
initDatasource.handleRequest(context);
} else if (context.getOriginalCreditControlRequest().getRequestTypeAVPValue()
== CreditControlRequestType.UPDATE_REQUEST.getNumber()) {
updateDatasource.handleRequest(context);
} else if (context.getOriginalCreditControlRequest().getRequestTypeAVPValue()
== CreditControlRequestType.TERMINATION_REQUEST.getNumber()) {
terminateDatasource.handleRequest(context);
}
}

@Override
public boolean isBlocked(final String msisdn) {
return updateDatasource.isBlocked(msisdn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.ostelco.ocs.api.CreditControlRequestType;
import org.ostelco.ocs.api.OcsServiceGrpc;
import org.ostelco.ocsgw.datasource.DataSource;
import org.ostelco.ocsgw.datasource.DataSourceOperations;
import org.ostelco.ocsgw.metrics.OcsgwMetrics;
import org.ostelco.ocsgw.utils.EventConsumer;
import org.ostelco.ocsgw.utils.EventProducer;
Expand Down Expand Up @@ -98,15 +99,30 @@ public GrpcDataSource(
}

@Override
public void init() {
public void init(DataSourceOperations operations) {

setupChannel();
initCreditControlRequestStream();
initActivateStream();
initKeepAlive();

switch (operations) {
case activate:
initActivateStream();
break;
case creditControl:
initCreditControl();
break;
case creditControlAndActivate:
initCreditControl();
initActivateStream();
break;
default:
}

ocsgwAnalytics.initAnalyticsRequestStream();
}

setupEventConsumer();
private void initCreditControl() {
initCreditControlRequestStream();
initCcrKeepAlive();
}

private void setupEventConsumer() {
Expand All @@ -129,7 +145,7 @@ private void setupChannel() {
// Set up a channel to be used to communicate as an OCS instance,
// to a gRPC instance.

final boolean disableTls = Boolean.valueOf(System.getenv("DISABLE_TLS"));
final boolean disableTls = Boolean.parseBoolean(System.getenv("DISABLE_TLS"));

try {
if (disableTls) {
Expand Down Expand Up @@ -198,6 +214,7 @@ public void onCompleted() {
// Nothing to do here
}
});
setupEventConsumer();
}

/**
Expand Down Expand Up @@ -232,7 +249,7 @@ public void onCompleted() {
* The keep alive messages are sent on the creditControlRequestStream
* to force it to stay open avoiding reconnects on the gRPC channel.
*/
private void initKeepAlive() {
private void initCcrKeepAlive() {
// this is used to keep connection alive
executorService.scheduleWithFixedDelay(() -> {
final CreditControlRequestInfo ccr = CreditControlRequestInfo.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.ostelco.diameter.CreditControlContext
import org.ostelco.diameter.getLogger
import org.ostelco.ocs.api.*
import org.ostelco.ocsgw.datasource.DataSource
import org.ostelco.ocsgw.datasource.DataSourceOperations
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
Expand All @@ -29,18 +30,16 @@ import java.util.concurrent.TimeUnit

class PubSubDataSource(
private val protobufDataSource: ProtobufDataSource,
projectId: String,
ccrTopicId: String,
private val projectId: String,
private val ccrTopicId: String,
private val ccaTopicId: String,
ccaSubscriptionId: String,
activateSubscriptionId: String) : DataSource {
private val ccaSubscriptionId: String,
private val activateSubscriptionId: String) : DataSource {

private val logger by getLogger()

private var singleThreadScheduledExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

private var pubSubChannelProvider: TransportChannelProvider? = null
private var publisher: Publisher
private lateinit var publisher: Publisher

init {

Expand All @@ -50,16 +49,27 @@ class PubSubDataSource(
// Create a publisher instance with default settings bound to the topic
pubSubChannelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel))
}
}

override fun init(operations: DataSourceOperations) {
when (operations) {
DataSourceOperations.activate -> initActivate()
DataSourceOperations.creditControl -> initCreditControl()
DataSourceOperations.creditControlAndActivate -> {
initActivate()
initCreditControl()
}
}
}

private fun initCreditControl() {
publisher = setupPublisherToTopic(projectId, ccrTopicId)
setupCcaReceiver(projectId, ccaSubscriptionId)
initCcrKeepAlive()

setupActivateReceiver(projectId, activateSubscriptionId)
}

override fun init() {

private fun initActivate() {
setupActivateReceiver(projectId, activateSubscriptionId)
}

override fun handleRequest(context: CreditControlContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ostelco.ocsgw.datasource.proxy;

import org.ostelco.ocsgw.datasource.DataSource;
import org.ostelco.ocsgw.datasource.DataSourceOperations;
import org.ostelco.ocsgw.datasource.local.LocalDataSource;
import org.ostelco.diameter.CreditControlContext;
import org.ostelco.ocs.api.CreditControlRequestType;
Expand All @@ -27,8 +28,8 @@ public ProxyDataSource(DataSource dataSource) {
}

@Override
public void init() {
// No init needed
public void init(DataSourceOperations operations) {
local.init(operations);
}

@Override
Expand Down
Loading