Client per channel
istreeter committed Nov 17, 2024
1 parent df2089d commit fe61b61
Showing 5 changed files with 56 additions and 54 deletions.
@@ -35,7 +35,7 @@ case class Environment[F[_]](
   badSink: Sink[F],
   httpClient: Client[F],
   tableManager: TableManager[F],
-  channel: Channel.Provider[F],
+  channels: Vector[Channel.Provider[F]],
   metrics: Metrics[F],
   appHealth: AppHealth.Interface[F, Alert, RuntimeService],
   batching: Config.Batching,
@@ -63,16 +63,20 @@ object Environment {
       badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
       metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
       tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
-      channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth)
-      channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
+      channelProviders <- Vector.range(0, config.batching.uploadConcurrency).traverse { index =>
+        for {
+          channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth, index)
+          channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
+        } yield channelProvider
+      }
       cpuParallelism = chooseCpuParallelism(config)
     } yield Environment(
       appInfo = appInfo,
       source = sourceAndAck,
       badSink = badSink,
       httpClient = httpClient,
       tableManager = tableManager,
-      channel = channelProvider,
+      channels = channelProviders,
       metrics = metrics,
       appHealth = appHealth,
       batching = config.batching,
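The traverse call above composes one Resource per upload-concurrency slot into a single Resource that acquires every channel provider in order and releases them all together. A minimal standalone sketch of that pattern, using IO and an illustrative openSlot helper rather than the loader's real code:

import cats.effect.{IO, Resource}
import cats.implicits._

// One "channel" per upload slot; acquired in order, released in reverse when the outer Resource closes.
def openSlot(index: Int): Resource[IO, String] =
  Resource.make(IO.println(s"open channel-$index").as(s"channel-$index")) { name =>
    IO.println(s"close $name")
  }

val allSlots: Resource[IO, Vector[String]] =
  Vector.range(0, 3).traverse(openSlot)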
@@ -11,7 +11,6 @@
 package com.snowplowanalytics.snowplow.snowflake.processing
 
 import cats.effect.{Async, Poll, Resource, Sync}
-import cats.effect.std.Semaphore
 import cats.implicits._
 import com.snowplowanalytics.snowplow.runtime.AppHealth
 import com.snowplowanalytics.snowplow.runtime.processing.Coldswap
@@ -25,7 +24,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
 import java.time.ZoneOffset
 import java.util.Properties
 import scala.jdk.CollectionConverters._
-import scala.concurrent.duration.DurationLong
 
 trait Channel[F[_]] {
 
@@ -109,13 +107,13 @@ object Channel {
     config: Config.Snowflake,
     batchingConfig: Config.Batching,
     retriesConfig: Config.Retries,
-    appHealth: AppHealth.Interface[F, Alert, RuntimeService]
+    appHealth: AppHealth.Interface[F, Alert, RuntimeService],
+    index: Int
   ): Resource[F, Opener[F]] =
     for {
       client <- createClient(config, batchingConfig, retriesConfig, appHealth)
-      semaphore <- Resource.eval(Semaphore[F](1L))
     } yield new Opener[F] {
-      def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F](_, semaphore))
+      def open: F[CloseableChannel[F]] = createChannel[F](config, client, index).map(impl[F])
     }
 
   def provider[F[_]: Async](
@@ -140,32 +138,18 @@
     Resource.makeFull(make)(_.close)
   }
 
-  private def impl[F[_]: Async](channel: SnowflakeStreamingIngestChannel, semaphore: Semaphore[F]): CloseableChannel[F] =
+  private def impl[F[_]: Async](channel: SnowflakeStreamingIngestChannel): CloseableChannel[F] =
     new CloseableChannel[F] {
 
       def write(rows: List[Iterable[Map[String, AnyRef]]]): F[WriteResult] = {
 
-        val attempt: F[WriteResult] = semaphore.permit
-          .surround {
-            for {
-              responses <- rows.traverse { inner =>
-                Sync[F].blocking(channel.insertRows(inner.map(_.asJava).asJava, null))
-              }
-              future <- Sync[F].delay(SnowsFlakePlowInterop.flushChannel(channel))
-              _ <- Sync[F].untilDefinedM {
-                for {
-                  _ <- Sync[F].sleep(100.millis)
-                  isEmpty <- Sync[F].delay(SnowsFlakePlowInterop.isEmpty(channel))
-                } yield if (isEmpty) Some(()) else None
-              }
-            } yield (future, responses)
-          }
-          .flatMap { case (future, responses) =>
-            for {
-              _ <- Async[F].fromCompletableFuture(Sync[F].pure(future))
-              isValid <- Sync[F].delay(channel.isValid)
-            } yield if (isValid) WriteResult.WriteFailures(parseResponse(responses)) else WriteResult.ChannelIsInvalid
-          }
+        val attempt: F[WriteResult] = for {
+          responses <- rows.traverse { inner =>
+            Sync[F].blocking(channel.insertRows(inner.map(_.asJava).asJava, null))
+          }
+          _ <- flushChannel[F](channel)
+          isValid <- Sync[F].delay(channel.isValid)
+        } yield if (isValid) WriteResult.WriteFailures(parseResponse(responses)) else WriteResult.ChannelIsInvalid
 
         attempt.recover {
           case sfe: SFException if sfe.getVendorCode === SFErrorCode.INVALID_CHANNEL.getMessageCode =>
@@ -203,20 +187,22 @@ object Channel {
 
   private def createChannel[F[_]: Async](
     config: Config.Snowflake,
-    client: SnowflakeStreamingIngestClient
+    client: SnowflakeStreamingIngestClient,
+    index: Int
   ): F[SnowflakeStreamingIngestChannel] = {
+    val channelName = s"${config.channel}-$index"
     val request = OpenChannelRequest
-      .builder(config.channel)
+      .builder(channelName)
       .setDBName(config.database)
       .setSchemaName(config.schema)
       .setTableName(config.table)
       .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
       .setDefaultTimezone(ZoneOffset.UTC)
       .build
 
-    Logger[F].info(s"Opening channel ${config.channel}") *>
+    Logger[F].info(s"Opening channel ${channelName}") *>
       Async[F].blocking(client.openChannel(request)) <*
-      Logger[F].info(s"Successfully opened channel ${config.channel}")
+      Logger[F].info(s"Successfully opened channel ${channelName}")
   }
 
   private def channelProperties(config: Config.Snowflake, batchingConfig: Config.Batching): Properties = {
@@ -266,4 +252,15 @@ object Channel {
     Resource.makeFull(make)(client => Sync[F].blocking(client.close()))
   }
 
+  /**
+   * Flushes the channel
+   *
+   * The public interface of the Snowflake SDK does not tell us when the events are safely written
+   * to Snowflake. So we must cast it to an Internal class so we get access to the `flush()` method.
+   */
+  private def flushChannel[F[_]: Async](channel: SnowflakeStreamingIngestChannel): F[Unit] =
+    Async[F].fromCompletableFuture {
+      Async[F].delay(SnowsFlakePlowInterop.flushChannel(channel))
+    }.void
+
 }
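The flushChannel helper added above leans on cats-effect's Async#fromCompletableFuture: the creation of the Java CompletableFuture is suspended in delay, and the resulting effect completes only once the future does, surfacing its failure as an error in F. A small self-contained sketch with an illustrative await wrapper and a pre-completed future, not code from this repository:

import java.util.concurrent.CompletableFuture
import cats.effect.{Async, IO}

// Suspend the creation of a CompletableFuture and wait for it inside the effect system.
def await[F[_]: Async, A](mkFuture: => CompletableFuture[A]): F[A] =
  Async[F].fromCompletableFuture(Async[F].delay(mkFuture))

val example: IO[String] =
  await[IO, String](CompletableFuture.completedFuture("flushed"))

Suspending the future's creation matters: the channel is only flushed when the effect actually runs, not when the program is assembled.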
@@ -11,6 +11,7 @@
 package com.snowplowanalytics.snowplow.snowflake.processing
 
 import cats.implicits._
+import cats.effect.implicits._
 import cats.{Applicative, Foldable}
 import cats.effect.{Async, Sync}
 import cats.effect.kernel.Unique
@@ -42,7 +43,7 @@ object Processing {
   def stream[F[_]: Async](env: Environment[F]): Stream[F, Nothing] = {
     val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing)
     Stream.eval(env.tableManager.initializeEventsTable()) *>
-      Stream.eval(env.channel.opened.use_) *>
+      Stream.eval(env.channels.parTraverse(_.opened.use_)) *>
      env.source.stream(eventProcessingConfig, eventProcessor(env))
   }
 
@@ -123,7 +124,6 @@ object Processing {
     in.through(setLatency(env.metrics))
       .through(parseAndTransform(env, badProcessor))
       .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
-      .prefetchN(2)
       .through(writeToSnowflake(env, badProcessor))
       .through(sendFailedEvents(env, badProcessor))
       .through(sendMetrics(env))
@@ -190,15 +190,17 @@
     env: Environment[F],
     badProcessor: BadRowProcessor
   ): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
-    _.parEvalMap(env.batching.uploadConcurrency) { batch =>
-      for {
-        batch <- writeAttempt1(env, badProcessor, batch)
-        batch <- writeAttempt2(env, badProcessor, batch)
-      } yield batch
-    }
+    _.zip(Stream.emits(env.channels).repeat)
+      .parEvalMap(env.channels.length) { case (batch, channelProvider) =>
+        for {
+          batch <- writeAttempt1(env, badProcessor, channelProvider, batch)
+          batch <- writeAttempt2(env, badProcessor, channelProvider, batch)
+        } yield batch
+      }
 
   private def withWriteAttempt[F[_]: Sync](
     env: Environment[F],
+    channelProvider: Channel.Provider[F],
     batch: BatchAfterTransform
   )(
     handleFailures: List[Channel.WriteFailure] => F[BatchAfterTransform]
@@ -208,14 +210,14 @@
       batch.pure[F]
     else
       Sync[F].untilDefinedM {
-        env.channel.opened
+        channelProvider.opened
           .use { channel =>
             channel.write(batch.toBeInserted.map(_.view.map(_._2)))
           }
           .flatMap {
             case Channel.WriteResult.ChannelIsInvalid =>
               // Reset the channel and immediately try again
-              env.channel.closed.use_.as(none)
+              channelProvider.closed.use_.as(none)
             case Channel.WriteResult.WriteFailures(notWritten) =>
               handleFailures(notWritten).map(Some(_))
           }
@@ -236,13 +238,14 @@
   private def writeAttempt1[F[_]: Sync](
     env: Environment[F],
     badProcessor: BadRowProcessor,
+    channelProvider: Channel.Provider[F],
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
-    withWriteAttempt(env, batch) { notWritten =>
+    withWriteAttempt(env, channelProvider, batch) { notWritten =>
       val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
       for {
         _ <- abortIfFatalException[F](parsedResult.unexpectedFailures)
-        _ <- handleSchemaEvolution(env, parsedResult.extraColsRequired)
+        _ <- handleSchemaEvolution(env, channelProvider, parsedResult.extraColsRequired)
       } yield {
         val moreBad = parsedResult.unexpectedFailures.map { case (event, sfe) =>
           badRowFromEnqueueFailure(badProcessor, event, sfe)
@@ -263,9 +266,10 @@
   private def writeAttempt2[F[_]: Sync](
     env: Environment[F],
     badProcessor: BadRowProcessor,
+    channelProvider: Channel.Provider[F],
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
-    withWriteAttempt(env, batch) { notWritten =>
+    withWriteAttempt(env, channelProvider, batch) { notWritten =>
       val mapped = notWritten match {
         case Nil => Nil
         case more =>
@@ -328,12 +332,13 @@
    */
   private def handleSchemaEvolution[F[_]: Sync](
     env: Environment[F],
+    channelProvider: Channel.Provider[F],
     extraColsRequired: Set[String]
   ): F[Unit] =
     if (extraColsRequired.isEmpty)
       ().pure[F]
     else
-      env.channel.closed.surround {
+      channelProvider.closed.surround {
         env.tableManager.addColumns(extraColsRequired.toList)
       }
 
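The writeToSnowflake change above distributes batches across channels by zipping the batch stream with a repeated stream of providers: batch 0 goes to channel 0, batch 1 to channel 1, and so on, wrapping around, while parEvalMap(env.channels.length) keeps at most one batch in flight per channel. A minimal sketch of the zip-with-repeat idiom, with illustrative channel names and integer batches in place of the loader's real types:

import cats.effect.IO
import fs2.Stream

val channels = Vector("channel-0", "channel-1", "channel-2")

// Pair each batch with a channel, cycling round-robin, then process up to one batch per channel concurrently.
val assigned: Stream[IO, Unit] =
  Stream
    .range(0, 6)
    .covary[IO]
    .zip(Stream.emits(channels).repeat)
    .parEvalMap(channels.length) { case (batch, channel) =>
      IO.println(s"writing batch $batch via $channel")
    }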
@@ -10,7 +10,6 @@
 
 package net.snowflake.ingest.streaming.internal
 
-import cats.implicits._
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
 import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal
 
@@ -27,7 +26,4 @@ object SnowsFlakePlowInterop {
   def flushChannel(channel: SnowflakeStreamingIngestChannel): CompletableFuture[Void] =
     channel.asInstanceOf[SnowflakeStreamingIngestChannelInternal[_]].flush(false)
 
-  def isEmpty(channel: SnowflakeStreamingIngestChannel): Boolean =
-    channel.asInstanceOf[SnowflakeStreamingIngestChannelInternal[_]].getRowBuffer.getSize === 0.0f
-
 }
@@ -56,7 +56,7 @@ object MockEnvironment {
       badSink = testBadSink(mocks.badSinkResponse, state),
       httpClient = testHttpClient,
       tableManager = testTableManager(state),
-      channel = channelColdswap,
+      channels = Vector(channelColdswap),
       metrics = testMetrics(state),
       appHealth = testAppHealth(state),
       batching = Config.Batching(
