Skip to content

Commit

Permalink
Re-implement per-context busses in reorganized Stream code
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewkaney committed Apr 12, 2024
1 parent 4691224 commit c3d9dd3
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 94 deletions.
5 changes: 5 additions & 0 deletions src/Sound/Tidal/Stream/Config.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module Sound.Tidal.Stream.Config where

import Control.Monad (when)

import qualified Sound.Tidal.Clock as Clock

{-
Expand Down Expand Up @@ -42,3 +44,6 @@ defaultConfig = Config {cCtrlListen = True,
cVerbose = True,
cClockConfig = Clock.defaultConfig
}

verbose :: Config -> String -> IO ()
verbose c s = when (cVerbose c) $ putStrLn s
40 changes: 9 additions & 31 deletions src/Sound/Tidal/Stream/Listen.hs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
module Sound.Tidal.Stream.Listen where

import Data.Maybe (fromJust, catMaybes, isJust)
import Data.Maybe (fromJust)
import Control.Concurrent.MVar
import Control.Monad (when)
import System.IO (hPutStrLn, stderr)
import qualified Data.Map as Map
import qualified Sound.Osc.Fd as O
import qualified Sound.Osc.Time.Timeout as O
import qualified Network.Socket as N
import qualified Control.Exception as E

Expand Down Expand Up @@ -50,29 +49,14 @@ openListener c
catchAny = E.catch

-- Listen to and act on OSC control messages
ctrlResponder :: Int -> Config -> Stream -> IO ()
ctrlResponder waits c (stream@(Stream {sListen = Just sock}))
= do ms <- recvMessagesTimeout 2 sock
if (null ms)
then do checkHandshake -- there was a timeout, check handshake
ctrlResponder (waits+1) c stream
else do mapM_ act ms
ctrlResponder 0 c stream
where
checkHandshake = do busses <- readMVar (sBusses stream)
when (null busses) $ do when (waits == 0) $ verbose c $ "Waiting for SuperDirt (v.1.7.2 or higher).."
sendHandshakes stream

act (O.Message "/dirt/hello" _) = sendHandshakes stream
act (O.Message "/dirt/handshake/reply" xs) = do prev <- swapMVar (sBusses stream) $ bufferIndices xs
-- Only report the first time..
when (null prev) $ verbose c $ "Connected to SuperDirt."
return ()
where
bufferIndices [] = []
bufferIndices (x:xs') | x == (O.AsciiString $ O.ascii "&controlBusIndices") = catMaybes $ takeWhile isJust $ map O.datum_integral xs'
| otherwise = bufferIndices xs'
ctrlResponder :: Config -> Stream -> IO ()
ctrlResponder _ (stream@(Stream {sListen = Just sock})) = loop
where
loop :: IO ()
loop = do O.recvMessages sock >>= mapM_ act
loop
-- External controller commands
act :: O.Message -> IO ()
act (O.Message "/ctrl" (O.Int32 k:v:[]))
= act (O.Message "/ctrl" [O.string $ show k,v])
act (O.Message "/ctrl" (O.AsciiString k:v@(O.Float _):[]))
Expand Down Expand Up @@ -109,10 +93,4 @@ ctrlResponder waits c (stream@(Stream {sListen = Just sock}))
withID (O.AsciiString k) func = func $ (ID . O.ascii_to_string) k
withID (O.Int32 k) func = func $ (ID . show) k
withID _ _ = return ()
ctrlResponder _ _ _ = return ()

verbose :: Config -> String -> IO ()
verbose c s = when (cVerbose c) $ putStrLn s

recvMessagesTimeout :: (O.Transport t) => Double -> t -> IO [O.Message]
recvMessagesTimeout n sock = fmap (maybe [] O.packetMessages) $ O.recvPacketTimeout n sock
ctrlResponder _ _ = return ()
9 changes: 2 additions & 7 deletions src/Sound/Tidal/Stream/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import Sound.Tidal.Stream.Types
import Sound.Tidal.Stream.Listen
import Sound.Tidal.Stream.Target
import Sound.Tidal.Stream.Process
import Sound.Tidal.Stream.UI

{-
Main.hs - Start tidals stream, listen and act on incoming messages
Expand Down Expand Up @@ -45,7 +44,6 @@ startStream :: Config -> [(Target, [OSC])] -> IO Stream
startStream config oscmap = do
sMapMV <- newMVar Map.empty
pMapMV <- newMVar Map.empty
bussesMV <- newMVar []
globalFMV <- newMVar id

tidal_status_string >>= verbose config
Expand All @@ -54,10 +52,9 @@ startStream config oscmap = do

cxs <- getCXs config oscmap

clockRef <- Clock.clocked (cClockConfig config) (doTick sMapMV bussesMV pMapMV globalFMV cxs listen)
clockRef <- Clock.clocked (cClockConfig config) (doTick sMapMV pMapMV globalFMV cxs)

let stream = Stream {sConfig = config,
sBusses = bussesMV,
sStateMV = sMapMV,
sClockRef = clockRef,
-- sLink = abletonLink,
Expand All @@ -68,10 +65,8 @@ startStream config oscmap = do
sCxs = cxs
}

sendHandshakes stream

-- Spawn a thread to handle OSC control messages
_ <- forkIO $ ctrlResponder 0 config stream
_ <- forkIO $ ctrlResponder config stream
return stream

startMulti :: [Target] -> Config -> IO ()
Expand Down
24 changes: 11 additions & 13 deletions src/Sound/Tidal/Stream/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,20 @@ data ProcessedEvent =
-- because the likely reason is that something is wrong with the current pattern.

doTick :: MVar ValueMap -- pattern state
-> MVar [Int] -- busses
-> MVar PlayMap -- currently playing
-> MVar (ControlPattern -> ControlPattern) -- current global fx
-> [Cx] -- target addresses
-> Maybe O.Udp -- network socket
-> (Time,Time) -- current arc
-> Double -- nudge
-> Clock.LinkOperations -- ableton link operations
-> IO ()
doTick stateMV busMV playMV globalFMV cxs listen (st,end) nudge ops =
doTick stateMV playMV globalFMV cxs (st,end) nudge ops =
E.handle (\ (e :: E.SomeException) -> do
hPutStrLn stderr $ "Failed to Stream.doTick: " ++ show e
hPutStrLn stderr $ "Return to previous pattern."
setPreviousPatternOrSilence playMV) (do
sMap <- takeMVar stateMV
pMap <- readMVar playMV
busses <- readMVar busMV
sGlobalF <- readMVar globalFMV
bpm <- (Clock.getTempo ops)
let
Expand All @@ -109,13 +106,14 @@ doTick stateMV busMV playMV globalFMV cxs listen (st,end) nudge ops =
(sMap'', es') = resolveState sMap' es
tes <- processCps ops es'
-- For each OSC target
forM_ cxs $ \cx@(Cx target _ oscs _ _) -> do
forM_ cxs $ \cx@(Cx target _ oscs _ _ bussesMV) -> do
busses <- mapM readMVar bussesMV
-- Latency is configurable per target.
-- Latency is only used when sending events live.
let latency = oLatency target
ms = concatMap (\e -> concatMap (toOSC busses e) oscs) tes
-- send the events to the OSC target
forM_ ms $ \m -> (send listen cx latency extraLatency m) `E.catch` \(e :: E.SomeException) ->
forM_ ms $ \m -> (send cx latency extraLatency m) `E.catch` \(e :: E.SomeException) ->
hPutStrLn stderr $ "Failed to send. Is the '" ++ oName target ++ "' target running? " ++ show e
putMVar stateMV sMap'')

Expand Down Expand Up @@ -154,8 +152,8 @@ processCps ops = mapM processEvent
}


toOSC :: [Int] -> ProcessedEvent -> OSC -> [(Double, Bool, O.Message)]
toOSC busses pe osc@(OSC _ _)
toOSC :: Maybe [Int] -> ProcessedEvent -> OSC -> [(Double, Bool, O.Message)]
toOSC maybeBusses pe osc@(OSC _ _)
= catMaybes (playmsg:busmsgs)
-- playmap is a ValueMap where the keys don't start with ^ and are not ""
-- busmap is a ValueMap containing the rest of the keys from the event value
Expand Down Expand Up @@ -190,8 +188,8 @@ toOSC busses pe osc@(OSC _ _)
O.Message mungedPath vs
)
| otherwise = Nothing
toBus n | null busses = n
| otherwise = busses !!! n
toBus n | Just busses <- maybeBusses, (not . null) busses = busses !!! n
| otherwise = n
busmsgs = map
(\(('^':k), (VI b)) -> do v <- Map.lookup k playmap

Check warning on line 194 in src/Sound/Tidal/Stream/Process.hs

View workflow job for this annotation

GitHub Actions / cabal latest - ghc latest

Pattern match(es) are non-exhaustive

Check warning on line 194 in src/Sound/Tidal/Stream/Process.hs

View workflow job for this annotation

GitHub Actions / cabal 3.8.1.0 - ghc 9.4.1

Pattern match(es) are non-exhaustive
return $ (tsPart,
Expand Down Expand Up @@ -282,8 +280,8 @@ hasSolo = (>= 1) . length . filter solo . Map.elems
-- However, since the full arc is processed at once and since Link does not support
-- scheduling, tempo change may affect scheduling of events that happen earlier
-- in the normal stream (the one handled by onTick).
onSingleTick :: Config -> Clock.ClockRef -> MVar ValueMap -> MVar [Int] -> MVar PlayMap -> MVar (ControlPattern -> ControlPattern) -> [Cx] -> Maybe O.Udp -> ControlPattern -> IO ()
onSingleTick config clockRef stateMV busMV _ globalFMV cxs listen pat = do
onSingleTick :: Config -> Clock.ClockRef -> MVar ValueMap -> MVar PlayMap -> MVar (ControlPattern -> ControlPattern) -> [Cx] -> ControlPattern -> IO ()
onSingleTick config clockRef stateMV _ globalFMV cxs pat = do
ops <- Clock.getZeroedLinkOperations (cClockConfig config) clockRef
pMapMV <- newMVar $ Map.singleton "fake"
(PlayState {pattern = pat,
Expand All @@ -293,7 +291,7 @@ onSingleTick config clockRef stateMV busMV _ globalFMV cxs listen pat = do
}
)
-- The nowArc is a full cycle
doTick stateMV busMV pMapMV globalFMV cxs listen (0,1) 0 ops
doTick stateMV pMapMV globalFMV cxs (0,1) 0 ops



Expand Down
81 changes: 56 additions & 25 deletions src/Sound/Tidal/Stream/Target.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
module Sound.Tidal.Stream.Target where

import qualified Sound.Osc.Fd as O
import qualified Sound.Osc.Time.Timeout as O
import qualified Network.Socket as N
import Data.Maybe (fromJust, isJust)
import Control.Concurrent (forkOS, threadDelay)
import Data.Maybe (fromJust, isJust, catMaybes)
import Control.Concurrent (newMVar, readMVar, swapMVar, forkIO, forkOS, threadDelay)
import Control.Monad (when)
import Foreign (Word8)

import Sound.Tidal.Pattern
Expand Down Expand Up @@ -31,33 +33,65 @@ import Sound.Tidal.Stream.Config

getCXs :: Config -> [(Target, [OSC])] -> IO [Cx]
getCXs config oscmap = mapM (\(target, os) -> do
remote_addr <- resolve (oAddress target) (show $ oPort target)
remote_bus_addr <- if isJust $ oBusPort target
then Just <$> resolve (oAddress target) (show $ fromJust $ oBusPort target)
else return Nothing
remote_addr <- resolve (oAddress target) (oPort target)
remote_bus_addr <- mapM (resolve (oAddress target)) (oBusPort target)
remote_busses <- sequence (oBusPort target >> Just (newMVar []))

let broadcast = if cCtrlBroadcast config then 1 else 0
u <- O.udp_socket (\sock sockaddr -> do N.setSocketOption sock N.Broadcast broadcast
N.connect sock sockaddr
u <- O.udp_socket (\sock _ -> do N.setSocketOption sock N.Broadcast broadcast
) (oAddress target) (oPort target)
return $ Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxTarget = target, cxOSCs = os}
let cx = Cx {cxUDP = u, cxAddr = remote_addr, cxBusAddr = remote_bus_addr, cxBusses = remote_busses, cxTarget = target, cxOSCs = os}
_ <- forkIO $ handshake cx config
return cx
) oscmap

resolve :: String -> String -> IO N.AddrInfo
resolve :: String -> Int -> IO N.AddrInfo
resolve host port = do let hints = N.defaultHints { N.addrSocketType = N.Stream }
addr:_ <- N.getAddrInfo (Just hints) (Just host) (Just port)
addr:_ <- N.getAddrInfo (Just hints) (Just host) (Just $ show port)
return addr

handshake :: Cx -> Config -> IO ()
handshake Cx { cxUDP = udp, cxBusses = Just bussesMV, cxAddr = addr } c = sendHandshake >> listen 0
where
sendHandshake :: IO ()
sendHandshake = O.sendTo udp (O.Packet_Message $ O.Message "/dirt/handshake" []) (N.addrAddress addr)
listen :: Int -> IO ()
listen waits = do ms <- recvMessagesTimeout 2 udp
if null ms
then do checkHandshake waits -- there was a timeout, check handshake
listen (waits+1)
else do mapM_ respond ms
listen 0
checkHandshake :: Int -> IO ()
checkHandshake waits = do busses <- readMVar bussesMV
when (null busses) $ do when (waits == 0) $ verbose c $ "Waiting for SuperDirt (v.1.7.2 or higher).."
sendHandshake
respond :: O.Message -> IO ()
respond (O.Message "/dirt/hello" _) = sendHandshake
respond (O.Message "/dirt/handshake/reply" xs) = do prev <- swapMVar bussesMV $ bufferIndices xs
-- Only report the first time..
when (null prev) $ verbose c $ "Connected to SuperDirt."
respond _ = return ()
bufferIndices :: [O.Datum] -> [Int]
bufferIndices [] = []
bufferIndices (x:xs') | x == O.AsciiString (O.ascii "&controlBusIndices") = catMaybes $ takeWhile isJust $ map O.datum_integral xs'
| otherwise = bufferIndices xs'
handshake _ _ = return ()

recvMessagesTimeout :: (O.Transport t) => Double -> t -> IO [O.Message]
recvMessagesTimeout n sock = fmap (maybe [] O.packetMessages) $ O.recvPacketTimeout n sock

-- send has three modes:
-- Send events early using timestamp in the OSC bundle - used by Superdirt
-- Send events early by adding timestamp to the OSC message - used by Dirt
-- Send events live by delaying the thread
send :: Maybe O.Udp -> Cx -> Double -> Double -> (Double, Bool, O.Message) -> IO ()
send listen cx latency extraLatency (time, isBusMsg, m)
| oSchedule target == Pre BundleStamp = sendBndl isBusMsg listen cx $ O.Bundle timeWithLatency [m]
| oSchedule target == Pre MessageStamp = sendO isBusMsg listen cx $ addtime m
send :: Cx -> Double -> Double -> (Double, Bool, O.Message) -> IO ()
send cx latency extraLatency (time, isBusMsg, m)
| oSchedule target == Pre BundleStamp = sendBndl isBusMsg cx $ O.Bundle timeWithLatency [m]
| oSchedule target == Pre MessageStamp = sendO isBusMsg cx $ addtime m
| otherwise = do _ <- forkOS $ do now <- O.time
threadDelay $ floor $ (timeWithLatency - now) * 1000000
sendO isBusMsg listen cx m
sendO isBusMsg cx m
return ()
where addtime (O.Message mpath params) = O.Message mpath ((O.int32 sec):((O.int32 usec):params))
ut = O.ntpr_to_posix timeWithLatency
Expand All @@ -68,18 +102,15 @@ send listen cx latency extraLatency (time, isBusMsg, m)
target = cxTarget cx
timeWithLatency = time - latency + extraLatency

sendBndl :: Bool -> (Maybe O.Udp) -> Cx -> O.Bundle -> IO ()
sendBndl isBusMsg (Just listen) cx bndl = O.sendTo listen (O.Packet_Bundle bndl) (N.addrAddress addr)
sendBndl :: Bool -> Cx -> O.Bundle -> IO ()
sendBndl isBusMsg cx bndl = O.sendTo (cxUDP cx) (O.Packet_Bundle bndl) (N.addrAddress addr)
where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx
| otherwise = cxAddr cx
sendBndl _ Nothing cx bndl = O.sendBundle (cxUDP cx) bndl

sendO :: Bool -> (Maybe O.Udp) -> Cx -> O.Message -> IO ()
sendO isBusMsg (Just listen) cx msg = O.sendTo listen (O.Packet_Message msg) (N.addrAddress addr)
where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx
| otherwise = cxAddr cx
sendO _ Nothing cx msg = O.sendMessage (cxUDP cx) msg

sendO :: Bool -> Cx -> O.Message -> IO ()
sendO isBusMsg cx msg = O.sendTo (cxUDP cx) (O.Packet_Message msg) (N.addrAddress addr)
where addr | isBusMsg && isJust (cxBusAddr cx) = fromJust $ cxBusAddr cx
| otherwise = cxAddr cx

superdirtTarget :: Target
superdirtTarget = Target {oName = "SuperDirt",
Expand Down
4 changes: 2 additions & 2 deletions src/Sound/Tidal/Stream/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import qualified Sound.Tidal.Clock as Clock
import Sound.Tidal.Stream.Config

data Stream = Stream {sConfig :: Config,
sBusses :: MVar [Int],
sStateMV :: MVar ValueMap,
-- sOutput :: MVar ControlPattern,
sClockRef :: Clock.ClockRef,
Expand All @@ -27,7 +26,8 @@ data Cx = Cx {cxTarget :: Target,
cxUDP :: O.Udp,
cxOSCs :: [OSC],
cxAddr :: N.AddrInfo,
cxBusAddr :: Maybe N.AddrInfo
cxBusAddr :: Maybe N.AddrInfo,
cxBusses :: Maybe (MVar [Int])
}

data StampStyle = BundleStamp
Expand Down
18 changes: 2 additions & 16 deletions src/Sound/Tidal/Stream/UI.hs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
{-# LANGUAGE BangPatterns, ScopedTypeVariables #-}
module Sound.Tidal.Stream.UI where

import Data.Maybe (isJust)
import qualified Data.Map as Map
import qualified Control.Exception as E
import Control.Concurrent.MVar
import System.IO (hPutStrLn, stderr)
import System.Random (getStdRandom, randomR)
import qualified Sound.Osc.Fd as O

import qualified Sound.Tidal.Clock as Clock
import Sound.Tidal.Stream.Types
import Sound.Tidal.Stream.Config
import Sound.Tidal.Stream.Process
import Sound.Tidal.Stream.Target

import Sound.Tidal.Pattern
import Sound.Tidal.ID
Expand Down Expand Up @@ -74,7 +71,7 @@ streamOnce st p = do i <- getStdRandom $ randomR (0, 8192)
streamFirst st $ rotL (toRational (i :: Int)) p

streamFirst :: Stream -> ControlPattern -> IO ()
streamFirst stream pat = onSingleTick (sConfig stream) (sClockRef stream) (sStateMV stream) (sBusses stream) (sPMapMV stream) (sGlobalFMV stream) (sCxs stream) (sListen stream) pat
streamFirst stream pat = onSingleTick (sConfig stream) (sClockRef stream) (sStateMV stream) (sPMapMV stream) (sGlobalFMV stream) (sCxs stream) pat

streamMute :: Stream -> ID -> IO ()
streamMute s k = withPatIds s [k] (\x -> x {mute = True})
Expand Down Expand Up @@ -140,15 +137,4 @@ streamSetB :: Stream -> String -> Pattern Bool -> IO ()
streamSetB = streamSet

streamSetR :: Stream -> String -> Pattern Rational -> IO ()
streamSetR = streamSet

-- It only really works to handshake with one target at the moment..
sendHandshakes :: Stream -> IO ()
sendHandshakes stream = mapM_ sendHandshake $ filter (oHandshake . cxTarget) (sCxs stream)
where sendHandshake cx = if (isJust $ sListen stream)
then
do -- send it _from_ the udp socket we're listening to, so the
-- replies go back there
sendO False (sListen stream) cx $ O.Message "/dirt/handshake" []
else
hPutStrLn stderr "Can't handshake with SuperCollider without control port."
streamSetR = streamSet

0 comments on commit c3d9dd3

Please sign in to comment.