-
-
Notifications
You must be signed in to change notification settings - Fork 83
WIP: Implement COPY … FROM STDIN
#566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This is still WIP. To do items include: - [ ] Test the various error cases, I have mostly focused on the success case so far - [ ] Test the backpressure support - [ ] Change the public API to accept the table + columns to copy into as well as options so that we can build the `COPY` query instead of letting the user write it - [ ] Add an API that allows binary transfer of data
/// A handle to send | ||
public struct PostgresCopyFromWriter: Sendable { | ||
private let channelHandler: NIOLoopBound<PostgresChannelHandler> | ||
private let context: NIOLoopBound<ChannelHandlerContext> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use the handlerContext: ChannelHandlerContext?
with !
in PostgresChannelHandler
instead.
func executeImmediatelyOrSchedule(_ task: @Sendable @escaping () -> Void) { | ||
if inEventLoop { | ||
return task() | ||
} | ||
return execute(task) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this needs to allocate even in the fast path. we don't like that.
struct NotWritableError: Error, CustomStringConvertible { | ||
var description = "No data must be written to `PostgresCopyFromWriter` after it has sent a CopyDone or CopyFail message, ie. after the closure producing the copy data has finished" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't used anywhere.
// TODO: Write doc comment | ||
public func copyFrom( | ||
_ query: PostgresQuery, | ||
writeData: @escaping @Sendable (PostgresCopyFromWriter) async throws -> Void, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closure should be last argument.
} | ||
self.avoidingStateMachineCoW { state in | ||
state = .copyingData(.readyToSend) | ||
continuation.resume() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side effect triggered in state machine. return the continuation out and succeed it in the PostgresChannelHandler.
@@ -88,7 +88,23 @@ struct ConnectionStateMachine { | |||
case sendParseDescribeBindExecuteSync(PostgresQuery) | |||
case sendBindExecuteSync(PSQLExecuteStatement) | |||
case failQuery(EventLoopPromise<PSQLRowStream>, with: PSQLError, cleanupContext: CleanUpContext?) | |||
/// Fail a query's execution by throwing an error on the given continuation. | |||
case failQueryContinuation(any AnyErrorContinuation, with: PSQLError, cleanupContext: CleanUpContext?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
existential, where Existential isn't necessary.
This is still WIP. To do items include:
COPY
query instead of letting the user write itPostgresCopyFromWriter
to reduce the number ofCopyData
messages we need to send (and thus the protocol overhead). Alternatively, we can leave that kind of optimization to the client.Fixes #290