[Bug][Connector-V2][Assert] AssertSinkWriter supports multiple concurrent operations #8357

Open
2 of 3 tasks
zhangshenghang opened this issue Dec 20, 2024 · 3 comments · May be fixed by #8746

Comments

@zhangshenghang
Member

Search before asking

  • I searched the existing feature requests and found no similar requirement.

Description

The close method of the current AssertSinkWriter, which verifies whether all expected data has been processed, has a bug that shows up when multiple writer instances run concurrently.
For example, suppose there are two AssertSinkWriter threads, A and B, and A finishes while B is still running. A then executes its close method and checks whether the overall execution is complete, even though B has not finished.

The correct logic is to validate only after all threads have finished or, alternatively, to have thread A verify only the data that it processed itself.
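
The first option (validate only once every writer has closed) could look roughly like the sketch below. This is not the actual AssertSinkWriter code or the fix in the linked pull request: `SharedRowCheck`, `recordRow`, and `closeWriter` are hypothetical names, and the sketch assumes all parallel writers run in one JVM and share a single instance.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not part of SeaTunnel), shared by all parallel
 * writers of one sink. Assumes every writer lives in the same JVM.
 */
final class SharedRowCheck {
    private final AtomicLong totalRows = new AtomicLong();
    private final AtomicInteger openWriters;
    private final long minRows;
    private final long maxRows;

    SharedRowCheck(int writerCount, long minRows, long maxRows) {
        this.openWriters = new AtomicInteger(writerCount);
        this.minRows = minRows;
        this.maxRows = maxRows;
    }

    /** Called by a writer for every row it receives. */
    void recordRow() {
        totalRows.incrementAndGet();
    }

    /**
     * Called once from each writer's close().
     * Returns false without validating while other writers are still open.
     * The last writer to close validates the combined count and returns true,
     * or throws if the count is outside [minRows, maxRows].
     */
    boolean closeWriter() {
        if (openWriters.decrementAndGet() > 0) {
            return false; // another writer is still running; skip the check
        }
        long rows = totalRows.get();
        if (rows < minRows || rows > maxRows) {
            throw new IllegalStateException(
                "expected between " + minRows + " and " + maxRows
                    + " rows but saw " + rows);
        }
        return true;
    }
}
```

With two writers, the first call to `closeWriter()` returns without checking anything, and only the second call compares the combined row count against the MIN_ROW and MAX_ROW bounds. The second option from the description (each writer verifies only its own data) would instead need per-writer expectations, which the row rules in the configuration do not express.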


Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@zhangshenghang
Member Author

You can use this configuration file to reproduce the issue:

env {
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    tables_configs = [
      {
        row.num = 1000
        schema = {
          table = "test.abc"
          columns = [
            {
              name = "id"
              type = "bigint"
            },
            {
              name = "name"
              type = "string"
            },
            {
              name = "age"
              type = "int"
            }
          ]
        }
      },
      {
        row.num = 1000
        schema = {
          table = "test.xyz"
          columns = [
            {
              name = "id"
              type = "bigint"
            },
            {
              name = "name"
              type = "string"
            },
            {
              name = "age"
              type = "int"
            }
          ]
        }
      },
      {
        row.num = 1000
        schema = {
          table = "test.www"
          columns = [
            {
              name = "id"
              type = "bigint"
            },
            {
              name = "name"
              type = "string"
            },
            {
              name = "age"
              type = "int"
            }
          ]
        }
      }
    ]
  }
}

transform {
  FilterRowKind {
    table_match_regex = "test.a.*"
    table_transform = [{
      table_path = "test.xyz"
      exclude_kinds = ["INSERT"]
    }]
    exclude_kinds = ["INSERT"]
  }
}

sink {
  Assert {
    rules =
      {
        tables_configs = [
          {
            table_path = "test.abc"
            row_rules = [
              {
                rule_type = MIN_ROW
                rule_value = 0
              },
              {
                rule_type = MAX_ROW
                rule_value = 0
              }
            ]
          },
          {
            table_path = "test.xyz"
            row_rules = [
              {
                rule_type = MIN_ROW
                rule_value = 0
              },
              {
                rule_type = MAX_ROW
                rule_value = 0
              }
            ]
          },
          {
            table_path = "test.www"
            row_rules = [
              {
                rule_type = MIN_ROW
                rule_value = 1000
              },
              {
                rule_type = MAX_ROW
                rule_value = 1000
              }
            ]
          }
        ]
      }
  }
}

@corgy-w
Contributor

corgy-w commented Dec 24, 2024

I'm in 'rehabilitation training' [/dog]; if I have time, I can try this issue.


This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Jan 26, 2025
@corgy-w corgy-w linked a pull request Feb 16, 2025 that will close this issue
4 tasks
@Hisoka-X Hisoka-X linked a pull request Feb 19, 2025 that will close this issue
4 tasks