[FEATURE] Ensure exactly-once delivery without doc ID support in AOSS #1012

Open · dai-chen opened this issue Jan 8, 2025 · 0 comments
Labels: enhancement (New feature or request)

dai-chen (Collaborator) commented Jan 8, 2025

Is your feature request related to a problem?

In OpenSearch Serverless (AOSS) time series collections, document creation and update operations fail when a document ID is specified. This limitation affects Flint's ability to guarantee idempotency for index operations that rely on document IDs. For example:

24/12/19 23:58:46 WARN TaskSetManager: Lost task 39.0 in stage 3.0 (TID 3447) ([2600:1f14:38a0:a801:1a95:8524:74ed:54a8] executor 4): java.lang.RuntimeException: failure in bulk execution:
[0]: index [flint_glue_default_mv_idempotent_test_1], id [51fd2d0929ebb39c79ee9492ba064f654e56709c], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[1]: index [flint_glue_default_mv_idempotent_test_1], id [53b97544fb9831e0714a12ca243930539af06c3d], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[2]: index [flint_glue_default_mv_idempotent_test_1], id [5896fb03aa01e3962215b19eddefa07a8764fc55], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[3]: index [flint_glue_default_mv_idempotent_test_1], id [1ec78f9c430d1e7dfc682b76b0f8e27eb7e3c32d], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
[4]: index [flint_glue_default_mv_idempotent_test_1], id [fcd5f0c77810f6382ff00ec942e7f1fcb529d61f], message [OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=Document ID is not supported in create/index operation request]]]
	at org.opensearch.flint.core.storage.OpenSearchWriter.flush(OpenSearchWriter.java:64)
	at shaded.flint.com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.flush(WriterBasedJsonGenerator.java:983)
	at org.apache.spark.sql.flint.json.FlintJacksonGenerator.flush(FlintJacksonGenerator.scala:257)
	at org.apache.spark.sql.flint.FlintPartitionWriter.write(FlintPartitionWriter.scala:64)
	at org.apache.spark.sql.flint.FlintPartitionWriter.write(FlintPartitionWriter.scala:24)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1410)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
	at org.apache.spark.scheduler.Task.run(Task.scala:152)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

The implications of these findings are as follows:

  1. Flint's skipping index creation will fail because it automatically uses the source file path as the document ID (see the sketch after this list).
  2. The deduplication approach proposed for APPEND mode in PR #946 (Enhance CV and MV idempotency via deterministic ID generation) will not function as intended.
  3. A similar deduplication approach (upsert with document ID) planned for future UPDATE mode support will likewise be ineffective.
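
A minimal, hypothetical sketch of such deterministic ID generation, in the spirit of the skipping index and PR #946 (the actual Flint implementation may hash different inputs):

import java.security.MessageDigest

// Hypothetical sketch only, not the actual Flint implementation:
// derive a deterministic document ID by hashing the source file path,
// so that re-processing the same file produces the same _id.
object DeterministicId {
  def fromSourcePath(path: String): String =
    MessageDigest.getInstance("SHA-1")
      .digest(path.getBytes("UTF-8"))
      .map("%02x".format(_))      // hex-encode each byte
      .mkString
}

// Example: DeterministicId.fromSourcePath("s3://bucket/table/part-00000.parquet")
// AOSS time series collections reject bulk index/create actions that carry such an explicit _id.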

What solution would you like?

Because OpenSearch lacks transactions and atomic bulk support, most of the approaches below can only guarantee eventual consistency for the data shown in OpenSearch Dashboards; dirty reads and partial results may occur and cannot be fully avoided. Potential approaches include:

  1. Remove Duplicate Data Upon Restart: Delete any duplicate data created by a failed microbatch upon restart, before resuming ingestion. This requires storing additional fields such as batchId in the index to support the cleanup (delete-by-query) operation, for example:
POST idemp_test/_doc
{
  "batchId": 1,
  "name": "hello"
}

POST idemp_test/_delete_by_query
{
  "query": {
    "match": {
      "batchId": 1
    }
  }
}
  2. Checkpointing Successful Bulk Requests: Track successful bulk requests in the sink's checkpoint. This approach requires deterministic bulk requests within each microbatch so that the same data is sent again during a restart; a sketch is shown below.

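A rough sketch of this idea, assuming each microbatch produces deterministic bulk payloads and that a small ledger can be persisted alongside the sink checkpoint (all names below are illustrative, none are existing Flint APIs):

import java.security.MessageDigest
import scala.collection.mutable

// Illustrative sketch only. Each bulk payload gets a deterministic
// fingerprint; the fingerprint is added to the checkpoint ledger only
// after OpenSearch acknowledges the bulk request, so a restarted
// microbatch can skip payloads that were already committed.
class CheckpointedBulkWriter(ledger: mutable.Set[String],
                             sendBulk: String => Unit) {

  private def fingerprint(payload: String): String =
    MessageDigest.getInstance("SHA-1")
      .digest(payload.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString

  def write(bulkPayload: String): Unit = {
    val id = fingerprint(bulkPayload)
    if (!ledger.contains(id)) {   // skip payloads committed by a previous attempt
      sendBulk(bulkPayload)
      ledger += id                // record only after the bulk call succeeds
    }
  }
}
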
  3. Query-Time Deduplication: Keep duplicate documents in the index and deduplicate at query time; an OpenSearch DSL query can return only the latest document per group using the top_hits aggregation. For example:

POST idemp_test/_doc
{
  "batchId": 2,
  "name": "hello"
}

POST idemp_test/_doc
{
  "batchId": 3,
  "name": "hello"
}

POST idemp_test/_search
{
  "size": 0,
  "aggs": {
    "group_by_name": {
      "terms": {
        "field": "name.keyword",
        "size": 1000
      },
      "aggs": {
        "latest_batch_doc": {
          "top_hits": {
            "sort": [
              { "batchId": { "order": "desc" } }
            ],
            "size": 1
          }
        }
      }
    }
  }
}
  4. Staging Index for Intermediate Writes: Write each microbatch to its own staging index. After a batch succeeds, update the alias to include the new index; since a single _aliases request applies all of its actions atomically, readers of the alias never see a partially updated view. The trade-off is the overhead of creating and maintaining a separate index per microbatch. For example:
POST idemp_test_001/_doc
{
  "name": "hello"
}

POST idemp_test_002/_doc
{
  "name": "world"
}

POST /_aliases
{
  "actions": [
    { "add": { "index": "idemp_test_001", "alias": "idemp_test" } },
    { "add": { "index": "idemp_test_002", "alias": "idemp_test" } }
  ]
}

What alternatives have you considered?

Using AOSS search collections instead of time series collections to support document ID operations.

Do you have any additional context?

dai-chen added the enhancement and untriaged labels on Jan 8, 2025
dai-chen changed the title from "[FEATURE] Ensure idempotency without doc ID operation support in AOSS" to "[FEATURE] Ensure exactly-once delivery without doc ID support in AOSS" on Jan 8, 2025
dai-chen removed the untriaged label on Jan 8, 2025