diff --git a/Project.toml b/Project.toml index be7ad4d..53f6419 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "JolinPluto" uuid = "5b0b4ef8-f4e6-4363-b674-3f031f7b9530" authors = ["Stephan Sahm and contributors"] -version = "0.1.52" +version = "0.1.53" [deps] AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc" diff --git a/ext/PythonExt.jl b/ext/PythonExt.jl index 5fe7cc0..31034a0 100644 --- a/ext/PythonExt.jl +++ b/ext/PythonExt.jl @@ -1,6 +1,7 @@ module PythonExt -import JolinPluto, PythonCall +import JolinPluto +using PythonCall function JolinPluto.repeat_queueget(q) function _repeat_queueget() @@ -13,4 +14,43 @@ function JolinPluto.repeat_queueget(q) JolinPluto.repeat_run(_repeat_queueget) end +function JolinPluto.start_python_thread(func) + threading = @pyconst(pyimport("threading")) + + if !JolinPluto.is_running_in_pluto_process() + # just start a plain thread without cleanup + stop_event = threading.Event() + threading.Thread(target=func, args=(stop_event,)).start() + return stop_event + end + + firsttime = Main.PlutoRunner.currently_running_user_requested_run[] + cell_id = Main.PlutoRunner.currently_running_cell_id[] + + if firsttime + stop_event_ref = Ref{Any}() + function cleanup_func() + isassigned(stop_event_ref) && stop_event_ref[].set() + nothing + end + Main.PlutoRunner.UseEffectCleanups.register_cleanup(cleanup_func, cell_id) + Main.PlutoRunner.UseEffectCleanups.register_cleanup(cell_id) do + haskey(JolinPluto.pluto_cell_cache, cell_id) && delete!(JolinPluto.pluto_cell_cache, cell_id) + end + + function _start_python_thread(func) + # NOTE: no need to do exception handling as channel exceptions are thrown on take! + # if cell is reloaded we stop the underlying process so that the previous Channel + # can be garbage collected + cleanup_func() + stop_event_ref[] = threading.Event() + threading.Thread(target=func, args=(stop_event_ref[],)).start() + return stop_event_ref[] + end + JolinPluto.pluto_cell_cache[cell_id] = _start_python_thread + _start_python_thread(func) + else + JolinPluto.pluto_cell_cache[cell_id](func) + end +end end \ No newline at end of file diff --git a/src/JolinPluto.jl b/src/JolinPluto.jl index baf5bbc..a23ac9c 100644 --- a/src/JolinPluto.jl +++ b/src/JolinPluto.jl @@ -4,7 +4,7 @@ module JolinPluto export @get_jwt, @authorize_aws export get_jwt, authorize_aws export @repeat_take!, @repeat_at, @repeat_run, @Channel -export repeat_take!, repeat_take, repeat_at, repeat_run, ChannelPluto, repeat_queueget, repeat_put_at, NoPut +export repeat_take!, repeat_take, repeat_at, repeat_run, ChannelPluto, repeat_queueget, ChannelWithRepeatedFill, NoPut, start_python_thread export @output_below, @clipboard_image_to_clipboard_html export output_below, clipboard_image_to_clipboard_html, embedLargeHTML export Setter, @get, @cell_ids_create_wrapper, @cell_ids_push! diff --git a/src/tasks.jl b/src/tasks.jl index 0096f58..5d69d07 100644 --- a/src/tasks.jl +++ b/src/tasks.jl @@ -373,27 +373,7 @@ function repeat_at( repeat_run(init, wait_and_repeatme) end -repeat_at(nexttime; kwargs...) = repeat_at(() -> nothing, nexttime; kwargs...) - - - -struct NoPutType end -const NoPut = NoPutType() - -""" - repeat_put_at(channel, (time) -> value, nexttime) - -Use this function from other languages which cannot safely run `put!(channel, value)`. -This ensures that the put! is called from julia, circumventing segmentation faults -because of async switch inside another-language-function. -""" -function repeat_put_at(channel, getvalue, nexttime; kwargs...) - repeat_at(nexttime; kwargs...) do time - value = getvalue(time) - value !== NoPut && put!(channel, value) - value - end -end +repeat_at(nexttime; kwargs...) = repeat_at((time) -> nothing, nexttime; kwargs...) """ @@ -495,6 +475,29 @@ function ChannelPluto(args...; kwargs...) end +struct NoPutType end +const NoPut = NoPutType() + +""" + ChannelWithRepeatedFill(get_next_value, 2; sleep_seconds=1.0) + +Creates a ChannelPluto which calls `get_next_value` repeatedly, interleaved by calls to +sleep of the given time (defaults to 0 seconds sleep). +""" +function ChannelWithRepeatedFill(get_next_value, args...; sleep_seconds=0.0, kwargs...) + ChannelPluto(args...; kwargs...) do ch + while true + value = get_next_value() + value !== NoPut && put!(ch, value) + sleep(sleep_seconds) + end + end +end + + +# TODO create Pluto queue analog for Python which automatically kills the previous thread if one was started + + """ repeat_queueget(python_queue_threaded) @@ -504,4 +507,15 @@ Outside pluto it will just wait for the first element to arrive and return that. IMPORTANT: This function is only available if `import PythonCall` was executed before `import JolinPluto`. """ -function repeat_queueget end \ No newline at end of file +function repeat_queueget end + + +""" + start_python_thread(func) # `func` gets stop_event as the only argument + +Like `threading.Thread(target=func, args=(threading.Event(),)).start()`, but such that the Event +is integrated into Pluto and will be automatically set if the thread should stop itself. + +IMPORTANT: This function is only available if `import PythonCall` was executed before `import JolinPluto`. +""" +function start_python_thread end \ No newline at end of file