diff --git a/example/web/elm/src/Chat.elm b/example/web/elm/src/Chat.elm index 947f199..ab1dc01 100644 --- a/example/web/elm/src/Chat.elm +++ b/example/web/elm/src/Chat.elm @@ -31,6 +31,7 @@ type alias Model = , state : State , messages : List Message , composedMessage : String + , accessToken : Int } @@ -53,6 +54,7 @@ initModel = , messages = [] , state = LeftLobby , composedMessage = "" + , accessToken = 1 } @@ -73,6 +75,7 @@ type Msg | NewMsg JD.Value | UserJoinedMsg JD.Value | SendComposedMessage + | RefreshAccessToken update : Msg -> Model -> ( Model, Cmd Msg ) @@ -114,6 +117,9 @@ update message model = Err err -> model ! [] + RefreshAccessToken -> + { model | accessToken = model.accessToken + 1 } ! [] + -- Decoder @@ -143,9 +149,11 @@ lobbySocket = {-| Initialize a socket with the default heartbeat intervall of 30 seconds -} -socket : Socket -socket = +socket : Int -> Socket Msg +socket accessToken = Socket.init lobbySocket + |> Socket.withParams [ ( "accessToken", toString accessToken ) ] + |> Socket.onDie RefreshAccessToken lobby : String -> Channel Msg @@ -162,16 +170,20 @@ lobby userName = subscriptions : Model -> Sub Msg subscriptions model = - case model.state of - JoiningLobby -> - Phoenix.connect socket [ lobby model.userName ] + let + connect = + Phoenix.connect (socket model.accessToken) + in + case model.state of + JoiningLobby -> + connect [ lobby model.userName ] - JoinedLobby -> - Phoenix.connect socket [ lobby model.userName ] + JoinedLobby -> + connect [ lobby model.userName ] - -- we already open the socket connection so that we can faster join the lobby - _ -> - Phoenix.connect socket [] + -- we already open the socket connection so that we can faster join the lobby + _ -> + connect [] @@ -235,7 +247,7 @@ button model = Html.button [ buttonClass False, Events.onClick (UpdateState LeavingLobby) ] [ Html.text "Leave lobby" ] -chatMessages : List (Message) -> Html Msg +chatMessages : List Message -> Html Msg chatMessages messages = Html.div [ Attr.class "chat-messages" ] (List.map chatMessage messages) diff --git a/src/Phoenix.elm b/src/Phoenix.elm index 65e960d..4e056d0 100644 --- a/src/Phoenix.elm +++ b/src/Phoenix.elm @@ -30,7 +30,7 @@ import Phoenix.Push as Push exposing (Push) type MySub msg - = Connect Socket (List (Channel msg)) + = Connect (Socket msg) (List (Channel msg)) {-| Declare a socket you want to connect to and the channels you want to join. The effect manager will open the socket connection, join the channels. See `Phoenix.Socket` and `Phoenix.Channel` for more configuration and behaviour details. @@ -54,7 +54,7 @@ type MySub msg **Note**: An empty channel list keeps the socket connection open. -} -connect : Socket -> List (Channel msg) -> Sub msg +connect : Socket msg -> List (Channel msg) -> Sub msg connect socket channels = subscription (Connect socket channels) @@ -101,7 +101,9 @@ subMap : (a -> b) -> MySub a -> MySub b subMap func sub = case sub of Connect socket channels -> - Connect socket (List.map (InternalChannel.channelMap func) channels) + Connect + (socket |> InternalSocket.socketMap func) + (channels |> List.map (InternalChannel.channelMap func)) type alias Message = @@ -113,15 +115,15 @@ type alias Message = type alias State msg = - { sockets : InternalSocketsDict + { sockets : InternalSocketsDict msg , channels : InternalChannelsDict msg , selfCallbacks : Dict Ref (SelfCallback msg) , channelQueues : ChannelQueuesDict msg } -type alias InternalSocketsDict = - Dict Endpoint InternalSocket +type alias InternalSocketsDict msg = + Dict Endpoint (InternalSocket msg) type alias InternalChannelsDict msg = @@ -210,7 +212,7 @@ onEffects router cmds subs state = -- BUILD SOCKETS -buildSocketsDict : List (MySub msg) -> Dict String Socket +buildSocketsDict : List (MySub msg) -> Dict String (Socket msg) buildSocketsDict subs = let insert sub dict = @@ -257,29 +259,29 @@ sendPushsHelp cmds state = <&> sendPushsHelp rest -handleSocketsUpdate : Platform.Router msg (Msg msg) -> Dict String Socket -> InternalSocketsDict -> Task Never InternalSocketsDict +handleSocketsUpdate : Platform.Router msg (Msg msg) -> Dict String (Socket msg) -> InternalSocketsDict msg -> Task Never (InternalSocketsDict msg) handleSocketsUpdate router definedSockets stateSockets = let - -- leftStep: endpoints where we have to open a new socket connection - leftStep endpoint definedSocket getNewSockets = + -- addedSocketsStep: endpoints where we have to open a new socket connection + addedSocketsStep endpoint definedSocket taskChain = let socket = (InternalSocket.internalSocket definedSocket) in - getNewSockets - <&> \newSockets -> + taskChain + <&> \addedSockets -> attemptOpen router 0 socket <&> \pid -> - Task.succeed (Dict.insert endpoint (InternalSocket.opening 0 pid socket) newSockets) + Task.succeed (Dict.insert endpoint (InternalSocket.opening 0 pid socket) addedSockets) -- we update the authentication parameters - bothStep endpoint definedSocket stateSocket getNewSockets = - Task.map (Dict.insert endpoint <| InternalSocket.update definedSocket stateSocket) getNewSockets + retainedSocketsStep endpoint definedSocket stateSocket taskChain = + taskChain |> Task.map (Dict.insert endpoint <| InternalSocket.update definedSocket stateSocket) - rightStep endpoint stateSocket getNewSockets = - InternalSocket.close stateSocket &> getNewSockets + removedSocketsStep endpoint stateSocket taskChain = + InternalSocket.close stateSocket &> taskChain in - Dict.merge leftStep bothStep rightStep definedSockets stateSockets (Task.succeed Dict.empty) + Dict.merge addedSocketsStep retainedSocketsStep removedSocketsStep definedSockets stateSockets (Task.succeed Dict.empty) handleChannelsUpdate : Platform.Router msg (Msg msg) -> InternalChannelsDict msg -> InternalChannelsDict msg -> Task Never (InternalChannelsDict msg) @@ -358,7 +360,7 @@ sendJoinChannel router endpoint internalChannel = -- STATE UPDATE HELPERS -updateSocket : Endpoint -> InternalSocket -> State msg -> State msg +updateSocket : Endpoint -> InternalSocket msg -> State msg -> State msg updateSocket endpoint socket state = { state | sockets = Dict.insert endpoint socket state.sockets } @@ -378,7 +380,7 @@ removeChannelQueue endpoint topic state = { state | channelQueues = Helpers.removeIn endpoint topic state.channelQueues } -insertSocket : Endpoint -> InternalSocket -> State msg -> State msg +insertSocket : Endpoint -> InternalSocket msg -> State msg -> State msg insertSocket endpoint socket state = { state | sockets = Dict.insert endpoint socket state.sockets @@ -498,8 +500,21 @@ onSelfMsg router selfMsg state = finalNewState pid = Task.map (updateSocket endpoint (InternalSocket.opening backoffIteration pid internalSocket)) getNewState + + notifyApp = + case + ( internalSocket.socket.onDie + , internalSocket |> InternalSocket.isOpening + ) + of + ( Just onDie, False ) -> + Platform.sendToApp router onDie &> Task.succeed () + + _ -> + Task.succeed () in - attemptOpen router backoff internalSocket + notifyApp + &> attemptOpen router backoff internalSocket |> Task.andThen finalNewState Receive endpoint message -> @@ -902,7 +917,7 @@ pushSocket endpoint message selfCb state = -- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF -attemptOpen : Platform.Router msg (Msg msg) -> Float -> InternalSocket -> Task x Process.Id +attemptOpen : Platform.Router msg (Msg msg) -> Float -> InternalSocket msg -> Task x Process.Id attemptOpen router backoff ({ connection, socket } as internalSocket) = let goodOpen ws = @@ -918,7 +933,7 @@ attemptOpen router backoff ({ connection, socket } as internalSocket) = Process.spawn (after backoff &> actuallyAttemptOpen) -open : InternalSocket -> Platform.Router msg (Msg msg) -> Task WS.BadOpen WS.WebSocket +open : InternalSocket msg -> Platform.Router msg (Msg msg) -> Task WS.BadOpen WS.WebSocket open socket router = let onMessage _ msg = diff --git a/src/Phoenix/Internal/Socket.elm b/src/Phoenix/Internal/Socket.elm index dc39af7..83266c9 100644 --- a/src/Phoenix/Internal/Socket.elm +++ b/src/Phoenix/Internal/Socket.elm @@ -25,30 +25,49 @@ type Connection | Connected WS.WebSocket Int -type alias InternalSocket = - { connection : Connection, socket : Socket.Socket } +type alias InternalSocket msg = + { connection : Connection, socket : Socket.Socket msg } -internalSocket : Socket.Socket -> InternalSocket +internalSocket : Socket.Socket msg -> InternalSocket msg internalSocket socket = { connection = Closed, socket = socket } +-- INSPECT + + +isOpening : InternalSocket msg -> Bool +isOpening internalSocket = + case internalSocket.connection of + Opening _ _ -> + True + + _ -> + False + + + -- MODIFY -opening : Int -> Process.Id -> InternalSocket -> InternalSocket +socketMap : (a -> b) -> Socket.Socket a -> Socket.Socket b +socketMap func socket = + { socket | onDie = Maybe.map func socket.onDie } + + +opening : Int -> Process.Id -> InternalSocket msg -> InternalSocket msg opening backoff pid socket = { socket | connection = (Opening backoff pid) } -connected : WS.WebSocket -> InternalSocket -> InternalSocket +connected : WS.WebSocket -> InternalSocket msg -> InternalSocket msg connected ws socket = { socket | connection = (Connected ws 0) } -increaseRef : InternalSocket -> InternalSocket +increaseRef : InternalSocket msg -> InternalSocket msg increaseRef socket = case socket.connection of Connected ws ref -> @@ -58,16 +77,33 @@ increaseRef socket = socket -update : Socket.Socket -> InternalSocket -> InternalSocket -update socket { connection } = - InternalSocket connection socket +update : Socket.Socket msg -> InternalSocket msg -> InternalSocket msg +update nextSocket { connection, socket } = + let + updatedConnection = + if nextSocket.params /= socket.params then + resetBackoff connection + else + connection + in + InternalSocket updatedConnection nextSocket + + +resetBackoff : Connection -> Connection +resetBackoff connection = + case connection of + Opening backoff pid -> + Opening 0 pid + + _ -> + connection -- PUSH -push : Message -> InternalSocket -> Task x (Maybe Ref) +push : Message -> InternalSocket msg -> Task x (Maybe Ref) push message { connection, socket } = case connection of Connected ws ref -> @@ -105,7 +141,7 @@ push message { connection, socket } = -- OPEN CONNECTIONs -open : InternalSocket -> WS.Settings -> Task WS.BadOpen WS.WebSocket +open : InternalSocket msg -> WS.Settings -> Task WS.BadOpen WS.WebSocket open { socket } settings = let query = @@ -134,7 +170,7 @@ after backoff = -- CLOSE CONNECTIONS -close : InternalSocket -> Task x () +close : InternalSocket msg -> Task x () close { connection } = case connection of Opening _ pid -> @@ -151,17 +187,17 @@ close { connection } = -- HELPERS -get : Endpoint -> Dict Endpoint InternalSocket -> Maybe InternalSocket +get : Endpoint -> Dict Endpoint (InternalSocket msg) -> Maybe (InternalSocket msg) get endpoint dict = Dict.get endpoint dict -getRef : Endpoint -> Dict Endpoint InternalSocket -> Maybe Ref +getRef : Endpoint -> Dict Endpoint (InternalSocket msg) -> Maybe Ref getRef endpoint dict = get endpoint dict |> Maybe.andThen ref -ref : InternalSocket -> Maybe Ref +ref : InternalSocket msg -> Maybe Ref ref { connection } = case connection of Connected _ ref_ -> @@ -171,7 +207,7 @@ ref { connection } = Nothing -debugLogMessage : InternalSocket -> a -> a +debugLogMessage : InternalSocket msg -> a -> a debugLogMessage { socket } msg = if socket.debug then Debug.log "Received" msg diff --git a/src/Phoenix/Socket.elm b/src/Phoenix/Socket.elm index 9d1caa3..1ede09c 100644 --- a/src/Phoenix/Socket.elm +++ b/src/Phoenix/Socket.elm @@ -1,4 +1,4 @@ -module Phoenix.Socket exposing (Socket, init, heartbeatIntervallSeconds, withoutHeartbeat, reconnectTimer, withParams, withDebug) +module Phoenix.Socket exposing (Socket, init, heartbeatIntervallSeconds, withoutHeartbeat, reconnectTimer, withParams, withDebug, onDie) {-| A socket declares to which endpoint a socket connection should be established. @@ -6,7 +6,7 @@ module Phoenix.Socket exposing (Socket, init, heartbeatIntervallSeconds, without @docs Socket # Helpers -@docs init, withParams, heartbeatIntervallSeconds, withoutHeartbeat, reconnectTimer, withDebug +@docs init, withParams, heartbeatIntervallSeconds, withoutHeartbeat, reconnectTimer, withDebug, onDie -} import Time exposing (Time) @@ -14,17 +14,18 @@ import Time exposing (Time) {-| Representation of a Socket connection -} -type alias Socket = - PhoenixSocket +type alias Socket msg = + PhoenixSocket msg -type alias PhoenixSocket = +type alias PhoenixSocket msg = { endpoint : String , params : List ( String, String ) , heartbeatIntervall : Time , withoutHeartbeat : Bool , reconnectTimer : Int -> Float , debug : Bool + , onDie : Maybe msg } @@ -32,7 +33,7 @@ type alias PhoenixSocket = init "ws://localhost:4000/socket/websocket" -} -init : String -> Socket +init : String -> Socket msg init endpoint = { endpoint = endpoint , params = [] @@ -40,6 +41,7 @@ init endpoint = , withoutHeartbeat = False , reconnectTimer = defaultReconnectTimer , debug = False + , onDie = Nothing } @@ -48,7 +50,7 @@ init endpoint = init "ws://localhost:4000/socket/websocket" |> withParams [("token", "GYMXZwXzKFzfxyGntVkYt7uAJnscVnFJ")] -} -withParams : List ( String, String ) -> Socket -> Socket +withParams : List ( String, String ) -> Socket msg -> Socket msg withParams params socket = { socket | params = params } @@ -58,7 +60,7 @@ withParams params socket = init "ws://localhost:4000/socket/websocket" |> heartbeatIntervallSeconds 60 -} -heartbeatIntervallSeconds : Int -> Socket -> Socket +heartbeatIntervallSeconds : Int -> Socket msg -> Socket msg heartbeatIntervallSeconds intervall socket = { socket | heartbeatIntervall = (toFloat intervall) * Time.second } @@ -68,7 +70,7 @@ heartbeatIntervallSeconds intervall socket = init "ws://localhost:4000/socket/websocket" |> withoutHeartbeat -} -withoutHeartbeat : Socket -> Socket +withoutHeartbeat : Socket msg -> Socket msg withoutHeartbeat socket = { socket | withoutHeartbeat = True } @@ -83,18 +85,33 @@ withoutHeartbeat socket = With this function you can specify a custom strategy. -} -reconnectTimer : (Int -> Time) -> Socket -> Socket +reconnectTimer : (Int -> Time) -> Socket msg -> Socket msg reconnectTimer timerFunc socket = { socket | reconnectTimer = timerFunc } {-| Enable debug logs for the socket. Every incoming and outgoing message will be printed. -} -withDebug : Socket -> Socket +withDebug : Socket msg -> Socket msg withDebug socket = { socket | debug = True } +{-| Set a callback which will be called if the socket connection got interrupted. Useful for updating query params like access tokens. + + + + type Msg = + + RefreshAccessToken | ... + + + + init "ws://localhost:4000/socket/websocket" + + |> withParams [ ( "accessToken", "abc123" ) ] + + |> onDie RefreshAccessToken + + +-} +onDie : msg -> Socket msg -> Socket msg +onDie onDie_ socket = + { socket | onDie = Just onDie_ } + + defaultReconnectTimer : Int -> Time defaultReconnectTimer failedAttempts = if failedAttempts < 1 then