From 160c4dcc8a80cdd652d3fe772f8c531f61f1badc Mon Sep 17 00:00:00 2001
From: Alec Huang <alec.huang@snowflake.com>
Date: Mon, 25 Nov 2024 15:07:52 -0800
Subject: [PATCH] address comments

---
 .../streaming/internal/it/BigFilesITBase.java | 235 ------------------
 .../streaming/internal/it/FDNBigFilesIT.java  | 206 ++++++++++++++-
 2 files changed, 200 insertions(+), 241 deletions(-)
 delete mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
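
Reviewer note on the parameterization change in this patch: the deleted base class fed JUnit an Object[][] whose second element (the Iceberg serialization policy) was always null for the FDN run; the inlined test below returns a flat Object[] instead, which JUnit 4 (4.12+) injects element-by-element into the single @Parameter field, running the whole suite once per compression algorithm. A minimal sketch of that shorthand, with illustrative class and test names that are not taken from this patch:

    import static org.junit.Assert.assertTrue;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameter;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class SingleParameterSketch {
      // A flat Object[] is allowed when there is exactly one parameter:
      // each element produces one run of every @Test method in the class.
      @Parameters(name = "{index}: {0}")
      public static Object[] algorithms() {
        return new Object[] {"GZIP", "ZSTD"};
      }

      // @Parameter defaults to index 0, matching the single value per run.
      @Parameter public String algorithm;

      @Test
      public void runsOncePerAlgorithm() {
        assertTrue(algorithm.equals("GZIP") || algorithm.equals("ZSTD"));
      }
    }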

diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
deleted file mode 100644
index 4a78111fd..000000000
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/BigFilesITBase.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
- */
-
-package net.snowflake.ingest.streaming.internal.it;
-
-import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
-import static net.snowflake.ingest.utils.Constants.ROLE;
-import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
-import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import net.snowflake.ingest.TestUtils;
-import net.snowflake.ingest.streaming.OpenChannelRequest;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
-import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
-import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
-import net.snowflake.ingest.utils.Constants;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-
-/** Ingest large amount of rows. */
-@RunWith(Parameterized.class)
-public abstract class BigFilesITBase {
-  private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
-  private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
-
-  private Properties prop;
-
-  private SnowflakeStreamingIngestClientInternal<?> client;
-  private Connection jdbcConnection;
-  private String testDb;
-  private boolean enableIcebergStreaming;
-
-  @Parameter(0)
-  public String compressionAlgorithm;
-
-  @Parameter(1)
-  public Constants.IcebergSerializationPolicy icebergSerializationPolicy;
-
-  public void beforeAll(boolean enableIcebergStreaming) throws Exception {
-    testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
-    // Create a streaming ingest client
-    jdbcConnection = TestUtils.getConnection(true);
-
-    jdbcConnection
-        .createStatement()
-        .execute(String.format("create or replace database %s;", testDb));
-    jdbcConnection
-        .createStatement()
-        .execute(String.format("create or replace schema %s.%s;", testDb, TEST_SCHEMA));
-    // Set timezone to UTC
-    jdbcConnection.createStatement().execute("alter session set timezone = 'UTC';");
-    jdbcConnection
-        .createStatement()
-        .execute(String.format("use warehouse %s", TestUtils.getWarehouse()));
-
-    prop = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
-    if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
-      prop.setProperty(ROLE, "ACCOUNTADMIN");
-    }
-    prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
-    prop.setProperty(ENABLE_ICEBERG_STREAMING, String.valueOf(enableIcebergStreaming));
-    client =
-        (SnowflakeStreamingIngestClientInternal<?>)
-            SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
-    this.enableIcebergStreaming = enableIcebergStreaming;
-  }
-
-  @After
-  public void afterAll() throws Exception {
-    client.close();
-    jdbcConnection.createStatement().execute(String.format("drop database %s", testDb));
-  }
-
-  @Test
-  public void testManyRowsMultipleChannelsToMultipleTable()
-      throws SQLException, ExecutionException, InterruptedException {
-    String tableNamePrefix = "t_big_table_";
-
-    int numTables = 2;
-    int numChannels = 4; // channels are assigned round-robin to tables.
-    int batchSize = 10000;
-    int numBatches = 10; // number of rows PER CHANNEL is batchSize * numBatches
-    boolean isNullable = false;
-
-    Map<Integer, Integer> tableIdToNumChannels = new HashMap<>();
-    for (int i = 0; i < numChannels; i++) {
-      tableIdToNumChannels.put(
-          i % numTables, tableIdToNumChannels.getOrDefault(i % numTables, 0) + 1);
-    }
-    for (int i = 0; i < numTables; i++) {
-      String tableName = tableNamePrefix + i;
-      createTableForTest(tableName);
-    }
-
-    ingestRandomRowsToTable(
-        tableNamePrefix, numTables, numChannels, batchSize, numBatches, isNullable);
-
-    for (int i = 0; i < numTables; i++) {
-      int numChannelsToTable = tableIdToNumChannels.get(i);
-      verifyTableRowCount(
-          batchSize * numBatches * numChannelsToTable,
-          jdbcConnection,
-          testDb,
-          TEST_SCHEMA,
-          tableNamePrefix + i);
-
-      // select * to ensure scanning works
-      ResultSet result =
-          jdbcConnection
-              .createStatement()
-              .executeQuery(
-                  String.format(
-                      "select * from %s.%s.%s", testDb, TEST_SCHEMA, tableNamePrefix + i));
-      result.next();
-      Assert.assertNotNull(result.getString("STR"));
-    }
-  }
-
-  private void ingestRandomRowsToTable(
-      String tablePrefix,
-      int numTables,
-      int numChannels,
-      int batchSize,
-      int iterations,
-      boolean isNullable)
-      throws ExecutionException, InterruptedException {
-
-    final List<Map<String, Object>> rows = Collections.synchronizedList(new ArrayList<>());
-    for (int i = 0; i < batchSize; i++) {
-      Random r = new Random();
-      rows.add(TestUtils.getRandomRow(r, isNullable));
-    }
-
-    ExecutorService testThreadPool = Executors.newFixedThreadPool(numChannels);
-    CompletableFuture[] futures = new CompletableFuture[numChannels];
-    List<SnowflakeStreamingIngestChannel> channelList =
-        Collections.synchronizedList(new ArrayList<>());
-    for (int i = 0; i < numChannels; i++) {
-      final String channelName = "CHANNEL" + i;
-      int finalI = i;
-      futures[i] =
-          CompletableFuture.runAsync(
-              () -> {
-                int targetTable = finalI % numTables;
-                SnowflakeStreamingIngestChannel channel =
-                    openChannel(tablePrefix + targetTable, channelName);
-                channelList.add(channel);
-                for (int val = 0; val < iterations; val++) {
-                  TestUtils.verifyInsertValidationResponse(
-                      channel.insertRows(rows, Integer.toString(val)));
-                }
-              },
-              testThreadPool);
-    }
-    CompletableFuture joined = CompletableFuture.allOf(futures);
-    joined.get();
-    channelList.forEach(channel -> TestUtils.waitChannelFlushed(channel, iterations));
-    testThreadPool.shutdown();
-  }
-
-  private void createTableForTest(String tableName) {
-    try {
-      if (enableIcebergStreaming) {
-        jdbcConnection
-            .createStatement()
-            .execute(
-                String.format(
-                    "create or replace iceberg table %s (\n"
-                        + "                                    num_2_1 decimal(2, 1),\n"
-                        + "                                    num_4_2 decimal(4, 2),\n"
-                        + "                                    num_9_4 decimal(9, 4),\n"
-                        + "                                    num_18_7 decimal(18, 7),\n"
-                        + "                                    num_38_15 decimal(38, 15),\n"
-                        + "                                    num_float float,\n"
-                        + "                                    str string,\n"
-                        + "                                    bin binary)\n"
-                        + "catalog = 'SNOWFLAKE'\n"
-                        + "external_volume = 'streaming_ingest'\n"
-                        + "base_location = 'SDK_IT/%s/%s'\n"
-                        + "storage_serialization_policy = %s;",
-                    tableName, testDb, tableName, icebergSerializationPolicy.name()));
-      } else {
-        jdbcConnection
-            .createStatement()
-            .execute(
-                String.format(
-                    "create or replace table %s (\n"
-                        + "                                    num_2_1 NUMBER(2, 1),\n"
-                        + "                                    num_4_2 NUMBER(4, 2),\n"
-                        + "                                    num_9_4 NUMBER(9, 4),\n"
-                        + "                                    num_18_7 NUMBER(18, 7),\n"
-                        + "                                    num_38_15 NUMBER(38, 15),\n"
-                        + "                                    num_float FLOAT,\n"
-                        + "                                    str VARCHAR(256),\n"
-                        + "                                    bin BINARY(256));",
-                    tableName));
-      }
-    } catch (SQLException e) {
-      throw new RuntimeException("Cannot create table " + tableName, e);
-    }
-  }
-
-  private SnowflakeStreamingIngestChannel openChannel(String tableName, String channelName) {
-    OpenChannelRequest request =
-        OpenChannelRequest.builder(channelName)
-            .setDBName(testDb)
-            .setSchemaName(TEST_SCHEMA)
-            .setTableName(tableName)
-            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
-            .build();
-
-    // Open a streaming ingest channel from the given client
-    return client.openChannel(request);
-  }
-}
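
Reviewer note on the concurrency idiom that moves verbatim from the base class into FDNBigFilesIT below: one CompletableFuture per channel is submitted to a fixed-size pool and joined with allOf, so a failed insertRows in any worker resurfaces, wrapped in ExecutionException, when joined.get() is called. A standalone sketch of the idiom, assuming a placeholder doWork body that is not part of this patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FanOutJoinSketch {
      public static void main(String[] args) throws Exception {
        int numWorkers = 4;
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        CompletableFuture<?>[] futures = new CompletableFuture<?>[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
          final int workerId = i;
          futures[i] =
              CompletableFuture.runAsync(
                  () -> {
                    // An exception thrown here is captured by the future and
                    // rethrown from get() below as an ExecutionException.
                    doWork(workerId);
                  },
                  pool);
        }
        CompletableFuture.allOf(futures).get(); // block until every worker finishes
        pool.shutdown();
      }

      private static void doWork(int workerId) {
        System.out.println("worker " + workerId + " done"); // placeholder workload
      }
    }
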
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
index 181e7e6f9..df4ffa341 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/FDNBigFilesIT.java
@@ -4,17 +4,211 @@
 
 package net.snowflake.ingest.streaming.internal.it;
 
+import static net.snowflake.ingest.TestUtils.verifyTableRowCount;
+import static net.snowflake.ingest.utils.Constants.ROLE;
+import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import net.snowflake.ingest.TestUtils;
+import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
+import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
+import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
+import net.snowflake.ingest.utils.Constants;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Ingest a large number of rows. */
+@RunWith(Parameterized.class)
+public class FDNBigFilesIT {
+  private static final String TEST_DB_PREFIX = "STREAMING_INGEST_TEST_DB";
+  private static final String TEST_SCHEMA = "STREAMING_INGEST_TEST_SCHEMA";
+
+  private Properties prop;
+
+  private SnowflakeStreamingIngestClientInternal<?> client;
+  private Connection jdbcConnection;
+  private String testDb;
 
-public class FDNBigFilesIT extends BigFilesITBase {
-  @Parameterized.Parameters(name = "compressionAlgorithm={0}")
-  public static Object[][] compressionAlgorithms() {
-    return new Object[][] {{"GZIP", null}, {"ZSTD", null}};
+  @Parameters(name = "{index}: {0}")
+  public static Object[] compressionAlgorithms() {
+    return new Object[] {"GZIP", "ZSTD"};
   }
 
+  @Parameter public String compressionAlgorithm;
+
   @Before
-  public void before() throws Exception {
-    super.beforeAll(false);
+  public void beforeAll() throws Exception {
+    testDb = TEST_DB_PREFIX + "_" + UUID.randomUUID().toString().substring(0, 4);
+    // Create a streaming ingest client
+    jdbcConnection = TestUtils.getConnection(true);
+
+    jdbcConnection
+        .createStatement()
+        .execute(String.format("create or replace database %s;", testDb));
+    jdbcConnection
+        .createStatement()
+        .execute(String.format("create or replace schema %s.%s;", testDb, TEST_SCHEMA));
+    // Set timezone to UTC
+    jdbcConnection.createStatement().execute("alter session set timezone = 'UTC';");
+    jdbcConnection
+        .createStatement()
+        .execute(String.format("use warehouse %s", TestUtils.getWarehouse()));
+
+    prop = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
+    if (prop.getProperty(ROLE).equals("DEFAULT_ROLE")) {
+      prop.setProperty(ROLE, "ACCOUNTADMIN");
+    }
+    prop.setProperty(BDEC_PARQUET_COMPRESSION_ALGORITHM, compressionAlgorithm);
+    client =
+        (SnowflakeStreamingIngestClientInternal<?>)
+            SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(prop).build();
+  }
+
+  @After
+  public void afterAll() throws Exception {
+    client.close();
+    jdbcConnection.createStatement().execute(String.format("drop database %s", testDb));
+  }
+
+  @Test
+  public void testManyRowsMultipleChannelsToMultipleTable()
+      throws SQLException, ExecutionException, InterruptedException {
+    String tableNamePrefix = "t_big_table_";
+
+    int numTables = 2;
+    int numChannels = 4; // channels are assigned round-robin to tables.
+    int batchSize = 10000;
+    int numBatches = 10; // number of rows PER CHANNEL is batchSize * numBatches
+    boolean isNullable = false;
+
+    Map<Integer, Integer> tableIdToNumChannels = new HashMap<>();
+    for (int i = 0; i < numChannels; i++) {
+      tableIdToNumChannels.put(
+          i % numTables, tableIdToNumChannels.getOrDefault(i % numTables, 0) + 1);
+    }
+    for (int i = 0; i < numTables; i++) {
+      String tableName = tableNamePrefix + i;
+      createTableForTest(tableName);
+    }
+
+    ingestRandomRowsToTable(
+        tableNamePrefix, numTables, numChannels, batchSize, numBatches, isNullable);
+
+    for (int i = 0; i < numTables; i++) {
+      int numChannelsToTable = tableIdToNumChannels.get(i);
+      verifyTableRowCount(
+          batchSize * numBatches * numChannelsToTable,
+          jdbcConnection,
+          testDb,
+          TEST_SCHEMA,
+          tableNamePrefix + i);
+
+      // select * to ensure scanning works
+      ResultSet result =
+          jdbcConnection
+              .createStatement()
+              .executeQuery(
+                  String.format(
+                      "select * from %s.%s.%s", testDb, TEST_SCHEMA, tableNamePrefix + i));
+      result.next();
+      Assert.assertNotNull(result.getString("STR"));
+    }
+  }
+
+  private void ingestRandomRowsToTable(
+      String tablePrefix,
+      int numTables,
+      int numChannels,
+      int batchSize,
+      int iterations,
+      boolean isNullable)
+      throws ExecutionException, InterruptedException {
+
+    final List<Map<String, Object>> rows = Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < batchSize; i++) {
+      Random r = new Random();
+      rows.add(TestUtils.getRandomRow(r, isNullable));
+    }
+
+    ExecutorService testThreadPool = Executors.newFixedThreadPool(numChannels);
+    CompletableFuture[] futures = new CompletableFuture[numChannels];
+    List<SnowflakeStreamingIngestChannel> channelList =
+        Collections.synchronizedList(new ArrayList<>());
+    for (int i = 0; i < numChannels; i++) {
+      final String channelName = "CHANNEL" + i;
+      int finalI = i;
+      futures[i] =
+          CompletableFuture.runAsync(
+              () -> {
+                int targetTable = finalI % numTables;
+                SnowflakeStreamingIngestChannel channel =
+                    openChannel(tablePrefix + targetTable, channelName);
+                channelList.add(channel);
+                for (int val = 0; val < iterations; val++) {
+                  TestUtils.verifyInsertValidationResponse(
+                      channel.insertRows(rows, Integer.toString(val)));
+                }
+              },
+              testThreadPool);
+    }
+    CompletableFuture joined = CompletableFuture.allOf(futures);
+    joined.get();
+    channelList.forEach(channel -> TestUtils.waitChannelFlushed(channel, iterations));
+    testThreadPool.shutdown();
+  }
+
+  private void createTableForTest(String tableName) {
+    try {
+      jdbcConnection
+          .createStatement()
+          .execute(
+              String.format(
+                  "create or replace table %s (\n"
+                      + "                                    num_2_1 NUMBER(2, 1),\n"
+                      + "                                    num_4_2 NUMBER(4, 2),\n"
+                      + "                                    num_9_4 NUMBER(9, 4),\n"
+                      + "                                    num_18_7 NUMBER(18, 7),\n"
+                      + "                                    num_38_15 NUMBER(38, 15),\n"
+                      + "                                    num_float FLOAT,\n"
+                      + "                                    str VARCHAR(256),\n"
+                      + "                                    bin BINARY(256));",
+                  tableName));
+    } catch (SQLException e) {
+      throw new RuntimeException("Cannot create table " + tableName, e);
+    }
+  }
+
+  private SnowflakeStreamingIngestChannel openChannel(String tableName, String channelName) {
+    OpenChannelRequest request =
+        OpenChannelRequest.builder(channelName)
+            .setDBName(testDb)
+            .setSchemaName(TEST_SCHEMA)
+            .setTableName(tableName)
+            .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
+            .build();
+
+    // Open a streaming ingest channel from the given client
+    return client.openChannel(request);
   }
 }
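
Sanity check on the row counts asserted via verifyTableRowCount: channels are assigned round-robin, so 4 channels over 2 tables gives 2 channels per table, and each channel writes batchSize * numBatches rows. Spelled out with the test's own values:

    int numTables = 2, numChannels = 4, batchSize = 10_000, numBatches = 10;
    int channelsPerTable = numChannels / numTables;       // 2
    int rowsPerChannel = batchSize * numBatches;          // 100,000 rows per channel
    int rowsPerTable = rowsPerChannel * channelsPerTable; // 200,000 rows expected per table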