;;; -*- Mode: LISP; Package: :cl-user; BASE: 10; Syntax: ANSI-Common-Lisp; -*- ;;; ;;; Touched: Mon Apr 30 10:26:07 2007 +0530 ;;; Time-stamp: <2007-05-03 23:21:56 madhu> ;;; Status: Experimental. Do not redistribute. ;;; Bugs-To: ;;; Copyright (C) 2007 Madhu. All Rights reserved. ;;; Branched-from: tsched.lisp 2.5 06/04/30 ;;; (defpackage "CRON-1-1" (:use "CL")) (in-package "CRON-1-1") (defclass timer-event () ((time-instant :accessor timer-timeout :type integer) (index :accessor timer-index :initform NIL :type (or null integer))) (:documentation "Internal. Basic timer event.")) (defun up-heap (index timers) "Internal. Priority Queue implementaion. TIMERS is an array of TIMER-EVENTs." (declare (type fixnum index) (type array timers)) (loop with timer = (aref timers index) and p for i = index then p while (> i 1) if (>= (timer-timeout timer) (timer-timeout (aref timers (setq p (ash i -1))))) do (loop-finish) ;break if we are greater than the parent else do (setf (timer-index (setf (aref timers i) (aref timers p))) i) finally (setf (timer-index (setf (aref timers i) timer)) i))) (defun down-heap (index timers) "Internal. Priority Queue implementaion. TIMERS is an array of TIMER-EVENTs." (declare (type fixnum index) (type array timers)) (loop with timer = (aref timers index) for i of-type fixnum = index then j for j = (* index 2) then (* j 2) while (< j (fill-pointer timers)) if (and (< j (1- (fill-pointer timers))) (> (timer-timeout (aref timers j)) (timer-timeout (aref timers (1+ j))))) do (incf j) ;j points to smaller of the two children if (<= (timer-timeout timer) (timer-timeout (aref timers j))) do (loop-finish) ;break if we are less than the smaller child else do (setf (timer-index (setf (aref timers i) (aref timers j))) i) finally (setf (timer-index (setf (aref timers i) timer)) i))) ;;; ;;; ;;; (defvar +default-pqueue-size+ 10 "Internal. Initial size of array.") (defvar +default-pqueue-incr+ 5 "Internal. Size to grow array.") (defclass priority-queue () ((name :initform "[Anonymous]" :type string :initarg :name) (incr :initform +default-pqueue-incr+ :type (integer 0) :initarg :incr) (process :type (or null MP::process) :initform nil) (lock :type MP:lock) (timers :type array)) (:documentation "Internal. A Priority Queue of events, with inter-process locking.")) (defmethod initialize-instance :after ((pq priority-queue) &key (size +default-pqueue-size+) &allow-other-keys) "Initialize an empty priority Queue." (with-slots (name lock timers) pq (setq lock (MP:make-lock (format nil "~A LOCK" name))) (setq timers (make-array (1+ size) :fill-pointer 1 :adjustable t)))) (defmethod pq-map ((pq priority-queue) (f function)) "Internal. Funcall F on each timer-event in the priority-queue PQ." (with-slots (timers lock) pq (MP:with-lock-held (lock) (loop for i from 1 below (fill-pointer timers) do (funcall f (aref timers i)))))) (defmethod pq-add ((pq priority-queue) (timer timer-event)) "Internal. Add given timer-event TIMER to priority-queue PQ." (assert (null (timer-index timer))) (with-slots (timers lock incr) pq (MP:with-lock-held (lock) (let ((free (fill-pointer timers))) (assert (= (vector-push-extend timer timers incr) free)) (setf (timer-index timer) free) (up-heap free timers)) timer))) (defmethod pq-clear ((pq priority-queue)) "Internal. Empty the priority-queue PQ of all timer-events." (with-slots (timers lock) pq (MP:with-lock-held (lock) (setf (fill-pointer timers) 1)))) (defmethod pq-get ((pq priority-queue) &optional timeout) "Internal. Dequeue and return the next timer-event in priority-queue PQ, or NIL if PQ is empty. If TIMEOUT is NON-NIL, return the next timer-event only if it has a timeout less than or equal to the specified TIMEOUT, otherwise return as values NIL and the difference in timeout values." (with-slots (timers lock) pq (MP:with-lock-held (lock) (when (> (fill-pointer timers) 1) (let ((timer (aref timers 1))) (when TIMEOUT (let ((timer-timeout (timer-timeout timer))) (if (> timer-timeout timeout) (return-from pq-get (values NIL (- timer-timeout timeout)))))) (setf (aref timers 1) (vector-pop timers)) ;swap with last (down-heap 1 timers) (setf (timer-index timer) nil) timer))))) (defmethod pq-delete ((pq priority-queue) (timer timer-event) &aux (index (timer-index timer))) "Internal. Remove given timer-event TIMER from priority-queue PQ." (with-slots (timers lock incr) pq (MP:with-lock-held (lock) (assert (and index (eq (aref timers index) timer))) (setf (aref timers index) (vector-pop timers)) ;swap with last (when (> (fill-pointer timers) 1) ;adjust index (if (or (= index 1) (> (timer-timeout (aref timers index)) (timer-timeout (aref timers (truncate index 2))))) (down-heap index timers) (up-heap index timers))) (setf (timer-index timer) nil) timer))) (defgeneric handle-event (timer-event) (:documentation "Internal. Protocol method for dispatching timer-events.")) (defvar +default-pqueue-sleep-time+ 60 "Internal. Maximum sleep time.") (defmethod pq-synchronize ((pq priority-queue) &key stop) "Start up the process associated with priority-queue PQ to dispatch timer-events. The process wakes up every +DEFAULT-PQUEUE-SIZE+ seconds and dequeues and handles current timer-events." (with-slots (process) pq (if process (mp:destroy-process process)) (setq process (if stop nil (mp:make-process #'(lambda () (loop (let ((timenow (get-universal-time))) (multiple-value-bind (timer-event sleep-time) (pq-get pq timenow) (cond (timer-event (handler-case (handle-event timer-event) (error (e) (format t "~&Error during HANDLE-EVENT ~A in ~A~&Ignored.~&" e pq)))) (t (sleep (if sleep-time (min sleep-time +default-pqueue-sleep-time+) +default-pqueue-sleep-time+)))))))) :name (format nil "PROCESS-FOR-~S" pq)))))) ;;; ;;; ;;; (defclass simple-task (timer-event) ((defined-at-utime :type integer) (separate-thread-p :type boolean :initform t :initarg :separate-thread-p) (handler :type (or null function) :initarg :handler :initform nil)) (:documentation "Run a given HANDLER once, on or after a given UTIME, perhaps in a separate thread of execution.")) (defmethod initialize-instance :after ((timer simple-task) &key utime &allow-other-keys) "If the UTIME argument is Not specified or is ZERO, Treat is as :NOW." (with-slots (defined-at-utime) timer (setq defined-at-utime (get-universal-time)) (setf (timer-timeout timer) (if (or (null utime) (zerop utime)) defined-at-utime utime)))) (defmethod handle-event ((timer simple-task)) (with-slots (defined-at-utime handler separate-thread-p) timer (format t "~&[~A] Handling ~A~& Scheduled to run at ~A~&" (user:iso-8601-date :stream nil) timer (user:iso-8601-date :stream nil :utime (timer-timeout timer))) (when handler (if separate-thread-p (MP:make-process handler :name (format nil "HANDLE-EVENT ~S" timer)) (funcall handler))))) ;;; ;;; ;;; (defclass periodic-task (simple-task) ((name :initform nil :initarg :name) (description :initform nil :initarg :documentation) (periodicity :type (integer 1) :initarg :periodicity) (last-updated-utime :type integer) (last-update-automatic-p :type boolean :initform nil) (parent-pq :type (or null priority-queue) :initarg :parent-pq :initform nil)) (:documentation "A task that gets requeued for execution every PERIODICITY seconds.")) (defmethod initialize-instance :after ((timer periodic-task) &key &allow-other-keys) (with-slots (periodicity) timer (assert periodicity nil "Must specify periodicity in seconds."))) (defmethod pq-add :after ((pq priority-queue) (timer periodic-task)) (with-slots (parent-pq) timer (cond (parent-pq (assert (eq pq parent-pq))) ; XXX optimistic pessimism (t (setq parent-pq pq))))) (defmethod handle-event :after ((timer periodic-task)) (with-slots (periodicity parent-pq last-update-automatic-p last-updated-utime) timer (let ((utime (get-universal-time))) (setq last-updated-utime utime last-update-automatic-p t) (incf (timer-timeout timer) periodicity) (let ((diff (- (timer-timeout timer) ; XXX (+ last-updated-utime periodicity)))) (when (> diff 5) (format t "~&HANDLE-EVENT (PERIODIC-TASK) ~A: Discrepancy in schedule: ~A" timer diff))) (pq-add parent-pq timer)))) #|| (defvar *event-queue* (make-instance 'priority-queue :name "*EVENT-QUEUE*") "Internal. Priority Queue of all timer-events.") (pq-add *event-queue* (setq $a (make-instance 'periodic-task :utime 0 :periodicity 60 :handler #'(lambda () (format t "~&~A ---MARK---~&" (user:iso-8601-date :stream nil))) :name "Mark" :description "MARK"))) (pq-synchronize *event-queue*) (untrace handle-event pq-add pq-get) (progn (pq-clear *event-queue*) (pq-synchronize *event-queue* :stop t)) ||# ;;; Local Variables: ;;; emacs-lisp-docstring-fill-column: 78 ;;; eval: (put 'prog 'common-lisp-indent-function 'lisp-indent-tagbody) ;;; End: