diff --git a/apps/ports/src/ports/go/server.lfe b/apps/ports/src/ports/go/server.lfe index 5e933b8..7de1221 100644 --- a/apps/ports/src/ports/go/server.lfe +++ b/apps/ports/src/ports/go/server.lfe @@ -1,8 +1,10 @@ (defmodule ports.go.server (behaviour gen_server) + ;; gen_server implementation (export (start_link 0) (stop 0)) + ;; callback implementation (export (code_change 3) (handle_call 3) @@ -10,109 +12,269 @@ (handle_info 2) (init 1) (terminate 2)) + ;; Go server API (export - (pid 0) - (port 0) - (port-info 0) - (send 1))) + (send 1)) + ;; management API + (export + (healthy? 0) + (os-process-alive? 0) + (state 0) + (status 0)) + ;; debug API + (export + (pid 0) + (echo 1))) + +(include-lib "logjam/include/logjam.hrl") + +;;;;;::=--------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;::=- config functions -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=--------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defun SERVER () (MODULE)) -(defun DELIMITER () '(10)) +(defun DELIMITER () #"\n") (defun GO-BIN () "go/src/github.com/geomyidia/erlang-port-examples/bin/echo") (defun GO-TIMEOUT () 100) +(defun initial-state () + (let ((log-level (logjam:read-log-level "config/sys.config")) + (node-name (io_lib:format "~s" `(,(erlang:node))))) + `#m(opts () + args () + binary ,(GO-BIN) + pid undefined + os-pid undefined))) + +(defun genserver-opts () '()) +(defun unknown-command (data) + `#(error ,(lists:flatten (++ "Unknown command: " data)))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;; gen_server API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defun start_link () + (log-info "Starting Go server controller ...") + (gen_server:start_link `#(local ,(SERVER)) + (MODULE) + (initial-state) + (genserver-opts))) + (defun stop () (gen_server:call (MODULE) 'stop)) -(defun start_link () - (gen_server:start_link `#(local ,(SERVER)) (MODULE) '() '())) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;; Supervisor Callbacks ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defun init (_args) +(defun init (state) + (log-debug "Initialising Go server controller ...") (erlang:process_flag 'trap_exit 'true) - `#(ok ,(create-port))) + (let ((start-state (start-exec (self) state))) + (log-debug "Start state: ~p" (list start-state)) + `#(ok ,(maps:merge state start-state)))) (defun handle_call - (('stop _from port) - (stop-port port) - `#(stop normal ok ,port)) - (('port _from port) - `#(reply ,port ,port)) - ((msg from port) - (let ((msg-bin (erlang:term_to_binary msg))) - (logger:debug "Sending data: ~p" `(,msg-bin)) - (! port `#(,(self) #(command (,msg-bin ,(DELIMITER))))) - (let ((data (ports.util:receive-line port (GO-TIMEOUT)))) - (logger:debug "Got data: ~p" `(,data)) - (case data - (#b() - (logger:error "Got empty data from ~p; continuing ..." `(,from)) - `#(reply 'nodata ,port)) - (_ - (logger:debug "Data: ~p" `(,data)) - `#(reply ,(erlang:binary_to_term data '(safe)) ,port))))))) - -(defun handle_cast (_msg state) - `#(noreply ,state)) + ;; Management + ((`#(state) _from state) + `#(reply ,state ,state)) + ((`#(status os-process) _from (= `#m(os-pid ,os-pid) state)) + `#(reply ,(ps-alive? os-pid) ,state)) + ;; Stop + (('stop _from state) + (log-notice "Stopping Go server ...") + `#(stop normal ok ,state)) + ;; Testing / debugging + ((`#(echo ,msg) _from state) + `#(reply ,msg ,state)) + ;; Fall-through + ((message _from state) + `#(reply ,(unknown-command (io_lib:format "~p" `(,message))) ,state))) + +(defun handle_cast + ;; Simple command (new format) + (((= `(#(command ,_)) cmd) (= `#m(os-pid ,os-pid) state)) + (let ((hex-msg (hex-encode cmd))) + (exec:send os-pid hex-msg) + `#(noreply ,state))) + ;; Command with args + (((= `(#(command ,_) #(args ,_)) cmd) (= `#m(os-pid ,os-pid) state)) + (let ((hex-msg (hex-encode cmd))) + (exec:send os-pid hex-msg) + `#(noreply ,state))) + ;; Go server commands - old format, still used + (((= `#(command ,_) cmd) (= `#m(os-pid ,os-pid) state)) + (let ((hex-msg (hex-encode cmd))) + (exec:send os-pid hex-msg) + `#(noreply ,state))) + ((msg state) + (log-warn "Got undexected cast msg: ~p" (list msg)) + `#(noreply ,state))) (defun handle_info - ((`#(EXIT ,_from normal) port) - (logger:debug "The Go echo server is exiting (normal).") - (stop) - `#(noreply ,port)) - ((`#(EXIT ,_from shutdown) port) - (logger:debug "The Go echo server is exiting (shutdown).") - (stop) - `#(noreply ,port)) - ((`#(EXIT ,from ,reason) port) - (logger:error "Go echo process ~p exited! (Reason: ~p)" `(,from ,reason)) - (stop) - `#(noreply ,port)) - ((msg port) - (logger:debug "The Go echo server is handling info of type: ~p." `(,msg)) - `#(noreply ,port))) + ;; Standard-output messages + ((`#(stdout ,_pid ,msg) state) + (io:format "~s" (list (binary_to_list msg))) + `#(noreply ,state)) + ;; Standard-error messages + ((`#(stderr ,_pid ,msg) state) + (io:format "~s" (list (binary_to_list msg))) + `#(noreply ,state)) + ;; Port EOL-based messages + ((`#(,port #(data #(eol ,msg))) state) (when (is_port port)) + (log-info (sanitize-goserver-msg msg)) + `#(noreply ,state)) + ;; Port line-based messages + ((`#(,port #(data #(,line-msg ,msg))) state) (when (is_port port)) + (log-info "Unknown line message:~p~s" `(,line-msg ,(sanitize-goserver-msg msg))) + `#(noreply ,state)) + ;; General port messages + ((`#(,port #(data ,msg)) state) (when (is_port port)) + (log-info "Message from the Go server port:~n~s" `(,(sanitize-goserver-msg msg))) + `#(noreply ,state)) + ;; Exit-handling + ((`#(,port #(exit_status ,exit-status)) state) (when (is_port port)) + (log-warn "~p: exited with status ~p" `(,port ,exit-status)) + `#(noreply ,state)) + ((`#(EXIT ,_from normal) state) + (logger:info "The Go server controller is exiting (normal).") + `#(noreply ,state)) + ((`#(EXIT ,_from shutdown) state) + (logger:info "The Go server controller is exiting (shutdown).") + `#(noreply ,state)) + ((`#(EXIT ,pid ,reason) state) + (log-notice "Process ~p exited! (Reason: ~p)" `(,pid ,reason)) + `#(noreply ,state)) + ;; Fall-through + ((msg state) + (log-debug "Unknwon info: ~p" `(,msg)) + `#(noreply ,state))) (defun terminate - ((normal _port) - (logger:info "The Go echo server is terminating.")) - ((shutdown _port) - (logger:info "The supervisor is shutting down the Go echo server.")) - ((reason _port) - (logger:info "The Go echo server is terminating for reason: ~p." `(,reason)))) + ((reason `#m(os-pid ,os-pid)) + (log-notice "Terminating the Go server controller (~p)..." `(,reason)) + (catch (exec:stop os-pid)) + 'ok)) (defun code_change (_old-version port _extra) `#(ok ,port)) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;;; API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;::=- Go server API -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defun send (msg) + (erlang:process_flag 'trap_exit 'true) + (try + (gen_server:cast (MODULE) msg) + (catch + ((tuple 'exit `#(noproc ,_) _stack) + (log-err "Go server not running")) + ((tuple type value stack) + (log-err "Unexpected port error.~ntype: ~p~nvalue: ~p~nstacktrace: ~p" + (list type value stack)))))) + +;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;::=- management API -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defun healthy? () + (let ((vals (maps:values (status)))) + (not (lists:member 'false vals)))) + +(defun os-process-alive? () + (gen_server:call (SERVER) #(status os-process))) + +(defun state () + (gen_server:call (SERVER) #(state))) + +(defun status () + (gen_server:call (SERVER) #(status all))) + +;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;::=- debugging API -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=-----------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defun pid () - (erlang:whereis (MODULE))) + (erlang:whereis (SERVER))) -(defun port () - (gen_server:call (MODULE) 'port)) +(defun echo (msg) + (gen_server:call (SERVER) `#(echo ,msg))) -(defun port-info () - (erlang:port_info (port))) +;;;;;::=-------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;::=- exec (port) functions -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=-------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defun send (msg) - (gen_server:call (MODULE) msg)) +(defun start-exec + ((mgr-pid (= `#m(args ,args binary ,bin) state)) + (log-debug "Starting Go server executable ...") + (maps:merge state (run mgr-pid bin args)))) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;;; Internal Functions ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defun run (mgr-pid cmd args) + (run mgr-pid cmd args #m())) + +(defun run (mgr-pid cmd args opts) + (let ((opts (run-opts mgr-pid opts))) + (log-debug "Starting OS process ~s with args ~p and opts ~p" + (list cmd args opts)) + (let ((exec-str (join-cmd-args cmd args))) + (log-debug "Using exec string: ~s" (list exec-str)) + (let ((`#(ok ,pid ,os-pid) (exec:run_link exec-str opts))) + `#m(pid ,pid os-pid ,os-pid))))) + +;;;;;::=-------------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;::=- utility / support functions -=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;::=-------------------------------=::;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defun sanitize-goserver-msg (msg) + (log-debug "Binary message: ~p" `(,msg)) + (clj:-> msg + (binary_to_list) + (string:replace "\\" "") + (string:trim))) + +(defun join-cmd-args (cmd args) + (clj:-> (list cmd) + (lists:append args) + (string:join " "))) + +(defun default-run-opts (mgr-pid) + `(stdin + pty + #(stdout ,mgr-pid) + #(stderr ,mgr-pid) + monitor)) + +(defun run-opts (mgr-pid opts) + (if (maps:is_key 'run-opts opts) + (mref opts 'run-opts) + (default-run-opts mgr-pid))) + +(defun has-str? (string pattern) + (case (string:find string pattern) + ('nomatch 'false) + (_ 'true))) + +(defun ps-alive? (os-pid) + (has-str? (ps-pid os-pid) (integer_to_list os-pid))) + +(defun ps-pid (pid-str) + (os:cmd (++ "ps -o pid -p" pid-str))) -(defun create-port () - (ports.util:create-port - (filename:join (ports.util:priv-dir) (GO-BIN)) '())) +(defun hex-encode (data) + (let* ((bin (erlang:term_to_binary data)) + (delim (DELIMITER)) + (hex-msg (binary ((undermidi.util:bin->hex bin) binary) (delim binary)))) + (log-debug "Created hex msg: ~p" (list hex-msg)) + hex-msg)) -(defun stop-port (port) - (! port `#(,(self) 'close))) +(defun go-log-level (lfe-level) + (case lfe-level + ('all "trace") + ('debug "debug") + ('info "info") + ('notice "warning") + ('warning "warning") + ('error "error") + (_ "fatal"))) diff --git a/rebar.config b/rebar.config index 3aee9c2..23f93fb 100644 --- a/rebar.config +++ b/rebar.config @@ -11,7 +11,8 @@ {deps, [ {lfe, "2.0.1"}, - {logjam, "1.0.5"} + {logjam, "1.0.5"}, + {erlexec, "2.0.0"} ]}. {plugins, [