Skip to content

Commit

Permalink
wait for stream to finish flush before returning from close
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Aug 1, 2016
1 parent e78bf34 commit c4b1ed6
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 20 deletions.
8 changes: 6 additions & 2 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,19 @@ function uv_return_spawn(p::Ptr{Void}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
if isa(proc.exitcb, Function) proc.exitcb(proc, exit_status, termsignal) end
if isa(proc.exitcb, Function)
proc.exitcb(proc, exit_status, termsignal)
end
ccall(:jl_close_uv, Void, (Ptr{Void},), proc.handle)
notify(proc.exitnotify)
nothing
end

function _uv_hook_close(proc::Process)
proc.handle = C_NULL
if isa(proc.closecb, Function) proc.closecb(proc) end
if isa(proc.closecb, Function)
proc.closecb(proc)
end
notify(proc.closenotify)
end

Expand Down
10 changes: 1 addition & 9 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,11 @@ function UDPSocket()
throw(UVError("failed to create udp socket",err))
end
this.status = StatusInit
this
return this
end

show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")")

function uvfinalize(uv::Union{TTY,PipeEndpoint,PipeServer,TCPServer,TCPSocket,UDPSocket})
if (uv.status != StatusUninit && uv.status != StatusInit)
close(uv)
end
disassociate_julia_struct(uv)
uv.handle = C_NULL
end

function _uv_hook_close(sock::UDPSocket)
sock.handle = C_NULL
sock.status = StatusClosed
Expand Down
31 changes: 26 additions & 5 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,26 @@ function wait_close(x::Union{LibuvStream, LibuvServer})
end

function close(stream::Union{LibuvStream, LibuvServer})
if isopen(stream) && stream.status != StatusClosing
ccall(:jl_close_uv,Void, (Ptr{Void},), stream.handle)
stream.status = StatusClosing
if isopen(stream)
if stream.status != StatusClosing
ccall(:jl_close_uv, Void, (Ptr{Void},), stream.handle)
stream.status = StatusClosing
end
if uv_handle_data(stream) != C_NULL
stream_wait(stream, stream.closenotify)
end
end
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status != StatusUninit && uv.status != StatusInit
close(uv)
uv.handle = C_NULL
uv.status = StatusClosed
end
end
nothing
end
Expand Down Expand Up @@ -472,8 +489,10 @@ function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void})
stream.status = StatusEOF # libuv called stop_reading already
notify(stream.readnotify)
notify(stream.closenotify)
else
close(stream)
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Void, (Ptr{Void},), stream.handle)
stream.status = StatusClosing
end
else
# This is a fatal connection error. Shutdown requests as per the usual
Expand Down Expand Up @@ -1019,6 +1038,8 @@ function close(s::BufferStream)
notify(s.close_c; all=true)
nothing
end
uvfinalize(s::BufferStream) = nothing

read(s::BufferStream, ::Type{UInt8}) = (wait_readnb(s, 1); read(s.buffer, UInt8))
unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) = (wait_readnb(s, Int(nb)); unsafe_read(s.buffer, a, nb))
nb_available(s::BufferStream) = nb_available(s.buffer)
Expand Down
2 changes: 1 addition & 1 deletion src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
uv_close(handle,&jl_uv_closeHandle);
uv_close(handle, &jl_uv_closeHandle);
}

JL_DLLEXPORT void jl_uv_associate_julia_struct(uv_handle_t *handle,
Expand Down
10 changes: 7 additions & 3 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ let bad = "bad\0name"
end

# issue #12829
let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", readstring(STDIN))'`, ready = Condition()
let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", readstring(STDIN))'`, ready = Condition(), t
@test_throws ArgumentError write(out, "not open error")
@async begin # spawn writer task
t = @async begin # spawn writer task
open(echo, "w", out) do in1
open(echo, "w", out) do in2
notify(ready)
Expand All @@ -283,10 +283,13 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", r
@test isreadable(out)
@test iswritable(out)
close(out.in)
@test !isopen(out.in)
is_windows() || @test !isopen(out.out) # it takes longer to propagate EOF through the Windows event system
@test_throws ArgumentError write(out, "now closed error")
@test isreadable(out)
@test !iswritable(out)
@test isopen(out)
is_windows() && Base.process_events(false) # should be enough steps to fully propagate EOF now
@test !isopen(out)
end
wait(ready) # wait for writer task to be ready before using `out`
@test nb_available(out) == 0
Expand All @@ -309,6 +312,7 @@ let out = Pipe(), echo = `$exename --startup-file=no -e 'print(STDOUT, " 1\t", r
@test isempty(read(out))
@test eof(out)
@test desc == "Pipe(open => active, 0 bytes waiting)"
wait(t)
end

# issue #8529
Expand Down

0 comments on commit c4b1ed6

Please sign in to comment.