diff --git a/mpcompat/mp-compat-sbcl.lisp b/mpcompat/mp-compat-sbcl.lisp index c6682e9b..863aa47a 100644 --- a/mpcompat/mp-compat-sbcl.lisp +++ b/mpcompat/mp-compat-sbcl.lisp @@ -188,7 +188,10 @@ A null timeout means wait forever." (defun mailbox-empty? (mbox) "Check if the Lisp mailbox is empty. Return generalized T/F." (sb-concurrency:mailbox-empty-p mbox)) - + +(defun mailbox-empty-p (mbox) + (mailbox-empty? mbox)) + ;; -------------------------------------------------------------------------- diff --git a/useful-macros/com.ral.useful-macros.asd b/useful-macros/com.ral.useful-macros.asd index 5b93d2d5..2ce49f48 100644 --- a/useful-macros/com.ral.useful-macros.asd +++ b/useful-macros/com.ral.useful-macros.asd @@ -116,7 +116,7 @@ THE SOFTWARE. #+:LISPWORKS (:file "safe-streams") #+:LISPWORKS (:file "safe-read-patch") (:file "safe-read-from-string") - #+:LISPWORKS (:file "objc") + #+(AND :LISPWORKS :MACOSX) (:file "objc") #+:LISPWORKS (:file "my-complete-symbol") ;; fix problem in LW for hierarchical package support diff --git a/useful-macros/packages.lisp b/useful-macros/packages.lisp index b18e1734..68f5ed1d 100644 --- a/useful-macros/packages.lisp +++ b/useful-macros/packages.lisp @@ -739,8 +739,8 @@ THE SOFTWARE. #:merge-plist #:string-interp - #:st-to-objc - #:objc-invoke-st + #+(AND :LISPWORKS :MACOSX) #:st-to-objc + #+(AND :LISPWORKS :MACOSX) #:objc-invoke-st #:with-unique-names #:rebinding diff --git a/xTActors/Examples/simple.lisp b/xTActors/Examples/simple.lisp new file mode 100644 index 00000000..8a4bfdc5 --- /dev/null +++ b/xTActors/Examples/simple.lisp @@ -0,0 +1,26 @@ +;; Very simple example on how to use actors + +(defpackage #:simple + (:use :common-lisp) + (:export + #:echo + #:receiver + #:run + )) + +(in-package #:simple) + + +(defparameter receiver + (actors:create + (lambda (msg) + (print (format nil "~a: ~a~%" :receiver msg))))) + +(defparameter echo + (actors:create + (lambda (msg) + (print (format nil "~a: ~a~%" :echo msg)) + (actors:send receiver msg)))) + +(defun run () + (actors:send echo :hello)) diff --git a/xTActors/Examples/stresstest.lisp b/xTActors/Examples/stresstest.lisp new file mode 100644 index 00000000..66ad8a96 --- /dev/null +++ b/xTActors/Examples/stresstest.lisp @@ -0,0 +1,34 @@ +;; Computationally heavy usage of actors. All CPUs are used. + +(defpackage #:stresstest + (:use :common-lisp) + (:export + #:echo + #:receiver + #:run + )) + +(in-package #:stresstest) + +(defparameter *large-number* 1000000000) + +(defparameter receiver + (actors:create + (lambda (idx msg) + (format t "idx: ~a, msg: ~a~%" idx msg)))) + +(defun create-worker () + (actors:create + (lambda (idx cnt) + (let ((answer (loop for i from 1 to cnt count (oddp i)))) + (actors:send receiver idx answer))))) + +(defparameter main + (actors:create + (lambda (idx cnt) + (let ((worker (create-worker))) + (actors:send worker idx cnt))))) + +(defun run () + (dotimes (n 20) + (actors:send main n *large-number*))) diff --git a/xTActors/actors-base/macros.lisp b/xTActors/actors-base/macros.lisp index 1c96501a..7bd541be 100755 --- a/xTActors/actors-base/macros.lisp +++ b/xTActors/actors-base/macros.lisp @@ -232,3 +232,11 @@ (editor:setup-indent "α" 1) (editor:indent-like "β" 'destructuring-bind)) +;; -------------------------------------------------- + +(defmacro yield (&body body) + ;; exit Actor to allow concurrent actions, then resume + ;; forces our continuation to the back of the event queue. + `(β _ + (send β) + ,@body)) diff --git a/xTActors/actors-base/packages.lisp b/xTActors/actors-base/packages.lisp index ac3ab82f..42d76afd 100644 --- a/xTActors/actors-base/packages.lisp +++ b/xTActors/actors-base/packages.lisp @@ -264,6 +264,7 @@ THE SOFTWARE. #:splay #:watchdog-timer #:safe-serializer + #:yield )) #+(OR :ALLEGRO :CCL) diff --git a/xTActors/actors-extra/transactional-db.lisp b/xTActors/actors-extra/transactional-db.lisp index 25814e58..2615754e 100644 --- a/xTActors/actors-extra/transactional-db.lisp +++ b/xTActors/actors-extra/transactional-db.lisp @@ -93,13 +93,6 @@ (defpackage com.ral.actors.kv-database (:use #:cl :com.ral.actors) - (:local-nicknames - (#:um #:com.ral.useful-macros) - (#:uuid #:com.ral.uuid) - (#:loenc #:com.ral.lisp-object-encoder) - (#:self-sync #:com.ral.self-sync) - (#:sets #:com.ral.rb-trees.sets) - (#:maps #:com.ral.rb-trees.maps)) (:export #:kvdb )) @@ -116,7 +109,7 @@ ;; ------------------- ;; commit after update - ((cust :commit old-db new-db retry) + (( (cust . retry) :commit old-db new-db) (cond ((eql old-db db) ;; make sure we have correct version (cond ((eql new-db db) ;; no real change @@ -303,7 +296,7 @@ (defun add-rec (cust key val) (loenc:encode (list key val)) ;; this will barf if either key or val is non-externalizable (with-db db - (send dbmgr cust :commit db (maps:add db key val) self) + (send dbmgr `(,cust . ,self) :commit db (maps:add db key val)) )) (defun remove-rec (cust key) @@ -312,7 +305,7 @@ (new-db (if (eql val self) db (maps:remove db key)))) - (send dbmgr cust :commit db new-db self) + (send dbmgr `(,cust . ,self) :commit db new-db) ))) (defun lookup (cust key &optional default) @@ -345,7 +338,7 @@ ((cust :req) (repeat-send dbmgr)) - ((cust :commit old-db new-db retry) + (( (cust . retry) :commit old-db new-db) (repeat-send dbmgr)) ))) diff --git a/xTActors/actors-machine/actors-machine.lisp b/xTActors/actors-machine/actors-machine.lisp index df337062..15bbfe01 100644 --- a/xTActors/actors-machine/actors-machine.lisp +++ b/xTActors/actors-machine/actors-machine.lisp @@ -129,8 +129,8 @@ (in-package "actors-machine") (defvar *mcpu-istream*) -(defvar *ctxt*) -(defvar *ac-machine* nil) +(defvar *self-ctxt*) +(defvar *ac-machine* nil) (defstruct (am-actor (:include actor))) @@ -146,7 +146,7 @@ (when (sys:compare-and-swap *ac-machine* nil t) ;; Start the A-Machine, if not already running, or about to run. (setf *mcpu-istream* (mp:make-mailbox) - *ac-machine* (mp:process-run-function "ActorMachine" () #'am-cpu) + *ac-machine* (mp:process-run-function "Actor Machine" () #'am-cpu) *send-message* #'running-send-message)) (apply #'send-message target msg)) @@ -177,12 +177,18 @@ (defmacro instr* (&rest op) `(apply #'instr ,@op)) +#| (defun contin (ctxt cont &rest args) (instr* ctxt cont :cont args)) +|# + +(defun contin (ctxt cont &rest args) + (send* cont ctxt args)) (defmacro contin* (ctxt cont &rest args) `(apply #'contin ,ctxt ,cont ,@args)) + (defun #1=am-cpu () (loop (with-simple-restart (abort "Do next A-Machine instruction") @@ -194,56 +200,52 @@ (contin ctxt cont)) ((ctxt cont :become new-beh-fn) - (let ((new-actor (funcall new-beh-fn nil :get-am-actor))) + (let ((new-actor (funcall new-beh-fn sink :get-am-actor))) (setf (ctxt-pend-become ctxt) (am-actor-beh new-actor)) (contin ctxt cont))) ((ctxt cont :create new-beh-fn) - (let ((new-actor (funcall new-beh-fn nil :get-am-actor))) + (let ((new-actor (funcall new-beh-fn sink :get-am-actor))) (contin ctxt cont new-actor))) ((ctxt cont :oper fn . args) (contin* ctxt cont (multiple-value-list (apply fn args)))) ((ctxt cont :cont . msg) + ;; interesting - anti-blocking tactic + ;; (this is an A-Machine emulator, not a speed contest.) (send* cont ctxt msg)) ((ctxt :commit) - (let* ((old-beh (ctxt-original-beh ctxt)) - (new-beh (ctxt-pend-become ctxt)) - (actor (ctxt-actor ctxt)) - (cur-beh (am-actor-beh actor)) - (msgs (ctxt-pend-send ctxt))) + (let ((actor (ctxt-actor ctxt)) + (old-beh (ctxt-original-beh ctxt)) + (new-beh (ctxt-pend-become ctxt))) - (cond ((eql new-beh old-beh) - ;; no BECOME performed - (send-messages msgs)) - - ((eql cur-beh old-beh) - ;; BECOME and nobody else got there first - (setf (am-actor-beh actor) new-beh) - (send-messages msgs)) + (cond ((or (null new-beh) + (sys:compare-and-swap (am-actor-beh actor) old-beh new-beh)) + ;; no BECOME performed, or else we got here first + (send-messages (ctxt-pend-send ctxt))) (t ;; must retry - (setf (ctxt-original-beh ctxt) cur-beh - (ctxt-pend-become ctxt) cur-beh - (ctxt-pend-send ctxt) nil) - (let ((msg (ctxt-original-msg ctxt))) - (contin* ctxt cur-beh msg))) - ))) + (let ((cur-beh (am-actor-beh actor))) + (setf (ctxt-original-beh ctxt) cur-beh + (ctxt-pend-become ctxt) nil + (ctxt-pend-send ctxt) nil) + (let ((msg (ctxt-original-msg ctxt))) + (contin* ctxt cur-beh msg))) + )))) ((:rx target . msg) (let* ((beh (am-actor-beh target)) (ctxt (make-ctxt :actor target - :pend-send nil :original-beh beh - :pend-become beh :original-msg msg))) (contin* ctxt target msg))) ((:exit) (setf *send-message* #'startup-send-message + *mcpu-istream* nil *ac-machine* nil) (sys:ensure-memory-after-store) (return-from #1#)) @@ -258,70 +260,66 @@ ;; ----------------------------------------------- ;; Writing Actor Machine Behaviors -(defun chk-commit (body) - (if (endp body) - `((commit)) - body)) +(um:eval-always + (defun chk-commit (body) + (if (endp body) + `((commit)) + body))) (defmacro sending ((&rest msg) &body body) - `(β (*ctxt*) - (instr *ctxt* β :send ,@msg) + `(β (*self-ctxt*) + (instr *self-ctxt* β :send ,@msg) ,@(chk-commit body))) (defmacro sending* ((&rest msg) &body body) - `(β (*ctxt*) - (instr* *ctxt* β :send ,@msg) + `(β (*self-ctxt*) + (instr* *self-ctxt* β :send ,@msg) ,@(chk-commit body))) (defmacro becoming (beh-fn-form &body body) - `(β (*ctxt*) - (instr *ctxt* β :become ,beh-fn-form) + `(β (*self-ctxt*) + (instr *self-ctxt* β :become ,beh-fn-form) ,@(chk-commit body))) (defmacro creating ((ans beh-fn-form) &body body) - `(β (*ctxt* ,ans) - (instr *ctxt* β :create ,beh-fn-form) + `(β (*self-ctxt* ,ans) + (instr *self-ctxt* β :create ,beh-fn-form) ,@(chk-commit body))) (defmacro operating ((ans (fn &rest args)) &body body) - `(β (*ctxt* ,ans) - (instr *ctxt* β :oper #',fn ,@args) + `(β (*self-ctxt* ,ans) + (instr *self-ctxt* β :oper #',fn ,@args) ,@(chk-commit body))) (defun commit () - (mp:mailbox-send *mcpu-istream* (list *ctxt* :commit))) + (mp:mailbox-send *mcpu-istream* (list *self-ctxt* :commit))) ;; ---------------------------------- ;; Defining new A-Machine Behaviors -(defun make-am-beh (clause) - (destructuring-bind (pat . body) clause - `(,pat - ,@(when (and (consp body) - (eql 'when (car body)) - (cadr body)) - (prog1 - `(when ,(cadr body)) - (setf body (cddr body)))) - ,@body))) +(defmacro am-lambda (args &body body) + `(lambda* ,(cons '*self-ctxt* args) + (symbol-macrolet ((self (ctxt-actor *self-ctxt*))) + ,@body))) + + (defmacro def-am-beh (name args &body clauses) - (lw:with-unique-names (am-actor msg cust) + (lw:with-unique-names (am-actor msg) `(defun ,name ,args ;; We contain an embedded A-Machine Actor, carrying the real ;; behavior code. (let ((,am-actor (make-am-actor - :beh (lambda (*ctxt* &rest ,msg) - (symbol-macrolet ((self (ctxt-actor *ctxt*))) - (match ,msg - ,@(mapcar #'make-am-beh clauses)))) + :beh (am-lambda (&rest ,msg) + (match ,msg + ,@clauses)) ))) ;; ...and present a normal Actor behavior skin to the outside ;; world. Messages sent here from outside the A-Machine get ;; redirected to the embedded A-Machine Actor. (alambda - ((,cust :get-am-actor) - (send ,cust ,am-actor) + ((cust :get-am-actor) + (send cust ,am-actor) ,am-actor) (,msg @@ -388,6 +386,10 @@ (creating (am-a (count-to-beh from to)) (sending (am-a :next))))) +(let ((supv (create (count-start-beh)))) + (dotimes (ix 100) + (send supv ix (+ ix 100)))) + (let ((supv (create (count-start-beh)))) (send supv 15 20) (send supv 100 105) @@ -397,6 +399,14 @@ (send (create (count-to-beh 15 20)) :next) (send (create (count-to-beh 100 105)) :next) (send (create (count-to-beh 1500 1508)) :next)) + +(def-am-beh am-error-beh () + (_ + (operating (ans (/ 0))) + )) + +(let ((am (create (am-error-beh)))) + (send am)) |# diff --git a/xTActors/actors-machine/packages.lisp b/xTActors/actors-machine/packages.lisp index fb1decab..ca6ea8b3 100644 --- a/xTActors/actors-machine/packages.lisp +++ b/xTActors/actors-machine/packages.lisp @@ -18,4 +18,6 @@ #:operating #:commit #:def-am-beh + #:am-lambda + #:kill-am ))