diff --git a/twitter-eventlistener-plugin/pom.xml b/twitter-eventlistener-plugin/pom.xml index ccc821de73e92..f850dd22d4939 100644 --- a/twitter-eventlistener-plugin/pom.xml +++ b/twitter-eventlistener-plugin/pom.xml @@ -119,7 +119,7 @@ com.google.cloud google-cloud-bigquery - 1.55.0 + 1.84.0 com.fasterxml.jackson.core @@ -145,6 +145,46 @@ org.slf4j jcl-over-slf4j + + javax.annotation + javax.annotation-api + + + com.google.errorprone + error_prone_annotations + + + com.google.j2objc + j2objc-annotations + + + + + + com.google.auth + google-auth-library-oauth2-http + 0.16.2 + + + com.google.guava + guava + + + com.fasterxml.jackson.core + jackson-core + + + com.google.errorprone + error_prone_annotations + + + com.google.j2objc + j2objc-annotations + + + commons-logging + commons-logging + diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerConfig.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerConfig.java index 2d869289ab2d7..1e321c323cece 100644 --- a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerConfig.java +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerConfig.java @@ -29,6 +29,7 @@ public class TwitterEventListenerConfig private String knowledgeBaseFile; private String scribeCategory; private String bqTableFullName; + private String serviceAccountKeyFile; public String getSlackConfigFile() { @@ -137,4 +138,16 @@ public TwitterEventListenerConfig setBqTableFullName(String bqTableFullName) this.bqTableFullName = bqTableFullName; return this; } + + public String getServiceAccountKeyFile() + { + return serviceAccountKeyFile; + } + + @Config("event-listener.bq-json-key-file") + public TwitterEventListenerConfig setServiceAccountKeyFile(String serviceAccountKeyFile) + { + this.serviceAccountKeyFile = serviceAccountKeyFile; + return this; + } } diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/bq/BqStreamer.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/bq/BqStreamer.java index e8c4272f8cb65..5bb3007117993 100644 --- a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/bq/BqStreamer.java +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/bq/BqStreamer.java @@ -18,6 +18,8 @@ import com.facebook.presto.spi.eventlistener.QueryIOMetadata; import com.facebook.presto.spi.eventlistener.QueryMetadata; import com.facebook.presto.spi.eventlistener.QueryStatistics; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.InsertAllRequest; @@ -30,6 +32,9 @@ import javax.inject.Inject; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,7 +62,7 @@ public BqStreamer(TwitterEventListenerConfig config) String datasetName = parts.get(1); String tableName = parts.get(2); - this.bigquery = BigQueryOptions.getDefaultInstance().getService(); + this.bigquery = createInstance(config.getServiceAccountKeyFile()); this.tableId = TableId.of(projectId, datasetName, tableName); } @@ -78,6 +83,24 @@ public void handleQueryCompleted(QueryCompletedEvent queryCompletedEvent) } } + private static BigQuery createInstance(String jsonKeyPath) + { + if (jsonKeyPath == null) { + // fall back to default Instance by looking up the VM metadata + return BigQueryOptions.getDefaultInstance().getService(); + } + + GoogleCredentials credentials; + try (FileInputStream serviceAccountStream = new FileInputStream(new File(jsonKeyPath))) { + credentials = ServiceAccountCredentials.fromStream(serviceAccountStream); + } + catch (IOException e) { + throw new IllegalArgumentException(); + } + + return BigQueryOptions.newBuilder().setCredentials(credentials).build().getService(); + } + private static Map prepareRowContent(QueryCompletedEvent queryCompletedEvent) { Map rowContent = new HashMap<>(); diff --git a/twitter-eventlistener-plugin/src/test/java/com/twitter/presto/plugin/eventlistener/TestTwitterEventListenerConfig.java b/twitter-eventlistener-plugin/src/test/java/com/twitter/presto/plugin/eventlistener/TestTwitterEventListenerConfig.java index 9f33354fa08c1..e8bb77c2e1ccc 100644 --- a/twitter-eventlistener-plugin/src/test/java/com/twitter/presto/plugin/eventlistener/TestTwitterEventListenerConfig.java +++ b/twitter-eventlistener-plugin/src/test/java/com/twitter/presto/plugin/eventlistener/TestTwitterEventListenerConfig.java @@ -34,7 +34,9 @@ public void testDefaults() .setSlackNotificationTemplateFile(null) .setKnowledgeBaseFile(null) .setSlackUri(null) - .setSlackUsers(null)); + .setSlackUsers(null) + .setBqTableFullName(null) + .setServiceAccountKeyFile(null)); } @Test @@ -49,6 +51,8 @@ public void testExplicitPropertyMappings() .put("event-listener.slack-notification-template-file", "/etc/config/notification.json") .put("event-listener.slack-uri", "https://slack.com") .put("event-listener.slack-users", "user1|user2") + .put("event-listener.bq-table-full-name", "test-bq") + .put("event-listener.bq-json-key-file", "key.json") .build(); TwitterEventListenerConfig expected = new TwitterEventListenerConfig() @@ -59,7 +63,9 @@ public void testExplicitPropertyMappings() .setSlackHttpProxy(HostAndPort.fromString("localhost:1008")) .setSlackNotificationTemplateFile("/etc/config/notification.json") .setSlackUri(URI.create("https://slack.com")) - .setSlackUsers("user1|user2"); + .setSlackUsers("user1|user2") + .setBqTableFullName("test-bq") + .setServiceAccountKeyFile("key.json"); ConfigAssertions.assertFullMapping(properties, expected); }