Skip to content

Commit

Permalink
:fix: make events group id configurable by event type
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <[email protected]>
  • Loading branch information
riccardomodanese committed Dec 11, 2024
1 parent e4837dc commit 916e4f1
Show file tree
Hide file tree
Showing 24 changed files with 462 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
* Copyright (c) 2016, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -17,12 +17,13 @@

import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSetting;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSettingKey;
import org.eclipse.kapua.broker.artemis.plugin.utils.BrokerHostResolver;
import org.eclipse.kapua.broker.artemis.plugin.utils.BrokerIdResolver;
import org.eclipse.kapua.broker.artemis.plugin.utils.BrokerIdentity;
import org.eclipse.kapua.broker.artemis.plugin.utils.DefaultBrokerHostResolver;
import org.eclipse.kapua.broker.artemis.plugin.utils.DefaultBrokerIdResolver;
import org.eclipse.kapua.commons.ContainerIdResolver;
import org.eclipse.kapua.commons.DefaultContainerIdResolver;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;
import org.eclipse.kapua.commons.core.JaxbClassProvider;
import org.eclipse.kapua.commons.liquibase.DatabaseCheckUpdate;
Expand Down Expand Up @@ -67,12 +68,6 @@ String metricModuleName() {
return "broker-telemetry";
}

@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "telemetry";
}

@Provides
@Singleton
@Named("brokerHost")
Expand All @@ -88,7 +83,49 @@ BrokerIdResolver brokerIdResolver(BrokerSetting brokerSettings) throws KapuaExce

