t140llm

A TypeScript library to convert LLM streaming responses into T.140 real-time text format for SIP, WebRTC, and (S)RTP applications.

Prerequisites

  • Node.js >= 10.18.1
  • NPM >= 6.13.4 (npm ships with Node.js, so there is no need to install it separately.)

Setup

Install

# Install via NPM
$ npm install --save t140llm

Features

  • T.140 RTP Payload Formatting
  • T.140 redundancy
  • T.140 FEC (forward error correction)
  • (S)RTP Direct Delivery
  • Customizable Rate Limiting and Token Pooling
  • Custom Transport Streams (WebRTC, custom protocols, etc.)
  • UNIX SEQPACKET sockets (for supporting >1 LLM stream simultaneously)
  • UNIX STREAM sockets (for single LLM stream support)
  • WebSocket

Support

  • Vercel AI SDK
  • Anthropic SDK
  • OpenAI SDK
  • Cohere
  • Mistral
  • Amazon (Bedrock)
  • Google (Gemini/PaLM)
  • Ollama
  • Reasoning Support
  • Binary Data
  • Tools
  • LLM Output Metadata
  • PDFs/Documents
  • Images
  • Video
  • Signaling
  • Custom RTP Packet Data

Usage

Basic Usage

import { processAIStream } from "t140llm";
import { OpenAI } from "openai";

// Initialize your LLM client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Create a streaming response
const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Process the stream and convert to T.140
processAIStream(stream);

With Vercel AI SDK

import { processAIStream } from "t140llm";
import { Message, OpenAIStream, StreamingTextResponse } from "ai";
import { OpenAI } from "openai";

// Initialize OpenAI client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Example API route handler
export async function POST(req: Request) {
  const { messages }: { messages: Message[] } = await req.json();

  // Create a stream with the Vercel AI SDK
  const response = await openai.chat.completions.create({
    model: "gpt-4",
    messages,
    stream: true,
  });

  // The OpenAI stream can only be consumed once, so split it with tee()
  const [t140Stream, clientStream] = response.tee();

  // Process one branch with t140llm
  processAIStream(t140Stream);

  // You can still return the other branch to the client
  return new StreamingTextResponse(OpenAIStream(clientStream));
}

With Anthropic Claude

import { processAIStream } from "t140llm";
import Anthropic from "@anthropic-ai/sdk";

// Initialize Anthropic client
const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Create a streaming response
const stream = await anthropic.messages.create({
  model: "claude-3-sonnet-20240229",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Process the stream and convert to T.140
processAIStream(stream);

With Mistral AI

import { processAIStream } from "t140llm";
import MistralClient from "@mistralai/mistralai";

// Initialize Mistral client (the client takes the API key as a positional argument)
const mistral = new MistralClient(process.env.MISTRAL_API_KEY);

// Create a streaming response
const stream = await mistral.chatStream({
  model: "mistral-large-latest",
  messages: [{ role: "user", content: "Write a short story." }],
});

// Process the stream and convert to T.140
processAIStream(stream);

With Cohere

import { processAIStream } from "t140llm";
import { CohereClient } from "cohere-ai";

// Initialize Cohere client
const cohere = new CohereClient({
  token: process.env.COHERE_API_KEY,
});

// Create a streaming response
const stream = await cohere.chatStream({
  model: "command",
  message: "Write a short story.",
});

// Process the stream and convert to T.140
processAIStream(stream);

With Ollama

import { processAIStream } from "t140llm";
import { Ollama } from "ollama";

// Initialize Ollama client
const ollama = new Ollama();

// Create a streaming response
const stream = await ollama.chat({
  model: "llama3",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Process the stream and convert to T.140
processAIStream(stream);

Direct RTP Streaming

For direct RTP streaming without needing a WebSocket intermediary:

import { processAIStreamToRtp } from "t140llm";
import { OpenAI } from "openai";

// Initialize your LLM client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Create a streaming response
const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Stream directly to a remote endpoint using RTP
const transport = processAIStreamToRtp(
  stream,
  "192.168.1.100", // Remote IP address
  5004, // RTP port (optional, default: 5004)
  {
    payloadType: 96, // T.140 payload type (optional)
    ssrc: 12345, // RTP SSRC identifier (optional)
    initialSequenceNumber: 0, // Starting sequence number (optional)
    initialTimestamp: 0, // Starting timestamp (optional)
    timestampIncrement: 160, // Timestamp increment per packet (optional)
  },
);

// Later, you can close the transport if needed
// transport.close();

Secure SRTP Streaming

For secure SRTP streaming:

import { processAIStreamToSrtp, createSrtpKeysFromPassphrase } from "t140llm";
import { OpenAI } from "openai";

// Initialize your LLM client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Create a streaming response
const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Generate SRTP keys from a passphrase
// In a real application, you would exchange these securely with the remote endpoint
const { masterKey, masterSalt } = createSrtpKeysFromPassphrase(
  "your-secure-passphrase",
);

// Stream directly to a remote endpoint using SRTP
const transport = processAIStreamToSrtp(
  stream,
  "192.168.1.100", // Remote IP address
  {
    masterKey, // SRTP master key
    masterSalt, // SRTP master salt
    payloadType: 96, // T.140 payload type (optional)
  },
  5006, // SRTP port (optional, default: 5006)
);

// Later, you can close the transport if needed
// transport.close();

With Forward Error Correction

For RTP streaming with Forward Error Correction (FEC) according to RFC 5109:

import { processAIStreamToRtp } from "t140llm";
import { OpenAI } from "openai";

// Initialize your LLM client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Create a streaming response
const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Stream directly to a remote endpoint using RTP with FEC enabled
const transport = processAIStreamToRtp(
  stream,
  "192.168.1.100", // Remote IP address
  5004, // RTP port (optional, default: 5004)
  {
    payloadType: 96, // T.140 payload type
    ssrc: 12345, // RTP SSRC identifier
    // FEC configuration
    fecEnabled: true, // Enable Forward Error Correction
    fecPayloadType: 97, // Payload type for FEC packets
    fecGroupSize: 5, // Number of media packets to protect with one FEC packet
  },
);

// Later, you can close the transport if needed
// transport.close();

With Custom Transport

You can use your own transport mechanism instead of the built-in UDP socket:

import { processAIStreamToRtp } from "t140llm";
import { OpenAI } from "openai";

// Initialize your LLM client
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

// Create a streaming response
const stream = await openai.chat.completions.create({
  model: "gpt-4",
  messages: [{ role: "user", content: "Write a short story." }],
  stream: true,
});

// Create a custom transport (e.g., WebRTC data channel, custom socket, etc.)
class MyCustomTransport {
  send(data: Buffer, callback?: (error?: Error) => void) {
    // Send the data using your custom transport mechanism
    console.log(`Sending ${data.length} bytes`);
    // ...your sending logic here...

    // Call the callback when done (or with an error if it failed)
    if (callback) callback();
  }

  close() {
    // Clean up resources when done
    console.log("Transport closed");
  }
}

// Stream using the custom transport
const customTransport = new MyCustomTransport();
const transport = processAIStreamToRtp(
  stream,
  "dummy-address", // Not used with custom transport
  5004, // Not used with custom transport
  {
    customTransport, // Your custom transport implementation
    payloadType: 96,
    redEnabled: true, // You can still use features like redundancy with custom transport
  },
);

// The transport will be closed automatically when the stream ends
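
As a further illustration, a WebRTC data channel could be adapted to the same shape (a sketch; RTCDataChannel is the standard WebRTC type, and the Buffer payload type is an assumption):

// Hypothetical adapter from an RTCDataChannel to the TransportStream interface
class DataChannelTransport {
  constructor(private channel: RTCDataChannel) {}

  send(data: Buffer, callback?: (error?: Error) => void) {
    try {
      this.channel.send(data); // Buffer is a Uint8Array, a valid ArrayBufferView
      if (callback) callback();
    } catch (err) {
      if (callback) callback(err as Error);
    }
  }

  close() {
    this.channel.close();
  }
}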

Pre-connecting to Transport

You can establish the transport connection before the LLM stream is available, which can reduce latency when the stream starts:

import { createT140WebSocketConnection } from "t140llm";

// Create the WebSocket connection early, before the LLM stream is available
const { connection, attachStream } = createT140WebSocketConnection(
  "ws://localhost:5004",
);

// Later, when the LLM stream becomes available, attach it to the existing connection
function handleLLMResponse(llmStream) {
  // Attach the stream to the pre-created connection
  attachStream(llmStream, {
    processBackspaces: true,
    handleMetadata: true,
  });
}

// Similar pre-connection functions are available for all transport types:
// - createDirectSocketTransport()
// - createT140RtpTransport()
// - createT140SrtpTransport()

This is especially useful in scenarios where:

  1. You want to establish the connection in advance to minimize latency
  2. You need to reuse the same transport for multiple LLM streams
  3. Your architecture needs to separate transport creation from stream processing

See the examples/pre_connect_example.js file for complete examples of pre-connecting with different transport types.
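
For instance, the RTP variant follows the same pattern (a sketch based on the createT140RtpTransport signature documented in the API reference below):

import { createT140RtpTransport } from "t140llm";

// Create the RTP transport before any LLM stream exists
const { transport, attachStream } = createT140RtpTransport(
  "192.168.1.100", // Remote IP address
  5004, // RTP port
  { payloadType: 96 },
);

// Later, attach the LLM stream to the pre-created transport
function handleLLMResponse(llmStream) {
  attachStream(llmStream);
}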

With Reasoning Stream Processing

Some LLM providers can stream their reasoning process as separate metadata alongside the generated text. This allows applications to show both the LLM's thought process and its final output:

import { processAIStream } from "t140llm";
import Anthropic from "@anthropic-ai/sdk";

// Initialize Anthropic client
const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Create a streaming response with reasoning
const stream = await anthropic.messages.create({
  model: "claude-3-sonnet-20240229",
  messages: [{ role: "user", content: "Solve this math problem: 2x + 5 = 13" }],
  stream: true,
});

// Create a custom reasoning handler
const handleReasoning = (metadata) => {
  if (metadata.type === "reasoning") {
    console.log("REASONING:", metadata.content);
  }
};

// Process the stream with reasoning handling
processAIStream(stream, "ws://localhost:3000", {
  handleMetadata: true,
  metadataCallback: handleReasoning,
  sendMetadataOverWebsocket: true, // Also send reasoning over WebSocket
});

For more advanced usage, including separate transports for text and reasoning, see the examples/reasoning_example.js and examples/reasoning_direct_socket_example.js examples.

Why?

The T.140 protocol is a well-defined standard for transmitting text conversations over IP networks in real time. That makes it an effective way to deliver text while it is still being written, including to satellites, over noisy links, and in environments where low-latency transmission is a requirement. Unlike other approaches, T.140 enables transmission of text before the entire message has been composed.

Because LLMs do not make mistakes while "typing," there is no real downside to transmitting their output this way. That said, we do provide support for backspace characters, should you require them! Using T.140, you can both reduce the overall size of the packets being delivered and improve quality of experience where latency is particularly sensitive. Typically you can expect at least a 10% reduction in latency compared with WebSockets.

How It Works

WebSocket Mode

  1. The library sets up a WebSocket server to receive text chunks.
  2. When an LLM stream is processed, each text chunk is sent through the WebSocket.
  3. The WebSocket server encapsulates the text in T.140 format using RTP packets.
  4. The RTP packets are sent through a Unix SEQPACKET socket.
  5. Your application can read from this socket to get the real-time text data.

Direct RTP Mode

  1. The library creates a UDP socket to send RTP packets.
  2. When an LLM stream is processed, each text chunk is packaged as T.140 in an RTP packet.
  3. The RTP packets are sent directly to the specified IP address and port.
  4. If Forward Error Correction (FEC) is enabled, the library will:
    • Store packets in a buffer
    • Generate FEC packets using XOR-based operations following RFC 5109
    • Send FEC packets at configured intervals (based on group size)
  5. Your application can receive these packets directly from the UDP socket, using FEC packets to recover from packet loss (see the receiver sketch below).
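
To make the receiving side concrete, here is a minimal UDP listener (a sketch: it assumes the default payload type of 96, a fixed 12-byte RTP header with no CSRC entries or extensions, and that FEC and redundancy packets can simply be skipped):

import * as dgram from "dgram";

// Minimal T.140 receiver: print the text payload of each media packet
const socket = dgram.createSocket("udp4");

socket.on("message", (packet) => {
  if (packet.length < 12) return; // too short to contain an RTP header
  const payloadType = packet[1] & 0x7f; // low 7 bits of the second header byte
  if (payloadType !== 96) return; // skip FEC (97) and other payload types
  const payload = packet.subarray(12); // assumes a fixed 12-byte header
  process.stdout.write(payload.toString("utf8"));
});

socket.bind(5004);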

Secure SRTP Mode

  1. The library creates a UDP socket and initializes an SRTP session with the provided keys.
  2. When an LLM stream is processed, each text chunk is packaged as T.140 in an RTP packet.
  3. The RTP packets are encrypted using SRTP with the configured encryption parameters.
  4. The encrypted SRTP packets are sent to the specified IP address and port.
  5. Your application can decrypt and receive these packets using the same SRTP parameters.

API Reference

processAIStream(stream, [websocketUrl], [options])

  • stream The streaming data source that emits text chunks.
  • websocketUrl <string> Optional. WebSocket URL to connect to. Defaults to ws://localhost:8765.
  • options Optional. Processing options such as handleMetadata, metadataCallback, and sendMetadataOverWebsocket (see the reasoning example above).

Processes an AI stream and sends the text chunks as T.140 data through a WebSocket.

processAIStreamToRtp(stream, remoteAddress, [remotePort], [rtpConfig])

  • stream The streaming data source that emits text chunks.
  • remoteAddress <string> The remote IP address to send RTP packets to. Only used if no custom transport is provided.
  • remotePort <number> Optional. The remote port to send RTP packets to. Defaults to 5004. Only used if no custom transport is provided.
  • rtpConfig Optional. Configuration options for RTP:
    • payloadType <number> Optional. The RTP payload type. Defaults to 96.
    • ssrc <number> Optional. The RTP synchronization source. Defaults to a cryptographically secure random value.
    • initialSequenceNumber <number> Optional. The initial sequence number. Defaults to 0.
    • initialTimestamp <number> Optional. The initial timestamp. Defaults to 0.
    • timestampIncrement <number> Optional. The timestamp increment per packet. Defaults to 160.
    • fecEnabled <boolean> Optional. Enable Forward Error Correction. Defaults to false.
    • fecPayloadType <number> Optional. The payload type for FEC packets. Defaults to 97.
    • fecGroupSize <number> Optional. Number of media packets to protect with one FEC packet. Defaults to 5.
    • customTransport Optional. A custom transport implementation to use instead of the default UDP socket.
  • returns: The transport object that can be used to close the connection.

Processes an AI stream and sends the text chunks directly as T.140 data over RTP. When FEC is enabled, it adds Forward Error Correction packets according to RFC 5109 to help recover from packet loss. If a custom transport is provided, it will be used instead of creating a UDP socket.

createT140WebSocketConnection(websocketUrl, [options])

  • websocketUrl <string> Optional. WebSocket URL to connect to. Defaults to ws://localhost:8765.
  • options Optional. Configuration options:
    • tlsOptions Optional. SSL/TLS options for secure WebSocket connections.
  • returns: An object containing:
    • connection The WebSocket connection
    • attachStream A function to attach a TextDataStream to this connection

Creates a WebSocket connection that can be used for T.140 transport. This allows establishing the connection before the LLM stream is available.

createDirectSocketTransport(socketPath, [rtpConfig])

  • socketPath <string> Optional. Path to the SEQPACKET socket. Defaults to the library's default socket path.
  • rtpConfig Optional. Configuration options for RTP (same as in processAIStreamToRtp).
  • returns: An object containing:
    • transport <Socket|TransportStream> The direct socket or custom transport
    • attachStream A function to attach a TextDataStream to this transport
    • rtpState Current RTP state (sequence number, timestamp, ssrc)

Creates a direct socket transport that can be used for T.140 RTP transmission. This allows establishing the connection before the LLM stream is available.

createT140RtpTransport(remoteAddress, [remotePort], [rtpConfig])

  • remoteAddress <string> The remote IP address to send RTP packets to.
  • remotePort <number> Optional. The remote port to send RTP packets to. Defaults to 5004.
  • rtpConfig Optional. Configuration options for RTP (same as in processAIStreamToRtp).
  • returns: An object containing:
    • transport The RTP transport instance
    • attachStream A function to attach a TextDataStream to this transport

Creates an RTP transport that can be used for T.140 transmission. This allows establishing the connection before the LLM stream is available.

createT140SrtpTransport(remoteAddress, srtpConfig, [remotePort])

  • remoteAddress <string> The remote IP address to send SRTP packets to.
  • srtpConfig SRTP configuration with master key and salt.
  • remotePort <number> Optional. The remote port to send SRTP packets to. Defaults to 5006.
  • returns: An object containing:
    • transport The RTP transport instance configured for SRTP
    • attachStream A function to attach a TextDataStream to this transport

Creates an SRTP transport that can be used for secure T.140 transmission. This allows establishing the connection before the LLM stream is available.

processAIStreamToSrtp(stream, remoteAddress, srtpConfig, [remotePort])

  • stream The streaming data source that emits text chunks.
  • remoteAddress <string> The remote IP address to send SRTP packets to. Only used if no custom transport is provided.
  • srtpConfig SRTP configuration including master key and salt.
    • masterKey Required. The SRTP master key.
    • masterSalt Required. The SRTP master salt.
    • profile <number> Optional. The SRTP crypto profile.
    • customTransport Optional. A custom transport implementation to use instead of the default UDP socket.
  • remotePort <number> Optional. The remote port to send SRTP packets to. Defaults to 5006. Only used if no custom transport is provided.
  • returns: The transport object that can be used to close the connection.

Processes an AI stream and sends the text chunks directly as T.140 data over secure SRTP. If a custom transport is provided, it will be used instead of creating a UDP socket.

createRtpPacket(sequenceNumber, timestamp, payload, [options])

  • sequenceNumber <number> RTP sequence number.
  • timestamp <number> RTP timestamp.
  • payload <string> Text payload to encapsulate.
  • options Optional. Configuration options for the RTP packet.
  • returns: RTP packet with T.140 payload.

Creates an RTP packet with a T.140 payload.
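
For example (a minimal sketch; passing payloadType in options is an assumption based on the rtpConfig fields documented above):

import { createRtpPacket } from "t140llm";

// Build a single RTP packet carrying "hello" as a T.140 payload
// (sequence number and timestamp are illustrative starting values)
const packet = createRtpPacket(0, 0, "hello", { payloadType: 96 });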

createSrtpKeysFromPassphrase(passphrase)

  • passphrase <string> A passphrase to derive SRTP keys from.
  • returns: An object containing the masterKey and masterSalt for SRTP.

Creates SRTP master key and salt from a passphrase. For production, use a more secure key derivation function.
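
For instance, both values could be derived with PBKDF2 from Node's built-in crypto module (a sketch assuming the common AES_CM_128 sizes of a 16-byte master key and 14-byte master salt; the salt label and iteration count are illustrative, not library defaults):

import { pbkdf2Sync } from "crypto";

// Derive 30 bytes of key material, then split it into key and salt
const material = pbkdf2Sync(
  "your-secure-passphrase",
  "t140llm-srtp", // illustrative salt label
  100000, // illustrative iteration count
  30,
  "sha256",
);
const masterKey = material.subarray(0, 16);
const masterSalt = material.subarray(16, 30);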

T140RtpTransport

A class that manages RTP/SRTP connections for sending T.140 data.

constructor(remoteAddress, [remotePort], [config])

  • remoteAddress <string> The remote IP address to send packets to. Only used if no custom transport is provided.
  • remotePort <number> Optional. The remote port to send packets to. Defaults to 5004. Only used if no custom transport is provided.
  • config Optional. Configuration options for RTP, including FEC options and custom transport.
    • customTransport Optional. A custom transport implementation to use instead of the default UDP socket.

setupSrtp(srtpConfig)

  • srtpConfig SRTP configuration including master key and salt.

Initializes and configures SRTP for secure transmission.

sendText(text)

  • text <string> The text to send as T.140.

Sends text data as T.140 over RTP or SRTP. If FEC is enabled, it will also generate and send FEC packets according to the configured group size.

close()

Closes the UDP socket or custom transport and cleans up resources. If FEC is enabled, it will send any remaining FEC packets before closing.
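
Putting the class together (a sketch that uses only the methods documented above; it assumes T140RtpTransport is exported by the package):

import { T140RtpTransport } from "t140llm";

// Send a single piece of text over RTP, then clean up
const transport = new T140RtpTransport("192.168.1.100", 5004, {
  fecEnabled: true,
});
transport.sendText("Hello over T.140");
transport.close();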

TransportStream Interface

An interface that custom transport implementations must follow to be compatible with T140RtpTransport.

send(data, callback)

  • data The packet data to send.
  • callback Optional. Called when the packet has been sent or if an error occurred.
    • error Optional. The error that occurred during sending, if any.

Sends a packet through the transport.

close()

Optional method to close the transport and clean up resources.
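
In TypeScript terms, the contract looks roughly like this (a sketch; the Buffer payload type is an assumption):

interface TransportStream {
  // Send one packet; invoke the callback when sent, or with an error on failure
  send(data: Buffer, callback?: (error?: Error) => void): void;

  // Optional cleanup hook
  close?(): void;
}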

License

MIT License © agrathwohl