;;; -*- Mode: LISP; Package: :cl-user; BASE: 10; Syntax: ANSI-Common-Lisp; -*- ;;; ;;; Touched: Fri Mar 09 15:26:32 2007 +0530 ;;; Time-stamp: <2007-11-22 11:40:41 madhu> ;;; Status: Experimental. DO NOT REDISTRIBUTE. ;;; $Id: $ ;;; ;;; Task execution in multiple lisp (Unix) processes for CMUCL. Based ;;; on ideas and code from: ;;; ;;; 1. http://franz.com/support/tech_corner/forksmp.lisp ;;; http://franz.com/support/tech_corner/forksmp.lhtml ;;; 2. Fred Gilham's timed-unix-process on c.l.l message ;;; ;;; 3. John Mallery's HTTP:server;task-queue.lisp in CL-HTTP ;;; (defpackage "FORKSMP" (:use "CL") (:export "MAKE-TASK-QUEUE" "ENSURE-ACTIVE-TASK-QUEUE" "PUSH-TASK-QUEUE")) (in-package "FORKSMP") ;;; ---------------------------------------------------------------------- ;;; ;;; ;;; (defstruct (queue (:constructor %make-queue)) "A tail-consing queue with inter-process locking." name lock list tail-cons) (defun make-queue (name &key initial-contents) (%make-queue :name name :list initial-contents :lock (MP:make-lock (format nil "~A LOCK" name)) :tail-cons (last initial-contents))) (defun queue-delete (queue &optional (item nil item-supplied-p) &key (test #'eql)) "If ITEM is supplied delete all occurrences from the QUEUE's list. Otherwise delete all items in QUEUE's list." (MP:with-lock-held ((queue-lock queue)) (if (not item-supplied-p) (setf (queue-list queue) nil (queue-tail-cons queue) nil) (loop with prev-cons = nil for x on (queue-list queue) for (first . rest) = x if (funcall test first item) do (cond (prev-cons (setf (cdr prev-cons) rest)) (t (setf (queue-list queue) rest))) (when (endp rest) (assert (eq (queue-tail-cons queue) x)) (setf (queue-tail-cons queue) prev-cons)) else do (setq prev-cons x))))) (defun queue-push (queue item &optional front-p) "Push ITEM onto the end of QUEUE's list. If FRONT-P is non-NIL push ITEM at its begining." (declare (type queue queue)) (let ((new-cons (cons item nil)) (tail-cons (queue-tail-cons queue)) (list (queue-list queue))) (MP:with-lock-held ((queue-lock queue)) (cond (list (assert tail-cons) (assert (endp (cdr tail-cons))) (cond (front-p (setf (cdr new-cons) list) (setf (queue-list queue) new-cons)) (t (setf (cdr tail-cons) new-cons) (setf (queue-tail-cons queue) new-cons)))) (t (assert (null tail-cons)) (setf (queue-tail-cons queue) new-cons (queue-list queue) new-cons)))))) (defun queue-pop (queue &key wait) "Return the item at the front of QUEUE's list. When the list is empty, if WAIT is non-NIL, block until QUEUE is non-empty. Otherwise return NIL." (declare (type queue queue)) (loop (MP:with-lock-held ((queue-lock queue)) (if (endp (queue-list queue)) (when (null wait) (assert (null (queue-tail-cons queue))) (return nil)) (let ((item (pop (queue-list queue)))) (if (endp (queue-list queue)) (setf (queue-tail-cons queue) nil) (assert (queue-tail-cons queue))) (return item)))) (MP:process-wait "Waiting for queue to become non-empty" #'queue-list queue))) #|| (defvar $q (make-queue "Test Queue")) (defvar $x 0) (defun pop-foo () (warn "$q = ~A~&popped ~A" $q (setq $x (queue-pop $q :wait t)))) (defvar $f (MP:make-process #'pop-foo)) (queue-push $q 42) (MP:destroy-process $f) (queue-push $q NIL) (queue-push $q 66 t) (queue-pop $q) (queue-delete $q nil) ||# ;;; ---------------------------------------------------------------------- ;;; ;;; ;;; (defstruct (sub-process (:print-function (lambda (sub-process stream depth) (declare (ignore depth) (type stream stream)) (format stream "#" (sub-process-to-stream sub-process) (sub-process-from-stream sub-process) (sub-process-pid sub-process)))) (:constructor %make-sub-process) (:copier nil)) "A Unix (Lisp) process." to-stream from-stream pid) (defun pipe () "Return as values two character streams that can be read from and written to." (multiple-value-bind (input-fd output-fd) (UNIX:unix-pipe) (unless input-fd (error "pipe(2) failed with error code ~D" output-fd)) (values (sys:make-fd-stream input-fd :output nil :buffering :none :input t :element-type 'character) (sys:make-fd-stream output-fd :output t :buffering :none :input nil :element-type 'character)))) (defvar *all-sub-processes* (make-queue "All sub-processes")) (defvar *close-network-descriptors* t "If non-NIL, close all network FDs in child unix process.") (defun close-network-descriptors (&optional (maxfd 1024)) (dotimes (descriptor maxfd) (multiple-value-bind (host port) (ignore-errors (ext:get-peer-host-and-port descriptor)) (when host (format t "Closing descriptor ~D, peer port ~D~%" descriptor port) (ignore-errors (unix:unix-close descriptor)))))) (defun make-sub-process (function) "Call fork(2). FUNCTION is funcalled in the child with two arguments: the streams for input from and output to the child." (multiple-value-bind (from-child-read from-child-write) (pipe) (multiple-value-bind (to-child-read to-child-write) (pipe) (let ((pid (UNIX:unix-fork))) (cond ((zerop pid) ;; Child (close from-child-read) (close to-child-write) ;; XXX other descriptors are shared with parent. (when *close-network-descriptors* (ignore-errors (close-network-descriptors))) (unwind-protect (funcall function to-child-read from-child-write) (ignore-errors (close to-child-read)) (ignore-errors (close from-child-write)) (UNIX:unix-exit 0))) (t (close from-child-write) ;; Parent (close to-child-read) (let ((sub-process (%make-sub-process :to-stream to-child-write :from-stream from-child-read :pid pid))) (queue-push *all-sub-processes* sub-process) sub-process))))))) ;;; ---------------------------------------------------------------------- ;;; ;;; simple sub-process IO scheme using the reader and printer (franz) ;;; (defvar *error-marker* '**ERROR**) (defun evaluate-in-sub-process (sub-process form) "Internal. FORM should be a function call of the form (FUNCTION &REST ARGS) where FUNCTION is a symbol and ARGS are symbols, strings, or numbers. Evaluates FORM synchronously in SUB-PROCESS." (let ((to-stream (sub-process-to-stream sub-process)) (from-stream (sub-process-from-stream sub-process))) (write-char #\( to-stream) (write-string (symbol-name (car form)) to-stream) (write-char #\Space to-stream) (dolist (argument (cdr form)) (format t "WAIT-FOR-PROCESSOR: arg: ~s~%" argument) (cond ((stringp argument) (write-char #\" to-stream) (write-string argument to-stream) (write-char #\" to-stream) (write-char #\Space to-stream)) ((symbolp argument) #|| (write-string (package-name (symbol-package argument)) to-stream) (write-char #\: to-stream) (write-char #\: to-stream) ||# (write-string (symbol-name argument) to-stream)) ((integerp argument) (write-string (prin1-to-string argument) to-stream)) (t (error "Cannot convert this argument: ~s." argument))) (write-char #\space to-stream)) (write-char #\) to-stream) (terpri to-stream) (force-output to-stream) (let ((result (read from-stream nil from-stream))) (cond ((eq result from-stream) (error "Sub-process did not complete for ~s." (car form))) ((eq *error-marker* result) (warn "Sub-process did not complete for ~s." (car form))) (t result))))) (defvar *stack-backtrace-number-of-frames* 20) (defmethod write-stack-backtrace (c stream &optional (n-frames *stack-backtrace-number-of-frames*)) (debug:backtrace n-frames stream) (force-output stream)) (defun child-subprocess (input-stream output-stream) "Internal. Example function which runs in the sub-process and reads forms from INPUT-STREAM, evaluates them and writes the result to OUTPUT-STREAM. Errors in the evaluation are passed back to the parent using a unique symbol bound to *error-marker*." (let (form) (loop (setq form (with-standard-io-syntax (read input-stream nil input-stream))) (format t "CHILD-SUBPROCESS: form is ~s~%" form) (when (eq input-stream form) (force-output output-stream) (close output-stream) (return)) (let* ((result (block nil (handler-bind ((error (lambda (c) (format t "CHILD ERROR: ~a~%" c) (debug:backtrace) (cons *error-marker* (format nil "~a" c))))) (eval form))))) (format t "CHILD-SUBPROCESS: result is ~s~%" result) (write result :stream output-stream) (write-char #\newline output-stream) (force-output output-stream))))) ;;; ---------------------------------------------------------------------- ;;; ;;; Code to reap terminated processes. (fgilham) ;;; (alien:def-alien-routine ("wait4" c-wait4) c-call:int (wpid c-call:int) (status c-call:int :out) (options c-call:int) (rusage c-call:int)) (defun wait4 (pid &optional do-not-hang check-for-stopped) "Return any available status information on child processed. " (multiple-value-bind (pid status) (c-wait4 pid (logior (if do-not-hang ext::wait-wnohang 0) (if check-for-stopped ext::wait-wuntraced 0)) 0) (cond ((or (minusp pid) (zerop pid)) nil) ((eql (ldb (byte 8 0) status) ext::wait-wstopped) (values pid :stopped (ldb (byte 8 8) status))) ((zerop (ldb (byte 7 0) status)) (values pid :exited (ldb (byte 8 8) status))) (t (let ((signal (ldb (byte 7 0) status))) (values pid (if (or (eql signal UNIX:sigstop) (eql signal UNIX:sigtstp) (eql signal UNIX:sigttin) (eql signal UNIX:sigttou)) :stopped :signaled) signal (not (zerop (ldb (byte 1 7) status))))))))) ;;; ;;; ;;; (defun destroy-sub-process (sub-process) "Internal. Close streams, and kill(2) the sub-process." (let ((to-stream (sub-process-to-stream sub-process))) (when to-stream (ignore-errors (close to-stream :abort t)) (setf (sub-process-to-stream sub-process) nil) t)) (let ((from-stream (sub-process-from-stream sub-process))) (when from-stream (ignore-errors (close from-stream :abort t)) (setf (sub-process-from-stream sub-process) nil) t)) (let ((pid (sub-process-pid sub-process))) (when pid (UNIX:unix-kill pid UNIX:sigterm) (wait4 pid) (setf (sub-process-pid sub-process) nil))) (queue-delete *all-sub-processes* sub-process) sub-process) #|| (setq $a (make-sub-process #'child-subprocess)) (read (sub-process-from-stream $a)) (evaluate-in-sub-process $a '(+ 2 30)) (evaluate-in-sub-process $a '(format t "~A ~D~%" "foo" 42)) (destroy-sub-process $a) ||# ;;; ---------------------------------------------------------------------- ;;; ;;; ;;; (defstruct (task-queue (:constructor %make-task-queue) (:copier nil)) "A Task Queue to evaluate forms (tasks) in any of its child-subprocesses." run-p process wait-whostate number-of-child-subprocesses child-subprocesses queue) (defun make-task-queue (name n &key tasks) "NAME is a string. N is the number of child sub processes to spawn." (%make-task-queue :run-p t :process nil :wait-whostate "Task Wait" :number-of-child-subprocesses n :queue (make-queue (format nil "~A TASKS" name) :initial-contents tasks) :child-subprocesses (make-queue (format nil "~A FREE CHILD SUB PROCS" name) :initial-contents (loop for i below n collect (make-sub-process 'child-subprocess))))) (defun task-queue-execute-task (task-queue task) "Find a free child subprocess and evaluate TASK in it." (let ((sub-process (queue-pop (task-queue-child-subprocesses task-queue) :wait t))) (unwind-protect (evaluate-in-sub-process sub-process task) (queue-push (task-queue-child-subprocesses task-queue) sub-process)))) (defun task-queue-execute-pending-tasks (task-queue) (loop with task do (unwind-protect (cond ((setq task (queue-pop (task-queue-queue task-queue))) (task-queue-execute-task task-queue task) (setq task nil)) (t (return-from task-queue-execute-pending-tasks nil))) (when task (queue-push (task-queue-queue task-queue) task T))) while (task-queue-run-p task-queue))) (defun task-queue-pending-tasks-p (task-queue) (queue-list (task-queue-queue task-queue))) (defun task-queue-main-loop (task-queue) (loop with wait-whostate = (task-queue-wait-whostate task-queue) do (MP:process-wait wait-whostate (lambda (tq) (and (task-queue-run-p tq) (task-queue-pending-tasks-p tq))) task-queue) (task-queue-execute-pending-tasks task-queue))) (defun start-task-queue (task-queue) (let ((process (or (task-queue-process task-queue) (setf (task-queue-process task-queue) (MP:make-process nil :name (format nil "~A PROCESS" (queue-name (task-queue-queue task-queue)))))))) (MP:process-preset process #'task-queue-main-loop task-queue) (MP:enable-process process) (setf (task-queue-run-p task-queue) t) process)) (defun task-queue-waiting-p (task-queue) (equalp (MP:process-whostate (task-queue-process task-queue)) (task-queue-wait-whostate task-queue))) (defun stop-task-queue (task-queue) ;XXX (let ((process (task-queue-process task-queue))) (when (and process (mp:process-active-p process)) (setf (task-queue-run-p task-queue) nil) (cond ((eq process (MP:current-process)) (MP:disable-process process)) (t (MP:process-wait "Task Queue Shutdown (waiting)" #'task-queue-waiting-p task-queue) (MP:make-process (lambda () (MP:disable-process process)) :name "Task Queue Shutdown (disabling)"))) process))) (defun task-queue-kill-process (task-queue) (let ((process (task-queue-process task-queue))) (when process (prog1 process (MP:destroy-process process) (setf (task-queue-process task-queue) nil))))) (defun task-queue-kill-sub-processes (task-queue) (let ((child-subprocesses (task-queue-child-subprocesses task-queue))) (when (queue-list child-subprocesses) (prog1 (mapcar (lambda (x) (ignore-errors (destroy-sub-process x))) (queue-list child-subprocesses)) (queue-delete child-subprocesses))))) (defun task-queue-maybe-create-sub-processes (task-queue) (let ((child-subprocesses (task-queue-child-subprocesses task-queue))) (unless (queue-list child-subprocesses) (loop for i below (task-queue-number-of-child-subprocesses task-queue) collect (queue-push child-subprocesses (make-sub-process 'child-subprocess)))))) (defun task-queue-active-p (task-queue) (let ((process (task-queue-process task-queue))) (and process (MP:process-active-p process)))) (defun ensure-active-task-queue (task-queue) (prog1 task-queue (or (task-queue-active-p task-queue) (start-task-queue task-queue)))) (defun push-task-queue (task-queue task-form) (queue-push (task-queue-queue task-queue) task-form)) #|| (defparameter *task-number* 0) (defun cl-user::task (i) (format t "~&Task: ~D ~D~%" i (incf *task-number*)) (force-output)) (defvar $tq (make-task-queue "Test TQ1" 2)) (ensure-active-task-queue $tq) (push-task-queue $tq '(pprint 42)) (loop for i from 1 to 2 do (push-task-queue $tq `(user::task ,i))) (push-task-queue $tq `(cerror "Continue." "foo" )) (queue-list (task-queue-queue $tq)) (queue-list (task-queue-child-subprocesses $tq)) (task-queue-waiting-p $tq) (task-queue-active-p $tq) (task-queue-run-p $tq) (task-queue-kill-sub-processes $tq) (task-queue-maybe-create-sub-processes $tq) (stop-task-queue $tq) (task-queue-kill-process $tq) (setf (task-queue-run-p $tq) t) (setf (task-queue-run-p $tq) nil) (task-queue-child-subprocesses $tq) (mapcar 'destroy-sub-process (queue-list (task-queue-child-subprocesses $tq))) (mapcar 'destroy-sub-process (queue-list *all-sub-processes*))) (task-queue-process $tq) (mp:destroy-process (task-queue-process $tq)) (setf (task-queue-process $tq) nil) (mp::enable-process (task-queue-process $tq)) (mp::process-add-run-reason (task-queue-process $tq) :enable) (mp::process-active-p (task-queue-process $tq)) ;; TODO check active sub-processes ||#