git.fiddlerwoaroof.com
crdt.lisp
eaee792a
 ;;; Package holding the CRDT protocol and the counter types defined below.
 ;;; NOTE(review): the :export list is empty, so code in other packages has to
 ;;; reach these symbols with a double colon -- confirm whether that is intended.
 (defpackage :fwoar.crdt
   (:use :cl)
   (:export ))
 (in-package :fwoar.crdt)
 
 ;;; Generic protocol implemented by the CRDT types in this file.
 ;; Apply operation OP to CRDT.  The methods in this file dispatch on the
 ;; keywords :increment and :decrement.
 (defgeneric crdt-update (op crdt))
 ;; Return the value currently represented by CRDT.
 (defgeneric crdt-query (crdt))
 ;; Ordering test between two replicas.  The methods in this file return true
 ;; when every payload cell of LESS is <= the matching cell of GREATER.
 (defgeneric crdt-compare (less greater))
 ;; Combine FIRST and SECOND into a new instance that is constructed with ID.
 (defgeneric crdt-merge (id first second))
75ddf051
 ;; Render CRDT as a string.  The g-counter method in this file writes the
 ;; replica id and every payload cell separated by "|", followed by a newline.
 (defgeneric crdt-serialize (crdt))
 (defgeneric crdt-deserialize (class state)
   (:documentation
    "Rebuild a CRDT from the serialized string STATE.
 CLASS selects the concrete type: either an instance used purely for method
 dispatch, or a symbol naming the class.")
   (:method ((class symbol) state)
     ;; Dispatch is done on a prototype instance of the named class.
     ;; CLASS-PROTOTYPE is only specified for finalized classes, and a class
     ;; that has never been instantiated may not be finalized yet, so make
     ;; sure it is before asking for the prototype.
     (crdt-deserialize (closer-mop:class-prototype
                        (closer-mop:ensure-finalized (find-class class)))
                       state)))
eaee792a
 
 ;; Grow-only counter.  PAYLOAD is a vector of SIZE cells and this replica
 ;; increments the cell at index ID.  LOCK is taken around payload access by
 ;; the update, serialize and merge methods below.
 (fw.lu:defclass+ g-counter ()
   ((lock :initform (bt:make-lock "g-counter-lock")
          :reader unsafe-lock)
    ;; index of this replica's own cell in PAYLOAD
    (id :initarg :id :reader my-id)
    ;; number of cells in PAYLOAD
    (size :initarg :size :reader size)
    ;; vector of counts, allocated by the INITIALIZE-INSTANCE :after method
    (payload :reader payload)))
75ddf051
 (defmethod crdt-serialize ((crdt g-counter))
   "Return CRDT as a string: the replica id, then each payload cell prefixed
 with \"|\", terminated by a newline.  The payload is copied while the lock is
 held and the copy is printed after the lock has been released."
   (with-output-to-string (out)
     (format out "~d" (my-id crdt))
     (let ((snapshot (bt:with-lock-held ((unsafe-lock crdt))
                       (copy-seq (payload crdt)))))
       (map nil
            (lambda (count)
              (format out "|~a" count))
            snapshot))
     (format out "~%")))
 ;; Scratch form: the reader skips it unless :comment is present in *features*.
 ;; NOTE(review): *x* is not defined anywhere in the visible source.
 #+comment
 (progn
   (crdt-serialize (positives *x*))
   )
 (defmethod crdt-deserialize ((crdt g-counter) state)
   "Build a fresh g-counter from STATE, a string of \"|\"-separated integers.
 The first integer becomes the id and the remaining ones become the payload.
 CRDT itself is only used for dispatch."
   (let* ((fields (fwoar.string-utils:split #\|
                                            (serapeum:trim-whitespace state)))
          (numbers (map 'vector 'parse-integer fields))
          (counter (g-counter (elt numbers 0)
                              (1- (length numbers)))))
     ;; Replace the payload allocated by the constructor with the parsed cells.
     (setf (slot-value counter 'payload) (subseq numbers 1))
     counter))
eaee792a
 (defmethod print-object ((o g-counter) s)
   "Print O to stream S as a list of the class name, :id and :size."
   (prin1 (list 'g-counter
                :id (slot-value o 'id)
                :size (slot-value o 'size))
          s))
 (defmethod initialize-instance :after ((crdt g-counter) &key size)
   "Allocate the payload vector of SIZE cells, every cell starting at zero."
   ;; Without :initial-element the contents of a fresh array are undefined,
   ;; so INCF and the summing query would only work on implementations that
   ;; happen to zero-fill.  Start every cell at 0 explicitly.
   (setf (slot-value crdt 'payload)
         (make-array size :initial-element 0)))
 (defmethod crdt-update ((op (eql :increment)) (crdt g-counter))
   "Add one to this replica's own payload cell while holding the lock and
 return the new cell value."
   (bt:with-lock-held ((unsafe-lock crdt))
     (let ((cells (payload crdt))
           (mine (my-id crdt)))
       (setf (aref cells mine)
             (1+ (aref cells mine))))))
eaee792a
 (defmethod crdt-query ((crdt g-counter))
   "Return the counter value: the sum of all payload cells."
   (loop for count across (payload crdt)
         sum count))
 (defmethod crdt-compare ((x g-counter)
                          (y g-counter))
   "Return true when each payload cell of X is <= the matching cell of Y."
   (loop for ours across (payload x)
         for theirs across (payload y)
         always (<= ours theirs)))
 (defmethod crdt-merge (id
                        (x g-counter)
                        (y g-counter))
   "Return a new g-counter, constructed with ID, whose payload is the
 element-wise maximum of the payloads of X and Y.  X and Y must have the same
 size; otherwise a correctable assertion error is signalled."
   (assert (= (size x)
              (size y))
           (x y))
   (flet ((snapshot (counter)
            ;; Copy under the counter's own lock only.  Never holding two
            ;; locks at once means merging a counter with itself does not
            ;; re-acquire a held lock, and two threads merging the same pair
            ;; in opposite argument order cannot deadlock.
            (bt:with-lock-held ((unsafe-lock counter))
              (copy-seq (payload counter)))))
     (let ((from-x (snapshot x))
           (from-y (snapshot y))
           (z (g-counter id (size x))))
       (map-into (payload z) #'max from-x from-y)
       z)))
eaee792a
 
 ;; Counter that supports increments and decrements.  It is built from two
 ;; g-counters: POSITIVES records increments and NEGATIVES records decrements.
 (fw.lu:defclass+ pn-counter ()
   ((id :initarg :id :reader my-id)
    (size :initarg :size :reader size)
    ;; both sub-counters are created by the INITIALIZE-INSTANCE :after method
    (positives :reader positives)
    (negatives :reader negatives)))
 (defmethod initialize-instance :after ((crdt pn-counter) &key id size)
   "Create the two underlying g-counters, both built with ID and SIZE."
   (setf (slot-value crdt 'positives) (g-counter id size))
   (setf (slot-value crdt 'negatives) (g-counter id size)))
 (defmethod print-object ((o pn-counter) s)
   "Print O to stream S as a list of the class name, :id and :size."
   (prin1 (list 'pn-counter
                :id (slot-value o 'id)
                :size (slot-value o 'size))
          s))
 (defmethod payload ((object pn-counter))
   "Return a plist holding the payloads of both underlying g-counters under
 the keys :positives and :negatives."
   (let ((gains (positives object))
         (losses (negatives object)))
     (list :positives (payload gains)
           :negatives (payload losses))))
 (defmethod crdt-update ((op (eql :increment)) (crdt pn-counter))
   "Record an increment by incrementing the positives g-counter."
   (let ((gains (positives crdt)))
     ;; OP is :increment here because of the EQL specializer.
     (crdt-update op gains)))
 (defmethod crdt-update ((op (eql :decrement)) (crdt pn-counter))
   "Record a decrement by incrementing the negatives g-counter."
   (let ((losses (negatives crdt)))
     (crdt-update :increment losses)))
 (defmethod crdt-query ((crdt pn-counter))
   "Return the counter value: the positives total minus the negatives total."
   (let ((gained (crdt-query (positives crdt)))
         (lost (crdt-query (negatives crdt))))
     (- gained lost)))
 (defmethod crdt-compare ((x pn-counter)
                          (y pn-counter))
   "Return true when both the positives and the negatives of X compare as
 less-or-equal to those of Y.  The negatives are only compared when the
 positives comparison succeeds."
   (flet ((dominated-p (part)
            (crdt-compare (funcall part x)
                          (funcall part y))))
     (and (dominated-p #'positives)
          (dominated-p #'negatives))))
 (defmethod crdt-merge (id
                        (x pn-counter)
                        (y pn-counter))
   "Return a new pn-counter, constructed with ID, whose positives and negatives
 are the merges of the corresponding g-counters of X and Y.  X and Y must have
 the same size; otherwise a correctable assertion error is signalled."
   (assert (= (size x)
              (size y))
           (x y))
   (let ((z (pn-counter id (size x))))
     ;; Replace the sub-counters created by the constructor with merged ones.
     (setf (slot-value z 'positives)
           (crdt-merge id (slot-value x 'positives) (slot-value y 'positives)))
     (setf (slot-value z 'negatives)
           (crdt-merge id (slot-value x 'negatives) (slot-value y 'negatives)))
     z))
 
 (defun make-cluster (size class)
   "Create SIZE instances of CLASS, each initialized with :size SIZE and an :id
 from 0 up to SIZE - 1, and return them as multiple values in id order."
   (let ((members '()))
     (dotimes (id size)
       (push (make-instance class :id id :size size) members))
     (values-list (nreverse members))))