diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java index 5a34896f0..59f6563fb 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java @@ -41,6 +41,7 @@ public class SinkConfig { private static final String BATCH_MAX_DELAY = "BATCH_MAX_DELAY"; private static final String BATCH_MAX_MESSAGES = "BATCH_MAX_MESSAGES"; private static final String BIG_QUERY_OUTPUT_MODE = "BIG_QUERY_OUTPUT_MODE"; + private static final String BIG_QUERY_DEFAULT_PROJECT = "BIG_QUERY_DEFAULT_PROJECT"; private static final String LOAD_MAX_BYTES = "LOAD_MAX_BYTES"; private static final String LOAD_MAX_DELAY = "LOAD_MAX_DELAY"; private static final String LOAD_MAX_FILES = "LOAD_MAX_FILES"; @@ -60,11 +61,11 @@ public class SinkConfig { private static final Set<String> INCLUDE_ENV_VARS = ImmutableSet.of(INPUT_COMPRESSION, INPUT_PARALLELISM, INPUT_SUBSCRIPTION, BATCH_MAX_BYTES, BATCH_MAX_DELAY, BATCH_MAX_MESSAGES, - BIG_QUERY_OUTPUT_MODE, LOAD_MAX_BYTES, LOAD_MAX_DELAY, LOAD_MAX_FILES, OUTPUT_BUCKET, - OUTPUT_COMPRESSION, OUTPUT_FORMAT, OUTPUT_PARALLELISM, OUTPUT_TABLE, OUTPUT_TOPIC, - MAX_OUTSTANDING_ELEMENT_COUNT, MAX_OUTSTANDING_REQUEST_BYTES, SCHEMAS_LOCATION, - STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY, STREAMING_BATCH_MAX_MESSAGES, - STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES); + BIG_QUERY_OUTPUT_MODE, BIG_QUERY_DEFAULT_PROJECT, LOAD_MAX_BYTES, LOAD_MAX_DELAY, + LOAD_MAX_FILES, OUTPUT_BUCKET, OUTPUT_COMPRESSION, OUTPUT_FORMAT, OUTPUT_PARALLELISM, + OUTPUT_TABLE, OUTPUT_TOPIC, MAX_OUTSTANDING_ELEMENT_COUNT, MAX_OUTSTANDING_REQUEST_BYTES, + SCHEMAS_LOCATION, STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY, + STREAMING_BATCH_MAX_MESSAGES, STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES); // 
BigQuery.Write.Batch.getByteSize reports protobuf size, which can be ~1/3rd more // efficient than the JSON that actually gets sent over HTTP, so we use to 60% of the @@ -386,8 +387,11 @@ private static String getBigQueryOutputBucket(Env env) { return getGcsOutputBucket(env) + OUTPUT_TABLE + "=" + env.getString(OUTPUT_TABLE) + "/"; } - private static com.google.cloud.bigquery.BigQuery getBigQueryService(Env env) { - return BigQueryOptions.getDefaultInstance().getService(); + @VisibleForTesting + static com.google.cloud.bigquery.BigQuery getBigQueryService(Env env) { + BigQueryOptions.Builder builder = BigQueryOptions.getDefaultInstance().toBuilder(); + env.optString(BIG_QUERY_DEFAULT_PROJECT).ifPresent(builder::setProjectId); + return builder.build().getService(); } private static Storage getGcsService(Env env) { diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/Env.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/Env.java index 620bddb6c..b7026dffd 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/Env.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/util/Env.java @@ -38,7 +38,8 @@ public boolean containsKey(String key) { return env.containsKey(key); } - private Optional<String> optString(String key) { + /** Get the value of an optional environment variable. 
*/ + public Optional<String> optString(String key) { if (!include.contains(key)) { throw new IllegalArgumentException("key missing from include: " + key); } diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfigTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfigTest.java new file mode 100644 index 000000000..dc630010c --- /dev/null +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfigTest.java @@ -0,0 +1,25 @@ +package com.mozilla.telemetry.ingestion.sink.config; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.common.collect.ImmutableSet; +import com.mozilla.telemetry.ingestion.sink.util.Env; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; + +public class SinkConfigTest { + + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + @Test + public void canDetectBqProject() { + environmentVariables.set("GOOGLE_CLOUD_PROJECT", "gcp-project"); + environmentVariables.set("BIG_QUERY_DEFAULT_PROJECT", "bq-project"); + Env env = new Env(ImmutableSet.of("BIG_QUERY_DEFAULT_PROJECT")); + assertEquals("gcp-project", BigQueryOptions.getDefaultInstance().getProjectId()); + assertEquals("bq-project", SinkConfig.getBigQueryService(env).getOptions().getProjectId()); + } +}