1
1
{-# LANGUAGE BangPatterns #-}
2
+ {-# LANGUAGE BlockArguments #-}
2
3
{-# LANGUAGE DataKinds #-}
3
4
{-# LANGUAGE FlexibleContexts #-}
4
5
{-# LANGUAGE GADTs #-}
@@ -619,7 +620,7 @@ with args@Arguments {
619
620
connId
620
621
writer
621
622
handler =
622
- mask $ \ unmask -> async $ do
623
+ mask_ $ asyncWithUnmask \ unmask -> flip finally (cleanup unmask) do
623
624
runWithUnmask
624
625
(handler updateVersionDataFn socket writer
625
626
(TrConnectionHandler connId `contramap` tracer)
@@ -630,10 +631,9 @@ with args@Arguments {
630
631
(Mx. WithBearer connId `contramap` muxTracer))
631
632
withBuffer)
632
633
unmask
633
- `finally` cleanup
634
634
where
635
- cleanup :: m ()
636
- cleanup =
635
+ cleanup :: ( forall c . m c -> m c ) -> m ()
636
+ cleanup unmask =
637
637
-- We must ensure that we update 'connVar',
638
638
-- `acquireOutboundConnection` might be blocked on it awaiting for:
639
639
-- - handshake negotiation; or
@@ -642,7 +642,7 @@ with args@Arguments {
642
642
-- function after all is interruptible, because we unmask async
643
643
-- exceptions around 'threadDelay', but even if an async exception
644
644
-- hits there we will update `connVar`.
645
- uninterruptibleMask $ \ unmask -> do
645
+ uninterruptibleMask_ do
646
646
traceWith tracer (TrConnectionCleanup connId)
647
647
mbTransition <- modifyTMVar stateVar $ \ state -> do
648
648
eTransition <- atomically $ do
@@ -869,7 +869,7 @@ with args@Arguments {
869
869
hardLimit
870
870
socket
871
871
connId = do
872
- r <- modifyTMVar stateVar $ \ state -> do
872
+ r <- mask \ unmask -> modifyTMVar stateVar \ state -> unmask do
873
873
numberOfCons <- atomically $ countIncomingConnections state
874
874
875
875
let -- Check if after accepting this connection we get above the
@@ -1486,91 +1486,86 @@ with args@Arguments {
1486
1486
-- * closing the socket
1487
1487
-- * freeing the slot in connection manager state map
1488
1488
--
1489
- mask $ \ unmask -> do
1489
+ --
1490
+ -- connect
1491
+ --
1492
+ bracketOnError
1493
+ (openToConnect snocket peerAddr)
1494
+ (\ socket -> uninterruptibleMask_ $ do
1495
+ close snocket socket
1496
+ trs <- atomically $ modifyTMVarSTM stateVar $ \ state -> do
1497
+ connState <- readTVar connVar
1498
+ let state' = State. deleteAtRemoteAddr peerAddr mutableConnState state
1499
+ connState' = TerminatedState Nothing
1500
+ writeTVar connVar connState'
1501
+ return
1502
+ ( state'
1503
+ , [ mkTransition connState connState'
1504
+ , Transition (Known connState')
1505
+ Unknown
1506
+ ]
1507
+ )
1490
1508
1491
- --
1492
- -- connect
1493
- --
1509
+ traverse_ (traceWith trTracer . TransitionTrace connStateId) trs
1510
+ traceCounters stateVar
1511
+ )
1512
+ $ \ socket -> do
1513
+ traceWith tracer (TrConnectionNotFound provenance peerAddr)
1514
+ let addr = case addressType peerAddr of
1515
+ Nothing -> Nothing
1516
+ Just IPv4Address -> ipv4Address
1517
+ Just IPv6Address -> ipv6Address
1518
+ configureSocket socket addr
1519
+ -- only bind to the ip address if:
1520
+ -- the diffusion is given `ipv4/6` addresses;
1521
+ -- `diffusionMode` for this connection is
1522
+ -- `InitiatorAndResponderMode`.
1523
+ case addressType peerAddr of
1524
+ Just IPv4Address | InitiatorAndResponderDiffusionMode
1525
+ <- diffusionMode ->
1526
+ traverse_ (bind snocket socket)
1527
+ ipv4Address
1528
+ Just IPv6Address | InitiatorAndResponderDiffusionMode
1529
+ <- diffusionMode ->
1530
+ traverse_ (bind snocket socket)
1531
+ ipv6Address
1532
+ _ -> pure ()
1533
+
1534
+ traceWith tracer (TrConnect addr peerAddr diffusionMode)
1535
+ connect snocket socket peerAddr
1536
+ `catch` \ e -> do
1537
+ traceWith tracer (TrConnectError addr peerAddr e)
1538
+ -- the handler attached by `bracketOnError` will
1539
+ -- reset the state
1540
+ throwIO e
1541
+ localAddress <- getLocalAddr snocket socket
1542
+ let connId = ConnectionId { localAddress
1543
+ , remoteAddress = peerAddr
1544
+ }
1545
+ updated <- atomically $ modifyTMVarPure stateVar (swap . State. updateLocalAddr connId)
1546
+ unless updated $
1547
+ -- there exists a connection with exact same
1548
+ -- `ConnectionId`
1549
+ --
1550
+ -- NOTE:
1551
+ -- When we are connecting from our own `(ip, port)` to
1552
+ -- itself. In this case on linux, the `connect`
1553
+ -- returns, while `accept` doesn't. The outbound
1554
+ -- socket is connected to itself (simultaneuos TCP
1555
+ -- open?). Since the `accept` call never returns, the
1556
+ -- `connId` slot must have been available, and thus
1557
+ -- `State.updateLocalAddr` must have returned `True`.
1558
+ throwIO (withCallStack $ ConnectionExists provenance peerAddr)
1494
1559
1495
- (socket, connId) <-
1496
- unmask $ bracketOnError
1497
- (openToConnect snocket peerAddr)
1498
- (\ socket -> uninterruptibleMask_ $ do
1499
- close snocket socket
1500
- trs <- atomically $ modifyTMVarSTM stateVar $ \ state -> do
1501
- connState <- readTVar connVar
1502
- let state' = State. deleteAtRemoteAddr peerAddr mutableConnState state
1503
- connState' = TerminatedState Nothing
1504
- writeTVar connVar connState'
1505
- return
1506
- ( state'
1507
- , [ mkTransition connState connState'
1508
- , Transition (Known connState')
1509
- Unknown
1510
- ]
1511
- )
1512
-
1513
- traverse_ (traceWith trTracer . TransitionTrace connStateId) trs
1514
- traceCounters stateVar
1515
- )
1516
- $ \ socket -> do
1517
- traceWith tracer (TrConnectionNotFound provenance peerAddr)
1518
- let addr = case addressType peerAddr of
1519
- Nothing -> Nothing
1520
- Just IPv4Address -> ipv4Address
1521
- Just IPv6Address -> ipv6Address
1522
- configureSocket socket addr
1523
- -- only bind to the ip address if:
1524
- -- the diffusion is given `ipv4/6` addresses;
1525
- -- `diffusionMode` for this connection is
1526
- -- `InitiatorAndResponderMode`.
1527
- case addressType peerAddr of
1528
- Just IPv4Address | InitiatorAndResponderDiffusionMode
1529
- <- diffusionMode ->
1530
- traverse_ (bind snocket socket)
1531
- ipv4Address
1532
- Just IPv6Address | InitiatorAndResponderDiffusionMode
1533
- <- diffusionMode ->
1534
- traverse_ (bind snocket socket)
1535
- ipv6Address
1536
- _ -> pure ()
1537
-
1538
- traceWith tracer (TrConnect addr peerAddr diffusionMode)
1539
- connect snocket socket peerAddr
1540
- `catch` \ e -> do
1541
- traceWith tracer (TrConnectError addr peerAddr e)
1542
- -- the handler attached by `bracketOnError` will
1543
- -- reset the state
1544
- throwIO e
1545
- localAddress <- getLocalAddr snocket socket
1546
- let connId = ConnectionId { localAddress
1547
- , remoteAddress = peerAddr
1548
- }
1549
- updated <- atomically $ modifyTMVarPure stateVar (swap . State. updateLocalAddr connId)
1550
- unless updated $
1551
- -- there exists a connection with exact same
1552
- -- `ConnectionId`
1553
- --
1554
- -- NOTE:
1555
- -- When we are connecting from our own `(ip, port)` to
1556
- -- itself. In this case on linux, the `connect`
1557
- -- returns, while `accept` doesn't. The outbound
1558
- -- socket is connected to itself (simultaneuos TCP
1559
- -- open?). Since the `accept` call never returns, the
1560
- -- `connId` slot must have been available, and thus
1561
- -- `State.updateLocalAddr` must have returned `True`.
1562
- throwIO (withCallStack $ ConnectionExists provenance peerAddr)
1563
-
1564
- return (socket, connId)
1560
+ --
1561
+ -- fork connection handler; it will unmask exceptions
1562
+ --
1565
1563
1566
- - -
1567
- -- fork connection handler; it will unmask exceptions
1568
- --
1564
+ connThread < -
1565
+ forkConnectionHandler
1566
+ ( `updateVersionData` diffusionMode) stateVar mutableConnState socket connId writer handler
1569
1567
1570
- connThread <-
1571
- forkConnectionHandler
1572
- (`updateVersionData` diffusionMode) stateVar mutableConnState socket connId writer handler
1573
- return (connId, connThread)
1568
+ return (connId, connThread)
1574
1569
1575
1570
(trans, mbAssertion) <- atomically $ do
1576
1571
connState <- readTVar connVar
@@ -2419,10 +2414,9 @@ modifyTMVar :: ( MonadEvaluate m
2419
2414
=> StrictTMVar m a
2420
2415
-> (a -> m (a , b ))
2421
2416
-> m b
2422
- modifyTMVar v k =
2423
- mask $ \ restore -> do
2417
+ modifyTMVar v k = do
2424
2418
a <- atomically (takeTMVar v)
2425
- (a',b) <- restore (k a >>= evaluate)
2419
+ (a',b) <- (k a >>= evaluate)
2426
2420
`onException`
2427
2421
atomically (putTMVar v a)
2428
2422
atomically (putTMVar v a')
0 commit comments