Skip to content

Commit

Permalink
ingestion is better as one ongoing thread/async
Browse files Browse the repository at this point in the history
  • Loading branch information
schlichtanders committed Nov 16, 2023
1 parent 31c314a commit a58cc05
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "JolinPluto"
uuid = "5b0b4ef8-f4e6-4363-b674-3f031f7b9530"
authors = ["Stephan Sahm <[email protected]> and contributors"]
version = "0.1.52"
version = "0.1.53"

[deps]
AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
Expand Down
42 changes: 41 additions & 1 deletion ext/PythonExt.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module PythonExt

import JolinPluto, PythonCall
import JolinPluto
using PythonCall

function JolinPluto.repeat_queueget(q)
function _repeat_queueget()
Expand All @@ -13,4 +14,43 @@ function JolinPluto.repeat_queueget(q)
JolinPluto.repeat_run(_repeat_queueget)
end

function JolinPluto.start_python_thread(func)
threading = @pyconst(pyimport("threading"))

if !JolinPluto.is_running_in_pluto_process()
# just start a plain thread without cleanup
stop_event = threading.Event()
threading.Thread(target=func, args=(stop_event,)).start()
return stop_event
end

firsttime = Main.PlutoRunner.currently_running_user_requested_run[]
cell_id = Main.PlutoRunner.currently_running_cell_id[]

if firsttime
stop_event_ref = Ref{Any}()
function cleanup_func()
isassigned(stop_event_ref) && stop_event_ref[].set()
nothing
end
Main.PlutoRunner.UseEffectCleanups.register_cleanup(cleanup_func, cell_id)
Main.PlutoRunner.UseEffectCleanups.register_cleanup(cell_id) do
haskey(JolinPluto.pluto_cell_cache, cell_id) && delete!(JolinPluto.pluto_cell_cache, cell_id)
end

function _start_python_thread(func)
# NOTE: no need to do exception handling as channel exceptions are thrown on take!
# if cell is reloaded we stop the underlying process so that the previous Channel
# can be garbage collected
cleanup_func()
stop_event_ref[] = threading.Event()
threading.Thread(target=func, args=(stop_event_ref[],)).start()
return stop_event_ref[]
end
JolinPluto.pluto_cell_cache[cell_id] = _start_python_thread
_start_python_thread(func)
else
JolinPluto.pluto_cell_cache[cell_id](func)
end
end
end
2 changes: 1 addition & 1 deletion src/JolinPluto.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module JolinPluto
export @get_jwt, @authorize_aws
export get_jwt, authorize_aws
export @repeat_take!, @repeat_at, @repeat_run, @Channel
export repeat_take!, repeat_take, repeat_at, repeat_run, ChannelPluto, repeat_queueget, repeat_put_at, NoPut
export repeat_take!, repeat_take, repeat_at, repeat_run, ChannelPluto, repeat_queueget, ChannelWithRepeatedFill, NoPut, start_python_thread
export @output_below, @clipboard_image_to_clipboard_html
export output_below, clipboard_image_to_clipboard_html, embedLargeHTML
export Setter, @get, @cell_ids_create_wrapper, @cell_ids_push!
Expand Down
58 changes: 36 additions & 22 deletions src/tasks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -373,27 +373,7 @@ function repeat_at(
repeat_run(init, wait_and_repeatme)
end

repeat_at(nexttime; kwargs...) = repeat_at(() -> nothing, nexttime; kwargs...)



struct NoPutType end
const NoPut = NoPutType()

"""
repeat_put_at(channel, (time) -> value, nexttime)
Use this function from other languages which cannot safely run `put!(channel, value)`.
This ensures that the put! is called from julia, circumventing segmentation faults
because of async switch inside another-language-function.
"""
function repeat_put_at(channel, getvalue, nexttime; kwargs...)
repeat_at(nexttime; kwargs...) do time
value = getvalue(time)
value !== NoPut && put!(channel, value)
value
end
end
repeat_at(nexttime; kwargs...) = repeat_at((time) -> nothing, nexttime; kwargs...)


"""
Expand Down Expand Up @@ -495,6 +475,29 @@ function ChannelPluto(args...; kwargs...)
end


struct NoPutType end
const NoPut = NoPutType()

"""
ChannelWithRepeatedFill(get_next_value, 2; sleep_seconds=1.0)
Creates a ChannelPluto which calls `get_next_value` repeatedly, interleaved by calls to
sleep of the given time (defaults to 0 seconds sleep).
"""
function ChannelWithRepeatedFill(get_next_value, args...; sleep_seconds=0.0, kwargs...)
ChannelPluto(args...; kwargs...) do ch
while true
value = get_next_value()
value !== NoPut && put!(ch, value)
sleep(sleep_seconds)
end
end
end


# TODO create Pluto queue analog for Python which automatically kills the previous thread if one was started



"""
repeat_queueget(python_queue_threaded)
Expand All @@ -504,4 +507,15 @@ Outside pluto it will just wait for the first element to arrive and return that.
IMPORTANT: This function is only available if `import PythonCall` was executed before `import JolinPluto`.
"""
function repeat_queueget end
function repeat_queueget end


"""
start_python_thread(func) # `func` gets stop_event as the only argument
Like `threading.Thread(target=func, args=(threading.Event(),)).start()`, but such that the Event
is integrated into Pluto and will be automatically set if the thread should stop itself.
IMPORTANT: This function is only available if `import PythonCall` was executed before `import JolinPluto`.
"""
function start_python_thread end

0 comments on commit a58cc05

Please sign in to comment.