diff --git a/src/s_exp/mina.clj b/src/s_exp/mina.clj index 5246205..8ffd8e3 100644 --- a/src/s_exp/mina.clj +++ b/src/s_exp/mina.clj @@ -51,14 +51,55 @@ ;; ;; (prn :aasdf ((:headers _) "accept")) ;; ;; (prn (:headers _)) ;; r)) -;; (def s (start! -;; #'h -;; {:host "0.0.0.0" :port 8080 -;; :write-queue-length 10240 -;; :connection-options {:socket-send-buffer-size 1024}})) - ;; (stop! s) +;; (require 's-exp.mina.websocket) +;; (stop! s) +;; (do +;; (stop! s) +;; (def s (start! +;; {:host "0.0.0.0" :port 8080 +;; :websocket-endpoints {"/foo" +;; {:subprotocols ["chat"] +;; :error (fn [session e] +;; (prn :err e)) +;; :ping (fn [session data] +;; (prn :ping)) +;; :pong (fn [session data] +;; (prn :pong)) +;; :open (fn [session] +;; ;; (prn :open) +;; (prn :open) +;; (prn session) +;; ;; (s-exp.mina.websocket/send! session "open" true) +;; ;; (prn (.subProtocol session)) +;; ;; ;; (prn session) +;; ;; (prn :sent) +;; ;; (s-exp.mina.websocket/close! session 0 "asdf") +;; ) +;; :close (fn [session status data] +;; (prn :close status)) +;; ;; :http-upgrade +;; ;; (fn [p h] +;; ;; (prn :http-upgrade p h) +;; ;; (java.util.Optional/of h)) +;; :message (fn [session data last?] +;; (prn :message data last?) +;; (s-exp.mina.websocket/send! session data true))}} +;; :write-queue-length 10240 +;; :connection-options {:socket-send-buffer-size 1024}}))) +;; (require '[gniazdo.core :as ws]) -;; https://api.github.com/repos/mpenet/mina/commits/main?per_page=1 +;; (def socket +;; (ws/connect +;; "ws://localhost:8080/foo" +;; :on-receive #(prn 'received %) +;; :subprotocols ["chat"])) + +;; (ws/send-msg socket "hello") +;; (ws/send-msg socket (.getBytes "hello")) + +;; (ws/close socket) + +;; https://api.github.com/repos/mpenet/mina/commits/main?per_page=1 diff --git a/src/s_exp/mina/websocket/listener.clj b/src/s_exp/mina/websocket/listener.clj index 3293736..f563504 100644 --- a/src/s_exp/mina/websocket/listener.clj +++ b/src/s_exp/mina/websocket/listener.clj @@ -1,26 +1,82 @@ (ns s-exp.mina.websocket.listener (:import (io.helidon.common.buffers BufferData) - (io.helidon.http Headers) + (io.helidon.http Headers HeaderNames HeaderName Header HeaderValues) (io.helidon.http HttpPrologue) (io.helidon.http WritableHeaders) (io.helidon.webserver.websocket WsUpgrader) - (io.helidon.websocket WsListener WsSession) + (io.helidon.websocket WsListener WsSession WsUpgradeException) (java.util Optional))) (set! *warn-on-reflection* true) +;; Client must send Sec-WebSocket-Version and Sec-WebSocket-Key. +;; Server must confirm the protocol by returning Sec-WebSocket-Accept. +;; Client may send a list of application subprotocols via Sec-WebSocket-Protocol. +;; Server must select one of the advertised subprotocols and return it via +;; Sec-WebSocket-Protocol. If the server does not support any, then the +;; connection is aborted. +;; Client may send a list of protocol extensions in Sec-WebSocket-Extensions. +;; Server may confirm one or more selected extensions via +;; Sec-WebSocket-Extensions. If no extensions are provided, then the connection +;; proceeds without them. +;; Finally, once the preceding handshake is complete, and if the handshake is +;; successful, the connection can now be used as a two-way communication channel +;; for exchanging WebSocket messages. From here on, there is no other explicit +;; HTTP communication between the client and server, and the WebSocket protocol +;; takes over. +(defn- header-negotiate + [^HeaderName k allowed-values ^Headers headers ^WritableHeaders response-headers] + (when (seq allowed-values) + (if-let [selected-value + (reduce (fn [_ x] + (when (contains? allowed-values x) + (reduced x))) + nil + (.allValues (.get headers k) + true))] + (.set response-headers (HeaderValues/create k ^String selected-value)) + (throw (WsUpgradeException. (format "Failed negotiation for %s" + (.defaultCase k))))))) + +(defn negotiate-subprotocols! + [allowed-sub-protocols ^Headers headers ^WritableHeaders response-headers] + (header-negotiate WsUpgrader/PROTOCOL + allowed-sub-protocols + ^Headers headers + ^WritableHeaders response-headers)) + +(defn negotiate-extensions! + [allowed-extensions ^Headers headers ^WritableHeaders response-headers] + (header-negotiate WsUpgrader/EXTENSIONS + allowed-extensions + ^Headers headers + ^WritableHeaders response-headers)) + +(defn http-upgrade* + [^HttpPrologue http-prologue ^Headers request-headers + allowed-protocols allowed-extensions] + (let [response-headers (WritableHeaders/create)] + (negotiate-subprotocols! allowed-protocols + request-headers + response-headers) + (negotiate-extensions! allowed-extensions + request-headers + response-headers) + + (if (pos? (.size response-headers)) + (Optional/of response-headers) + (Optional/of request-headers)))) + (defn make-listener ^WsListener [{:as _listener - :keys [message ping pong close error open http-upgrade] + :keys [message ping pong close error open http-upgrade + subprotocols extensions] :or {message (constantly nil) ping (constantly nil) pong (constantly nil) close (constantly nil) error (constantly nil) - open (constantly nil) - http-upgrade (fn [^HttpPrologue _http-prologue - ^Headers headers] - (Optional/of headers))}}] + open (constantly nil)}}] (reify WsListener (^void onMessage [_ ^WsSession session ^String data ^boolean last?] @@ -38,4 +94,10 @@ (^void onOpen [_ ^WsSession session] (open session)) (^Optional onHttpUpgrade [_ ^HttpPrologue http-prologue ^Headers headers] - (http-upgrade http-prologue headers)))) + (if http-upgrade + (http-upgrade http-prologue headers) + (http-upgrade* http-prologue headers + (set subprotocols) + (set extensions)))))) + +