diff --git a/LICENSE b/LICENSE index dd0e8235ce4..547b7d2e140 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright 2015-2021 Knowm Inc. (http://knowm.org) and contributors. +Copyright 2015-2023 Knowm Inc. (http://knowm.org) and contributors. Copyright 2012-2015 Xeiam LLC (http://xeiam.com) and contributors. Permission is hereby granted, free of charge, to any person obtaining a copy @@ -19,4 +19,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. \ No newline at end of file +THE SOFTWARE. diff --git a/pom.xml b/pom.xml index 18c55b9b4e8..72dfce6f48a 100644 --- a/pom.xml +++ b/pom.xml @@ -201,6 +201,7 @@ xchange-stream-hitbtc xchange-stream-huobi xchange-stream-kraken + xchange-stream-kucoin xchange-stream-lgo xchange-stream-okcoin xchange-stream-okex diff --git a/xchange-binance/src/main/java/org/knowm/xchange/binance/BinanceExchange.java b/xchange-binance/src/main/java/org/knowm/xchange/binance/BinanceExchange.java index 29d7e4b2e1e..2e2364b9615 100644 --- a/xchange-binance/src/main/java/org/knowm/xchange/binance/BinanceExchange.java +++ b/xchange-binance/src/main/java/org/knowm/xchange/binance/BinanceExchange.java @@ -16,6 +16,7 @@ public class BinanceExchange extends BaseExchange implements Exchange { public static final String SPECIFIC_PARAM_USE_SANDBOX = "Use_Sandbox"; public static final String SPECIFIC_PARAM_USE_FUTURES_SANDBOX = "Use_Sandbox_Futures"; + public static final String SPECIFIC_PARAM_FUTURES_ENABLED = "Futures_Enabled"; private static final String SPOT_URL = "https://api.binance.com"; public static final String FUTURES_URL = "https://fapi.binance.com"; @@ -77,6 +78,11 @@ public boolean isFuturesSandbox(){ exchangeSpecification.getExchangeSpecificParametersItem(SPECIFIC_PARAM_USE_FUTURES_SANDBOX)); } + public boolean isFuturesEnabled(){ + return Boolean.TRUE.equals( + exchangeSpecification.getExchangeSpecificParametersItem(SPECIFIC_PARAM_FUTURES_ENABLED)); + } + public boolean usingSandbox() { return enabledSandbox(exchangeSpecification); } @@ -101,7 +107,9 @@ public void remoteInit() { } } else { exchangeMetaData = BinanceAdapters.adaptExchangeMetaData(marketDataService.getExchangeInfo(), assetDetailMap); - BinanceAdapters.adaptFutureExchangeMetaData(exchangeMetaData, marketDataService.getFutureExchangeInfo()); + if(isFuturesEnabled()){ + BinanceAdapters.adaptFutureExchangeMetaData(exchangeMetaData, marketDataService.getFutureExchangeInfo()); + } } } catch (Exception e) { diff --git a/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/trade/TrailingFlag.java b/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/trade/TrailingFlag.java new file mode 100644 index 00000000000..deeb43d52e3 --- /dev/null +++ b/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/trade/TrailingFlag.java @@ -0,0 +1,43 @@ +package org.knowm.xchange.binance.dto.trade; + +import org.knowm.xchange.dto.Order.IOrderFlags; + +/** + * @see trailing-stop-faq + * @author mrmx + */ +public enum TrailingFlag implements IOrderFlags { + /** Trailing of 0.01% */ + P0_01(1), + /** Trailing of 0.1% */ + P0_1(10), + /** Trailing of 1% */ + P1(100), + /** Trailing of 10% */ + P10(1000); + /** Basis Points, also known as BIP or BIPS, are used to indicate a percentage change. */ + private final long trailingBip; + + private TrailingFlag(long trailingBip) { + this.trailingBip = trailingBip; + } + + public long getTrailingBip() { + return trailingBip; + } + + static TrailingFlag of(Number percent) { + switch (percent.toString()) { + case "0.01": + return P0_01; + case "0.1": + return P0_1; + case "1": + return P1; + case "10": + return P10; + } + throw new IllegalArgumentException("Invalid trailing " + percent); + } +} \ No newline at end of file diff --git a/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceAccountService.java b/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceAccountService.java index f73ee801f5f..6aab63f855e 100644 --- a/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceAccountService.java +++ b/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceAccountService.java @@ -104,11 +104,12 @@ public AccountInfo getAccountInfo() throws IOException { wallets.add(BinanceAdapters.adaptBinanceSpotWallet(account())); } } else { - BinanceFutureAccountInformation futureAccountInformation = futuresAccount(); + if(exchange.isFuturesEnabled()){ + BinanceFutureAccountInformation futureAccountInformation = futuresAccount(); + wallets.add(BinanceAdapters.adaptBinanceFutureWallet(futureAccountInformation)); + openPositions.addAll(BinanceAdapters.adaptOpenPositions(futureAccountInformation.getPositions())); + } wallets.add(BinanceAdapters.adaptBinanceSpotWallet(account())); - wallets.add(BinanceAdapters.adaptBinanceFutureWallet(futureAccountInformation)); - openPositions.addAll(BinanceAdapters.adaptOpenPositions(futureAccountInformation.getPositions())); - } return new AccountInfo( exchange.getExchangeSpecification().getUserName(), diff --git a/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceTradeService.java b/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceTradeService.java index d8cc20299f8..357b6d9aef4 100644 --- a/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceTradeService.java +++ b/xchange-binance/src/main/java/org/knowm/xchange/binance/service/BinanceTradeService.java @@ -7,7 +7,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; - import lombok.Value; import org.knowm.xchange.binance.BinanceAdapters; import org.knowm.xchange.binance.BinanceErrorAdapter; @@ -35,9 +34,7 @@ public class BinanceTradeService extends BinanceTradeServiceRaw implements TradeService { - public BinanceTradeService( - BinanceExchange exchange, - ResilienceRegistries resilienceRegistries) { + public BinanceTradeService(BinanceExchange exchange, ResilienceRegistries resilienceRegistries) { super(exchange, resilienceRegistries); } @@ -54,13 +51,14 @@ public OpenOrders getOpenOrders(CurrencyPair pair) throws IOException { public OpenOrders getOpenOrders(OpenOrdersParams params) throws IOException { try { Instrument pair = null; - if(params instanceof OpenOrdersParamInstrument){ + if (params instanceof OpenOrdersParamInstrument) { pair = ((OpenOrdersParamInstrument) params).getInstrument(); - } else if(params instanceof OpenOrdersParamCurrencyPair){ + } else if (params instanceof OpenOrdersParamCurrencyPair) { pair = ((OpenOrdersParamCurrencyPair) params).getCurrencyPair(); } - return BinanceAdapters.adaptOpenOrders(openOrdersAllProducts(pair), pair instanceof FuturesContract); + return BinanceAdapters.adaptOpenOrders( + openOrdersAllProducts(pair), pair instanceof FuturesContract); } catch (BinanceException e) { throw BinanceErrorAdapter.adapt(e); @@ -69,12 +67,12 @@ public OpenOrders getOpenOrders(OpenOrdersParams params) throws IOException { @Override public String placeMarketOrder(MarketOrder mo) throws IOException { - return placeOrderAllProducts(OrderType.MARKET, mo, null, null, null, null, null,null); + return placeOrderAllProducts(OrderType.MARKET, mo, null, null, null, null, null, null); } @Override public String placeLimitOrder(LimitOrder limitOrder) throws IOException { - TimeInForce tif = timeInForceFromOrder(limitOrder).orElse(TimeInForce.GTC); + TimeInForce tif = getOrderFlag(limitOrder, TimeInForce.class).orElse(TimeInForce.GTC); OrderType type; if (limitOrder.hasFlag(org.knowm.xchange.binance.dto.trade.BinanceOrderFlags.LIMIT_MAKER)) { type = OrderType.LIMIT_MAKER; @@ -82,7 +80,8 @@ public String placeLimitOrder(LimitOrder limitOrder) throws IOException { } else { type = OrderType.LIMIT; } - return placeOrderAllProducts(type, limitOrder, limitOrder.getLimitPrice(), null, null, null,null, tif); + return placeOrderAllProducts( + type, limitOrder, limitOrder.getLimitPrice(), null, null, null, null, tif); } @Override @@ -93,18 +92,26 @@ public String placeStopOrder(StopOrder order) throws IOException { // allow // it at some point. TimeInForce tif = - timeInForceFromOrder(order).orElse(order.getLimitPrice() != null ? TimeInForce.GTC : null); - + getOrderFlag(order, TimeInForce.class) + .orElse(order.getLimitPrice() != null ? TimeInForce.GTC : null); + Long trailingDelta = + getOrderFlag(order, TrailingFlag.class).map(TrailingFlag::getTrailingBip).orElse(null); OrderType orderType = BinanceAdapters.adaptOrderType(order); return placeOrderAllProducts( - orderType, order, order.getLimitPrice(), order.getStopPrice(), null, null, order.getTrailValue(), tif); + orderType, + order, + order.getLimitPrice(), + order.getStopPrice(), + null, + trailingDelta, + order.getTrailValue(), + tif); } - private Optional timeInForceFromOrder(Order order) { - return order.getOrderFlags().stream() - .filter(flag -> flag instanceof TimeInForce) - .map(flag -> (TimeInForce) flag) + private Optional getOrderFlag(Order order, Class clazz) { + return (Optional) order.getOrderFlags().stream() + .filter(flag -> clazz.isAssignableFrom(flag.getClass())) .findFirst(); } @@ -121,36 +128,41 @@ private String placeOrderAllProducts( try { String orderId; - if(order.getInstrument() instanceof FuturesContract){ - orderId = newFutureOrder( - order.getInstrument(), - BinanceAdapters.convert(order.getType()), - type, - tif, - order.getOriginalAmount(), - order.hasFlag(org.knowm.xchange.binance.dto.trade.BinanceOrderFlags.REDUCE_ONLY), - limitPrice, - getClientOrderId(order), - stopPrice, - false, - null, - callBackRate, - null - ).getOrderId(); + if (order.getInstrument() instanceof FuturesContract) { + orderId = + newFutureOrder( + order.getInstrument(), + BinanceAdapters.convert(order.getType()), + type, + tif, + order.getOriginalAmount(), + order.hasFlag( + org.knowm.xchange.binance.dto.trade.BinanceOrderFlags.REDUCE_ONLY), + limitPrice, + order.getUserReference(), + stopPrice, + false, + null, + callBackRate, + null) + .getOrderId(); } else { - orderId = Long.toString(newOrder( - order.getInstrument(), - BinanceAdapters.convert(order.getType()), - type, - tif, - order.getOriginalAmount(), - quoteOrderQty, // TODO (BigDecimal)order.getExtraValue("quoteOrderQty") - limitPrice, - getClientOrderId(order), - stopPrice, - trailingDelta, // TODO (Long)order.getExtraValue("trailingDelta") - null, - null).orderId); + orderId = + Long.toString( + newOrder( + order.getInstrument(), + BinanceAdapters.convert(order.getType()), + type, + tif, + order.getOriginalAmount(), + quoteOrderQty, + limitPrice, + order.getUserReference(), + stopPrice, + trailingDelta, + null, + null) + .orderId); } return orderId; } catch (BinanceException e) { @@ -172,7 +184,7 @@ public void placeTestOrder( Long trailingDelta) throws IOException { try { - TimeInForce tif = timeInForceFromOrder(order).orElse(null); + TimeInForce tif = getOrderFlag(order, TimeInForce.class).orElse(null); testNewOrder( order.getInstrument(), BinanceAdapters.convert(order.getType()), @@ -181,7 +193,7 @@ public void placeTestOrder( order.getOriginalAmount(), quoteOrderQty, limitPrice, - getClientOrderId(order), + order.getUserReference(), stopPrice, trailingDelta, null); @@ -190,20 +202,6 @@ public void placeTestOrder( } } - private String getClientOrderId(Order order) { - - String clientOrderId = null; - for (IOrderFlags flags : order.getOrderFlags()) { - if (flags instanceof BinanceOrderFlags) { - BinanceOrderFlags bof = (BinanceOrderFlags) flags; - if (clientOrderId == null) { - clientOrderId = bof.getClientId(); - } - } - } - return clientOrderId; - } - @Override public boolean cancelOrder(CancelOrderParams params) throws IOException { try { @@ -216,10 +214,7 @@ public boolean cancelOrder(CancelOrderParams params) throws IOException { CancelOrderByInstrument paramInstrument = (CancelOrderByInstrument) params; CancelOrderByIdParams paramId = (CancelOrderByIdParams) params; cancelOrderAllProducts( - paramInstrument.getInstrument(), - BinanceAdapters.id(paramId.getOrderId()), - null, - null); + paramInstrument.getInstrument(), BinanceAdapters.id(paramId.getOrderId()), null, null); return true; } catch (BinanceException e) { @@ -246,8 +241,7 @@ public UserTrades getTradeHistory(TradeHistoryParams params) throws IOException TradeHistoryParamInstrument pairParams = (TradeHistoryParamInstrument) params; Instrument pair = pairParams.getInstrument(); if (pair == null) { - throw new ExchangeException( - "You need to provide the instrument to get the user trades."); + throw new ExchangeException("You need to provide the instrument to get the user trades."); } Long orderId = null; Long startTime = null; @@ -279,7 +273,8 @@ public UserTrades getTradeHistory(TradeHistoryParams params) throws IOException limit = limitParams.getLimit(); } - List binanceTrades = myTradesAllProducts(pair, orderId, startTime, endTime, fromId, limit); + List binanceTrades = + myTradesAllProducts(pair, orderId, startTime, endTime, fromId, limit); return BinanceAdapters.adaptUserTrades(binanceTrades, pair instanceof FuturesContract); } catch (BinanceException e) { @@ -294,22 +289,22 @@ public Collection getOrder(OrderQueryParams... params) throws IOException for (OrderQueryParams param : params) { if (!(param instanceof OrderQueryParamInstrument)) { throw new ExchangeException( - "Parameters must be an instance of OrderQueryParamInstrument"); + "Parameters must be an instance of OrderQueryParamInstrument"); } - OrderQueryParamInstrument orderQueryParamInstrument = - (OrderQueryParamInstrument) param; + OrderQueryParamInstrument orderQueryParamInstrument = (OrderQueryParamInstrument) param; if (orderQueryParamInstrument.getInstrument() == null - || orderQueryParamInstrument.getOrderId() == null) { + || orderQueryParamInstrument.getOrderId() == null) { throw new ExchangeException( - "You need to provide the currency pair and the order id to query an order."); + "You need to provide the currency pair and the order id to query an order."); } orders.add( - BinanceAdapters.adaptOrder( - orderStatusAllProducts( - orderQueryParamInstrument.getInstrument(), - BinanceAdapters.id(orderQueryParamInstrument.getOrderId()), - null), orderQueryParamInstrument.getInstrument() instanceof FuturesContract)); + BinanceAdapters.adaptOrder( + orderStatusAllProducts( + orderQueryParamInstrument.getInstrument(), + BinanceAdapters.id(orderQueryParamInstrument.getOrderId()), + null), + orderQueryParamInstrument.getInstrument() instanceof FuturesContract)); } return orders; } catch (BinanceException e) { @@ -325,16 +320,16 @@ public OpenPositions getOpenPositions() throws IOException { @Override public Collection cancelAllOrders(CancelAllOrders orderParams) throws IOException { - if(!(orderParams instanceof CancelOrderByInstrument)){ - throw new NotAvailableFromExchangeException("Parameters must be an instance of "+CancelOrderByInstrument.class.getSimpleName()); + if (!(orderParams instanceof CancelOrderByInstrument)) { + throw new NotAvailableFromExchangeException( + "Parameters must be an instance of " + CancelOrderByInstrument.class.getSimpleName()); } Instrument instrument = ((CancelOrderByInstrument) orderParams).getInstrument(); - return cancelAllOpenOrdersAllProducts(instrument) - .stream() - .map(binanceCancelledOrder -> Long.toString(binanceCancelledOrder.orderId)) - .collect(Collectors.toList()); + return cancelAllOpenOrdersAllProducts(instrument).stream() + .map(binanceCancelledOrder -> Long.toString(binanceCancelledOrder.orderId)) + .collect(Collectors.toList()); } @Override diff --git a/xchange-binance/src/test/java/org/knowm/xchange/binance/dto/trade/TrailingFlagTest.java b/xchange-binance/src/test/java/org/knowm/xchange/binance/dto/trade/TrailingFlagTest.java new file mode 100644 index 00000000000..f8cc18f15b8 --- /dev/null +++ b/xchange-binance/src/test/java/org/knowm/xchange/binance/dto/trade/TrailingFlagTest.java @@ -0,0 +1,37 @@ +package org.knowm.xchange.binance.dto.trade; + +import java.util.Arrays; +import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchIllegalArgumentException; +import org.junit.Test; + +/** @author mrmx */ +public class TrailingFlagTest { + + /** Test of method, of class BinanceOrderTrailingFlag. */ + @Test + public void testOfValue() { + System.out.println("testOfValue"); + List values = Arrays.asList(0.01, 0.1, 1, 10); + for (Number value : values) { + assertThat(TrailingFlag.of(value).getTrailingBip()).isBetween(1L, 1000L); + } + } + + /** Test of method, of class BinanceOrderTrailingFlag. */ + @Test + public void testOfValueWithInvalidNumbers() { + System.out.println("testOfValueWithInvalidNumbers"); + List values = Arrays.asList(0.011, 0.11, 2, 11); + for (Number value : values) { + assertThat( + catchIllegalArgumentException( + () -> { + TrailingFlag.of(value); + })) + .as("Invalid value " + value) + .isNotNull(); + } + } +} \ No newline at end of file diff --git a/xchange-binance/src/test/resources/logback.xml b/xchange-binance/src/test/resources/logback.xml index 5809b573500..8c86d2fa1c3 100644 --- a/xchange-binance/src/test/resources/logback.xml +++ b/xchange-binance/src/test/resources/logback.xml @@ -11,7 +11,7 @@ - + diff --git a/xchange-bitstamp/pom.xml b/xchange-bitstamp/pom.xml index 8e3004aa9cf..1998f2321c5 100644 --- a/xchange-bitstamp/pom.xml +++ b/xchange-bitstamp/pom.xml @@ -24,6 +24,12 @@ + + org.projectlombok + lombok + provided + + org.knowm.xchange xchange-core diff --git a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/BitstampV2.java b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/BitstampV2.java index 235bcd892a7..ff6644b1f62 100644 --- a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/BitstampV2.java +++ b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/BitstampV2.java @@ -1,6 +1,7 @@ package org.knowm.xchange.bitstamp; import java.io.IOException; +import java.util.List; import java.util.Objects; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -32,6 +33,11 @@ BitstampOrderBook getOrderBook(@PathParam("pair") Pair pair) BitstampTicker getTicker(@PathParam("pair") BitstampV2.Pair pair) throws IOException, BitstampException; + @GET + @Path("ticker/") + List getTickers() + throws IOException, BitstampException; + @GET @Path("ticker_hour/{pair}/") BitstampTicker getTickerHour(@PathParam("pair") BitstampV2.Pair pair) diff --git a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampPairInfo.java b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampPairInfo.java index 654684a4dea..63550380393 100644 --- a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampPairInfo.java +++ b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampPairInfo.java @@ -1,91 +1,34 @@ package org.knowm.xchange.bitstamp.dto.marketdata; import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; -/** @author Matej Spiller Muys */ -public final class BitstampPairInfo { +@Builder +@Jacksonized +@Value +public class BitstampPairInfo { - private final String name; - private final String urlSymbol; - private final Integer baseDecimals; - private final Integer counterDecimals; - private final String minimumOrder; - private final String trading; - private final String description; + @JsonProperty("name") + String name; - /** - * Constructor - * - * @param name - * @param urlSymbol - * @param baseDecimals - * @param counterDecimals - * @param minimumOrder - * @param trading - * @param description - */ - public BitstampPairInfo( - @JsonProperty("name") String name, - @JsonProperty("url_symbol") String urlSymbol, - @JsonProperty("base_decimals") Integer baseDecimals, - @JsonProperty("counter_decimals") Integer counterDecimals, - @JsonProperty("minimum_order") String minimumOrder, - @JsonProperty("trading") String trading, - @JsonProperty("description") String description) { - this.name = name; - this.urlSymbol = urlSymbol; - this.baseDecimals = baseDecimals; - this.counterDecimals = counterDecimals; - this.minimumOrder = minimumOrder; - this.trading = trading; - this.description = description; - } + @JsonProperty("url_symbol") + String urlSymbol; - public String getName() { - return name; - } + @JsonProperty("base_decimals") + Integer baseDecimals; - public String getUrlSymbol() { - return urlSymbol; - } + @JsonProperty("counter_decimals") + Integer counterDecimals; - public Integer getBaseDecimals() { - return baseDecimals; - } + @JsonProperty("minimum_order") + String minimumOrder; - public Integer getCounterDecimals() { - return counterDecimals; - } + @JsonProperty("trading") + String trading; - public String getMinimumOrder() { - return minimumOrder; - } + @JsonProperty("description") + String description; - public String getTrading() { - return trading; - } - - public String getDescription() { - return description; - } - - @Override - public String toString() { - - return "BitstampTicker [name=" - + name - + ", urlSymbol=" - + urlSymbol - + ", baseDecimals=" - + baseDecimals - + ", counterDecimals=" - + counterDecimals - + ", minimumOrder=" - + minimumOrder - + ", trading=" - + trading - + ", description=" - + description - + "]"; - } } diff --git a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampTicker.java b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampTicker.java index e0713ba1d41..58a3fa9f675 100644 --- a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampTicker.java +++ b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/dto/marketdata/BitstampTicker.java @@ -2,119 +2,43 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.math.BigDecimal; +import lombok.Builder; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; -/** @author Matija Mazi */ -public final class BitstampTicker { +@Builder +@Jacksonized +@Value +public class BitstampTicker { - private final BigDecimal open; - private final BigDecimal last; - private final BigDecimal high; - private final BigDecimal low; - private final BigDecimal vwap; - private final BigDecimal volume; - private final BigDecimal bid; - private final BigDecimal ask; - private final long timestamp; + @JsonProperty("pair") + String pair; - /** - * Constructor - * - * @param open - * @param last - * @param high - * @param low - * @param vwap - * @param volume - * @param bid - * @param ask - */ - public BitstampTicker( - @JsonProperty("open") BigDecimal open, - @JsonProperty("last") BigDecimal last, - @JsonProperty("high") BigDecimal high, - @JsonProperty("low") BigDecimal low, - @JsonProperty("vwap") BigDecimal vwap, - @JsonProperty("volume") BigDecimal volume, - @JsonProperty("bid") BigDecimal bid, - @JsonProperty("ask") BigDecimal ask, - @JsonProperty("timestamp") long timestamp) { + @JsonProperty("open") + BigDecimal open; - this.open = open; - this.last = last; - this.high = high; - this.low = low; - this.vwap = vwap; - this.volume = volume; - this.bid = bid; - this.ask = ask; - this.timestamp = timestamp; - } + @JsonProperty("last") + BigDecimal last; - public BigDecimal getOpen() { - return open; - } + @JsonProperty("high") + BigDecimal high; - public BigDecimal getLast() { + @JsonProperty("low") + BigDecimal low; - return last; - } + @JsonProperty("vwap") + BigDecimal vwap; - public BigDecimal getHigh() { + @JsonProperty("volume") + BigDecimal volume; - return high; - } + @JsonProperty("bid") + BigDecimal bid; - public BigDecimal getLow() { + @JsonProperty("ask") + BigDecimal ask; - return low; - } + @JsonProperty("timestamp") + long timestamp; - public BigDecimal getVwap() { - - return vwap; - } - - public BigDecimal getVolume() { - - return volume; - } - - public BigDecimal getBid() { - - return bid; - } - - public BigDecimal getAsk() { - - return ask; - } - - public long getTimestamp() { - - return timestamp; - } - - @Override - public String toString() { - - return "BitstampTicker [open=" - + open - + ", last=" - + last - + ", high=" - + high - + ", low=" - + low - + ", vwap=" - + vwap - + ", volume=" - + volume - + ", bid=" - + bid - + ", ask=" - + ask - + ", timestamp=" - + timestamp - + "]"; - } } diff --git a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/service/BitstampMarketDataServiceRaw.java b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/service/BitstampMarketDataServiceRaw.java index f80e469fe61..6999ef7e529 100644 --- a/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/service/BitstampMarketDataServiceRaw.java +++ b/xchange-bitstamp/src/main/java/org/knowm/xchange/bitstamp/service/BitstampMarketDataServiceRaw.java @@ -1,6 +1,7 @@ package org.knowm.xchange.bitstamp.service; import java.io.IOException; +import java.util.List; import javax.annotation.Nullable; import org.knowm.xchange.Exchange; import org.knowm.xchange.bitstamp.BitstampV2; @@ -33,6 +34,14 @@ public BitstampTicker getBitstampTicker(CurrencyPair pair) throws IOException { } } + public List getBitstampTickers() throws IOException { + try { + return bitstampV2.getTickers(); + } catch (BitstampException e) { + throw handleError(e); + } + } + public BitstampTicker getBitstampTickerHourly(CurrencyPair pair) throws IOException { try { return bitstampV2.getTickerHour(new BitstampV2.Pair(pair)); diff --git a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinAdapters.java b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinAdapters.java index 65b8e8f9f5e..22bfd285fc6 100644 --- a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinAdapters.java +++ b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinAdapters.java @@ -229,7 +229,7 @@ public static OrderBook adaptOrderBook(CurrencyPair currencyPair, OrderBookRespo .sorted(Ordering.natural().onResultOf((PriceAndSize s) -> s.price).reversed()) .map(s -> adaptLimitOrder(currencyPair, BID, s, timestamp)) .collect(toCollection(LinkedList::new)); - return new OrderBook(timestamp, asks, bids); + return new OrderBook(timestamp, asks, bids, true); } private static LimitOrder adaptLimitOrder( diff --git a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinMarketDataServiceRaw.java b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinMarketDataServiceRaw.java index 07831ffe86c..94be7c89b2f 100644 --- a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinMarketDataServiceRaw.java +++ b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/KucoinMarketDataServiceRaw.java @@ -10,10 +10,12 @@ import java.util.Map; import java.util.stream.Collectors; import org.knowm.xchange.client.ResilienceRegistries; +import org.knowm.xchange.currency.Currency; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.kucoin.dto.KlineIntervalType; import org.knowm.xchange.kucoin.dto.response.AllTickersResponse; import org.knowm.xchange.kucoin.dto.response.CurrenciesResponse; +import org.knowm.xchange.kucoin.dto.response.CurrencyResponseV2; import org.knowm.xchange.kucoin.dto.response.KucoinKline; import org.knowm.xchange.kucoin.dto.response.OrderBookResponse; import org.knowm.xchange.kucoin.dto.response.SymbolResponse; @@ -70,7 +72,7 @@ public TradeFeeResponse getKucoinBaseFee() throws IOException { return classifyingExceptions( () -> decorateApiCall( - () -> tradingFeeAPI.getBaseFee(apiKey, digest, nonceFactory, passphrase)) + () -> tradingFeeAPI.getBaseFee(apiKey, digest, nonceFactory, passphrase)) .withRetry(retry("baseFee")) .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) .call()); @@ -81,14 +83,18 @@ public List getKucoinTradeFee(String symbols) throws IOExcepti return classifyingExceptions( () -> decorateApiCall( - () -> - tradingFeeAPI.getTradeFee( - apiKey, digest, nonceFactory, passphrase, symbols)) + () -> + tradingFeeAPI.getTradeFee( + apiKey, digest, nonceFactory, passphrase, symbols)) .withRetry(retry("tradeFee")) .withRateLimiter(rateLimiter(PRIVATE_REST_ENDPOINT_RATE_LIMITER)) .call()); } + /** + * @deprecated use {@link #getKucoinSymbolsV2()} + */ + @Deprecated public List getKucoinSymbols() throws IOException { return classifyingExceptions( () -> @@ -98,6 +104,15 @@ public List getKucoinSymbols() throws IOException { .call()); } + public List getKucoinSymbolsV2() throws IOException { + return classifyingExceptions( + () -> + decorateApiCall(symbolApi::getSymbolsV2) + .withRetry(retry("symbols")) + .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) + .call()); + } + public List getKucoinCurrencies() throws IOException { return classifyingExceptions( () -> @@ -107,13 +122,22 @@ public List getKucoinCurrencies() throws IOException { .call()); } + public CurrencyResponseV2 getKucoinCurrencies(Currency currency) throws IOException { + return classifyingExceptions( + () -> + decorateApiCall(() -> symbolApi.getCurrencies(currency.getCurrencyCode())) + .withRetry(retry("currencies")) + .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) + .call()); + } + public OrderBookResponse getKucoinOrderBookPartial(CurrencyPair pair) throws IOException { return classifyingExceptions( () -> decorateApiCall( - () -> - orderBookApi.getPartOrderBookAggregated( - KucoinAdapters.adaptCurrencyPair(pair))) + () -> + orderBookApi.getPartOrderBookAggregated( + KucoinAdapters.adaptCurrencyPair(pair))) .withRetry(retry("partialOrderBook")) .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) .call()); @@ -123,9 +147,9 @@ public OrderBookResponse getKucoinOrderBookPartialShallow(CurrencyPair pair) thr return classifyingExceptions( () -> decorateApiCall( - () -> - orderBookApi.getPartOrderBookShallowAggregated( - KucoinAdapters.adaptCurrencyPair(pair))) + () -> + orderBookApi.getPartOrderBookShallowAggregated( + KucoinAdapters.adaptCurrencyPair(pair))) .withRetry(retry("partialShallowOrderBook")) .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) .call()); @@ -135,13 +159,13 @@ public OrderBookResponse getKucoinOrderBookFull(CurrencyPair pair) throws IOExce return classifyingExceptions( () -> decorateApiCall( - () -> - orderBookApi.getFullOrderBookAggregated( - KucoinAdapters.adaptCurrencyPair(pair), - apiKey, - digest, - nonceFactory, - passphrase)) + () -> + orderBookApi.getFullOrderBookAggregated( + KucoinAdapters.adaptCurrencyPair(pair), + apiKey, + digest, + nonceFactory, + passphrase)) .withRetry(retry("fullOrderBook")) .withRateLimiter(rateLimiter(PRIVATE_REST_ENDPOINT_RATE_LIMITER)) .call()); @@ -151,7 +175,7 @@ public List getKucoinTrades(CurrencyPair pair) throws IOEx return classifyingExceptions( () -> decorateApiCall( - () -> historyApi.getTradeHistories(KucoinAdapters.adaptCurrencyPair(pair))) + () -> historyApi.getTradeHistories(KucoinAdapters.adaptCurrencyPair(pair))) .withRetry(retry("tradeHistories")) .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) .call()); @@ -163,12 +187,12 @@ public List getKucoinKlines( classifyingExceptions( () -> decorateApiCall( - () -> - historyApi.getKlines( - KucoinAdapters.adaptCurrencyPair(pair), - startTime, - endTime, - type.code())) + () -> + historyApi.getKlines( + KucoinAdapters.adaptCurrencyPair(pair), + startTime, + endTime, + type.code())) .withRetry(retry("klines")) .withRateLimiter(rateLimiter(PUBLIC_REST_ENDPOINT_RATE_LIMITER)) .call()); diff --git a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/dto/response/CurrencyResponseV2.java b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/dto/response/CurrencyResponseV2.java new file mode 100644 index 00000000000..77f2259d45f --- /dev/null +++ b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/dto/response/CurrencyResponseV2.java @@ -0,0 +1,70 @@ +package org.knowm.xchange.kucoin.dto.response; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.math.BigDecimal; +import java.util.List; +import lombok.Data; +import lombok.extern.jackson.Jacksonized; + +@Data +@Jacksonized +@JsonIgnoreProperties(ignoreUnknown = true) +public class CurrencyResponseV2 { + + @JsonProperty("currency") + private String currency; + + @JsonProperty("name") + private String name; + + @JsonProperty("fullName") + private String fullName; + + @JsonProperty("precision") + private BigDecimal precision; + + @JsonProperty("chains") + private List chains; + + @JsonProperty("withdrawalMinSize") + private String withdrawalMinSize; + + @JsonProperty("withdrawalMinFee") + private String withdrawalMinFee; + + @JsonProperty("isMarginEnabled") + private Boolean isMarginEnabled; + + @JsonProperty("isDebitEnabled") + private Boolean isDebitEnabled; + + @Data + public static class Chain { + + @JsonProperty("chainName") + private String chainName; + + @JsonProperty("chain") + private String chain; + + @JsonProperty("withdrawalMinSize") + private BigDecimal withdrawalMinSize; + + @JsonProperty("withdrawalMinFee") + private BigDecimal withdrawalMinFee; + + @JsonProperty("isWithdrawEnabled") + private Boolean isWithdrawEnabled; + + @JsonProperty("isDepositEnabled") + private Boolean isDepositEnabled; + + @JsonProperty("confirms") + private Long confirms; + + @JsonProperty("contractAddress") + private String contractAddress; + + } +} diff --git a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/service/SymbolAPI.java b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/service/SymbolAPI.java index 8343991b5bb..1cfa56f1166 100644 --- a/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/service/SymbolAPI.java +++ b/xchange-kucoin/src/main/java/org/knowm/xchange/kucoin/service/SymbolAPI.java @@ -7,18 +7,20 @@ import java.util.Map; import javax.ws.rs.GET; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import org.knowm.xchange.kucoin.dto.response.AllTickersResponse; import org.knowm.xchange.kucoin.dto.response.CurrenciesResponse; +import org.knowm.xchange.kucoin.dto.response.CurrencyResponseV2; import org.knowm.xchange.kucoin.dto.response.KucoinResponse; import org.knowm.xchange.kucoin.dto.response.SymbolResponse; import org.knowm.xchange.kucoin.dto.response.SymbolTickResponse; import org.knowm.xchange.kucoin.dto.response.TickerResponse; /** Based on code by chenshiwei on 2019/1/11. */ -@Path("api/v1") +@Path("api") @Produces(MediaType.APPLICATION_JSON) public interface SymbolAPI { @@ -26,27 +28,47 @@ public interface SymbolAPI { * Get a list of available currency pairs for trading. * * @return The available symbols. + * @deprecated use {@link #getSymbolsV2()} */ @GET - @Path("/symbols") + @Path("/v1/symbols") + @Deprecated KucoinResponse> getSymbols() throws IOException; + /** + * Get a list of available currency pairs for trading. + * + * @return The available symbols. + */ + @GET + @Path("/v2/symbols") + KucoinResponse> getSymbolsV2() throws IOException; + /** * Get a list of available currencies for trading. * * @return The available currencies. */ @GET - @Path("/currencies") + @Path("/v1/currencies") KucoinResponse> getCurrencies() throws IOException; + /** + * Get currency detail. + * + * @return The available currencies. + */ + @GET + @Path("/v2/currencies/{currency}") + KucoinResponse getCurrencies(@PathParam("currency") String currency) throws IOException; + /** * Get the fiat price of the currencies for the available trading pairs. * * @return USD fiat price of the currencies. */ @GET - @Path("/prices") + @Path("/v1/prices") KucoinResponse> getPrices() throws IOException; /** @@ -56,7 +78,7 @@ public interface SymbolAPI { * @return The ticker. */ @GET - @Path("/market/orderbook/level1") + @Path("/v1/market/orderbook/level1") KucoinResponse getTicker(@QueryParam("symbol") String symbol) throws IOException; /** @@ -65,7 +87,7 @@ public interface SymbolAPI { * @return The allTickersTickerResponse. */ @GET - @Path("/market/allTickers") + @Path("/v1/market/allTickers") KucoinResponse getTickers() throws IOException; /** @@ -76,7 +98,7 @@ public interface SymbolAPI { * @return The 24hr stats for the symbol. */ @GET - @Path("/market/stats") + @Path("/v1/market/stats") KucoinResponse getMarketStats(@QueryParam("symbol") String symbol) throws IOException; } diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 6a37f118d58..72994362002 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -1,5 +1,8 @@ package info.bitrich.xchangestream.binance; +import static info.bitrich.xchangestream.binance.BinanceSubscriptionType.KLINE; +import static info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper.getObjectMapper; + import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; @@ -19,6 +22,15 @@ import io.reactivex.functions.Consumer; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.BehaviorSubject; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.knowm.xchange.binance.BinanceAdapters; import org.knowm.xchange.binance.BinanceErrorAdapter; import org.knowm.xchange.binance.dto.BinanceException; @@ -34,19 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static info.bitrich.xchangestream.binance.BinanceSubscriptionType.KLINE; -import static info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper.getObjectMapper; - public class BinanceStreamingMarketDataService implements StreamingMarketDataService { private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class); @@ -68,7 +67,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private final Map> bookTickerSubscriptions; private final Map> orderbookSubscriptions; private final Map> tradeSubscriptions; - private final Map> orderBookUpdatesSubscriptions; + private final Map>> orderBookUpdatesSubscriptions; private final Map>> klineSubscriptions; private final Map> orderBookRawUpdatesSubscriptions; @@ -240,7 +239,7 @@ private Observable klinesStream(Instrument instrument, KlineInterv * This api provides the ability to start receiving updates immediately. It is allowed to * subscribe to this api and {@link #getOrderBook(Instrument, Object...)} at the same time. */ - public Observable getOrderBookUpdates( + public Observable> getOrderBookUpdates( Instrument instrument) { if (!service.isLiveSubscriptionEnabled() && !service.getProductSubscription().getOrderBook().contains(instrument)) { @@ -250,19 +249,18 @@ public Observable getOrderBookUpdates( instrument, this::initOrderBookUpdateIfAbsent); } - private Observable initOrderBookUpdateIfAbsent(Instrument instrument) { + private Observable> initOrderBookUpdateIfAbsent(Instrument instrument) { orderBookRawUpdatesSubscriptions.computeIfAbsent( instrument, s -> triggerObservableBody(rawOrderBookUpdates(instrument))); return createOrderBookUpdatesObservable(instrument); } - private Observable createOrderBookUpdatesObservable(Instrument instrument) { + private Observable> createOrderBookUpdatesObservable(Instrument instrument) { return orderBookRawUpdatesSubscriptions .get(instrument) .flatMap( depthTransaction -> - observableFromStream(extractOrderBookUpdates(instrument, depthTransaction))) - .share(); + Observable.create(emitter -> emitter.onNext(extractOrderBookUpdatesToArray(instrument, depthTransaction)))); } private String channelFromCurrency(Instrument instrument, String subscriptionType) { @@ -585,6 +583,34 @@ private Stream extractOrderBookUpdates( return Stream.concat(bidStream, askStream); } + private List extractOrderBookUpdatesToArray( + Instrument instrument, DepthBinanceWebSocketTransaction depthTransaction) { + BinanceOrderbook orderBookDiff = depthTransaction.getOrderBook(); + List orderBookUpdates = new ArrayList<>(); + orderBookDiff.bids.forEach( + (key, value) -> + orderBookUpdates.add( + new OrderBookUpdate( + OrderType.BID, + value, + instrument, + key, + depthTransaction.getEventTime(), + value))); + + orderBookDiff.asks.forEach( + (key, value) -> + orderBookUpdates.add( + new OrderBookUpdate( + OrderType.ASK, + value, + instrument, + key, + depthTransaction.getEventTime(), + value))); + return orderBookUpdates; + } + private Observable observableFromStream(Stream stream) { return Observable.create( emitter -> { diff --git a/xchange-stream-kucoin/pom.xml b/xchange-stream-kucoin/pom.xml new file mode 100644 index 00000000000..68d8b9a24a5 --- /dev/null +++ b/xchange-stream-kucoin/pom.xml @@ -0,0 +1,40 @@ + + + + 4.0.0 + + + org.knowm.xchange + xchange-parent + 5.0.14-SNAPSHOT + + + XChange Kucoin Stream + xchange-stream-kucoin + + + + org.knowm.xchange + xchange-stream-core + ${project.parent.version} + + + org.knowm.xchange + xchange-stream-service-netty + ${project.parent.version} + + + org.knowm.xchange + xchange-kucoin + ${project.parent.version} + + + com.google.guava + guava + + + org.projectlombok + lombok + + + \ No newline at end of file diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingAdapters.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingAdapters.java new file mode 100644 index 00000000000..e6c9e64c9f8 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingAdapters.java @@ -0,0 +1,45 @@ +package info.bitrich.xchangestream.kucoin; + +import info.bitrich.xchangestream.kucoin.dto.KucoinOrderEventData; +import info.bitrich.xchangestream.kucoin.dto.KucoinWebSocketOrderEvent; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dto.Order; +import org.knowm.xchange.dto.trade.LimitOrder; +import org.knowm.xchange.dto.trade.MarketOrder; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +public class KucoinStreamingAdapters { + public static Order adaptOrder(KucoinWebSocketOrderEvent orderEvent) { + KucoinOrderEventData data = orderEvent.data; + + Order.OrderType orderType = "buy".equals(data.side) ? Order.OrderType.BID : Order.OrderType.ASK; + CurrencyPair currencyPair = data.getCurrencyPair(); + + Order.Builder orderBuilder = + "market".equals(data.orderType) ? new MarketOrder.Builder(orderType, currencyPair) : + new LimitOrder.Builder(orderType, currencyPair).limitPrice(new BigDecimal(data.price)); + + orderBuilder + .id(data.orderId) + .originalAmount(new BigDecimal(data.size)) + .timestamp(new Date(TimeUnit.NANOSECONDS.toMillis(data.orderTime))) + .cumulativeAmount(new BigDecimal(data.filledSize)) + .orderStatus(adaptStatus(data.status)) + ; + + return orderBuilder.build(); + } + + private static Order.OrderStatus adaptStatus(String status) { + if ("open".equals(status)) + return Order.OrderStatus.NEW; + if ("match".equals(status)) + return Order.OrderStatus.PARTIALLY_FILLED; + if ("done".equals(status)) + return Order.OrderStatus.FILLED; + return null; + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingExchange.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingExchange.java new file mode 100644 index 00000000000..24c6afebaea --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingExchange.java @@ -0,0 +1,123 @@ +package info.bitrich.xchangestream.kucoin; + +import info.bitrich.xchangestream.core.ProductSubscription; +import info.bitrich.xchangestream.core.StreamingExchange; +import info.bitrich.xchangestream.core.StreamingTradeService; +import info.bitrich.xchangestream.service.netty.NettyStreamingService; +import info.bitrich.xchangestream.util.Events; +import io.reactivex.Completable; +import io.reactivex.Observable; +import org.knowm.xchange.kucoin.KucoinExchange; +import org.knowm.xchange.kucoin.dto.response.WebsocketResponse; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class KucoinStreamingExchange extends KucoinExchange implements StreamingExchange { + + private KucoinStreamingService publicStreamingService; + private KucoinStreamingService privateStreamingService; + private KucoinStreamingMarketDataService streamingMarketDataService; + private KucoinStreamingTradeService streamingTradeService; + + private final List> services = new ArrayList<>(); + private Runnable onApiCall; + + @Override + protected void initServices() { + super.initServices(); + this.onApiCall = Events.onApiCall(exchangeSpecification); + } + + @Override + public Completable connect(ProductSubscription... args) { + ProductSubscription subscriptions = args[0]; + + Completable complete = Completable.complete(); + services.clear(); + + if (subscriptions.hasUnauthenticated()) { + complete = complete.doOnComplete(() -> { + WebsocketResponse connectionDetails = getPublicWebsocketConnectionDetails(); + WebsocketResponse.InstanceServer instanceServer = connectionDetails.getInstanceServers().get(0); + String url = instanceServer.getEndpoint() + "?token=" + connectionDetails.getToken(); + + publicStreamingService = new KucoinStreamingService(url, instanceServer.getPingInterval(), false); + applyStreamingSpecification(getExchangeSpecification(), publicStreamingService); + publicStreamingService.connect().doOnError(ex -> logger.warn("encountered error while subscribing to public websocket", ex)).blockingAwait(); + + services.add(publicStreamingService); + streamingMarketDataService = new KucoinStreamingMarketDataService(publicStreamingService, getMarketDataService(), onApiCall); + }); + } + + if (subscriptions.hasAuthenticated()) { + if (exchangeSpecification.getApiKey() == null) { + throw new IllegalArgumentException("API key required for authenticated streams"); + } + + complete = complete.doOnComplete(() -> { + WebsocketResponse connectionDetails = getPrivateWebsocketConnectionDetails(); + WebsocketResponse.InstanceServer instanceServer = connectionDetails.getInstanceServers().get(0); + String url = instanceServer.getEndpoint() + "?token=" + connectionDetails.getToken(); + + privateStreamingService = new KucoinStreamingService(url, instanceServer.getPingInterval(), true); + applyStreamingSpecification(getExchangeSpecification(), privateStreamingService); + privateStreamingService.connect().doOnError(ex -> logger.warn("encountered error while subscribing to private websocket", ex)).blockingAwait(); + + services.add(privateStreamingService); + streamingTradeService = new KucoinStreamingTradeService(privateStreamingService); + }); + } + + return complete; + } + + @Override + public Completable disconnect() { + + if (publicStreamingService != null) { + publicStreamingService = null; + streamingMarketDataService = null; + } + + if (privateStreamingService != null) { + privateStreamingService = null; + } + + List completables = services.stream().map(NettyStreamingService::disconnect).collect(Collectors.toList()); + services.clear(); + return Completable.concat(completables); + } + + @Override + public boolean isAlive() { + return services.stream().anyMatch(NettyStreamingService::isSocketOpen); + } + + @Override + public Observable reconnectFailure() { + return Observable.concat(services.stream().map(NettyStreamingService::subscribeReconnectFailure).collect(Collectors.toList())); + } + + @Override + public Observable connectionSuccess() { + return Observable.concat(services.stream().map(NettyStreamingService::subscribeConnectionSuccess).collect(Collectors.toList())); + } + + @Override + public KucoinStreamingMarketDataService getStreamingMarketDataService() { + return streamingMarketDataService; + } + + @Override + public StreamingTradeService getStreamingTradeService() { + return streamingTradeService; + } + + @Override + public void useCompressedMessages(boolean compressedMessages) { + services.forEach(s -> s.useCompressedMessages(compressedMessages)); + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingMarketDataService.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingMarketDataService.java new file mode 100644 index 00000000000..9a18d1f9560 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingMarketDataService.java @@ -0,0 +1,181 @@ +package info.bitrich.xchangestream.kucoin; + +import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.kucoin.dto.KucoinOrderBookEventData; +import info.bitrich.xchangestream.kucoin.dto.KucoinOrderBookEvent; +import info.bitrich.xchangestream.kucoin.dto.KucoinTickerEvent; +import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; +import io.reactivex.Observable; +import io.reactivex.functions.Consumer; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dto.marketdata.OrderBook; +import org.knowm.xchange.dto.marketdata.Ticker; +import org.knowm.xchange.dto.marketdata.Trade; +import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException; +import org.knowm.xchange.kucoin.KucoinAdapters; +import org.knowm.xchange.kucoin.KucoinMarketDataService; +import org.knowm.xchange.kucoin.dto.response.OrderBookResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +class KucoinStreamingMarketDataService implements StreamingMarketDataService { + + private static final Logger logger = + LoggerFactory.getLogger(KucoinStreamingMarketDataService.class); + + private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + private final Map> orderbookSubscriptions = new ConcurrentHashMap<>(); + private final Map> orderBookRawUpdatesSubscriptions = new ConcurrentHashMap<>(); + + private final KucoinStreamingService service; + private final KucoinMarketDataService marketDataService; + private final Runnable onApiCall; + + public KucoinStreamingMarketDataService( + KucoinStreamingService service, + KucoinMarketDataService marketDataService, + Runnable onApiCall + ) { + this.service = service; + this.marketDataService = marketDataService; + this.onApiCall = onApiCall; + } + + @Override + public Observable getTicker(CurrencyPair currencyPair, Object... args) { + String channelName = "/market/ticker:" + KucoinAdapters.adaptCurrencyPair(currencyPair); + return service + .subscribeChannel(channelName) + .doOnError(ex -> logger.warn("encountered error while subscribing to channel " + channelName, ex)) + .map(node -> mapper.treeToValue(node, KucoinTickerEvent.class)) + .map(KucoinTickerEvent::getTicker); + } + + @Override + public Observable getOrderBook(CurrencyPair currencyPair, Object... args) { + return orderbookSubscriptions.computeIfAbsent(currencyPair, this::initOrderBookIfAbsent); + } + + private Observable initOrderBookIfAbsent(CurrencyPair currencyPair) { + orderBookRawUpdatesSubscriptions.computeIfAbsent(currencyPair, + s -> triggerObservableBody(rawOrderBookUpdates(s))); + return createOrderBookObservable(currencyPair); + } + + private Observable rawOrderBookUpdates(CurrencyPair currencyPair) { + String channelName = "/market/level2:" + KucoinAdapters.adaptCurrencyPair(currencyPair); + + return service + .subscribeChannel(channelName) + .doOnError(ex -> logger.warn("encountered error while subscribing to channel " + channelName, ex)) + .map(it -> mapper.treeToValue(it, KucoinOrderBookEvent.class)) + .map(e -> e.data); + } + + + private Observable createOrderBookObservable(CurrencyPair currencyPair) { + // 1. Open a stream + // 2. Buffer the events you receive from the stream. + OrderbookSubscription subscription = + new OrderbookSubscription(orderBookRawUpdatesSubscriptions.get(currencyPair)); + + return subscription + .stream + // 3. Get a depth snapshot + // (we do this if we don't already have one or we've invalidated a previous one) + .doOnNext(transaction -> subscription.initSnapshotIfInvalid(currencyPair)) + + .doOnError(ex -> logger.warn("encountered error while processing order book event", ex)) + + // If we failed, don't return anything. Just keep trying until it works + .filter(transaction -> subscription.snapshotLastUpdateId.get() > 0L) + + // 4. Drop any event where u is <= lastUpdateId in the snapshot + .filter(depth -> depth.sequenceEnd > subscription.snapshotLastUpdateId.get()) + + // 5. The first processed should have U <= lastUpdateId+1 AND u >= lastUpdateId+1, and + // subsequent events would + // normally have u == lastUpdateId + 1 which is stricter version of the above - let's be + // more relaxed + // each update has absolute numbers so even if there's an overlap it does no harm + .filter( + depth -> { + long lastUpdateId = subscription.lastUpdateId.get(); + boolean result = lastUpdateId == 0L || + (depth.sequenceStart <= lastUpdateId + 1 && depth.sequenceEnd >= lastUpdateId + 1); + if (result) { + subscription.lastUpdateId.set(depth.sequenceEnd); + } else { + // If not, we re-sync + logger.info( + "Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", + currencyPair, + lastUpdateId, + depth.sequenceStart, + depth.sequenceEnd); + subscription.invalidateSnapshot(); + } + return result; + }) + + // 7. The data in each event is the absolute quantity for a price level + // 8. If the quantity is 0, remove the price level + // 9. Receiving an event that removes a price level that is not in your local order book can + // happen and is normal. + .map( + depth -> { + depth.update(currencyPair, subscription.orderBook); + return subscription.orderBook; + }) + .share(); + } + + + private Observable triggerObservableBody(Observable observable) { + Consumer NOOP = whatever -> {}; + observable.subscribe(NOOP); + return observable; + } + + @Override + public Observable getTrades(CurrencyPair currencyPair, Object... args) { + throw new NotYetImplementedForExchangeException(); + } + + private final class OrderbookSubscription { + final Observable stream; + final AtomicLong lastUpdateId = new AtomicLong(); + final AtomicLong snapshotLastUpdateId = new AtomicLong(); + OrderBook orderBook; + + private OrderbookSubscription(Observable stream) { + this.stream = stream; + } + + void invalidateSnapshot() { + snapshotLastUpdateId.set(0); + } + + void initSnapshotIfInvalid(CurrencyPair currencyPair) { + if (snapshotLastUpdateId.get() != 0) return; + try { + logger.info("Fetching initial orderbook snapshot for {} ", currencyPair); + onApiCall.run(); + OrderBookResponse book = marketDataService.getKucoinOrderBookFull(currencyPair); + lastUpdateId.set(Long.parseLong(book.getSequence())); + snapshotLastUpdateId.set(lastUpdateId.get()); + orderBook = KucoinAdapters.adaptOrderBook(currencyPair, book); + } catch (Exception e) { + logger.error("Failed to fetch initial order book for " + currencyPair, e); + snapshotLastUpdateId.set(0); + lastUpdateId.set(0); + orderBook = null; + } + } + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingService.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingService.java new file mode 100644 index 00000000000..dfcf8007bea --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingService.java @@ -0,0 +1,99 @@ +package info.bitrich.xchangestream.kucoin; + +import com.fasterxml.jackson.databind.JsonNode; +import info.bitrich.xchangestream.kucoin.dto.KucoinWebSocketUnsubscribeMessage; +import info.bitrich.xchangestream.kucoin.dto.KucoinWebSocketSubscribeMessage; +import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; +import info.bitrich.xchangestream.service.netty.WebSocketClientHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.reactivex.Completable; +import io.reactivex.CompletableSource; +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +class KucoinStreamingService extends JsonNettyStreamingService { + + private final AtomicLong refCount = new AtomicLong(); + private final Observable pingPongSrc; + private final boolean privateChannel; + private Disposable pingPongSubscription; + + public KucoinStreamingService(String apiUrl, int pingInterval, boolean privateChannel) { + super(apiUrl); + + this.privateChannel = privateChannel; + pingPongSrc = Observable.interval(pingInterval, pingInterval, TimeUnit.MILLISECONDS); + } + + @Override + public Completable connect() { + Completable conn = super.connect(); + + return conn.andThen( + (CompletableSource) + (completable) -> { + try { + if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) { + pingPongSubscription.dispose(); + } + pingPongSubscription = pingPongSrc.subscribe(o -> this.sendMessage("{\"type\":\"ping\", \"id\": \"" + refCount.incrementAndGet() + "\"}")); + completable.onComplete(); + } catch (Exception e) { + completable.onError(e); + } + }); + } + + @Override + protected String getChannelNameFromMessage(JsonNode message) { + JsonNode topic = message.get("topic"); + return topic != null ? topic.asText() : null; + } + + @Override + public String getSubscribeMessage(String channelName, Object... args) throws IOException { + KucoinWebSocketSubscribeMessage message = new KucoinWebSocketSubscribeMessage(channelName, refCount.incrementAndGet(), privateChannel); + return objectMapper.writeValueAsString(message); + } + + @Override + public String getUnsubscribeMessage(String channelName, Object... args) throws IOException { + KucoinWebSocketUnsubscribeMessage message = new KucoinWebSocketUnsubscribeMessage(channelName, refCount.incrementAndGet()); + return objectMapper.writeValueAsString(message); + } + + @Override + protected void handleMessage(JsonNode message) { + JsonNode typeNode = message.get("type"); + if (typeNode != null) { + String type = typeNode.asText(); + if ("message".equals(type)) + super.handleMessage(message); + else if ("error".equals(type)) + super.handleError(message, new Exception(message.get("data").asText())); + } + } + + @Override + protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker handshaker, WebSocketClientHandler.WebSocketMessageHandler handler) { + return new KucoinNettyWebSocketClientHandler(handshaker, handler); + } + + private class KucoinNettyWebSocketClientHandler extends NettyWebSocketClientHandler { + public KucoinNettyWebSocketClientHandler(WebSocketClientHandshaker handshaker, WebSocketMessageHandler handler) { + super(handshaker, handler); + } + + public void channelInactive(ChannelHandlerContext ctx) { + if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) { + pingPongSubscription.dispose(); + } + super.channelInactive(ctx); + } + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingTradeService.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingTradeService.java new file mode 100644 index 00000000000..34a1e656ab7 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/KucoinStreamingTradeService.java @@ -0,0 +1,38 @@ +package info.bitrich.xchangestream.kucoin; + +import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.core.StreamingTradeService; +import info.bitrich.xchangestream.kucoin.dto.KucoinWebSocketOrderEvent; +import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; +import io.reactivex.Observable; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dto.Order; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KucoinStreamingTradeService implements StreamingTradeService { + + private static final Logger logger = LoggerFactory.getLogger(KucoinStreamingTradeService.class); + + private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); + + private final KucoinStreamingService service; + + public KucoinStreamingTradeService(KucoinStreamingService service) { + this.service = service; + } + + @Override + public Observable getOrderChanges(CurrencyPair currencyPair, Object... args) { + return getRawOrderChanges(currencyPair) + .map(KucoinStreamingAdapters::adaptOrder); + } + + public Observable getRawOrderChanges(CurrencyPair currencyPair) { + return service + .subscribeChannel("/spotMarket/tradeOrders") + .doOnError(ex -> logger.warn("encountered error while subscribing to order changes", ex)) + .map(node -> mapper.treeToValue(node, KucoinWebSocketOrderEvent.class)) + .filter(order -> currencyPair == null || currencyPair.equals(order.data.getCurrencyPair())); + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookChanges.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookChanges.java new file mode 100644 index 00000000000..a465ee887a3 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookChanges.java @@ -0,0 +1,16 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.ToString; + +import java.util.List; + +@ToString +public class KucoinOrderBookChanges { + + @JsonProperty("asks") + public List> asks; + + @JsonProperty("bids") + public List> bids; +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookEvent.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookEvent.java new file mode 100644 index 00000000000..bc1feb4b788 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookEvent.java @@ -0,0 +1,10 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.ToString; + +@ToString +public class KucoinOrderBookEvent extends KucoinWebSocketEvent { + @JsonProperty("data") + public KucoinOrderBookEventData data; +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookEventData.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookEventData.java new file mode 100644 index 00000000000..d7de8a085da --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderBookEventData.java @@ -0,0 +1,43 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.ToString; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dto.Order; +import org.knowm.xchange.dto.marketdata.OrderBook; +import org.knowm.xchange.dto.trade.LimitOrder; + +import java.math.BigDecimal; +import java.util.List; + +@ToString +public class KucoinOrderBookEventData { + + @JsonProperty("sequenceStart") + public long sequenceStart; + + @JsonProperty("sequenceEnd") + public long sequenceEnd; + + @JsonProperty("symbol") + public String symbol; + + @JsonProperty("changes") + public KucoinOrderBookChanges changes; + + public void update(CurrencyPair currencyPair, OrderBook orderBook) { + update(currencyPair, orderBook, Order.OrderType.BID, changes.bids); + update(currencyPair, orderBook, Order.OrderType.ASK, changes.asks); + } + + private void update(CurrencyPair currencyPair, OrderBook orderBook, Order.OrderType orderType, List> changes) { + for (List change : changes) { + String price = change.get(0); + if (!"0".equals(price)) { + String size = change.get(1); + LimitOrder limitOrder = new LimitOrder(orderType, new BigDecimal(size), currencyPair, null, null, new BigDecimal(price)); + orderBook.update(limitOrder); + } + } + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderEventData.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderEventData.java new file mode 100644 index 00000000000..55e5b0b80f8 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinOrderEventData.java @@ -0,0 +1,59 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.ToString; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.kucoin.KucoinAdapters; + +@ToString +public class KucoinOrderEventData { + + @JsonProperty("symbol") + public String symbol; + + @JsonProperty("orderType") + public String orderType; + + @JsonProperty("side") + public String side; + + @JsonProperty("orderId") + public String orderId; + + @JsonProperty("type") + public String type; + + @JsonProperty("orderTime") + public long orderTime; + + @JsonProperty("size") + public String size; + + @JsonProperty("filledSize") + public String filledSize; + + @JsonProperty("price") + public String price; + + @JsonProperty("clientOid") + public String clientOid; + + @JsonProperty("remainSize") + public String remainSize; + + @JsonProperty("matchPrice") + public String matchPrice; + + @JsonProperty("matchSize") + public String matchSize; + + @JsonProperty("status") + public String status; + + @JsonProperty("ts") + public long timestamp; + + public CurrencyPair getCurrencyPair() { + return KucoinAdapters.adaptCurrencyPair(symbol); + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinTickerConverter.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinTickerConverter.java new file mode 100644 index 00000000000..0c66c2c63e2 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinTickerConverter.java @@ -0,0 +1,35 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.fasterxml.jackson.databind.util.Converter; +import org.knowm.xchange.dto.marketdata.Ticker; + +import java.math.BigDecimal; +import java.util.Date; + +public class KucoinTickerConverter implements Converter { + @Override + public Ticker convert(JsonNode jsonNode) { + return new Ticker.Builder() + .timestamp(new Date(jsonNode.get("time").longValue())) + .ask(new BigDecimal(jsonNode.get("bestAsk").asText())) + .askSize(new BigDecimal(jsonNode.get("bestAskSize").asText())) + .bid(new BigDecimal(jsonNode.get("bestBid").asText())) + .bidSize(new BigDecimal(jsonNode.get("bestBidSize").asText())) + .last(new BigDecimal(jsonNode.get("price").asText())) + .volume(new BigDecimal(jsonNode.get("size").asText())) + .build(); + } + + @Override + public JavaType getInputType(TypeFactory typeFactory) { + return typeFactory.constructType(JsonNode.class); + } + + @Override + public JavaType getOutputType(TypeFactory typeFactory) { + return typeFactory.constructType(Ticker.class); + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinTickerEvent.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinTickerEvent.java new file mode 100644 index 00000000000..23861f680fb --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinTickerEvent.java @@ -0,0 +1,16 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.ToString; +import org.knowm.xchange.dto.marketdata.Ticker; + +@ToString +public class KucoinTickerEvent { + @JsonProperty("data") @JsonDeserialize(converter = KucoinTickerConverter.class) + public Ticker ticker; + + public Ticker getTicker() { + return ticker; + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketEvent.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketEvent.java new file mode 100644 index 00000000000..1fe1acabc2b --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketEvent.java @@ -0,0 +1,14 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KucoinWebSocketEvent { + @JsonProperty("type") + public String type; + + @JsonProperty("topic") + public String topic; + + @JsonProperty("subject") + public String subject; +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketOrderEvent.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketOrderEvent.java new file mode 100644 index 00000000000..eb76323a2d3 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketOrderEvent.java @@ -0,0 +1,10 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.ToString; + +@ToString +public class KucoinWebSocketOrderEvent extends KucoinWebSocketEvent { + @JsonProperty("data") + public KucoinOrderEventData data; +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketSubscribeMessage.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketSubscribeMessage.java new file mode 100644 index 00000000000..644c24310ed --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketSubscribeMessage.java @@ -0,0 +1,24 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KucoinWebSocketSubscribeMessage { + + @JsonProperty("topic") + public final String topic; + + @JsonProperty("type") + public final String type = "subscribe"; + + @JsonProperty("id") + public final Long id; + + @JsonProperty("privateChannel") + public final boolean privateChannel; + + public KucoinWebSocketSubscribeMessage(String topic, Long ref, boolean privateChannel) { + this.topic = topic; + this.id = ref; + this.privateChannel = privateChannel; + } +} diff --git a/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketUnsubscribeMessage.java b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketUnsubscribeMessage.java new file mode 100644 index 00000000000..ae12dde2990 --- /dev/null +++ b/xchange-stream-kucoin/src/main/java/info/bitrich/xchangestream/kucoin/dto/KucoinWebSocketUnsubscribeMessage.java @@ -0,0 +1,20 @@ +package info.bitrich.xchangestream.kucoin.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KucoinWebSocketUnsubscribeMessage { + + @JsonProperty("topic") + public final String topic; + + @JsonProperty("type") + public final String type = "unsubscribe"; + + @JsonProperty("id") + public final Long id; + + public KucoinWebSocketUnsubscribeMessage(String topic, Long id) { + this.topic = topic; + this.id = id; + } +} diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java index 2592c306dd9..dc15d322066 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingExchange.java @@ -8,7 +8,6 @@ import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException; import org.knowm.xchange.okex.OkexExchange; - public class OkexStreamingExchange extends OkexExchange implements StreamingExchange { // Production URIs public static final String WS_PUBLIC_CHANNEL_URI = "wss://ws.okx.com:8443/ws/v5/public"; @@ -70,11 +69,8 @@ private String getPrivateApiUrl(Boolean useSandBox, Boolean isAws){ @Override public Completable disconnect() { - if(privateStreamingService != null){ - return streamingService.disconnect().concatWith(privateStreamingService.disconnect()); - } else { - return streamingService.disconnect(); - } + streamingService.pingPongDisconnectIfConnected(); + return streamingService.disconnect(); } @Override diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingMarketDataService.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingMarketDataService.java index 23352969b83..0ad5a08b423 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingMarketDataService.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingMarketDataService.java @@ -1,10 +1,15 @@ package info.bitrich.xchangestream.okex; +import static info.bitrich.xchangestream.okex.OkexStreamingService.*; + import com.fasterxml.jackson.databind.ObjectMapper; import info.bitrich.xchangestream.core.StreamingMarketDataService; import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper; import io.reactivex.Observable; import io.reactivex.subjects.PublishSubject; +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import org.knowm.xchange.dto.Order; import org.knowm.xchange.dto.marketdata.*; import org.knowm.xchange.instrument.Instrument; @@ -13,12 +18,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Timestamp; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -import static info.bitrich.xchangestream.okex.OkexStreamingService.*; - public class OkexStreamingMarketDataService implements StreamingMarketDataService { private static final Logger LOG = LoggerFactory.getLogger(OkexStreamingMarketDataService.class); @@ -26,7 +25,7 @@ public class OkexStreamingMarketDataService implements StreamingMarketDataServic private final OkexStreamingService service; private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); - private final Map> orderBookUpdatesSubscriptions; + private final Map>> orderBookUpdatesSubscriptions; public OkexStreamingMarketDataService(OkexStreamingService service) { this.service = service; @@ -162,13 +161,13 @@ public Observable getOrderBook(Instrument instrument, Object... args) }); } - public Observable getOrderBookUpdates(Instrument instrument) { + public Observable> getOrderBookUpdates(Instrument instrument) { return orderBookUpdatesSubscriptions.computeIfAbsent(instrument, v -> PublishSubject.create()); } private void orderBookUpdatesSubscriptions( Instrument instrument, List asks, List bids, Date date) { - List orderBookUpdate = new ArrayList<>(); + List orderBookUpdates = new ArrayList<>(); for (OkexPublicOrder ask : asks) { OrderBookUpdate o = new OrderBookUpdate( @@ -178,7 +177,7 @@ private void orderBookUpdatesSubscriptions( ask.getPrice(), date, ask.getVolume()); - orderBookUpdate.add(o); + orderBookUpdates.add(o); } for (OkexPublicOrder bid : bids) { OrderBookUpdate o = @@ -189,9 +188,9 @@ private void orderBookUpdatesSubscriptions( bid.getPrice(), date, bid.getVolume()); - orderBookUpdate.add(o); + orderBookUpdates.add(o); } - for (OrderBookUpdate o : orderBookUpdate) - orderBookUpdatesSubscriptions.get(instrument).onNext(o); - } + orderBookUpdatesSubscriptions.get(instrument).onNext(orderBookUpdates); + } + } diff --git a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java index ae28f447227..94ef2a22a8f 100644 --- a/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java +++ b/xchange-stream-okex/src/main/java/info/bitrich/xchangestream/okex/OkexStreamingService.java @@ -9,17 +9,6 @@ import io.reactivex.CompletableSource; import io.reactivex.Observable; import io.reactivex.disposables.Disposable; -import org.knowm.xchange.ExchangeSpecification; -import org.knowm.xchange.exceptions.ExchangeException; -import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException; -import org.knowm.xchange.okex.dto.OkexInstType; -import org.knowm.xchange.service.BaseParamsDigest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.crypto.Mac; -import javax.crypto.SecretKey; -import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; @@ -27,6 +16,16 @@ import java.util.Base64; import java.util.Collections; import java.util.concurrent.TimeUnit; +import javax.crypto.Mac; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; +import org.knowm.xchange.ExchangeSpecification; +import org.knowm.xchange.exceptions.ExchangeException; +import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException; +import org.knowm.xchange.okex.dto.OkexInstType; +import org.knowm.xchange.service.BaseParamsDigest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class OkexStreamingService extends JsonNettyStreamingService { @@ -170,4 +169,10 @@ private OkexSubscribeMessage.SubscriptionTopic getTopic(String channelName){ throw new NotYetImplementedForExchangeException("ChannelName: "+channelName+" has not implemented yet on "+this.getClass().getSimpleName()); } } + + public void pingPongDisconnectIfConnected() { + if (pingPongSubscription != null && !pingPongSubscription.isDisposed()) { + pingPongSubscription.dispose(); + } + } }