1abfbd92 |
;;; Package holding the code in this file.  It uses only COMMON-LISP and
;;; exports nothing; third-party libraries are referenced through
;;; package-qualified symbols.
(defpackage #:fwoar.websocket-sink
  (:use #:cl)
  (:export))

(in-package #:fwoar.websocket-sink)
|
3e71a0e1 |
(defparameter *obj-style* :hash-table
  "Value passed as :OBJECT-AS to YASON:PARSE, and the key PARSE-FA-BODY
dispatches on.")

(defparameter *event-table*
  (make-array 100 :fill-pointer 0 :adjustable t)
  "Adjustable vector with a fill pointer, initially empty (capacity 100).
NOTE(review): not referenced by the code visible in this file.")

(defparameter *event-queue* (lparallel.queue:make-queue)
  "An lparallel queue.
NOTE(review): not referenced by the code visible in this file.")
|
1abfbd92 |
;; Decode the JSON text stored under the "body" key of HT, in place, and
;; return HT itself (PROG1 returns HT regardless of what the inner form
;; yields).
;;
;; Dispatches on *OBJ-STYLE*.  Only :HASH-TABLE has a live clause, so ECASE
;; signals an error for any other value.
(defun parse-fa-body (ht)
(prog1 ht
(ecase *obj-style*
|
3e71a0e1 |
(:hash-table
(let ((body (gethash "body" ht)))
;; The SETF runs unconditionally, so a "body" entry is written even when
;; HT had none.
(setf (gethash "body" ht)
;; Keep the original value when it is the empty string, when YASON:PARSE
;; signals an error (the handler returns NIL), or when the parse yields NIL.
(or (when (not (equal body ""))
(handler-case
(yason:parse body
:object-as *obj-style*)
(error ())))
body))))
;; #+(or) is never satisfied, so the reader skips the :ALIST clause below.
#+(or)
(:alist (pushnew (assoc "body" ht)
(yason:parse (gethash "body" ht)
:object-as *obj-style*))))))
|
1abfbd92 |
(defparameter *handlers*
  (make-hash-table :test #'equal)
  "EQUAL hash table, initially empty.  HANDLE-MESSAGE looks up string paths
here and re-dispatches on the value it finds.")
(defgeneric handle-message (path message)
  (:documentation
   "Process MESSAGE for PATH and return the (possibly transformed) message.

When PATH is a string with an entry in *HANDLERS*, the generic function is
called again with that entry in place of PATH, so the entry's own methods
decide the result.  Otherwise the primary method runs and returns MESSAGE
unchanged.")
  ;; Primary fallback: pass the message through untouched.
  (:method (path message)
    message)
  ;; String paths are first resolved through the *HANDLERS* table.
  (:method :around ((path string) message)
    (let ((handler (gethash path *handlers*)))
      (if handler
          (handle-message handler message)
          (call-next-method)))))
(defun parse (string)
  "Split STRING at its first space into a path and a JSON payload.

The payload is decoded with YASON:PARSE (object style *OBJ-STYLE*), passed
through PARSE-FA-BODY and then through HANDLE-MESSAGE for the path.  The
path and the raw payload are also printed to standard output.

Returns a two-element list (PATH MESSAGE).  Signals an error when the split
does not yield exactly two pieces."
  (let ((pieces (coerce (fwoar.string-utils:split #\space string :count 1)
                        'list)))
    (destructuring-bind (path result) pieces
      (format t "~&(PATH ~a) (RESULT ~a)~%" path result)
      (let* ((decoded (yason:parse result :object-as *obj-style*))
             (message (handle-message path (parse-fa-body decoded))))
        (list path message)))))
(defun get-and-process-one-message (socket)
  "Receive one UTF-8 string from SOCKET, echo it to standard output, run it
through PARSE and return the resulting payload.

Anything PARSE returns beyond the path and the payload is reported on
standard output."
  (let ((raw-reply (pzmq:recv-string socket :encoding :utf-8)))
    (format t "~&~a~%" raw-reply)
    (destructuring-bind (path payload &rest extra) (parse raw-reply)
      (when extra
        (format t "~&extra stuff? PATH: ~a~%~4t~a~%~4t~a~2%"
                path payload extra))
      payload)))
(defmacro with-hub ((symbol port subscription) &body body)
  "Run BODY with SYMBOL bound to a :SUB socket created by PZMQ:WITH-SOCKET,
subscribed to SUBSCRIPTION and connected to PORT.

PORT and SUBSCRIPTION are each evaluated exactly once, in that order,
before the socket is created."
  (let ((port-value (gensym "PORT"))
        (subscription-value (gensym "SUBSCRIPTION")))
    `(let ((,port-value ,port)
           (,subscription-value ,subscription))
       (pzmq:with-socket ,symbol (:sub :subscribe ,subscription-value)
         (pzmq:connect ,symbol ,port-value)
         ,@body))))
;; Declared special but left unbound.
;; NOTE(review): nothing in the visible code assigns or reads these two.
(defvar *server*)
(defvar *ws-server*)

(defvar listeners nil
  "List of websocket server objects that currently receive broadcasts.
MAIN pushes a socket here when it opens and removes it when it closes;
HANDLE sends to every element.")
|
5225e932 |
|
7819c3f2 |
(defun update-listeners (new-listeners)
  "Replace the value of LISTENERS with NEW-LISTENERS and return it."
  (setq listeners new-listeners))
|
1abfbd92 |
;; Entry point: start the websocket endpoint, then pump messages with
;; PZMQ-PROCESS.
;;
;; WAIT          when true (the default), PZMQ-PROCESS runs in the calling
;;               thread; otherwise it runs in a new thread named
;;               "websocket-handler" and the result of BT:MAKE-THREAD is
;;               returned.
;; SUBSCRIPTION  handed to PZMQ-PROCESS.
;; PORT          endpoint string handed to PZMQ-PROCESS.  This is not the
;;               websocket port, which is fixed at 5009 below.
;;
;; The value returned by CLACK:CLACKUP is not stored anywhere.
(defun main (&optional (wait t) (subscription "/") (port "tcp://127.0.0.1:5557"))
;; Clack application: every request gets a websocket server object built
;; from ENV.
(clack:clackup (lambda (env)
(let ((server (websocket-driver:make-server env)))
;; Track the socket in LISTENERS while it is open.
(event-emitter:on :open server
(lambda ()
(push server listeners)))
;; On close, drop the socket from LISTENERS.  UPDATE-LISTENERS then assigns
;; the same list to LISTENERS a second time.
(event-emitter:on :close server
(lambda (&key code reason)
(declare (ignore code reason))
(setf listeners
|
7819c3f2 |
(remove server listeners))
(update-listeners listeners)))
|
1abfbd92 |
;; The app's return value is a function that starts the websocket
;; connection and ignores its RESPONDER argument.
(lambda (responder)
(declare (ignore responder))
(websocket-driver:start-connection server))))
:port 5009)
(if wait
(pzmq-process port subscription)
(bt:make-thread (lambda () (pzmq-process port subscription))
:name "websocket-handler")))
(defun handle (reply)
  "Broadcast REPLY, encoded as JSON, to every websocket in LISTENERS.

REPLY is serialised with YASON:ENCODE once per call rather than once per
listener, and not at all when there are no listeners.  Delivery is best
effort: if encoding fails, or WEBSOCKET-DRIVER:SEND signals an error for a
listener, a \"failed to push\" line is printed for that listener and the
remaining listeners are still attempted.

Returns a list with one element per listener: the value returned by
WEBSOCKET-DRIVER:SEND, or NIL for a listener whose push failed."
  ;; Read LISTENERS once so the emptiness check and the iteration see the
  ;; same list.
  (let ((targets listeners))
    (when targets
      (let ((payload (ignore-errors
                      (with-output-to-string (out)
                        (yason:encode reply out)))))
        (flet ((report-failure (listener)
                 (format t "~&failed to push ~s to ~s~%" reply listener)))
          ;; MAPCAR is kept (rather than MAPC) so the return value has the
          ;; same shape as before.
          (mapcar (lambda (listener)
                    (if payload
                        (handler-case
                            (websocket-driver:send listener payload)
                          (error ()
                            (report-failure listener)))
                        ;; PAYLOAD is NIL only when encoding failed.
                        (report-failure listener)))
                  targets))))))
(defun pzmq-process (port subscription)
  "Connect a subscriber socket to PORT for SUBSCRIPTION via WITH-HUB, then
loop forever: receive a UTF-8 string, PARSE it, print the resulting message
and pass it to HANDLE.

The loop has no exit condition, so this only returns through a non-local
exit such as a signalled error."
  (with-hub (socket port subscription)
    (loop
      (let* ((raw-reply (pzmq:recv-string socket :encoding :utf-8))
             ;; PARSE returns (PATH MESSAGE); only the message is used here.
             (reply (second (parse raw-reply))))
        (format t "~&REPLY: ~s~%" reply)
        (handle reply)))))
|