Skip to content

Commit

Permalink
#40 add new pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
tanakaryo authored and tanakaryo committed Jan 20, 2024
1 parent f417ee4 commit 068cbb2
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"java.configuration.updateBuildConfiguration": "interactive",
"java.compile.nullAnalysis.mode": "automatic"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>dataflowtest</groupId>
<artifactId>dataflowtest</artifactId>
<name>dataflowtest</name>
<version>1.0-SNAPSHOT</version>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.examples.pubsub.streaming.PubSubToGCS</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<releases>
<enabled>false</enabled>
</releases>
<snapshots />
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<properties>
<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
<maven.compiler.source>1.8</maven.compiler.source>
<beam.version>2.52.0</beam.version>
<maven-shade-plugin.version>3.5.1</maven-shade-plugin.version>
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
<maven.compiler.target>1.8</maven.compiler.target>
<slf4j.version>2.0.9</slf4j.version>
<maven-exec-plugin.version>3.1.1</maven-exec-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
20 changes: 20 additions & 0 deletions services/cloud-dataflow/java/test2/dataflowtest/deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

gsutil mb -c standard -l asia-northeast1 gs://jp20240120dftestbkt

mvn compile exec:java \
-Dexec.mainClass=dataflowtest.PubSubToGcs \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--jobName=dftest \
--project=$PROJECT_ID \
--region=asia-northeast1 \
--workerZone=asia-northeast1-a \
--workerMachineType=e2-medium \
--inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output=gs://$BUCKET_NAME/samples/output \
--gcpTempLocation=gs://$BUCKET_NAME/temp \
--runner=DataflowRunner \
--windowSize=2 \
--serviceAccount=$SERVICE_ACCOUNT \
--bucketName=gs://jp20240120dftestbkt"
162 changes: 156 additions & 6 deletions services/cloud-dataflow/java/test2/dataflowtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,163 @@
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>dataflowtest</name>
<url>http://maven.apache.org</url>
<dependencies>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<beam.version>2.52.0</beam.version>

<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>3.1.1</maven-exec-plugin.version>
<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
<maven-shade-plugin.version>3.5.1</maven-shade-plugin.version>
<slf4j.version>2.0.9</slf4j.version>
</properties>

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.examples.pubsub.streaming.PubSubToGCS</mainClass>
</manifest>
</archive>
</configuration>
</plugin>

<!--
Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>

<!--
By default, the starter project has a dependency on the Beam DirectRunner
to enable development and testing of pipelines. To run on another of the
Beam runners, add its module to this pom.xml according to the
runner-specific setup instructions on the Beam website:
http://beam.apache.org/documentation/#runners
-->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-java</artifactId>
<version>${beam.version}</version>
</dependency>

<!-- slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package dataflowtest;

import java.io.IOException;

import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class PubSubToGcs {

public interface PubSubToGcsOptions extends StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();

void setInputTopic(String value);

@Description("The Cloud Storage bucket to write to")
@Required
String getBucketName();

void setBucketName(String value);

@Description("Output file's window size in number of minutes.")
@Default.Integer(1)
Integer getWindowSize();

void setWindowSize(Integer value);

@Description("Path of the output file including its filename prefix.")
@Required
String getOutput();

void setOutput(String value);
}

public static void main(String[] args) throws IOException {

PubSubToGcsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PubSubToGcsOptions.class);

options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);
//PCollection pc1 = pipeline.apply(Create.of("Hello"));

TextIO.Write write = TextIO.write().withWindowedWrites().to(options.getBucketName()).withCompression(Compression.GZIP).withNumShards(5);

pipeline
// 1) Read string messages from a Pub/Sub topic.
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
// 2) Group the messages into fixed-sized minute intervals.
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
// 3) Write one file to GCS for every window of messages.
.apply("Write Files to GCS", write);

// Execute the pipeline and wait until it finishes running.
pipeline.run();
}

}
Loading

0 comments on commit 068cbb2

Please sign in to comment.