Merge branch 'master' into tzhang-si-release
sfc-gh-tzhang authored Jun 16, 2022
2 parents 24ca57e + 8be21bd commit 2e0dce1
Showing 2 changed files with 61 additions and 47 deletions.
54 changes: 31 additions & 23 deletions README.md
@@ -3,12 +3,13 @@ Snowflake Ingest Service Java SDK

[![image](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![image](https://github.com/snowflakedb/snowflake-ingest-java/workflows/Snowpipe%20Java%20SDK%20Tests/badge.svg)](https://github.com/snowflakedb/snowflake-ingest-java/actions)
[![image](https://codecov.io/gh/snowflakedb/snowflake-ingest-java/branch/master/graph/badge.svg)](https://codecov.io/gh/snowflakedb/snowflake-ingest-java)
[![image](https://maven-badges.herokuapp.com/maven-central/net.snowflake/snowflake-ingest-sdk/badge.svg?style=plastic)](https://repo.maven.apache.org/maven2/net/snowflake/snowflake-ingest-sdk/)

The Snowflake Ingest Service SDK allows users to ingest files into their
Snowflake data warehouse in a programmatic fashion via key-pair
authentication. Currently, we support ingestion through the following APIs:
1. [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-gs.html#client-requirement-java-or-python-sdk)
2. [Snowpipe Streaming](https://docs.snowflake.com/en/LIMITEDACCESS/snowpipe-streaming.html) - Under Private Preview

# Prerequisites

@@ -23,12 +24,12 @@ time.
Snowflake Authentication for the Ingest Service requires creating a 2048
bit RSA key pair and registering the public key with Snowflake. For
detailed instructions, please visit the relevant [Snowflake
Documentation Page](https://docs.snowflake.com/en/user-guide/key-pair-auth.html).
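The documentation walks through generating the key pair with `openssl`; for illustration, the same 2048-bit RSA pair can also be produced with the JDK alone. This is a sketch (the class name `KeyPairGen` is made up here); the Base64-encoded public key is what you register with Snowflake via `ALTER USER ... SET RSA_PUBLIC_KEY='...'`:

```java
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class KeyPairGen {
  // Generate the 2048-bit RSA key pair required by Snowflake key-pair auth
  public static KeyPair generate() {
    try {
      KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
      gen.initialize(2048);
      return gen.generateKeyPair();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e); // RSA is guaranteed on all JDKs
    }
  }

  public static void main(String[] args) {
    KeyPair kp = generate();
    // The X.509/SPKI-encoded public key, Base64'd: paste this into ALTER USER
    String publicKeyB64 = Base64.getEncoder().encodeToString(kp.getPublic().getEncoded());
    System.out.println(publicKeyB64);
  }
}
```

The private half (PKCS#8-encoded) is what goes into the SDK's connection properties.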

## Maven (Developers only)

This SDK is developed as a Maven project. As a
result, you'll need to install Maven to build the project and run
tests.

# Adding as a Dependency
@@ -52,6 +53,30 @@ dependencies {
}
```
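The badge above points at the `net.snowflake:snowflake-ingest-sdk` coordinates on Maven Central; for Maven users, the equivalent of the Gradle snippet would be a `pom.xml` dependency along these lines (the version is a placeholder — use the latest release from Maven Central):

```xml
<dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>snowflake-ingest-sdk</artifactId>
    <!-- placeholder: substitute the latest version published on Maven Central -->
    <version>LATEST_VERSION</version>
</dependency>
```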

# Example

## Snowpipe

Check out `SnowflakeIngestBasicExample.java`

## Snowpipe Streaming

Check out `SnowflakeStreamingIngestExample.java`, which performs the following operations:
1. Reads a JSON file which contains details regarding Snowflake Account, User, Role and Private Key. Take a look at `profile_streaming.json.example` for more details.
1. [Here](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication) are the steps required to generate a private key.
2. Creates a `SnowflakeStreamingIngestClient` which can be used to open one or more Streaming Channels pointing to the same or different tables.
3. Creates a `SnowflakeStreamingIngestChannel` against a Database, Schema and Table.
1. Please note: the database, schema and table are expected to be present before opening the Channel. Example SQL queries to create them:
```sql
create or replace database MY_DATABASE;
create or replace schema MY_SCHEMA;
create or replace table MY_TABLE(c1 number);
```
4. Inserts 1000 rows into the channel created in step 3 using the `insertRows` API on the Channel object.
1. The `insertRows` API also takes an optional `offsetToken` String which can be associated with this batch of rows.
5. Calls `getLatestCommittedOffsetToken` on the channel until the appropriate offset is found in Snowflake.
6. Closes the channel when the ingestion is done to make sure everything is committed.
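The polling in step 5 follows a generic retry pattern. The sketch below factors it out, with a `Supplier` stub standing in for `channel.getLatestCommittedOffsetToken()` so it can run offline; the class and method names here are illustrative, not SDK API:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class OffsetPoller {
  // Poll until the latest committed offset token equals the expected one,
  // giving up after maxRetries attempts. Returns true on success.
  public static boolean waitForOffset(
      Supplier<String> latestCommittedOffsetToken,
      String expected,
      int maxRetries,
      long sleepMillis) {
    for (int attempt = 0; attempt < maxRetries; attempt++) {
      // getLatestCommittedOffsetToken may return null before anything commits
      if (expected.equals(latestCommittedOffsetToken.get())) {
        return true;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Stub channel that "commits" the expected offset on the third poll
    AtomicInteger polls = new AtomicInteger();
    Supplier<String> stub = () -> polls.incrementAndGet() >= 3 ? "999" : null;
    System.out.println(waitForOffset(stub, "999", 10, 1L)); // prints true
  }
}
```

In the real example the supplier is the channel itself, and `expected` is the last offset token you passed to `insertRows`.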

# Building From Source

If you would like to build this project from source you can run the
@@ -100,21 +125,4 @@ you would need to remove the following scope limits in pom.xml
We use [Google Java format](https://github.com/google/google-java-format) to format the code. To format all files, run:
```bash
./format.sh
```

54 changes: 30 additions & 24 deletions SnowflakeStreamingIngestExample.java
@@ -24,7 +24,10 @@
* <p>Please read the README.md file for detailed steps
*/
public class SnowflakeStreamingIngestExample {
// Please follow the example in profile_streaming.json.example to see the required properties,
// or, if you have already set up profile.json with Snowpipe before, all you need to do is add
// the "role" property.
private static String PROFILE_PATH = "profile.json";
private static final ObjectMapper mapper = new ObjectMapper();

public static void main(String[] args) throws Exception {
@@ -38,56 +41,59 @@ public static void main(String[] args) throws Exception {

// Create a streaming ingest client
try (SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {

// Create an open channel request on table MY_TABLE, note that the corresponding
// db/schema/table needs to be present
// Example: create or replace table MY_TABLE(c1 number);
OpenChannelRequest request1 =
OpenChannelRequest.builder("MY_CHANNEL")
.setDBName("MY_DATABASE")
.setSchemaName("MY_SCHEMA")
.setTableName("MY_TABLE")
.setOnErrorOption(
    OpenChannelRequest.OnErrorOption.CONTINUE) // Another ON_ERROR option is ABORT
.build();

// Open a streaming ingest channel from the given client
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

// Insert rows into the channel (using the insertRows API)
final int totalRowsInTable = 1000;
for (int val = 0; val < totalRowsInTable; val++) {
Map<String, Object> row = new HashMap<>();

// c1 corresponds to the column name in table
row.put("c1", val);

// Insert the row with the current offset_token
InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
if (response.hasErrors()) {
// Simply throw if there is an exception, or you can do whatever you want with the
// erroneous row
throw response.getInsertErrors().get(0).getException();
}
}

// If needed, you can check the offset_token registered in Snowflake to make sure everything
// is committed
final int expectedOffsetTokenInSnowflake = totalRowsInTable - 1; // 0 based offset_token
final int maxRetries = 10;
int retryCount = 0;

do {
  String offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken();
  if (offsetTokenFromSnowflake != null
      && offsetTokenFromSnowflake.equals(String.valueOf(expectedOffsetTokenInSnowflake))) {
    System.out.println("SUCCESSFULLY inserted " + totalRowsInTable + " rows");
    break;
  }
  retryCount++;
} while (retryCount < maxRetries);

// Close the channel, the function internally will make sure everything is committed (or throw
// an exception if there is any issue)
channel1.close().get();
}
}
}
