Skip to content

Commit

Permalink
#40 add new source.
Browse files Browse the repository at this point in the history
  • Loading branch information
tanakaryo authored and tanakaryo committed Jan 22, 2024
1 parent 1a556d2 commit 48678a1
Show file tree
Hide file tree
Showing 12 changed files with 446 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
package dataflowtest;

import java.io.StringReader;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.xml.sax.InputSource;

/**
 * Demo entry point: parses a small inline XML document with the JDK DOM
 * parser, configured to reject DOCTYPE declarations and external entities
 * (XXE hardening — the parser should never resolve outside resources).
 */
public class App
{
    /**
     * Prints a greeting and parses a hard-coded XML sample into a DOM tree.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the parser cannot be configured or the XML is malformed
     */
    public static void main( String[] args ) throws Exception
    {
        System.out.println( "Hello World!" );
        String word ="<?xml version=\"1.0\" encoding=\"UTF-8\" ?><ProgramingList><Language id=\"001\" name=\"Java\">Javaは標準的なプログラミング言語です</Language><Language id=\"002\" name=\"Python\">Pythonは標準的なプログラミング言語です</Language></ProgramingList>";
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        // Harden against XXE: forbid DOCTYPEs entirely and disable entity expansion.
        factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        factory.setXIncludeAware(false);
        factory.setExpandEntityReferences(false);
        DocumentBuilder builder = factory.newDocumentBuilder();
        InputSource is = new InputSource(new StringReader(word));
        Document doc = builder.parse(is);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;

import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
Expand All @@ -12,10 +11,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class PubSubToGcs {
Expand Down Expand Up @@ -54,8 +51,8 @@ public static void main(String[] args) throws IOException {
options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);
//PCollection pc1 = pipeline.apply(Create.of("Hello"));

PubsubIO.Read<String> read = PubsubIO.readStrings().fromTopic(options.getInputTopic());
Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())));
TextIO.Write write = TextIO.write().withWindowedWrites().to(options.getBucketName()).withCompression(Compression.GZIP).withNumShards(5);

pipeline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package dataflowtest.pipeline;

import java.io.StringReader;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Streaming pipeline that reads messages from a Pub/Sub topic and writes them
 * to two GCS buckets in parallel: messages that parse as JSON go to the main
 * bucket, messages that parse as XML go to the sub bucket. A message that
 * parses as neither is dropped (each branch filters independently).
 */
public class ParallelFilteringProcessPipeline {
    /**
     * Pipeline options: input topic, the two output buckets, and per-branch
     * window size / shard count.
     */
    public interface ParallelFilteringProcessPipelineOptions extends StreamingOptions {
        @Description("The Cloud Pub/Sub topic to read from.")
        @Required
        String getInputTopic();

        void setInputTopic(String value);

        @Description("The Cloud Storage bucket of Main to write to")
        @Required
        String getBucketMain();

        void setBucketMain(String value);

        @Description("The Cloud Storage bucket of Sub to write to")
        @Required
        String getBucketSub();

        void setBucketSub(String value);

        @Description("Output file's window size in number of minutes.")
        @Default.Integer(1)
        Integer getWindowSizeMain();

        void setWindowSizeMain(Integer value);

        @Description("Output file's window size in num or min.")
        @Default.Integer(1)
        Integer getWindowSizeSub();

        void setWindowSizeSub(Integer value);

        @Description("Output files's num of sharding.")
        @Default.Integer(1)
        Integer getShardNumMain();

        void setShardNumMain(Integer value);

        @Description("Output files's num of sharding.")
        @Default.Integer(1)
        Integer getShardNumSub();

        void setShardNumSub(Integer value);

        @Description("Path of the output file including its filename prefix.")
        @Required
        String getOutput();

        void setOutput(String value);
    }

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args standard Beam pipeline arguments (see
     *             {@link ParallelFilteringProcessPipelineOptions})
     * @throws Exception if pipeline construction fails
     */
    public static void main(String[] args) throws Exception {

        ParallelFilteringProcessPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(ParallelFilteringProcessPipelineOptions.class);

        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);
        PubsubIO.Read<String> read = PubsubIO.readStrings().fromTopic(options.getInputTopic());
        PCollection<String> readItem = pipeline.apply("Read from Topic.", read);

