Skip to content

subprocessio.py: Apply a native English speaker's guesses to those sq… #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions subprocessio.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/usr/bin/env python
'''
Module provides a class allowing to wrap communication over subprocess.Popen
input, output, error streams into a meaningfull, non-blocking, concurrent stream
input, output, error streams into a meaningful, non-blocking, concurrent stream
processor exposing the output data as an iterator fitting to be a return value
passed by a WSGI applicaiton to a WSGI server per PEP 3333.
passed by a WSGI application to a WSGI server per PEP 3333.

Copyright (c) 2011 Daniel Dotsenko <[email protected]>

Expand Down Expand Up @@ -61,15 +61,17 @@ def __init__(self, source):

def run(self):
t = self.writeiface
if self.bytes:
os.write(t, self.bytes)
else:
s = self.source
b = s.read(4096)
while b:
os.write(t, b)
try:
if self.bytes:
os.write(t, self.bytes)
else:
s = self.source
b = s.read(4096)
os.close(t)
while b:
os.write(t, b)
b = s.read(4096)
finally:
os.close(t)

@property
def output(self):
Expand Down Expand Up @@ -219,7 +221,7 @@ def reading_paused(self):
@property
def done_reading_event(self):
'''
Done_reding does not mean that the iterator's buffer is empty.
Done_reading does not mean that the iterator's buffer is empty.
Iterator might have done reading from underlying source, but the read
chunks might still be available for serving through .next() method.

Expand All @@ -229,11 +231,11 @@ def done_reading_event(self):
@property
def done_reading(self):
'''
Done_reding does not mean that the iterator's buffer is empty.
Done_reading does not mean that the iterator's buffer is empty.
Iterator might have done reading from underlying source, but the read
chunks might still be available for serving through .next() method.

@return An Bool value.
@return A Bool value.
'''
return self.worker.EOF.is_set()

Expand All @@ -242,15 +244,15 @@ def length(self):
'''
returns int.

This is the lenght of the que of chunks, not the length of
This is the length of the que of chunks, not the length of
the combined contents in those chunks.

__len__() cannot be meaningfully implemented because this
reader is just flying throuh a bottomless pit content and
can only know the lenght of what it already saw.
reader is just flying through a bottomless pit of content and
can only know the length of what it already saw.

If __len__() on WSGI server per PEP 3333 returns a value,
the responce's length will be set to that. In order not to
the response's length will be set to that. In order not to
confuse WSGI PEP3333 servers, we will not implement __len__
at all.
'''
Expand Down Expand Up @@ -285,11 +287,11 @@ class SubprocessIOChunker():
does not block the parallel inpipe reading occurring parallel thread.)

The purpose of the object is to allow us to wrap subprocess interactions into
and interable that can be passed to a WSGI server as the application's return
an iterable that can be passed to a WSGI server as the application's return
value. Because of stream-processing-ability, WSGI does not have to read ALL
of the subprocess's output and buffer it, before handing it to WSGI server for
HTTP response. Instead, the class initializer reads just a bit of the stream
to figure out if error ocurred or likely to occur and if not, just hands the
to figure out if error occurred or likely to occur and if not, just hands the
further iteration over subprocess output to the server for completion of HTTP
response.

Expand Down Expand Up @@ -362,6 +364,7 @@ def __init__(self, cmd, inputstream = None, buffer_size = 65536, chunk_size = 40
self.process = _p
self.output = bg_out
self.error = bg_err
self.inputstream = inputstream

def __iter__(self):
return self
Expand All @@ -388,7 +391,10 @@ def close(self):
self.error.close()
except:
pass
try:
os.close(self.inputstream)
except:
pass

def __del__(self):
self.close()