Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! fixup! fixup! websockets init
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenet committed Oct 9, 2023
1 parent ff32f3d commit edb5365
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 15 deletions.
55 changes: 48 additions & 7 deletions src/s_exp/mina.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,55 @@
;; ;; (prn :aasdf ((:headers _) "accept"))
;; ;; (prn (:headers _))
;; r))
;; (def s (start!
;; #'h
;; {:host "0.0.0.0" :port 8080
;; :write-queue-length 10240
;; :connection-options {:socket-send-buffer-size 1024}}))

;; (stop! s)
;; (require 's-exp.mina.websocket)
;; (stop! s)
;; (do
;; (stop! s)
;; (def s (start!
;; {:host "0.0.0.0" :port 8080
;; :websocket-endpoints {"/foo"
;; {:subprotocols ["chat"]
;; :error (fn [session e]
;; (prn :err e))
;; :ping (fn [session data]
;; (prn :ping))
;; :pong (fn [session data]
;; (prn :pong))
;; :open (fn [session]
;; ;; (prn :open)
;; (prn :open)
;; (prn session)
;; ;; (s-exp.mina.websocket/send! session "open" true)
;; ;; (prn (.subProtocol session))
;; ;; ;; (prn session)
;; ;; (prn :sent)
;; ;; (s-exp.mina.websocket/close! session 0 "asdf")
;; )
;; :close (fn [session status data]
;; (prn :close status))
;; ;; :http-upgrade
;; ;; (fn [p h]
;; ;; (prn :http-upgrade p h)
;; ;; (java.util.Optional/of h))
;; :message (fn [session data last?]
;; (prn :message data last?)
;; (s-exp.mina.websocket/send! session data true))}}
;; :write-queue-length 10240
;; :connection-options {:socket-send-buffer-size 1024}})))
;; (require '[gniazdo.core :as ws])

;; https://api.github.com/repos/mpenet/mina/commits/main?per_page=1
;; (def socket
;; (ws/connect
;; "ws://localhost:8080/foo"
;; :on-receive #(prn 'received %)
;; :subprotocols ["chat"]))

;; (ws/send-msg socket "hello")

;; (ws/send-msg socket (.getBytes "hello"))

;; (ws/close socket)

;; https://api.github.com/repos/mpenet/mina/commits/main?per_page=1

78 changes: 70 additions & 8 deletions src/s_exp/mina/websocket/listener.clj
Original file line number Diff line number Diff line change
@@ -1,26 +1,82 @@
(ns s-exp.mina.websocket.listener
(:import (io.helidon.common.buffers BufferData)
(io.helidon.http Headers)
(io.helidon.http Headers HeaderNames HeaderName Header HeaderValues)
(io.helidon.http HttpPrologue)
(io.helidon.http WritableHeaders)
(io.helidon.webserver.websocket WsUpgrader)
(io.helidon.websocket WsListener WsSession)
(io.helidon.websocket WsListener WsSession WsUpgradeException)
(java.util Optional)))

(set! *warn-on-reflection* true)

;; Client must send Sec-WebSocket-Version and Sec-WebSocket-Key.
;; Server must confirm the protocol by returning Sec-WebSocket-Accept.
;; Client may send a list of application subprotocols via Sec-WebSocket-Protocol.
;; Server must select one of the advertised subprotocols and return it via
;; Sec-WebSocket-Protocol. If the server does not support any, then the
;; connection is aborted.
;; Client may send a list of protocol extensions in Sec-WebSocket-Extensions.
;; Server may confirm one or more selected extensions via
;; Sec-WebSocket-Extensions. If no extensions are provided, then the connection
;; proceeds without them.
;; Finally, once the preceding handshake is complete, and if the handshake is
;; successful, the connection can now be used as a two-way communication channel
;; for exchanging WebSocket messages. From here on, there is no other explicit
;; HTTP communication between the client and server, and the WebSocket protocol
;; takes over.
(defn- header-negotiate
[^HeaderName k allowed-values ^Headers headers ^WritableHeaders response-headers]
(when (seq allowed-values)
(if-let [selected-value
(reduce (fn [_ x]
(when (contains? allowed-values x)
(reduced x)))
nil
(.allValues (.get headers k)
true))]
(.set response-headers (HeaderValues/create k ^String selected-value))
(throw (WsUpgradeException. (format "Failed negotiation for %s"
(.defaultCase k)))))))

(defn negotiate-subprotocols!
[allowed-sub-protocols ^Headers headers ^WritableHeaders response-headers]
(header-negotiate WsUpgrader/PROTOCOL
allowed-sub-protocols
^Headers headers
^WritableHeaders response-headers))

(defn negotiate-extensions!
[allowed-extensions ^Headers headers ^WritableHeaders response-headers]
(header-negotiate WsUpgrader/EXTENSIONS
allowed-extensions
^Headers headers
^WritableHeaders response-headers))

(defn http-upgrade*
[^HttpPrologue http-prologue ^Headers request-headers
allowed-protocols allowed-extensions]
(let [response-headers (WritableHeaders/create)]
(negotiate-subprotocols! allowed-protocols
request-headers
response-headers)
(negotiate-extensions! allowed-extensions
request-headers
response-headers)

(if (pos? (.size response-headers))
(Optional/of response-headers)
(Optional/of request-headers))))

(defn make-listener
^WsListener [{:as _listener
:keys [message ping pong close error open http-upgrade]
:keys [message ping pong close error open http-upgrade
subprotocols extensions]
:or {message (constantly nil)
ping (constantly nil)
pong (constantly nil)
close (constantly nil)
error (constantly nil)
open (constantly nil)
http-upgrade (fn [^HttpPrologue _http-prologue
^Headers headers]
(Optional/of headers))}}]
open (constantly nil)}}]

(reify WsListener
(^void onMessage [_ ^WsSession session ^String data ^boolean last?]
Expand All @@ -38,4 +94,10 @@
(^void onOpen [_ ^WsSession session]
(open session))
(^Optional onHttpUpgrade [_ ^HttpPrologue http-prologue ^Headers headers]
(http-upgrade http-prologue headers))))
(if http-upgrade
(http-upgrade http-prologue headers)
(http-upgrade* http-prologue headers
(set subprotocols)
(set extensions))))))


0 comments on commit edb5365

Please sign in to comment.