Skip to content

Commit

Permalink
Merge pull request #14 from globophobe/feature/release-v0.1.6
Browse files Browse the repository at this point in the history
Feature/release v0.1.6
  • Loading branch information
globophobe authored Oct 15, 2023
2 parents 5537b6d + ec87948 commit f3724f9
Show file tree
Hide file tree
Showing 25 changed files with 1,666 additions and 1,703 deletions.
2 changes: 0 additions & 2 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,5 @@ GOOGLE_APPLICATION_CREDENTIALS=google-application-credentials.json
FIREBASE_ADMIN_CREDENTIALS=firebase-admin-credentials.json
PROJECT_ID=gcp-project-id
SERVICE_ACCOUNT=service-account
BIGQUERY_LOCATION=gcp-region
BIGQUERY_DATASET=cryptofeed-werks
SENTRY_DSN=optional
BINANCE_API_KEY=optional
4 changes: 0 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ FROM python:3.9-slim-buster

ARG POETRY_EXPORT
ARG PROJECT_ID
ARG BIGQUERY_LOCATION
ARG BIGQUERY_DATASET
ARG SENTRY_DSN

ENV PROJECT_ID $PROJECT_ID
ENV BIGQUERY_LOCATION $BIGQUERY_LOCATION
ENV BIGQUERY_DATASET $BIGQUERY_DATASET
ENV SENTRY_DSN $SENTRY_DSN

ADD cryptofeed_werks /cryptofeed_werks/
Expand Down
27 changes: 0 additions & 27 deletions cryptofeed_werks/constants.py

This file was deleted.

6 changes: 4 additions & 2 deletions cryptofeed_werks/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@


class Feed(BaseFeed):
"""Feed."""

def std_symbol_to_exchange_symbol(self, symbol: str) -> str:
"""Standard symbol to exchange symbol.
There are certainly valid reasons to standardize symbols,
but there are also reasons to not care
but there are also reasons to not care.
"""
return symbol

def exchange_symbol_to_std_symbol(self, symbol: str) -> str:
"""Exchange symbol to standard symbol.
Ditto
Ditto.
"""
return symbol

