git.fiddlerwoaroof.com
Raw Blame History
(defpackage :fwoar.websocket-sink
  (:use :cl )
  (:export ))
(in-package :fwoar.websocket-sink)

(defparameter *obj-style*
  :hash-table)
(defparameter *event-table*
  (make-array 100
              :adjustable t
              :fill-pointer 0))
(defparameter *event-queue*
  (lparallel.queue:make-queue))

(defun parse-fa-body (ht)
  (prog1 ht
    (ecase *obj-style*
      (:hash-table
       (let ((body (gethash "body" ht)))
         (setf (gethash "body" ht)
               (or (when (not (equal body ""))
                     (handler-case
                         (yason:parse body
                                      :object-as *obj-style*)
                       (error ())))
                   body))))
      #+(or)
      (:alist (pushnew (assoc "body" ht)
                       (yason:parse (gethash "body" ht)
                                    :object-as *obj-style*))))))

(defparameter *handlers* (make-hash-table :test 'equal))
(defgeneric handle-message (path message)
  (:method :around ((path string) message)
    (alexandria:if-let ((handler (gethash path *handlers*)))
      (handle-message handler message)
      (call-next-method)))
  (:method (path message)
    message))

(defun parse (string)
  (destructuring-bind (path result)
      (coerce (fwoar.string-utils:split #\space string
                                        :count 1)
              'list)
    (format t "~&(PATH ~a) (RESULT ~a)~%"
            path result)
    (list path
          (handle-message path
                          (parse-fa-body
                           (yason:parse result
                                        :object-as *obj-style*))))))

(defun get-and-process-one-message (socket)
  (let* ((raw-reply (pzmq:recv-string socket :encoding :utf-8)))
    (format t "~&~a~%" raw-reply)
    (destructuring-bind (path payload . r) (parse raw-reply)
      (when r
        (format t "~&extra stuff? PATH: ~a~%~4t~a~%~4t~a~2%"
                path
                payload
                r)) 
      payload)))

(defmacro with-hub ((symbol port subscription) &body body)
  (alexandria:once-only (port subscription)
    `(pzmq:with-socket ,symbol (:sub :subscribe ,subscription)
       (pzmq:connect ,symbol ,port)
       ,@body)))

(defvar *server*)
(defvar *ws-server*)

(defvar listeners ())

(defun update-listeners (new-listeners)
  (setf listeners new-listeners))

(defun main (&optional (wait t) (subscription "/") (port "tcp://127.0.0.1:5557"))
  (clack:clackup (lambda (env)
                   (let ((server (websocket-driver:make-server env)))
                     (event-emitter:on :open server
                                       (lambda ()
                                         (push server listeners)))
                     (event-emitter:on :close server
                                       (lambda (&key code reason)
                                         (declare (ignore code reason))
                                         (setf listeners
                                               (remove server listeners))
                                         (update-listeners listeners)))
                     (lambda (responder)
                       (declare (ignore responder))
                       (websocket-driver:start-connection server))))
                 :port 5009)
  (if wait
      (pzmq-process port subscription)
      (bt:make-thread (lambda () (pzmq-process port subscription))
                      :name "websocket-handler")))

(defun handle (reply)
  (mapcar (serapeum:op (handler-case
                           (websocket-driver:send _1
                                                  (With-output-to-string (s)
                                                    (yason:encode reply
                                                                  s)))
                         (error ()
                           (format t "~&failed to push ~s to ~s~%" reply _1))))
          listeners))

(defun pzmq-process (port subscription)
  (with-hub (pzmq port subscription)
    (loop for raw-reply = (pzmq:recv-string pzmq :encoding :utf-8)
          for (_ reply) = (parse raw-reply)
          do (format t "~&REPLY: ~s~%" reply)
          do (handle reply))))