Browse code
feat: Adjust crdt for distributed operation
Ed L authored on 18/10/2020 08:24:17
Showing 1 changed files
Showing 1 changed files
... | ... |
@@ -7,11 +7,40 @@ |
7 | 7 |
(defgeneric crdt-query (crdt)) |
8 | 8 |
(defgeneric crdt-compare (less greater)) |
9 | 9 |
(defgeneric crdt-merge (id first second)) |
10 |
+(defgeneric crdt-serialize (crdt)) |
|
11 |
+(defgeneric crdt-deserialize (class state) |
|
12 |
+ (:method ((class symbol) state) |
|
13 |
+ (crdt-deserialize (closer-mop:class-prototype (find-class class)) |
|
14 |
+ state))) |
|
10 | 15 |
|
11 | 16 |
(fw.lu:defclass+ g-counter () |
12 |
- ((id :reader my-id :initarg :id) |
|
17 |
+ ((lock :reader unsafe-lock |
|
18 |
+ :initform (bt:make-lock "g-counter-lock")) |
|
19 |
+ (id :reader my-id :initarg :id) |
|
13 | 20 |
(size :reader size :initarg :size) |
14 | 21 |
(payload :reader payload))) |
22 |
+(defmethod crdt-serialize ((crdt g-counter)) |
|
23 |
+ (with-output-to-string (s) |
|
24 |
+ (format s "~d" (my-id crdt)) |
|
25 |
+ (loop for v across (bt:with-lock-held ((unsafe-lock crdt)) |
|
26 |
+ (copy-seq (payload crdt))) |
|
27 |
+ do |
|
28 |
+ (format s "|~a" v) |
|
29 |
+ finally |
|
30 |
+ (format s "~%")))) |
|
31 |
+#+comment |
|
32 |
+(progn |
|
33 |
+ (crdt-serialize (positives *x*)) |
|
34 |
+ ) |
|
35 |
+(defmethod crdt-deserialize ((crdt g-counter) state) |
|
36 |
+ (let* ((parts (map 'vector 'parse-integer |
|
37 |
+ (fwoar.string-utils:split #\| |
|
38 |
+ (serapeum:trim-whitespace state)))) |
|
39 |
+ (result (g-counter (elt parts 0) |
|
40 |
+ (1- (length parts))))) |
|
41 |
+ (prog1 result |
|
42 |
+ (with-slots (payload) result |
|
43 |
+ (setf payload (subseq parts 1)))))) |
|
15 | 44 |
(defmethod print-object ((o g-counter) s) |
16 | 45 |
(with-slots (id size) o |
17 | 46 |
(prin1 `(g-counter :id ,id :size ,size) |
... | ... |
@@ -20,8 +49,9 @@ |
20 | 49 |
(with-slots (payload) crdt |
21 | 50 |
(setf payload (make-array size)))) |
22 | 51 |
(defmethod crdt-update ((op (eql :increment)) (crdt g-counter)) |
23 |
- (incf (aref (payload crdt) |
|
24 |
- (my-id crdt)))) |
|
52 |
+ (bt:with-lock-held ((unsafe-lock crdt)) |
|
53 |
+ (incf (aref (payload crdt) |
|
54 |
+ (my-id crdt))))) |
|
25 | 55 |
(defmethod crdt-query ((crdt g-counter)) |
26 | 56 |
(reduce #'+ |
27 | 57 |
(payload crdt))) |
... | ... |
@@ -38,11 +68,13 @@ |
38 | 68 |
(x y)) |
39 | 69 |
(let ((z (g-counter id (size x)))) |
40 | 70 |
(prog1 z |
41 |
- (loop for i from 0 below (size x) |
|
42 |
- do |
|
43 |
- (setf (aref (payload z) i) |
|
44 |
- (max (aref (payload x) i) |
|
45 |
- (aref (payload y) i))))))) |
|
71 |
+ (bt:with-lock-held ((unsafe-lock x)) |
|
72 |
+ (bt:with-lock-held ((unsafe-lock y)) |
|
73 |
+ (loop for i from 0 below (size x) |
|
74 |
+ do |
|
75 |
+ (setf (aref (payload z) i) |
|
76 |
+ (max (aref (payload x) i) |
|
77 |
+ (aref (payload y) i))))))))) |
|
46 | 78 |
|
47 | 79 |
(fw.lu:defclass+ pn-counter () |
48 | 80 |
((id :reader my-id :initarg :id) |