Skip to content

Commit

Permalink
change
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Oct 31, 2024
1 parent ce536f1 commit 088ebeb
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 64 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Licensed under the MIT License.
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=com.azure.cosmos.kafka.connect
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.guava25.base=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkContainersConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTaskConfig;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
Expand All @@ -25,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -42,20 +48,20 @@ public final class CosmosSinkConnector extends SinkConnector {

private CosmosSinkConfig sinkConfig;
private String connectorName;
private CosmosAsyncClient cosmosClient;

@Override
public void start(Map<String, String> props) {
    LOGGER.info("Starting the kafka cosmos sink connector");
    this.sinkConfig = new CosmosSinkConfig(props);
    this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
    CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig();

    // Keep the client for the connector's lifetime (closed in stop()) so that
    // taskConfigs() can reuse it when building the client metadata cache snapshots.
    this.cosmosClient =
        CosmosClientStore.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName);
    validateDatabaseAndContainers(
        new ArrayList<>(containersConfig.getTopicToContainerMap().values()),
        this.cosmosClient,
        containersConfig.getDatabaseName());
}

@Override
Expand All @@ -68,8 +74,8 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
LOGGER.info("Setting task configurations with maxTasks {}", maxTasks);
List<Map<String, String>> configs = new ArrayList<>();

String clientMetadataCacheString = getClientMetadataCacheString();
String throughputControlClientMetadataCacheString = getThroughputControlClientMetadataCacheString();
String clientMetadataCachesString = getClientMetadataCachesSnapshotString();
String throughputControlClientMetadataCachesString = getThroughputControlClientMetadataCachesSnapshotString();

for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfigs = this.sinkConfig.originalsStrings();
Expand All @@ -80,64 +86,88 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
RandomUtils.nextInt(1, 9999999)));
taskConfigs.put(
CosmosSinkTaskConfig.COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT,
clientMetadataCacheString);
clientMetadataCachesString);
taskConfigs.put(
CosmosSinkTaskConfig.THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT,
throughputControlClientMetadataCacheString);
throughputControlClientMetadataCachesString);
configs.add(taskConfigs);
}

return configs;
}

