Commit f61388b

This splits the logic to structure channel data into blob collections out into a separate class and refactors the main nested loop into separate methods within that new class. The collections produced by the new class should be identical to those prior to the change.

sfc-gh-bkroeker committed Jan 6, 2025
1 parent 5d11892 commit f61388b
Showing 8 changed files with 309 additions and 230 deletions.
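
As the commit message describes, the per-table grouping that used to live in a nested flush loop is now handled by the new BlobDataBuilder class added below. Here is a minimal sketch of the intended call pattern, assuming a caller that walks a ChannelCache; the method name buildBlobData, the channelCache parameter, and the clientName/parameterProvider fields are illustrative assumptions, not code from this commit:

// Illustrative sketch only; not code from this commit.
// Assumes clientName and parameterProvider are fields on the enclosing (hypothetical) class.
private List<List<List<ChannelData<T>>>> buildBlobData(ChannelCache<T> channelCache) {
  BlobDataBuilder<T> blobDataBuilder = new BlobDataBuilder<>(clientName, parameterProvider);
  // One appendDataForTable call per table; the builder decides where chunk and blob boundaries fall.
  channelCache.entrySet().forEach(
      entry -> blobDataBuilder.appendDataForTable(entry.getValue().values()));
  // Outer list = blobs, middle list = chunks within a blob, inner list = channel data for one table.
  return blobDataBuilder.getAllBlobData();
}
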
New file: src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java
@@ -0,0 +1,155 @@
package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.ParameterProvider;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES;

/**
 * Responsible for accepting data from channels and collating it into collections that will be used to build the actual blobs
* <p>
* A chunk is represented as a list of channel data from a single table
* A blob is represented as a list of chunks that must share the same schema (but not necessarily the same table)
* <p>
* This class returns a list of blobs
*/
class BlobDataBuilder<T> {
private static final Logging logger = new Logging(BlobDataBuilder.class);
private final List<List<List<ChannelData<T>>>> allBlobs;
private final ParameterProvider parameterProvider;
private final String clientName;
private List<List<ChannelData<T>>> currentBlob;
private ChannelData<T> prevChannelData = null;
private float totalCurrentBlobSizeInBytes = 0F;
private float totalBufferSizeInBytes = 0F;

public BlobDataBuilder(String clientName, ParameterProvider parameterProvider) {
this.clientName = clientName;
this.parameterProvider = parameterProvider;
this.currentBlob = new ArrayList<>();
this.allBlobs = new ArrayList<>();
}

public List<List<List<ChannelData<T>>>> getAllBlobData() {
addCurrentBlob();
return allBlobs;
}

public void appendDataForTable(Collection<? extends SnowflakeStreamingIngestChannelFlushable<T>> tableChannels) {
List<ChannelData<T>> chunk = getChunkForTable(tableChannels);
appendChunk(chunk);
}

private List<ChannelData<T>> getChunkForTable(Collection<? extends SnowflakeStreamingIngestChannelFlushable<T>> tableChannels) {
List<ChannelData<T>> channelsDataPerTable = Collections.synchronizedList(new ArrayList<>());
// Use parallel stream since getData could be the performance bottleneck when we have a
// high number of channels
tableChannels.parallelStream()
.forEach(
channel -> {
if (channel.isValid()) {
ChannelData<T> data = channel.getData();
if (data != null) {
channelsDataPerTable.add(data);
}
}
});
return channelsDataPerTable;
}

private void appendChunk(List<ChannelData<T>> chunkData) {
if (chunkData.isEmpty()) {
return;
}

if (currentBlob.size() >= parameterProvider.getMaxChunksInBlob()) {
// Create a new blob if the current one already contains max allowed number of chunks
logger.logInfo(
"Max allowed number of chunks in the current blob reached. chunkCount={}"
+ " maxChunkCount={}",
currentBlob.size(),
parameterProvider.getMaxChunksInBlob());

addCurrentBlob();
}

int i, start = 0;
for (i = 0; i < chunkData.size(); i++) {
ChannelData<T> channelData = chunkData.get(i);
if (prevChannelData != null && shouldStopProcessing(
totalCurrentBlobSizeInBytes,
totalBufferSizeInBytes,
channelData,
prevChannelData)) {
logger.logInfo(
"Creation of another blob is needed because of blob/chunk size limit or"
+ " different encryption ids or different schema, client={}, table={},"
+ " blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={},"
+ " encryptionId2={}, schema1={}, schema2={}",
clientName,
channelData.getChannelContext().getTableName(),
totalCurrentBlobSizeInBytes,
totalBufferSizeInBytes,
channelData.getBufferSize(),
channelData.getChannelContext().getEncryptionKeyId(),
prevChannelData.getChannelContext().getEncryptionKeyId(),
channelData.getColumnEps().keySet(),
prevChannelData.getColumnEps().keySet());

if (i != start) {
currentBlob.add(chunkData.subList(start, i));
start = i;
}

addCurrentBlob();
}

totalCurrentBlobSizeInBytes += channelData.getBufferSize();
totalBufferSizeInBytes += channelData.getBufferSize();
prevChannelData = channelData;
}

if (i != start) {
currentBlob.add(chunkData.subList(start, i));
}
}

private void addCurrentBlob() {
if (!currentBlob.isEmpty()) {
allBlobs.add(currentBlob);
currentBlob = new ArrayList<>();
}
totalBufferSizeInBytes = 0;
totalCurrentBlobSizeInBytes = 0;
}

/**
 * Check whether we should stop merging more channels into the same chunk; we need to stop in a
 * few cases:
 *
 * <p>When the blob size is larger than a certain threshold
 *
 * <p>When the chunk size is larger than a certain threshold
 *
 * <p>When the encryption key ids are not the same
 *
 * <p>When the schemas are not the same
 */
private boolean shouldStopProcessing(
float totalBufferSizeInBytes,
float totalBufferSizePerTableInBytes,
ChannelData<T> current,
ChannelData<T> prev) {
return totalBufferSizeInBytes + current.getBufferSize() > MAX_BLOB_SIZE_IN_BYTES
|| totalBufferSizePerTableInBytes + current.getBufferSize()
> parameterProvider.getMaxChunkSizeInBytes()
|| !Objects.equals(
current.getChannelContext().getEncryptionKeyId(),
prev.getChannelContext().getEncryptionKeyId())
|| !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet());
}
}
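
To make the javadoc's chunk/blob terminology concrete, here is a hedged schematic (table names and the split point are invented for illustration) of the structure getAllBlobData() returns:

// Hypothetical example: data is appended for tables T1, T2 and T3, where T1 and T2 share a
// schema and encryption key id, and T3's data would push the blob past MAX_BLOB_SIZE_IN_BYTES.
// The returned List<List<List<ChannelData<T>>>> would then be shaped roughly like:
//
//   [                               // all blobs
//     [                             //   blob 1
//       [t1ChannelA, t1ChannelB],   //     chunk: channel data for table T1
//       [t2ChannelA]                //     chunk: channel data for table T2
//     ],
//     [                             //   blob 2 (the size limit forced a new blob)
//       [t3ChannelA]                //     chunk: channel data for table T3
//     ]
//   ]
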
src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
@@ -23,7 +23,7 @@ class ChannelCache<T> {
// Cache to hold all the valid channels, the key for the outer map is FullyQualifiedTableName and
// the key for the inner map is ChannelName
private final ConcurrentHashMap<
-      String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
+      String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelFlushable<T>>>
cache = new ConcurrentHashMap<>();

/** Flush information for each table including last flush time and if flush is needed */
@@ -45,8 +45,8 @@ static class FlushInfo {
*
* @param channel
*/
-  void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
-    ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channels =
+  void addChannel(SnowflakeStreamingIngestChannelFlushable<T> channel) {
+    ConcurrentHashMap<String, SnowflakeStreamingIngestChannelFlushable<T>> channels =
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());

@@ -55,7 +55,7 @@ void addChannel(SnowflakeStreamingIngestChannelInternal<T> channel) {
this.tableFlushInfo.putIfAbsent(
channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false));

-    SnowflakeStreamingIngestChannelInternal<T> oldChannel =
+    SnowflakeStreamingIngestChannelFlushable<T> oldChannel =
channels.put(channel.getName(), channel);
// Invalidate old channel if it exits to block new inserts and return error to users earlier
if (oldChannel != null) {
@@ -136,7 +136,7 @@ void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) {
}

/** Returns an immutable set view of the mappings contained in the channel cache. */
-  Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
+  Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelFlushable<T>>>>
entrySet() {
return Collections.unmodifiableSet(cache.entrySet());
}
@@ -155,11 +155,11 @@ void closeAllChannels() {

/** Remove a channel in the channel cache if the channel sequencer matches */
// TODO: background cleaner to cleanup old stale channels that are not closed?
-  void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal<T> channel) {
+  void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelFlushable<T> channel) {
cache.computeIfPresent(
channel.getFullyQualifiedTableName(),
(k, v) -> {
-          SnowflakeStreamingIngestChannelInternal<T> channelInCache = v.get(channel.getName());
+          SnowflakeStreamingIngestChannelFlushable<T> channelInCache = v.get(channel.getName());
// We need to compare the channel sequencer in case the old channel was already been
// removed
return channelInCache != null
@@ -180,10 +180,10 @@ void invalidateChannelIfSequencersMatch(
Long channelSequencer,
String invalidationCause) {
String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName);
-    ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> channelsMapPerTable =
+    ConcurrentHashMap<String, SnowflakeStreamingIngestChannelFlushable<T>> channelsMapPerTable =
cache.get(fullyQualifiedTableName);
if (channelsMapPerTable != null) {
-      SnowflakeStreamingIngestChannelInternal<T> channel = channelsMapPerTable.get(channelName);
+      SnowflakeStreamingIngestChannelFlushable<T> channel = channelsMapPerTable.get(channelName);
if (channel != null && channel.getChannelSequencer().equals(channelSequencer)) {
channel.invalidate("invalidate with matched sequencer", invalidationCause);
}

@@ -29,7 +29,7 @@ static class ChannelStatusRequestDTO {
// Client Sequencer
private final Long clientSequencer;

-    ChannelStatusRequestDTO(SnowflakeStreamingIngestChannelInternal channel) {
+    ChannelStatusRequestDTO(SnowflakeStreamingIngestChannelFlushable channel) {
this.channelName = channel.getName();
this.databaseName = channel.getDBName();
this.schemaName = channel.getSchemaName();