Skip to content
This repository was archived by the owner on Jun 29, 2019. It is now read-only.

exchange rate #614

Merged
merged 3 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions relayer/src/main/resources/actors.conf
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ external_crawler {
sina = {
uri = "https://hq.sinajs.cn/rn=1list=%s"
}
exchangerate = {
uri = "https://api.exchangerate-api.com/v4/latest"
}
currencies = {
fiat = [CNY, JPY, EUR, GBP, KRW, HKD, MOP, CAD, AUD, SGD, PHP, THB, RUB, IDR, INR, USD]
token = [BTC, ETH]
Expand Down
6 changes: 0 additions & 6 deletions relayer/src/main/scala/io/lightcone/relayer/CoreModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ import io.lightcone.ethereum.extractor.tx.{
import io.lightcone.ethereum.persistence._
import io.lightcone.relayer.data._
import io.lightcone.relayer.actors._
import io.lightcone.relayer.external._
import io.lightcone.relayer.splitmerge._
import io.lightcone.relayer.socketio._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import slick.basic.DatabaseConfig
Expand Down Expand Up @@ -142,10 +140,6 @@ class CoreModule(
bind[EIP712Support].to[DefaultEIP712Support]

bind[SplitMergerProvider].to[DefaultSplitMergerProvider].in[Singleton]
bind[ExternalTickerFetcher].to[CMCExternalTickerFetcher].in[Singleton]
bind[FiatExchangeRateFetcher]
.to[SinaFiatExchangeRateFetcher]
.in[Singleton]

// --- bind primative types ---------------------
bind[Timeout].toInstance(Timeout(2.second))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.lightcone.relayer.actors

import akka.actor._
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.typesafe.config.Config
import io.lightcone.core._
Expand All @@ -25,6 +26,7 @@ import io.lightcone.persistence._
import io.lightcone.relayer.base._
import io.lightcone.relayer.data._
import io.lightcone.relayer.external._

import scala.concurrent.{ExecutionContext, Future}
import io.lightcone.relayer.jsonrpc._

Expand All @@ -42,8 +44,7 @@ object ExternalCrawlerActor extends DeployedAsSingleton {
timeout: Timeout,
dbModule: DatabaseModule,
actors: Lookup[ActorRef],
externalTickerFetcher: ExternalTickerFetcher,
fiatExchangeRateFetcher: FiatExchangeRateFetcher,
materializer: ActorMaterializer,
deployActorsIgnoringRoles: Boolean
): ActorRef = {
startSingleton(Props(new ExternalCrawlerActor()))
Expand All @@ -58,9 +59,8 @@ class ExternalCrawlerActor(
val timeProvider: TimeProvider,
val timeout: Timeout,
val actors: Lookup[ActorRef],
val materializer: ActorMaterializer,
val dbModule: DatabaseModule,
val externalTickerFetcher: ExternalTickerFetcher,
val fiatExchangeRateFetcher: FiatExchangeRateFetcher,
val system: ActorSystem)
extends InitializationRetryActor
with JsonSupport
Expand All @@ -69,6 +69,10 @@ class ExternalCrawlerActor(

@inline def metadataManagerActor = actors.get(MetadataManagerActor.name)

val cmcExternalTickerFetcher = new CMCExternalTickerFetcher()
val sinaFiatExchangeRateFetcher = new SinaFiatExchangeRateFetcher()
val exchangeRateAPIFetcher = new ExchangeRateAPIFetcher()

val selfConfig = config.getConfig(ExternalCrawlerActor.name)
val refreshIntervalInSeconds = selfConfig.getInt("refresh-interval-seconds")
val initialDelayInSeconds = selfConfig.getInt("initial-delay-in-seconds")
Expand All @@ -89,8 +93,8 @@ class ExternalCrawlerActor(
private def syncTickers() = this.synchronized {
log.info("ExternalCrawlerActor run sync job")
for {
tokenTickers_ <- externalTickerFetcher.fetchExternalTickers()
currencyTickers <- fiatExchangeRateFetcher.fetchExchangeRates()
tokenTickers_ <- cmcExternalTickerFetcher.fetchExternalTickers()
currencyTickers <- syncCurrencyTicker()
persistTickers <- if (tokenTickers_.nonEmpty && currencyTickers.nonEmpty) {
persistTickers(
currencyTickers,
Expand All @@ -109,6 +113,12 @@ class ExternalCrawlerActor(
}
}

private def syncCurrencyTicker() = {
sinaFiatExchangeRateFetcher.fetchExchangeRates() recoverWith {
case e: Exception => exchangeRateAPIFetcher.fetchExchangeRates()
}
}

private def persistTickers(
currencyTickersInUsd: Seq[TokenTickerRecord],
tokenTickersInUsd: Seq[TokenTickerRecord]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2018 Loopring Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.lightcone.relayer.external

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import com.google.inject.Inject
import com.typesafe.config.Config
import io.lightcone.core.{ErrorCode, ErrorException}
import io.lightcone.persistence.TokenTickerRecord
import io.lightcone.relayer.actors.ExternalCrawlerActor
import org.slf4s.Logging
import scala.concurrent.ExecutionContext
import org.json4s._
import org.json4s.native.JsonMethods._

class ExchangeRateAPIFetcher @Inject()(
implicit
val config: Config,
val system: ActorSystem,
val ec: ExecutionContext,
val materializer: ActorMaterializer)
extends FiatExchangeRateFetcher
with Logging {

private val currencyConfig = config.getConfig(ExternalCrawlerActor.name)
private val baseCurrency = currencyConfig.getString("base_currency")
private val uri =
s"${currencyConfig.getString("exchangerate.uri")}/${baseCurrency}"

implicit val formats = DefaultFormats

def fetchExchangeRates() =
for {
response <- Http().singleRequest(
HttpRequest(
method = HttpMethods.GET,
uri = Uri(uri)
)
)
res <- response match {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
entity.dataBytes
.map(_.utf8String)
.runReduce(_ + _)
.map { j =>
/*{
"base":"USD",
"date":"2019-04-03",
"time_last_updated":1554286322,
"rates":{
"USD":1,
"AUD":1.41153166,
"CAD":1.33282692,
"CHF":0.99825594,
"CNY":6.7214362,
"EUR":0.89201931,
"GBP":0.76479757,
"HKD":7.84970753,
"ILS":3.61823014,
"INR":68.87786555,
"JPY":111.38021494,
"KRW":1137.40524964,
"MXN":19.13792238,
"MYR":4.0837263,
"NOK":8.59678897,
"NZD":1.477991,
"RUB":65.42496619,
"SEK":9.31267228,
"SGD":1.35588255,
"THB":31.76957704,
"TRY":5.60054616,
"ZAR":14.16934653
}
}
*/
val json = parse(j)
val rateMap =
(json \ "rates").values.asInstanceOf[Map[String, Double]]
rateMap.map { i =>
val price = if (i._1.toUpperCase == baseCurrency.toUpperCase) {
1.0
} else {
scala.util
.Try(
(BigDecimal(1) / BigDecimal(i._2))
.setScale(8, BigDecimal.RoundingMode.HALF_UP)
.toDouble
)
.toOption
.getOrElse(0.0)
}
new TokenTickerRecord(
symbol = i._1,
price = price,
dataSource = "ExchangeRate"
)
}.toSeq
}

case m =>
log.error(s"get currency rate data from exchangerate-api failed:$m")
throw ErrorException(
ErrorCode.ERR_INTERNAL_UNKNOWN,
"Failed request exchangerate-api"
)
}
} yield res
}
5 changes: 4 additions & 1 deletion relayer/src/test/resources/actors.conf
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,11 @@ external_crawler {
sina = {
uri = "https://hq.sinajs.cn/rn=1list=%s"
}
exchangerate = {
uri = "https://api.exchangerate-api.com/v4/latest"
}
currencies = {
fiat = [CNY, JPY, EUR, GBP, KRW, HKD, MOP, CAD, AUD, SGD, PHP, THB, RUB, IDR, INR, USD]
fiat = [CNY, JPY, EUR, GBP, KRW, HKD, CAD, AUD, SGD, THB, RUB, INR, USD]
token = [BTC, ETH]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@ class CMCCrawlerSpec
5.second
)
r.nonEmpty should be(true)
val map = r.map(t => t.symbol -> t.price).toMap
}

// "exchangerate-api" in {
// val f = for {
// r <- syncCurrencyTicker()
// } yield r
// val q = Await.result(f.mapTo[Seq[TokenTickerRecord]], timeout.duration)
// }

"request cmc tickers in USD and persist (CMCCrawlerActor)" in {
val f = for {
tokenSymbolSlugs_ <- dbModule.cmcCrawlerConfigForTokenDal.getConfigs()
Expand Down Expand Up @@ -283,4 +289,21 @@ class CMCCrawlerSpec
}
}

private def mockSinaCurrencyError(): Future[Seq[TokenTickerRecord]] = {
if (true) {
Future {
throw ErrorException(ErrorCode.ERR_INTERNAL_UNKNOWN, "mock error")
}
} else {
Future.successful(Seq.empty)
}
}

private def syncCurrencyTicker() = {
mockSinaCurrencyError() recoverWith {
case e: Exception => {
exchangeRateAPIFetcher.fetchExchangeRates()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.rnorth.ducttape.unreliables.Unreliables
import org.testcontainers.containers.ContainerLaunchException
import akka.pattern._
import io.lightcone.core._
import io.lightcone.persistence.{CMCCrawlerConfigForToken, TokenTickerRecord}
import io.lightcone.persistence._
import io.lightcone.relayer.external._
import scalapb.json4s.Parser
import scala.concurrent.duration._
Expand All @@ -35,6 +35,7 @@ trait MetadataManagerSupport extends DatabaseModuleSupport {

implicit val externalTickerFetcher = new CMCExternalTickerFetcher()
implicit val fiatExchangeRateFetcher = new SinaFiatExchangeRateFetcher()
implicit val exchangeRateAPIFetcher = new ExchangeRateAPIFetcher()

val parser = new Parser(preservingProtoFieldNames = true) //protobuf 序列化为json不使用驼峰命名
var tickers: Seq[TokenTickerRecord] = Seq.empty[TokenTickerRecord]
Expand Down