
Several refactorings in preparation for making the repo public.
silviulica committed Feb 25, 2016
1 parent 7ebad55 commit 8e90369
Showing 53 changed files with 1,782 additions and 1,384 deletions.
3 changes: 0 additions & 3 deletions README.md
@@ -7,9 +7,6 @@ and streaming parallel data processing pipelines.
 The Dataflow SDK for Python provides access to Dataflow capabilities
 from the Python programming language.

-## Table of Contents
-[TOC]
-
 ## Status of this Release

 This is the Google Cloud Dataflow SDK for Python version 0.2.0.
1 change: 1 addition & 0 deletions google/cloud/dataflow/coders/typecoders.py
@@ -79,6 +79,7 @@ def register_standard_coders(self, fallback_coder):
     """Register coders for all basic and composite types."""
     self._register_coder_internal(int, coders.VarIntCoder)
     self._register_coder_internal(long, coders.VarIntCoder)
+    self._register_coder_internal(float, coders.FloatCoder)
     self._register_coder_internal(str, coders.BytesCoder)
     self._register_coder_internal(bytes, coders.BytesCoder)
     self._register_coder_internal(unicode, coders.StrUtf8Coder)
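The single addition above registers a dedicated FloatCoder for Python's built-in float, so floats no longer drop through to the generic fallback coder. A minimal sketch of the dispatch idea behind such a registry, assuming a plain dict keyed by exact type (the real CoderRegistry has more machinery than this):

class SimpleCoderRegistry(object):
  """Illustrative only: maps exact Python types to coder classes."""

  def __init__(self, fallback_coder):
    self._coders = {}
    self._fallback_coder = fallback_coder

  def register(self, typ, coder_class):
    self._coders[typ] = coder_class

  def coder_for(self, typ):
    # Unregistered types fall back to a generic (e.g. pickling) coder.
    return self._coders.get(typ, self._fallback_coder)

Before this commit, looking up a coder for float would have returned the fallback; after it, floats get a dedicated floating-point encoding.
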
145 changes: 77 additions & 68 deletions google/cloud/dataflow/dataflow_test.py
@@ -20,7 +20,6 @@
 import re
 import unittest

-from google.cloud.dataflow.error import PValueError
 from google.cloud.dataflow.pipeline import Pipeline
 from google.cloud.dataflow.pvalue import AsDict
 from google.cloud.dataflow.pvalue import AsIter as AllOf
@@ -35,6 +34,8 @@
 from google.cloud.dataflow.transforms import Map
 from google.cloud.dataflow.transforms import ParDo
 from google.cloud.dataflow.transforms import WindowInto
+from google.cloud.dataflow.transforms.util import assert_that
+from google.cloud.dataflow.transforms.util import equal_to
 from google.cloud.dataflow.transforms.window import IntervalWindow
 from google.cloud.dataflow.transforms.window import WindowFn
@@ -43,7 +44,7 @@ class DataflowTest(unittest.TestCase):
   """Dataflow integration tests."""

   SAMPLE_DATA = 'aa bb cc aa bb aa \n' * 10
-  SAMPLE_RESULT = {'aa': 30, 'bb': 20, 'cc': 10}
+  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]

   # TODO(silviuc): Figure out a nice way to specify labels for stages so that
   # internal steps get prepended with surrounding stage names.
@@ -61,25 +62,26 @@ def test_word_count(self):
     result = (
         (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
         .apply('CountWords', DataflowTest.Count))
+    assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
     pipeline.run()
-    self.assertEqual(DataflowTest.SAMPLE_RESULT, dict(result.get()))

   def test_map(self):
     pipeline = Pipeline('DirectPipelineRunner')
     lines = pipeline | Create('input', ['a', 'b', 'c'])
     result = (lines
               | Map('upper', str.upper)
               | Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
+    assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
     pipeline.run()
-    self.assertEqual(['foo-A', 'foo-B', 'foo-C'], list(result.get()))

   def test_word_count_using_get(self):
     pipeline = Pipeline('DirectPipelineRunner')
     lines = pipeline | Create('SomeWords', [DataflowTest.SAMPLE_DATA])
     result = (
         (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
         .apply('CountWords', DataflowTest.Count))
-    self.assertEqual(DataflowTest.SAMPLE_RESULT, dict(result.get()))
+    assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+    pipeline.run()

   def test_par_do_with_side_input_as_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
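The recurring rewrite in this file replaces post-hoc materialization (result.get() fed into assertEqual) with in-pipeline assertions: assert_that attaches a verification transform to the PCollection, and the check fires when pipeline.run() executes. A hedged sketch of the new idiom, using only names this test module imports:

pipeline = Pipeline('DirectPipelineRunner')
result = (pipeline
          | Create('input', [1, 2, 3])
          | Map('double', lambda x: x * 2))
# equal_to(expected) builds a matcher callable; assert_that wires it
# into the pipeline graph rather than inspecting results afterwards.
assert_that(result, equal_to([2, 4, 6]))
pipeline.run()  # the assertion fails here, not at construction time

One consequence visible throughout the diff: expected values must be written as the pipeline produces them (e.g. SAMPLE_RESULT becomes a list of key/count tuples rather than a dict), since the matcher compares collection contents instead of a dict the test builds afterwards.
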
@@ -91,9 +93,8 @@ def test_par_do_with_side_input_as_arg(self):
         'DecorateWords',
         lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
         AsSingleton(prefix), suffix)
-    self.assertEquals(
-        ['%s-%s-%s' % (prefix.get().next(), x, suffix) for x in words_list],
-        list(result.get()))
+    assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
+    pipeline.run()

   def test_par_do_with_side_input_as_keyword_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
@@ -105,9 +106,8 @@
         'DecorateWords',
         lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
         prefix, sfx=AsSingleton(suffix))
-    self.assertEquals(
-        ['%s-%s-%s' % (prefix, x, suffix.get().next()) for x in words_list],
-        list(result.get()))
+    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+    pipeline.run()

   def test_par_do_with_do_fn_object(self):
     class SomeDoFn(DoFn):
@@ -123,9 +123,8 @@ def process(self, context, prefix, suffix):
     suffix = pipeline | Create('SomeString', ['xyz'])  # side in
     result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
                            suffix=AsSingleton(suffix))
-    self.assertEquals(
-        ['%s-%s-%s' % (prefix, x, suffix.get().next()) for x in words_list],
-        list(result.get()))
+    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+    pipeline.run()

   def test_par_do_with_multiple_outputs_and_using_yield(self):
     class SomeDoFn(DoFn):
@@ -142,9 +141,10 @@ def process(self, context):
     nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
     results = nums | ParDo(
         'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
-    self.assertEquals([1, 2, 3, 4], list(results.main.get()))
-    self.assertEquals([1, 3], list(results.odd.get()))
-    self.assertEquals([2, 4], list(results.even.get()))
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()

   def test_par_do_with_multiple_outputs_and_using_return(self):
     def some_fn(v):
@@ -157,9 +157,10 @@ def some_fn(v):
     nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
-    self.assertEquals([1, 2, 3, 4], list(results.main.get()))
-    self.assertEquals([1, 3], list(results.odd.get()))
-    self.assertEquals([2, 4], list(results.even.get()))
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()

   def test_empty_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
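The multi-output tests now declare their tags up front via with_outputs('odd', 'even', main='main') and verify each tagged PCollection separately; the explicit label= arguments keep the three assert_that transforms uniquely named within one pipeline. A hedged sketch of the tagging mechanics — the SideOutputValue import path is an assumption, since the test file's own import of it is elided from this diff:

from google.cloud.dataflow.pvalue import SideOutputValue  # assumed path

pipeline = Pipeline('DirectPipelineRunner')
nums = pipeline | Create('nums', [1, 2, 3, 4])
results = nums | FlatMap(
    'classify',
    # Emit each element on the main output plus a tagged copy.
    lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
assert_that(results.even, equal_to([2, 4]), label='assert:even')
pipeline.run()
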
@@ -170,53 +171,61 @@ def my_fn(k, s):
       v = ('empty' if isinstance(s, EmptySideInput) else 'full')
       return [(k, v)]
     result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
-    self.assertEquals([(1, 'empty'), (2, 'empty')], sorted(result.get()))
+    assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
+    pipeline.run()

   def test_multi_valued_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
     pcol = pipeline | Create('start', [1, 2])
     side = pipeline | Create('side', [3, 4])  # 2 values in side input.
-    result = pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
+    pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side))
     with self.assertRaises(ValueError) as e:
-      result.get()
+      pipeline.run()

   def test_default_value_singleton_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
     pcol = pipeline | Create('start', [1, 2])
     side = pipeline | Create('side', [])  # 0 values in side input.
     result = (
         pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
-    self.assertEquals([10, 20], sorted(result.get()))
+    assert_that(result, equal_to([10, 20]))
+    pipeline.run()

   def test_iterable_side_input(self):
     pipeline = Pipeline('DirectPipelineRunner')
     pcol = pipeline | Create('start', [1, 2])
     side = pipeline | Create('side', [3, 4])  # 2 values in side input.
     result = pcol | FlatMap('compute',
                             lambda x, s: [x * y for y in s], AllOf(side))
-    self.assertEquals([3, 4, 6, 8], sorted(result.get()))
+    assert_that(result, equal_to([3, 4, 6, 8]))
+    pipeline.run()

   def test_undeclared_side_outputs(self):
     pipeline = Pipeline('DirectPipelineRunner')
     nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
     results = nums | FlatMap(
         'ClassifyNumbers',
         lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
-    ).with_outputs()
-    self.assertEquals([1, 2, 3, 4], list(results[None].get()))
-    self.assertEquals([1, 3], list(results.odd.get()))
-    self.assertEquals([2, 4], list(results.even.get()))
+    ).with_outputs('odd', 'even', main='main')
+    # TODO(silviuc): Revisit this test to check for undeclared side outputs.
+    # This should work with .with_outputs() without any tags declared and
+    # the results[None] should work also.
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+    pipeline.run()

   def test_empty_side_outputs(self):
     pipeline = Pipeline('DirectPipelineRunner')
     nums = pipeline | Create('Some Numbers', [1, 3, 5])
     results = nums | FlatMap(
         'ClassifyNumbers',
         lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
-    ).with_outputs()
-    self.assertEquals([1, 3, 5], list(results[None].get()))
-    self.assertEquals([1, 3, 5], list(results.odd.get()))
-    self.assertEquals([], list(results.even.get()))
+    ).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 3, 5]))
+    assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
+    assert_that(results.even, equal_to([]), label='assert:even')
+    pipeline.run()

   def test_as_list_and_as_dict_side_inputs(self):
     a_list = [5, 1, 3, 2, 9]
@@ -229,10 +238,17 @@ def test_as_list_and_as_dict_side_inputs(self):
         'concatenate',
         lambda x, the_list, the_dict: [[x, the_list, the_dict]],
         AsList(side_list), AsDict(side_pairs))
-    [[result_elem, result_list, result_dict]] = results.get()
-    self.assertEquals(1, result_elem)
-    self.assertEquals(sorted(a_list), sorted(result_list))
-    self.assertEquals(sorted(some_pairs), sorted(result_dict.iteritems()))

+    def matcher(expected_elem, expected_list, expected_pairs):
+      def match(actual):
+        [[actual_elem, actual_list, actual_dict]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list)
+        equal_to(expected_pairs)(actual_dict.iteritems())
+      return match

+    assert_that(results, matcher(1, a_list, some_pairs))
+    pipeline.run()

   def test_as_list_without_unique_labels(self):
     a_list = [1, 2, 3]
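The hand-rolled matcher above leans on the fact that equal_to(expected) returns a callable that raises on mismatch when applied to the actual contents, so structured elements can be checked field by field. A short sketch of the same composition pattern, with hypothetical names (pair_matcher and some_result are not in the SDK):

def pair_matcher(expected_key, expected_values):
  # Assumed contract: a matcher receives the materialized contents of
  # the PCollection and raises if they do not match.
  def match(actual):
    [(actual_key, actual_values)] = actual  # expect exactly one element
    equal_to([expected_key])([actual_key])
    equal_to(expected_values)(actual_values)
  return match

assert_that(some_result, pair_matcher('aa', [1, 2, 3]))
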
@@ -258,10 +274,16 @@ def test_as_list_with_unique_labels(self):
         lambda x, ls1, ls2: [[x, ls1, ls2]],
         AsList(side_list), AsList(side_list, label='label'))

-    [[result_elem, result_ls1, result_ls2]] = results.get()
-    self.assertEquals(1, result_elem)
-    self.assertEquals(sorted(a_list), sorted(result_ls1))
-    self.assertEquals(sorted(a_list), sorted(result_ls2))
+    def matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match

+    assert_that(results, matcher(1, a_list))
+    pipeline.run()

   def test_as_dict_without_unique_labels(self):
     some_kvs = [('a', 1), ('b', 2)]
@@ -286,30 +308,17 @@ def test_as_dict_with_unique_labels(self):
         'test',
         lambda x, dct1, dct2: [[x, dct1, dct2]],
         AsDict(side_kvs), AsDict(side_kvs, label='label'))
-    [[result_elem, result_dict1, result_dict2]] = results.get()
-    self.assertEquals(1, result_elem)
-    self.assertEquals(sorted(some_kvs), sorted(result_dict1.iteritems()))
-    self.assertEquals(sorted(some_kvs), sorted(result_dict2.iteritems()))

-  def test_runner_clear(self):
-    """Tests for Runner.clear() method.
-    Note that it is not expected that users of the SDK will call this directly.
-    More likely intermediate layers will call this to control the amount of
-    caching for computed values.
-    """
-    pipeline = Pipeline('DirectPipelineRunner')
-    words_list = ['aa', 'bb', 'cc']
-    words = pipeline | Create('SomeWords', words_list)
-    result = words | FlatMap('DecorateWords', lambda x: ['x-%s' % x])
-    self.assertEquals(['x-%s' % x for x in words_list], list(result.get()))
-    # Now clear the entire pipeline.
-    pipeline.runner.clear(pipeline)
-    self.assertRaises(PValueError, pipeline.runner.get_pvalue, result)
-    # Recompute and clear the pvalue node.
-    result.get()
-    pipeline.runner.clear(pipeline, node=result)
-    self.assertRaises(PValueError, pipeline.runner.get_pvalue, result)

+    def matcher(expected_elem, expected_kvs):
+      def match(actual):
+        [[actual_elem, actual_dict1, actual_dict2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_kvs)(actual_dict1.iteritems())
+        equal_to(expected_kvs)(actual_dict2.iteritems())
+      return match

+    assert_that(results, matcher(1, some_kvs))
+    pipeline.run()

   def test_window_transform(self):
     class TestWindowFn(WindowFn):
@@ -327,10 +336,10 @@ def merge(self, existing_windows):
     result = (numbers
               | WindowInto('W', windowfn=TestWindowFn())
               | GroupByKey('G'))
+    assert_that(
+        result, equal_to([(1, [10]), (1, [10]), (2, [20]),
+                          (2, [20]), (3, [30]), (3, [30])]))
+    pipeline.run()
-    self.assertEqual(
-        [(1, [10]), (1, [10]), (2, [20]), (2, [20]), (3, [30]), (3, [30])],
-        sorted(result.get(), key=lambda x: x[0]))


 if __name__ == '__main__':
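TestWindowFn's body is elided from this diff, but the expected output — every key grouped twice, one value per group — implies a WindowFn that assigns each element to two distinct windows and never merges them. A speculative reconstruction consistent with that output; the assign signature and the context.timestamp attribute are assumptions, not confirmed by the diff:

class TwoWindowsFn(WindowFn):  # hypothetical stand-in for TestWindowFn
  def assign(self, context):
    # Place each element in two adjacent unit-width windows, so each
    # (key, value) pair surfaces twice after GroupByKey.
    t = context.timestamp
    return [IntervalWindow(t, t + 1), IntervalWindow(t + 1, t + 2)]

  def merge(self, existing_windows):
    pass  # never merge; the duplicate windows survive
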
30 changes: 16 additions & 14 deletions google/cloud/dataflow/examples/complete/autocomplete.py
@@ -16,24 +16,34 @@

 from __future__ import absolute_import

+import argparse
 import logging
 import re
+import sys

 import google.cloud.dataflow as df
-from google.cloud.dataflow.utils.options import add_option
-from google.cloud.dataflow.utils.options import get_options


-def run(options=None):
-  p = df.Pipeline(options=get_options(options))
+def run(argv=sys.argv[1:]):

+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      required=True,
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)

+  p = df.Pipeline(argv=pipeline_args)

   (p  # pylint: disable=expression-not-assigned
-   | df.io.Read('read', df.io.TextFileSource(p.options.input))
+   | df.io.Read('read', df.io.TextFileSource(known_args.input))
    | df.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
    | TopPerPrefix('TopPerPrefix', 5)
    | df.Map('format',
             lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
-   | df.io.Write('write', df.io.TextFileSink(p.options.output)))
+   | df.io.Write('write', df.io.TextFileSink(known_args.output)))
   p.run()
@@ -65,14 +75,6 @@ def extract_prefixes((word, count)):
       yield prefix, (count, word)


-add_option(
-    '--input', dest='input', required=True,
-    help='Input file to process.')
-add_option(
-    '--output', dest='output', required=True,
-    help='Output file to write results to.')


 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   run()
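The example's switch from the SDK-private add_option/get_options helpers to argparse.parse_known_args is the part worth copying: flags the application declares are consumed locally, and anything unrecognized is handed to the Pipeline untouched. A self-contained demonstration of that split; the flag values and the --runner option here are illustrative only:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True)
parser.add_argument('--output', required=True)
# parse_known_args consumes declared flags and returns the rest.
known_args, pipeline_args = parser.parse_known_args(
    ['--input', 'in.txt', '--output', 'out.txt', '--runner', 'Direct'])
print(known_args.input)   # in.txt
print(pipeline_args)      # ['--runner', 'Direct']

This is why run() passes argv=pipeline_args to df.Pipeline: runner-level options no longer need to be declared in every example.
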