Expand Down
2 changes: 2 additions & 0 deletions cryptofeed_werks/trades/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .candles import CandleCallback
from .trade_cluster import TradeClusterCallback
from .significant_trades import SignificantTradeCallback
from .trades import (
NonSequentialIntegerTradeCallback,
Expand All @@ -16,6 +17,7 @@
"GCPPubSubTradeCallback",
"CandleCallback",
"TradeCallback",
"TradeClusterCallback",
"SignificantTradeCallback",
"SequentialIntegerTradeCallback",
"NonSequentialIntegerTradeCallback",
Expand Down
43 changes: 10 additions & 33 deletions cryptofeed_werks/trades/candles.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
from decimal import Decimal
from typing import List, Optional, Tuple
from typing import Callable, List, Optional, Tuple

from cryptofeed.backends.aggregate import AggregateCallback

from .constants import NOTIONAL
from .window import WindowMixin


class CandleCallback(WindowMixin, AggregateCallback):
def __init__(
self, *args, window_seconds: int = 60, top_n: Optional[int] = None, **kwargs
) -> None:
super().__init__(*args, **kwargs)
class CandleCallback(WindowMixin):
"""Candle callback."""

def __init__(self, handler: Callable, window_seconds: int = 60) -> None:
"""Init."""
self.handler = handler
self.window_seconds = window_seconds
self.window = {}
self.top_n = top_n
self.trades = {}

async def __call__(self, trade: dict, timestamp: float) -> Tuple[dict, float]:
"""Call."""
candle = self.main(trade)
if candle is not None:
await self.handler(candle, timestamp)

def main(self, trade: dict) -> Optional[dict]:
"""Main."""
symbol = trade["symbol"]
timestamp = trade["timestamp"]
self.trades.setdefault(symbol, [])
Expand All @@ -47,7 +46,7 @@ def aggregate(self, trades: List[dict], is_late: bool = False) -> Optional[dict]
"""Aggregate."""
first_trade = trades[0]
prices = self.get_prices(trades)
candle = {
return {
"exchange": first_trade["exchange"],
"symbol": first_trade["symbol"],
"timestamp": self.get_start(first_trade["timestamp"]),
Expand All @@ -62,9 +61,6 @@ def aggregate(self, trades: List[dict], is_late: bool = False) -> Optional[dict]
"totalBuyTicks": sum([t["totalBuyTicks"] for t in trades]),
"totalTicks": sum([t["totalTicks"] for t in trades]),
}
if self.top_n:
candle["topN"] = self.get_top_n(trades)
return candle

def get_prices(self, trades: List[dict]) -> List[Decimal]:
"""Get prices."""
Expand All @@ -75,22 +71,3 @@ def get_prices(self, trades: List[dict]) -> List[Decimal]:
if value:
prices.append(value)
return prices

def get_top_n(self, trades: List[dict]) -> List[dict]:
"""Get top N."""
filtered = [t for t in trades if "uid" in t]
filtered.sort(key=lambda t: t[NOTIONAL], reverse=True)
top_n = filtered[: self.top_n]
for trade in top_n:
for key in list(trade):
if key not in (
"timestamp",
"price",
"volume",
"notional",
"tickRule",
"ticks",
):
del trade[key]
top_n.sort(key=lambda t: t["timestamp"])
return top_n
2 changes: 0 additions & 2 deletions cryptofeed_werks/trades/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
VOLUME = "volume"
NOTIONAL = "notional"
TICKS = "ticks"

THRESH_ATTRS = (VOLUME, NOTIONAL, TICKS)
4 changes: 4 additions & 0 deletions cryptofeed_werks/trades/gcppubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@


class GCPPubSubTradeCallback(GCPPubSubCallback):
"""GCP Pub/Sub trade callback."""

default_key = TRADES

def __init__(self, *args, **kwargs) -> None:
"""Initialize."""
super().__init__(*args, **kwargs)
self.topics = {}

Expand All @@ -23,6 +26,7 @@ def get_topic(self) -> str:
pass

async def __call__(self, trade: dict, timestamp: float) -> None:
"""Call."""
topic = self.get_topic_path(trade)
message = self.get_message(trade)
await self.write(topic, message)
Expand Down
16 changes: 9 additions & 7 deletions cryptofeed_werks/trades/significant_trades.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,37 @@
from decimal import Decimal
from typing import List, Optional, Tuple

from cryptofeed.backends.aggregate import AggregateCallback

from .constants import NOTIONAL, TICKS, VOLUME
from .window import WindowMixin


class SignificantTradeCallback(WindowMixin, AggregateCallback):
class SignificantTradeCallback(WindowMixin):
"""Significant trade callback."""

def __init__(
self,
*args,
handler,
significant_trade_filter: int = 1000,
window_seconds: Optional[int] = None,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
"""Initialize."""
self.handler = handler
self.significant_trade_filter = Decimal(significant_trade_filter)
self.trades = {}
self.window_seconds = window_seconds
self.window = {}

async def __call__(self, trade: dict, timestamp: float) -> Tuple[dict, float]:
"""Call."""
result = self.main(trade)
if isinstance(result, list):
for t in result:
await self.handler(t, timestamp)
elif isinstance(result, dict):
await self.handler(result, timestamp)

def main(self, trade: dict) -> dict:
def main(self, trade: dict) -> Optional[dict]:
"""Main."""
symbol = trade["symbol"]
timestamp = trade["timestamp"]
self.trades.setdefault(symbol, [])
Expand Down
96 changes: 96 additions & 0 deletions cryptofeed_werks/trades/trade_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from decimal import Decimal
from statistics import median
from typing import List, Optional, Tuple

from .constants import VOLUME
from .window import WindowMixin


class TradeClusterCallback(WindowMixin):
"""Trade cluter callback."""

def __init__(
self,
handler,
window_seconds: Optional[int] = None,
) -> None:
"""Init."""
self.handler = handler
self.tick_rule = None
self.trades = {}
self.window_seconds = window_seconds
self.window = {}

async def __call__(self, trade: dict, timestamp: float) -> Tuple[dict, float]:
"""Call."""
result = self.main(trade)
if isinstance(result, dict):
await self.handler(result, timestamp)

def main(self, trade: dict) -> Optional[dict]:
"""Main."""
symbol = trade["symbol"]
trades = self.trades.setdefault(symbol, [])
timestamp = trade["timestamp"]
window = self.get_window(symbol, timestamp)
if window is not None:
# Was message received late?
if window["start"] is not None and timestamp < window["start"]:
# FUBAR
return self.aggregate([trade], is_late=True)
# Is window exceeded?
elif window["stop"] is not None and timestamp >= window["stop"]:
ticks = []
# Get tick
tick = self.get_tick(symbol)
if tick is not None:
ticks.append(tick)
# Append trade
self.trades[symbol].append(trade)
# Set window
self.set_window(symbol, timestamp)
# Finally, return ticks
return self.get_tick(symbol, trade)
else:
return self.get_trade_cluster_or_tick(symbol, trade)

def get_trade_cluster_or_tick(self, symbol: str, trade: dict) -> Optional[dict]:
"""Get trade cluster or tick."""
trades = self.trades.setdefault(symbol, [])
is_sequential = True
is_same_direction = True
tick_rule = trade.get("tickRule")
if self.tick_rule:
is_same_direction = self.tick_rule == tick_rule
if is_same_direction:
self.trades[symbol].append(trade)
self.tick_rule = tick_rule
elif len(trades):
tick = self.aggregate(trades)
# Reset
self.trades[symbol] = [trade]
self.tick_rule = tick_rule
return tick

def aggregate(self, trades: List[dict], is_late: bool = False) -> None:
"""Aggregate."""
first_trade = trades[0]
last_trade = trades[-1]
data = {
"exchange": first_trade["exchange"],
"symbol": first_trade["symbol"],
"timestamp": first_trade["timestamp"],
"price": last_trade["price"],
"high": max(t.get("high", t["price"]) for t in trades),
"low": min(t.get("low", t["price"]) for t in trades),
"tickRule": self.tick_rule,
}
volume = ["volume", "totalBuyVolume", "totalVolume"]
notional = ["notional", "totalBuyNotional", "totalNotional"]
ticks = ["ticks", "totalBuyTicks", "totalTicks"]
for sample_type in volume + notional + ticks:
value = sum([t.get(sample_type, 0) for t in trades])
if sample_type in ticks:
value = int(value)
data[sample_type] = value
return data
15 changes: 10 additions & 5 deletions cryptofeed_werks/trades/trades.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from typing import Optional, Tuple

from cryptofeed.backends.aggregate import AggregateCallback


class TradeCallback(AggregateCallback):
class TradeCallback:
"""
Aggregate sequences of trades that have equal symbol, timestamp, nanoseconds, and
tick rule.
"""

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def __init__(self, handler) -> None:
"""Initialize."""
self.handler = handler
self.trades = {}

async def __call__(self, trade: dict, timestamp: float) -> Tuple[dict, float]:
"""Call."""
t = self.main(trade)
if t is not None:
await self.handler(t, timestamp)
Expand All @@ -24,6 +24,7 @@ def main(self, trade: dict) -> dict:
return self.aggregate(t)

def prepare_trade(self, trade: dict) -> dict:
"""Prepare trade."""
if "ticks" not in trade:
trade["ticks"] = 1 # b/c Binance
if "isSequential" not in trade:
Expand Down Expand Up @@ -71,10 +72,12 @@ class SequentialIntegerTradeCallback(TradeCallback):
"""Coinbase has sequential IDs"""

def __init__(self, *args, **kwargs) -> None:
"""Initialize."""
super().__init__(*args, **kwargs)
self.uids = {}

def main(self, trade: dict) -> dict:
"""Main."""
t = self.prepare_trade(trade)
symbol = t["symbol"]
uid = self.uids.get(symbol, None)
Expand All @@ -90,10 +93,12 @@ class NonSequentialIntegerTradeCallback(TradeCallback):
"""Bitfinex has non-sequential IDs"""

def __init__(self, *args, **kwargs) -> None:
"""Initialize."""
super().__init__(*args, **kwargs)
self.uids = {}

def main(self, trade: dict) -> dict:
"""Main."""
t = self.prepare_trade(trade)
symbol = t["symbol"]
uid = self.uids.get(symbol, None)
Expand Down
Loading

0 comments on commit f3724f9

Please sign in to comment.