From f87fffaf8e01444e017887e6951baa8ea4672dc2 Mon Sep 17 00:00:00 2001 From: Ruchir Vani Date: Thu, 4 Jan 2024 13:40:24 -0500 Subject: [PATCH] Release -0.8.x 1. Upgrade kafka client 2. Clean up and upgrade other dependencies 3. Fix consumer group id 4. Update using apche kafka callback handler for Oauth --- docker/Dockerfile | 2 +- ncds-sdk/pom.xml | 78 ++-- .../com/nasdaq/ncdsclient/NCDSClient.java | 1 + .../consumer/NasdaqKafkaAvroConsumer.java | 57 ++- .../ncdsclient/internal/ReadSchemaTopic.java | 6 +- .../utils/AuthenticationConfigLoader.java | 68 +--- .../internal/utils/ConfigConstants.java | 86 +++++ .../internal/utils/KafkaConfigLoader.java | 17 +- ncds-sdk/src/main/resources/log4j.xml | 22 -- ncds-sdk/src/main/resources/log4j2.xml | 15 + ncds-sdk/src/test/resources/log4j.xml | 2 +- ncdssdk-client/pom.xml | 20 +- .../com/nasdaq/ncdsclient/NCDSSession.java | 7 +- .../clientAuthentication-config.properties | 2 +- .../main/resources/kafka-config.properties | 2 +- ncdssdk-client/src/main/resources/log4j.xml | 22 -- ncdssdk-client/src/main/resources/log4j2.xml | 15 + pom.xml | 346 ++++++++++++++++-- 18 files changed, 544 insertions(+), 224 deletions(-) create mode 100644 ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/ConfigConstants.java delete mode 100644 ncds-sdk/src/main/resources/log4j.xml create mode 100644 ncds-sdk/src/main/resources/log4j2.xml delete mode 100644 ncdssdk-client/src/main/resources/log4j.xml create mode 100644 ncdssdk-client/src/main/resources/log4j2.xml diff --git a/docker/Dockerfile b/docker/Dockerfile index 78f0ef1..f427756 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -10,7 +10,7 @@ RUN mvn -B \ ### Build Images ### ## SDK app ## -FROM strimzi/kafka:0.20.0-kafka-2.6.0 as sdk-app +FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 as sdk-app COPY . /home/kafka diff --git a/ncds-sdk/pom.xml b/ncds-sdk/pom.xml index 8af6ffe..f8cf59d 100644 --- a/ncds-sdk/pom.xml +++ b/ncds-sdk/pom.xml @@ -7,119 +7,111 @@ com.nasdaq.ncds ncds - 0.7.0 + 0.8.4 ncds-sdk jar - kafka_2.12 - 5.7.2 1.3.2 - 2.12.0 - 1.7.30 2.22.2 - 0.8.1 + SDK Provide Development Kit to connect with Kafka + + org.apache.avro + avro + + + org.apache.kafka + kafka-clients + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + + com.fasterxml.jackson.core + jackson-databind + + org.json json - 20190722 + + org.slf4j + slf4j-simple + test + io.strimzi kafka-oauth-common - ${strimzi.oauth.version} - - - org.apache.kafka - ${kafkaScalaVersion} - ${kafka.version} - - - - org.slf4j - slf4j-log4j12 - - - javax.mail - mail - - - test - + + + org.apache.kafka + kafka_2.12 + com.salesforce.kafka.test kafka-junit-core - 3.2.3 - test org.junit.jupiter junit-jupiter-api - ${junit5.version} - test org.junit.jupiter junit-jupiter-params - ${junit5.version} - test org.mockito mockito-core - 2.28.2 - test org.apache.curator curator-test - ${curatorTestVersion} - test org.slf4j - slf4j-simple - ${slf4jVersion} - test + slf4j-api com.github.stephenc.high-scale-lib high-scale-lib - 1.1.4 - test org.apache.commons commons-lang3 - 3.3.2 - test diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java index b177886..25e9fd4 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java @@ -30,6 +30,7 @@ public class NCDSClient { /** * * @param securityCfg - Authentication Security Properties passed from the Client + * @param kafkaCfg * @throws Exception - Java Exception */ public NCDSClient(Properties securityCfg,Properties kafkaCfg) throws Exception { diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java index 2d966f4..49f4e76 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java @@ -32,41 +32,37 @@ public class NasdaqKafkaAvroConsumer { private KafkaConsumer kafkaConsumer; private String clientID; - private Properties securityProps; - private Properties kafkaProps; + private Properties properties = new Properties(); private ReadSchemaTopic readSchemaTopic = new ReadSchemaTopic(); public NasdaqKafkaAvroConsumer(Properties securityCfg,Properties kafkaCfg ) throws Exception { try { - if (kafkaCfg == null) + if (securityCfg == null) { + properties.setProperty(AuthenticationConfigLoader.OAUTH_CLIENT_ID, "unit-test"); // Just for the unit tests. + } + else { + properties.putAll(securityCfg); + } + if (kafkaCfg == null) { if (IsItJunit.isJUnitTest()) { Properties junitKafkaCfg = KafkaConfigLoader.loadConfig(); - kafkaProps = junitKafkaCfg; + properties.putAll(junitKafkaCfg); } else { throw new Exception("Kafka Configuration not Defined "); } - - else { - kafkaProps = kafkaCfg; - KafkaConfigLoader.validateAndAddSpecificProperties(kafkaProps); - } - - if (securityCfg == null) { - securityProps = new Properties(); - securityProps.setProperty(AuthenticationConfigLoader.OAUTH_CLIENT_ID, "unit-test"); // Just for the unit tests. } else { - securityProps = securityCfg; - + properties.putAll(kafkaCfg); + KafkaConfigLoader.validateAndAddSpecificProperties(properties); } } catch (Exception e) { throw (e); } - readSchemaTopic.setSecurityProps(securityProps); - readSchemaTopic.setKafkaProps(kafkaProps); - this.clientID = getClientID(securityProps); + readSchemaTopic.setSecurityProps(properties); + readSchemaTopic.setKafkaProps(properties); + this.clientID = getClientID(properties); } @@ -86,7 +82,7 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception { kafkaConsumer = getConsumer(kafkaSchema, streamName); TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0); kafkaConsumer.assign(Collections.singletonList(topicPartition)); - if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) { + if(properties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) { return seekToMidNight(topicPartition); } } @@ -144,21 +140,20 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws public KafkaAvroConsumer getConsumer(Schema avroSchema, String streamName) throws Exception { try { - if(!IsItJunit.isJUnitTest()) { - ConfigProperties.resolveAndExportToSystemProperties(securityProps); - } +// if(!IsItJunit.isJUnitTest()) { +// ConfigProperties.resolveAndExportToSystemProperties(securityProps); +// } //Properties kafkaProps = KafkaConfigLoader.loadConfig(); - kafkaProps.put("key.deserializer", StringDeserializer.class.getName()); - kafkaProps.put("value.deserializer", AvroDeserializer.class.getName()); - if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) { - kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + properties.put("key.deserializer", StringDeserializer.class.getName()); + properties.put("value.deserializer", AvroDeserializer.class.getName()); + if(!properties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) { + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); } - if(!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + streamName + "_" + getDate()); + if(!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID);// + "_" + streamName + "_" + getDate()); } - ConfigProperties.resolve(kafkaProps); - return new KafkaAvroConsumer(kafkaProps, avroSchema); + return new KafkaAvroConsumer(properties, avroSchema); } catch (Exception e) { throw e; @@ -211,7 +206,7 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception { kafkaConsumer = getConsumer(newsSchema, topic); TopicPartition topicPartition = new TopicPartition(topic + ".stream",0); kafkaConsumer.assign(Collections.singletonList(topicPartition)); - if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) { + if(properties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) { return seekToMidNight(topicPartition); } return kafkaConsumer; diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java index 4aebc6b..de007e4 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java @@ -31,7 +31,7 @@ public ReadSchemaTopic(){ } public Schema readSchema(String topic) throws Exception { - KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps)); + KafkaConsumer schemaConsumer= getConsumer(getClientID(securityProps)); Duration sec = Duration.ofSeconds(10); Schema messageSchema = null; ConsumerRecord lastRecord=null; @@ -88,7 +88,7 @@ public Set getTopics() throws Exception{ Set topics = new HashSet(); - KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps)); + KafkaConsumer schemaConsumer= getConsumer(getClientID(securityProps)); Duration sec = Duration.ofSeconds(10); while (true) { ConsumerRecords schemaRecords = schemaConsumer.poll(sec); @@ -188,4 +188,4 @@ private long getTodayMidNightTimeStamp(){ return timestampFromMidnight; } - } +} \ No newline at end of file diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java index 7cdd3d8..1a2198b 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/AuthenticationConfigLoader.java @@ -1,55 +1,28 @@ package com.nasdaq.ncdsclient.internal.utils; -import java.io.InputStream; -import java.util.Arrays; -import java.util.List; import java.util.Properties; -import java.util.logging.Logger; /** * Utility to load the auth configuration parameters. */ public class AuthenticationConfigLoader { - public static String OAUTH_TOKEN_ENDPOINT_URI ="oauth.token.endpoint.uri"; - public static String OAUTH_CLIENT_ID ="oauth.client.id"; - public static String OAUTH_CLIENT_SECRET="oauth.client.secret"; + public static String OAUTH_TOKEN_ENDPOINT_URI = ConfigConstants.STRIMZI_OAUTH_TOKEN_ENDPOINT_URI; + public static String OAUTH_CLIENT_ID = ConfigConstants.STRIMZI_OAUTH_CLIENT_ID; + public static String OAUTH_CLIENT_SECRET = ConfigConstants.STRIMZI_OAUTH_CLIENT_SECRET; public static String OAUTH_USERNAME_CLAIM="oauth.username.claim"; public static String getClientID(){ - String clientID; - try { - // Just for the unit test - Properties cfg = new Properties(); - cfg.setProperty(OAUTH_CLIENT_ID, "unit-test"); - - if(!IsItJunit.isJUnitTest()){ - clientID = cfg.getProperty(OAUTH_CLIENT_ID); - } - else { - clientID = "unit-test"; - } - return clientID; - } catch (Exception e) { - e.printStackTrace(); - return ""; - } + return "unit-test"; } public static String getClientID(Properties cfg){ - String clientID; try { if(!IsItJunit.isJUnitTest()){ - if (System.getenv("OAUTH_CLIENT_ID") == null) { - clientID = cfg.getProperty(OAUTH_CLIENT_ID); - } - else { - clientID = System.getenv("OAUTH_CLIENT_ID"); - } + return ConfigConstants.getPropertyOrEnv(cfg, ConfigConstants.NCDS_CLIENT_ID, ConfigConstants.ENV_OAUTH_CLIENT_ID); } else { - clientID = "unit-test"; + return "unit-test"; } - return clientID; } catch (Exception e) { e.printStackTrace(); return ""; @@ -57,28 +30,19 @@ public static String getClientID(Properties cfg){ } public static boolean validateSecurityConfig(Properties cfg, Properties kafkaCfg) throws Exception { - - addNasdaqSpecificAuthProperties(cfg); - if (cfg.getProperty(OAUTH_TOKEN_ENDPOINT_URI) == null) { - throw new Exception ("Authentication Setting :" + OAUTH_TOKEN_ENDPOINT_URI + " Missing" ); - } - if (cfg.getProperty(OAUTH_CLIENT_ID) == null && System.getenv("OAUTH_CLIENT_ID") == null ) { - throw new Exception ("Authentication Setting :" + OAUTH_CLIENT_ID + " Missing" ); + if (cfg.getProperty(OAUTH_TOKEN_ENDPOINT_URI) == null && cfg.getProperty(ConfigConstants.OAUTH_TOKEN_ENDPOINT_URI) == null ) { + throw new Exception(String.format("Authentication Setting : %s and %s (preferred) Missing", OAUTH_TOKEN_ENDPOINT_URI, ConfigConstants.OAUTH_TOKEN_ENDPOINT_URI) ); } - if (cfg.getProperty(OAUTH_CLIENT_SECRET) == null && System.getenv("OAUTH_CLIENT_SECRET") == null) { - throw new Exception("Authentication Setting :" + OAUTH_CLIENT_SECRET + " Missing" ); - } - if (cfg.getProperty(OAUTH_USERNAME_CLAIM) == null) { - throw new Exception("Authentication Setting :" + OAUTH_USERNAME_CLAIM + " Missing" ); + final String jaasConfig = cfg.getProperty(ConfigConstants.JAAS_CONFIG); + if (jaasConfig == null) { + if (ConfigConstants.getPropertyOrEnv(cfg, OAUTH_CLIENT_ID, ConfigConstants.ENV_OAUTH_CLIENT_ID) == null) { + throw new Exception ("Authentication Setting :" + OAUTH_CLIENT_ID + " Missing" ); + } + if (ConfigConstants.getPropertyOrEnv(cfg, OAUTH_CLIENT_SECRET, ConfigConstants.ENV_OAUTH_CLIENT_SECRET) == null) { + throw new Exception("Authentication Setting :" + OAUTH_CLIENT_SECRET + " Missing" ); + } } - return true; } - private static Properties addNasdaqSpecificAuthProperties(Properties p){ - if(!IsItJunit.isJUnitTest()) { - p.setProperty("oauth.username.claim","preferred_username"); - } - return p; - } } diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/ConfigConstants.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/ConfigConstants.java new file mode 100644 index 0000000..828013e --- /dev/null +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/ConfigConstants.java @@ -0,0 +1,86 @@ +package com.nasdaq.ncdsclient.internal.utils; + +import java.util.Properties; + +/** + * + */ +public final class ConfigConstants { + + public static final String STRIMZI_OAUTH_TOKEN_ENDPOINT_URI = "oauth.token.endpoint.uri"; + public static final String STRIMZI_OAUTH_CLIENT_ID = "oauth.client.id"; + public static final String STRIMZI_OAUTH_CLIENT_SECRET = "oauth.client.secret"; + public static final String OAUTH_TOKEN_ENDPOINT_URI = "sasl.oauthbearer.token.endpoint.url"; + public static final String JAAS_CONFIG = "sasl.jaas.config"; + public static final String CONFLUENT_CLUSTER_ID_CONFIG = "confluent.cluster.id"; + public static final String CONFLUENT_IDENTITY_POOL_ID = "confluent.identity.pool.id"; + public static final String ENV_OAUTH_CLIENT_ID = "OAUTH_CLIENT_ID"; + public static final String ENV_OAUTH_CLIENT_SECRET = "OAUTH_CLIENT_SECRET"; + public static final String NCDS_CLIENT_ID = "ncds.client.id"; + + private static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS = "sasl.login.callback.handler.class"; + private static final String SASL_MECHANISM = "sasl.mechanism"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + public static final String JAAS_CONFIG_VALUE = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId='%s' clientSecret='%s' extension_logicalCluster='%s' extension_identityPoolId='%s';"; + + private static final Properties defaultProperties = defaultClusterProperties(); + + private ConfigConstants() { + } + + public static final String getPropertyOrEnv(Properties props, String propsKey, String envKey) { + String property = props.getProperty(propsKey); + if (property != null) { + return property; + } + property = System.getProperty(propsKey); + if (property != null) { + return property; + } + return System.getenv(envKey); + } + + public static final void addDefaults(Properties props) { + addDefaults(props, SECURITY_PROTOCOL); + addDefaults(props, SASL_MECHANISM); + addDefaults(props, SASL_LOGIN_CALLBACK_HANDLER_CLASS); + } + + private static final void addDefaults(Properties props, String propsKey) { + if (props.getProperty(propsKey) == null) { + props.setProperty(propsKey, defaultProperties.getProperty(propsKey)); + } + } + + private static final Properties defaultClusterProperties() { + final Properties p = new Properties(); + p.setProperty(SECURITY_PROTOCOL, "SASL_SSL"); + p.setProperty(SASL_MECHANISM, "OAUTHBEARER"); + p.setProperty(SASL_LOGIN_CALLBACK_HANDLER_CLASS, "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"); + return p; + } + + public static final void addJaasConfig(Properties source) { + final String akUri = source.getProperty(OAUTH_TOKEN_ENDPOINT_URI); + if (akUri == null) { + final String strimziUri = source.getProperty(STRIMZI_OAUTH_TOKEN_ENDPOINT_URI); + source.remove(STRIMZI_OAUTH_TOKEN_ENDPOINT_URI); + source.setProperty(OAUTH_TOKEN_ENDPOINT_URI, strimziUri); + } + + final String jaasConfig = source.getProperty(JAAS_CONFIG); + if (jaasConfig == null) { + final String clientId = getPropertyOrEnv(source, STRIMZI_OAUTH_CLIENT_ID, ENV_OAUTH_CLIENT_ID); + final String clientSecret = getPropertyOrEnv(source, STRIMZI_OAUTH_CLIENT_SECRET, ENV_OAUTH_CLIENT_SECRET); + final String clusterId = source.getProperty(CONFLUENT_CLUSTER_ID_CONFIG, "dummy"); + final String identityPoolId = source.getProperty(CONFLUENT_IDENTITY_POOL_ID, "dummy"); + source.remove(STRIMZI_OAUTH_CLIENT_ID); + source.remove(STRIMZI_OAUTH_CLIENT_SECRET); + source.remove(CONFLUENT_CLUSTER_ID_CONFIG); + source.remove(CONFLUENT_IDENTITY_POOL_ID); + source.setProperty(JAAS_CONFIG, String.format(JAAS_CONFIG_VALUE, clientId, clientSecret, clusterId, identityPoolId)); + //Store client id with a non-strimzi key for use in calculating group.id, if needed + source.setProperty(NCDS_CLIENT_ID, clientId); + } + } +} diff --git a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java index 525ea2c..2b3caab 100644 --- a/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java +++ b/ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/KafkaConfigLoader.java @@ -20,25 +20,24 @@ public static Properties loadConfig() throws Exception { try { inputStream = ClassLoader.getSystemResourceAsStream("junit-config.properties"); if (inputStream == null) { - System.out.println("kafka-config.properties: Unable to produce input Stream "); - throw new Exception ("kafka-config.properties: Unable to produce input Stream "); + System.out.println("junit-config.properties: Unable to produce input Stream "); + throw new Exception ("junit-config.properties: Unable to produce input Stream "); } cfg.load(inputStream); } catch (Exception e) { throw e; } - nasdaqSepecificConfig(cfg); + nasdaqSpecificConfig(cfg); return cfg; } - private static Properties nasdaqSepecificConfig(Properties p) throws KafkaPropertiesException{ + private static Properties nasdaqSpecificConfig(Properties p) throws KafkaPropertiesException{ //Properties p = new Properties(); if(!IsItJunit.isJUnitTest()) { - p.setProperty("security.protocol", "SASL_SSL"); - p.setProperty("sasl.mechanism", "OAUTHBEARER"); - p.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;"); - p.setProperty("sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"); + ConfigConstants.addDefaults(p); + ConfigConstants.addJaasConfig(p); + p.remove(AuthenticationConfigLoader.OAUTH_USERNAME_CLAIM); } return p; } @@ -47,7 +46,7 @@ public static Properties validateAndAddSpecificProperties(Properties p) throws E if (p.getProperty(BOOTSTRAP_SERVERS) == null) { throw new Exception ("bootstrap.servers Properties is not set in the Kafka Configuration "); } - nasdaqSepecificConfig(p); + nasdaqSpecificConfig(p); return p; } } diff --git a/ncds-sdk/src/main/resources/log4j.xml b/ncds-sdk/src/main/resources/log4j.xml deleted file mode 100644 index 437425a..0000000 --- a/ncds-sdk/src/main/resources/log4j.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/ncds-sdk/src/main/resources/log4j2.xml b/ncds-sdk/src/main/resources/log4j2.xml new file mode 100644 index 0000000..ad7f47f --- /dev/null +++ b/ncds-sdk/src/main/resources/log4j2.xml @@ -0,0 +1,15 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + + + + + + + + diff --git a/ncds-sdk/src/test/resources/log4j.xml b/ncds-sdk/src/test/resources/log4j.xml index c1ff8f6..26097ec 100644 --- a/ncds-sdk/src/test/resources/log4j.xml +++ b/ncds-sdk/src/test/resources/log4j.xml @@ -3,7 +3,7 @@ - + diff --git a/ncdssdk-client/pom.xml b/ncdssdk-client/pom.xml index c4ac94b..9486234 100644 --- a/ncdssdk-client/pom.xml +++ b/ncdssdk-client/pom.xml @@ -7,7 +7,7 @@ com.nasdaq.ncds ncds - 0.7.0 + 0.8.4 ncdssdk-client @@ -22,6 +22,22 @@ ncds-sdk ${project.version} + + org.apache.logging.log4j + log4j-slf4j-impl + runtime + + + org.apache.logging.log4j + log4j-api + runtime + + + + org.apache.logging.log4j + log4j-core + runtime + @@ -57,7 +73,7 @@ + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> com.nasdaq.ncdsclient.NCDSSession diff --git a/ncdssdk-client/src/main/java/com/nasdaq/ncdsclient/NCDSSession.java b/ncdssdk-client/src/main/java/com/nasdaq/ncdsclient/NCDSSession.java index 41507c2..506f7bf 100644 --- a/ncdssdk-client/src/main/java/com/nasdaq/ncdsclient/NCDSSession.java +++ b/ncdssdk-client/src/main/java/com/nasdaq/ncdsclient/NCDSSession.java @@ -13,8 +13,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.InputStream; -import java.time.Duration; import java.util.Arrays; +import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -306,10 +306,10 @@ else if( testOption.equals("FILTERSTREAM")){ Set msgTypeSet = null; if (symbols != null) { - symbolSet = Arrays.stream(symbols.split(",")).map(String::trim).collect(Collectors.toSet()); + symbolSet = new HashSet<>(Arrays.stream(symbols.split(",")).map(String::trim).collect(Collectors.toSet())); } if (msgTypes != null) { - msgTypeSet = Arrays.stream(msgTypes.split(",")).map(String::trim).collect(Collectors.toSet()); + msgTypeSet = new HashSet<>(Arrays.stream(msgTypes.split(",")).map(String::trim).collect(Collectors.toSet())); } ncdsClient = new NCDSClient(securityCfg,kafkaConfig); Consumer consumer; @@ -361,6 +361,7 @@ else if( testOption.equals("FILTERSTREAM")){ } catch (Exception e) { System.out.println(e.getMessage()); + e.printStackTrace(); System.exit(1); } } diff --git a/ncdssdk-client/src/main/resources/clientAuthentication-config.properties b/ncdssdk-client/src/main/resources/clientAuthentication-config.properties index 6115ccb..a2434fb 100644 --- a/ncdssdk-client/src/main/resources/clientAuthentication-config.properties +++ b/ncdssdk-client/src/main/resources/clientAuthentication-config.properties @@ -1,3 +1,3 @@ oauth.token.endpoint.uri=https://{auth_endpoint_url}/auth/realms/pro-realm/protocol/openid-connect/token oauth.client.id=fake-client-id -oauth.client.secret=fake-client-secret +oauth.client.secret=fake-client-secret \ No newline at end of file diff --git a/ncdssdk-client/src/main/resources/kafka-config.properties b/ncdssdk-client/src/main/resources/kafka-config.properties index 94c74c3..60c6e28 100644 --- a/ncdssdk-client/src/main/resources/kafka-config.properties +++ b/ncdssdk-client/src/main/resources/kafka-config.properties @@ -12,4 +12,4 @@ session.timeout.ms=10000 heartbeat.interval.ms=3000 auto.offset.reset=latest max.poll.interval.ms=30000 -max.poll.records=1000000 +max.poll.records=1000000 \ No newline at end of file diff --git a/ncdssdk-client/src/main/resources/log4j.xml b/ncdssdk-client/src/main/resources/log4j.xml deleted file mode 100644 index 0ba864f..0000000 --- a/ncdssdk-client/src/main/resources/log4j.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/ncdssdk-client/src/main/resources/log4j2.xml b/ncdssdk-client/src/main/resources/log4j2.xml new file mode 100644 index 0000000..a1b9ed9 --- /dev/null +++ b/ncdssdk-client/src/main/resources/log4j2.xml @@ -0,0 +1,15 @@ + + + + + + %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + + + + + + + + diff --git a/pom.xml b/pom.xml index bc6da82..0d9acfa 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.nasdaq.ncds ncds - 0.7.0 + 0.8.4 pom Nasdaq Cloud Data Service @@ -15,46 +15,326 @@ - 1.8.2 - 2.8.0 - 0.8.1 + 2.15.2 + 1.11.1 + 2.4.9 + 3.4.0 + 2.20.0 + 4.1.94.Final + 1.7.36 + 0.12.0 true + 5.7.2 + 1.3.2 + 2.12.0 + 2.22.2 + + + gitlab + https://git.nasdaq.com/api/v4/projects/${env.CI_PROJECT_ID}/packages/maven + + + gitlab + https://git.nasdaq.com/api/v4/projects/${env.CI_PROJECT_ID}/packages/maven + + - - - - org.apache.avro - avro - ${avro.version} - - - - - org.apache.kafka - kafka-clients - ${kafka.version} - - - - - io.strimzi - kafka-oauth-client - ${strimzi.oauth.version} - - - - org.slf4j - slf4j-log4j12 - 1.7.30 - - - + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + + org.apache.avro + avro + ${avro.version} + + + com.google.guava + guava + + + org.codehaus.jackson + jackson-mapper-asl + + + org.slf4j + slf4j-api + + + org.apache.commons + commons-compress + + + com.fasterxml.jackson.core + jackson-core + + + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + + + io.strimzi + kafka-oauth-client + ${strimzi.oauth.version} + + + io.strimzi + kafka-oauth-common + ${strimzi.oauth.version} + + + + org.slf4j + slf4j-log4j12 + 1.7.36 + + + + org.apache.kafka + kafka_2.12 + ${kafka.version} + + + + org.slf4j + slf4j-log4j12 + + + javax.mail + mail + + + test + + + org.json + json + 20230227 + + + + com.salesforce.kafka.test + kafka-junit-core + 3.2.3 + test + + + + + org.junit.jupiter + junit-jupiter-api + ${junit5.version} + test + + + + org.junit.jupiter + junit-jupiter-params + ${junit5.version} + test + + + + + org.mockito + mockito-core + 2.28.2 + test + + + + org.apache.curator + curator-test + ${curatorTestVersion} + test + + + + + org.slf4j + slf4j-simple + ${slf4jVersion} + + + + org.slf4j + slf4j-api + ${slf4jVersion} + + + org.apache.logging.log4j + log4j-to-slf4j + + + + + + com.github.stephenc.high-scale-lib + high-scale-lib + 1.1.4 + test + + + + org.apache.commons + commons-lang3 + 3.3.2 + test + + + + io.netty + netty-common + ${netty.version} + + + + io.netty + netty-codec + ${netty.version} + + + + io.netty + netty-buffer + ${netty.version} + + + + io.netty + netty-transport + ${netty.version} + + + + io.netty + netty-resolver-dns + ${netty.version} + + + + io.netty + netty-handler + ${netty.version} + + + + net.minidev + json-smart + ${json.smart.version} + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.2 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/copy-dependencies + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + ncds-sdk ncdssdk-client + + + requireReleaseDependencies + + + + org.apache.maven.plugins + maven-enforcer-plugin + 3.0.0-M2 + + + enforce-no-snapshots + + enforce + + + + + No Snapshots Allowed! + + + true + + + + + + + + +