Bug: Typesense destination ERROR [Errno 404] Not Found. #50396

Open
1 task done
soumilbaldota opened this issue Dec 22, 2024 · 3 comments

@soumilbaldota

soumilbaldota commented Dec 22, 2024

Connector Name

destination-typesense

Connector Version

v0.1.31

What step the error happened?

During the sync

Relevant information

I have been getting this error when pushing data from Postgres to Typesense with connector version v0.1.31 of destination-typesense.

Relevant log output

2024-12-22 14:01:58 destination INFO flushing 1000 records
2024-12-22 14:01:58 destination ERROR [Errno 404] Not Found
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 11, in <module>
    DestinationTypesense().run(sys.argv[1:])
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/destinations/destination.py", line 119, in run
    for message in output_messages:
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/destinations/destination.py", line 113, in run_cmd
    yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/destinations/destination.py", line 49, in _run_write
    yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)
  File "/airbyte/integration_code/destination_typesense/destination.py", line 52, in write
    writer.queue_write_operation(record.stream, record.data)
  File "/airbyte/integration_code/destination_typesense/writer.py", line 26, in queue_write_operation
    self.flush()
  File "/airbyte/integration_code/destination_typesense/writer.py", line 39, in flush
    self.client.collections[stream].documents.import_(data)
  File "/usr/local/lib/python3.10/site-packages/typesense/documents.py", line 60, in import_
    api_response = self.api_call.post(self._endpoint_path('import'), docs_import, params, as_json=False)
  File "/usr/local/lib/python3.10/site-packages/typesense/api_call.py", line 141, in post
    return self.make_request(requests.post, endpoint, as_json,
  File "/usr/local/lib/python3.10/site-packages/typesense/api_call.py", line 113, in make_request
    raise ApiCall.get_exception(r.status_code)(r.status_code, error_message)
typesense.exceptions.ObjectNotFound: [Errno 404] Not Found
2024-12-22 14:01:58 replication-orchestrator INFO Destination finished successfully — exiting read dest...
2024-12-22 14:01:58 replication-orchestrator WARN Attempted to close a destination which is already closed.
2024-12-22 14:02:58 replication-orchestrator INFO failures: [ {
  "failureOrigin" : "destination",
  "failureType" : "system_error",
  "internalMessage" : "[Errno 404] Not Found",
  "externalMessage" : "Something went wrong in the connector. See the logs for more details.",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 453,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "Traceback (most recent call last):\n  File \"/airbyte/integration_code/main.py\", line 11, in <module>\n    DestinationTypesense().run(sys.argv[1:])\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/destinations/destination.py\", line 119, in run\n    for message in output_messages:\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/destinations/destination.py\", line 113, in run_cmd\n    yield from self._run_write(config=config, configured_catalog_path=parsed_args.catalog, input_stream=wrapped_stdin)\n  File \"/usr/local/lib/python3.10/site-packages/airbyte_cdk/destinations/destination.py\", line 49, in _run_write\n    yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages)\n  File \"/airbyte/integration_code/destination_typesense/destination.py\", line 52, in write\n    writer.queue_write_operation(record.stream, record.data)\n  File \"/airbyte/integration_code/destination_typesense/writer.py\", line 26, in queue_write_operation\n    self.flush()\n  File \"/airbyte/integration_code/destination_typesense/writer.py\", line 39, in flush\n    self.client.collections[stream].documents.import_(data)\n  File \"/usr/local/lib/python3.10/site-packages/typesense/documents.py\", line 60, in import_\n    api_response = self.api_call.post(self._endpoint_path('import'), docs_import, params, as_json=False)\n  File \"/usr/local/lib/python3.10/site-packages/typesense/api_call.py\", line 141, in post\n    return self.make_request(requests.post, endpoint, as_json,\n  File \"/usr/local/lib/python3.10/site-packages/typesense/api_call.py\", line 113, in make_request\n    raise ApiCall.get_exception(r.status_code)(r.status_code, error_message)\ntypesense.exceptions.ObjectNotFound: [Errno 404] Not Found\n",
  "timestamp" : 1734856318083
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process message delivery failed",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 453,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process message delivery failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:453)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:243)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.io.IOException: Broken pipe\n\tat java.base/sun.nio.ch.UnixFileDispatcherImpl.write0(Native Method)\n\tat java.base/sun.nio.ch.UnixFileDispatcherImpl.write(UnixFileDispatcherImpl.java:65)\n\tat java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:137)\n\tat java.base/sun.nio.ch.IOUtil.write(IOUtil.java:102)\n\tat java.base/sun.nio.ch.IOUtil.write(IOUtil.java:72)\n\tat java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:300)\n\tat java.base/sun.nio.ch.ChannelOutputStream.writeFully(ChannelOutputStream.java:68)\n\tat java.base/sun.nio.ch.ChannelOutputStream.write(ChannelOutputStream.java:105)\n\tat java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:309)\n\tat java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:381)\n\tat java.base/sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:357)\n\tat java.base/sun.nio.cs.StreamEncoder.lockedWrite(StreamEncoder.java:158)\n\tat java.base/sun.nio.cs.StreamEncoder.write(StreamEncoder.java:139)\n\tat java.base/java.io.OutputStreamWriter.write(OutputStreamWriter.java:219)\n\tat java.base/java.io.BufferedWriter.implFlushBuffer(BufferedWriter.java:178)\n\tat java.base/java.io.BufferedWriter.flushBuffer(BufferedWriter.java:163)\n\tat 
java.base/java.io.BufferedWriter.implWrite(BufferedWriter.java:334)\n\tat java.base/java.io.BufferedWriter.write(BufferedWriter.java:313)\n\tat java.base/java.io.Writer.write(Writer.java:278)\n\tat io.airbyte.workers.internal.VersionedAirbyteMessageBufferedWriter.write(VersionedAirbyteMessageBufferedWriter.java:39)\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.acceptWithNoTimeoutMonitor(LocalContainerAirbyteDestination.kt:136)\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.accept(LocalContainerAirbyteDestination.kt:95)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.writeToDestination(BufferedReplicationWorker.java:438)\n\t... 5 more\n",
  "timestamp" : 1734856318217
}, {
  "failureOrigin" : "source",
  "internalMessage" : "Source process read attempt failed",
  "externalMessage" : "Something went wrong within the source connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 453,
    "connector_command" : "read"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.SourceException: Source process read attempt failed\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:375)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:222)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: java.lang.IllegalStateException: No exit code found.\n\tat io.airbyte.workers.internal.ContainerIOHandle.getExitCode(ContainerIOHandle.kt:104)\n\tat io.airbyte.workers.internal.LocalContainerAirbyteSource.getExitValue(LocalContainerAirbyteSource.kt:89)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:355)\n\t... 5 more\n",
  "timestamp" : 1734856318218
}, {
  "failureOrigin" : "destination",
  "internalMessage" : "Destination process exited with non-zero exit code 1",
  "externalMessage" : "Something went wrong within the destination connector",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 453,
    "connector_command" : "write"
  },
  "stacktrace" : "io.airbyte.workers.internal.exception.DestinationException: Destination process exited with non-zero exit code 1\n\tat io.airbyte.workers.general.BufferedReplicationWorker.readFromDestination(BufferedReplicationWorker.java:502)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsync$2(BufferedReplicationWorker.java:215)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n",
  "timestamp" : 1734856318221
}, {
  "failureOrigin" : "replication",
  "internalMessage" : "io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.",
  "externalMessage" : "Something went wrong during replication",
  "metadata" : {
    "attemptNumber" : 0,
    "jobId" : 453
  },
  "stacktrace" : "java.lang.RuntimeException: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:547)\n\tat io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithTimeout$5(BufferedReplicationWorker.java:243)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: io.airbyte.workers.exception.WorkerException: Destination process exit with code 1. This warning is normal if the job was cancelled.\n\tat io.airbyte.workers.internal.LocalContainerAirbyteDestination.close(LocalContainerAirbyteDestination.kt:60)\n\tat io.airbyte.workers.general.BufferedReplicationWorker$CloseableWithTimeout.lambda$close$0(BufferedReplicationWorker.java:545)\n\t... 5 more\n",
  "timestamp" : 1734856318307
} ]

Contribute

  • Yes, I want to contribute
@marcosmarxm
Member

It looks like a problem with collections. @soumilbaldota feel free to ping me if you have issues making the contribution.

@soumilbaldota
Author

soumilbaldota commented Dec 24, 2024

@marcosmarxm is there an easy way to get the configured_catalog.json out of postgres/airbyte so I can test destination-typesense? It would be great if you could point me to a readme. I have integration_test.py running fine.

Also, the readme needs to be updated with the poetry steps. It still refers to requirements.txt, which does not exist and may confuse community devs like me.

Edit: I saw the seed.py file in source-postgres but found no linked documentation.
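
For local testing, a configured catalog can also be written by hand rather than exported from a running Airbyte instance. The following is a minimal sketch, assuming the standard Airbyte protocol layout for a configured catalog (a `streams` list whose entries wrap a `stream` together with `sync_mode` and `destination_sync_mode`). The helper name `build_configured_catalog`, the empty `json_schema`, and the default modes are hypothetical placeholders, not taken from the connector.

```python
import json
from typing import Any, Dict, List


def build_configured_catalog(
    stream_names: List[str],
    sync_mode: str = "full_refresh",
    destination_sync_mode: str = "overwrite",
) -> Dict[str, Any]:
    """Return a configured catalog dict with one entry per stream name."""
    return {
        "streams": [
            {
                "stream": {
                    "name": name,
                    # Placeholder schema; a real catalog lists the columns here.
                    "json_schema": {"type": "object", "properties": {}},
                    "supported_sync_modes": ["full_refresh", "incremental"],
                },
                "sync_mode": sync_mode,
                "destination_sync_mode": destination_sync_mode,
            }
            for name in stream_names
        ]
    }
```

Serializing the result with `json.dump` to `configured_catalog.json` should give a file that can be passed to the connector's `write` command through its catalog argument, with Airbyte record messages piped in on stdin.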

@soumilbaldota
Author

The issue is only when the collection does not exist (i.e. the first sync).

Would you like me to add code that creates the collection if it does not exist? Let me know and I will send a PR.

These do not work for the first sync:

      "sync_mode": "incremental",
      "destination_sync_mode": "append"

      "sync_mode": "full_refresh",
      "destination_sync_mode": "append",

      "sync_mode": "full_refresh",
      "destination_sync_mode": "append_dedup",

This works for the first sync:

      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"

From the second sync onwards, all of them work, for example:

      "sync_mode": "incremental",
      "destination_sync_mode": "append_dedup",
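
The proposal above, creating the collection when it is missing, could be sketched roughly as follows. This is an illustration under stated assumptions, not the connector's actual code: `ensure_collection` is a hypothetical helper, the wildcard auto-typed schema is an assumed default, and the exception class is passed in so the sketch does not depend on the Typesense package being installed. It relies only on the client calls `collections[name].retrieve()` and `collections.create(schema)`.

```python
from typing import Any, Type


def ensure_collection(client: Any, name: str, not_found: Type[Exception]) -> bool:
    """Create collection `name` if the server reports it as missing.

    Returns True when a collection was created, False when it already existed.
    """
    try:
        # Raises `not_found` when the collection does not exist yet.
        client.collections[name].retrieve()
        return False
    except not_found:
        # Assumed default schema: let Typesense infer the type of every field.
        client.collections.create(
            {"name": name, "fields": [{"name": ".*", "type": "auto"}]}
        )
        return True
```

With the real client, `not_found` would be `typesense.exceptions.ObjectNotFound`, the exception shown in the traceback. Calling the helper once per stream before the first `import_` in `flush` would let the append modes succeed on a first sync, while leaving existing collections untouched.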
