From 0a8874557f0be5ea15e2cafcf936f2c8b8e9c226 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl?= Date: Tue, 25 Jun 2024 01:58:33 -0700 Subject: [PATCH] Branch out to atomic selectors --- src/chip.gleam | 117 ++++++++++++++----------------------------------- 1 file changed, 34 insertions(+), 83 deletions(-) diff --git a/src/chip.gleam b/src/chip.gleam index 1f09c33..c7387e0 100644 --- a/src/chip.gleam +++ b/src/chip.gleam @@ -4,7 +4,6 @@ import gleam/dict.{type Dict} import gleam/erlang/process -import gleam/io import gleam/list import gleam/option import gleam/otp/actor @@ -174,10 +173,9 @@ pub fn info(registry: Registry(msg, tag, group)) { // Server Code :::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: - pub opaque type Message(msg, tag, group) { Register(Chip(msg, tag, group)) - Demonitor(process.ProcessMonitor, process.Pid) + Demonitor(process.ProcessMonitor, Chip(msg, tag, group)) Lookup(process.Subject(Result(process.Subject(msg), Nil)), tag) Members(process.Subject(List(process.Subject(msg)))) MembersAt(process.Subject(List(process.Subject(msg))), group) @@ -193,8 +191,10 @@ pub opaque type Chip(msg, tag, group) { type State(msg, tag, group) { State( - // Keeps track of registered pids to understand when to add a new monitor down selector. - registration: Dict(process.Pid, Set(Chip(msg, tag, group))), + // Keeps track of monitors to understand when to de-register a process. + monitoring: Set(process.ProcessMonitor), + // A copy of the actor's internal selector, useful to track new monitor down messages. + selector: process.Selector(Message(msg, tag, group)), // Store for all registered subjects. registered: Set(process.Subject(msg)), // Store for all tagged subjects. @@ -205,14 +205,17 @@ type State(msg, tag, group) { } fn init() -> actor.InitResult(State(msg, tag, group), Message(msg, tag, group)) { + let selector = process.new_selector() + actor.Ready( State( - registration: dict.new(), + monitoring: set.new(), + selector: selector, registered: set.new(), tagged: dict.new(), grouped: dict.new(), ), - process.new_selector(), + selector, ) } @@ -222,24 +225,25 @@ fn loop( ) -> actor.Next(Message(msg, tag, group), State(msg, tag, group)) { case message { Register(registrant) -> { - let selection = monitor(state.registration, registrant) + let state = monitor(state, registrant) - state - |> into_registration(registrant) - |> into_registered(registrant) - |> into_tagged(registrant) - |> into_grouped(registrant) - |> actor.Continue(selection) + let state = + state + |> monitor(registrant) + |> into_registered(registrant) + |> into_tagged(registrant) + |> into_grouped(registrant) + + actor.Continue(state, option.Some(state.selector)) } - Demonitor(monitor, pid) as event -> { - io.debug(event) - // TODO: Instead of checking the pid, check the monitor at the index - // But probably best idea to restore the single selection. + Demonitor(monitor, registrant) as event -> { process.demonitor_process(monitor) state - |> remove_registration(pid) + |> remove_from_registered(registrant) + |> remove_from_tagged(registrant) + |> remove_from_grouped(registrant) |> actor.Continue(option.None) } @@ -268,48 +272,23 @@ fn loop( } fn monitor( - registration: Dict(process.Pid, Set(Chip(msg, tag, group))), + state: State(msg, tag, group), registrant: Chip(msg, tag, group), -) -> option.Option(process.Selector(Message(msg, tag, group))) { - // Check if this process is already registered. +) -> State(msg, tag, group) { + // Monitor the Subject's process. let pid = process.subject_owner(registrant.subject) + let monitor = process.monitor_process(pid) - case dict.get(registration, pid) { - Ok(_registrants) -> { - // When process is already registered do nothing. - option.None - } - - Error(Nil) -> { - // When it is a new process, monitor it. - let monitor = process.monitor_process(pid) - let on_process_down = fn(_: process.ProcessDown) { - // This keeps track of registered subjects and where to look for them on de-registration. - Demonitor(monitor, pid) - } - - option.Some( - process.new_selector() - |> process.selecting_process_down(monitor, on_process_down), - ) - } + let on_process_down = fn(down: process.ProcessDown) { + // The registrant will let us understand where to remove subjects from + Demonitor(monitor, registrant) } -} -fn into_registration( - state: State(msg, tag, group), - registrant: Chip(msg, tag, group), -) -> State(msg, tag, group) { - let add_registrant = fn(option) { - case option { - option.Some(registrants) -> registrants |> set.insert(registrant) - option.None -> set.new() |> set.insert(registrant) - } - } + let selector = + state.selector + |> process.selecting_process_down(monitor, on_process_down) - let pid = process.subject_owner(registrant.subject) - let registration = dict.update(state.registration, pid, add_registrant) - State(..state, registration: registration) + State(..state, selector: selector) } fn into_registered( @@ -361,34 +340,6 @@ fn into_grouped( } } -fn remove_registration( - state: State(msg, tag, group), - pid: process.Pid, -) -> State(msg, tag, group) { - let registrants = case dict.get(state.registration, pid) { - Ok(registrants) -> set.to_list(registrants) - // TODO: Impossible state - Error(Nil) -> [] - } - - list.fold(registrants, state, fn(state, registrant) { - state - |> remove_from_registration(registrant) - |> remove_from_registered(registrant) - |> remove_from_tagged(registrant) - |> remove_from_grouped(registrant) - }) -} - -fn remove_from_registration( - state: State(msg, tag, group), - registrant: Chip(msg, tag, group), -) -> State(msg, tag, group) { - let pid = process.subject_owner(registrant.subject) - let registration = dict.delete(state.registration, pid) - State(..state, registration: registration) -} - fn remove_from_registered( state: State(msg, tag, group), registrant: Chip(msg, tag, group),