;;;; git.fiddlerwoaroof.com
;;;; websocket-sink.lisp
;;;; 1abfbd92
 ;;; Package for the websocket sink.  Only COMMON-LISP is used, so the
 ;;; third-party symbols in the visible code are all written with an explicit
 ;;; package prefix.  Nothing is exported.
 (defpackage :fwoar.websocket-sink
   (:use :cl )
   (:export ))
 (in-package :fwoar.websocket-sink)
 
;; 3e71a0e1
 (defparameter *obj-style* :hash-table
   "Value passed as :OBJECT-AS to YASON:PARSE; also selects the branch taken
 in PARSE-FA-BODY.")
 (defparameter *event-table*
   (make-array 100 :fill-pointer 0 :adjustable t)
   "Adjustable vector with a fill pointer: starts empty with room for 100
 elements.")
 (defparameter *event-queue* (lparallel.queue:make-queue)
   "An lparallel queue created when this form is evaluated.")
;; 1abfbd92
 
 (defun parse-fa-body (ht)
   "Decode the JSON text stored under the key \"body\" of HT, in place.
 
 When *OBJ-STYLE* is :HASH-TABLE, the \"body\" entry is replaced by the result
 of parsing it with YASON.  The original value is kept when it is the empty
 string, when parsing signals an error, or when parsing yields NIL.  Any other
 *OBJ-STYLE* signals an error via ECASE.  Returns HT."
   (ecase *obj-style*
     (:hash-table
      (let* ((raw (gethash "body" ht))
             ;; Best effort: a body that is not valid JSON is left untouched.
             (decoded (unless (equal raw "")
                        (handler-case
                            (yason:parse raw :object-as *obj-style*)
                          (error () nil)))))
        (setf (gethash "body" ht)
              (or decoded raw))))
     ;; Disabled at read time; this alist branch looks unfinished.
     #+(or)
     (:alist (pushnew (assoc "body" ht)
                      (yason:parse (gethash "body" ht)
                                   :object-as *obj-style*))))
   ht)
;; 1abfbd92
 
 (defparameter *handlers* (make-hash-table :test 'equal)
   "EQUAL hash table consulted by HANDLE-MESSAGE, keyed by path string.")
 (defgeneric handle-message (path message)
   (:documentation
    "Process MESSAGE for PATH and return the (possibly transformed) message.
 
 For a string PATH, the entry registered under it in *HANDLERS* (if any) is
 used as the new first argument and dispatch happens again on that object.
 Without a registered entry, or for any other PATH, MESSAGE is returned as is.")
   (:method :around ((path string) message)
     (let ((handler (gethash path *handlers*)))
       (if handler
           (handle-message handler message)
           (call-next-method))))
   (:method (path message)
     (declare (ignore path))
     message))
 
 (defun parse (string)
   "Turn one raw hub message into a two-element list (PATH MESSAGE).
 
 STRING is split on a space with FWOAR.STRING-UTILS:SPLIT and :COUNT 1
 (presumably at the first space only -- confirm against that helper) and must
 yield exactly two pieces, otherwise DESTRUCTURING-BIND signals an error.  The
 second piece is parsed as JSON, run through PARSE-FA-BODY and then through
 HANDLE-MESSAGE for the path.  The two pieces are also printed to standard
 output."
   (let ((pieces (coerce (fwoar.string-utils:split #\space string :count 1)
                         'list)))
     (destructuring-bind (path result) pieces
       (format t "~&(PATH ~a) (RESULT ~a)~%" path result)
       (let* ((decoded (yason:parse result :object-as *obj-style*))
              (message (handle-message path (parse-fa-body decoded))))
         (list path message)))))
 
 (defun get-and-process-one-message (socket)
   "Receive one UTF-8 string from SOCKET, parse it, and return its payload.
 
 The raw string is echoed to standard output.  If PARSE ever returns more than
 a path and a payload, the surplus elements are printed as well."
   (let ((raw-reply (pzmq:recv-string socket :encoding :utf-8)))
     (format t "~&~a~%" raw-reply)
     (destructuring-bind (path payload &rest extra) (parse raw-reply)
       (unless (null extra)
         (format t "~&extra stuff? PATH: ~a~%~4t~a~%~4t~a~2%"
                 path payload extra))
       payload)))
 
 (defmacro with-hub ((symbol port subscription) &body body)
   "Run BODY with SYMBOL bound to a :SUB socket connected to PORT.
 
 The socket is created with PZMQ:WITH-SOCKET, given SUBSCRIPTION as its
 :SUBSCRIBE option, and connected to PORT before BODY runs.  PORT and
 SUBSCRIPTION are each evaluated exactly once, in that order."
   (let ((port-value (gensym "PORT"))
         (subscription-value (gensym "SUBSCRIPTION")))
     `(let ((,port-value ,port)
            (,subscription-value ,subscription))
        (pzmq:with-socket ,symbol (:sub :subscribe ,subscription-value)
          (pzmq:connect ,symbol ,port-value)
          ,@body))))
 
 ;; Declared without an initial value, so it stays unbound until something
 ;; assigns it.  NOTE(review): presumably meant to hold a server handle --
 ;; confirm against the code that sets it.
 (defvar *server*)
 ;; Also declared unbound; no use of it is visible in this part of the file.
 (defvar *ws-server*)
 
 ;; List of currently connected websocket server objects: MAIN's :open and
 ;; :close callbacks add and remove entries, and HANDLE sends to each one.
 (defvar listeners ())
;; 5225e932
 
;; 7819c3f2
 (defun update-listeners (new-listeners)
   "Make NEW-LISTENERS the current value of LISTENERS and return it."
   (setq listeners new-listeners))
 
;; 1abfbd92
 (defun main (&optional (wait t) (subscription "/") (port "tcp://127.0.0.1:5557")
                (ws-port 5009))
   "Start the websocket endpoint and relay hub messages to its clients.
 
 A Clack server is started on WS-PORT (default 5009).  Every websocket that
 opens is added to LISTENERS and removed again when it closes.  The handler
 returned by CLACK:CLACKUP is stored in *SERVER* so that the endpoint can be
 shut down later instead of being lost.
 
 Messages are then read from the hub at PORT, filtered by SUBSCRIPTION, via
 PZMQ-PROCESS.  When WAIT is true this happens in the calling thread, which
 does not return normally because PZMQ-PROCESS loops without an exit
 condition.  Otherwise it runs in a new thread named websocket-handler and
 that thread is returned."
   (setf *server*
         (clack:clackup
          (lambda (env)
            (let ((server (websocket-driver:make-server env)))
              ;; Both callbacks go through UPDATE-LISTENERS so that there is a
              ;; single place where the listener list is replaced.
              (event-emitter:on :open server
                                (lambda ()
                                  (update-listeners (cons server listeners))))
              (event-emitter:on :close server
                                (lambda (&key code reason)
                                  (declare (ignore code reason))
                                  (update-listeners (remove server listeners))))
              (lambda (responder)
                (declare (ignore responder))
                (websocket-driver:start-connection server))))
          :port ws-port))
   (if wait
       (pzmq-process port subscription)
       (bt:make-thread (lambda () (pzmq-process port subscription))
                       :name "websocket-handler")))
 
 (defun handle (reply)
   "Send REPLY, encoded as JSON, to every websocket in LISTENERS.
 
 REPLY is encoded at most once per call and the resulting string is reused
 for all listeners.  An error while encoding or sending is reported on
 standard output for the listener concerned and does not stop delivery to the
 remaining ones.  Returns a list with one element per listener: the value of
 the send, or NIL where it failed."
   (let ((payload nil))
     (mapcar (lambda (listener)
               (handler-case
                   (websocket-driver:send
                    listener
                    ;; Encode lazily and inside the handler: nothing is
                    ;; encoded when there are no listeners, and an encoding
                    ;; failure is reported per listener just like a send
                    ;; failure.
                    (or payload
                        (setf payload
                              (with-output-to-string (s)
                                (yason:encode reply s)))))
                 (error ()
                   (format t "~&failed to push ~s to ~s~%" reply listener))))
             listeners)))
 
 (defun pzmq-process (port subscription)
   "Relay messages from the hub at PORT to the websocket listeners, forever.
 
 Opens a subscriber socket with WITH-HUB using SUBSCRIPTION, then repeatedly
 receives a UTF-8 string, parses it with PARSE, prints the resulting message
 to standard output and passes it to HANDLE.  The loop has no exit condition."
   (with-hub (socket port subscription)
     (loop
       (let* ((raw-reply (pzmq:recv-string socket :encoding :utf-8))
              ;; PARSE returns (path message); only the message is relayed.
              (reply (second (parse raw-reply))))
         (format t "~&REPLY: ~s~%" reply)
         (handle reply)))))