git.fiddlerwoaroof.com
Raw Blame History
(in-package :slacker)

(defvar *api-token*) 

(defun make-client (event-pump)
  (flet ((get-ws-url (slack-response)
           (gethash "url" slack-response)))
    (fw.lu:let-each (:be slack-data)
      (format nil "https://slack.com/api/rtm.connect?token=~a" *api-token*)
      (drakma:http-request slack-data :want-stream t)
      (yason:parse slack-data)

      (let* ((url (get-ws-url slack-data))
             (client (wsd:make-client url)))
        (setf (ws-client event-pump)
              client)
        (wsd:on :message client
                (lambda (message)
                  (chanl:send (result-queue event-pump)
                              message)))
        (wsd:on :close client
                (lambda (&key code reason)
                  (if (running event-pump)
                      (progn
                        (format t "~&Closed with code: ~a for reason ~a. Restarting in 2 seconds~%"
                                code reason)
                        (sleep 2)
                        (format t "~& ... restarting~%")
                        (make-client event-pump))
                      (format t "~&exiting~%"))))
        (wsd:start-connection client)))))

(defgeneric send-message (client type &key)
  (:documentation "Send a slack message")
  (:method :around ((client event-pump) _type &key)
    (declare (ignorable client _type))
    (let ((result (call-next-method)))
      (values result
              (wsd:send (ws-client client)
                        (with-output-to-string (s)
                          (yason:encode result s)))))))