        // Main Process (JSON branch): keep only elements that parse as JSON.
        readItem.apply("Filtering JSON",
                ParDo.of(new DoFn<String, String>() {
                    // Reused across elements; built in @Setup because ObjectMapper
                    // construction is comparatively expensive. transient: DoFn
                    // instances are serialized, the mapper is rebuilt per worker.
                    private transient ObjectMapper mapper;

                    @Setup
                    public void setup() {
                        mapper = new ObjectMapper();
                    }

                    @ProcessElement
                    public void filteringJson(ProcessContext c) {
                        String word = c.element();
                        try {
                            mapper.readTree(word);
                            c.output(word);
                        } catch (Exception ignored) {
                            // Intentional drop: non-JSON elements do not belong
                            // in the main (JSON) output branch.
                        }
                    }
                }))
                .apply("Window Main",
                        Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSizeMain()))))
                .apply("Write to main", TextIO.write().withWindowedWrites().withNumShards(options.getShardNumMain())
                        .to(options.getBucketMain()));

        // Sub Process (XML branch): keep only elements that parse as XML.
        readItem.apply("Filtering XML",
                ParDo.of(new DoFn<String, String>() {
                    // Factory configured once per worker; DocumentBuilderFactory
                    // is not cheap to create per element and must be hardened.
                    private transient DocumentBuilderFactory factory;

                    @Setup
                    public void setup() throws Exception {
                        factory = DocumentBuilderFactory.newInstance();
                        // Harden against XXE: Pub/Sub payloads are untrusted.
                        // NOTE: XML containing a DOCTYPE is now rejected (dropped).
                        factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
                        factory.setXIncludeAware(false);
                        factory.setExpandEntityReferences(false);
                    }

                    @ProcessElement
                    public void filteringXML(ProcessContext c) {
                        String word = c.element();
                        try {
                            DocumentBuilder builder = factory.newDocumentBuilder();
                            InputSource is = new InputSource(new StringReader(word));
                            builder.parse(is);
                            c.output(word);
                        } catch (Exception ignored) {
                            // Intentional drop: non-XML elements do not belong
                            // in the sub (XML) output branch.
                        }
                    }
                }))
                .apply("Window Sub",
                        Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSizeSub()))))
                .apply("Write to sub", TextIO.write().withWindowedWrites().withNumShards(options.getShardNumSub())
                        .to(options.getBucketSub()));

        // Execute the pipeline and wait until it finishes running.
        pipeline.run();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package dataflowtest.pipeline;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Streaming pipeline that reads from a Pub/Sub topic and routes each message
 * to one of two tagged outputs with a single ParDo: messages that parse as
 * JSON are tagged {@code processJsonTag}; everything else is tagged
 * {@code processXmlTag}. Each tagged collection is windowed and written to
 * its own GCS bucket. Unlike the two-branch variant, every element reaches
 * exactly one output.
 */
public class ParallelFilteringTagProcessPipeline {

    /**
     * Process tags. The anonymous subclasses are required so the TupleTags
     * capture their String type parameter at runtime.
     */
    static final TupleTag<String> processJsonTag = new TupleTag<String>() {
    };
    static final TupleTag<String> processXmlTag = new TupleTag<String>() {
    };

    /**
     * Pipeline options: input topic, the two output buckets, and per-branch
     * window size / shard count.
     */
    public interface ParallelFilteringTagProcessPipelineOptions extends StreamingOptions {
        @Description("The Cloud Pub/Sub topic to read from.")
        @Required
        String getInputTopic();

        void setInputTopic(String value);

        @Description("The Cloud Storage bucket of Main to write to")
        @Required
        String getBucketMain();

        void setBucketMain(String value);

        @Description("The Cloud Storage bucket of Sub to write to")
        @Required
        String getBucketSub();

        void setBucketSub(String value);

        @Description("Output file's window size in number of minutes.")
        @Default.Integer(1)
        Integer getWindowSizeMain();

        void setWindowSizeMain(Integer value);

        @Description("Output file's window size in num or min.")
        @Default.Integer(1)
        Integer getWindowSizeSub();

        void setWindowSizeSub(Integer value);

        @Description("Output files's num of sharding.")
        @Default.Integer(1)
        Integer getShardNumMain();

        void setShardNumMain(Integer value);

        @Description("Output files's num of sharding.")
        @Default.Integer(1)
        Integer getShardNumSub();

        void setShardNumSub(Integer value);

        @Description("Path of the output file including its filename prefix.")
        @Required
        String getOutput();

        void setOutput(String value);
    }

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args standard Beam pipeline arguments (see
     *             {@link ParallelFilteringTagProcessPipelineOptions})
     * @throws Exception if pipeline construction fails
     */
    public static void main(String[] args) throws Exception {

        ParallelFilteringTagProcessPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(ParallelFilteringTagProcessPipelineOptions.class);

        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);
        PubsubIO.Read<String> read = PubsubIO.readStrings().fromTopic(options.getInputTopic());
        PCollection<String> readItem = pipeline.apply("Read from Topic.", read);

        PCollectionTuple tagCollection = readItem.apply(ParDo.of(new DoFn<String, String>() {
            // Reused across elements; built in @Setup because ObjectMapper
            // construction is comparatively expensive. transient: DoFn
            // instances are serialized, the mapper is rebuilt per worker.
            private transient ObjectMapper mapper;

            @Setup
            public void setup() {
                mapper = new ObjectMapper();
            }

            @ProcessElement
            public void processElement(ProcessContext c) {
                String element = c.element();
                // Routing via exception: a successful readTree means JSON;
                // any parse failure routes the element to the XML tag.
                try {
                    mapper.readTree(element);
                    c.output(processJsonTag, element);
                } catch (Exception e) {
                    c.output(processXmlTag, element);
                }
            }
        }).withOutputTags(processJsonTag, TupleTagList.of(processXmlTag)));

        // Main Process (JSON branch): window and write the JSON-tagged elements.
        tagCollection.get(processJsonTag)
                .apply("Window for JSON Output",
                        Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSizeMain()))))
                .apply("Write to JSON Bucket", TextIO.write().withWindowedWrites().withNumShards(options.getShardNumMain())
                        .to(options.getBucketMain()));

        // Sub Process (XML branch): window and write everything else.
        tagCollection.get(processXmlTag)
                .apply("Window for XML Output",
                        Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSizeSub()))))
                .apply("Write to XML Bucket", TextIO.write().withWindowedWrites().withNumShards(options.getShardNumSub())
                        .to(options.getBucketSub()));

        // Execute the pipeline and wait until it finishes running.
        pipeline.run();
    }
}
Loading

0 comments on commit 48678a1

Please sign in to comment.