Skip to content
This repository has been archived by the owner on Jun 30, 2022. It is now read-only.

Commit

Permalink
Small fixes in BigQuery snippets and wordcount example.
Browse files Browse the repository at this point in the history
  • Loading branch information
silviulica committed Feb 25, 2016
1 parent 8e90369 commit 3a56ce7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 33 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ you can write to.

```python
import google.cloud.dataflow as df
# The output table needs to point to something in your project.
output_table = 'YOUR_PROJECT:DATASET.TABLE'
input_table = 'clouddataflow-readonly:samples.weather_stations'
p = df.Pipeline('DirectPipelineRunner')
project = 'YOUR-PROJECT'
output_table = '%s:DATASET.TABLENAME' % project
p = df.Pipeline(argv=['--project', project])
(p
| df.Read('read', df.io.BigQuerySource(input_table))
| df.FlatMap(
Expand All @@ -357,12 +357,12 @@ of using the whole table.

```python
import google.cloud.dataflow as df
# The output table needs to point to something in your project.
output_table = 'YOUR_PROJECT:DATASET.TABLE'
project = 'YOUR-PROJECT'
output_table = '%s:DATASET.TABLENAME' % project
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
'FROM [clouddataflow-readonly:samples.weather_stations] ' \
'WHERE tornado=true GROUP BY month'
p = df.Pipeline('DirectPipelineRunner')
p = df.Pipeline(argv=['--project', project])
(p
| df.Read('read', df.io.BigQuerySource(query=input_query))
| df.Write('write', df.io.BigQuerySink(
Expand Down
33 changes: 6 additions & 27 deletions google/cloud/dataflow/examples/wordcount_debugging.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,6 @@ def process(self, context):
context.aggregate_to(self.umatched_words, 1)


class AssertEqualsIgnoringOrderFn(df.DoFn):
  """A DoFn that asserts that its input is the same as the expected value.

  This DoFn is useful only for testing purposes with small data sets. It will
  materialize all of its input and assumes that its input is a singleton.
  """

  def __init__(self, expected_elements):
    """Initializes the DoFn.

    Args:
      expected_elements: Iterable of the elements the (singleton) input is
        expected to contain, in any order.
    """
    super(AssertEqualsIgnoringOrderFn, self).__init__()
    self.expected_elements = expected_elements

  def process(self, context):
    # Compare sorted copies so element order is ignored. Raises AssertionError
    # (rather than producing output) when the input does not match; successful
    # pipeline completion therefore implies the expectation was met.
    # Fix: a space was missing between the two message fragments, which
    # previously rendered as "...expected value.%s != %s".
    assert sorted(context.element) == sorted(self.expected_elements), (
        'AssertEqualsIgnoringOrderFn input does not match expected value. '
        '%s != %s' % (context.element, self.expected_elements))


class CountWords(df.PTransform):
"""A transform to count the occurrences of each word.
Expand Down Expand Up @@ -136,19 +119,15 @@ def run(argv=sys.argv[1:]):
p | df.io.Read('read', df.io.TextFileSource(known_args.input))
| CountWords() | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))

# AssertEqualsIgnoringOrderFn is a convenient DoFn to validate its input.
# Asserts are best used in unit tests with small data sets, but the technique
# is demonstrated here as a teaching tool.
# assert_that is a convenient PTransform that checks a PCollection has an
# expected value. Asserts are best used in unit tests with small data sets,
# but the technique is demonstrated here as a teaching tool.
#
# Note that AssertEqualsIgnoringOrderFn does not provide any output and that
# successful completion of the Pipeline implies that the expectations were
# met. Learn more at
# Note that assert_that does not provide any output and that successful
# completion of the Pipeline implies that the expectations were met. Learn more at
# https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
# test your pipeline.
# pylint: disable=expression-not-assigned
(filtered_words
| df.transforms.combiners.ToList('ToList')
| df.ParDo(AssertEqualsIgnoringOrderFn([('Flourish', 3), ('stomach', 1)])))
df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))

# Format the counts into a PCollection of strings and write the output using a
# "Write" transform that has side effects.
Expand Down

0 comments on commit 3a56ce7

Please sign in to comment.