(defmethod send-message ((client event-pump) (type (eql :ping)) &key data)
  (let* ((id (incf (latest-id client)))
         (message `(("id" . ,id)
                    ("type" . "ping"))))

    (when data
      (aconsf message "data" data))

    (format t "~&pinging with id: ~a at time: ~a~%" id (local-time:now))
    (incf (waiting-pings client))
    (alist-hash-table message
                      :test 'equal)))

(defmethod send-message ((client event-pump) (type (eql :message)) &key channel data thread)
  (let* ((id (incf (latest-id client)))
         (message `(("id" . ,id)
                    ("type" . "message")
                    ("channel" . ,channel)
                    ("text" . ,data)
                    ,@(unsplice
                       (when thread
                         `("thread_ts" . ,thread))))))
    (alist-hash-table message
                      :test 'equal)))


(define-condition connection-lost (error)
  ())

(defclass slack-client ()
  ((%commands :accessor commands :initform (make-hash-table))
   (%modules :accessor modules :initform '())))

(defgeneric start (event-pump client &key queue-pair))

(defgeneric bind (what where))
(defmethod bind ((queue-pair queue-pair) (event-pump event-pump))
  (setf (queue-pair event-pump) queue-pair))

(defun handle-work-queue (event-pump)
  (multiple-value-bind (message message-p)
      (chanl:recv (work-queue event-pump)
                  :blockp nil)
    (when message-p
      (funcall message
               event-pump))))

(defun send-pings (event-pump client)
  "Ping slack for connectivity, error if we have too many waiting pings."
  ;; Eventually drop client
  (declare (ignorable client))
  (if (> 100 (waiting-pings event-pump))
      (send-message event-pump :ping)
      (error 'connection-lost)))

(defun network-loop (event-pump modules)
  (declare (optimize (debug 3)))
  (loop for (module . args) in modules
        while (running event-pump) do
          (start-module event-pump
                        (apply #'attach-module
                               event-pump module args)))

  (fwoar.event-loop:run-loop event-pump))

(defun start-client (implementation &key (queue-pair (make-instance 'queue-pair)) modules impl-args)
  (let* ((event-pump (apply #'make-instance implementation
			    :queue-pair queue-pair
			    :client-factory (op (make-client _))
			    impl-args)))
    (setf (running event-pump) t)
    (values event-pump
            (bt:make-thread (lambda ()
                              (network-loop event-pump
                                            modules))
                            :name "Event Server"
                            :initial-bindings `((*api-token* . ,*api-token*))))))

(defmethod get-event-nonblocking ((event-pump event-pump) &key (object-as :hash-table))
  (multiple-value-bind (message message-p) (chanl:recv (result-queue event-pump) :blockp nil)
    (values (when message-p
              (yason:parse message :object-as object-as))
            message-p)))

(defmethod get-event ((queue-pair queue-pair) &key (object-as :hash-table))
  (multiple-value-bind (message message-p) (chanl:recv (result-queue queue-pair))
    (values (when message-p
              (yason:parse message :object-as object-as))
            message-p)))

(defparameter *ignored-messages* '(:pong :user_typing :desktop_notification))

(defgeneric handle-message (type event-pump ts channel message)
  (:method :around (type event-pump ts channel message)
    (flet ((report-continue (stream)
             (format stream "Skip message of type ~s" type))
           (report-retry (stream)
             (format stream "Retry message of type ~s" type)))
      (tagbody start
         (restart-case (call-next-method)
           (continue () :report report-continue)
           (skip-message () :report report-continue)
           (retry () :report report-retry
             (go start))))))
  (:method :before (type event-pump ts channel message)
    (declare (ignore event-pump ts channel))
    (unless (member type *ignored-messages*)
      (fresh-line)
      (yason:encode message)
      (terpri)))
  (:method (type (event-pump event-pump) ts channel message)
    (declare (ignore type event-pump ts channel message))
    nil)
  (:method ((type (eql :pong)) (event-pump event-pump) ts channel message)
    (declare (ignore ts type channel))
    (format t "~&Was waiting on ~a pings," (waiting-pings event-pump))
    (decf (waiting-pings event-pump))
    (format t "after pong received for ~a, now waiting on ~a~%"
            (gethash "reply_to" message)
            (waiting-pings event-pump))))

(defmethod handle-message ((type (eql :message)) (event-pump event-pump) ts channel message)
  (when-let* ((msg (gethash "text" message))
              (parsed-message (tokens msg))) 
    (when (eql #\; (elt msg 0))
      (handle-command event-pump message channel
                      (plump:decode-entities
                       (car parsed-message))
                      (cdr parsed-message))))) 

(defun event-loop (event-pump)
  (loop
    with message with message-p
    while (running event-pump) do
      (multiple-value-setq (message message-p)
        (get-event (queue-pair event-pump)))
    when message-p do
      (let ((type (gethash "type" message))
            (reply (gethash "reply_to" message))
            (ts (gethash "ts" message))
            (channel (gethash "channel" message)))
        (cond (type
               (handle-message (make-keyword (string-upcase type))
                               event-pump ts channel message))
              (reply )))
    do (sleep 0.01)))

(defun coordinate-threads (&optional queue-pair (implementation 'event-pump) args)
  (let* ((event-pump (start-client implementation
                                   :queue-pair queue-pair
                                   :modules '((hhgbot-augmented-assistant::js-executor))
                                   :impl-args args)))
    (bt:make-thread (lambda ()
                      (loop until (running event-pump)
                            finally
                               (unwind-protect (event-loop event-pump)
                                 (stop-slacker event-pump))))
                    :name "Event Loop") 
    event-pump))

(defvar *command-table* (make-hash-table :test 'equal))

(defun quote-output (str)
  (with-output-to-string (s)
    (format s "```~%~a```~%" str)))

(defmacro in-wq ((client-sym) &body body)
  `(let ((promise (blackbird-base:make-promise)))
     (values promise
             (chanl:send (work-queue ,client-sym)
                         (lambda (,client-sym)
                           (declare (ignorable ,client-sym))
                           (let ((result (progn ,@body)))
                             (blackbird-base:finish promise result)
                             result))))))

(defun queue-message (event-pump channel message &key quote thread)
  (let ((message (if quote (quote-output message)
                     message)))
    (in-wq (event-pump)
      (send-message event-pump :message
                    :channel channel
                    :data message
                    :thread thread))))

(define-condition command-error () ())
(define-condition unsupported-args (command-error) ())

(defgeneric add-command ())
(defmacro define-command (name (event-pump ts channel &rest args) &body body)
  (let* ((command-sym (intern (string-upcase name)))
         (has-rest (position '&rest args))
         (rest-sym (gensym "rest"))
         (args (if has-rest
                   args
                   (append args `(&rest ,rest-sym)))))
    `(progn
       (defun ,command-sym (,event-pump ,ts ,channel ,@args)
         (declare (ignorable ,event-pump ,ts ,@(when (not has-rest) `(,rest-sym))))
         ,@body)
       (setf (gethash ,name *command-table*) ',command-sym))))

(defun safe-apply (func event-pump message channel args)
  (with-simple-restart (continue "Skip command")
    (apply func event-pump message channel args)))

(defun handle-command (event-pump message channel command args)
  (declare (ignorable args))
  (let* ((command (subseq command 1))
         (handler (gethash command *command-table*)))
    (print (hash-table-alist *command-table*))
    (terpri)
    (print command)
    (if handler
        (safe-apply handler event-pump message channel args)
        (queue-message event-pump channel
                       (concat "I don't understand the command `" command "`.")
                       :thread (ensure-thread message)))))

(defun edit-message (ts channel text)
  (babel:octets-to-string
   (drakma:http-request "https://slack.com/api/chat.update"
                        :method :post
                        :content (concat "token=" *api-token*
                                         "&channel=" channel
                                         "&ts=" ts
                                         "&text=" text))))

(defmacro with-output-to-message ((stream event-pump channel &key quote thread) &body body)
  (once-only (event-pump channel quote)
    `(queue-message ,event-pump ,channel
                    (with-output-to-string (,stream)
                      ,@body)
                    :quote ,quote
                    :thread ,thread)))

(defmacro with-thread-info ((ts thread-ts in-thread is-reply) message &body body)
  (once-only (message)
    `(let* ((,ts (gethash "ts" ,message))
            (,thread-ts (gethash "thread_ts" ,message))
            (,in-thread (not (null ,thread-ts)))
            (,is-reply (and ,in-thread (string/= ,ts ,thread-ts))))
       ,@body)))

(defun ensure-thread (message)
  "Continue thread or else start a new one"
  (with-thread-info (ts thread-ts in-thread is-reply) message
    (declare (ignore is-reply))
    (if in-thread thread-ts ts)))

(defun keep-in-thread (message)
  "Continue thread or continue in main thread"
  (with-thread-info (ts thread-ts in-thread is-reply) message
    (declare (ignore is-reply))
    (if in-thread thread-ts nil)))

(define-command "help" (event-pump message channel)
  (let ((*print-right-margin* (max (or *print-right-margin* 0)
                                   80)))
    (with-thread-info (ts thread-ts in-thread is-reply) message
      (format t "~&THREAD INFO: (ts ~s) (thread-ts ~s) (in-thread ~s) (is-reply ~s)~%" ts thread-ts in-thread is-reply)
      (with-output-to-message (s event-pump channel :thread (ensure-thread message))
        (format s "I understand these commands:~%~{`~a`~^ ~}"
                (hash-table-keys *command-table*))
        :quote t))))


(defvar *id* 0)
(defun make-message (data channel)
  (incf *id*)
  (with-output-to-string (s)
    (yason:encode
     (alist-hash-table
      `(("id" . ,*id*)
        ("type" . "message")
        ("channel" . ,channel)
        ("text" . ,data)))
     s)))