Skip to content

Commit

Permalink
finagle/finagle-core: Introduce ClearBroadcastContextFilter
Browse files Browse the repository at this point in the history
Problem

We want to use `MarshalledContext.retainIds` to clear the broadcast context
except for certain keys.

Solution

Create a filter `ClearBroadcastContextFilter`.

Differential Revision: https://phabricator.twitter.biz/D1179110
  • Loading branch information
jcrossley authored and jenkins committed Oct 29, 2024
1 parent 745f711 commit 8b2553c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.twitter.finagle.context
import com.twitter.io.Buf
import com.twitter.util.{Return, Throw, Try}
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Try

private[finagle] final class BackupRequest private ()

Expand Down Expand Up @@ -34,7 +36,7 @@ object BackupRequest {
}
}

private[finagle] val Ctx: Contexts.broadcast.Key[BackupRequest] = new Context
private[twitter] val Ctx: Contexts.broadcast.Key[BackupRequest] = new Context

/**
* Whether or not a request was initiated as a backup request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ final class MarshalledContext private[context] extends Context {
/**
* The identifier used to lookup the key in the stored context.
*/
private[context] final val lookupId: String = normalizeId(id)
private[finagle] final val lookupId: String = normalizeId(id)

/**
* Marshal an A-typed value into a Buf.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.twitter.finagle.filter

import com.twitter.finagle.context.Contexts
import com.twitter.finagle._
import com.twitter.util.Future

/**
* ClearBroadcastContextFilter clears the broadcast context of all context keys except those specified.
*/
private[twitter] object ClearBroadcastContextFilter {
val role = Stack.Role("ClearBroadcastContextFilter")

/**
* Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.Filter]] that clears the
* broadcast context of all context keys except those specified.
*/
def module[Req, Rep](
retainKeys: Set[Contexts.broadcast.Key[_]]
): Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module0[ServiceFactory[Req, Rep]] {
val role = ClearBroadcastContextFilter.role
val description =
s"Clears the broadcast context of all keys except: ${retainKeys.mkString(",")}"
val lookupIds = retainKeys.map(_.lookupId)
def make(next: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] = {
val filter = new SimpleFilter[Req, Rep] {
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
Contexts.broadcast.retainIds(lookupIds) {
service(request)
}
}
}
filter.andThen(next)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object ClientId {
// As a matter of legacy, we need to support the notion of
// an empty client id. Old version of contexts could serialize
// the absence of a client id with an empty buffer.
private[finagle] val clientIdCtx =
private[twitter] val clientIdCtx =
new Contexts.broadcast.Key[Option[ClientId]]("com.twitter.finagle.thrift.ClientIdContext") {
def marshal(clientId: Option[ClientId]): Buf = clientId match {
case None => Buf.Empty
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.twitter.finagle.filter

import com.twitter.conversions.DurationOps._
import com.twitter.finagle._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.context.Deadline
import com.twitter.finagle.context.Requeues
import com.twitter.finagle.stack.nilStack
import com.twitter.io.Buf
import com.twitter.util.Await
import com.twitter.util.Future
import org.scalatest.funsuite.AnyFunSuite

class ClearBroadcastContextFilterTest extends AnyFunSuite {

test("clears context except for the configured keys") {
val clearContextFilter =
ClearBroadcastContextFilter.module[Unit, Set[String]](retainKeys = Set(Deadline))

val svcModule = new Stack.Module0[ServiceFactory[Unit, Set[String]]] {
val role = Stack.Role("svcModule")
val description = ""

def make(next: ServiceFactory[Unit, Set[String]]) =
ServiceFactory.const(
Service.mk[Unit, Set[String]](_ =>
Future.value(Contexts.broadcast
.marshal().map {
case (k, v) =>
Buf.Utf8.unapply(k).get
}.toSet)))
}

val factory = new StackBuilder[ServiceFactory[Unit, Set[String]]](nilStack[Unit, Set[String]])
.push(svcModule)
.push(clearContextFilter)
.make(Stack.Params.empty)

val svc: Service[Unit, Set[String]] = Await.result(factory(), 1.second)
Contexts.broadcast.let(Requeues, Requeues(5), Deadline, Deadline.ofTimeout(5.seconds)) {
assert(Await.result(svc(()), 1.second) == Set(Deadline.id))
}
}
}

0 comments on commit 8b2553c

Please sign in to comment.