private String getClientMetadataCacheString() {
CosmosAsyncClient cosmosAsyncClient =
CosmosClientStore.getCosmosClient(this.sinkConfig.getAccountConfig(), this.connectorName);

private String getClientMetadataCachesSnapshotString() {
CosmosSinkContainersConfig containersConfig = this.sinkConfig.getContainersConfig();
List<String> containers = new ArrayList<>(containersConfig.getTopicToContainerMap().values());
CosmosAsyncDatabase database = cosmosAsyncClient.getDatabase(containersConfig.getDatabaseName());
List<String> containerNames = new ArrayList<>(containersConfig.getTopicToContainerMap().values());
CosmosAsyncDatabase database = this.cosmosClient.getDatabase(containersConfig.getDatabaseName());

for (String container : containers) {
database.getContainer(container)
.read()
.onErrorResume(throwable -> {
LOGGER.warn("Failed to read container {}", container, throwable);
return Mono.empty();
})
.block();
// read a random item from each container to populate the collection cache
for (String containerName : containerNames) {
CosmosAsyncContainer container = database.getContainer(containerName);
readRandomItemFromContainer(container);
}

// read a random item from throughput control container if it is enabled and use the same account config as the cosmos client
CosmosThroughputControlConfig cosmosThroughputControlConfig = this.sinkConfig.getThroughputControlConfig();
if (cosmosThroughputControlConfig.isThroughputControlEnabled()) {
if (cosmosThroughputControlConfig.getThroughputControlAccountConfig() == null) {
CosmosAsyncContainer throughputControlContainer =
this.cosmosClient
.getDatabase(cosmosThroughputControlConfig.getGlobalThroughputControlDatabaseName())
.getContainer(cosmosThroughputControlConfig.getGlobalThroughputControlContainerName());
readRandomItemFromContainer(throughputControlContainer);
}
}

return KafkaCosmosUtils.convertClientMetadataCacheToString(cosmosAsyncClient);
return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClient);
}

private String getThroughputControlClientMetadataCacheString() {
private String getThroughputControlClientMetadataCachesSnapshotString() {
CosmosAsyncClient throughputControlClient = null;
if (this.sinkConfig.getThroughputControlConfig().isThroughputControlEnabled()
&& this.sinkConfig.getThroughputControlConfig().getThroughputControlAccountConfig() != null) {
throughputControlClient = CosmosClientStore.getCosmosClient(
this.sinkConfig.getThroughputControlConfig().getThroughputControlAccountConfig(),
this.connectorName
);

throughputControlClient
.getDatabase(
this.sinkConfig.getThroughputControlConfig().getGlobalThroughputControlDatabaseName())
.getContainer(this.sinkConfig.getThroughputControlConfig().getGlobalThroughputControlContainerName())
.read()
CosmosThroughputControlConfig throughputControlConfig = this.sinkConfig.getThroughputControlConfig();

try {
if (throughputControlConfig.isThroughputControlEnabled()
&& throughputControlConfig.getThroughputControlAccountConfig() != null) {
throughputControlClient = CosmosClientStore.getCosmosClient(
throughputControlConfig.getThroughputControlAccountConfig(),
this.connectorName
);
}

if (throughputControlClient != null) {
readRandomItemFromContainer(
throughputControlClient
.getDatabase(throughputControlConfig.getGlobalThroughputControlDatabaseName())
.getContainer(throughputControlConfig.getGlobalThroughputControlContainerName()));
}
return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClient);

} finally {
if (throughputControlClient != null) {
throughputControlClient.close();
}
}
}

/**
 * Issues a point read with a random id and partition key against the container.
 * The read is expected to miss (404); its purpose is only to force the SDK to resolve
 * and cache the container metadata. Not-found errors are therefore ignored, any other
 * failure is logged and swallowed (best effort).
 *
 * @param container the container to warm; no-op when {@code null}.
 */
private void readRandomItemFromContainer(CosmosAsyncContainer container) {
    if (container != null) {
        container.readItem(UUID.randomUUID().toString(), new PartitionKey(UUID.randomUUID().toString()), JsonNode.class)
            .onErrorResume(throwable -> {
                if (!KafkaCosmosExceptionsHelper.isNotFoundException(throwable)) {
                    LOGGER.warn("Failed to read item from container {}", container.getId(), throwable);
                }
                return Mono.empty();
            })
            .block();
    }
}

@Override
public void stop() {
    // Release the long-lived client created in start().
    if (this.cosmosClient != null) {
        this.cosmosClient.close();
    }
    // Bug fix: the "{}" placeholder previously had no argument to fill it.
    LOGGER.info("Kafka Cosmos sink connector {} is stopped.", this.connectorName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosMasterKeyAuthConfig;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosUtils;
Expand All @@ -30,8 +32,10 @@
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
Expand All @@ -50,6 +54,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -238,8 +243,8 @@ private List<Map<String, String>> getFeedRangeTaskConfigs(List<FeedRangeTaskUnit
}

List<Map<String, String>> feedRangeTaskConfigs = new ArrayList<>();
String clientMetadataCacheString = getClientMetadataCacheString();
String throughputControlClientMetadataCacheString = getThroughputControlClientMetadataCacheString();
String clientMetadataCachesString = getClientMetadataCachesSnapshotString();
String throughputControlClientMetadataCachesString = getThroughputControlClientMetadataCachesSnapshotString();

partitionedTaskUnits.forEach(feedRangeTaskUnits -> {
Map<String, String> taskConfigs = this.config.originalsStrings();
Expand All @@ -252,10 +257,10 @@ private List<Map<String, String>> getFeedRangeTaskConfigs(List<FeedRangeTaskUnit
RandomUtils.nextInt(1, 9999999)));
taskConfigs.put(
CosmosSourceTaskConfig.COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT,
clientMetadataCacheString);
clientMetadataCachesString);
taskConfigs.put(
CosmosSourceTaskConfig.THROUGHPUT_CONTROL_COSMOS_CLIENT_METADATA_CACHES_SNAPSHOT,
throughputControlClientMetadataCacheString);
throughputControlClientMetadataCachesString);
feedRangeTaskConfigs.add(taskConfigs);
});

Expand Down Expand Up @@ -497,31 +502,82 @@ private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties
return effectiveContainersTopicMap;
}

