Skip to content

Commit

Permalink
update historical service to pull from multiple tickers, update start time calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
nullpointer0x00 committed Oct 24, 2023
1 parent bce5a11 commit aa8de91
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class ValidatorMarketRateStatsRecord(id: EntityID<Int>) : IntEntity(id) {
}

fun findByAddress(address: String, fromDate: DateTime?, toDate: DateTime?, count: Int) = transaction {
val query = ValidatorMarketRateStatsTable.select { ValidatorMarketRateStatsTable.operatorAddress eq address }
val query =
ValidatorMarketRateStatsTable.select { ValidatorMarketRateStatsTable.operatorAddress eq address }
if (fromDate != null) {
query.andWhere { ValidatorMarketRateStatsTable.date greaterEq fromDate }
}
Expand Down Expand Up @@ -214,7 +215,7 @@ class ChainAumHourlyRecord(id: EntityID<Int>) : IntEntity(id) {
fun getAumForPeriod(fromDate: DateTime, toDate: DateTime) = transaction {
ChainAumHourlyRecord.find {
(ChainAumHourlyTable.datetime greaterEq fromDate) and
(ChainAumHourlyTable.datetime lessEq toDate.plusDays(1))
(ChainAumHourlyTable.datetime lessEq toDate.plusDays(1))
}
.orderBy(Pair(ChainAumHourlyTable.datetime, SortOrder.ASC))
.toList()
Expand Down Expand Up @@ -272,6 +273,14 @@ class TokenHistoricalDailyRecord(id: EntityID<DateTime>) : Entity<DateTime>(id)
.orderBy(Pair(TokenHistoricalDailyTable.timestamp, SortOrder.DESC))
.firstOrNull()?.data?.quote?.get(USD_UPPER)?.close ?: BigDecimal.ZERO
}

/**
 * Returns the most recent [TokenHistoricalDailyRecord] by timestamp,
 * or null when the table is empty.
 */
fun getLatestDateEntry(): TokenHistoricalDailyRecord? = transaction {
    // The lambda is an expression body: its last expression is the result,
    // so the redundant `return@transaction` label is dropped.
    TokenHistoricalDailyRecord
        .all()
        .orderBy(TokenHistoricalDailyTable.timestamp to SortOrder.DESC)
        .limit(1) // only one row is needed; avoid materializing the full scan
        .firstOrNull()
}
}

var timestamp by TokenHistoricalDailyTable.timestamp
Expand All @@ -292,7 +301,7 @@ class ProcessQueueRecord(id: EntityID<Int>) : IntEntity(id) {
/**
 * Fetches all queue entries of the given [processType] that are not
 * currently being processed.
 */
fun findByType(processType: ProcessQueueType) = transaction {
    val pending = ProcessQueueRecord.find {
        (ProcessQueueTable.processType eq processType.name) and
            (ProcessQueueTable.processing eq false)
    }
    // Materialize inside the transaction so lazy iteration never outlives it.
    pending.toList()
}

Expand All @@ -305,7 +314,7 @@ class ProcessQueueRecord(id: EntityID<Int>) : IntEntity(id) {
/**
 * Deletes the queue entry matching both the given [processType] and its
 * payload [value].
 */
fun delete(processType: ProcessQueueType, value: String) = transaction {
    ProcessQueueTable.deleteWhere {
        (ProcessQueueTable.processType eq processType.name) and (ProcessQueueTable.processValue eq value)
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}
}

fun getHistoricalFromDlob(startTime: DateTime): DlobHistBase? = runBlocking {
fun getHistoricalFromDlob(startTime: DateTime, tickerId: String): DlobHistBase? = runBlocking {
try {
KTOR_CLIENT_JAVA.get("https://www.dlob.io:443/gecko/external/api/v1/exchange/historical_trades") {
parameter("ticker_id", "HASH_USD")
KTOR_CLIENT_JAVA.get("https://test.dlob.io:443/gecko/external/api/v1/exchange/historical_trades") {
parameter("ticker_id", tickerId)
parameter("type", "buy")
parameter("start_time", DateTimeFormat.forPattern("dd-MM-yyyy").print(startTime))
accept(ContentType.Application.Json)
Expand All @@ -240,53 +240,14 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}
}

suspend fun getHistoricalFromDlobNew(startTime: DateTime): DlobHistBase? {
val dlobContractUrls = listOf(
"https://www.dlob.io/gecko/external/api/v1/order-books/pb1w6ul64t5fjcg65mmscec758dgyml6xmmw5fy2vyxxc9dhq3tmhusyzcj3r/aggregate?unit=YEAR",
"https://www.dlob.io/gecko/external/api/v1/order-books/pb18vd8fpwxzck93qlwghaj6arh4p7c5n894vnu5g/aggregate?unit=YEAR"
)
var dlobHistorBase: DlobHistBase? = null
for (url in dlobContractUrls) {
try {
val data = fetchDataFromUrl(url, startTime)
if (data != null) {
if (dlobHistorBase == null) {
dlobHistorBase = data
}else {
val combinedBuy = dlobHistorBase.buy + data.buy
dlobHistorBase = dlobHistorBase.copy(buy = combinedBuy)
}
}
} catch (e: ResponseException) {
logger.error("Error fetching from Dlob: ${e.response}")
} catch (e: Exception) {
logger.error("Error fetching from Dlob: ${e.message}")
} catch (e: Throwable) {
logger.error("Error fetching from Dlob: ${e.message}")
}
}
fun getHistoricalFromDlob(startTime: DateTime): DlobHistBase {
val tickerIds = listOf("HASH_USD", "HASH_USDOMNI")

return dlobHistorBase
}
val dlobHistorical = tickerIds
.mapNotNull { getHistoricalFromDlob(startTime, it)?.buy }
.flatten()

private suspend fun fetchDataFromUrl(url: String, startTime: DateTime): DlobHistBase? {
return try {
KTOR_CLIENT_JAVA.get(url) {
parameter("ticker_id", "HASH_USD")
parameter("type", "buy")
parameter("start_time", DateTimeFormat.forPattern("dd-MM-yyyy").print(startTime))
accept(ContentType.Application.Json)
}.body()
} catch (e: ResponseException) {
logger.error("Error fetching from Dlob: ${e.response}")
null
} catch (e: Exception) {
logger.error("Error fetching from Dlob: ${e.message}")
null
} catch (e: Throwable) {
logger.error("Error fetching from Dlob: ${e.message}")
null
}
return DlobHistBase(dlobHistorical)
}

fun getTokenHistorical(fromDate: DateTime?, toDate: DateTime?) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class AsyncService(
protected val logger = logger(AsyncService::class)
protected var collectHistorical = true

@Scheduled(initialDelay = 0L, fixedDelay = 5000L)
// @Scheduled(initialDelay = 0L, fixedDelay = 5000L)
fun updateLatestBlockHeightJob() {
val index = getBlockIndex()
val startHeight = blockService.getLatestBlockHeight()
Expand Down Expand Up @@ -277,10 +277,15 @@ class AsyncService(
// Hourly snapshot of chain AUM; delegates entirely to ExplorerService.
@Scheduled(cron = "0 0 0/1 * * ?") // Every hour
fun saveChainAum() = explorerService.saveChainAum()

@Scheduled(cron = "0 0 1 * * ?") // Every day at 1 am
@Scheduled(cron = "0 0 1 * * *") // Every day at 1 am
fun updateTokenHistorical() {
val today = DateTime.now().startOfDay()
val startDate = today.minusMonths(1)
var startDate = today.minusMonths(1)
var latest = TokenHistoricalDailyRecord.getLatestDateEntry()
if (latest != null) {
startDate = latest.timestamp.minusDays(1)
}

val dlobRes = tokenService.getHistoricalFromDlob(startDate) ?: return
val baseMap = Interval(startDate, today)
.let { int -> generateSequence(int.start) { dt -> dt.plusDays(1) }.takeWhile { dt -> dt < int.end } }
Expand All @@ -306,22 +311,26 @@ class AsyncService(
time_low = if (low != null) DateTime(low.trade_timestamp * 1000) else k,
quote = mapOf(
USD_UPPER to
CmcQuote(
open = open,
high = high?.price ?: prevPrice,
low = low?.price ?: prevPrice,
close = close,
volume = usdVolume,
market_cap = close.multiply(tokenService.totalSupply().divide(UTILITY_TOKEN_BASE_MULTIPLIER)),
timestamp = closeDate
)
CmcQuote(
open = open,
high = high?.price ?: prevPrice,
low = low?.price ?: prevPrice,
close = close,
volume = usdVolume,
market_cap = close.multiply(
tokenService.totalSupply().divide(UTILITY_TOKEN_BASE_MULTIPLIER)
),
timestamp = closeDate
)
)
).also { prevPrice = close }
TokenHistoricalDailyRecord.save(record.time_open.startOfDay(), record)
}
}

@Scheduled(cron = "0 0/5 * * * ?") // Every 5 minutes
// @Scheduled(cron = "0 0/5 * * * ?") // Every 5 minutes
@Scheduled(initialDelay = 0L, fixedDelay = 5000L)

fun updateTokenLatest() {
val today = DateTime.now().withZone(DateTimeZone.UTC)
val startDate = today.minusDays(7)
Expand All @@ -346,7 +355,10 @@ class AsyncService(
today,
mapOf(USD_UPPER to CmcLatestQuoteAbbrev(price, percentChg, vol24Hr, marketCap, today))
)
CacheUpdateRecord.updateCacheByKey(CacheKeys.UTILITY_TOKEN_LATEST.key, VANILLA_MAPPER.writeValueAsString(rec))
CacheUpdateRecord.updateCacheByKey(
CacheKeys.UTILITY_TOKEN_LATEST.key,
VANILLA_MAPPER.writeValueAsString(rec)
)
}
}

Expand Down Expand Up @@ -443,7 +455,8 @@ class AsyncService(
try {
transaction { it.apply { this.processing = true } }
send(it.processValue)
} catch (_: Exception) { }
} catch (_: Exception) {
}
}
}
}
Expand Down

0 comments on commit aa8de91

Please sign in to comment.