Skip to content

Commit

Permalink
use flow api source in calculating historical data, remove legacy osm…
Browse files Browse the repository at this point in the history
…osis only fetchers
  • Loading branch information
nullpointer0x00 committed Oct 3, 2024
1 parent a088cff commit 91f37f5
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ data class CmcHistoricalQuote(
val time_high: DateTime,
val time_low: DateTime,
val quote: Map<String, CmcQuote>,
val source: String
) {
fun toCsv(): MutableList<Any> =
this.quote[USD_UPPER]!!.let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ data class HistoricalPrice(
val low: BigDecimal,
val close: BigDecimal,
val open: BigDecimal,
val volume: BigDecimal
val volume: BigDecimal,
val source: String
)

fun HistoricalPrice.toCsv(): List<String> = listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ class TokenService(
) {
protected val logger = logger(TokenService::class)

private val historicalPriceFetchers: List<HistoricalPriceFetcher> by lazy {
historicalPriceFetcherFactory.createNhashFetchers()
}

private val deprecatedHistoricalPricingFetchers: List<HistoricalPriceFetcher> by lazy {
historicalPriceFetcherFactory.createOsmosisPriceFetcher()
protected val historicalPriceFetchers: List<HistoricalPriceFetcher> by lazy {
historicalPriceFetcherFactory.createNhashPricingFetchers()
}

fun getTokenDistributionStats() = transaction { TokenDistributionAmountsRecord.getStats() }
Expand Down Expand Up @@ -241,21 +237,14 @@ class TokenService(
VANILLA_MAPPER.readValue<CmcLatestDataAbbrev>(it)
}

fun fetchHistoricalPriceData(fromDate: DateTime?): List<HistoricalPrice> = runBlocking {
protected fun fetchHistoricalPriceData(fromDate: DateTime?): List<HistoricalPrice> = runBlocking {
val allPrices = historicalPriceFetchers.flatMap { fetcher ->
fetcher.fetchHistoricalPrice(fromDate)
}
return@runBlocking allPrices
}

fun fetchLegacyHistoricalPriceData(fromDate: DateTime?): List<HistoricalPrice> = runBlocking {
val allPrices = deprecatedHistoricalPricingFetchers.flatMap { fetcher ->
fetcher.fetchHistoricalPrice(fromDate)
}
return@runBlocking allPrices
}

fun processHistoricalData(startDate: DateTime, today: DateTime, historicalPrices: List<HistoricalPrice>): List<CmcHistoricalQuote> {
protected fun processHistoricalData(startDate: DateTime, today: DateTime, historicalPrices: List<HistoricalPrice>): List<CmcHistoricalQuote> {
val baseMap = Interval(startDate, today)
.let { int -> generateSequence(int.start) { dt -> dt.plusDays(1) }.takeWhile { dt -> dt < int.end } }
.map { it to emptyList<HistoricalPrice>() }.toMap().toMutableMap()
Expand All @@ -275,7 +264,6 @@ class TokenService(
val close = v.maxByOrNull { DateTime(it.time * 1000) }?.close ?: prevPrice
val closeDate = k.plusDays(1).minusMillis(1)
val usdVolume = v.sumOf { it.volume.toThirdDecimal() }.stripTrailingZeros()

CmcHistoricalQuote(
time_open = k,
time_close = closeDate,
Expand All @@ -299,11 +287,28 @@ class TokenService(
}
}

fun saveHistoricalData(record: CmcHistoricalQuote) {
TokenHistoricalDailyRecord.save(record.time_open.startOfDay(), record, "osmosis")
fun updateAndSaveTokenHistoricalData(startDate: DateTime, endDate: DateTime) {
val historicalPrices = fetchHistoricalPriceData(startDate) ?: return
val processedData = processHistoricalData(startDate, endDate, historicalPrices)
processedData.forEach { record ->
val source = historicalPriceFetchers.joinToString(separator = ",") { it.getSource() }
TokenHistoricalDailyRecord.save(record.time_open.startOfDay(), record, source)
}
}


fun updateAndSaveLatestTokenData(startDate: DateTime, today: DateTime) {
val list = fetchHistoricalPriceData(startDate)?.sortedBy { it.time }

list?.let {
val latestData = processLatestTokenData(it, today)
latestData?.let { data ->
cacheLatestTokenData(data)
}
}
}

fun processLatestTokenData(list: List<HistoricalPrice>, today: DateTime): CmcLatestDataAbbrev? {
protected fun processLatestTokenData(list: List<HistoricalPrice>, today: DateTime): CmcLatestDataAbbrev? {
val prevRecord = list.firstOrNull() ?: return null
val price = list.last().close.toThirdDecimal()
val percentChg = price.percentChange(prevRecord.close.toThirdDecimal())
Expand All @@ -316,7 +321,7 @@ class TokenService(
)
}

fun cacheLatestTokenData(data: CmcLatestDataAbbrev) {
protected fun cacheLatestTokenData(data: CmcLatestDataAbbrev) {
CacheUpdateRecord.updateCacheByKey(
CacheKeys.UTILITY_TOKEN_LATEST.key,
VANILLA_MAPPER.writeValueAsString(data)
Expand All @@ -328,7 +333,7 @@ class TokenService(
val baseFileName = filters.getFileNameBase()

val fileList = runBlocking {
val data = fetchLegacyHistoricalPriceData(filters.fromDate)
val data = fetchHistoricalPriceData(filters.fromDate)
listOf(
CsvData(
"TokenHistoricalData",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,35 +297,19 @@ class ScheduledTaskService(
@Scheduled(cron = "0 0 1 * * ?") // Every day at 1 am
fun updateTokenHistorical() {
val today = DateTime.now().startOfDay()
var startDate = today.minusMonths(1)
val latest = TokenHistoricalDailyRecord.getLatestDateEntry()
val defaultStartDate = today.minusMonths(1)

if (latest != null) {
startDate = latest.timestamp.minusDays(1).startOfDay()
}

val historicalPrices = tokenService.fetchLegacyHistoricalPriceData(startDate) ?: return
logger.info("Updating token historical data starting from $startDate with ${historicalPrices.size} buy records for roll-up.")
val latestEntryDate = TokenHistoricalDailyRecord.getLatestDateEntry()?.timestamp?.startOfDay()
val startDate = latestEntryDate?.minusDays(1) ?: defaultStartDate

val processedData = tokenService.processHistoricalData(startDate, today, historicalPrices)

processedData.forEach { record ->
TokenHistoricalDailyRecord.save(record.time_open.startOfDay(), record, "osmosis")
}
tokenService.updateAndSaveTokenHistoricalData(startDate, today)
}

@Scheduled(cron = "0 0/5 * * * ?") // Every 5 minutes
fun updateTokenLatest() {
val today = DateTime.now().withZone(DateTimeZone.UTC)
val startDate = today.minusDays(1)
val list = tokenService.fetchHistoricalPriceData(startDate)?.sortedBy { it.time }

list?.let {
val latestData = tokenService.processLatestTokenData(it, today)
latestData?.let { data ->
tokenService.cacheLatestTokenData(data)
}
}
tokenService.updateAndSaveLatestTokenData(startDate, today)
}

// Remove once the ranges have been updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ class FetcherConfiguration {
fun historicalPriceFetcherFactory(flowApiGrpcClient: FlowApiGrpcClient): HistoricalPriceFetcherFactory {
return HistoricalPriceFetcherFactory(flowApiGrpcClient)
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class FlowApiPriceFetcher(
private val flowApiGrpcClient: FlowApiGrpcClient
) : HistoricalPriceFetcher {

override fun getSource(): String {
return "flow-api"
}
override fun fetchHistoricalPrice(fromDate: DateTime?): List<HistoricalPrice> {
val onChainNavEvents = getMarkerNavByPriceDenoms(fromDate, 17800)
return onChainNavEvents.map { navEvent ->
Expand All @@ -25,7 +28,8 @@ class FlowApiPriceFetcher(
low = pricePerHash,
close = pricePerHash,
open = pricePerHash,
volume = pricePerHash.multiply(volumeHash)
volume = pricePerHash.multiply(volumeHash),
source = getSource()
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ import io.provenance.explorer.domain.models.HistoricalPrice
import org.joda.time.DateTime

interface HistoricalPriceFetcher {

fun getSource(): String
fun fetchHistoricalPrice(fromDate: DateTime?): List<HistoricalPrice>
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@ import io.provenance.explorer.grpc.flow.FlowApiGrpcClient
class HistoricalPriceFetcherFactory(
private val flowApiGrpcClient: FlowApiGrpcClient
) {
fun createNhashFetchers(): List<HistoricalPriceFetcher> {
fun createNhashPricingFetchers(): List<HistoricalPriceFetcher> {
return listOf(
OsmosisPriceFetcher(),
FlowApiPriceFetcher(UTILITY_TOKEN, listOf("uusd.trading", "uusdc.figure.se", "uusdt.figure.se"), flowApiGrpcClient)
)
}

fun createOsmosisPriceFetcher(): List<HistoricalPriceFetcher> {
return listOf(
OsmosisPriceFetcher()
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import java.net.URLEncoder
class OsmosisPriceFetcher : HistoricalPriceFetcher {

val logger = logger(OsmosisPriceFetcher::class)

override fun getSource(): String {
return "osmosis"
}

override fun fetchHistoricalPrice(fromDate: DateTime?): List<HistoricalPrice> {
val osmosisHistoricalPrices = fetchOsmosisData(fromDate)
return osmosisHistoricalPrices.map { osmosisPrice ->
Expand All @@ -30,7 +35,8 @@ class OsmosisPriceFetcher : HistoricalPriceFetcher {
low = osmosisPrice.low,
close = osmosisPrice.close,
open = osmosisPrice.open,
volume = osmosisPrice.volume
volume = osmosisPrice.volume,
source = getSource()
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class HistoricalPriceFetcherFactoryTest {

@Test
fun `test createNhashFetchers`() {
val fetchers = factory.createNhashFetchers()
val fetchers = factory.createNhashPricingFetchers()
assertEquals(2, fetchers.size)
assertTrue(fetchers[0] is OsmosisPriceFetcher)
assertTrue(fetchers[1] is FlowApiPriceFetcher)
Expand Down

0 comments on commit 91f37f5

Please sign in to comment.