[Demos-Collection] Collecting Use Case Demos for Apache SeaTunnel #8388

Open · davidzollo opened this issue Dec 26, 2024 · 4 comments

davidzollo (Contributor) commented Dec 26, 2024

🎯 Collecting Use Case Demos for Apache SeaTunnel

Dear Apache SeaTunnel community members,

We are initiating a community effort to collect real-world use case demos to enrich our documentation and help more users quickly get started and solve their problems with Apache SeaTunnel! 🚀

Why are these demos important?

Many users encounter unique scenarios while using SeaTunnel. By collecting and maintaining these demos, we can:

  • Help newcomers: Showcase the broad range of use cases supported by SeaTunnel.
  • Provide quick-start solutions: Offer ready-made templates to reduce troubleshooting time.
  • Foster community collaboration: Enable others to build and improve upon these examples.

What can you contribute?

1. Share your specific use case:

  • Scenario Description: What problem did you solve?
  • SeaTunnel Version: The version you were using.
  • Configuration or Code Snippet: Key configuration or scripts.
  • Outcome: A brief description of the result.

2. Submit a complete demo:

  • Organize your configuration files, sample datasets, and documentation into a complete demo (a possible layout is sketched after this list).
  • Submit a PR to the demos/ directory in the SeaTunnel repository.

3. Share ideas even without a demo:

  • If you don't have a complete demo, feel free to share your ideas here. We will help organize and improve them.
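
If you do put together a complete demo, a layout along these lines keeps the pieces easy to find (the folder and file names below are only an illustration, not an established convention in the repository):

demos/
  mysql-to-mysql-batch/        # hypothetical demo name
    README.md                  # scenario description, SeaTunnel version, expected outcome
    config/
      batch.conf               # the job configuration file
    data/
      init_source.sql          # optional sample data or table DDL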

Example:

  • Scenario: Batch Single Table Synchronization from MySQL to MySQL
  • SeaTunnel Version: 2.3.8
  • Configuration:
env {
    job.name = "mysql_to_elasticsearch"
    job.mode = "BATCH"
    parallelism = 6
}

source {
    Jdbc {
        url = "jdbc:mysql://xxx:3306/source_db"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "test_user"
        password = "test_pwd"

        table_list = [
            {
                query = "select * from source_db.t1"
                table_path = "source_db.t1"
            }
        ]
        result_table_name = "my-source-1"
    }
}

sink {
    Jdbc {
        source_table_name = "my-source-1"
        url = "jdbc:mysql://localhost:3306/sink_db"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "test_user"
        password = "test_pwd"

        database = "sink_db"
        table = "${table_name}_copy"
        generate_sink_sql = true
        batch_size = 1000

        schema_save_mode = "RECREATE_SCHEMA"
        data_save_mode = "KEEP_SCHEMA_DROP_DATA"
    }
}
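
The "${table_name}_copy" placeholder in the sink expands to the name of each upstream table, so the same job can copy several tables in one run just by adding entries to table_list. A minimal sketch of the source side, assuming the connection options stay as above and that source_db.t2 is a second, purely hypothetical table:

source {
    Jdbc {
        # url / driver / user / password as in the example above
        table_list = [
            { query = "select * from source_db.t1", table_path = "source_db.t1" },
            { query = "select * from source_db.t2", table_path = "source_db.t2" }
        ]
        result_table_name = "my-source-1"
    }
}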

🎯 Calling All Users!

We’re collecting amazing demos for the project to showcase features and improve the experience! 🎉

📌 Why Your Contribution Matters!

  • Why contribute? Your demo could become part of the official examples, helping more developers get started quickly!
  • How to contribute? Submit your demo in the comment section; the more detailed, the better!

🔥 Show off your skills and join us in building a thriving community! 🚀

NookVoive commented

I am willing to participate in this proposal. Here is one of my Oracle-CDC connection configurations:

env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  Oracle-CDC {
    base-url = "jdbc:oracle:thin:BI_ETL/xxxxxx@ip:1521:ars"
    username = "BI_ETL"
    password = "xxxxxx"
    source.reader.close.timeout = 120000
    
    database-names = ["ARS"]
    schema-names = ["BI_ETL"]
    table-names = ["ARS.BI_ETL.TEST_DIM_C"]
    
    result_table_name = "TEST_DIM_C"

  }
}

sink {
  Console {
  }
}
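
The Console sink above simply prints each captured change event, which makes it a convenient first check that the Oracle-CDC connection and table filters are correct. Once verified, it can be swapped for a real sink; a minimal sketch with a hypothetical MySQL target, reusing the Jdbc sink options already shown in the issue description:

sink {
  Jdbc {
    source_table_name = "TEST_DIM_C"
    url = "jdbc:mysql://localhost:3306/sink_db"   # hypothetical target database
    driver = "com.mysql.cj.jdbc.Driver"
    user = "test_user"
    password = "test_pwd"
    database = "sink_db"
    table = "TEST_DIM_C"
    generate_sink_sql = true
    primary_keys = ["ID"]   # hypothetical primary key; needed so updates/deletes from the CDC stream generate the right SQL
  }
}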

davidzollo (Contributor, Author) commented Jan 14, 2025

Using the MySQL CDC Connector to Read Historical Full Data and CDC Incremental Data from Multiple Tables in a MySQL Database to Doris

env {
  job.mode = "STREAMING"
  parallelism = 1
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://datasource01:3306/qa_source"
    username = "root"
    password = "root@123"
    
    table-names = ["qa_source.batch_mysql_to_doris", "qa_source.batch_mysql_to_doris_offline_incremental_where"]
    startup.mode = "latest"
  
  }
}

sink {
    Doris {
        fenodes = "datasource01:8034"
        query-port = 9034
        username = root
        password = "root@123"
        schema_save_mode = "RECREATE_SCHEMA"
        database = "e2e_sink"
        table = "${table_name}_from_mysql"
        sink.enable-2pc = "true"
        sink.enable-delete = "true"
        sink.label-prefix = "test_json"
        doris.config = {
            format="json"
            read_json_by_line="true"
        }
    }
}
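
Because table is set to "${table_name}_from_mysql", each captured MySQL table lands in its own Doris table (for example batch_mysql_to_doris_from_mysql), and schema_save_mode = "RECREATE_SCHEMA" lets the sink create (or drop and recreate) those target tables automatically instead of requiring them to exist beforehand.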

davidzollo (Contributor, Author) commented

Batch task from MySQL to Doris

env {
  execution.parallelism = 4
  job.mode = "BATCH"
}

source {
  Jdbc {
    result_table_name = "tab1"
    url = "jdbc:mysql://x.x.x.x:9999/test?useSSL=false&serverTimezone=GMT%2b8"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "u"
    password = "p"
    query = "select id,'2025-02-11' as pt_dt, name, task_type, task_execute_type, task_code, task_definition_version, process_instance_id, state, submit_time, start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, task_params, flag, retry_interval, max_retry_times, task_instance_priority, worker_group, environment_code, environment_config, executor_id, first_submit_time, delay_time, var_pool, task_group_id, dry_run, cpu_quota, memory_max, test_flag, is_cache, cache_key, process_instance_name, project_code, executor_name,'2025-02-11 14:38:18' as backup_time from ql"
    partition_column = "id"
    partition_num = 4
    fetch_size = 2000
  }
}

sink {
  Doris {
    fenodes = "x.x.x.x:9999"
    username = "u"
    password = "p"
    database = "bd_seb_test"
    table = "t_ds_task_instance_backup"
    sink.label-prefix = "mysql_to_doris" 
    sink.enable-2pc = "false"
    doris.batch.size = 500000
    sink.buffer-size = 104857600
    sink.max-retries = 5
    doris.config = {
      format = "json"                             
      read_json_by_line = "true"         
    }
  }
}

davidzollo changed the title from "[Demos-Collection] Encourage and Collect Demo Contributions" to "[Demos-Collection] Collecting Use Case Demos for Apache SeaTunnel" on Feb 12, 2025
davidzollo pinned this issue on Feb 12, 2025
davidzollo (Contributor, Author) commented

Streaming task from Kafka to Doris

env {
  execution.parallelism = 4 # It is recommended to adjust according to the number of Kafka partitions, keeping it consistent with the partition count
  job.mode = "STREAMING"
  checkpoint.interval = 30000
  checkpoint.timeout = 600000
  # Rate limits: ~700 MB/s and 40,000 rows/s; high, but reasonable for this workload
  read_limit.bytes_per_second=700000000
  read_limit.rows_per_second=40000
}

source {
  
    Kafka {
      result_table_name = "kafka_log"
      #Kafka server address
      bootstrap.servers = "xxxxx"
      topic = "xxxx"
      consumer.group = "kafka2table"
      start_mode = "earliest"
      kafka.config = {
            "fetch.min.bytes" = "1048576" # 1MB, increase the minimum batch fetch size
            "fetch.max.wait.ms" = "500"    # Wait time when data is insufficient
            "max.partition.fetch.bytes" = "5242880" # 5MB, maximum data fetch per partition
            "max.poll.records" = "5000"     # Maximum number of records per poll
            "isolation.level" = "read_committed" # Ensure data consistency
      }
      format = json
      schema={
        fields={
            ev=STRING
            pg=STRING
            uuid=STRING
            userId=bigint
            fromDevice=STRING
            ip=STRING
            source=STRING
            np=STRING
            lp=STRING
            tg=STRING
            ch=STRING
            v=STRING
            nt=STRING
            wifi=STRING
            dbd=STRING
            dmd=STRING
            bs=STRING
            browser_version=STRING
            ext=STRING
            sid=STRING
            timestamp=bigint
            reporttime=bigint
        }
      }
  }
}

transform {
  Sql {
    source_table_name = "kafka_log"
    result_table_name = "log"
    query = "select ev as event,pg as page,uuid,userId as userid,fromDevice as platform,ip,source,np as nextpage,lp as lastpage,tg as target,ch as channel,v as version,nt as network,wifi,dbd as device_brand,dmd as device_model,bs as browser,browser_version,ext as extra,sid as sessionid,timestamp,reporttime,CURRENT_DATE as dt from kafka_log"
  }
}

sink {
  
  Doris {
    source_table_name = "log"
    
    fenodes = "dxxx"
    username = xxx
    password = "xxxx"
    table.identifier = "ods.ods_log"
    sink.label-prefix = "log"

    sink.enable-2pc = "false"
    doris.batch.size = 500000
    sink.buffer-size = 104857600
    sink.max-retries = 5

    doris.config {
        format="json"
        read_json_by_line="true"
    }
  }
}