private String getClientMetadataCacheString() {
return KafkaCosmosUtils.convertClientMetadataCacheToString(this.cosmosClient);
private String getClientMetadataCachesSnapshotString() {

CosmosSourceContainersConfig containersConfig = this.config.getContainersConfig();
List<String> containerNames =
this.monitorThread
.getAllContainers()
.block()
.stream().map(CosmosContainerProperties::getId)
.collect(Collectors.toList());;
CosmosAsyncDatabase database = this.cosmosClient.getDatabase(containersConfig.getDatabaseName());

// read a random item from each container to populate the collection cache
for (String containerName : containerNames) {
CosmosAsyncContainer container = database.getContainer(containerName);
this.readRandomItemFromContainer(container);
}

// read a random item from throughput control container if it is enabled and use the same account config as the cosmos client
CosmosThroughputControlConfig cosmosThroughputControlConfig = this.config.getThroughputControlConfig();
if (cosmosThroughputControlConfig.isThroughputControlEnabled()) {
if (cosmosThroughputControlConfig.getThroughputControlAccountConfig() == null) {
CosmosAsyncContainer throughputControlContainer =
this.cosmosClient
.getDatabase(cosmosThroughputControlConfig.getGlobalThroughputControlDatabaseName())
.getContainer(cosmosThroughputControlConfig.getGlobalThroughputControlContainerName());
readRandomItemFromContainer(throughputControlContainer);
}
}

// read a random item from metadata container if COSMOS storage type is used
if (this.config.getMetadataConfig().getStorageType() == CosmosMetadataStorageType.COSMOS) {
readRandomItemFromContainer(database.getContainer(this.config.getMetadataConfig().getStorageName()));
}

return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(this.cosmosClient);
}

private String getThroughputControlClientMetadataCacheString() {
private String getThroughputControlClientMetadataCachesSnapshotString() {
CosmosAsyncClient throughputControlClient = null;
if (this.config.getThroughputControlConfig().isThroughputControlEnabled()
&& this.config.getThroughputControlConfig().getThroughputControlAccountConfig() != null) {
throughputControlClient = CosmosClientStore.getCosmosClient(
this.config.getThroughputControlConfig().getThroughputControlAccountConfig(),
this.connectorName
);
try {
CosmosThroughputControlConfig throughputControlConfig = this.config.getThroughputControlConfig();
if (throughputControlConfig.isThroughputControlEnabled()
&& throughputControlConfig.getThroughputControlAccountConfig() != null) {
throughputControlClient = CosmosClientStore.getCosmosClient(
this.config.getThroughputControlConfig().getThroughputControlAccountConfig(),
this.connectorName
);
}

if (throughputControlClient != null) {
this.readRandomItemFromContainer(
throughputControlClient
.getDatabase(throughputControlConfig.getGlobalThroughputControlDatabaseName())
.getContainer(throughputControlConfig.getGlobalThroughputControlContainerName())
);
}

throughputControlClient.getDatabase(this.config.getThroughputControlConfig().getGlobalThroughputControlDatabaseName())
.getContainer(this.config.getThroughputControlConfig().getGlobalThroughputControlContainerName())
.read()
return KafkaCosmosUtils.convertClientMetadataCacheSnapshotToString(throughputControlClient);
} finally {
if (throughputControlClient != null) {
throughputControlClient.close();
}
}
}

/**
 * Issues a point read with a random id and partition key against the container.
 * The read is expected to miss (404); its purpose is only to force the SDK to resolve
 * and cache the container metadata. Not-found errors are therefore ignored, any other
 * failure is logged and swallowed (best effort).
 *
 * @param container the container to warm; no-op when {@code null}.
 */
private void readRandomItemFromContainer(CosmosAsyncContainer container) {
    if (container != null) {
        container.readItem(UUID.randomUUID().toString(), new PartitionKey(UUID.randomUUID().toString()), JsonNode.class)
            .onErrorResume(throwable -> {
                if (!KafkaCosmosExceptionsHelper.isNotFoundException(throwable)) {
                    LOGGER.warn("Failed to read item from container {}", container.getId(), throwable);
                }
                return Mono.empty();
            })
            .block();
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static CosmosClientMetadataCachesSnapshot getCosmosClientMetadataFromStri
return null;
}

public static String convertClientMetadataCacheToString(CosmosAsyncClient client) {
public static String convertClientMetadataCacheSnapshotToString(CosmosAsyncClient client) {
if (client == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -21,7 +19,6 @@
import java.util.stream.Collectors;

public class CosmosSourceTaskConfig extends CosmosSourceConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosSourceTaskConfig.class);
private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper();

public static final String SOURCE_METADATA_TASK_UNIT = "azure.cosmos.source.task.metadataTaskUnit";
Expand Down
Loading

0 comments on commit 088ebeb

Please sign in to comment.