🎵 MusicFlow - Snowflake OpenFlow Demo

A comprehensive demonstration of Snowflake's OpenFlow platform showcasing intelligent data ingestion, schema evolution, and real-time analytics for music festival and concert data. This demo leverages Apache NiFi, Snowflake Cortex AI, and Apache Iceberg to create an end-to-end data pipeline that automatically detects schema changes and evolves data structures.

🎬 Demo Video

MusicFlow Demo - Intelligent Data Pipelines

Watch the demo in action: MusicFlow - Intelligent Data Pipelines for Music Festival Analytics

🌟 Overview

The Problem We Solve

Building a data pipeline for every new data source is a real headache, eating up time and money, often months of both. Even when the underlying information is similar, different systems structure it completely differently, so what should be a quick job turns into months of intense, manual engineering.

Our Solution

What if all that manual data wrangling simply disappeared? With Snowflake OpenFlow, we can build smart data pipelines that tap into Cortex to infer schemas from any incoming data, messy or neat. And thanks to Snowflake OpenCatalog, schema changes are handled automatically, instantly producing unified Iceberg analytics tables.

MusicFlow demonstrates how to build intelligent data pipelines that can:

  • Automatically ingest CSV files from various music festival sources (SoundWave Festival, Harmony Grove, Beat Valley, Music Mountain, Coastal Beats)
  • Intelligently detect schema changes using Snowflake Cortex AI
  • Dynamically evolve Iceberg table schemas without manual intervention
  • Provide real-time schema registry monitoring and analytics
  • Support multiple data sources with unified schema mapping

Key Features

  • 🤖 AI-Powered Schema Intelligence: Uses Snowflake Cortex to analyze CSV structure and determine schema evolution needs (see the sketch after this list)
  • 🔄 Dynamic Schema Evolution: Automatically adds new columns when new data fields are detected
  • 📊 Real-time Monitoring: Streamlit-based schema registry dashboard for pipeline visibility
  • 🏗️ External REST Catalog: Integrates with Snowflake OpenCatalog for Iceberg table management
  • 🎯 Multi-Source Support: Handles diverse festival data formats with semantic field mapping
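
The following is a minimal sketch of the AI-powered schema intelligence mentioned above: Cortex is given the incoming CSV headers and the current table columns and asked whether to create, evolve, or simply ingest. The model name and prompt wording are assumptions for illustration, not the exact call made by the OpenFlow/NiFi flow in this repo.

-- Sketch only: model choice and prompt wording are assumptions, not the pipeline's actual call
SELECT SNOWFLAKE.CORTEX.COMPLETE(
    'mistral-large2',
    'Existing table columns: event_id, artist_name, main_stage, start_time, vip_price. ' ||
    'Incoming CSV headers: show_id, performer, location, date_time, ticket_cost, capacity. ' ||
    'Respond with JSON: {"action": "create|evolve|ingest", "column_mapping": {...}, "new_columns": [...]}'
) AS schema_analysis;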

🏗️ Architecture

Data Flow

---
config:
  layout: dagre
---
flowchart TD
    A["Google Shared Drive Folder"] --> B["CSV Files"]
    D["Schema Analysis"] --> E{"Cortex AI"}
    F["S3 Bucket"] --> I["Load into Iceberg Table"]
    G["Schema Evolution"] --> H["ALTER TABLE"]
    H -- Move to --> F
    B --> D
    E --> n1["New Schema"] & G
    n1 --> n2["CREATE"]
    n2 -- Move to --> F
    n1@{ shape: rect}
    n2@{ shape: rect}

Key Components

  1. Data Sources: Festival CSV files (SoundWave Festival, Harmony Grove, Beat Valley, etc.)
  2. Ingestion Layer: Snowflake OpenFlow with NiFi flows for file processing and routing
  3. AI Analysis: Snowflake Cortex for intelligent schema detection
  4. Storage Layer: Apache Iceberg tables via Snowflake OpenCatalog
  5. Monitoring: Streamlit dashboard for pipeline visibility
  6. Schema Registry: Metadata tracking and version management
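
Component 6 lives in the metadata.schema_registry table that the debug commands later in this README query. The definition below is only an illustrative sketch of the kind of columns such a registry tracks; the column names and types are assumptions, not the actual DDL created by task setup.

-- Illustrative sketch only: columns are assumptions, not the repo's actual DDL
CREATE TABLE IF NOT EXISTS metadata.schema_registry (
    source_file     STRING,         -- CSV file that triggered the analysis
    detected_action STRING,         -- create | evolve | ingest
    column_mapping  VARIANT,        -- source-to-unified field mapping from Cortex
    new_columns     VARIANT,        -- columns added during evolution
    processed_at    TIMESTAMP_NTZ   -- when the pipeline handled the file
);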

🛠️ Prerequisites

Required Tools & Software

| Tool | Version | Purpose | Installation |
| --- | --- | --- | --- |
| Snowflake CLI | Latest | Database operations and SPCS management | Install Guide |
| Polaris CLI | Commit 52e30f0 | OpenCatalog management and operations | Clone Repository |
| Python | 3.12+ | Application runtime | Python Downloads |
| jq | Latest | JSON processing | brew install jq (macOS) or Download |
| Task | Latest | Task runner | Taskfile.dev |

Snowflake Requirements

⚠️ IMPORTANT: Snowflake Trials are NOT supported for this demo. You need a full Snowflake account with the following capabilities:

  • Snowflake Account with Enterprise Edition (Production/Full Account Required)
  • Snowpark Container Services (SPCS) enabled and configured
  • OpenFlow enabled and accessible
  • Cortex AI access for schema analysis
  • Snowflake OpenCatalog enabled and configured for Iceberg tables

Snowflake OpenCatalog Prerequisites

Before setting up the MusicFlow demo, you need to configure Snowflake OpenCatalog:

  1. OpenCatalog Account: Create a Snowflake OpenCatalog account
  2. Catalog Configuration: Set up a catalog with appropriate storage locations
  3. Service Connection: Configure service connections and credentials
  4. Snowflake Integration: Link OpenCatalog to your Snowflake account

AWS Prerequisites

  • AWS CLI configured with credentials
  • S3 Bucket used as the data ingestion staging area, e.g. your-music-flow-demo-ingest-data
  • AWS IAM User with S3 permissions, used by Snowflake OpenFlow to move files into the data ingestion bucket
  • IAM Role with appropriate permissions for S3 and OpenCatalog access

Slack Prerequisites

  • Slack bot OAuth token and a Slack channel to send pipeline notifications to

🚀 Quick Start

1. Environment Setup

# Clone the repository
git clone <repository-url>
cd openflow-structured-data-pipeline-demo

# Install Python dependencies using uv
uv sync

# Set up environment variables
cp env.template .env
# Edit .env with your Snowflake and AWS credentials

2. Snowflake Configuration

# Login to Snowflake
snow login

# Setup Snowflake environment (creates database, warehouse, role, and user)
task setup

# Verify environment setup
task env_check

3. Snowflake OpenCatalog Setup

💡 Note: It's recommended to perform OpenCatalog setup in a new terminal session to avoid conflicts with existing environment variables and ensure clean configuration.

Quick Setup (All Steps)

# Complete OpenCatalog setup with all dependencies
task opencatalog:setup USE_SNOWFLAKE_ROLE=your_admin_role

# Configure storage (S3 bucket and IAM roles)
task opencatalog:configure_storage USE_SNOWFLAKE_ROLE=your_admin_role

# Setup Snowflake integration
task opencatalog:setup_snowflake_integration USE_SNOWFLAKE_ROLE=your_admin_role

Individual Setup Steps

If you prefer to run setup steps individually:

# 1. Install Polaris CLI
task opencatalog:install_cli

# 2. Create catalog
task opencatalog:create_catalog

# 3. Create service principal and roles
task opencatalog:create_principal_roles_and_grants

# 4. Configure storage
task opencatalog:configure_storage USE_SNOWFLAKE_ROLE=your_admin_role

# 5. Setup Snowflake integration
task opencatalog:setup_snowflake_integration USE_SNOWFLAKE_ROLE=your_admin_role

# 6. Verify integration
task opencatalog:check_snowflake_integration USE_SNOWFLAKE_ROLE=your_admin_role

4. Snowflake OpenFlow Setup

IMPORTANT: Work in progress.

5. Demo Data Setup

# Set table parameters for the demo
export TABLE_NAMESPACE="events"
export TABLE_NAME="music_events"

# Create initial Iceberg table
task create_schema

# Download sample data sources
task download_sources

📊 Data Sources

The demo processes music festival data from multiple synthetic sources:

Base Schema Files (5 Core Fields)

  • SoundWave Festival: soundwave_events_2025.csv

    • Fields: event_id, artist_name, stage, start_time, ticket_price
    • Use: Initial table creation with base schema
  • Beat Valley: beat_valley_lineup_2025.csv

    • Fields: show_id, dj_performer, venue_section, time_block, admission_cost
    • Use: Demonstrates semantic mapping (different field names, same meaning)

Schema Evolution Files (Additional Columns)

  • SoundWave Festival Enhanced: soundwave_events_enhanced.csv

    • Base Fields: event_id, artist_name, stage, start_time, ticket_price
    • New Fields: genre (STRING), sponsor (STRING)
    • Use: Musical classification and sponsorship analytics
  • Harmony Grove: harmony_grove_lineup.csv

    • Base Fields: show_id, performer, location, date_time, ticket_cost
    • New Fields: capacity (BIGINT)
    • Use: Venue capacity planning and crowd management
  • Music Mountain: music_mountain_schedule.csv

    • Base Fields: slot_number, headliner, venue_area, time_slot, entry_fee
    • New Fields: max_attendance (BIGINT), day (STRING)
    • Use: Multi-day festival scheduling and attendance tracking
  • Coastal Beats: coastal_beats_performances.csv

    • Base Fields: performance_id, act_name, stage_location, show_time, ticket_price
    • New Fields: price_tier (STRING), weather_backup (STRING), city (STRING)
    • Use: Tiered pricing models, weather contingency planning, geographic analytics

Semantic Field Mapping

All files map to the unified events.music_events table through intelligent field matching:

| Standard Field | Semantic Equivalents Across Sources | Type |
| --- | --- | --- |
| event_id | show_id, slot_number, performance_id | BIGINT |
| artist_name | dj_performer, performer, headliner, act_name, artist | STRING |
| main_stage | venue_section, location, venue_area, stage_location, stage | STRING |
| start_time | time_block, date_time, time_slot, show_time | STRING |
| vip_price | admission_cost, ticket_cost, entry_fee, ticket_price | DOUBLE |

Note on price_tier: The coastal_beats_performances.csv file includes both ticket_price (DOUBLE - numeric price) and price_tier (STRING - categorical tier like "Premium", "Standard", "Budget"). The ticket_price field maps to vip_price, while price_tier is added as a new column during schema evolution for tiered pricing analytics.

Each source has different field names and structures, demonstrating the schema evolution capabilities. All artist names, festival names, venue names, and sponsor brands are completely synthetic to avoid any trademark or copyright issues.

🔧 Available Tasks

Core Environment Tasks

# Environment management
task setup                    # Setup Snowflake environment (database, warehouse, role, user)
task clean                    # Clean environment for fresh start
task env_check               # Check if environment variables are properly set

# Schema operations
task create_schema           # Create new Iceberg table with generated SQL
task evolve_schema           # Evolve existing table schema
task download_sources        # Download generated SQL files from Snowflake stage

# Data operations
task spark_sql               # Run Spark SQL queries against Iceberg tables
task truncate_schema_registry # Truncate the schema registry table
task truncate_music_flow_resources # Clean up music flow resources

OpenCatalog Tasks

# OpenCatalog setup
task opencatalog:setup                    # Complete OpenCatalog setup (all steps)
task opencatalog:install_cli             # Install Polaris CLI for OpenCatalog management
task opencatalog:create_catalog          # Create music flow demo catalog in OpenCatalog
task opencatalog:create_principal_roles_and_grants # Create service principal and roles
task opencatalog:configure_storage       # Configure OpenCatalog storage (S3 bucket and IAM)
task opencatalog:setup_snowflake_integration # Setup Snowflake OpenCatalog integration
task opencatalog:check_snowflake_integration # Check Snowflake integration status

# OpenCatalog utilities
task opencatalog:catalog_info            # Get catalog information
task opencatalog:env_check              # Check OpenCatalog environment variables

# OpenCatalog cleanup
task opencatalog:cleanup                 # Complete OpenCatalog cleanup (all steps)
task opencatalog:cleanup_resources       # Clean up OpenCatalog resources
task opencatalog:cleanup_snowflake_integration # Clean up Snowflake catalog integration

Usage Examples

# Basic setup workflow
task setup
task opencatalog:setup USE_SNOWFLAKE_ROLE=your_admin_role
task opencatalog:configure_storage USE_SNOWFLAKE_ROLE=your_admin_role
task opencatalog:setup_snowflake_integration USE_SNOWFLAKE_ROLE=your_admin_role

# Demo workflow
export TABLE_NAMESPACE="events"
export TABLE_NAME="music_events"
task create_schema
task evolve_schema

# Cleanup workflow
task clean
task opencatalog:cleanup

🤖 AI-Powered Solution in Action

Step 1: Schema Detection & Mapping

  • AI analyzes incoming CSV structure
  • Maps different field names semantically:
    • artist_name ↔ dj_performer ↔ performer → unified artist_name column
    • main_stage ↔ venue_section ↔ location → unified main_stage column
    • vip_price ↔ admission_cost ↔ ticket_cost → unified vip_price column
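
Conceptually, this mapping is equivalent to projecting the source columns onto the unified names, as in the sketch below. The staged_beat_valley table is hypothetical and only for illustration; in the actual flow the renaming happens inside the OpenFlow pipeline before the data reaches S3 and the Iceberg table.

-- Illustration only: "staged_beat_valley" is a hypothetical staging table, not an object created by this repo
SELECT
    show_id        AS event_id,
    dj_performer   AS artist_name,
    venue_section  AS main_stage,
    time_block     AS start_time,
    admission_cost AS vip_price
FROM staged_beat_valley;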

Step 2: Schema Evolution

  • Detects new fields (e.g., genre, sponsor, capacity, weather_backup, city)
  • Automatically evolves Iceberg table schema using AI-powered analysis
  • Adds new columns without breaking existing data or requiring downtime
  • Supports multiple evolution scenarios based on business needs

Step 3: Unified Analytics Table

Initial Table Creation (from soundwave_events_2025.csv):

CREATE ICEBERG TABLE "events"."music_events" (
    "event_id" BIGINT NOT NULL,
    "artist_name" STRING,
    "main_stage" STRING,
    "start_time" STRING,
    "vip_price" DOUBLE
);

After Schema Evolution (multiple sources processed):

ALTER ICEBERG TABLE "events"."music_events" 
ADD COLUMN "genre" STRING COMMENT 'Musical genre of the performance',
    COLUMN "sponsor" STRING COMMENT 'Sponsoring organization or brand',
    COLUMN "capacity" BIGINT COMMENT 'Maximum venue capacity',
    COLUMN "max_attendance" BIGINT COMMENT 'Maximum attendance limit',
    COLUMN "day" STRING COMMENT 'Day of the multi-day festival',
    COLUMN "price_tier" STRING COMMENT 'Ticket pricing tier (Premium/Standard/Budget)',
    COLUMN "weather_backup" STRING COMMENT 'Weather backup plan or indoor venue option',
    COLUMN "city" STRING COMMENT 'City where the performance takes place';

Final Unified Table:

-- Unified schema accommodating all festival data sources
"events"."music_events" (
    "event_id" BIGINT NOT NULL,      -- Unified: event_id/show_id/slot_number/performance_id
    "artist_name" STRING,             -- Unified: artist_name/dj_performer/performer/headliner/act_name
    "main_stage" STRING,              -- Unified: main_stage/venue_section/location/venue_area/stage_location
    "start_time" STRING,              -- Unified: start_time/time_block/date_time/time_slot/show_time
    "vip_price" DOUBLE,               -- Unified: vip_price/admission_cost/ticket_cost/entry_fee/ticket_price
    "genre" STRING,                   -- New: Musical classification
    "sponsor" STRING,                 -- New: Sponsorship tracking
    "capacity" BIGINT,                -- New: Venue capacity planning
    "max_attendance" BIGINT,          -- New: Attendance limits
    "day" STRING,                     -- New: Multi-day scheduling
    "price_tier" STRING,              -- New: Tiered pricing models
    "weather_backup" STRING,          -- New: Weather contingency
    "city" STRING                     -- New: Geographic analytics
);
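
With every source landing in one table, cross-festival analytics becomes a single query. A minimal example (run via task spark_sql or any engine connected to the catalog; results depend on which files you have ingested):

-- Example: event count and average price per genre and price tier across all sources
SELECT
    "genre",
    "price_tier",
    COUNT(*)         AS events,
    AVG("vip_price") AS avg_price
FROM "events"."music_events"
GROUP BY "genre", "price_tier"
ORDER BY avg_price DESC;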

🎯 The Result: One Unified Table, Any Data Format

  • Works for retail, logistics, finance, healthcare, any industry
  • Minutes of smart automation instead of months of manual work
  • Scales easily from three data sources to three thousand

🚀 Live Demo Commands

This demo showcases three key scenarios using different CSV files:

1. Create Schema (Initial Table Creation)

# Copy the first CSV file to Google Shared Drive folder
# File: soundwave_events_2025.csv
# This will trigger schema analysis and table creation

# Wait for the pipeline to process the file and generate SQL DDL files
# The file will be automatically moved to the data ingestion bucket

# Create the Iceberg table with the detected schema
export TABLE_NAMESPACE="events"
export TABLE_NAME="music_events"
task create_schema

# Watch rows from CSV loaded into Iceberg table

Expected Result: New Iceberg table events.music_events created with base columns (event_id, artist_name, main_stage, start_time, vip_price)
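
A quick verification sketch (for example via task spark_sql); this is only a check, not part of the pipeline:

-- Confirm the base columns and the rows loaded from soundwave_events_2025.csv
SELECT COUNT(*) AS row_count FROM "events"."music_events";

SELECT "event_id", "artist_name", "main_stage", "start_time", "vip_price"
FROM "events"."music_events"
LIMIT 5;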

2. Just Do Ingestion (Same Schema)

# Copy a CSV file with semantically matching schema (5 base fields)
# File: beat_valley_lineup_2025.csv
# Schema: show_id,dj_performer,venue_section,time_block,admission_cost
# Maps to: event_id,artist_name,main_stage,start_time,vip_price

# Wait for the pipeline to process the file
# No schema changes detected - data is directly ingested

# No additional task needed - data automatically loads into existing table

Expected Result: New rows appended to existing events.music_events table without schema changes. Field names are semantically mapped (e.g., dj_performer → artist_name, venue_section → main_stage)

3. Evolve Schema (New Columns Detected)

# Copy a CSV file with additional columns beyond the base 5 fields
# Choose one file based on desired new columns:

# Option A: Add genre and sponsor information
#   File: soundwave_events_enhanced.csv
#   New columns: genre (STRING), sponsor (STRING)
#   Use case: Musical classification and sponsorship tracking

# Option B: Add venue capacity information
#   File: harmony_grove_lineup.csv
#   New columns: capacity (BIGINT)
#   Use case: Venue capacity planning and management

# Option C: Add attendance tracking and scheduling
#   File: music_mountain_schedule.csv
#   New columns: max_attendance (BIGINT), day (STRING)
#   Use case: Multi-day festival planning and attendance limits

# Option D: Add weather contingency and location details
#   File: coastal_beats_performances.csv
#   New columns: price_tier (STRING), weather_backup (STRING), city (STRING)
#   Use case: Weather planning, tiered pricing, and geographic tracking

# Wait for the pipeline to process the file and generate ALTER TABLE SQL

# Evolve the table schema with new columns
task evolve_schema

# Watch new columns added and rows loaded into Iceberg table

Expected Result: Schema evolved with new columns and data loaded with new fields populated
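
To confirm the evolution, inspect the table and the newly populated fields (verification sketch only; the columns shown assume Option A was used):

-- Confirm the new columns exist and are populated (Option A: genre, sponsor)
DESCRIBE TABLE "events"."music_events";

SELECT "artist_name", "genre", "sponsor"
FROM "events"."music_events"
WHERE "genre" IS NOT NULL
LIMIT 5;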

Schema Evolution Examples:

| File | Base Fields (Semantic Mapping) | New Columns Added | Data Types | Business Value |
| --- | --- | --- | --- | --- |
| soundwave_events_enhanced.csv | event_id, artist_name→artist, stage→stage, start_time→start_time, ticket_price→ticket_price | genre, sponsor | STRING, STRING | Music classification, sponsorship analytics |
| harmony_grove_lineup.csv | event_id→show_id, artist_name→performer, stage→location, start_time→date_time, ticket_price→ticket_cost | capacity | BIGINT | Venue capacity planning, crowd management |
| music_mountain_schedule.csv | event_id→slot_number, artist_name→headliner, stage→venue_area, start_time→time_slot, ticket_price→entry_fee | max_attendance, day | BIGINT, STRING | Multi-day scheduling, attendance tracking |
| coastal_beats_performances.csv | event_id→performance_id, artist_name→act_name, stage→stage_location, start_time→show_time, ticket_price→ticket_price | price_tier, weather_backup, city | STRING, STRING, STRING | Tiered pricing, weather contingency, location analytics |

NOTE: Each milestone in the pipeline will send notifications to your configured Slack channel, including:

  • File detected and processing started
  • Schema analysis completed (create/evolve/ingest)
  • SQL DDL generated and staged
  • File moved to ingestion bucket
  • Table created/evolved successfully
  • Data ingestion completed

🎯 Usage Examples

Creating a New Table

# Set table parameters
export TABLE_NAMESPACE="events"
export TABLE_NAME="music_events"

# Create table with generated SQL
task create_schema

Evolving Schema

# When new fields are detected, evolve the schema
task evolve_schema

Cleanup

# Clean up OpenCatalog resources
task opencatalog:cleanup

# Clean up all MusicFlow resources
task clean

Complete Demo Workflow

# 1. Initial setup
task setup
task opencatalog:setup USE_SNOWFLAKE_ROLE=your_admin_role
task opencatalog:configure_storage USE_SNOWFLAKE_ROLE=your_admin_role
task opencatalog:setup_snowflake_integration USE_SNOWFLAKE_ROLE=your_admin_role

# 2. Demo data setup
export TABLE_NAMESPACE="events"
export TABLE_NAME="music_events"
task create_schema

# 3. Upload data and evolve schema
# (Upload CSV files to S3 with appropriate tags)
task evolve_schema

# 4. Cleanup when done
task clean
task opencatalog:cleanup

🔍 Troubleshooting

Common Issues

  1. Schema Analysis Failures: Ensure Cortex AI is enabled and accessible
  2. Iceberg Table Creation: Verify OpenCatalog configuration and permissions
  3. OpenFlow Service Errors: Check SPCS service status and runtime extensions
  4. Permission Issues: Verify service account has appropriate roles
  5. Trial Account Limitations: Ensure you're using a full Snowflake account, not a trial
  6. OpenCatalog Connection Issues: Verify OC_ environment variables and service credentials
  7. Storage Access Issues: Check AWS IAM role permissions and external ID configuration

Debug Commands

# Check environment setup
task env_check
task opencatalog:env_check

# View schema registry
snow sql --query "SELECT * FROM metadata.schema_registry"

# Check OpenFlow service status
snow spcs service list --database your_demo_database

# View service logs
snow spcs service logs --service-name your_service_name

# Test OpenCatalog connection
task opencatalog:catalog_info

# Verify catalog integration in Snowflake
task opencatalog:check_snowflake_integration USE_SNOWFLAKE_ROLE=your_admin_role

# Check Snowflake integration status
snow sql --query "SHOW CATALOG INTEGRATIONS"

# Test Iceberg table creation
snow sql --query "CREATE ICEBERG TABLE IF NOT EXISTS test_table (id INT) IN CATALOG music_flow_catalog"

🏗️ Technology Stack

Intelligent Data Pipelines. Instant Analytics.

| Technology | Purpose | Integration |
| --- | --- | --- |
| Snowflake | Data Platform & Analytics | Core platform |
| Snowflake OpenFlow | Data Pipeline Orchestration | Apache NiFi-based flows |
| Cortex AI | Schema Intelligence | AI-powered analysis |
| Snowflake OpenCatalog (Managed Apache Polaris) | Open Catalog | Iceberg table management |
| Apache Iceberg | Data Lakehouse Format | Unified table format |

📚 Additional Resources

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Test thoroughly
  5. Submit a pull request

📄 License

Copyright (c) Snowflake Inc. All rights reserved. Licensed under the Apache 2.0 license.


🎵 MusicFlow - Intelligent Data Pipelines for Music Festival Analytics

"Minutes of smart automation instead of months of manual work"
