Skip to content

Commit

Permalink
Use pipe implemenation for infinite IO
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatz-drizly committed Jun 29, 2021
1 parent d9a6bc8 commit 3ddccac
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 15 deletions.
29 changes: 29 additions & 0 deletions lib/piperator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,33 @@ def self.pipe(enumerable)
def self.wrap(value)
Pipeline.wrap(value)
end

# Coerce any enumerator to be an IO (via pipe).
#
# Pro: infinite length without using infinite memory. Con: unseekable (as is IO::Pipe).
#
# @param enumerator [Enumerator] source of data; infinite sources are OK
# @yieldparam io_r [IO] readable IO
def self.infinite_io(enumerator)
stop = false
io_r, io_w = ::IO.pipe # not the IO from this library

# a thread writes all the data to the pipe. the pipe automatically buffers everything for us
thr = Thread.new do
enumerator.each do |chunk|
break if stop
io_w.write(chunk)
end
ensure
io_w.close
end

yield io_r
ensure
stop = true
io_r.read until io_r.eof? # must drain, or risk closing before writes finish -- broken pipe
io_r.close # must close ???
thr.join # must ensure that all data desired to be written is actually written
end

end
7 changes: 0 additions & 7 deletions lib/piperator/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ def gets(separator = $INPUT_RECORD_SEPARATOR, _limit = nil)
read_with { @buffer.gets(separator) }
end

# Returns an enumerator of lines in the stream, without reading the entire stream into memory
#
# @return [Enumerator]
def each_line
Enumerator.new { |y| loop { y << gets&.gsub(/#{$INPUT_RECORD_SEPARATOR}$/, '') } }.lazy.take_while(&:itself).each
end

# Flush internal buffer until the last unread byte
def flush
if @buffer.pos == @buffer_read_pos
Expand Down
69 changes: 69 additions & 0 deletions spec/piperator/infinite_io_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require 'spec_helper'



RSpec.describe "Piperator.infinite_io" do
repeated_string = "foobar\n"
infinite_foobar = Enumerator.new { |y| loop { y << repeated_string } }

describe '#read' do
subject { proc { |&b| Piperator.infinite_io(infinite_foobar) { |x| b.(x) } } }

it 'reads specific number of bytes' do
subject.call do |io|
expect(io.read(4)).to eq('foob')
end
end

it 'buffers rest and returns on next read' do
subject.call do |io|
expect(io.read(2)).to eq('fo')
expect(io.read(2)).to eq('ob')
expect(io.read(2)).to eq('ar')
end
end

it 'does not try to reach the end before working' do
n = 100000
subject.call do |io|
expect(io.read(n * repeated_string.length)).to eq(repeated_string * n)
end
end
end

describe '#gets' do
it 'returns characters until the separator' do
Piperator.infinite_io(infinite_foobar) do |io|
expect(io.gets).to eq("foobar\n")
end
end

it 'responds to gets with nil when enumerable is exhausted' do
n = 2
Piperator.infinite_io((["foobar\n"] * n).each) do |io|
n.times { expect(io.gets).to eq("foobar\n") }
expect(io.gets).to be_nil
end
end
end

describe '#eof?' do
it 'returns eof when enumerable is exhausted' do
n = 2
Piperator.infinite_io((["foobar\n"] * n).each) do |io|
expect(io.eof?).to be_falsey
n.times { expect(io.gets).to eq("foobar\n") }
expect(io.eof?).to be_truthy
end
end
end

describe "#each_line" do
it 'reevaluates line breaks' do
Piperator.infinite_io(["foo\n", "bar\n", "baz\nbmp"].lazy.each) do |io|
expect(io.each_line.map(&:strip)).to eq(["foo", "bar", "baz", "bmp"])
end
end
end

end
8 changes: 0 additions & 8 deletions spec/piperator/io_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,6 @@
end
end

describe '#each_line' do
subject { Piperator::IO.new(["foo\n", "bar\n", "baz\nbmp"].each) }

it 'return enumerated lines' do
expect(subject.each_line.to_a).to eq(["foo", "bar", "baz", "bmp"])
end
end

describe '#flush' do
subject { Piperator::IO.new(['a' * 16 * KILOBYTE].each) }
let(:flush_threshold) { Piperator::IO::FLUSH_THRESHOLD }
Expand Down

0 comments on commit 3ddccac

Please sign in to comment.