(defpackage :fwoar.websocket-sink
  (:use :cl)
  (:export))
(in-package :fwoar.websocket-sink)

;;; Relay: receives messages from a ZeroMQ SUB socket, parses them as
;;; "<path> <json>" and pushes the re-encoded JSON to every connected
;;; websocket client.

;; Representation passed to YASON as :OBJECT-AS when parsing JSON objects.
(defparameter *obj-style* :hash-table)

;; NOTE(review): *EVENT-TABLE* and *EVENT-QUEUE* are not referenced by any
;; other form in this file; kept because other files may use them.
(defparameter *event-table* (make-array 100 :adjustable t :fill-pointer 0))
(defparameter *event-queue* (lparallel.queue:make-queue))

(defun parse-fa-body (ht)
  "Replace the \"body\" entry of HT with its JSON-decoded value and return HT.

When the entry is a non-empty string that YASON can parse to a non-NIL
value, the parsed value is stored.  In every other case (empty string,
non-string value, parse error, or a parse result of NIL) the entry is
written back with its original value.  Note that the entry is always
written, so a table without a \"body\" key gains one whose value is NIL.

Signals an error (via ECASE) when *OBJ-STYLE* is not :HASH-TABLE."
  (prog1 ht
    (ecase *obj-style*
      (:hash-table
       (let ((body (gethash "body" ht)))
         (setf (gethash "body" ht)
               (or (when (and (stringp body) (string/= body ""))
                     ;; Best effort: the body is not required to be JSON,
                     ;; so a parse failure just keeps the raw string.
                     (handler-case (yason:parse body :object-as *obj-style*)
                       (error ())))
                   body))))
      ;; Disabled at read time; kept from the original source.
      #+(or)
      (:alist
       (pushnew (assoc "body" ht)
                (yason:parse (gethash "body" ht) :object-as *obj-style*))))))

;; Maps a path string to the object HANDLE-MESSAGE should re-dispatch on.
(defparameter *handlers* (make-hash-table :test 'equal))

(defgeneric handle-message (path message)
  (:documentation
   "Transform MESSAGE received on PATH and return the value to forward.

For a string PATH with an entry in *HANDLERS*, dispatch again with that
entry in place of PATH; otherwise fall through to the next method.  The
default method returns MESSAGE unchanged.")
  (:method :around ((path string) message)
    (alexandria:if-let ((handler (gethash path *handlers*)))
      (handle-message handler message)
      (call-next-method)))
  (:method (path message)
    message))

(defun parse (string)
  "Parse a raw message STRING into a list (PATH MESSAGE).

STRING is split on a space with :COUNT 1 (presumably at the first space
only -- confirm against FWOAR.STRING-UTILS:SPLIT) and must yield exactly
two parts, otherwise DESTRUCTURING-BIND signals an error.  The second part
is decoded as JSON, run through PARSE-FA-BODY and then HANDLE-MESSAGE.
The path and raw payload are logged to standard output."
  (destructuring-bind (path result)
      (coerce (fwoar.string-utils:split #\space string :count 1) 'list)
    (format t "~&(PATH ~a) (RESULT ~a)~%" path result)
    (list path
          (handle-message path
                          (parse-fa-body
                           (yason:parse result :object-as *obj-style*))))))

(defun get-and-process-one-message (socket)
  "Receive one UTF-8 message from SOCKET, log it, parse it and return the payload."
  (let* ((raw-reply (pzmq:recv-string socket :encoding :utf-8)))
    (format t "~&~a~%" raw-reply)
    (destructuring-bind (path payload . r) (parse raw-reply)
      ;; PARSE as defined above returns exactly two elements, so R is only
      ;; non-NIL if PARSE is redefined to return more.
      (when r
        (format t "~&extra stuff? 
PATH: ~a~%~4t~a~%~4t~a~2%" path payload r))
      payload)))

(defmacro with-hub ((symbol port subscription) &body body)
  "Run BODY with SYMBOL bound to a ZeroMQ :SUB socket subscribed to
SUBSCRIPTION and connected to PORT.  PORT and SUBSCRIPTION are each
evaluated once."
  (alexandria:once-only (port subscription)
    `(pzmq:with-socket ,symbol (:sub :subscribe ,subscription)
       (pzmq:connect ,symbol ,port)
       ,@body)))

;; NOTE(review): *SERVER* is declared but never assigned in this file.
(defvar *server*)
;; Handler returned by CLACK:CLACKUP; assigned by MAIN.
(defvar *ws-server*)
;; Currently open websocket server objects (special variable, no earmuffs).
(defvar listeners ())

(defun update-listeners (new-listeners)
  "Replace the global list of websocket listeners with NEW-LISTENERS."
  (setf listeners new-listeners))

(defun main (&optional (wait t) (subscription "/") (port "tcp://127.0.0.1:5557")
               (ws-port 5009))
  "Start the websocket server and the ZeroMQ receive loop.

WAIT          when true (the default) run the receive loop in the calling
              thread; otherwise run it in a new thread named
              \"websocket-handler\" and return that thread.
SUBSCRIPTION  ZeroMQ subscription prefix.
PORT          ZeroMQ endpoint to connect to.
WS-PORT       TCP port the websocket server listens on (default 5009).

The value returned by CLACK:CLACKUP is stored in *WS-SERVER* so the server
can be stopped later."
  (setf *ws-server*
        (clack:clackup
         (lambda (env)
           (let ((server (websocket-driver:make-server env)))
             (event-emitter:on :open server
                               (lambda ()
                                 (push server listeners)))
             (event-emitter:on :close server
                               (lambda (&key code reason)
                                 (declare (ignore code reason))
                                 (update-listeners (remove server listeners))))
             (lambda (responder)
               (declare (ignore responder))
               (websocket-driver:start-connection server))))
         :port ws-port))
  (if wait
      (pzmq-process port subscription)
      (bt:make-thread (lambda () (pzmq-process port subscription))
                      :name "websocket-handler")))

(defun handle (reply)
  "Encode REPLY as JSON and send it to every current websocket listener.

The JSON text is produced once and shared by all listeners; nothing is
encoded when there are no listeners.  Failures are logged to standard
output instead of being signalled.  Returns a list with one entry per
listener (the value of the send, or NIL for a failed send), or NIL when
nothing was sent."
  ;; Read the global once so every step below sees the same list even if
  ;; an :open/:close handler rebinds it meanwhile.
  (let ((targets listeners))
    (when targets
      (let ((payload (handler-case (with-output-to-string (s)
                                     (yason:encode reply s))
                       (error (c)
                         (format t "~&failed to encode ~s: ~a~%" reply c)
                         nil))))
        (when payload
          (loop for listener in targets
                collect (handler-case (websocket-driver:send listener payload)
                          (error ()
                            (format t "~&failed to push ~s to ~s~%"
                                    reply listener)))))))))

(defun pzmq-process (port subscription)
  "Connect to PORT with SUBSCRIPTION and forward every received message to
the websocket listeners, looping until a receive error unwinds the loop.

An error while parsing or forwarding a single message is logged and that
message is skipped; errors from the receive call itself are not caught."
  (with-hub (pzmq port subscription)
    (loop for raw-reply = (pzmq:recv-string pzmq :encoding :utf-8)
          do (handler-case
                 (destructuring-bind (path reply) (parse raw-reply)
                   (declare (ignore path))
                   (format t "~&REPLY: ~s~%" reply)
                   (handle reply))
               (error (c)
                 (format t "~&failed to process ~s: ~a~%" raw-reply c))))))