Skip to content

Commit

Permalink
Converted Go server from port to exec.
Browse files Browse the repository at this point in the history
Part of #3
  • Loading branch information
oubiwann committed Nov 21, 2022
1 parent cefb49a commit 1614064
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 71 deletions.
302 changes: 232 additions & 70 deletions apps/ports/src/ports/go/server.lfe
Original file line number Diff line number Diff line change
@@ -1,118 +1,280 @@
(defmodule ports.go.server
(behaviour gen_server)
;; gen_server implementation
(export
(start_link 0)
(stop 0))
;; callback implementation
(export
(code_change 3)
(handle_call 3)
(handle_cast 2)
(handle_info 2)
(init 1)
(terminate 2))
;; Go server API
(export
(pid 0)
(port 0)
(port-info 0)
(send 1)))
(send 1))
;; management API
(export
(healthy? 0)
(os-process-alive? 0)
(state 0)
(status 0))
;; debug API
(export
(pid 0)
(echo 1)))

(include-lib "logjam/include/logjam.hrl")

;;;;;::=--------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;::=- config functions -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=--------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun SERVER () (MODULE))
(defun DELIMITER () '(10))
(defun DELIMITER () #"\n")
(defun GO-BIN () "go/src/github.com/geomyidia/erlang-port-examples/bin/echo")
(defun GO-TIMEOUT () 100)

(defun initial-state ()
(let ((log-level (logjam:read-log-level "config/sys.config"))
(node-name (io_lib:format "~s" `(,(erlang:node)))))
`#m(opts ()
args ()
binary ,(GO-BIN)
pid undefined
os-pid undefined)))

(defun genserver-opts () '())
(defun unknown-command (data)
`#(error ,(lists:flatten (++ "Unknown command: " data))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; gen_server API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun start_link ()
(log-info "Starting Go server controller ...")
(gen_server:start_link `#(local ,(SERVER))
(MODULE)
(initial-state)
(genserver-opts)))

(defun stop ()
(gen_server:call (MODULE) 'stop))

(defun start_link ()
(gen_server:start_link `#(local ,(SERVER)) (MODULE) '() '()))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Supervisor Callbacks ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun init (_args)
(defun init (state)
(log-debug "Initialising Go server controller ...")
(erlang:process_flag 'trap_exit 'true)
`#(ok ,(create-port)))
(let ((start-state (start-exec (self) state)))
(log-debug "Start state: ~p" (list start-state))
`#(ok ,(maps:merge state start-state))))

(defun handle_call
(('stop _from port)
(stop-port port)
`#(stop normal ok ,port))
(('port _from port)
`#(reply ,port ,port))
((msg from port)
(let ((msg-bin (erlang:term_to_binary msg)))
(logger:debug "Sending data: ~p" `(,msg-bin))
(! port `#(,(self) #(command (,msg-bin ,(DELIMITER)))))
(let ((data (ports.util:receive-line port (GO-TIMEOUT))))
(logger:debug "Got data: ~p" `(,data))
(case data
(#b()
(logger:error "Got empty data from ~p; continuing ..." `(,from))
`#(reply 'nodata ,port))
(_
(logger:debug "Data: ~p" `(,data))
`#(reply ,(erlang:binary_to_term data '(safe)) ,port)))))))

(defun handle_cast (_msg state)
`#(noreply ,state))
;; Management
((`#(state) _from state)
`#(reply ,state ,state))
((`#(status os-process) _from (= `#m(os-pid ,os-pid) state))
`#(reply ,(ps-alive? os-pid) ,state))
;; Stop
(('stop _from state)
(log-notice "Stopping Go server ...")
`#(stop normal ok ,state))
;; Testing / debugging
((`#(echo ,msg) _from state)
`#(reply ,msg ,state))
;; Fall-through
((message _from state)
`#(reply ,(unknown-command (io_lib:format "~p" `(,message))) ,state)))

(defun handle_cast
;; Simple command (new format)
(((= `(#(command ,_)) cmd) (= `#m(os-pid ,os-pid) state))
(let ((hex-msg (hex-encode cmd)))
(exec:send os-pid hex-msg)
`#(noreply ,state)))
;; Command with args
(((= `(#(command ,_) #(args ,_)) cmd) (= `#m(os-pid ,os-pid) state))
(let ((hex-msg (hex-encode cmd)))
(exec:send os-pid hex-msg)
`#(noreply ,state)))
;; Go server commands - old format, still used
(((= `#(command ,_) cmd) (= `#m(os-pid ,os-pid) state))
(let ((hex-msg (hex-encode cmd)))
(exec:send os-pid hex-msg)
`#(noreply ,state)))
((msg state)
(log-warn "Got undexected cast msg: ~p" (list msg))
`#(noreply ,state)))

(defun handle_info
((`#(EXIT ,_from normal) port)
(logger:debug "The Go echo server is exiting (normal).")
(stop)
`#(noreply ,port))
((`#(EXIT ,_from shutdown) port)
(logger:debug "The Go echo server is exiting (shutdown).")
(stop)
`#(noreply ,port))
((`#(EXIT ,from ,reason) port)
(logger:error "Go echo process ~p exited! (Reason: ~p)" `(,from ,reason))
(stop)
`#(noreply ,port))
((msg port)
(logger:debug "The Go echo server is handling info of type: ~p." `(,msg))
`#(noreply ,port)))
;; Standard-output messages
((`#(stdout ,_pid ,msg) state)
(io:format "~s" (list (binary_to_list msg)))
`#(noreply ,state))
;; Standard-error messages
((`#(stderr ,_pid ,msg) state)
(io:format "~s" (list (binary_to_list msg)))
`#(noreply ,state))
;; Port EOL-based messages
((`#(,port #(data #(eol ,msg))) state) (when (is_port port))
(log-info (sanitize-goserver-msg msg))
`#(noreply ,state))
;; Port line-based messages
((`#(,port #(data #(,line-msg ,msg))) state) (when (is_port port))
(log-info "Unknown line message:~p~s" `(,line-msg ,(sanitize-goserver-msg msg)))
`#(noreply ,state))
;; General port messages
((`#(,port #(data ,msg)) state) (when (is_port port))
(log-info "Message from the Go server port:~n~s" `(,(sanitize-goserver-msg msg)))
`#(noreply ,state))
;; Exit-handling
((`#(,port #(exit_status ,exit-status)) state) (when (is_port port))
(log-warn "~p: exited with status ~p" `(,port ,exit-status))
`#(noreply ,state))
((`#(EXIT ,_from normal) state)
(logger:info "The Go server controller is exiting (normal).")
`#(noreply ,state))
((`#(EXIT ,_from shutdown) state)
(logger:info "The Go server controller is exiting (shutdown).")
`#(noreply ,state))
((`#(EXIT ,pid ,reason) state)
(log-notice "Process ~p exited! (Reason: ~p)" `(,pid ,reason))
`#(noreply ,state))
;; Fall-through
((msg state)
(log-debug "Unknwon info: ~p" `(,msg))
`#(noreply ,state)))

(defun terminate
((normal _port)
(logger:info "The Go echo server is terminating."))
((shutdown _port)
(logger:info "The supervisor is shutting down the Go echo server."))
((reason _port)
(logger:info "The Go echo server is terminating for reason: ~p." `(,reason))))
((reason `#m(os-pid ,os-pid))
(log-notice "Terminating the Go server controller (~p)..." `(,reason))
(catch (exec:stop os-pid))
'ok))

(defun code_change (_old-version port _extra)
`#(ok ,port))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;::=- Go server API -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun send (msg)
(erlang:process_flag 'trap_exit 'true)
(try
(gen_server:cast (MODULE) msg)
(catch
((tuple 'exit `#(noproc ,_) _stack)
(log-err "Go server not running"))
((tuple type value stack)
(log-err "Unexpected port error.~ntype: ~p~nvalue: ~p~nstacktrace: ~p"
(list type value stack))))))

;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;::=- management API -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun healthy? ()
(let ((vals (maps:values (status))))
(not (lists:member 'false vals))))

(defun os-process-alive? ()
(gen_server:call (SERVER) #(status os-process)))

(defun state ()
(gen_server:call (SERVER) #(state)))

(defun status ()
(gen_server:call (SERVER) #(status all)))

;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;::=- debugging API -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun pid ()
(erlang:whereis (MODULE)))
(erlang:whereis (SERVER)))

(defun port ()
(gen_server:call (MODULE) 'port))
(defun echo (msg)
(gen_server:call (SERVER) `#(echo ,msg)))

(defun port-info ()
(erlang:port_info (port)))
;;;;;::=-------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;::=- exec (port) functions -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=-------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun send (msg)
(gen_server:call (MODULE) msg))
(defun start-exec
((mgr-pid (= `#m(args ,args binary ,bin) state))
(log-debug "Starting Go server executable ...")
(maps:merge state (run mgr-pid bin args))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Internal Functions ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defun run (mgr-pid cmd args)
(run mgr-pid cmd args #m()))

(defun run (mgr-pid cmd args opts)
(let ((opts (run-opts mgr-pid opts)))
(log-debug "Starting OS process ~s with args ~p and opts ~p"
(list cmd args opts))
(let ((exec-str (join-cmd-args cmd args)))
(log-debug "Using exec string: ~s" (list exec-str))
(let ((`#(ok ,pid ,os-pid) (exec:run_link exec-str opts)))
`#m(pid ,pid os-pid ,os-pid)))))

;;;;;::=-------------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;::=- utility / support functions -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;;;::=-------------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun sanitize-goserver-msg (msg)
(log-debug "Binary message: ~p" `(,msg))
(clj:-> msg
(binary_to_list)
(string:replace "\\" "")
(string:trim)))

(defun join-cmd-args (cmd args)
(clj:-> (list cmd)
(lists:append args)
(string:join " ")))

(defun default-run-opts (mgr-pid)
`(stdin
pty
#(stdout ,mgr-pid)
#(stderr ,mgr-pid)
monitor))

(defun run-opts (mgr-pid opts)
(if (maps:is_key 'run-opts opts)
(mref opts 'run-opts)
(default-run-opts mgr-pid)))

(defun has-str? (string pattern)
(case (string:find string pattern)
('nomatch 'false)
(_ 'true)))

(defun ps-alive? (os-pid)
(has-str? (ps-pid os-pid) (integer_to_list os-pid)))

(defun ps-pid (pid-str)
(os:cmd (++ "ps -o pid -p" pid-str)))

(defun create-port ()
(ports.util:create-port
(filename:join (ports.util:priv-dir) (GO-BIN)) '()))
(defun hex-encode (data)
(let* ((bin (erlang:term_to_binary data))
(delim (DELIMITER))
(hex-msg (binary ((undermidi.util:bin->hex bin) binary) (delim binary))))
(log-debug "Created hex msg: ~p" (list hex-msg))
hex-msg))

(defun stop-port (port)
(! port `#(,(self) 'close)))
(defun go-log-level (lfe-level)
(case lfe-level
('all "trace")
('debug "debug")
('info "info")
('notice "warning")
('warning "warning")
('error "error")
(_ "fatal")))
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

{deps, [
{lfe, "2.0.1"},
{logjam, "1.0.5"}
{logjam, "1.0.5"},
{erlexec, "2.0.0"}
]}.

{plugins, [
Expand Down

0 comments on commit 1614064

Please sign in to comment.