diff --git a/.gitignore b/.gitignore index 3e4050bb..4c5ce0b6 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ _release/ _resources/ *.sw* .tox/ +.python-version diff --git a/doc/source/api.rst b/doc/source/api.rst index 8e9ddd1a..fffcc473 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -6,6 +6,8 @@ Tuples ------ .. autoclass:: streamparse.Tuple + :no-members: + :no-inherited-members: You should never have to instantiate an instance of a :class:`streamparse.Tuple` yourself as streamparse handles this for you @@ -22,7 +24,7 @@ Components Both :class:`streamparse.Bolt` and :class:`streamparse.Spout` inherit from a common base-class, :class:`streamparse.storm.component.Component`. It extends pystorm's code for handling `Multi-Lang IPC between Storm and Python `__ -and adds suport for our Python Topology DSL. +and adds suport for our Python :ref:`topology_dsl`. Spouts ^^^^^^ @@ -31,19 +33,16 @@ Spouts are data sources for topologies, they can read from any data source and emit tuples into streams. .. autoclass:: streamparse.Spout - :inherited-members: :show-inheritance: Bolts ^^^^^ .. autoclass:: streamparse.Bolt - :inherited-members: :show-inheritance: .. autoclass:: streamparse.BatchingBolt - :inherited-members: :show-inheritance: @@ -51,4 +50,32 @@ Logging ------- .. autoclass:: streamparse.StormHandler - :inherited-members: + :show-inheritance: + :no-inherited-members: + +Topology DSL +------------ + +.. autoclass:: streamparse.Topology + :no-members: + :no-inherited-members: + +.. autoclass:: streamparse.Grouping + +.. autoclass:: streamparse.Stream + +.. autoclass:: streamparse.JavaBolt + :members: spec + :no-inherited-members: + +.. autoclass:: streamparse.JavaSpout + :members: spec + :no-inherited-members: + +.. autoclass:: streamparse.ShellBolt + :members: spec + :no-inherited-members: + +.. autoclass:: streamparse.ShellSpout + :members: spec + :no-inherited-members: diff --git a/doc/source/conf.py b/doc/source/conf.py index cac457c8..af312c37 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -100,6 +100,11 @@ # If true, keep warnings as "system message" paragraphs in the built documents. #keep_warnings = False +# Add classes' __init__ method docstring to class doc output +autoclass_content = 'both' + +# Cut down on a ton of repetition in our docs: +autodoc_default_flags = ['members', 'inherited-members'] # -- Options for HTML output ---------------------------------------------- @@ -140,7 +145,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +# html_static_path = ['_static'] # Add any extra paths that contain custom files (such as robots.txt or # .htaccess) here, relative to this directory. These files are copied diff --git a/doc/source/develop.rst b/doc/source/develop.rst index ff73447a..e0f522ef 100644 --- a/doc/source/develop.rst +++ b/doc/source/develop.rst @@ -6,25 +6,6 @@ Lein Install Leiningen according to the instructions in the quickstart. - -Using Local Clojure Interop Library ------------------------------------- - -You can tell ``lein`` to point directly at streamparse's Clojure repo and use the -code there for all of the interop commands, so that you can test changes while -developing. - -To do this, add a directory called ``checkouts`` and symlink it up:: - - mkdir checkouts - cd checkouts - ln -s ../../../streamparse/jvm streamparse - cd .. - -Now, comment out the ``com.parsely/streamparse`` dependency in ``project.clj``. -It will now pick up the Clojure commands from your local repo. So, now you can -tweak and change them! - Local pip installation ---------------------- @@ -51,7 +32,7 @@ You can clone Storm from Github here:: There are tags available for releases, e.g.:: - git checkout v0.9.2-incubating + git checkout v0.10.0 To build a local Storm release, use:: diff --git a/doc/source/faq.rst b/doc/source/faq.rst index e43fb94c..1f871e25 100644 --- a/doc/source/faq.rst +++ b/doc/source/faq.rst @@ -87,8 +87,7 @@ How should I install streamparse on the cluster nodes? streamparse assumes your Storm servers have Python, pip, and virtualenv installed. After that, the installation of all required dependencies (including streamparse itself) is taken care of via the `config.json` file for the -streamparse project and the sparse submit command. See :ref:`Remote Deployment` -for more information. +streamparse project and the ``sparse submit`` command. Should I install Clojure? ~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/index.rst b/doc/source/index.rst index 86c806df..a8e273ed 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -21,7 +21,7 @@ Integrates with Apache Storm. develop faq -.. image:: https://raw.githubusercontent.com/Parsely/streamparse/master/doc/source/images/quickstart.gif +.. image:: images/quickstart.gif .. raw:: html @@ -37,4 +37,4 @@ Indices and tables * :ref:`modindex` * :ref:`search` -.. |logo| image:: https://raw.githubusercontent.com/Parsely/streamparse/master/doc/source/images/streamparse-logo.png +.. |logo| image:: images/streamparse-logo.png diff --git a/doc/source/quickstart.rst b/doc/source/quickstart.rst index 452b4d7b..fa326bea 100644 --- a/doc/source/quickstart.rst +++ b/doc/source/quickstart.rst @@ -57,9 +57,8 @@ the command-line tool, ``sparse``:: create wordcount/src/spouts/ create wordcount/src/spouts/__init__.py create wordcount/src/spouts/words.py - create wordcount/tasks.py create wordcount/topologies - create wordcount/topologies/wordcount.clj + create wordcount/topologies/wordcount.py create wordcount/virtualenvs create wordcount/virtualenvs/wordcount.txt Done. @@ -90,7 +89,7 @@ streamparse projects expect to have the following directory layout: "project.clj","leiningen project file, can be used to add external JVM dependencies." "src/","Python source files (bolts/spouts/etc.) for topologies." "tasks.py","Optional custom invoke tasks." - "topologies/","Contains topology definitions written using the `Clojure DSL `_ for Storm." + "topologies/","Contains topology definitions written using the :ref:`topology_dsl`. "virtualenvs/","Contains pip requirements files in order to install dependencies on remote Storm servers." @@ -98,184 +97,32 @@ Defining Topologies ------------------- Storm's services are Thrift-based and although it is possible to define a -topology in pure Python using Thrift, it introduces a host of additional -dependencies which are less than trivial to setup for local development. In -addition, it turns out that using Clojure to define topologies, still feels -fairly Pythonic, so the authors of streamparse decided this was a good -compromise. +topology in pure Python using Thrift. For details see :ref:`topology_dsl`. Let's have a look at the definition file created by using the ``sparse quickstart`` command. -.. code-block:: clojure +.. literalinclude:: ../../streamparse/bootstrap/project/topologies/wordcount.py + :language: python - (ns wordcount - (:use [streamparse.specs]) - (:gen-class)) - - (defn wordcount [options] - [ - ;; spout configuration - {"word-spout" (python-spout-spec - options - "spouts.words.WordSpout" - ["word"] - ) - } - ;; bolt configuration - {"count-bolt" (python-bolt-spec - options - {"word-spout" :shuffle} - "bolts.wordcount.WordCounter" - ["word" "count"] - :p 2 - ) - } - ] - ) - -The first block of code we encounter effectively states "import the Clojure DSL -functions for Storm." By convention, use the same name for the namespace -(``ns``) and function (``defn``) as the basename of the file ("wordcount"), -though these are not strictly required. - -.. code-block:: clojure - - (ns wordcount - (:use [streamparse.specs]) - (:gen-class)) - -The next block of code actually defines the topology and stores it into a -function named "wordcount". - -.. code-block:: clojure - - (defn wordcount [options] - [ - ;; spout configuration - {"word-spout" (python-spout-spec - options - "spouts.words.WordSpout" - ["word"] - ) - } - ;; bolt configuration - {"count-bolt" (python-bolt-spec - options - {"word-spout" :shuffle} - "bolts.wordcount.WordCounter" - ["word" "count"] - :p 2 - ) - } - ] - ) - -It turns out, the name of the function doesn't matter much; we've used -``wordcount`` above, but it could just as easily be ``bananas``. What is -important, is that **the function must return an array with only two -dictionaries and take one argument**, and that the last function in the file is -the DSL spec (i.e. do not add a ``defn`` below this function). - -The first dictionary holds a named mapping of all the spouts that exist in the -topology, the second holds a named mapping of all the bolts. The ``options`` -argument contains a mapping of topology settings. - -An additional benefit of defining topologies in Clojure is that we're able to -mix and match the types of spouts and bolts. In most cases, you may want to -use a pure Python topology, but you could easily use JVM-based spouts and bolts -or even spouts and bolts written in other languages like Ruby, Go, etc. - -Since you'll most often define spouts and bolts in Python however, we'll look -at two important functions provided by streamparse: ``python-spout-spec`` -and ``python-bolt-spec``. - -When creating a Python-based spout, we provide a name for the spout and a -definition of that spout via ``python-spout-spec``: - -.. code-block:: clojure - - {"sentence-spout-1" (python-spout-spec - ;; topology options passed in - options - ;; name of the python class to ``run`` - "spouts.SentenceSpout" - ;; output specification, what named fields will this spout emit? - ["sentence"] - ;; configuration parameters, can specify multiple - :p 2) - "sentence-spout-2" (shell-spout-spec - options - "spouts.OtherSentenceSpout" - ["sentence"])} - -In the example above, we've defined two spouts in our topology: -``sentence-spout-1`` and ``sentence-spout-2`` and told Storm to run these -components. ``python-spout-spec`` will use the ``options`` mapping to get -the path to the python executable that Storm will use and streamparse will -run the class provided. We've also let Storm know exactly what these spouts -will be emitting, namely a single field called ``sentence``. - -You'll notice that in ``sentence-spout-1``, we've passed an optional map of -configuration parameters ``:p 2``, which sets the spout to have 2 Python -processes. This is discussed in :ref:`parallelism`. - -Creating bolts is very similar and uses the ``python-bolt-spec`` function: - -.. code-block:: clojure - - {"sentence-splitter" (python-bolt-spec - ;; topology options passed in - options - ;; inputs, where does this bolt recieve it's tuples from? - {"sentence-spout-1" :shuffle - "sentence-spout-2" :shuffle} - ;; class to run - "bolts.SentenceSplitter" - ;; output spec, what tuples does this bolt emit? - ["word"] - ;; configuration parameters - :p 2) - "word-counter" (python-bolt-spec - options - ;; recieves tuples from "sentence-splitter", grouped by word - {"sentence-splitter" ["word"]} - "bolts.WordCounter" - ["word" "count"]) - "word-count-saver" (python-bolt-spec - ;; topology options passed in - options - {"word-counter" :shuffle} - "bolts.WordSaver" - ;; does not emit any fields - [])} - -In the example above, we define 3 bolts by name ``sentence-splitter``, -``word-counter`` and ``word-count-saver``. Since bolts are generally supposed -to process some input and optionally produce some output, we have to tell Storm -where a bolts inputs come from and whether or not we'd like Storm to use any -stream grouping on the tuples from the input source. - -In the ``sentence-splitter`` bolt, you'll notice that we define two input -sources for the bolt. It's completely fine to add multiple sources to any bolts. - -In the ``word-counter`` bolt, we've told Storm that we'd like the stream of +In the ``count_bolt`` bolt, we've told Storm that we'd like the stream of input tuples to be grouped by the named field ``word``. Storm offers -comprehensive options for `stream groupings -`_, +comprehensive options for +`stream groupings `_, but you will most commonly use a **shuffle** or **fields** grouping: * **Shuffle grouping**: Tuples are randomly distributed across the bolt’s tasks in a way such that each bolt is guaranteed to get an equal number of tuples. + This is the default grouping if no other is specified. * **Fields grouping**: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"’s may go to different tasks. There are more options to configure with spouts and bolts, we'd encourage you -to refer to `Storm's Concepts -`_ for more -information. +to refer to our :ref:`topology_dsl` docs or +`Storm's Concepts `_ for +more information. Spouts and Bolts ---------------- @@ -293,6 +140,7 @@ Let's create a spout that emits sentences until the end of time: class SentenceSpout(Spout): + outputs = ['sentence'] def initialize(self, stormconf, context): self.sentences = [ @@ -329,6 +177,7 @@ Now let's create a bolt that takes in sentences, and spits out words: from streamparse.bolt import Bolt class SentenceSplitterBolt(Bolt): + outputs = ['word'] def process(self, tup): sentence = tup.values[0] # extract the sentence @@ -339,7 +188,8 @@ Now let's create a bolt that takes in sentences, and spits out words: self.fail(tup) return - self.emit_many(words) + for word in words: + self.emit([word]) # tuple acknowledgement is handled automatically The bolt implementation is even simpler. We simply override the default @@ -348,10 +198,6 @@ an incoming spout or bolt. You are welcome to do whatever processing you would like in this method and can further emit tuples or not depending on the purpose of your bolt. -In the ``SentenceSplitterBolt`` above, we have decided to use the -``emit_many()`` method instead of ``emit()`` which is a bit more efficient when -sending a larger number of tuples to Storm. - If your ``process()`` method completes without raising an Exception, streamparse will automatically ensure any emits you have are anchored to the current tuple being processed and acknowledged after ``process()`` completes. @@ -433,6 +279,8 @@ option and value as in the following example: $ sparse run -o "topology.tick.tuple.freq.secs=2" ... +.. _remote_deployment: + Remote Deployment ----------------- diff --git a/doc/source/topologies.rst b/doc/source/topologies.rst index 3d77c3a3..024f13e4 100644 --- a/doc/source/topologies.rst +++ b/doc/source/topologies.rst @@ -1,264 +1,95 @@ -Topologies -========== - -Clojure Quick Reference Guide ------------------------------ -Topologies in streamparse are defined using Clojure. Here is a quick overview -so you don't get lost. - -Function definitions - ``(defn fn-name [options] expressions)`` defines a function called - ``fn-name`` that takes ``options`` as an argument and evaluates each of the - ``expressions``, treating the last evaluated expression as the return value - for a function. +.. versionadded:: 3.0.0 -Keyword arguments - In Clojure, keyword arguments are specified using paired-up positional - arguments. Thus ``:p 2`` is the ``p`` keyword set to value ``2``. -List - ``[val1 val2 ... valN]`` defines a list of N values. - -Map - ``{"key-1" val1 "key-2" val2 ... "key-N" valN}`` is a mapping of key-value - pairs. - -Comments - Anything after ``;;`` is a line comment. - -For Python programmers, Clojure can be a little tricky in that whitespace is -not significant, and ``,`` is treated as whitespace. This means ``[val1 val2]`` -and ``[val1, val2]`` are identical lists. Function definitions can similarly -take up multiple lines. +Topologies +========== -.. code-block:: clojure +Storm topologies are described as a Directed Acyclic Graph (DAG) of Storm +components, namely `bolts` and `spouts`. - (defn fn-name [options] - expression1 - expression2 - ;; ... - expressionN - ;; the value of expressionN is the returned value - ) +.. _topology_dsl: -Topology Files --------------- +Topology DSL +------------ -A topology file describes your topology in terms of Directed Acyclic Graph -(DAG) of Storm components, namely `bolts` and `spouts`. It uses the `Clojure -DSL`_ for this, along with some utility functions streamparse provides. +To simplify the process of creating Storm topologies, streamparse features a +Python Topology `DSL `_. +It lets you specify topologies as complex as those you can in `Java `_ +or `Clojure `_, +but in concise, readable Python. Topology files are located in ``topologies`` in your streamparse project folder. There can be any number of topology files for your project in this directory. -* topologies/my-topology.clj -* topologies/my-other-topology.clj -* topologies/my-third-topology.clj - -So on and so forth. - -A sample ``my-topology.clj``, would start off importing the streamparse -Clojure DSL functions. - -.. code-block:: clojure - - (ns my-topology - (:use [streamparse.specs]) - (:gen-class)) - -Notice the ``my-topology`` matches the name of the file. The next line is the -import of the streamparse utility functions. - -You could optionally avoid all of the streamparse-provided helper functions and -import your own functions or the Clojure DSL for Storm directly. - -.. code-block:: clojure - - (ns my-topology - (:use [backtype.storm.clojure]) - (:gen-class)) - -In the next part of the file, we setup a topology definition, also named -``my-topology`` (matching the ``ns`` line and filename). This definition is -actually a Clojure function that takes the topology options as a single map -argument. This function returns a list of 2 maps -- a spout map, and a bolt map. -These two maps define the DAG that is your topology. - -.. code-block:: clojure - - (defn my-topology [options] - [ - ;; spout configuration - {"my-python-spout" (python-spout-spec - ;; topology options passed in - options - ;; python class to run - "spouts.myspout.MySpout" - ;; output specification, what named fields will this spout emit? - ["data"] - ;; configuration parameters, can specify multiple or none at all - ) - } - - - ;; bolt configuration - {"my-python-bolt" (python-bolt-spec - ;; topology options pased in - options - ;; inputs, where does this bolt receive its tuples from? - {"my-python-spout" :shuffle} - ;; python class to run - "bolts.mybolt.MyBolt" - ;; output specification, what named fields will this spout emit? - ["data" "date"] - ;; configuration parameters, can specify multiple or none at all - :p 2 - ) - } - ] - ) - -Shell Spouts and Bolts ----------------------- - -The `Clojure DSL `_ -provides the ``shell-bolt-spec`` and ``shell-spout-spec`` -functions to handle bolts in non-JVM languages. - -The ``shell-spout-spec`` takes at least 2 arguments: - -1. The command line program to run (as a list of arguments) -2. A list of the named fields the spout will output -3. Any optional keyword arguments - -.. code-block:: clojure - - "my-shell-spout" (shell-spout-spec - ;; Command to run - ["python" "spout.py"] - ;; output specification, what named fields will this spout emit? - ["data"] - ;; configuration parameters, can specify multiple or none at all - :p 2 - ) - - -The ``shell-bolt-spec`` takes at least 3 arguments: - -1. A map of the input spouts and their groupings -2. The command line program to run (as a list of arguments) -3. A list of the named fields the spout will output -4. Any optional keyword arguments - -.. code-block:: clojure - - "my-shell-bolt" (shell-bolt-spec - ;; input spouts and their groupings - {"my-shell-spout" :shuffle} - ;; Command to run - ["bash" "mybolt.sh"] - ;; output specification, what named fields will this spout emit? - ["data"] - ;; configuration parameters, can specify multiple or none at all - :p 2 - ) - - -Python Spouts and Bolts ------------------------ - -The example topology above, and the ``sparse quickstart wordcount`` project -utilizes the ``python-spout-spec`` and ``python-bolt-spec`` provided by the -``streamparse.specs`` import statement. - -``(python-spout-spec ...)`` and ``(python-bolt-spec ...)`` are just convenience -functions provided by streamparse for creating topology components. They are -simply wrappers around ``(shell-spout-spec ...)`` and ``(shell-bolt-spec ...)``. +* ``topologies/my_topology.py`` +* ``topologies/my_other_topology.py`` +* ``topologies/my_third_topology.py`` +* ... -The ``python-spout-spec`` takes at least 3 arguments: +A valid :class:`~streamparse.Topology` may only have :class:`~streamparse.Bolt` +and :class:`~streamparse.Spout` attributes. -1. ``options`` - the topology options array passed in -2. The full path to the class to run. ``spouts.myspout.MySpout`` is actually the - ``MySpout`` class in ``src/spouts/myspout.py`` -3. A list of the named fields the spout will output -4. Any optional keyword arguments, such as parallelism ``:p 2`` +Simple Python Example +^^^^^^^^^^^^^^^^^^^^^ -The ``python-bolt-spec`` takes at least 4 arguments: - -1. ``options`` - the topology options array passed in -2. A map of the input spouts and their groupings (See below) -3. The full path to the class to run. ``bolts.mybolt.MyBolt`` is actually the - ``MyBolt`` class in ``src/bolts/mybolt.py`` -4. A list of the named fields the spout will output -5. Any optional keyword arguments, such as parallelism ``:p 2`` - -Parallelism is further discussed in :ref:`parallelism`. +The first step to putting together a topology, is creating the bolts and spouts, +so let's assume we have the following bolt and spout: +.. literalinclude:: ../../examples/redis/src/bolts.py + :language: python + :lines: 1-28 -Groupings -^^^^^^^^^ +.. literalinclude:: ../../examples/redis/src/spouts.py + :language: python -Storm offers comprehensive options for `stream groupings -`_, -but you will most commonly use a **shuffle** or **fields** grouping: +One important thing to note is that we have added an ``outputs`` attribute to +these classes, which specify the names of the output fields that will be +produced on their ``default`` streams. If we wanted to specify multiple +streams, we could do that by specifying a list of :class:`~streamparse.Stream` +objects. -* **Shuffle grouping**: Tuples are randomly distributed across the bolt’s tasks - in a way such that each bolt is guaranteed to get an equal number of tuples. -* **Fields grouping**: The stream is partitioned by the fields specified in the - grouping. For example, if the stream is grouped by the "user-id" field, - tuples with the same "user-id" will always go to the same task, but tuples - with different "user-id"’s may go to different tasks. +Now let's hook up the bolt to read from the spout: +.. literalinclude:: ../../examples/redis/topologies/wordcount_mem.py + :language: python -Streams -^^^^^^^ +.. note:: + Your project's ``src`` directory gets added to ``sys.path`` before your + topology is imported, so you should use absolute imports based on that. -Topologies support multiple streams when routing tuples between components. The -:meth:`~streamparse.storm.component.Component.emit` method takes an optional -`stream` argument to specify the stream ID. For example: +As you can see, :meth:`streamparse.Bolt.spec` and +:meth:`streamparse.Spout.spec` methods allow us to specify information about +the components in your topology and how they connect to each other. Their +respective docstrings outline all of the possible ways they can be used. -.. code-block:: python +Java Components +^^^^^^^^^^^^^^^ - self.emit([term, timestamp, lookup_result], stream='index') - self.emit([term, timestamp, lookup_result], stream='topic') +The topology DSL fully supports JVM-based bolts and spouts via the +:class:`~streamparse.JavaBolt` and :class:`~streamparse.JavaSpout` classes. -The topology definition can include these stream IDs to route between -components, and a component can specify more than one stream. Example with the -`Clojure DSL`_: +Here's an example of how we would use the +`Storm Kafka Spout `_: -.. code-block:: clojure +.. literalinclude:: ../../examples/kafka-jvm/topologies/pixelcount.py + :language: python +One limitation of the Thrift interface we use to send the topology to Storm is +that the constructors for Java components can only be passed basic Python data +types: `bool`, `bytes`, `float`, `int`, and `str`. - "lookup-bolt" (python-bolt-spec - options - {"search-bolt" :shuffle} - "birding.bolt.TwitterLookupBolt" - {"index" ["url" "timestamp" "search_result"] - "topic" ["url" "timestamp" "search_result"]} - :p 2 - ) - "elasticsearch-index-bolt" (python-bolt-spec - options - {["lookup-bolt" "index"] ["url" "timestamp" "search_result"]} - "birding.bolt.ElasticsearchIndexBolt" - [] - :p 1 - ) - "result-topic-bolt" (python-bolt-spec - options - {["lookup-bolt" "index"] ["url" "timestamp" "search_result"] - ["lookup-bolt" "topic"] ["url" "timestamp" "search_result"]} - "birding.bolt.ResultTopicBolt" - [] - :p 1 - ) +Components in Other Languages +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Storm sets a default stream ID of ``"default"``, as described in its doc on -Streams_: +If you have components that are written in languages other than Java or Python, +you can have those as part of your topology as well—assuming you're using the +corresponding multi-lang library for that language. - Every stream is given an id when declared. Since single-stream spouts and - bolts are so common, ... the stream is given the default id of "default". +To do that you just need to use the :meth:`streamparse.ShellBolt.spec` and +:meth:`streamparse.ShellSpout.spec` methods. They take ``command`` and +``script`` arguments to specify a binary to run and its string-separated +arguments. Running Topologies @@ -270,12 +101,9 @@ What Streamparse Does When you run a topology either locally or by submitting to a cluster, streamparse will -1. Compile your .clj topology file -2. Execute the Clojure code by invoking your topology function, passing it the - ``options`` map -3. Get the DAG defined by the topology and pass it into the Storm Java interop - classes like StormSubmitter and LocalCluster -4. Run/submit your topology +1. Bundle all of your code into a JAR +2. Build a Thrift Topology struct out of your Python topology definition. +3. Pass the Thrift Topology struct to Nimbus on your Storm cluster. If you invoked streamparse with ``sparse run``, your code is executed directly from the ``src/`` directory. @@ -291,16 +119,15 @@ according to your needs. Dealing With Errors ^^^^^^^^^^^^^^^^^^^ -When detecting an error, bolt code can call its -:meth:`~streamparse.storm.bolt.Bolt.fail` method in order to have Storm call -the respective spout's :meth:`~streamparse.storm.spout.Spout.fail` -method. Known error/failure cases result in explicit callbacks to the spout -using this approach. +When detecting an error, bolt code can call its :meth:`~streamparse.Bolt.fail` +method in order to have Storm call the respective spout's +:meth:`~streamparse.Spout.fail` method. Known error/failure cases result in +explicit callbacks to the spout using this approach. Exceptions which propagate without being caught will cause the component to crash. On ``sparse run``, the entire topology will stop execution. On a running cluster (i.e. ``sparse submit``), Storm will auto-restart the crashed component -and the spout will receive a :meth:`~streamparse.storm.spout.Spout.fail` call. +and the spout will receive a :meth:`~streamparse.Spout.fail` call. If the spout's fail handling logic is to hold back the tuple and not re-emit it, then things will keep going. If it re-emits it, then it may crash that @@ -321,7 +148,7 @@ Common approaches are to: Parallelism and Workers ----------------------- -**In general, use the :p "parallelism hint" parameter per spout and bolt in +**In general, use the ``par`` "parallelism hint" parameter per spout and bolt in your configuration to control the number of Python processes per component.** Reference: `Understanding the Parallelism of a Storm Topology `_ @@ -333,10 +160,10 @@ Storm parallelism entities: * A `task` performs the actual data processing. (To simplify, you can think of it as a Python callable.) -Spout and bolt specs take a ``:p`` keyword to provide a parallelism hint to +Spout and bolt specs take a ``par`` keyword to provide a parallelism hint to Storm for the number of executors (threads) to use for the given spout/bolt; -for example, ``:p 2`` is a hint to use two executors. Because streamparse -implements spouts and bolts as independent Python processes, setting ``:p N`` +for example, ``par=2`` is a hint to use two executors. Because streamparse +implements spouts and bolts as independent Python processes, setting ``par=N`` results in N Python processes for the given spout/bolt. Many streamparse applications will need only to set this parallelism hint to @@ -357,9 +184,8 @@ respectively. The ``sparse`` command does not support Storm's rebalancing features; use ``sparse submit -f -p N`` to kill the running topology and redeploy it with N workers. -Note that `Storm's underlying thread implementation -`_, `LMAX Disruptor -`_, is designed with +Note that `the underlying Storm thread implementation `_, +`LMAX Disruptor `_, is designed with high-performance inter-thread messaging as a goal. Rule out Python-level issues when tuning your topology: @@ -367,5 +193,3 @@ when tuning your topology: * serialization/deserialization overhead of more data emitted than you need * slow routines/callables in your code -.. _`Clojure DSL`: http://storm.apache.org/documentation/Clojure-DSL.html -.. _Streams: http://storm.apache.org/documentation/Concepts.html#streams diff --git a/examples/redis/virtualenvs/wordcount_mem.txt b/examples/redis/virtualenvs/wordcount_mem.txt deleted file mode 100644 index 25f3d549..00000000 --- a/examples/redis/virtualenvs/wordcount_mem.txt +++ /dev/null @@ -1 +0,0 @@ -streamparse diff --git a/examples/redis/virtualenvs/wordcount_mem.txt b/examples/redis/virtualenvs/wordcount_mem.txt new file mode 120000 index 00000000..45e326ed --- /dev/null +++ b/examples/redis/virtualenvs/wordcount_mem.txt @@ -0,0 +1 @@ +wordcount_redis.txt \ No newline at end of file diff --git a/streamparse/__init__.py b/streamparse/__init__.py index f7afcba0..9417ff20 100644 --- a/streamparse/__init__.py +++ b/streamparse/__init__.py @@ -12,7 +12,8 @@ storm) from .dsl import Grouping, Stream, Topology from .storm import (BatchingBolt, Bolt, JavaBolt, JavaSpout, ShellBolt, - ShellSpout, Spout, StormHandler, TicklessBatchingBolt) + ShellSpout, Spout, StormHandler, TicklessBatchingBolt, + Tuple) from .version import __version__, VERSION # Enable default NullHandler to prevent "No handlers could be found for logger" @@ -40,6 +41,7 @@ 'Stream', 'TicklessBatchingBolt', 'Topology', + 'Tuple' ] __license__ = """ diff --git a/streamparse/bootstrap/__init__.py b/streamparse/bootstrap/__init__.py index 4222b273..132a54c1 100644 --- a/streamparse/bootstrap/__init__.py +++ b/streamparse/bootstrap/__init__.py @@ -8,10 +8,10 @@ import sys import shutil +import pkg_resources from fabric.colors import green, red, blue from jinja2 import Environment, FileSystemLoader -import pkg_resources _path_prefixes = [] _path_prefix = '' @@ -89,17 +89,18 @@ def quickstart(project_name): with _cd('src'): _mkdir('bolts') with _cd('bolts'): - _cp(_here('project', 'src', 'bolts', '__init__.py'), '__init__.py') - _cp(_here('project', 'src', 'bolts', 'wordcount.py'), 'wordcount.py') + _cp(_here('project', 'src', 'bolts', '__init__.py'), + '__init__.py') + _cp(_here('project', 'src', 'bolts', 'wordcount.py'), + 'wordcount.py') _mkdir('spouts') with _cd('spouts'): - _cp(_here('project', 'src', 'spouts', '__init__.py'), '__init__.py') + _cp(_here('project', 'src', 'spouts', '__init__.py'), + '__init__.py') _cp(_here('project', 'src', 'spouts', 'words.py'), 'words.py') - _cp(_here('project', 'tasks.py'), 'tasks.py') _mkdir('topologies') with _cd('topologies'): - _cp(_here('project', 'topologies', 'wordcount.clj'), - 'wordcount.clj') + _cp(_here('project', 'topologies', 'wordcount.py'), 'wordcount.py') _mkdir('virtualenvs') with _cd('virtualenvs'): _cp(_here('project', 'virtualenvs', 'wordcount.txt'), diff --git a/streamparse/bootstrap/project/src/bolts/wordcount.py b/streamparse/bootstrap/project/src/bolts/wordcount.py index 93b94b58..89073181 100644 --- a/streamparse/bootstrap/project/src/bolts/wordcount.py +++ b/streamparse/bootstrap/project/src/bolts/wordcount.py @@ -1,16 +1,25 @@ -from __future__ import absolute_import, print_function, unicode_literals - +import os from collections import Counter -from streamparse.bolt import Bolt + +from streamparse import Bolt -class WordCounter(Bolt): +class WordCountBolt(Bolt): + outputs = ['word', 'count'] def initialize(self, conf, ctx): - self.counts = Counter() + self.counter = Counter() + self.pid = os.getpid() + self.total = 0 + + def _increment(self, word, inc_by): + self.counter[word] += inc_by + self.total += inc_by def process(self, tup): word = tup.values[0] - self.counts[word] += 1 - self.emit([word, self.counts[word]]) - self.log('%s: %d' % (word, self.counts[word])) + self._increment(word, 10 if word == "dog" else 1) + if self.total % 1000 == 0: + self.logger.info("counted [{:,}] words [pid={}]".format(self.total, + self.pid)) + self.emit([word, self.counter[word]]) diff --git a/streamparse/bootstrap/project/src/spouts/words.py b/streamparse/bootstrap/project/src/spouts/words.py index a27aca67..30235c27 100644 --- a/streamparse/bootstrap/project/src/spouts/words.py +++ b/streamparse/bootstrap/project/src/spouts/words.py @@ -1,13 +1,13 @@ -from __future__ import absolute_import, print_function, unicode_literals +from itertools import cycle + +from streamparse import Spout -import itertools -from streamparse.spout import Spout class WordSpout(Spout): + outputs = ['word'] def initialize(self, stormconf, context): - self.words = itertools.cycle(['dog', 'cat', - 'zebra', 'elephant']) + self.words = cycle(['dog', 'cat', 'zebra', 'elephant']) def next_tuple(self): word = next(self.words) diff --git a/streamparse/bootstrap/project/tasks.py b/streamparse/bootstrap/project/tasks.py deleted file mode 100644 index e78d0bf6..00000000 --- a/streamparse/bootstrap/project/tasks.py +++ /dev/null @@ -1,11 +0,0 @@ -def pre_submit(topology_name, env_name, env_config): - """Override this function to perform custom actions prior to topology - submission. No SSH tunnels will be active when this function is called.""" - pass - - -def post_submit(topo_name, env_name, env_config): - """Override this function to perform custom actions after topology - submission. Note that the SSH tunnel to Nimbus will still be active - when this function is called.""" - pass diff --git a/streamparse/bootstrap/project/topologies/wordcount.clj b/streamparse/bootstrap/project/topologies/wordcount.clj deleted file mode 100644 index 79a985fc..00000000 --- a/streamparse/bootstrap/project/topologies/wordcount.clj +++ /dev/null @@ -1,24 +0,0 @@ -(ns wordcount - (:use [streamparse.specs]) - (:gen-class)) - -(defn wordcount [options] - [ - ;; spout configuration - {"word-spout" (python-spout-spec - options - "spouts.words.WordSpout" - ["word"] - ) - } - ;; bolt configuration - {"count-bolt" (python-bolt-spec - options - {"word-spout" :shuffle} - "bolts.wordcount.WordCounter" - ["word" "count"] - :p 2 - ) - } - ] -) diff --git a/streamparse/bootstrap/project/topologies/wordcount.py b/streamparse/bootstrap/project/topologies/wordcount.py new file mode 100644 index 00000000..a15fcd52 --- /dev/null +++ b/streamparse/bootstrap/project/topologies/wordcount.py @@ -0,0 +1,14 @@ +""" +Word count topology +""" + +from streamparse import Grouping, Topology + +from bolts.wordcount import WordCountBolt +from spouts.words import WordSpout + + +class WordCount(Topology): + word_spout = WordSpout.spec() + count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')}, + par=2) diff --git a/streamparse/cli/submit.py b/streamparse/cli/submit.py index a2484436..ae5c0514 100644 --- a/streamparse/cli/submit.py +++ b/streamparse/cli/submit.py @@ -16,8 +16,7 @@ from ..dsl.component import JavaComponentSpec from ..dsl.topology import Topology, TopologyType from ..thrift import storm_thrift -from ..util import (activate_env, get_config, get_env_config, - get_nimbus_host_port, get_nimbus_client, +from ..util import (activate_env, get_config, get_env_config, get_nimbus_client, get_topology_definition) from .common import (add_ackers, add_debug, add_environment, add_name, add_options, add_par, add_wait, add_workers, @@ -26,8 +25,7 @@ from .kill import _kill_topology from .list import _list_topologies from .update_virtualenv import create_or_update_virtualenvs -from storm_thrift import (ComponentCommon, ComponentObject, GlobalStreamId, - JavaObject, ShellComponent, StreamInfo) +from storm_thrift import ShellComponent THRIFT_CHUNK_SIZE = 307200 @@ -92,7 +90,6 @@ def _get_topology_from_file(topology_file): def _submit_topology(topology_name, topology_class, uploaded_jar, env_config, workers, ackers, nimbus_client, options=None, debug=False): - host, port = get_nimbus_host_port(env_config) storm_options = {'topology.workers': workers, 'topology.acker.executors': ackers, 'topology.debug': debug} diff --git a/streamparse/dsl/component.py b/streamparse/dsl/component.py index 5ac0876e..3995affb 100644 --- a/streamparse/dsl/component.py +++ b/streamparse/dsl/component.py @@ -8,12 +8,12 @@ from copy import deepcopy import simplejson as json -from six import iteritems, string_types +from six import integer_types, iteritems, string_types, text_type from ..thrift import storm_thrift from .stream import Grouping, Stream from storm_thrift import (ComponentCommon, ComponentObject, GlobalStreamId, - JavaObject, ShellComponent, StreamInfo) + JavaObject, JavaObjectArg, ShellComponent, StreamInfo) class ComponentSpec(object): @@ -81,7 +81,8 @@ def _sanitize_inputs(inputs): # direct when given a ComponentSpec. If # GlobalStreamId, we're out of luck. # TODO: Document this. - if input_spec.common.streams['default'].direct: + default_stream = input_spec.common.streams.get('default') + if default_stream is not None and default_stream.direct: grouping = Grouping.DIRECT elif isinstance(input_spec, GlobalStreamId): stream_id = input_spec @@ -178,6 +179,26 @@ def __init__(self, component_cls, name=None, serialized_java=None, raise ValueError('full_class_name is required') if args_list is None: raise TypeError('args_list must not be None') + else: + # Convert arguments to JavaObjectArgs + for i, arg in enumerate(args_list): + if isinstance(arg, bool): + args_list[i] = JavaObjectArg(bool_arg=arg) + elif isinstance(arg, integer_types): + # Just use long all the time since Python 3 doesn't + # distinguish between long and int + args_list[i] = JavaObjectArg(long_arg=arg) + elif isinstance(arg, bytes): + args_list[i] = JavaObjectArg(binary_arg=arg) + elif isinstance(arg, text_type): + args_list[i] = JavaObjectArg(string_arg=arg) + elif isinstance(arg, float): + args_list[i] = JavaObjectArg(double_arg=arg) + else: + raise TypeError('Only basic data types can be specified' + ' as arguments to JavaObject ' + 'constructors. Given: {!r}' + .format(arg)) java_object = JavaObject(full_class_name=full_class_name, args_list=args_list) self.component_object = ComponentObject(java_object=java_object) @@ -194,7 +215,9 @@ def __init__(self, component_cls, name=None, command=None, script=None, if not command: raise ValueError('command is required') if script is None: - raise TypeError('script must not be None') + raise TypeError('script must not be None. If your command does not' + ' take arguments, specify the empty string for ' + 'script.') shell_component = ShellComponent(execution_command=command, script=script) self.component_object = ComponentObject(shell=shell_component) diff --git a/streamparse/dsl/stream.py b/streamparse/dsl/stream.py index 2f2db261..d8e89909 100644 --- a/streamparse/dsl/stream.py +++ b/streamparse/dsl/stream.py @@ -15,6 +15,15 @@ class Stream(storm_thrift.StreamInfo): A Storm output stream """ def __init__(self, fields=None, name='default', direct=False): + """ + :param fields: Field names for this stream. + :type fields: `list` or `tuple` of `str` + :param name: Name of stream. Defaults to ``default``. + :type name: `str` + :param direct: Whether or not this stream is direct. Default is `False`. + See :attr:`~streamparse.dsl.stream.Grouping.DIRECT`. + :type direct: `bool` + """ if fields is None: fields = [] elif isinstance(fields, (list, tuple)): @@ -57,7 +66,9 @@ class Grouping(object): consumer will receive this Tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using the - the `emit_direct` method. + the `direct_task` parameter to the + :meth:`streamparse.Bolt.emit` and + :meth:`streamparse.Spout.emit` methods. :ivar ALL: The stream is replicated across all the Bolt's tasks. Use this grouping with care. :ivar NONE: This grouping specifies that you don't care how the stream is diff --git a/streamparse/storm/__init__.py b/streamparse/storm/__init__.py index 2f7fe0f6..e721125d 100644 --- a/streamparse/storm/__init__.py +++ b/streamparse/storm/__init__.py @@ -2,6 +2,8 @@ Package that adds streamparse-specific addition to pystorm classes """ +from pystorm import Tuple + from .bolt import BatchingBolt, Bolt, JavaBolt, ShellBolt, TicklessBatchingBolt from .component import Component, StormHandler from .spout import JavaSpout, ShellSpout, Spout diff --git a/streamparse/storm/bolt.py b/streamparse/storm/bolt.py index ecbf12a8..0c631ef8 100644 --- a/streamparse/storm/bolt.py +++ b/streamparse/storm/bolt.py @@ -12,6 +12,51 @@ class JavaBolt(Component): @classmethod def spec(cls, name=None, serialized_java=None, full_class_name=None, args_list=None, inputs=None, par=1, config=None, outputs=None): + """Create a :class:`JavaBoltSpec` for a Java Bolt. + + This spec represents this Bolt in a :class:`~streamparse.Topology`. + + You must add the appropriate entries to your classpath by editing your + project's ``project.clj`` file in order for this to work. + + :param name: Name of this Bolt. Defaults to name of + :class:`~streamparse.Topology` attribute this is assigned + to. + :type name: `str` + :param serialized_java: Serialized Java code representing the class. + You must either specify this, or + both ``full_class_name`` and ``args_list``. + :type serialized_java: `bytes` + :param full_class_name: Fully qualified class name (including the + package name) + :type full_class_name: `str` + :param args_list: A list of arguments to be passed to the constructor of + this class. + :type args_list: `list` of basic data types + :param inputs: Streams that feed into this Bolt. + + Two forms of this are acceptable: + + 1. A `dict` mapping from + :class:`~streamparse.dsl.component.ComponentSpec` to + :class:`~streamparse.Grouping`. + 2. A `list` of :class:`~streamparse.Stream` or + :class:`~streamparse.dsl.component.ComponentSpec`. + :param par: Parallelism hint for this Bolt. For Python + Components, this works out to be the number of Python + processes running it in the the topology (across all + machines). See :ref:`parallelism`. + :type par: `int` + :param config: Component-specific config settings to pass to Storm. + :type config: `dict` + :param outputs: Outputs this JavaBolt will produce. Acceptable forms + are: + + 1. A `list` of :class:`~streamparse.Stream` objects + describing the fields output on each stream. + 2. A `list` of `str` representing the fields output on + the ``default`` stream. + """ return JavaBoltSpec(cls, name=name, serialized_java=serialized_java, full_class_name=full_class_name, args_list=args_list, inputs=inputs, par=par, @@ -19,9 +64,51 @@ def spec(cls, name=None, serialized_java=None, full_class_name=None, class ShellBolt(Component): + """A Bolt that is started by running a command with a script argument.""" + @classmethod def spec(cls, name=None, command=None, script=None, inputs=None, par=None, config=None, outputs=None): + """Create a :class:`ShellBoltSpec` for a non-Java, non-Python Bolt. + + If you want to create a spec for a Python Bolt, use + :meth:`~streamparse.dsl.bolt.Bolt.spec`. + + This spec represents this Bolt in a :class:`~streamparse.Topology`. + + :param name: Name of this Bolt. Defaults to name of + :class:`~streamparse.Topology` attribute this is assigned + to. + :type name: `str` + :param command: Path to command the Storm will execute. + :type command: `str` + :param script: Arguments to `command`. Multiple arguments should just + be separated by spaces. + :type script: `str` + :param inputs: Streams that feed into this Bolt. + + Two forms of this are acceptable: + + 1. A `dict` mapping from + :class:`~streamparse.dsl.component.ComponentSpec` to + :class:`~streamparse.Grouping`. + 2. A `list` of :class:`~streamparse.Stream` or + :class:`~streamparse.dsl.component.ComponentSpec`. + :param par: Parallelism hint for this Bolt. For shell + Components, this works out to be the number of running it + in the the topology (across all machines). + See :ref:`parallelism`. + :type par: `int` + :param config: Component-specific config settings to pass to Storm. + :type config: `dict` + :param outputs: Outputs this ShellBolt will produce. Acceptable forms + are: + + 1. A `list` of :class:`~streamparse.Stream` objects + describing the fields output on each stream. + 2. A `list` of `str` representing the fields output on + the ``default`` stream. + """ return ShellBoltSpec(cls, command=command, script=script, name=name, inputs=inputs, par=par, config=config, outputs=outputs) @@ -31,6 +118,45 @@ class Bolt(pystorm.bolt.Bolt, ShellBolt): """pystorm Bolt with streamparse-specific additions""" @classmethod def spec(cls, name=None, inputs=None, par=None, config=None): + """Create a :class:`~ShellBoltSpec` for a Python Bolt. + + This spec represents this Bolt in a :class:`~streamparse.Topology`. + + :param name: Name of this Bolt. Defaults to name of + :class:`~streamparse.Topology` attribute this is assigned + to. + :type name: `str` + :param inputs: Streams that feed into this Bolt. + + Two forms of this are acceptable: + + 1. A `dict` mapping from + :class:`~streamparse.dsl.component.ComponentSpec` to + :class:`~streamparse.Grouping`. + 2. A `list` of :class:`~streamparse.Stream` or + :class:`~streamparse.dsl.component.ComponentSpec`. + :param par: Parallelism hint for this Bolt. For Python + Components, this works out to be the number of Python + processes running it in the the topology (across all + machines). See :ref:`parallelism`. + + .. note:: + This can also be specified as an attribute of your + :class:`~Bolt` subclass. + + :type par: `int` + :param config: Component-specific config settings to pass to Storm. + + .. note:: + This can also be specified as an attribute of your + :class:`~Bolt` subclass. + + :type config: `dict` + + .. note:: + This method does not take a ``outputs`` argument because + ``outputs`` should be an attribute of your :class:`~Bolt` subclass. + """ return ShellBoltSpec(cls, command='streamparse_run', script='{}.{}'.format(cls.__module__, cls.__name__), diff --git a/streamparse/storm/component.py b/streamparse/storm/component.py index 0727e9d5..43260995 100644 --- a/streamparse/storm/component.py +++ b/streamparse/storm/component.py @@ -2,17 +2,21 @@ Module to add streamparse-specific extensions to pystorm Component classes """ import pystorm -from pystorm.component import StormHandler +from pystorm.component import StormHandler # This is used by other code -from ..dsl.component import ComponentSpec class Component(pystorm.component.Component): - """pystorm Component with streamparse-specific additions""" + """pystorm Component with streamparse-specific additions + + :ivar outputs: The outputs + :ivar config: Component-specific config settings to pass to Storm. + """ outputs = None par = 1 config = None @classmethod - def spec(cls, name=None, inputs=None, par=None, config=None): - return ComponentSpec(cls, name=name, inputs=inputs, par=par, - config=config, outputs=cls.outputs) + def spec(cls, *args, **kwargs): + """This method exists only to give a more informative error message.""" + raise TypeError('Specifications should either be bolts or spouts. ' + 'Given: {!r}'.format(cls)) diff --git a/streamparse/storm/spout.py b/streamparse/storm/spout.py index 74d87566..bf3c073a 100644 --- a/streamparse/storm/spout.py +++ b/streamparse/storm/spout.py @@ -12,16 +12,80 @@ class JavaSpout(Component): @classmethod def spec(cls, name=None, serialized_java=None, full_class_name=None, args_list=None, par=1, config=None, outputs=None): + """Create a :class:`JavaSpoutSpec` for a Java Spout. + + This spec represents this Spout in a :class:`~streamparse.Topology`. + + You must add the appropriate entries to your classpath by editing your + project's ``project.clj`` file in order for this to work. + + :param name: Name of this Spout. Defaults to name of + :class:`~streamparse.Topology` attribute this is assigned + to. + :type name: `str` + :param serialized_java: Serialized Java code representing the class. + You must either specify this, or + both ``full_class_name`` and ``args_list``. + :type serialized_java: `bytes` + :param full_class_name: Fully qualified class name (including the + package name) + :type full_class_name: `str` + :param args_list: A list of arguments to be passed to the constructor of + this class. + :type args_list: `list` of basic data types + :param par: Parallelism hint for this Spout. See :ref:`parallelism`. + :type par: `int` + :param config: Component-specific config settings to pass to Storm. + :type config: `dict` + :param outputs: Outputs this JavaSpout will produce. Acceptable forms + are: + + 1. A `list` of :class:`~streamparse.Stream` objects + describing the fields output on each stream. + 2. A `list` of `str` representing the fields output on + the ``default`` stream. + """ return JavaSpoutSpec(cls, name=name, serialized_java=serialized_java, - full_class_name=full_class_name, - args_list=args_list, par=par, - config=config, outputs=outputs) + full_class_name=full_class_name, + args_list=args_list, par=par, + config=config, outputs=outputs) class ShellSpout(Component): @classmethod def spec(cls, name=None, command=None, script=None, par=None, config=None, outputs=None): + """Create a :class:`ShellSpoutSpec` for a non-Java, non-Python Spout. + + If you want to create a spec for a Python Spout, use + :meth:`~streamparse.dsl.bolt.Spout.spec`. + + This spec represents this Spout in a :class:`~streamparse.Topology`. + + :param name: Name of this Spout. Defaults to name of + :class:`~streamparse.Topology` attribute this is assigned + to. + :type name: `str` + :param command: Path to command the Storm will execute. + :type command: `str` + :param script: Arguments to `command`. Multiple arguments should just + be separated by spaces. + :type script: `str` + :param par: Parallelism hint for this Spout. For shell + Components, this works out to be the number of processes + running it in the the topology (across all machines). + See :ref:`parallelism`. + :type par: `int` + :param config: Component-specific config settings to pass to Storm. + :type config: `dict` + :param outputs: Outputs this ShellSpout will produce. Acceptable forms + are: + + 1. A `list` of :class:`~streamparse.Stream` objects + describing the fields output on each stream. + 2. A `list` of `str` representing the fields output on + the ``default`` stream. + """ return ShellSpoutSpec(cls, command=command, script=script, name=name, par=par, config=config, outputs=outputs) @@ -30,6 +94,36 @@ class Spout(pystorm.spout.Spout, ShellSpout): """pystorm Spout with streamparse-specific additions""" @classmethod def spec(cls, name=None, par=None, config=None): + """Create a :class:`~ShellBoltSpec` for a Python Spout. + + This spec represents this Spout in a :class:`~streamparse.Topology`. + + :param name: Name of this Spout. Defaults to name of + :class:`~streamparse.Topology` attribute this is assigned + to. + :type name: `str` + :param par: Parallelism hint for this Spout. For Python + Components, this works out to be the number of Python + processes running it in the the topology (across all + machines). See :ref:`parallelism`. + + .. note:: + This can also be specified as an attribute of your + :class:`~Spout` subclass. + + :type par: `int` + :param config: Component-specific config settings to pass to Storm. + + .. note:: + This can also be specified as an attribute of your + :class:`~Spout` subclass. + + :type config: `dict` + + .. note:: + This method does not take a ``outputs`` argument because + ``outputs`` should be an attribute of your :class:`~Spout` subclass. + """ return ShellSpoutSpec(cls, command='streamparse_run', script='{}.{}'.format(cls.__module__, cls.__name__), diff --git a/test/streamparse/test_dsl.py b/test/streamparse/test_dsl.py index 59d1fb74..10aa68a5 100644 --- a/test/streamparse/test_dsl.py +++ b/test/streamparse/test_dsl.py @@ -6,10 +6,11 @@ import unittest from io import BytesIO -from streamparse.dsl import Grouping, Topology +from streamparse.dsl import Grouping, Stream, Topology from streamparse.storm import (Bolt, Component, JavaBolt, JavaSpout, ShellBolt, ShellSpout, Spout) - +from streamparse.thrift import storm_thrift +from storm_thrift import JavaObjectArg log = logging.getLogger(__name__) @@ -22,6 +23,15 @@ class WordCountBolt(Bolt): outputs = ["word", "count"] +class MultiStreamWordCountBolt(Bolt): + outputs = [Stream(fields=['word', 'count']), + Stream(fields=['all_word_count'], name='sum')] + + +class DatabaseDumperBolt(Bolt): + outputs = [] + + class TopologyTests(unittest.TestCase): def test_basic_spec(self): class WordCount(Topology): @@ -31,8 +41,12 @@ class WordCount(Topology): par=8) self.assertEqual(len(WordCount.specs), 2) - self.assertEqual(list(WordCount.word_bolt.inputs.keys())[0], - WordCount.word_spout['default']) + self.assertEqual(list(WordCount.word_bolt.inputs.keys()), + [WordCount.word_spout['default']]) + self.assertEqual(WordCount.thrift_spouts['word_spout'].common.parallelism_hint, + 2) + self.assertEqual(WordCount.thrift_bolts['word_bolt'].common.parallelism_hint, + 8) self.assertEqual(WordCount.word_bolt.inputs[WordCount.word_spout['default']], Grouping.fields('word')) @@ -43,11 +57,32 @@ class WordCount(Topology): self.assertEqual(len(WordCount.specs), 2) self.assertEqual(len(WordCount.thrift_spouts), 1) self.assertEqual(len(WordCount.thrift_bolts), 1) - self.assertEqual(list(WordCount.word_bolt.inputs.keys())[0], - WordCount.word_spout['default']) + self.assertEqual(list(WordCount.word_bolt.inputs.keys()), + [WordCount.word_spout['default']]) self.assertEqual(WordCount.word_bolt.inputs[WordCount.word_spout['default']], Grouping.SHUFFLE) + def test_multi_stream_bolt(self): + class WordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = MultiStreamWordCountBolt.spec(inputs=[word_spout], + par=8) + db_dumper_bolt = DatabaseDumperBolt.spec(par=4, + inputs=[word_bolt['sum'], + word_bolt['default']]) + self.assertEqual(len(WordCount.specs), 3) + self.assertEqual(len(WordCount.thrift_spouts), 1) + self.assertEqual(len(WordCount.thrift_bolts), 2) + self.assertEqual(list(WordCount.word_bolt.inputs.keys()), + [WordCount.word_spout['default']]) + self.assertEqual(WordCount.word_bolt.inputs[WordCount.word_spout['default']], + Grouping.SHUFFLE) + db_dumper_bolt_input_set = set(WordCount.db_dumper_bolt.inputs.keys()) + self.assertEqual(len(WordCount.db_dumper_bolt.inputs.keys()), + len(db_dumper_bolt_input_set)) + self.assertEqual(db_dumper_bolt_input_set, + {WordCount.word_bolt['sum'], WordCount.word_bolt['default']}) + def test_long_chain_spec(self): class WordCount(Topology): word_spout = WordSpout.spec() @@ -117,7 +152,15 @@ class WordCount(Topology): word_spout = WordSpout.spec() word_bolt = WordCountBolt.spec(inputs=[]) - def test_no_output_spout(self): + def test_no_outputs_spout_empty(self): + # All spouts must have output fields + class PointlessSpout(Spout): + outputs = [] + with self.assertRaises(ValueError): + class WordCount(Topology): + word_spout = PointlessSpout.spec() + + def test_no_outputs_spout(self): # All spouts must have output fields class PointlessSpout(Spout): outputs = [] @@ -218,3 +261,159 @@ def test_unknown_stream(self): class WordCount(Topology): word_spout = WordSpout.spec() word_bolt = WordCountBolt.spec(inputs=[word_spout['word']]) + + def test_perl_bolt(self): + # Make sure ShellBolt.spec works + class PerlWordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = ShellBolt.spec(command='perl', script='count_words.pl', + inputs={word_spout: Grouping.fields("word")}, + par=8, outputs=['word', 'count']) + + self.assertEqual(len(PerlWordCount.specs), 2) + self.assertEqual(list(PerlWordCount.word_bolt.inputs.keys()), + [PerlWordCount.word_spout['default']]) + self.assertEqual(PerlWordCount.thrift_spouts['word_spout'].common.parallelism_hint, + 2) + self.assertEqual(PerlWordCount.thrift_bolts['word_bolt'].common.parallelism_hint, + 8) + self.assertEqual(PerlWordCount.word_bolt.inputs[PerlWordCount.word_spout['default']], + Grouping.fields('word')) + + def test_shell_bolt_no_command(self): + # Should raise ValueError if command is None + with self.assertRaises(ValueError): + class PerlWordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = ShellBolt.spec(script='count_words.pl', + inputs={word_spout: Grouping.fields("word")}, + par=8, outputs=['word', 'count']) + + def test_shell_bolt_no_script(self): + # Should raise TypeError if script is None + with self.assertRaises(TypeError): + class PerlWordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = ShellBolt.spec(command='perl', + inputs={word_spout: Grouping.fields("word")}, + par=8, outputs=['word', 'count']) + + def test_java_bolt_valid_arg_list(self): + # JavaBolt should work fine when given basic data types + class JavaWordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = JavaBolt.spec(full_class_name='com.bar.foo.counter.WordCountBolt', + args_list=[u'foo', 1, b'\x09\x10', True, + 3.14159], + inputs={word_spout: Grouping.fields("word")}, + par=8, outputs=['word', 'count']) + java_object = JavaWordCount.thrift_bolts['word_bolt'].bolt_object.java_object + self.assertEqual(java_object.full_class_name, 'com.bar.foo.counter.WordCountBolt') + self.assertEqual(java_object.args_list, + [JavaObjectArg(string_arg=u'foo'), + JavaObjectArg(long_arg=1), + JavaObjectArg(binary_arg=b'\x09\x10'), + JavaObjectArg(bool_arg=True), + JavaObjectArg(double_arg=3.14159)]) + self.assertIsNone(JavaWordCount.thrift_bolts['word_bolt'].bolt_object.serialized_java) + self.assertIsNone(JavaWordCount.thrift_bolts['word_bolt'].bolt_object.shell) + + def test_java_bolt_invalid_arg_list(self): + # JavaBolt should raise TypeError when given something other than basic data types + with self.assertRaises(TypeError): + class JavaWordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = JavaBolt.spec(full_class_name='com.bar.foo.counter.WordCountBolt', + args_list=[{'foo': 'bar'}, 1], + inputs={word_spout: Grouping.fields("word")}, + par=8, outputs=['word', 'count']) + + def test_java_bolt_valid_serialized_java(self): + # JavaBolt should work fine when given a byte string for serialized_java + serialized_java = b'\x01\x02\x03\x04\x05' + class JavaWordCount(Topology): + word_spout = WordSpout.spec(par=2) + word_bolt = JavaBolt.spec(serialized_java=serialized_java, + inputs={word_spout: Grouping.fields("word")}, + par=8, outputs=['word', 'count']) + self.assertEqual(JavaWordCount.thrift_bolts['word_bolt'].bolt_object.serialized_java, serialized_java) + self.assertIsNone(JavaWordCount.thrift_bolts['word_bolt'].bolt_object.java_object) + self.assertIsNone(JavaWordCount.thrift_bolts['word_bolt'].bolt_object.shell) + + def test_perl_spout(self): + # Make sure ShellBolt.spec works + class PerlWordCount(Topology): + word_spout = ShellSpout.spec(command='perl', + script='count_words.pl', par=2, + outputs=['word']) + word_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, + par=8) + self.assertEqual(len(PerlWordCount.specs), 2) + self.assertEqual(list(PerlWordCount.word_bolt.inputs.keys()), + [PerlWordCount.word_spout['default']]) + self.assertEqual(PerlWordCount.thrift_spouts['word_spout'].common.parallelism_hint, + 2) + self.assertEqual(PerlWordCount.thrift_bolts['word_bolt'].common.parallelism_hint, + 8) + self.assertEqual(PerlWordCount.word_bolt.inputs[PerlWordCount.word_spout['default']], + Grouping.fields('word')) + + def test_shell_spout_no_command(self): + # Should raise ValueError if command is None + with self.assertRaises(ValueError): + class PerlWordCount(Topology): + word_spout = ShellSpout.spec(script='count_words.pl', par=8, + outputs=['word']) + word_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, + par=8) + + def test_shell_spout_no_script(self): + # Should raise TypeError if script is None + with self.assertRaises(TypeError): + class PerlWordCount(Topology): + word_spout = ShellSpout.spec(command='perl', par=8, + outputs=['word']) + word_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, + par=8) + + def test_java_spout_valid_arg_list(self): + # JavaSpout should work fine when given basic data types + class JavaWordCount(Topology): + word_spout = JavaSpout.spec(full_class_name='com.bar.foo.counter.WordSpout', + args_list=[u'foo', 1, b'\x09\x10', True, + 3.14159], + par=8, outputs=['word']) + word_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, + par=8) + java_object = JavaWordCount.thrift_spouts['word_spout'].spout_object.java_object + self.assertEqual(java_object.full_class_name, 'com.bar.foo.counter.WordSpout') + self.assertEqual(java_object.args_list, + [JavaObjectArg(string_arg=u'foo'), + JavaObjectArg(long_arg=1), + JavaObjectArg(binary_arg=b'\x09\x10'), + JavaObjectArg(bool_arg=True), + JavaObjectArg(double_arg=3.14159)]) + self.assertIsNone(JavaWordCount.thrift_spouts['word_spout'].spout_object.serialized_java) + self.assertIsNone(JavaWordCount.thrift_spouts['word_spout'].spout_object.shell) + + def test_java_spout_invalid_arg_list(self): + # JavaSpout should raise TypeError when given something other than basic data types + with self.assertRaises(TypeError): + class JavaWordCount(Topology): + word_spout = JavaSpout.spec(full_class_name='com.bar.foo.counter.WordSpout', + args_list=[{'foo': 'bar'}, 1], + par=8, outputs=['word']) + word_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, + par=8) + + def test_java_spout_valid_serialized_java(self): + # JavaSpout should work fine when given a byte string for serialized_java + serialized_java = b'\x01\x02\x03\x04\x05' + class JavaWordCount(Topology): + word_spout = JavaSpout.spec(serialized_java=serialized_java, + par=8, outputs=['word']) + word_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields("word")}, + par=8) + self.assertEqual(JavaWordCount.thrift_spouts['word_spout'].spout_object.serialized_java, serialized_java) + self.assertIsNone(JavaWordCount.thrift_spouts['word_spout'].spout_object.java_object) + self.assertIsNone(JavaWordCount.thrift_spouts['word_spout'].spout_object.shell)