Open several writers to increase BigQuery throughput
Before this PR, the loader opened a single `JsonStreamWriter`, and
therefore a single stream (a single bidirectional RPC). We observed
that the loader hits a bottleneck where it cannot write into BigQuery
quickly enough.

After this PR, the loader opens a configurable number of streams, which
is expected to prevent the bottleneck.

This implementation closely follows a similar change we made in the
snowflake loader: snowplow-incubator/snowflake-loader#57
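
For illustration, a minimal fs2/cats-effect sketch of the pattern the diff below
introduces: open a configurable number of writer resources, pair each incoming
batch with a writer round-robin, and write with bounded parallelism. The `Batch`
type, `WriterProvider` trait, and `mkWriter` helper are hypothetical stand-ins,
not the loader's real API; only the `writeBatchConcurrency` setting and the
`zip`/`parEvalMap` structure mirror the change.

```scala
import cats.effect.{IO, IOApp, Resource}
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.duration._

object MultiWriterSketch extends IOApp.Simple {

  // Simplified stand-ins for the loader's batch and Writer.Provider types
  final case class Batch(id: Int)

  trait WriterProvider {
    def write(batch: Batch): IO[Unit]
  }

  // Hypothetical helper: each "writer" just logs and simulates a slow write
  def mkWriter(n: Int): Resource[IO, WriterProvider] =
    Resource.pure[IO, WriterProvider](
      new WriterProvider {
        def write(batch: Batch): IO[Unit] =
          IO.println(s"writer-$n wrote batch ${batch.id}") *> IO.sleep(100.millis)
      }
    )

  def run: IO[Unit] = {
    val writeBatchConcurrency = 3 // plays the role of config.main.batching.writeBatchConcurrency

    // Open several writers, analogous to
    // Vector.range(0, writeBatchConcurrency).traverse(_ => Writer.provider(...))
    val writers: Resource[IO, Vector[WriterProvider]] =
      Vector.range(0, writeBatchConcurrency).traverse(mkWriter)

    writers.use { ws =>
      Stream
        .emits(1 to 10)
        .map(Batch(_))
        // Round-robin: pair each batch with the next writer,
        // mirroring `_.zip(Stream.emits(env.writers).repeat)`
        .zip(Stream.emits(ws).repeat)
        // At most ws.length batches are written concurrently
        .parEvalMap(ws.length) { case (batch, writer) => writer.write(batch) }
        .compile
        .drain
    }
  }
}
```

Pairing each batch with a writer before `parEvalMap` means every in-flight batch
writes to its own stream, rather than all fibers contending on a single
`JsonStreamWriter`.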
istreeter committed Feb 7, 2025
1 parent 3e1dd94 commit aa530c0
Showing 3 changed files with 57 additions and 43 deletions.
@@ -28,7 +28,7 @@ case class Environment[F[_]](
resolver: Resolver[F],
httpClient: Client[F],
tableManager: TableManager.WithHandledErrors[F],
writer: Writer.Provider[F],
writers: Vector[Writer.Provider[F]],
metrics: Metrics[F],
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
alterTableWaitPolicy: RetryPolicy[F],
@@ -64,15 +64,17 @@ object Environment {
tableManager <- Resource.eval(TableManager.make(config.main.output.good, creds))
tableManagerWrapped <- Resource.eval(TableManager.withHandledErrors(tableManager, config.main.retries, appHealth))
writerBuilder <- Writer.builder(config.main.output.good, creds)
writerProvider <- Writer.provider(writerBuilder, config.main.retries, appHealth)
writerProviders <- Vector.range(0, config.main.batching.writeBatchConcurrency).traverse { _ =>
Writer.provider(writerBuilder, config.main.retries, appHealth)
}
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
resolver = resolver,
httpClient = httpClient,
tableManager = tableManagerWrapped,
writer = writerProvider,
writers = writerProviders,
metrics = metrics,
appHealth = appHealth,
alterTableWaitPolicy = BigQueryRetrying.policyForAlterTableWait[F](config.main.retries),
@@ -48,7 +48,7 @@ object Processing {
implicit val lookup: RegistryLookup[F] = Http4sRegistryLookup(env.httpClient)
val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing, env.metrics.setLatency)
Stream.eval(env.tableManager.createTableIfNotExists) *>
Stream.eval(env.writer.opened.use_) *>
Stream.eval(env.writers.head.opened.use_) *>
env.source.stream(eventProcessingConfig, eventProcessor(env))
}

@@ -109,7 +109,6 @@ object Processing {
in.through(parseBytes(badProcessor))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.through(transform(env, badProcessor))
.through(handleSchemaEvolution(env))
.through(writeToBigQuery(env, badProcessor))
.through(setE2ELatencyMetric(env))
.through(sendFailedEvents(env, badProcessor))
@@ -216,12 +215,16 @@
env: Environment[F],
badProcessor: BadRowProcessor
): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
_.parEvalMap(env.batching.writeBatchConcurrency) { batch =>
writeUntilSuccessful(env, badProcessor, batch)
.onError { _ =>
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
}
}
_.zip(Stream.emits(env.writers).repeat)
.parEvalMap(env.writers.length) { case (batch, writerProvider) =>
for {
_ <- handleSchemaEvolution(env, writerProvider, batch)
result <- writeUntilSuccessful(writerProvider, env.alterTableWaitPolicy, badProcessor, batch)
.onError { _ =>
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
}
} yield result
}

implicit private class WriteResultOps[F[_]: Async](val attempt: F[Writer.WriteResult]) {

@@ -234,7 +237,10 @@
*/
private def errorsAllowedWithShortLogging = 4

def handlingServerSideSchemaMismatches(env: Environment[F]): F[Writer.WriteResult] = {
def handlingServerSideSchemaMismatches(
writerProvider: Writer.Provider[F],
alterTableWaitPolicy: RetryPolicy[F]
): F[Writer.WriteResult] = {
def onFailure(wr: Writer.WriteResult, details: RetryDetails): F[Unit] = {
val msg = show"Newly added columns have not yet propagated to the BigQuery Writer server-side. $details"
val log = wr match {
@@ -243,11 +249,11 @@
case _ =>
Logger[F].warn(msg)
}
log *> env.writer.closed.use_
log *> writerProvider.closed.use_
}
attempt
.retryingOnFailures(
policy = env.alterTableWaitPolicy,
policy = alterTableWaitPolicy,
wasSuccessful = {
case Writer.WriteResult.ServerSideSchemaMismatch(_) => false.pure[F]
case _ => true.pure[F]
@@ -256,7 +262,7 @@
)
}

def handlingWriterWasClosedByEarlierErrors(env: Environment[F]): F[Writer.WriteResult] = {
def handlingWriterWasClosedByEarlierErrors(writerProvider: Writer.Provider[F]): F[Writer.WriteResult] = {
def onFailure(wr: Writer.WriteResult, details: RetryDetails): F[Unit] = {
val msg =
"BigQuery Writer was already closed by an earlier exception in a different Fiber. Will reset the Writer and try again immediately"
@@ -266,7 +272,7 @@
case _ =>
Logger[F].warn(msg)
}
log *> env.writer.closed.use_
log *> writerProvider.closed.use_
}
val retryImmediately = PolicyDecision.DelayAndRetry(Duration.Zero)
attempt
@@ -282,17 +288,18 @@
}

private def writeUntilSuccessful[F[_]: Async](
env: Environment[F],
writerProvider: Writer.Provider[F],
alterTableWaitPolicy: RetryPolicy[F],
badProcessor: BadRowProcessor,
batch: BatchAfterTransform
): F[BatchAfterTransform] =
if (batch.toBeInserted.isEmpty)
batch.pure[F]
else
env.writer.opened
writerProvider.opened
.use(_.write(batch.toBeInserted.map(_._2)))
.handlingWriterWasClosedByEarlierErrors(env)
.handlingServerSideSchemaMismatches(env)
.handlingWriterWasClosedByEarlierErrors(writerProvider)
.handlingServerSideSchemaMismatches(writerProvider, alterTableWaitPolicy)
.flatMap {
case Writer.WriteResult.SerializationFailures(failures) =>
val (badRows, tryAgains) = batch.toBeInserted.zipWithIndex.foldLeft((List.empty[BadRow], List.empty[EventWithTransform])) {
@@ -306,7 +313,8 @@
}
}
writeUntilSuccessful(
env,
writerProvider,
alterTableWaitPolicy,
badProcessor,
batch.copy(toBeInserted = tryAgains, badAccumulated = batch.badAccumulated.prepend(badRows))
)
@@ -319,37 +327,41 @@
* table
*/
private def handleSchemaEvolution[F[_]: Async](
env: Environment[F]
): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
_.evalTap { batch =>
env.writer.opened
.use(_.descriptor)
.flatMap { descriptor =>
val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, batch.entities)
if (fieldsToAdd.nonEmpty) {
env.tableManager.addColumns(fieldsToAdd.toVector).flatMap { fieldsToExist =>
openWriterUntilFieldsExist(env, fieldsToExist)
}
} else {
Sync[F].unit
env: Environment[F],
writerProvider: Writer.Provider[F],
batch: BatchAfterTransform
): F[Unit] =
writerProvider.opened
.use(_.descriptor)
.flatMap { descriptor =>
val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, batch.entities)
if (fieldsToAdd.nonEmpty) {
env.tableManager.addColumns(fieldsToAdd.toVector).flatMap { fieldsToExist =>
openWriterUntilFieldsExist(writerProvider, env.alterTableWaitPolicy, fieldsToExist)
}
} else {
Sync[F].unit
}
.onError { _ =>
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
}
}
}
.onError { _ =>
env.appHealth.beUnhealthyForRuntimeService(RuntimeService.BigQueryClient)
}

private def openWriterUntilFieldsExist[F[_]: Async](env: Environment[F], fieldsToExist: FieldList): F[Unit] =
env.writer.opened
private def openWriterUntilFieldsExist[F[_]: Async](
writerProvider: Writer.Provider[F],
alterTableWaitPolicy: RetryPolicy[F],
fieldsToExist: FieldList
): F[Unit] =
writerProvider.opened
.use(_.descriptor)
.retryingOnFailures(
policy = env.alterTableWaitPolicy,
policy = alterTableWaitPolicy,
wasSuccessful = { descriptor =>
(!BigQuerySchemaUtils.fieldsMissingFromDescriptor(descriptor, fieldsToExist)).pure[F]
},
onFailure = { case (_, details) =>
val msg = show"Newly added columns have not yet propagated to the BigQuery Writer client-side. $details"
Logger[F].warn(msg) *> env.writer.closed.use_
Logger[F].warn(msg) *> writerProvider.closed.use_
}
)
.void
@@ -82,7 +82,7 @@ object MockEnvironment {
resolver = Resolver[IO](Nil, None),
httpClient = testHttpClient,
tableManager = testTableManager(mocks.addColumnsResponse, state),
writer = writerColdswap,
writers = Vector(writerColdswap),
metrics = testMetrics(state),
appHealth = testAppHealth(state),
alterTableWaitPolicy = BigQueryRetrying.policyForAlterTableWait[IO](retriesConfig),
