redis-applied-ai/redis-ml-inference-pipeline

🚀 Redis ML Inference Pipeline

High-Performance Fraud Detection with Redis Stack

Architecture • Features • Quick Start • Performance • Statistics • Monitoring • Advanced Options


Transform your machine learning models into resilient, real-time decision engines

This reference architecture demonstrates how to build high-performance, scalable ML inference pipelines on Redis Stack. The solution is designed for fraud detection, but it can be adapted to any real-time prediction scenario that requires low latency, high throughput, and horizontal scalability.

Key Benefits:

  • Low Latency: <50ms end-to-end processing time
  • High Throughput: 1000+ transactions/second with multiple workers
  • Horizontal Scaling: Add workers to linearly increase throughput
  • Resilience: Automatic recovery from failures using streams and consumer groups
  • Observability: Built-in latency tracking and performance metrics by test run

๐Ÿ—๏ธ Architecture

Redis ML Inference Architecture

This reference implementation leverages Redis Stack's capabilities to create a complete real-time ML inference pipeline:

1. 📊 Data Layer

Redis JSON stores transaction data and user profiles with fully indexed fields:

  • User profiles contain static information like preferences and risk categories
  • Historical transactions provide context for fraud pattern detection
  • RediSearch provides high-performance querying across all data types

2. 🧮 Feature Layer

Combines static and dynamic features in real-time:

  • Pre-computed features stored in Redis are retrieved in microseconds
  • On-the-fly calculated features (e.g., spending patterns) use Redis Query Engine
  • Efficient vector operations for geospatial or embedding-based features
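The on-the-fly features are simple aggregates over a user's recent history. As a rough illustration (plain Python standing in for the Redis Query Engine aggregation; the function name mirrors the amount_vs_average feature from the table below):

```python
from statistics import mean

def amount_vs_average(current_amount: float, historical_amounts: list[float]) -> float:
    """Ratio of the current transaction amount to the user's historical average.

    In the pipeline this average would come from a RediSearch aggregation over
    the user's stored transactions; here a plain list stands in for that query.
    A ratio well above 1.0 flags an unusually large transaction.
    """
    if not historical_amounts:
        return 1.0  # no history yet: treat the amount as typical
    avg = mean(historical_amounts)
    return current_amount / avg if avg > 0 else 1.0
```

A $100 transaction against a $50 average yields 2.0, which downstream features can combine with location and velocity signals.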

3. 🤖 Model Layer

Stores and manages ML models directly in Redis:

  • Versioned model storage with redis-model-store
  • Automatic serialization/deserialization of scikit-learn models
  • Optimized for fast inference (5-10ms per prediction)
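redis-model-store handles versioning and serialization for you; the underlying idea is simple enough to sketch. The snippet below is a minimal stand-in, not the library's API: a dict plays the role of Redis byte storage, and the class and method names are illustrative only.

```python
import pickle

class TinyModelStore:
    """Toy versioned model store: serialize with pickle, key by (name, version)."""

    def __init__(self):
        self._kv = {}  # stand-in for Redis byte storage

    def save(self, name: str, model) -> int:
        """Serialize the model and assign it the next version number."""
        version = sum(1 for k in self._kv if k[0] == name) + 1
        self._kv[(name, version)] = pickle.dumps(model)
        return version

    def load(self, name: str, version: int):
        """Deserialize a specific stored version of the model."""
        return pickle.loads(self._kv[(name, version)])
```

In the real pipeline the same pattern applies to scikit-learn estimators, with Redis providing shared storage so every worker loads the same model version.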

4. 🔄 Inference Layer

Stream-based architecture for scalable, resilient processing:

  • Redis Streams provide persistence, at-least-once delivery, and fault tolerance
  • Consumer groups enable parallel processing across multiple workers
  • Configurable batch processing improves throughput for high-volume scenarios
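Batch processing amounts to draining up to batch-size messages per stream read and scoring them in one model call, which amortizes per-prediction overhead. A minimal sketch of that chunking step (pure Python; the message dicts are illustrative):

```python
from typing import Iterable, Iterator

def batched(messages: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    """Group incoming stream messages into batches of at most batch_size.

    The worker scores each batch with a single model.predict call instead of
    one call per transaction.
    """
    batch = []
    for msg in messages:
        batch.append(msg)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```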

5. 📈 Monitoring Layer

Real-time result tracking and performance analysis:

  • Run-based metrics segmentation for testing and comparison
  • Latency tracking at multiple pipeline stages
  • Comprehensive statistics for performance tuning

💾 ML Features

The feature engineering system combines multiple data sources for comprehensive fraud detection:

| Feature Type | Examples | Source | Description |
|---|---|---|---|
| Static (Profile) | home_location, preferred_card, user_segment, risk_score | Redis JSON | Precomputed during profile creation and stored with RedisJSON for sub-millisecond access. These provide baseline user behavior patterns. |
| Dynamic | transaction_velocity, amount_vs_average, location_deviation | Calculated | Computed at inference time using RediSearch to quickly aggregate historical data. These capture recent behavior changes that might indicate fraud. |
| Transaction | amount, merchant_id, timestamp, device_info | Input Stream | Provided with each transaction from source systems. These represent the current activity being evaluated for potential fraud. |
| Derived | is_home_location, is_preferred_card, time_since_last_txn | Hybrid | Created by combining static and transaction data. These provide additional context by relating current activity to known patterns. |

🚀 Quick Start

Prerequisites

  • Python 3.10+
  • Redis Stack 7.2+ (or Redis 8.0+)
  • Poetry for dependency management

Installation

# Clone the repository
git clone https://github.com/redis-applied-ai/redis-ml-inference-pipeline
cd redis-ml-inference-pipeline

# Install dependencies
poetry install

Getting Started (Happy Path)

Follow these steps to get the pipeline running with minimal configuration:

  1. Prepare the Environment

    # Initialize the system (loads data, creates indices, trains model)
    poetry run prepare
  2. Run an End-to-End Test

    # Start a complete load test with default settings
    poetry run load-test --tps 10 --duration 30

That's it! This will:

  • Train and store a fraud detection model
  • Start an inference worker
  • Generate synthetic transactions
  • Process them through the ML pipeline
  • Display detailed statistics when complete

Viewing Results

To see detailed statistics for your test run:

# View statistics for the most recent run
poetry run monitor --stats

🚄 Performance

The pipeline is optimized for high throughput and low latency:

  • Single Worker: ~100-200 TPS with ~10ms average latency
  • Parallel Mode: ~500 TPS with 4 threads per worker
  • Horizontal Scaling: Linear scaling with additional workers (1000+ TPS with 5+ workers)
  • Batch Processing: 2-5x throughput improvement with batch sizes of 10-50

📊 Statistics System

The statistics system provides comprehensive metrics for each test run:

Key Features

  • Run-Based Tracking: All metrics are segregated by unique run ID
  • Complete Metrics: Captures transactions, latency, throughput, and fraud rates
  • Centralized Storage: All statistics stored in Redis for persistence
  • Worker Aggregation: Combines metrics from multiple workers
  • Real-Time Updates: Statistics available during and after test runs

Viewing Statistics

# Show statistics for a specific run
poetry run monitor --stats --run-id your-run-id

# List all available test runs
poetry run monitor --runs

# View statistics for the most recent run
poetry run monitor --stats

Available Metrics

  • Basic Statistics: Transactions processed, fraud detected, throughput
  • Latency Metrics: Avg, median, p95, p99, min/max latency
  • Worker Information: Number of workers, processing modes, thread counts
  • Test Details: Duration, pattern type, start/end times
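The latency percentiles above can be reproduced from raw per-transaction latencies in a few lines. This is a sketch of the reporting math, not the pipeline's actual code (which stores the values in Redis per run):

```python
def latency_summary(latencies_ms: list[float]) -> dict:
    """Summarize per-transaction latencies the way the stats output reports them."""
    xs = sorted(latencies_ms)
    n = len(xs)

    def pct(p: float) -> float:
        # Nearest-rank percentile over the sorted sample.
        return xs[min(n - 1, int(p / 100 * n))]

    return {
        "avg": sum(xs) / n,
        "median": pct(50),
        "p95": pct(95),
        "p99": pct(99),
        "min": xs[0],
        "max": xs[-1],
    }
```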

📊 Monitoring and Metrics

The pipeline includes comprehensive monitoring capabilities for real-time analysis and performance tuning:

Live Monitoring

# View real-time transaction processing
poetry run monitor

Historical Analysis

# View detailed statistics for a specific run
poetry run monitor --stats --run-id your-run-id

# List all available runs
poetry run monitor --runs

๐Ÿ› ๏ธ Advanced Options

Component-Based Execution

For more granular control, you can run individual components separately:

  1. Start Inference Workers

    # Start a single worker in standard mode
    poetry run worker
    
    # Start a worker with parallel processing
    poetry run worker --parallel --threads 4
    
    # Start multiple workers
    poetry run worker --workers 3 --parallel
  2. Generate Test Transactions

    # Generate a specific number of transactions
    poetry run generate --count 100 --interval 0.1
    
    # Generate continuous traffic with a specific pattern
    poetry run generate --tps 50 --duration 60 --pattern wave
  3. Monitor Results

    # Monitor all transactions in real-time
    poetry run monitor
    
    # Show only the latest results
    poetry run monitor --latest --count 20

Traffic Patterns

The transaction generator supports different traffic patterns:

  • constant: Steady stream of transactions at the specified TPS
  • wave: Sinusoidal pattern that varies the TPS over time
  • spike: Sudden burst of transactions in the middle of the test
  • random: Random fluctuations in transaction rate

Example:

# Generate a wave pattern of transactions
poetry run generate --tps 50 --duration 120 --pattern wave
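The wave pattern varies the target TPS sinusoidally across the test. A sketch of such a rate function (the one-cycle period and amplitude here are illustrative assumptions, not the generator's exact parameters):

```python
import math

def wave_tps(base_tps: float, t: float, duration: float, amplitude: float = 0.5) -> float:
    """Target TPS at elapsed time t for a 'wave' pattern.

    Completes one full sine cycle over the test duration; amplitude is the
    fractional swing around base_tps (0.5 means +/-50%).
    """
    return base_tps * (1 + amplitude * math.sin(2 * math.pi * t / duration))
```

With `--tps 50` this oscillates between roughly 25 and 75 TPS over the 120-second run.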

Worker Processing Modes

The inference worker supports different processing models:

  • Sequential: Process one transaction at a time (default)
  • Parallel: Process multiple transactions in parallel using threads
  • Batch: Process transactions in batches for higher throughput

Example:

# Run worker in parallel mode with 4 threads
poetry run worker --parallel --threads 4 --batch-size 20

Run ID System

Each test uses a unique run ID for isolation and tracking:

# Generate transactions with a specific run ID
poetry run generate --tps 20 --duration 30 --run-id custom-test-1

# View statistics for that run
poetry run monitor --stats --run-id custom-test-1

Configuration Options

Redis Connection Settings

Set these environment variables to connect to a custom Redis instance:

export REDIS_HOST=your-redis-host
export REDIS_PORT=your-redis-port
export REDIS_PASSWORD=your-redis-password
Key Configuration Options

All settings are in pipeline/settings.py and can be overridden with environment variables:

  • NAMESPACE: Prefix for Redis keys (default: "inference-pipeline")
  • FRAUD_THRESHOLD: Detection threshold (default: 0.7)
  • RAW_DATA_PATH: Path to transaction data (default: "data/raw/transactions_200.json")
  • MODEL_NAME: Name for the model in ModelStore (default: "fraud-detection-model")
  • TRANSACTION_STREAM: Name for input transaction stream (default: "transaction_stream")
  • FRAUD_RESULTS_STREAM: Name for results stream (default: "fraud_results_stream")
  • DEFAULT_CONSUMER_GROUP: Default consumer group name (default: "inference_workers")
  • MAX_STREAM_LENGTH: Maximum length of streams before trimming (default: 10000)
  • RESULT_TTL: Time-to-live for results in seconds (default: 86400)
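Environment-variable overrides typically follow the pattern of reading the variable with a fallback default. A minimal sketch of how such settings can be resolved (illustrative, not the actual contents of pipeline/settings.py):

```python
import os

def setting(name: str, default: str) -> str:
    """Resolve a setting: the environment variable wins, otherwise the default."""
    return os.environ.get(name, default)

# Defaults mirror the documented values above.
NAMESPACE = setting("NAMESPACE", "inference-pipeline")
FRAUD_THRESHOLD = float(setting("FRAUD_THRESHOLD", "0.7"))
```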

📜 Command Reference

Worker Command
poetry run worker [OPTIONS]

Options:

  • --model-version: Specific model version to use
  • --batch-size: Number of messages to process in each batch (default: 10)
  • --timeout: Timeout in milliseconds for blocking read (default: 5000)
  • --workers: Number of worker processes to start (default: 1)
  • --parallel: Process in parallel mode using threads
  • --threads: Number of threads to use in parallel mode (default: 4)
  • --consumer-group: Consumer group name (default: "inference_workers")
Generator Command
poetry run generate [OPTIONS]

Options:

  • --count: Number of transactions to generate (default: 10)
  • --interval: Time interval between transactions in seconds (default: 1.0)
  • --tps: Transactions per second (alternative to --interval)
  • --duration: Duration in seconds (alternative to --count)
  • --fraud-ratio: Ratio of fraudulent transactions (default: 0.1)
  • --threads: Number of threads for parallel generation (default: 1)
  • --pattern: Traffic pattern to generate (choices: constant, wave, spike, random)
  • --run-id: Specific run ID to use (optional, auto-generated if not provided)
Monitor Command
poetry run monitor [OPTIONS]

Options:

  • --latest: Show latest results and exit
  • --count: Number of results to display (default: 10)
  • --interval: Refresh interval in seconds for live monitoring (default: 2.0)
  • --run-id: Filter results by specific run ID
  • --runs: List all available test runs
  • --stats: Show detailed statistics for a run
Load Test Command
poetry run load-test [OPTIONS]

Transaction Generator Options:

  • --tps: Transactions per second to generate (default: 100.0)
  • --duration: Test duration in seconds (default: 60)
  • --fraud-ratio: Ratio of fraudulent transactions (default: 0.1)
  • --pattern: Traffic pattern to generate (default: constant)
  • --generator-threads: Threads for transaction generator (default: 2)

Worker Options:

  • --workers: Number of workers to start (default: 2)
  • --batch-size: Batch size for processing transactions (default: 10)
  • --parallel: Use parallel processing within workers
  • --worker-threads: Threads per worker for parallel processing (default: 4)
  • --model-version: Specific model version to use

Test Options:

  • --run-id: Specific run ID to use (optional)
