Provides common functionality that enhance usability of standard jvm.
Named executors helps to monitor threads state, tasks latency and throughput.
All pools can be dynamically reconfigured
@PropertyId("dao.thread.pool.size")
lateinit var daoPoolSize: DynamicProperty<Integer>
lateinit var profiler: Profiler
...
val executor = NamedExecutors.newDynamicPool(
"dao-pool",
daoPoolSize,
profiler)
Scheduler is based on ScheduledThreadPoolExecutor
but it can change it's rate dynamically.
@PropertyId("work.rate")
lateinit var rate: DynamicProperty<Long>
@PropertyId("pool.size")
lateinit var poolSize: DynamicProperty<Int>
lateinit var profiler: Profiler
...
val scheduler = NamedExecutors.newScheduler(
"regular-work-pool",
poolSize,
profiler)
scheduler.schedule(
Schedule.withRate(rate),
0,//delay
Runnable {
//do work here
})
Common metrics that will work out of the box:
pool.<poolName>.queue
- size of pending tasks submitted to poolpool.<poolName>.activeThreads
- count of currently running threadspool.<poolName>.await
- how many ms spent task in pending state before pool took task for executionpool.<poolName>.run
- how many ms task executedpool.<poolName>.poolSize
- current size of the pool
Special case is Common Fork Join Pool that uses different set of metrics:
lateinit var profiler: Profiler
...
//Enable Common Fork Join Pool profiling
NamedExecutors.profileCommonPool(profiler)
pool.commonPool.poolSize
- current size of the poolpool.commonPool.activeThread
- count of threads in the poolpool.commonPool.runningThread
- count of currently active not blocking threadspool.commonPool.queue
- size of pending tasks submitted to poolpool.commonPool.steal
- count of stolen tasks
CommonThreadPoolGuard, ThreadPoolGuard allows you to watch for queue size of the thread pool. If it outgrows threshold guard will invoke user handler and print stack trace of all threads.
val guard = CommonThreadPoolGuard(
profiler,
checkRate,
threshold) { queueSize, dump ->
log.error("Queue size $queueSize is too big. Current threads state: $dump")
}
Provides RateLimiter and RateLimitedDispatcher.
RateLimiter
interface is defined here as abstraction for RateLimitedDispatcher
.
You can wrap whatever rate limiter implementation you want into it.
Out of the box there is one implementation - ConfigurableRateLimiter
. Under the hood it uses AtomicRateLimiter
of resilience4j-ratelimiter preconfigured it to acquire limits smoothly.
AtomicRateLimiter does not distribute events inside refresh period, it releases all available permits immediately on
interval start if there is a demand for them. To distribute event in our interval of 1 second ConfigurableRateLimiter
divides it into chunks so that 1 chunk of time is limited to 1 permit.The drawback is that actual rate will be lower
than configured - depending on requests' distribution.
Enables async usage of rate limiter. Submitted tasks are executed in the order of submission.
Provides following metrics:
RateLimiterDispatcher.<dispatcherName>.queue_size
– incoming tasks queue sizeRateLimiterDispatcher.<dispatcherName>.queue_wait
– task's wait time in the queue before executionRateLimiterDispatcher.<dispatcherName>.acquire_limit
- time to acquire limitRateLimiterDispatcher.<dispatcherName>.supplied_operation
- supplied task execution duration
Provides SocketChecker.
Allows to check port availability by its port number. Allows to get random free port number in range from 30000 to 60000.
Hot to run jmh tests for id generator:
cd jfix-stdlib-id-generator-jmh
gradle --stop
gradle --no-daemon clean build jmh
Encapsulates batching execution of several client's operations as one task.
Follow example of batch sending of http requests to different hosts:
import ru.fix.aggregating.profiler.AggregatingProfiler
import ru.fix.stdlib.batching.BatchTask
import ru.fix.stdlib.batching.BatchingManager
import ru.fix.stdlib.batching.BatchingParameters
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import kotlin.streams.asSequence
data class SimpleHttpBatchingConfig(
val httpClient: HttpClient
)
data class SimpleBatchedHttpPayload(
val batchItemContent: Int
) {
val resultFuture: CompletableFuture<String> = CompletableFuture()
}
data class SimpleHttpBatchKeyType(val httpUri: URI)
object SimpleHttpBatchTask : BatchTask<SimpleHttpBatchingConfig, SimpleBatchedHttpPayload, SimpleHttpBatchKeyType> {
override fun process(
config: SimpleHttpBatchingConfig,
batch: MutableList<SimpleBatchedHttpPayload>,
key: SimpleHttpBatchKeyType
) {
// merge content from several requests into one batched
val joinedContent =
batch.joinToString(separator = ", ", prefix = "[", postfix = "]") { it.batchItemContent.toString() }
// send batched http request
config.httpClient.sendAsync(
HttpRequest.newBuilder()
.uri(key.httpUri)
.POST(HttpRequest.BodyPublishers.ofString(joinedContent))
.build(),
HttpResponse.BodyHandlers.ofLines()
).thenApplyAsync {
// let client code know that some response arrived
for ((index, response) in it.body().asSequence().withIndex()) {
batch[index].resultFuture.complete(response)
}
}
// do not block and process to the next batch
// HOWEVER it is up to client to decide whether to block or not
}
}
val batchingManagerProfiler = AggregatingProfiler()
val batchingManager = BatchingManager<SimpleHttpBatchingConfig, SimpleBatchedHttpPayload, SimpleHttpBatchKeyType>(
SimpleHttpBatchingConfig(
httpClient = HttpClient.newBuilder()
.executor(Executors.newSingleThreadExecutor())
.build()
),
SimpleHttpBatchTask,
BatchingParameters().apply {
batchSize = 256
batchThreads = 2
},
"simple-batch-manager",
batchingManagerProfiler
)
// Usage in clients
val testHost1 = SimpleHttpBatchKeyType(httpUri = URI.create("http://test-host-1"))
val testHost2 = SimpleHttpBatchKeyType(httpUri = URI.create("http://test-host-2"))
val testHost1Requests = (1..3).map {
SimpleBatchedHttpPayload(batchItemContent = it)
}
val testHost2Requests = (2..7).map {
SimpleBatchedHttpPayload(batchItemContent = it)
}
for (testHost1Request in testHost1Requests) {
batchingManager.enqueue(testHost1, testHost1Request)
}
for (testHost2Request in testHost2Requests) {
batchingManager.enqueue(testHost2, testHost2Request)
}
(testHost1Requests + testHost2Requests).forEach { request ->
request.resultFuture.thenApplyAsync { response ->
println("""Request with content='${request.batchItemContent} - response is '$response''""")
}
}