Commit
SNOW-1654124: Write file name to metadata at the point where we create the file (#824)

- Write the file name into the file metadata at the point where we create the file (this only works for serializeFromJavaObjects); see the sketch after this list.
- serializeFromParquetWriteBuffers does not appear to be maintained to work with primaryFileId, since its ParquetWriter is created during setupSchema. We should either remove that code path entirely if we decide not to use it, or make a larger, separate change.
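A minimal sketch of the idea, with illustrative names (the SDK's real pieces are Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname, and the per-chunk metadata map; the class and helper below are hypothetical):

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the flusher-side change: the primary file id is stamped
// into the Parquet key-value metadata at the point where the file is created, instead
// of threading filePath through every flush()/getSnapshot() call on the row buffers.
public class PrimaryFileIdSketch {

  // Illustrative key; the SDK keeps the real one in Constants.PRIMARY_FILE_ID_KEY.
  static final String PRIMARY_FILE_ID_KEY = "primaryFileId";

  // Mirrors StreamingIngestUtils.getShortname(filePath): keep only the file name part.
  static String shortName(String filePath) {
    int idx = filePath.lastIndexOf('/');
    return idx < 0 ? filePath : filePath.substring(idx + 1);
  }

  // Called once per file, right before the Parquet writer is constructed.
  static Map<String, String> withPrimaryFileId(Map<String, String> chunkMetadata, String filePath) {
    Map<String, String> metadata = new HashMap<>(chunkMetadata);
    metadata.put(PRIMARY_FILE_ID_KEY, shortName(filePath));
    return metadata;
  }
}

Because the key is added where the file name is actually known, only serializeFromJavaObjects can populate it; serializeFromParquetWriteBuffers builds its ParquetWriter during setupSchema, before any file name exists.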
sfc-gh-tzhang authored Sep 10, 2024
1 parent 97bd52a commit 3734061
Showing 12 changed files with 140 additions and 83 deletions.
pom.xml: 44 changes (36 additions & 8 deletions)
@@ -352,6 +352,30 @@
<version>1.14.9</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -478,6 +502,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -756,8 +784,8 @@
<ignoreNonCompile>true</ignoreNonCompile>
<ignoredDependencies>
<!-- We defined these as direct dependencies (as opposed to just declaring it in dependencyManagement)
- to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
- the dependency is unused, so we ignore it here-->
+ to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
+ the dependency is unused, so we ignore it here-->
<ignoredDependency>org.apache.commons:commons-compress</ignoredDependency>
<ignoredDependency>org.apache.commons:commons-configuration2</ignoredDependency>
</ignoredDependencies>
@@ -852,9 +880,9 @@
<configuration>
<errorRemedy>failFast</errorRemedy>
<!--
- The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
- check your dependencies", verify the conditions of the license and add the reference to it here.
- -->
+ The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
+ check your dependencies", verify the conditions of the license and add the reference to it here.
+ -->
<includedLicenses>
<includedLicense>Apache License 2.0</includedLicense>
<includedLicense>BSD 2-Clause License</includedLicense>
@@ -1167,9 +1195,9 @@
</executions>
</plugin>
<!--
- Plugin executes license processing Python script, which copies third party license files into the directory
- target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR.
- -->
+ Plugin executes license processing Python script, which copies third party license files into the directory
+ target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR.
+ -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -496,11 +496,10 @@ public InsertValidationResponse insertRows(
* Flush the data in the row buffer by taking the ownership of the old vectors and pass all the
* required info back to the flush service to build the blob
*
- * @param filePath the name of the file the data will be written in
* @return A ChannelData object that contains the info needed by the flush service to build a blob
*/
@Override
- public ChannelData<T> flush(final String filePath) {
+ public ChannelData<T> flush() {
logger.logDebug("Start get data for channel={}", channelFullyQualifiedName);
if (this.bufferedRowCount > 0) {
Optional<T> oldData = Optional.empty();
@@ -518,7 +517,7 @@ public ChannelData<T> flush(final String filePath) {
try {
if (this.bufferedRowCount > 0) {
// Transfer the ownership of the vectors
- oldData = getSnapshot(filePath);
+ oldData = getSnapshot();
oldRowCount = this.bufferedRowCount;
oldBufferSize = this.bufferSize;
oldRowSequencer = this.channelState.incrementAndGetRowSequencer();
@@ -615,12 +614,8 @@ void reset() {
this.statsMap.replaceAll((key, value) -> value.forkEmpty());
}

- /**
- * Get buffered data snapshot for later flushing.
- *
- * @param filePath the name of the file the data will be written in
- */
- abstract Optional<T> getSnapshot(final String filePath);
+ /** Get buffered data snapshot for later flushing. */
+ abstract Optional<T> getSnapshot();

@VisibleForTesting
abstract Object getVectorValueAt(String column, int index);
@@ -430,7 +430,7 @@ void distributeFlushTasks(Set<String> tablesToFlush) {
.forEach(
channel -> {
if (channel.isValid()) {
- ChannelData<T> data = channel.getData(blobPath);
+ ChannelData<T> data = channel.getData();
if (data != null) {
channelsDataPerTable.add(data);
}
@@ -212,6 +212,10 @@ private SerializationResult serializeFromJavaObjects(
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
+ // We insert the filename in the file itself as metadata so that streams can work on replicated
+ // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
+ // http://go/streams-on-replicated-mixed-tables
+ metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
parquetWriter =
new BdecParquetWriter(
mergedData,
@@ -22,7 +22,6 @@
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
- import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
@@ -262,12 +261,7 @@ boolean hasColumns() {
}

@Override
- Optional<ParquetChunkData> getSnapshot(final String filePath) {
- // We insert the filename in the file itself as metadata so that streams can work on replicated
- // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
- // http://go/streams-on-replicated-mixed-tables
- metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
-
+ Optional<ParquetChunkData> getSnapshot() {
List<List<Object>> oldData = new ArrayList<>();
if (!clientBufferParameters.getEnableParquetInternalBuffering()) {
data.forEach(r -> oldData.add(new ArrayList<>(r)));
@@ -37,10 +37,9 @@ InsertValidationResponse insertRows(
* Flush the data in the row buffer by taking the ownership of the old vectors and pass all the
* required info back to the flush service to build the blob
*
- * @param filePath the name of the file the data will be written in
* @return A ChannelData object that contains the info needed by the flush service to build a blob
*/
- ChannelData<T> flush(final String filePath);
+ ChannelData<T> flush();

/**
* Close the row buffer and release resources. Note that the caller needs to handle
@@ -220,11 +220,10 @@ public String getFullyQualifiedTableName() {
/**
* Get all the data needed to build the blob during flush
*
- * @param filePath the name of the file the data will be written in
* @return a ChannelData object
*/
- ChannelData<T> getData(final String filePath) {
- ChannelData<T> data = this.rowBuffer.flush(filePath);
+ ChannelData<T> getData() {
+ ChannelData<T> data = this.rowBuffer.flush();
if (data != null) {
data.setChannelContext(channelFlushContext);
}
@@ -34,14 +34,15 @@
*/
public class BdecParquetReader implements AutoCloseable {
private final InternalParquetRecordReader<List<Object>> reader;
+ private final ParquetFileReader fileReader;

/**
* @param data buffer where the data that has to be read resides.
* @throws IOException
*/
public BdecParquetReader(byte[] data) throws IOException {
ParquetReadOptions options = ParquetReadOptions.builder().build();
- ParquetFileReader fileReader = ParquetFileReader.open(new BdecInputFile(data), options);
+ fileReader = ParquetFileReader.open(new BdecInputFile(data), options);
reader = new InternalParquetRecordReader<>(new BdecReadSupport(), options.getRecordFilter());
reader.initialize(fileReader, options);
}
@@ -60,6 +61,11 @@ public List<Object> read() throws IOException {
}
}

+ /** Get the key value metadata in the file */
+ public Map<String, String> getKeyValueMetadata() {
+ return fileReader.getFileMetaData().getKeyValueMetaData();
+ }
+
/**
* Close the reader.
*
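A usage sketch for the new getKeyValueMetadata() accessor, e.g. in a test that checks the stamped file name; it assumes BdecParquetReader lives alongside BdecParquetWriter in org.apache.parquet.hadoop and that bdecBytes holds a file produced by the flusher:

import java.util.Map;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.hadoop.BdecParquetReader;

public class PrimaryFileIdCheck {
  // Reads the Parquet footer key-value metadata back and verifies the stamped file name.
  static void verifyPrimaryFileId(byte[] bdecBytes, String expectedShortName) throws Exception {
    try (BdecParquetReader reader = new BdecParquetReader(bdecBytes)) {
      Map<String, String> metadata = reader.getKeyValueMetadata();
      String actual = metadata.get(Constants.PRIMARY_FILE_ID_KEY);
      if (!expectedShortName.equals(actual)) {
        throw new AssertionError("Expected primary file id " + expectedShortName + " but found " + actual);
      }
    }
  }
}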
@@ -131,7 +131,7 @@ void setParameterOverride(Map<String, Object> parameterOverride) {

ChannelData<T> flushChannel(String name) {
SnowflakeStreamingIngestChannelInternal<T> channel = channels.get(name);
- ChannelData<T> channelData = channel.getRowBuffer().flush(name + "_snowpipe_streaming.bdec");
+ ChannelData<T> channelData = channel.getRowBuffer().flush();
channelData.setChannelContext(channel.getChannelContext());
this.channelData.add(channelData);
return channelData;
