This repository has been archived by the owner on Apr 18, 2018. It is now read-only.

Workers connections / b.s.util [ERROR] Async loop died! #167

Open
Ulitochka opened this issue Dec 7, 2015 · 6 comments


@Ulitochka

Hello.
Thanks for your work. I have a few issues, though:

Building topology:
New python executable in /tmp/tmpznmQCP/resources/pyleus_venv/bin/python
Installing setuptools, pip, wheel...done.
Collecting pyleus==0.3.0
Collecting msgpack-python (from pyleus==0.3.0)
Collecting six (from pyleus==0.3.0)
Using cached six-1.10.0-py2.py3-none-any.whl
Collecting virtualenv (from pyleus==0.3.0)
Using cached virtualenv-13.1.2-py2.py3-none-any.whl
Collecting PyYAML (from pyleus==0.3.0)
Installing collected packages: msgpack-python, six, virtualenv, PyYAML, pyleus
Successfully installed PyYAML-3.11 msgpack-python-0.4.6 pyleus-0.3.0 six-1.10.0 virtualenv-13.1.2
Collecting psycopg2 (from -r /home/nutch/topologies/test_db/requirements.txt (line 1))
Installing collected packages: psycopg2
Successfully installed psycopg2-2.6.1
DEBUG:pyleus.cli.build:Assemble component module: test_db.test_spout
DEBUG:pyleus.cli.build:Assemble component module: test_db.test_bolt

First, I hit the problem described here: http://stackoverflow.com/questions/30732178/storm-error-async-loop-died
It is fixed in Storm 0.9.5, but the Pyleus install guide (https://yelp.github.io/pyleus/install.html) says: "you will need to download and extract Storm 0.9.4".

Second, I have a problem like this one: https://mail-archives.apache.org/mod_mbox/storm-user/201507.mbox/%3CCAF5108hAeJuCe5s7JmvQ-KbHz+Fw2FtnA7nNq4kdXfjwrZSFxQ@mail.gmail.com%3E
My question is: how can I set up connections between workers without SSH? I don't see this topic covered in the tutorial at https://yelp.github.io/pyleus.

@poros
Contributor

poros commented Dec 7, 2015

For the first issue, I believe we could upgrade the Storm version in Pyleus; it should be fairly safe. Pinging @ecanzonieri
In the meantime, if you want a quick fix, I guess you need to patch Pyleus yourself to use Storm 0.9.5.

Regarding the second issue, it seems to me more of a Storm problem than a Pyleus one... Is there any Storm option that could help you and that we are missing in Pyleus?
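
If the second issue comes down to machines not reaching each other over TCP (an assumption; the linked mailing-list thread may describe something else), a quick reachability check from each node can narrow it down. The sketch below is not part of Pyleus or Storm; `can_connect` and `check_ports` are hypothetical helper names, and the ports to test should be taken from your own storm.yaml.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


def check_ports(host, ports, timeout=2.0):
    """Map each port to whether it is reachable on the given host."""
    return {port: can_connect(host, port, timeout) for port in ports}
```

For example, `check_ports("worker-2.example", [6700, 6701, 6702, 6703])` would test Storm's default worker slot ports on a hypothetical host name; a `False` entry points at a firewall, hostname, or configuration problem rather than at Pyleus.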

@Ulitochka
Author

Thanks for your answer. I have another question, about the integration between Python and Java.
How is Python code executed on separate machines? Does a Python interpreter run for each bolt and spout on each machine?

@poros
Contributor

poros commented Dec 8, 2015

Yes, pretty much. You can see how the interpreter is called at https://github.com/Yelp/pyleus/blob/develop/topology_builder/src/main/java/com/yelp/pyleus/PythonComponentsFactory.java

The build of the virtualenv is done here https://github.com/Yelp/pyleus/blob/develop/pyleus/cli/build.py instead.
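
To make the "interpreter per component" idea concrete: Storm runs non-JVM components as subprocesses and talks to them over stdin/stdout. The sketch below imitates that pattern with a parent process that starts a Python child and exchanges one message per line. It uses newline-delimited JSON purely for illustration; it is not the actual Storm or Pyleus wire format, and `CHILD_SOURCE` and `run_component` are hypothetical names.

```python
import json
import subprocess
import sys

# Source of the child "component": read one JSON message per line from
# stdin, reply with one JSON message per line on stdout.
CHILD_SOURCE = r'''
import json
import sys

for line in iter(sys.stdin.readline, ""):
    msg = json.loads(line)
    reply = {"command": "emit", "tuple": [msg["tuple"][0].upper()]}
    sys.stdout.write(json.dumps(reply) + "\n")
    sys.stdout.flush()
'''


def run_component(words):
    """Start a child interpreter, send it each word, and collect its replies."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    replies = []
    try:
        for word in words:
            proc.stdin.write(json.dumps({"tuple": [word]}) + "\n")
            proc.stdin.flush()
            replies.append(json.loads(proc.stdout.readline()))
    finally:
        # Closing stdin makes the child's read loop end, so it exits cleanly.
        proc.stdin.close()
        proc.wait()
        proc.stdout.close()
    return replies
```

In a real topology the parent role is played by the Java side, and the interpreter it launches is the one inside the virtualenv built by `pyleus build`.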

@Ulitochka
Author

Thanks for the links. I think I found my problem (importing an external Python library). Can you please tell me whether this spout code is correct?

from __future__ import absolute_import
import logging
from collections import namedtuple
from random import choice
import time
from pyleus.storm import Spout

import psycopg2
from psycopg2 import connect
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

log = logging.getLogger('test-spout')
Request = namedtuple("new_token", "id word")

class TestSpout(Spout):
     OPTIONS = ["host", "port", "password", "user", "dbname"]
     OUTPUT_FIELDS = Request

     def initialize(self):
         self.id_number = list()
         self.word = list()

         conn = connect(user=self.options["user"], host=self.options["host"],
         password=self.options["password"], port=self.options["port"])
         dbname = self.options["dbname"]
         cur = conn.cursor()

         cur.execute("SELECT id, word FROM public.test_table")
         massive = cur.fetchall()

         conn.commit()
         conn.close()

         self.id_number.append(massive[0][0])
         self.word.append(massive[0][1])

     def next_tuple(self):
         time.sleep(0.001)
         request = Request(choise(self.id_number), choise(self.word))
         log.debug(request)
         self.emit(request)

if __name__ == '__main__':
   logging.basicConfig(level=logging.DEBUG, filename='/tmp/test_spout.log', filemode='w',)
   TestSpout().run()

@poros
Contributor

poros commented Dec 9, 2015

I can't guarantee that the code or the logic is correct (e.g. I see a couple of typos), but I can't see any major problem from a quick look...
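
For reference, here is a sketch of the spout's data-handling logic with the visible slips addressed: `choise` is spelled `choice`, and the `dbname` option is actually passed to the connection call. It is written as plain functions so it runs without Pyleus or a database. `load_rows` and `pick_request` are hypothetical helper names, and `connect` stands for any DB-API style callable such as `psycopg2.connect`. Keeping every row, and drawing `id` and `word` from the same row, are assumptions about the intent; the posted code stores only the first row.

```python
import random
from collections import namedtuple

Request = namedtuple("Request", "id word")


def load_rows(connect, options):
    """Fetch all (id, word) rows using a DB-API style connect callable."""
    conn = connect(
        user=options["user"],
        password=options["password"],
        host=options["host"],
        port=options["port"],
        dbname=options["dbname"],  # the posted code read this but never used it
    )
    try:
        cur = conn.cursor()
        cur.execute("SELECT id, word FROM public.test_table")
        return cur.fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()


def pick_request(rows, rng=random):
    """Pick one row at random and wrap it in the output namedtuple."""
    row_id, word = rng.choice(rows)
    return Request(row_id, word)
```

Inside the spout, `initialize` would store `load_rows(connect, self.options)` on `self`, and `next_tuple` would emit `pick_request(self.rows)`.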

@Ulitochka
Author

Maybe the problem is in the integration between Pyleus and psycopg2, a library for connecting to a PostgreSQL database. Has anybody tested this library with Pyleus?
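
One way to narrow this down is to check whether the library can be imported by the interpreter that actually runs the component on each worker machine. psycopg2 is a C extension that depends on the PostgreSQL client library (libpq), so an import can succeed on the build machine and still fail elsewhere. The sketch below is not part of Pyleus; `can_import` is a hypothetical helper, and the path to the worker's interpreter is something you would have to look up on your own cluster.

```python
import subprocess
import sys


def can_import(python_executable, module_name):
    """Return True if the given interpreter can import the named module."""
    result = subprocess.run(
        [python_executable, "-c", "import " + module_name],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # A non-zero exit code means the import raised (ImportError or similar).
    return result.returncode == 0
```

For example, `can_import(sys.executable, "psycopg2")` tests the current interpreter; passing the path of the virtualenv's `python` on a worker tests the environment the spout really uses.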
