Skip to content

Commit

Permalink
Branch out to atomic selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
chouzar committed Jun 25, 2024
1 parent aaab9c8 commit 0a88745
Showing 1 changed file with 34 additions and 83 deletions.
117 changes: 34 additions & 83 deletions src/chip.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import gleam/dict.{type Dict}
import gleam/erlang/process
import gleam/io
import gleam/list
import gleam/option
import gleam/otp/actor
Expand Down Expand Up @@ -174,10 +173,9 @@ pub fn info(registry: Registry(msg, tag, group)) {

// Server Code ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::


pub opaque type Message(msg, tag, group) {
Register(Chip(msg, tag, group))
Demonitor(process.ProcessMonitor, process.Pid)
Demonitor(process.ProcessMonitor, Chip(msg, tag, group))
Lookup(process.Subject(Result(process.Subject(msg), Nil)), tag)
Members(process.Subject(List(process.Subject(msg))))
MembersAt(process.Subject(List(process.Subject(msg))), group)
Expand All @@ -193,8 +191,10 @@ pub opaque type Chip(msg, tag, group) {

type State(msg, tag, group) {
State(
// Keeps track of registered pids to understand when to add a new monitor down selector.
registration: Dict(process.Pid, Set(Chip(msg, tag, group))),
// Keeps track of monitors to understand when to de-register a process.
monitoring: Set(process.ProcessMonitor),
// A copy of the actor's internal selector, useful to track new monitor down messages.
selector: process.Selector(Message(msg, tag, group)),
// Store for all registered subjects.
registered: Set(process.Subject(msg)),
// Store for all tagged subjects.
Expand All @@ -205,14 +205,17 @@ type State(msg, tag, group) {
}

fn init() -> actor.InitResult(State(msg, tag, group), Message(msg, tag, group)) {
let selector = process.new_selector()

actor.Ready(
State(
registration: dict.new(),
monitoring: set.new(),
selector: selector,
registered: set.new(),
tagged: dict.new(),
grouped: dict.new(),
),
process.new_selector(),
selector,
)
}

Expand All @@ -222,24 +225,25 @@ fn loop(
) -> actor.Next(Message(msg, tag, group), State(msg, tag, group)) {
case message {
Register(registrant) -> {
let selection = monitor(state.registration, registrant)
let state = monitor(state, registrant)

state
|> into_registration(registrant)
|> into_registered(registrant)
|> into_tagged(registrant)
|> into_grouped(registrant)
|> actor.Continue(selection)
let state =
state
|> monitor(registrant)
|> into_registered(registrant)
|> into_tagged(registrant)
|> into_grouped(registrant)

actor.Continue(state, option.Some(state.selector))
}

Demonitor(monitor, pid) as event -> {
io.debug(event)
// TODO: Instead of checking the pid, check the monitor at the index
// But probably best idea to restore the single selection.
Demonitor(monitor, registrant) as event -> {
process.demonitor_process(monitor)

state
|> remove_registration(pid)
|> remove_from_registered(registrant)
|> remove_from_tagged(registrant)
|> remove_from_grouped(registrant)
|> actor.Continue(option.None)
}

Expand Down Expand Up @@ -268,48 +272,23 @@ fn loop(
}

fn monitor(
registration: Dict(process.Pid, Set(Chip(msg, tag, group))),
state: State(msg, tag, group),
registrant: Chip(msg, tag, group),
) -> option.Option(process.Selector(Message(msg, tag, group))) {
// Check if this process is already registered.
) -> State(msg, tag, group) {
// Monitor the Subject's process.
let pid = process.subject_owner(registrant.subject)
let monitor = process.monitor_process(pid)

case dict.get(registration, pid) {
Ok(_registrants) -> {
// When process is already registered do nothing.
option.None
}

Error(Nil) -> {
// When it is a new process, monitor it.
let monitor = process.monitor_process(pid)
let on_process_down = fn(_: process.ProcessDown) {
// This keeps track of registered subjects and where to look for them on de-registration.
Demonitor(monitor, pid)
}

option.Some(
process.new_selector()
|> process.selecting_process_down(monitor, on_process_down),
)
}
let on_process_down = fn(down: process.ProcessDown) {
// The registrant will let us understand where to remove subjects from
Demonitor(monitor, registrant)
}
}

fn into_registration(
state: State(msg, tag, group),
registrant: Chip(msg, tag, group),
) -> State(msg, tag, group) {
let add_registrant = fn(option) {
case option {
option.Some(registrants) -> registrants |> set.insert(registrant)
option.None -> set.new() |> set.insert(registrant)
}
}
let selector =
state.selector
|> process.selecting_process_down(monitor, on_process_down)

let pid = process.subject_owner(registrant.subject)
let registration = dict.update(state.registration, pid, add_registrant)
State(..state, registration: registration)
State(..state, selector: selector)
}

fn into_registered(
Expand Down Expand Up @@ -361,34 +340,6 @@ fn into_grouped(
}
}

fn remove_registration(
state: State(msg, tag, group),
pid: process.Pid,
) -> State(msg, tag, group) {
let registrants = case dict.get(state.registration, pid) {
Ok(registrants) -> set.to_list(registrants)
// TODO: Impossible state
Error(Nil) -> []
}

list.fold(registrants, state, fn(state, registrant) {
state
|> remove_from_registration(registrant)
|> remove_from_registered(registrant)
|> remove_from_tagged(registrant)
|> remove_from_grouped(registrant)
})
}

fn remove_from_registration(
state: State(msg, tag, group),
registrant: Chip(msg, tag, group),
) -> State(msg, tag, group) {
let pid = process.subject_owner(registrant.subject)
let registration = dict.delete(state.registration, pid)
State(..state, registration: registration)
}

fn remove_from_registered(
state: State(msg, tag, group),
registrant: Chip(msg, tag, group),
Expand Down

0 comments on commit 0a88745

Please sign in to comment.