diff --git a/Commands/Get-WebSocket.ps1 b/Commands/Get-WebSocket.ps1 new file mode 100644 index 0000000..37ee9b3 --- /dev/null +++ b/Commands/Get-WebSocket.ps1 @@ -0,0 +1,217 @@ +function Get-WebSocket { + <# + .SYNOPSIS + WebSockets in PowerShell. + .DESCRIPTION + Get-WebSocket allows you to connect to a websocket and handle the output. + .EXAMPLE + # Create a WebSocket job that connects to a WebSocket and outputs the results. + Get-WebSocket -WebSocketUri "wss://localhost:9669" + .EXAMPLE + # Get is the default verb, so we can just say WebSocket. + websocket wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post + .EXAMPLE + websocket jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post -Tail | + Foreach-Object { + $in = $_ + if ($in.commit.record.text -match '[\p{IsHighSurrogates}\p{IsLowSurrogates}]+') { + $matches.0 + } + } + #> + [CmdletBinding(PositionalBinding=$false)] + param( + # The Uri of the WebSocket to connect to. + [Parameter(Position=0,ValueFromPipelineByPropertyName)] + [Alias('Url','Uri')] + [uri]$WebSocketUri, + + # A ScriptBlock that will handle the output of the WebSocket. + [ScriptBlock] + $Handler, + + # Any variables to declare in the WebSocket job. + [Collections.IDictionary] + $Variable = @{}, + + # The name of the WebSocket job. + [string] + $Name, + + # The script to run when the WebSocket job starts. + [ScriptBlock] + $InitializationScript = {}, + + # The buffer size. Defaults to 16kb. + [int] + $BufferSize = 16kb, + + # The ScriptBlock to run after connection to a websocket. + # This can be useful for making any initial requests. + [ScriptBlock] + $OnConnect, + + # The ScriptBlock to run when an error occurs. + [ScriptBlock] + $OnError, + + # The ScriptBlock to run when the WebSocket job outputs an object. + [ScriptBlock] + $OnOutput, + + # The Scriptblock to run when the WebSocket job produces a warning. + [ScriptBlock] + $OnWarning, + + # If set, will tail the output of the WebSocket job, outputting results continuously instead of outputting a websocket job. + [switch] + $Watch, + + # The maximum time to wait for a connection to be established. + # By default, this is 7 seconds. + [TimeSpan] + $ConnectionTimeout = '00:00:07', + + # The Runspace where the handler should run. + # Runspaces allow you to limit the scope of the handler. + [Runspace] + $Runspace, + + # The RunspacePool where the handler should run. + # RunspacePools allow you to limit the scope of the handler to a pool of runspaces. + [Management.Automation.Runspaces.RunspacePool] + [Alias('Pool')] + $RunspacePool + ) + + begin { + $SocketJob = { + param([Collections.IDictionary]$Variable) + + foreach ($keyValue in $variable.GetEnumerator()) { + $ExecutionContext.SessionState.PSVariable.Set($keyValue.Key, $keyValue.Value) + } + + if ((-not $WebSocketUri) -or $webSocket) { + throw "No WebSocketUri" + } + + if (-not $WebSocketUri.Scheme) { + $WebSocketUri = [uri]"wss://$WebSocketUri" + } + + if (-not $BufferSize) { + $BufferSize = 16kb + } + + $CT = [Threading.CancellationToken]::None + + if (-not $webSocket) { + $ws = [Net.WebSockets.ClientWebSocket]::new() + $null = $ws.ConnectAsync($WebSocketUri, $CT).Wait() + } else { + $ws = $WebSocket + } + + $Variable.WebSocket = $ws + + + while ($true) { + if ($ws.State -ne 'Open') {break } + $Buf = [byte[]]::new($BufferSize) + $Seg = [ArraySegment[byte]]::new($Buf) + $null = $ws.ReceiveAsync($Seg, $CT).Wait() + $JS = $OutputEncoding.GetString($Buf, 0, $Buf.Count) + if ([string]::IsNullOrWhitespace($JS)) { continue } + try { + $webSocketMessage = ConvertFrom-Json $JS + if ($handler) { + $psCmd = + if ($runspace.LanguageMode -eq 'NoLanguage' -or + $runspacePool.InitialSessionState.LanguageMode -eq 'NoLanguage') { + $handler.GetPowerShell() + } elseif ($Runspace -or $RunspacePool) { + [PowerShell]::Create().AddScript($handler) + } + if ($psCmd) { + if ($Runspace) { + $psCmd.Runspace = $Runspace + } elseif ($RunspacePool) { + $psCmd.RunspacePool = $RunspacePool + } + } else { + $webSocketMessage | . $handler + } + + } else { + $webSocketMessage + } + } catch { + Write-Error $_ + } + } + } + } + + process { + foreach ($keyValuePair in $PSBoundParameters.GetEnumerator()) { + $Variable[$keyValuePair.Key] = $keyValuePair.Value + } + $webSocketJob = + if ($WebSocketUri) { + if (-not $name) { + $Name = $WebSocketUri + } + + Start-ThreadJob -ScriptBlock $SocketJob -Name $Name -InitializationScript $InitializationScript -ArgumentList $Variable + } elseif ($WebSocket) { + if (-not $name) { + $name = "websocket" + } + Start-ThreadJob -ScriptBlock $SocketJob -Name $Name -InitializationScript $InitializationScript -ArgumentList $Variable + } + + $subscriptionSplat = @{ + EventName = 'DataAdded' + MessageData = $webSocketJob + SupportEvent = $true + } + $eventSubscriptions = @( + if ($OnOutput) { + Register-ObjectEvent @subscriptionSplat -InputObject $webSocketJob.Output -Action $OnOutput + } + if ($OnError) { + Register-ObjectEvent @subscriptionSplat -InputObject $webSocketJob.Error -Action $OnError + } + if ($OnWarning) { + Register-ObjectEvent @subscriptionSplat -InputObject $webSocketJob.Warning -Action $OnWarning + } + ) + if ($eventSubscriptions) { + $variable['EventSubscriptions'] = $eventSubscriptions + } + + $webSocketConnectTimeout = [DateTime]::Now + $ConnectionTimeout + while (-not $variable['WebSocket'] -and + ([DateTime]::Now -lt $webSocketConnectTimeout)) { + Start-Sleep -Milliseconds 0 + } + + foreach ($keyValuePair in $Variable.GetEnumerator()) { + $webSocketJob.psobject.properties.add( + [psnoteproperty]::new($keyValuePair.Key, $keyValuePair.Value), $true + ) + } + $webSocketJob.pstypenames.insert(0, 'WebSocketJob') + if ($Watch) { + do { + $webSocketJob | Receive-Job + Start-Sleep -Milliseconds ( + 7, 11, 13, 17, 19, 23 | Get-Random + ) + } while ($webSocketJob.State -in 'Running','NotStarted') + } else { + $webSocketJob + } + } +}