diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java
new file mode 100644
index 000000000..4838b038e
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java
@@ -0,0 +1,155 @@
+package net.snowflake.ingest.streaming.internal;
+
+import net.snowflake.ingest.utils.Logging;
+import net.snowflake.ingest.utils.ParameterProvider;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES;
+
+/**
+ * Responsible for accepting data from channels and collating it into collections that will be used
+ * to build the actual blobs.
+ *
+ * <p>A chunk is represented as a list of channel data from a single table. A blob is represented as
+ * a list of chunks that must share the same schema (but not necessarily the same table).
+ *
+ * <p>This class returns a list of blobs.
+ */
+class BlobDataBuilder<T> {
+ private static final Logging logger = new Logging(BlobDataBuilder.class);
+ private final List<List<List<ChannelData<T>>>> allBlobs;
+ private final ParameterProvider parameterProvider;
+ private final String clientName;
+ private List<List<ChannelData<T>>> currentBlob;
+ private ChannelData<T> prevChannelData = null;
+ private float totalCurrentBlobSizeInBytes = 0F;
+ private float totalBufferSizeInBytes = 0F;
+
+ public BlobDataBuilder(String clientName, ParameterProvider parameterProvider) {
+ this.clientName = clientName;
+ this.parameterProvider = parameterProvider;
+ this.currentBlob = new ArrayList<>();
+ this.allBlobs = new ArrayList<>();
+ }
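+
+ // Minimal usage sketch (clientName, parameterProvider and channelsPerTable are illustrative names;
+ // ParquetChunkData is the concrete column data type used by the Parquet path):
+ //   BlobDataBuilder<ParquetChunkData> builder = new BlobDataBuilder<>(clientName, parameterProvider);
+ //   channelsPerTable.forEach(builder::appendDataForTable);
+ //   List<List<List<ChannelData<ParquetChunkData>>>> blobs = builder.getAllBlobData();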
+
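+ /**
+ * Finalizes the blob currently being assembled and returns all collected blob data. Intended to
+ * be called once, after data for every table has been appended.
+ */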
+ public List<List<List<ChannelData<T>>>> getAllBlobData() {
+ addCurrentBlob();
+ return allBlobs;
+ }
+
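+ /**
+ * Collects the flushable data from the given channels (all belonging to one table) and appends
+ * it to the blob data, splitting into new chunks/blobs as limits are reached.
+ */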
+ public void appendDataForTable(Collection<? extends SnowflakeStreamingIngestChannelWithData<T>> tableChannels) {
+ List<ChannelData<T>> chunk = getChunkForTable(tableChannels);
+ appendChunk(chunk);
+ }
+
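+ /** Drains buffered data from every valid channel of a single table in parallel; channels with nothing to flush are skipped. */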
+ private List<ChannelData<T>> getChunkForTable(Collection<? extends SnowflakeStreamingIngestChannelWithData<T>> tableChannels) {
+ List<ChannelData<T>> channelsDataPerTable = Collections.synchronizedList(new ArrayList<>());
+ // Use parallel stream since getData could be the performance bottleneck when we have a
+ // high number of channels
+ tableChannels.parallelStream()
+ .forEach(
+ channel -> {
+ if (channel.isValid()) {
+ ChannelData<T> data = channel.getData();
+ if (data != null) {
+ channelsDataPerTable.add(data);
+ }
+ }
+ });
+ return channelsDataPerTable;
+ }
+
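+ /**
+ * Appends a single table's chunk to the current blob, starting a new blob whenever the max
+ * chunk count, blob/chunk size limit, encryption key id, or schema forces a split.
+ */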
+ private void appendChunk(List<ChannelData<T>> chunkData) {
+ if (chunkData.isEmpty()) {
+ return;
+ }
+
+ if (currentBlob.size() >= parameterProvider.getMaxChunksInBlob()) {
+ // Create a new blob if the current one already contains max allowed number of chunks
+ logger.logInfo(
+ "Max allowed number of chunks in the current blob reached. chunkCount={}"
+ + " maxChunkCount={}",
+ currentBlob.size(),
+ parameterProvider.getMaxChunksInBlob());
+
+ addCurrentBlob();
+ }
+
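+ // Walk the chunk and cut it into sublists; each cut point starts a new blob.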
+ int i, start = 0;
+ for (i = 0; i < chunkData.size(); i++) {
+ ChannelData<T> channelData = chunkData.get(i);
+ if (prevChannelData != null && shouldStopProcessing(
+ totalCurrentBlobSizeInBytes,
+ totalBufferSizeInBytes,
+ channelData,
+ prevChannelData)) {
+ logger.logInfo(
+ "Creation of another blob is needed because of blob/chunk size limit or"
+ + " different encryption ids or different schema, client={}, table={},"
+ + " blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={},"
+ + " encryptionId2={}, schema1={}, schema2={}",
+ clientName,
+ channelData.getChannelContext().getTableName(),
+ totalCurrentBlobSizeInBytes,
+ totalBufferSizeInBytes,
+ channelData.getBufferSize(),
+ channelData.getChannelContext().getEncryptionKeyId(),
+ prevChannelData.getChannelContext().getEncryptionKeyId(),
+ channelData.getColumnEps().keySet(),
+ prevChannelData.getColumnEps().keySet());
+
+ if (i != start) {
+ currentBlob.add(chunkData.subList(start, i));
+ start = i;
+ }
+
+ addCurrentBlob();
+ }
+
+ totalCurrentBlobSizeInBytes += channelData.getBufferSize();
+ totalBufferSizeInBytes += channelData.getBufferSize();
+ prevChannelData = channelData;
+ }
+
+ if (i != start) {
+ currentBlob.add(chunkData.subList(start, i));
+ }
+ }
+
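+ /** Seals the blob being assembled (if it has any chunks) and resets the running size counters. */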
+ private void addCurrentBlob() {
+ if (!currentBlob.isEmpty()) {
+ allBlobs.add(currentBlob);
+ currentBlob = new ArrayList<>();
+ }
+ totalBufferSizeInBytes = 0;
+ totalCurrentBlobSizeInBytes = 0;
+ }
+
+ /**
+ * Check whether we should stop merging more channels into the same chunk, we need to stop in a
+ * few cases:
+ *
+ * <li>When the blob size is larger than a certain threshold
+ *
+ * <li>When the chunk size is larger than a certain threshold
+ *
+ * <li>When the schemas are not the same
+ */
+ private boolean shouldStopProcessing(
+ float totalBufferSizeInBytes,
+ float totalBufferSizePerTableInBytes,
+ ChannelData<T> current,
+ ChannelData<T> prev) {
+ return totalBufferSizeInBytes + current.getBufferSize() > MAX_BLOB_SIZE_IN_BYTES
+ || totalBufferSizePerTableInBytes + current.getBufferSize()
+ > parameterProvider.getMaxChunkSizeInBytes()
+ || !Objects.equals(
+ current.getChannelContext().getEncryptionKeyId(),
+ prev.getChannelContext().getEncryptionKeyId())
+ || !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet());
+ }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index f0f87e889..727f7e2af 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -5,7 +5,6 @@
package net.snowflake.ingest.streaming.internal;
import static net.snowflake.ingest.utils.Constants.DISABLE_BACKGROUND_FLUSH;
-import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.MAX_THREAD_COUNT;
import static net.snowflake.ingest.utils.Constants.THREAD_SHUTDOWN_TIMEOUT_IN_SEC;
import static net.snowflake.ingest.utils.Utils.getStackTrace;
@@ -17,14 +16,7 @@
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -377,180 +369,32 @@ private void createWorkers() {
Runtime.getRuntime().availableProcessors());
}
- /**
- * Distribute the flush tasks by iterating through all the channels in the channel cache and kick
- * off a build blob work when certain size has reached or we have reached the end
- *
- * @param tablesToFlush list of tables to flush
- */
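+ /** Returns the cached channel maps (one per table) for the tables that are due to be flushed. */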
+ private Iterator<ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>> getChannelsToFlush(Set<String> tablesToFlush) {
+ return this.channelCache.entrySet().stream()
+ .filter(e -> tablesToFlush.contains(e.getKey()))
+ .map(Map.Entry::getValue)
+ .iterator();
+ }
+
void distributeFlushTasks(Set<String> tablesToFlush) {
- Iterator<
- Map.Entry<
- String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
- itr =
- this.channelCache.entrySet().stream()
- .filter(e -> tablesToFlush.contains(e.getKey()))
- .iterator();
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
- List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();
// The API states that the number of available processors reported can change and therefore, we
// should poll it occasionally.
numProcessors = Runtime.getRuntime().availableProcessors();
- while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
- List<List<ChannelData<T>>> blobData = new ArrayList<>();
- float totalBufferSizeInBytes = 0F;
-
- // Distribute work at table level, split the blob if reaching the blob size limit or the
- // channel has different encryption key ids
- while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
- List<ChannelData<T>> channelsDataPerTable = Collections.synchronizedList(new ArrayList<>());
- if (!leftoverChannelsDataPerTable.isEmpty()) {
- channelsDataPerTable.addAll(leftoverChannelsDataPerTable);
- leftoverChannelsDataPerTable.clear();
- } else if (blobData.size()
- >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
- // Create a new blob if the current one already contains max allowed number of chunks
- logger.logInfo(
- "Max allowed number of chunks in the current blob reached. chunkCount={}"
- + " maxChunkCount={}",
- blobData.size(),
- this.owningClient.getParameterProvider().getMaxChunksInBlob());
- break;
- } else {
- ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> table =
- itr.next().getValue();
- // Use parallel stream since getData could be the performance bottleneck when we have a
- // high number of channels
- table.values().parallelStream()
- .forEach(
- channel -> {
- if (channel.isValid()) {
- ChannelData<T> data = channel.getData();
- if (data != null) {
- channelsDataPerTable.add(data);
- }
- }
- });
- }
-
- if (!channelsDataPerTable.isEmpty()) {
- int idx = 0;
- float totalBufferSizePerTableInBytes = 0F;
- while (idx < channelsDataPerTable.size()) {
- ChannelData<T> channelData = channelsDataPerTable.get(idx);
- // Stop processing the rest of channels when needed
- if (idx > 0
- && shouldStopProcessing(
- totalBufferSizeInBytes,
- totalBufferSizePerTableInBytes,
- channelData,
- channelsDataPerTable.get(idx - 1))) {
- leftoverChannelsDataPerTable.addAll(
- channelsDataPerTable.subList(idx, channelsDataPerTable.size()));
- logger.logInfo(
- "Creation of another blob is needed because of blob/chunk size limit or"
- + " different encryption ids or different schema, client={}, table={},"
- + " blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={},"
- + " encryptionId2={}, schema1={}, schema2={}",
- this.owningClient.getName(),
- channelData.getChannelContext().getTableName(),
- totalBufferSizeInBytes,
- totalBufferSizePerTableInBytes,
- channelData.getBufferSize(),
- channelData.getChannelContext().getEncryptionKeyId(),
- channelsDataPerTable.get(idx - 1).getChannelContext().getEncryptionKeyId(),
- channelData.getColumnEps().keySet(),
- channelsDataPerTable.get(idx - 1).getColumnEps().keySet());
- break;
- }
- totalBufferSizeInBytes += channelData.getBufferSize();
- totalBufferSizePerTableInBytes += channelData.getBufferSize();
- idx++;
- }
- // Add processed channels to the current blob, stop if we need to create a new blob
- blobData.add(channelsDataPerTable.subList(0, idx));
- if (idx != channelsDataPerTable.size()) {
- break;
- }
- }
- }
-
- if (blobData.isEmpty()) {
- continue;
- }
- // Kick off a build job
+ List<List<List<ChannelData<T>>>> allBlobData = buildBlobData(getChannelsToFlush(tablesToFlush));
+ for (List<List<ChannelData<T>>> blob : allBlobData) {
// Get the fully qualified table name from the first channel in the blob.
// This only matters when the client is in Iceberg mode. In Iceberg mode,
// all channels in the blob belong to the same table.
- String fullyQualifiedTableName =
- blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
-
+ final String fullyQualifiedTableName =
+ blob.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
final BlobPath blobPath = this.storageManager.generateBlobPath(fullyQualifiedTableName);
- long flushStartMs = System.currentTimeMillis();
- if (this.owningClient.flushLatency != null) {
- latencyTimerContextMap.putIfAbsent(
- blobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
- }
-
- // Copy encryptionKeysPerTable from owning client
- Map<FullyQualifiedTableName, EncryptionKey> encryptionKeysPerTable =
- new ConcurrentHashMap<>();
- this.owningClient
- .getEncryptionKeysPerTable()
- .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v)));
-
- Supplier<BlobMetadata> supplier =
- () -> {
- try {
- BlobMetadata blobMetadata =
- buildAndUpload(
- blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable);
- blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
- return blobMetadata;
- } catch (Throwable e) {
- Throwable ex = e.getCause() == null ? e : e.getCause();
- String errorMessage =
- String.format(
- "Building blob failed, client=%s, blob=%s, exception=%s,"
- + " detail=%s, trace=%s, all channels in the blob will be"
- + " invalidated",
- this.owningClient.getName(),
- blobPath.fileRegistrationPath,
- ex,
- ex.getMessage(),
- getStackTrace(ex));
- logger.logError(errorMessage);
- if (this.owningClient.getTelemetryService() != null) {
- this.owningClient
- .getTelemetryService()
- .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
- }
-
- if (e instanceof IOException) {
- invalidateAllChannelsInBlob(blobData, errorMessage);
- return null;
- } else if (e instanceof NoSuchAlgorithmException) {
- throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
- } else if (e instanceof InvalidAlgorithmParameterException
- | e instanceof NoSuchPaddingException
- | e instanceof IllegalBlockSizeException
- | e instanceof BadPaddingException
- | e instanceof InvalidKeyException) {
- throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
- } else {
- throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
- }
- }
- };
-
- blobs.add(
- new Pair<>(
- new BlobData<>(blobPath.fileRegistrationPath, blobData),
- CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));
+ // Kick off a build job
+ blobs.add(buildAndUploadBlob(fullyQualifiedTableName, blobPath, blob));
logger.logInfo(
"buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}",
@@ -563,28 +407,78 @@ && shouldStopProcessing(
this.registerService.addBlobs(blobs);
}
- /**
- * Check whether we should stop merging more channels into the same chunk, we need to stop in a
- * few cases:
- *
- * <li>When the blob size is larger than a certain threshold
- *
- * <li>When the chunk size is larger than a certain threshold
- *
- * <li>When the schemas are not the same
- */
- private boolean shouldStopProcessing(
- float totalBufferSizeInBytes,
- float totalBufferSizePerTableInBytes,
- ChannelData<T> current,
- ChannelData<T> prev) {
- return totalBufferSizeInBytes + current.getBufferSize() > MAX_BLOB_SIZE_IN_BYTES
- || totalBufferSizePerTableInBytes + current.getBufferSize()
- > this.owningClient.getParameterProvider().getMaxChunkSizeInBytes()
- || !Objects.equals(
- current.getChannelContext().getEncryptionKeyId(),
- prev.getChannelContext().getEncryptionKeyId())
- || !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet());
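+ /** Feeds the per-table channel data into a {@link BlobDataBuilder} and returns the resulting blob groupings. */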
+ private List<List<List<ChannelData<T>>>> buildBlobData(Iterator<ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>> tablesToFlush) {
+ BlobDataBuilder<T> blobDataBuilder = new BlobDataBuilder<>(this.owningClient.getName(), this.owningClient.getParameterProvider());
+ while (tablesToFlush.hasNext()) {
+ ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> next = tablesToFlush.next();
+ Collection<SnowflakeStreamingIngestChannelInternal<T>> tableChannels = next.values();
+ blobDataBuilder.appendDataForTable(tableChannels);
+ }
+
+ return blobDataBuilder.getAllBlobData();
+ }
+
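+ /**
+ * Schedules an asynchronous build-and-upload job for a single blob on the buildUploadWorkers
+ * pool and returns the blob data paired with the pending {@link BlobMetadata} future.
+ */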
+ private Pair<BlobData<T>, CompletableFuture<BlobMetadata>> buildAndUploadBlob(String fullyQualifiedTableName, BlobPath blobPath, List<List<ChannelData<T>>> blobData) {
+ long flushStartMs = System.currentTimeMillis();
+ if (this.owningClient.flushLatency != null) {
+ latencyTimerContextMap.putIfAbsent(
+ blobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
+ }
+
+ // Copy encryptionKeysPerTable from owning client
+ Map<FullyQualifiedTableName, EncryptionKey> encryptionKeysPerTable =
+ new ConcurrentHashMap<>();
+ this.owningClient
+ .getEncryptionKeysPerTable()
+ .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v)));
+
+ Supplier<BlobMetadata> supplier =
+ () -> {
+ try {
+ BlobMetadata blobMetadata =
+ buildAndUpload(
+ blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable);
+ blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
+ return blobMetadata;
+ } catch (Throwable e) {
+ Throwable ex = e.getCause() == null ? e : e.getCause();
+ String errorMessage =
+ String.format(
+ "Building blob failed, client=%s, blob=%s, exception=%s,"
+ + " detail=%s, trace=%s, all channels in the blob will be"
+ + " invalidated",
+ this.owningClient.getName(),
+ blobPath.fileRegistrationPath,
+ ex,
+ ex.getMessage(),
+ getStackTrace(ex));
+ logger.logError(errorMessage);
+ if (this.owningClient.getTelemetryService() != null) {
+ this.owningClient
+ .getTelemetryService()
+ .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
+ }
+
+ if (e instanceof IOException) {
+ invalidateAllChannelsInBlob(blobData, errorMessage);
+ return null;
+ } else if (e instanceof NoSuchAlgorithmException) {
+ throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
+ } else if (e instanceof InvalidAlgorithmParameterException
+ | e instanceof NoSuchPaddingException
+ | e instanceof IllegalBlockSizeException
+ | e instanceof BadPaddingException
+ | e instanceof InvalidKeyException) {
+ throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
+ } else {
+ throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+ };
+
+ return new Pair<>(
+ new BlobData<>(blobPath.fileRegistrationPath, blobData),
+ CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers));
}
/**
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
index ee06eedcc..32c4bcf9e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -36,7 +36,7 @@
*
* @param <T> type of column data ({@link ParquetChunkData})
*/
-class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIngestChannel {
+class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIngestChannel, SnowflakeStreamingIngestChannelWithData<T> {
private static final Logging logger = new Logging(SnowflakeStreamingIngestChannelInternal.class);
@@ -181,7 +181,8 @@ public String getFullyQualifiedTableName() {
*
* @return a ChannelData object
*/
- ChannelData<T> getData() {
+ @Override
+ public ChannelData<T> getData() {
ChannelData<T> data = this.rowBuffer.flush();
if (data != null) {
data.setChannelContext(channelFlushContext);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelWithData.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelWithData.java
new file mode 100644
index 000000000..488437968
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelWithData.java
@@ -0,0 +1,7 @@
+package net.snowflake.ingest.streaming.internal;
+
+import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
+
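+/**
+ * Internal view of a streaming ingest channel that can expose its buffered data so the flush
+ * machinery can collate it into blobs.
+ */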
+interface SnowflakeStreamingIngestChannelWithData<T> extends SnowflakeStreamingIngestChannel {
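+ /** Returns the data buffered in this channel for the next flush, or null if there is nothing to flush. */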
+ ChannelData<T> getData();
+}