Skip to content

Commit

Permalink
Archive older versions (#13)
Browse files Browse the repository at this point in the history
* Archive older versions

* Fix link

* Improve text

* Remove legacy doc

* Improve

* Fix links

* Add badge

* Improve link

* Fix links

* Indentation

* Add link

* Improve format

* Add 💙

* Rename kcat
  • Loading branch information
rogervinas authored Nov 10, 2023
1 parent 07a40b1 commit efadcc4
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 461 deletions.
506 changes: 183 additions & 323 deletions README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/main/kotlin/com/rogervinas/stream/MyApplication.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import org.springframework.boot.runApplication
class MyApplication

fun main(args: Array<String>) {
runApplication<MyApplication>(*args)
}
runApplication<MyApplication>(*args)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.rogervinas.stream.domain

interface MyEventConsumer {
fun consume(event: MyEvent)
}
fun consume(event: MyEvent)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.rogervinas.stream.domain

interface MyEventProducer {
fun produce(event: MyEvent)
}
fun produce(event: MyEvent)
}
32 changes: 16 additions & 16 deletions src/main/kotlin/com/rogervinas/stream/functional/MyConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import org.springframework.context.annotation.Configuration
@Configuration
class MyConfiguration {

@Bean("my-producer")
fun myStreamEventProducer(): MyEventProducer {
return MyStreamEventProducer()
}
@Bean("my-producer")
fun myStreamEventProducer(): MyEventProducer {
return MyStreamEventProducer()
}

@Bean("my-consumer")
fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
return MyStreamEventConsumer(consumer)
}
@Bean("my-consumer")
fun myStreamEventConsumer(consumer: MyEventConsumer): MyStreamEventConsumer {
return MyStreamEventConsumer(consumer)
}

