Skip to content

Commit

Permalink
Fix build breakage due to spotlessScalaCheck failures (#502)
Browse files Browse the repository at this point in the history
Fix build breakage due to spotlessScalaCheck failures on upstream streams-scala code. (Ran './gradlew :spotlessApply' to auto-format; build succeeds now. Fix matches upstream Apache actions, and build now succeeds.)
  • Loading branch information
groelofs authored Jan 27, 2024
1 parent 4253d39 commit e72c1bf
Show file tree
Hide file tree
Showing 23 changed files with 308 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ object ImplicitConversions {
implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] =
Grouped.`with`[K, V]

implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit
keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]
): Joined[K, V, VO] =
Joined.`with`[K, V, VO]

implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
valueSerde: Serde[V]): Materialized[K, V, S] =
implicit def materializedFromSerde[K, V, S <: StateStore](implicit
keySerde: Serde[K],
valueSerde: Serde[V]
): Materialized[K, V, S] =
Materialized.`with`[K, V, S]

implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
Expand All @@ -95,8 +99,10 @@ object ImplicitConversions {
implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Repartitioned[K, V] =
Repartitioned.`with`[K, V]

implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] =
implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit
keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]
): StreamJoined[K, V, VO] =
StreamJoined.`with`[K, V, VO]
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ object Serdes {
}
)

def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
def fromFn[T >: Null](
serializer: (String, T) => Array[Byte],
deserializer: (String, Array[Byte]) => Option[T]
): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @see #table(String)
* @see `org.apache.kafka.streams.StreamsBuilder#table`
*/
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
implicit consumed: Consumed[K, V]
def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
consumed: Consumed[K, V]
): KTable[K, V] =
new KTable(inner.table[K, V](topic, consumed, materialized))

Expand All @@ -146,8 +146,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @return a `GlobalKTable` for the specified topic
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
*/
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
implicit consumed: Consumed[K, V]
def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
consumed: Consumed[K, V]
): GlobalKTable[K, V] =
inner.globalTable(topic, consumed, materialized)

Expand Down Expand Up @@ -177,10 +177,12 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
"Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
"2.7.0"
)
def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[K, V],
stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
def addGlobalStore[K, V](
storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[K, V],
stateUpdateSupplier: ProcessorSupplier[K, V]
): StreamsBuilderJ =
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
def noDefaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.noDefaultBranch())

private def toScalaMap(m: util.Map[String, kstream.KStream[K, V]]): collection.immutable.Map[String, KStream[K, V]] =
m.asScala.map {
case (name, kStreamJ) => (name, new KStream(kStreamJ))
m.asScala.map { case (name, kStreamJ) =>
(name, new KStream(kStreamJ))
}.toMap
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
* @param aggregator a function that computes a new aggregate result
* @return a [[CogroupedKStream]]
*/
def cogroup[VIn](groupedStream: KGroupedStream[KIn, VIn],
aggregator: (KIn, VIn, VOut) => VOut): CogroupedKStream[KIn, VOut] =
def cogroup[VIn](
groupedStream: KGroupedStream[KIn, VIn],
aggregator: (KIn, VIn, VOut) => VOut
): CogroupedKStream[KIn, VOut] =
new CogroupedKStream(inner.cogroup(groupedStream.inner, aggregator.asAggregator))

/**
Expand All @@ -58,8 +60,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
* (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
*/
def aggregate(initializer: => VOut)(
implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
def aggregate(initializer: => VOut)(implicit
materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, materialized))

/**
Expand All @@ -74,8 +76,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
* (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
*/
def aggregate(initializer: => VOut, named: Named)(
implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
def aggregate(initializer: => VOut, named: Named)(implicit
materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized))

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ object Consumed {
* @tparam V value type
* @return a new instance of [[Consumed]]
*/
def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K],
valueSerde: Serde[V]): ConsumedJ[K, V] =
def `with`[K, V](
timestampExtractor: TimestampExtractor
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)

/**
Expand All @@ -73,7 +74,8 @@ object Consumed {
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
* @return a new instance of [[Consumed]]
*/
def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K],
valueSerde: Serde[V]): ConsumedJ[K, V] =
def `with`[K, V](
resetPolicy: Topology.AutoOffsetReset
)(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ object Joined {
* @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
* @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
*/
def `with`[K, V, VO](implicit keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
def `with`[K, V, VO](implicit
keySerde: Serde[K],
valueSerde: Serde[V],
otherValueSerde: Serde[VO]
): JoinedJ[K, V, VO] =
JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
*/
def reduce(reducer: (V, V) => V,
named: Named)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
def reduce(reducer: (V, V) => V, named: Named)(implicit
materialized: Materialized[K, V, ByteArrayKeyValueStore]
): KTable[K, V] =
new KTable(inner.reduce(reducer.asReducer, materialized))

/**
Expand All @@ -125,8 +126,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
*/
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized))

Expand All @@ -141,8 +142,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
*/
def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
*/
def reduce(adder: (V, V) => V,
subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
def reduce(adder: (V, V) => V, subtractor: (V, V) => V)(implicit
materialized: Materialized[K, V, ByteArrayKeyValueStore]
): KTable[K, V] =
new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, materialized))

/**
Expand All @@ -92,8 +93,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
*/
def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(
implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]
def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(implicit
materialized: Materialized[K, V, ByteArrayKeyValueStore]
): KTable[K, V] =
new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, named, materialized))

Expand All @@ -109,8 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
*/
def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(
inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
Expand All @@ -129,14 +130,16 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
*/
def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(
inner.aggregate((() => initializer).asInitializer,
adder.asAggregator,
subtractor.asAggregator,
named,
materialized)
inner.aggregate(
(() => initializer).asInitializer,
adder.asAggregator,
subtractor.asAggregator,
named,
materialized
)
)
}
Loading

0 comments on commit e72c1bf

Please sign in to comment.