be6f8571 |
(in-package :slacker)
(defvar *api-token*)
(defun make-client (event-pump)
(flet ((get-ws-url (slack-response)
|
5c15279a |
(gethash "url" slack-response)))
|
be6f8571 |
(fw.lu:let-each (:be slack-data)
|
ad204acd |
(format nil "https://slack.com/api/rtm.connect?token=~a" *api-token*)
|
be6f8571 |
(drakma:http-request slack-data :want-stream t)
(yason:parse slack-data)
(let* ((url (get-ws-url slack-data))
|
5c15279a |
(client (wsd:make-client url)))
(setf (ws-client event-pump)
client)
(wsd:on :message client
(lambda (message)
(chanl:send (result-queue event-pump)
message)))
|
aff414d8 |
(wsd:on :close client
(lambda (&key code reason)
|
d3239c16 |
(if (running event-pump)
(progn
(format t "~&Closed with code: ~a for reason ~a. Restarting in 2 seconds~%"
code reason)
(sleep 2)
(format t "~& ... restarting~%")
(make-client event-pump))
(format t "~&exiting~%"))))
|
aff414d8 |
(wsd:start-connection client)))))
|
be6f8571 |
(defgeneric send-message (client type &key)
(:documentation "Send a slack message")
(:method :around ((client event-pump) _type &key)
(declare (ignorable client _type))
(let ((result (call-next-method)))
|
5c15279a |
(values result
(wsd:send (ws-client client)
(with-output-to-string (s)
(yason:encode result s)))))))
|
be6f8571 |
|
60e29634 |
(defmethod send-message ((client event-pump) (type (eql :ping)) &key data)
(let* ((id (incf (latest-id client)))
(message `(("id" . ,id)
("type" . "ping"))))
(when data
(aconsf message "data" data))
(format t "~&pinging with id: ~a at time: ~a~%" id (local-time:now))
(incf (waiting-pings client))
(alist-hash-table message
:test 'equal)))
(defmethod send-message ((client event-pump) (type (eql :message)) &key channel data thread)
(let* ((id (incf (latest-id client)))
|
5c15279a |
(message `(("id" . ,id)
("type" . "message")
("channel" . ,channel)
("text" . ,data)
,@(unsplice
(when thread
`("thread_ts" . ,thread))))))
|
60e29634 |
(alist-hash-table message
|
5c15279a |
:test 'equal)))
|
60e29634 |
|
be6f8571 |
|
60e29634 |
(define-condition connection-lost (error)
|
1543ce00 |
())
|
60e29634 |
(defclass slack-client ()
((%commands :accessor commands :initform (make-hash-table))
(%modules :accessor modules :initform '())))
(defgeneric start (event-pump client &key queue-pair))
(defgeneric bind (what where))
(defmethod bind ((queue-pair queue-pair) (event-pump event-pump))
(setf (queue-pair event-pump) queue-pair))
(defun handle-work-queue (event-pump)
(multiple-value-bind (message message-p)
(chanl:recv (work-queue event-pump)
|
5c15279a |
:blockp nil)
|
60e29634 |
(when message-p
(funcall message
|
5c15279a |
event-pump))))
|
60e29634 |
(defun send-pings (event-pump client)
|
811dd7ec |
"Ping slack for connectivity, error if we have too many waiting pings."
|
60e29634 |
;; Eventually drop client
(declare (ignorable client))
(if (> 100 (waiting-pings event-pump))
(send-message event-pump :ping)
(error 'connection-lost)))
|
eb60be80 |
(defun network-loop (event-pump modules)
|
60e29634 |
(declare (optimize (debug 3)))
(loop for (module . args) in modules
|
9fe08141 |
while (running event-pump) do
(start-module event-pump
(apply #'attach-module
event-pump module args)))
|
801447ba |
(fwoar.event-loop:run-loop event-pump))
|
60e29634 |
|
e98c1493 |
(defun start-client (implementation &key (queue-pair (make-instance 'queue-pair)) modules impl-args)
(let* ((event-pump (apply #'make-instance implementation
:queue-pair queue-pair
:client-factory (op (make-client _))
impl-args)))
|
9fe08141 |
(setf (running event-pump) t)
|
148e6242 |
(values event-pump
|
5c15279a |
(bt:make-thread (lambda ()
|
811dd7ec |
(network-loop event-pump
modules))
|
5c15279a |
:name "Event Server"
:initial-bindings `((*api-token* . ,*api-token*))))))
|
be6f8571 |
(defmethod get-event-nonblocking ((event-pump event-pump) &key (object-as :hash-table))
(multiple-value-bind (message message-p) (chanl:recv (result-queue event-pump) :blockp nil)
(values (when message-p
|
5c15279a |
(yason:parse message :object-as object-as))
message-p)))
|
be6f8571 |
|
60e29634 |
(defmethod get-event ((queue-pair queue-pair) &key (object-as :hash-table))
(multiple-value-bind (message message-p) (chanl:recv (result-queue queue-pair))
|
be6f8571 |
(values (when message-p
|
5c15279a |
(yason:parse message :object-as object-as))
message-p)))
|
be6f8571 |
|
9fe08141 |
(defparameter *ignored-messages* '(:pong :user_typing :desktop_notification))
|
111b7353 |
|
9cf1fdb1 |
(defgeneric handle-message (type event-pump ts channel message)
|
111b7353 |
(:method :around (type event-pump ts channel message)
(flet ((report-continue (stream)
(format stream "Skip message of type ~s" type))
(report-retry (stream)
(format stream "Retry message of type ~s" type)))
(tagbody start
(restart-case (call-next-method)
(continue () :report report-continue)
(skip-message () :report report-continue)
(retry () :report report-retry
(go start))))))
|
9cf1fdb1 |
(:method :before (type event-pump ts channel message)
(declare (ignore event-pump ts channel))
(unless (member type *ignored-messages*)
(fresh-line)
(yason:encode message)
(terpri)))
(:method (type (event-pump event-pump) ts channel message)
(declare (ignore type event-pump ts channel message))
nil)
(:method ((type (eql :pong)) (event-pump event-pump) ts channel message)
(declare (ignore ts type channel))
|
60e29634 |
(format t "~&Was waiting on ~a pings," (waiting-pings event-pump))
|
9cf1fdb1 |
(decf (waiting-pings event-pump))
|
60e29634 |
(format t "after pong received for ~a, now waiting on ~a~%"
|
5c15279a |
(gethash "reply_to" message)
(waiting-pings event-pump))))
|
60e29634 |
(defmethod handle-message ((type (eql :message)) (event-pump event-pump) ts channel message)
(when-let* ((msg (gethash "text" message))
|
5c15279a |
(parsed-message (tokens msg)))
|
60e29634 |
(when (eql #\; (elt msg 0))
(handle-command event-pump message channel
|
811dd7ec |
(plump:decode-entities
(car parsed-message))
|
5c15279a |
(cdr parsed-message)))))
|
60e29634 |
|
be6f8571 |
(defun event-loop (event-pump)
|
9fe08141 |
(loop
with message with message-p
while (running event-pump) do
(multiple-value-setq (message message-p)
(get-event (queue-pair event-pump)))
when message-p do
(let ((type (gethash "type" message))
(reply (gethash "reply_to" message))
(ts (gethash "ts" message))
(channel (gethash "channel" message)))
(cond (type
(handle-message (make-keyword (string-upcase type))
event-pump ts channel message))
(reply )))
do (sleep 0.01)))
|
be6f8571 |
|
e98c1493 |
(defun coordinate-threads (&optional queue-pair (implementation 'event-pump) args)
|
111b7353 |
(let* ((event-pump (start-client implementation
:queue-pair queue-pair
|
e98c1493 |
:modules '((hhgbot-augmented-assistant::js-executor))
|
00d255bf |
:impl-args args)))
|
801447ba |
(bt:make-thread (lambda ()
(loop until (running event-pump)
finally
(unwind-protect (event-loop event-pump)
(stop-slacker event-pump))))
|
5c15279a |
:name "Event Loop")
|
be6f8571 |
event-pump))
|
f2c4664e |
(defvar *command-table* (make-hash-table :test 'equal))
|
be6f8571 |
(defun quote-output (str)
(with-output-to-string (s)
(format s "```~%~a```~%" str)))
(defmacro in-wq ((client-sym) &body body)
`(let ((promise (blackbird-base:make-promise)))
(values promise
|
5c15279a |
(chanl:send (work-queue ,client-sym)
(lambda (,client-sym)
(declare (ignorable ,client-sym))
(let ((result (progn ,@body)))
(blackbird-base:finish promise result)
result))))))
|
be6f8571 |
|
60e29634 |
(defun queue-message (event-pump channel message &key quote thread)
|
be6f8571 |
(let ((message (if quote (quote-output message)
|
5c15279a |
message)))
|
be6f8571 |
(in-wq (event-pump)
(send-message event-pump :message
|
5c15279a |
:channel channel
:data message
:thread thread))))
|
be6f8571 |
(define-condition command-error () ())
(define-condition unsupported-args (command-error) ())
|
60e29634 |
(defgeneric add-command ())
|
be6f8571 |
(defmacro define-command (name (event-pump ts channel &rest args) &body body)
(let* ((command-sym (intern (string-upcase name)))
|
5c15279a |
(has-rest (position '&rest args))
(rest-sym (gensym "rest"))
(args (if has-rest
args
(append args `(&rest ,rest-sym)))))
|
be6f8571 |
`(progn
(defun ,command-sym (,event-pump ,ts ,channel ,@args)
|
5c15279a |
(declare (ignorable ,event-pump ,ts ,@(when (not has-rest) `(,rest-sym))))
,@body)
|
60e29634 |
(setf (gethash ,name *command-table*) ',command-sym))))
|
be6f8571 |
|
60e29634 |
(defun safe-apply (func event-pump message channel args)
|
5ea361e0 |
(with-simple-restart (continue "Skip command")
|
60e29634 |
(apply func event-pump message channel args)))
|
5ea361e0 |
|
60e29634 |
(defun handle-command (event-pump message channel command args)
|
be6f8571 |
(declare (ignorable args))
|
811dd7ec |
(let* ((command (subseq command 1))
|
5c15279a |
(handler (gethash command *command-table*)))
|
be6f8571 |
(print (hash-table-alist *command-table*))
(terpri)
(print command)
(if handler
|
5c15279a |
(safe-apply handler event-pump message channel args)
|
811dd7ec |
(queue-message event-pump channel
(concat "I don't understand the command `" command "`.")
:thread (ensure-thread message)))))
|
5ea361e0 |
|
be6f8571 |
(defun edit-message (ts channel text)
(babel:octets-to-string
(drakma:http-request "https://slack.com/api/chat.update"
|
5c15279a |
:method :post
:content (concat "token=" *api-token*
"&channel=" channel
"&ts=" ts
"&text=" text))))
|
be6f8571 |
|
60e29634 |
(defmacro with-output-to-message ((stream event-pump channel &key quote thread) &body body)
|
be6f8571 |
(once-only (event-pump channel quote)
`(queue-message ,event-pump ,channel
|
5c15279a |
(with-output-to-string (,stream)
,@body)
:quote ,quote
:thread ,thread)))
|
60e29634 |
(defmacro with-thread-info ((ts thread-ts in-thread is-reply) message &body body)
(once-only (message)
`(let* ((,ts (gethash "ts" ,message))
|
5c15279a |
(,thread-ts (gethash "thread_ts" ,message))
(,in-thread (not (null ,thread-ts)))
(,is-reply (and ,in-thread (string/= ,ts ,thread-ts))))
|
60e29634 |
,@body)))
(defun ensure-thread (message)
"Continue thread or else start a new one"
(with-thread-info (ts thread-ts in-thread is-reply) message
(declare (ignore is-reply))
(if in-thread thread-ts ts)))
(defun keep-in-thread (message)
"Continue thread or continue in main thread"
(with-thread-info (ts thread-ts in-thread is-reply) message
(declare (ignore is-reply))
(if in-thread thread-ts nil)))
(define-command "help" (event-pump message channel)
|
be6f8571 |
(let ((*print-right-margin* (max (or *print-right-margin* 0)
|
5c15279a |
80)))
|
60e29634 |
(with-thread-info (ts thread-ts in-thread is-reply) message
(format t "~&THREAD INFO: (ts ~s) (thread-ts ~s) (in-thread ~s) (is-reply ~s)~%" ts thread-ts in-thread is-reply)
(with-output-to-message (s event-pump channel :thread (ensure-thread message))
|
5c15279a |
(format s "I understand these commands:~%~{`~a`~^ ~}"
(hash-table-keys *command-table*))
:quote t))))
|
be6f8571 |
|
f2c4664e |
(defvar *id* 0)
|
be6f8571 |
(defun make-message (data channel)
(incf *id*)
(with-output-to-string (s)
(yason:encode
(alist-hash-table
`(("id" . ,*id*)
|
5c15279a |
("type" . "message")
("channel" . ,channel)
("text" . ,data)))
|
be6f8571 |
s)))
|