@Bean
fun myEventConsumer(): MyEventConsumer {
return object : MyEventConsumer {
override fun consume(event: MyEvent) {
println("Received ${event.text}")
}
}
@Bean
fun myEventConsumer(): MyEventConsumer {
return object : MyEventConsumer {
override fun consume(event: MyEvent) {
println("Received ${event.text}")
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import java.util.function.Consumer

class MyStreamEventConsumer(private val consumer: MyEventConsumer) : Consumer<MyEventPayload> {

override fun accept(payload: MyEventPayload) {
consumer.consume(fromPayload(payload))
}
override fun accept(payload: MyEventPayload) {
consumer.consume(fromPayload(payload))
}

private fun fromPayload(payload: MyEventPayload): MyEvent {
return MyEvent(payload.string)
}
}
private fun fromPayload(payload: MyEventPayload): MyEvent {
return MyEvent(payload.string)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@ import java.util.function.Supplier

class MyStreamEventProducer : Supplier<Flux<Message<MyEventPayload>>>, MyEventProducer {

val sink = Sinks.many().unicast().onBackpressureBuffer<Message<MyEventPayload>>()
val sink = Sinks.many().unicast().onBackpressureBuffer<Message<MyEventPayload>>()

override fun produce(event: MyEvent) {
val message = MessageBuilder
.withPayload(toPayload(event))
.setHeader(KafkaHeaders.MESSAGE_KEY, toKey(event))
.build()
sink.emitNext(message, FAIL_FAST)
}
override fun produce(event: MyEvent) {
val message = MessageBuilder
.withPayload(toPayload(event))
.setHeader(KafkaHeaders.MESSAGE_KEY, toKey(event))
.build()
sink.emitNext(message, FAIL_FAST)
}

override fun get(): Flux<Message<MyEventPayload>> {
return sink.asFlux()
}
override fun get(): Flux<Message<MyEventPayload>> {
return sink.asFlux()
}

private fun toPayload(event: MyEvent): MyEventPayload {
return MyEventPayload(event.text, event.text.length)
}
private fun toPayload(event: MyEvent): MyEventPayload {
return MyEventPayload(event.text, event.text.length)
}

private fun toKey(event: MyEvent): String {
return "key-${event.text.length}"
}
}
private fun toKey(event: MyEvent): String {
return "key-${event.text.length}"
}
}
68 changes: 34 additions & 34 deletions src/main/kotlin/com/rogervinas/stream/shared/MyContainers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,40 @@ import javax.annotation.PreDestroy
@Profile("docker-compose")
class MyContainers {

private val KAFKA = "kafka"
private val KAFKA_PORT = 9094

private val ZOOKEEPER = "zookeeper"
private val ZOOKEEPER_PORT = 2181

private val container = DockerComposeContainer<Nothing>(File("docker-compose.yml"))
.apply { withLocalCompose(true) }
.apply {
withExposedService(KAFKA, KAFKA_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.apply { withStrategy(forListeningPortFixDockerDesktop322()) }
.apply { withStrategy(forLogMessage(".*creating topics.*", 1)) }
)
}
.apply {
withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.apply { withStrategy(forListeningPortFixDockerDesktop322()) }
.apply { withStrategy(forLogMessage(".*binding to port.*", 1)) }
)
}

private fun forListeningPortFixDockerDesktop322() = HostPortWaitStrategyFixDockerDesktop322()

@PostConstruct
fun start() = container.start()

@PreDestroy
fun stop() = container.stop()
private val KAFKA = "kafka"
private val KAFKA_PORT = 9094

private val ZOOKEEPER = "zookeeper"
private val ZOOKEEPER_PORT = 2181

private val container = DockerComposeContainer<Nothing>(File("docker-compose.yml"))
.apply { withLocalCompose(true) }
.apply {
withExposedService(KAFKA, KAFKA_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.apply { withStrategy(forListeningPortFixDockerDesktop322()) }
.apply { withStrategy(forLogMessage(".*creating topics.*", 1)) }
)
}
.apply {
withExposedService(ZOOKEEPER, ZOOKEEPER_PORT, WaitAllStrategy(WITH_INDIVIDUAL_TIMEOUTS_ONLY)
.apply { withStrategy(forListeningPortFixDockerDesktop322()) }
.apply { withStrategy(forLogMessage(".*binding to port.*", 1)) }
)
}

private fun forListeningPortFixDockerDesktop322() = HostPortWaitStrategyFixDockerDesktop322()

@PostConstruct
fun start() = container.start()

@PreDestroy
fun stop() = container.stop()
}

class HostPortWaitStrategyFixDockerDesktop322 : HostPortWaitStrategy() {
override fun getLivenessCheckPorts(): MutableSet<Int> {
return super.getLivenessCheckPorts().stream()
.filter { port -> port > 0 }
.collect(Collectors.toSet())
}
}
override fun getLivenessCheckPorts(): MutableSet<Int> {
return super.getLivenessCheckPorts().stream()
.filter { port -> port > 0 }
.collect(Collectors.toSet())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

class MyEventPayload @JsonCreator constructor(
@JsonProperty("string") val string: String,
@JsonProperty("number") val number: Int
)
@JsonProperty("string") val string: String,
@JsonProperty("number") val number: Int
)
8 changes: 6 additions & 2 deletions src/test/kotlin/com/rogervinas/stream/MyApplicationShould.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.rogervinas.stream

import com.nhaarman.mockito_kotlin.*
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.argumentCaptor
import com.nhaarman.mockito_kotlin.doThrow
import com.nhaarman.mockito_kotlin.timeout
import com.nhaarman.mockito_kotlin.verify
import com.rogervinas.stream.domain.MyEvent
import com.rogervinas.stream.domain.MyEventConsumer
import com.rogervinas.stream.domain.MyEventProducer
Expand All @@ -18,7 +22,7 @@ import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.core.env.Environment
import org.springframework.test.context.ActiveProfiles
import java.time.Duration
import java.util.*
import java.util.UUID
import java.util.function.Consumer

@SpringBootTest(webEnvironment = DEFINED_PORT)
Expand Down
80 changes: 42 additions & 38 deletions src/test/kotlin/com/rogervinas/stream/MyKafkaConsumerHelper.kt
Original file line number Diff line number Diff line change
@@ -1,45 +1,49 @@
package com.rogervinas.stream

import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.*
import kotlin.collections.ArrayList
import java.util.Properties
import java.util.UUID

class MyKafkaConsumerHelper(bootstrapServers: String, topic: String) {

private val consumer: Consumer<String, String>

companion object {
private const val MILLIS_POLL: Long = 250
}

init {
consumer = KafkaConsumer(consumerConfig(bootstrapServers))
consumer.subscribe(arrayListOf(topic))
}

fun consumeAll(): List<ConsumerRecord<String, String>> {
return consumeAtLeast(1, Duration.ofSeconds(1))
}

fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
val records: MutableList<ConsumerRecord<String, String>> = ArrayList()
var millisLeft = timeout.toMillis()
do {
consumer.poll(Duration.ofMillis(MILLIS_POLL)).iterator().forEachRemaining { records.add(it) }
millisLeft -= MILLIS_POLL
} while (millisLeft > 0 && records.size < numberOfRecords)
return records
}

private fun consumerConfig(bootstrapServers: String): Properties {
val config = Properties()
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.toLowerCase())
return config
}
}
private val consumer: Consumer<String, String>

companion object {
private const val MILLIS_POLL: Long = 250
}

init {
consumer = KafkaConsumer(consumerConfig(bootstrapServers))
consumer.subscribe(arrayListOf(topic))
}

fun consumeAll(): List<ConsumerRecord<String, String>> {
return consumeAtLeast(1, Duration.ofSeconds(1))
}

fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
val records: MutableList<ConsumerRecord<String, String>> = ArrayList()
var millisLeft = timeout.toMillis()
do {
consumer.poll(Duration.ofMillis(MILLIS_POLL)).iterator().forEachRemaining { records.add(it) }
millisLeft -= MILLIS_POLL
} while (millisLeft > 0 && records.size < numberOfRecords)
return records
}

private fun consumerConfig(bootstrapServers: String): Properties {
val config = Properties()
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name.toLowerCase())
return config
}
}
28 changes: 14 additions & 14 deletions src/test/kotlin/com/rogervinas/stream/MyKafkaProducerHelper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import java.util.*

class MyKafkaProducerHelper(bootstrapServers: String) {

private val producer: Producer<String, String>
private val producer: Producer<String, String>

init {
val config = Properties()
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
producer = KafkaProducer(config)
}
init {
val config = Properties()
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
producer = KafkaProducer(config)
}

@Throws(Exception::class)
fun send(topic: String?, body: String) {
producer.send(ProducerRecord(topic, body)).get()
producer.flush()
}
}
@Throws(Exception::class)
fun send(topic: String?, body: String) {
producer.send(ProducerRecord(topic, body)).get()
producer.flush()
}
}

0 comments on commit efadcc4

Please sign in to comment.