@Singleton
@Provides
BrokerHostResolver brokerHostResolver(BrokerSetting brokerSettings) throws KapuaException {
return new DefaultBrokerHostResolver(brokerSettings.getString(BrokerSettingKey.BROKER_HOST));
ContainerIdResolver containerIdResolver(SystemSetting systemSetting) throws KapuaException {
return new DefaultContainerIdResolver(systemSetting.getString(SystemSettingKey.CONTAINER_ID));
}

@Singleton
@Provides
BrokerHostResolver brokerHostResolver(SystemSetting systemSetting) throws KapuaException {
return new DefaultBrokerHostResolver(systemSetting.getString(SystemSettingKey.BROKER_HOST));
}

@Provides
@Named("accountEvtSubscriptionGroupId")
String accountEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authenticationEvtSubscriptionGroupId")
String authenticationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authorizationEvtSubscriptionGroupId")
String authorizationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceConnectionEvtSubscriptionGroupId")
String deviceConnectionEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceRegistryEvtSubscriptionGroupId")
String deviceRegistryEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("userEvtSubscriptionGroupId")
String userEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2019, 2022 Eurotech and/or its affiliates and others
* Copyright (c) 2019, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -90,4 +90,5 @@ ServiceClient authServiceClient(
public String authServiceRequestAddress() {
return "$SYS/SVC/auth/request";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*******************************************************************************
* Copyright (c) 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.commons;

/**
* Resolves the container id
*/
public interface ContainerIdResolver {

String getContainerId();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*******************************************************************************
* Copyright (c) 2017, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.commons;

import com.google.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Lookup from the configuration file
*
* @since 1.0
*/
public class DefaultContainerIdResolver implements ContainerIdResolver {

protected static final Logger logger = LoggerFactory.getLogger(DefaultContainerIdResolver.class);

private static final String CANNOT_FIND_IP_ERROR_MSG = "Cannot resolve the container id. Please check the configuration!";
private final String containerId;

@Inject
public DefaultContainerIdResolver(String containerId) throws KapuaException {
this.containerId = containerId;
logger.info("Loaded container id: {}", this.containerId);
if (StringUtils.isEmpty(this.containerId)) {
throw new KapuaException(KapuaErrorCodes.INTERNAL_ERROR, CANNOT_FIND_IP_ERROR_MSG);
}
}

@Override
public String getContainerId() {
return containerId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ private String getSubscriptionName(String address, String subscriber) {
public ServiceEventTransactionalModule(
ServiceEventClientConfiguration[] serviceEventClientConfigurations,
String internalAddress,
String uniqueClientId,
String subscriptionGroupId,
ServiceEventHouseKeeperFactory serviceEventTransactionalHousekeeperFactory,
ServiceEventBus serviceEventBus) {
this.serviceEventBus = serviceEventBus;
this.serviceEventClientConfigurations = appendClientId(uniqueClientId, serviceEventClientConfigurations);
this.serviceEventClientConfigurations = appendClientId(subscriptionGroupId, serviceEventClientConfigurations);
this.internalAddress = internalAddress;
this.houseKeeperFactory = serviceEventTransactionalHousekeeperFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
*/
public enum SystemSettingKey implements SettingKey {

/**
* Container id (used for example to set a unique identifier for event listeners so events could be received by all the instances of a certain type)
*/
CONTAINER_ID("container.id"),
/**
* Cluster name
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ broker.connector.internal.amqp.name=amqp
broker.connector.internal.username=internalUsername
broker.connector.internal.password=internalPassword

container.id=default

character.encoding=UTF-8

#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import javax.inject.Named;
import javax.inject.Singleton;

import org.eclipse.kapua.commons.ContainerIdResolver;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;
import org.eclipse.kapua.commons.util.xml.JAXBContextProvider;

Expand All @@ -33,9 +34,46 @@ String metricModuleName() {
return "web-console";
}

@Singleton
@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "console";
ContainerIdResolver containerIdResolver(SystemSetting systemSetting) throws KapuaException {
return new DefaultContainerIdResolver(systemSetting.getString(SystemSettingKey.CONTAINER_ID));
}

@Provides
@Named("accountEvtSubscriptionGroupId")
String accountEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "console-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authenticationEvtSubscriptionGroupId")
String authenticationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "console-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authorizationEvtSubscriptionGroupId")
String authorizationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "console-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceConnectionEvtSubscriptionGroupId")
String deviceConnectionEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceRegistryEvtSubscriptionGroupId")
String deviceRegistryEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "console-" + containerIdResolver.getContainerId();
}

@Provides
@Named("userEvtSubscriptionGroupId")
String userEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "console-" + containerIdResolver.getContainerId();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
import javax.inject.Named;
import javax.inject.Singleton;

import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.ContainerIdResolver;
import org.eclipse.kapua.commons.DefaultContainerIdResolver;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;
import org.eclipse.kapua.commons.core.JaxbClassProvider;
import org.eclipse.kapua.commons.liquibase.DatabaseCheckUpdate;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
import org.eclipse.kapua.commons.util.xml.JAXBContextProvider;
import org.eclipse.kapua.commons.util.xml.JAXBContextProviderImpl;
import org.eclipse.kapua.commons.util.xml.XmlRootAnnotatedJaxbClassesScanner;
Expand Down Expand Up @@ -48,9 +53,53 @@ String metricModuleName() {
return MetricsLifecycle.CONSUMER_LIFECYCLE;
}

@Singleton
@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "lifecycle";
ContainerIdResolver containerIdResolver(SystemSetting systemSetting) throws KapuaException {
return new DefaultContainerIdResolver(systemSetting.getString(SystemSettingKey.CONTAINER_ID));
}

@Provides
@Named("accountEvtSubscriptionGroupId")
String accountEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-lif-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authenticationEvtSubscriptionGroupId")
String authenticationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-lif-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authorizationEvtSubscriptionGroupId")
String authorizationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-lif-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceConnectionEvtSubscriptionGroupId")
String deviceConnectionEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceRegistryEvtSubscriptionGroupId")
String deviceRegistryEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-lif-" + containerIdResolver.getContainerId();
}

@Provides
@Named("userEvtSubscriptionGroupId")
String userEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-lif-" + containerIdResolver.getContainerId();
}

@Provides
@Singleton
@Named("clusterName")
String clusterName(SystemSetting systemSetting) {
return systemSetting.getString(SystemSettingKey.CLUSTER_NAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
import javax.inject.Named;
import javax.inject.Singleton;

import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.ContainerIdResolver;
import org.eclipse.kapua.commons.DefaultContainerIdResolver;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;
import org.eclipse.kapua.commons.core.JaxbClassProvider;
import org.eclipse.kapua.commons.liquibase.DatabaseCheckUpdate;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;
import org.eclipse.kapua.commons.util.xml.JAXBContextProvider;
import org.eclipse.kapua.commons.util.xml.JAXBContextProviderImpl;
import org.eclipse.kapua.commons.util.xml.XmlRootAnnotatedJaxbClassesScanner;
Expand Down Expand Up @@ -48,9 +53,45 @@ String metricModuleName() {
return MetricsTelemetry.CONSUMER_TELEMETRY;
}

@Singleton
@Provides
@Named("eventsModuleName")
String eventModuleName() {
return "telemetry";
ContainerIdResolver containerIdResolver(SystemSetting systemSetting) throws KapuaException {
return new DefaultContainerIdResolver(systemSetting.getString(SystemSettingKey.CONTAINER_ID));
}

@Provides
@Named("accountEvtSubscriptionGroupId")
String accountEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authenticationEvtSubscriptionGroupId")
String authenticationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("authorizationEvtSubscriptionGroupId")
String authorizationEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceConnectionEvtSubscriptionGroupId")
String deviceConnectionEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "brk-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("deviceRegistryEvtSubscriptionGroupId")
String deviceRegistryEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-tel-" + containerIdResolver.getContainerId();
}

@Provides
@Named("userEvtSubscriptionGroupId")
String userEvtSubscriptionGroupId(ContainerIdResolver containerIdResolver) {
return "con-tel-" + containerIdResolver.getContainerId();
}
}
Loading

0 comments on commit 916e4f1

Please sign in to comment.