Skip to content

Commit

Permalink
More Tiingo tests and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Jan 18, 2024
1 parent 059be8f commit d7665d4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {
private fun handleIEX(data: JsonArray) {
val type = data[0].asString
val symbol = data[3].asString.uppercase()
val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) }
val asset = Config.getOrPutAsset(symbol) { Asset(symbol) }
val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong)

if (type == "Q") {
Expand Down Expand Up @@ -120,16 +120,16 @@ class TiingoLiveFeed private constructor(

companion object {

fun iex(configure: TiingoConfig.() -> Unit = {}): TiingoLiveFeed {
return TiingoLiveFeed("iex", 5, configure)
fun iex(thresholdLevel: Int = 5, configure: TiingoConfig.() -> Unit = {}): TiingoLiveFeed {
return TiingoLiveFeed("iex", thresholdLevel, configure)
}

fun crypto(configure: TiingoConfig.() -> Unit = {}): TiingoLiveFeed {
return TiingoLiveFeed("crypto", 2, configure)
fun crypto(thresholdLevel: Int = 2, configure: TiingoConfig.() -> Unit = {}): TiingoLiveFeed {
return TiingoLiveFeed("crypto", thresholdLevel, configure)
}

fun fx(configure: TiingoConfig.() -> Unit = {}): TiingoLiveFeed {
return TiingoLiveFeed("fx", 5, configure)
fun fx(thresholdLevel: Int = 5, configure: TiingoConfig.() -> Unit = {}): TiingoLiveFeed {
return TiingoLiveFeed("fx", thresholdLevel, configure)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package org.roboquant.samples

import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.roboquant.Roboquant
import org.roboquant.common.*
import org.roboquant.feeds.AggregatorLiveFeed
import org.roboquant.feeds.PriceAction
import org.roboquant.feeds.apply
import org.roboquant.feeds.*
import org.roboquant.loggers.ConsoleLogger
import org.roboquant.metrics.ProgressMetric
import org.roboquant.strategies.EMAStrategy
Expand Down Expand Up @@ -94,6 +95,51 @@ internal class TiingoSamples {
println("average delay is ${sum/n}ms")
}


private inline fun <reified T : Action> Feed.apply2(
timeframe: Timeframe = Timeframe.INFINITE,
crossinline block: (T, Instant) -> Unit
) = runBlocking {

val channel = EventChannel(10_000, timeframe = timeframe)

val job = launch {
play(channel)
channel.close()
}

try {
while (true) {
val o = channel.receive()
o.actions.filterIsInstance<T>().forEach { block(it, o.time) }
}

} catch (_: ClosedReceiveChannelException) {
// Intentionally left empty
} finally {
channel.close()
if (job.isActive) job.cancel()
}

}


@Test
@Ignore
internal fun testLiveFeedMeasureKeepUp() {
val feed = TiingoLiveFeed.iex(0)
feed.subscribe() // subscribe to all iex stocks
var n = 0
var sum = 0L
feed.apply2<PriceAction>(Timeframe.next(1.minutes)) { _, time ->
val now = Instant.now()
sum += now.toEpochMilli() - time.toEpochMilli()
n++
}
feed.close()
println("average delay is ${sum/n}ms events=$n")
}

@Test
@Ignore
internal fun historic() {
Expand Down

0 comments on commit d7665d4

Please sign in to comment.