From f9240b8df0ede1c228f6395c1bfdfca79af34c19 Mon Sep 17 00:00:00 2001 From: Alejandro Cordoba Bodhert Date: Sat, 1 Feb 2025 17:24:59 -0500 Subject: [PATCH] Chapter 20 (#22) * working throug chapter 20 * finished chapter 20 * comparing against franton branch * updating notes --- apps/naive/lib/naive.ex | 15 +- ...rvisor.ex => dynamic_trader_supervisor.ex} | 30 +- apps/naive/lib/naive/leader.ex | 245 ---------------- apps/naive/lib/naive/strategy.ex | 266 +++++++++++++----- apps/naive/lib/naive/supervisor.ex | 8 +- apps/naive/lib/naive/trader.ex | 89 +++--- apps/naive/test/naive/trader_test.exs | 2 +- apps/naive/test/support/mocks.ex | 1 - config/config.exs | 1 + config/test.exs | 3 +- notes/notes.md | 11 + 11 files changed, 296 insertions(+), 375 deletions(-) rename apps/naive/lib/naive/{dynamic_symbol_supervisor.ex => dynamic_trader_supervisor.ex} (72%) delete mode 100644 apps/naive/lib/naive/leader.ex diff --git a/apps/naive/lib/naive.ex b/apps/naive/lib/naive.ex index 96243c1..3b87201 100644 --- a/apps/naive/lib/naive.ex +++ b/apps/naive/lib/naive.ex @@ -3,23 +3,30 @@ defmodule Naive do Documentation for `Naive`. """ - alias Naive.DynamicSymbolSupervisor + alias Naive.DynamicTraderSupervisor + alias Naive.Trader def start_trading(symbol) do symbol |> String.upcase() - |> DynamicSymbolSupervisor.start_worker() + |> DynamicTraderSupervisor.start_worker() end def stop_trading(symbol) do symbol |> String.upcase() - |> DynamicSymbolSupervisor.stop_worker() + |> DynamicTraderSupervisor.stop_worker() end def shutdown_trading(symbol) do symbol |> String.upcase() - |> DynamicSymbolSupervisor.shutdown_worker() + |> DynamicTraderSupervisor.shutdown_worker() + end + + def get_positions(symbol) do + symbol + |> String.upcase() + |> Trader.get_positions() end end diff --git a/apps/naive/lib/naive/dynamic_symbol_supervisor.ex b/apps/naive/lib/naive/dynamic_trader_supervisor.ex similarity index 72% rename from apps/naive/lib/naive/dynamic_symbol_supervisor.ex rename to apps/naive/lib/naive/dynamic_trader_supervisor.ex index 6bec840..687e696 100644 --- a/apps/naive/lib/naive/dynamic_symbol_supervisor.ex +++ b/apps/naive/lib/naive/dynamic_trader_supervisor.ex @@ -1,18 +1,19 @@ -defmodule Naive.DynamicSymbolSupervisor do +defmodule Naive.DynamicTraderSupervisor do @moduledoc """ Dynamic symbol supervisor, in charge of supervise runtime created symbols """ use DynamicSupervisor + require Logger + alias Naive.Repo alias Naive.Schema.Settings - alias Naive.SymbolSupervisor + alias Naive.Strategy + alias Naive.Trader import Ecto.Query, only: [from: 2] - require Logger - - @registry :naive_symbol_supervisors + @registry :naive_traders def start_link(_arg) do DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) @@ -22,7 +23,7 @@ defmodule Naive.DynamicSymbolSupervisor do DynamicSupervisor.init(strategy: :one_for_one) end - def autostart_workers() do + def autostart_workers do Repo.all( from(s in Settings, where: s.status == "on", @@ -34,13 +35,13 @@ defmodule Naive.DynamicSymbolSupervisor do def start_worker(symbol) do Logger.info("Starting trading on #{symbol}") - update_status(symbol, "on") + Strategy.update_status(symbol, "on") start_child(symbol) end def stop_worker(symbol) do Logger.info("Stopping trading on #{symbol}") - update_status(symbol, "off") + Strategy.update_status(symbol, "off") stop_child(symbol) end @@ -48,25 +49,20 @@ defmodule Naive.DynamicSymbolSupervisor do Logger.info("Shutdown of trading on #{symbol} initialized") {:ok, settings} = - update_status( + Strategy.update_status( symbol, "shutdown" ) - Naive.Leader.notify(:settings_updated, settings) - {:ok, settings} - end + Trader.notify(:settings_updated, settings) - defp update_status(symbol, status) when is_binary(symbol) and is_binary(status) do - Repo.get_by(Settings, symbol: symbol) - |> Ecto.Changeset.change(%{status: status}) - |> Repo.update() + {:ok, settings} end defp start_child(args) do DynamicSupervisor.start_child( __MODULE__, - {SymbolSupervisor, args} + {Trader, args} ) end diff --git a/apps/naive/lib/naive/leader.ex b/apps/naive/lib/naive/leader.ex deleted file mode 100644 index 62b0a49..0000000 --- a/apps/naive/lib/naive/leader.ex +++ /dev/null @@ -1,245 +0,0 @@ -defmodule Naive.Leader do - @moduledoc """ - Saves the states of the trading symbol - """ - use GenServer - - alias Naive.Repo - alias Naive.Schema.Settings - alias Naive.Trader - require Logger - - @type event_type :: atom - @callback notify(event_type, %Trader.State{}) :: :ok - - @binance_client Application.compile_env(:naive, :binance_client) - - defmodule State do - @moduledoc """ - State to store symbols information - """ - defstruct symbol: nil, - settings: nil, - traders: [] - end - - defmodule TraderData do - @moduledoc """ - Individual trader data - """ - defstruct pid: nil, - ref: nil, - state: nil - end - - def start_link(symbol) do - GenServer.start_link(__MODULE__, symbol, name: :"#{__MODULE__}-#{symbol}") - end - - def init(symbol) do - {:ok, %State{symbol: symbol}, {:continue, :start_traders}} - end - - def notify(:trader_state_updated, trader_state) do - GenServer.call( - :"#{__MODULE__}-#{trader_state.symbol}", - {:update_trader_state, trader_state} - ) - end - - def notify(:rebuy_triggered, trader_state) do - GenServer.call( - :"#{__MODULE__}-#{trader_state.symbol}", - {:rebuy_triggered, trader_state} - ) - end - - def notify(:settings_updated, settings) do - GenServer.call(:"#{__MODULE__}-#{settings.symbol}", {:update_settings, settings}) - end - - def handle_continue(:start_traders, %{symbol: symbol} = state) do - settings = fetch_symbol_settings(symbol) - trader_state = fresh_trader_state(settings) - traders = [start_new_trader(trader_state)] - - {:noreply, %{state | settings: settings, traders: traders}} - end - - def handle_call( - {:update_trader_state, new_trader_state}, - {trader_pid, _}, - %{traders: traders} = state - ) do - case Enum.find_index(traders, &(&1.pid == trader_pid)) do - nil -> - Logger.warning("Tried to update the state of trader that leader is not aware of") - {:reply, :ok, state} - - index -> - old_trader_data = Enum.at(traders, index) - new_trader_data = %{old_trader_data | :state => new_trader_state} - {:reply, :ok, %{state | :traders => List.replace_at(traders, index, new_trader_data)}} - end - end - - def handle_call( - {:rebuy_triggered, new_trader_state}, - {trader_pid, _}, - %{traders: traders, symbol: symbol, settings: settings} = state - ) do - case Enum.find_index(traders, &(&1.pid == trader_pid)) do - nil -> - Logger.warning("Rebuy triggered by trader that leader is not aware of") - {:reply, :ok, state} - - index -> - old_trader_data = Enum.at(traders, index) - new_trader_data = %{old_trader_data | :state => new_trader_state} - updated_traders = List.replace_at(traders, index, new_trader_data) - - updated_traders = - if settings.chunks == length(traders) do - Logger.info("All traders already started for #{symbol}") - updated_traders - else - if settings.status == "shutdown" do - Logger.warning( - "The leader won't start a new trader on #{symbol} " <> - "as symbol is in the 'shutdown' state" - ) - - updated_traders - else - Logger.info("Starting new trader for #{symbol}") - [start_new_trader(fresh_trader_state(settings)) | updated_traders] - end - end - - {:reply, :ok, %{state | :traders => updated_traders}} - end - end - - def handle_call({:update_settings, new_settings}, _, state) do - {:reply, :ok, %{state | settings: new_settings}} - end - - def handle_info( - {:DOWN, _ref, :process, trader_pid, :normal}, - %{traders: traders, symbol: symbol, settings: settings} = state - ) do - Logger.info("#{symbol} trader finished trade - restarting") - - case Enum.find_index(traders, &(&1.pid == trader_pid)) do - nil -> - Logger.warning( - "Tried to restart finished #{symbol} " <> - "trader that leader is not aware of" - ) - - if settings.status == "shutdown" and traders == [] do - Naive.stop_trading(state.symbol) - end - - {:noreply, state} - - index -> - new_traders = - if settings.status == "shutdown" do - Logger.warning( - "The leader won't start a new trader on #{symbol} " <> - "as symbol is in shutdown state" - ) - - if length(traders) == 1 do - Naive.stop_trading(state.symbol) - end - - List.delete_at(traders, index) - else - new_trader_data = start_new_trader(fresh_trader_state(settings)) - List.replace_at(traders, index, new_trader_data) - end - - {:noreply, %{state | traders: new_traders}} - end - end - - def handle_info( - {:DOWN, _ref, :process, trader_pid, reason}, - %{traders: traders, symbol: symbol} = state - ) do - Logger.error("#{symbol} trader died - reason #{reason} - trying to restart") - - case Enum.find_index(traders, &(&1.pid == trader_pid)) do - nil -> - Logger.warning( - "Tried to restart #{symbol} trader " <> - "but failed to find its cached state" - ) - - {:noreply, state} - - index -> - trader_data = Enum.at(traders, index) - new_trader_data = start_new_trader(trader_data.state) - new_traders = List.replace_at(traders, index, new_trader_data) - {:noreply, %{state | traders: new_traders}} - end - end - - defp fresh_trader_state(settings) do - %{ - struct(Trader.State, settings) - | id: :os.system_time(:millisecond), - budget: Decimal.div(settings.budget, settings.chunks), - rebuy_notified: false - } - end - - defp fetch_symbol_settings(symbol) do - symbol_filters = fetch_symbol_filters(symbol) - settings = Repo.get_by!(Settings, symbol: symbol) - - Map.merge( - symbol_filters, - Map.from_struct(settings) - ) - end - - defp fetch_symbol_filters(symbol) do - symbol_filters = - @binance_client.get_exchange_info() - |> elem(1) - |> Map.get(:symbols) - |> Enum.find(&(&1["symbol"] == symbol)) - |> Map.get("filters") - - tick_size = - symbol_filters - |> Enum.find(&(&1["filterType"] == "PRICE_FILTER")) - |> Map.get("tickSize") - - step_size = - symbol_filters - |> Enum.find(&(&1["filterType"] == "LOT_SIZE")) - |> Map.get("stepSize") - - %{ - tick_size: tick_size, - step_size: step_size - } - end - - defp start_new_trader(%Trader.State{} = state) do - {:ok, pid} = - DynamicSupervisor.start_child( - :"Naive.DynamicTraderSupervisor-#{state.symbol}", - {Naive.Trader, state} - ) - - ref = Process.monitor(pid) - - %TraderData{pid: pid, ref: ref, state: state} - end -end diff --git a/apps/naive/lib/naive/strategy.ex b/apps/naive/lib/naive/strategy.ex index 82dde42..77e639d 100644 --- a/apps/naive/lib/naive/strategy.ex +++ b/apps/naive/lib/naive/strategy.ex @@ -1,31 +1,101 @@ defmodule Naive.Strategy do @moduledoc """ - Module in charge of making and executing descicions + Module in charge of making and executing decisions """ alias Core.Struct.TradeEvent - alias Naive.Trader.State + alias Naive.Schema.Settings require Logger @binance_client Application.compile_env(:naive, :binance_client) - @leader Application.compile_env(:naive, :leader) @logger Application.compile_env(:core, :logger) @pubsub_client Application.compile_env(:core, :pubsub_client) + @repo Application.compile_env(:naive, :repo) + + defmodule Position do + @enforce_keys [ + :id, + :symbol, + :budget, + :buy_down_interval, + :profit_interval, + :rebuy_interval, + :rebuy_notified, + :tick_size, + :step_size + ] + defstruct [ + :id, + :symbol, + :budget, + :buy_order, + :sell_order, + :buy_down_interval, + :profit_interval, + :rebuy_interval, + :rebuy_notified, + :tick_size, + :step_size + ] + end + + def execute(%TradeEvent{} = trade_event, positions, settings) do + generate_decisions(positions, [], trade_event, settings) + |> Enum.map(fn {decision, position} -> + Task.async(fn -> execute_decision(decision, position, settings) end) + end) + |> Task.await_many() + |> then(&parse_results/1) + end - def execute(%TradeEvent{} = trade_event, %State{} = state) do - generate_decision(trade_event, state) - |> execute_decision(state) + def parse_results([]), do: :exit + + def parse_results([_ | _] = results) do + results + |> Enum.map(fn {:ok, new_position} -> new_position end) + |> then(&{:ok, &1}) + end + + def generate_decisions([], generated_results, _trade_event, _settings) do + generated_results + end + + def generate_decisions([position | rest] = positions, generated_results, trade_event, settings) do + current_positions = positions ++ (generated_results |> Enum.map(&elem(&1, 0))) + + case generate_decision(trade_event, position, current_positions, settings) do + :exit -> + generate_decisions(rest, generated_results, trade_event, settings) + + :rebuy -> + generate_decisions( + rest, + [{:skip, %{position | rebuy_notified: true}}, {:rebuy, position}] ++ generated_results, + trade_event, + settings + ) + + decision -> + generate_decisions( + rest, + [{decision, position} | generated_results], + trade_event, + settings + ) + end end def generate_decision( %TradeEvent{price: price}, - %State{ + %Position{ budget: budget, buy_order: nil, buy_down_interval: buy_down_interval, tick_size: tick_size, step_size: step_size - } + }, + _positions, + _settings ) do price = calculate_buy_price(price, buy_down_interval, tick_size) quantity = calculate_quantity(budget, price, step_size) @@ -35,13 +105,15 @@ defmodule Naive.Strategy do def generate_decision( %TradeEvent{buyer_order_id: order_id}, - %State{ + %Position{ buy_order: %Binance.OrderResponse{ order_id: order_id, status: "FILLED" }, sell_order: %Binance.OrderResponse{} - } + }, + _positions, + _settings ) when is_number(order_id) do :skip @@ -49,12 +121,14 @@ defmodule Naive.Strategy do def generate_decision( %TradeEvent{buyer_order_id: order_id}, - %State{ + %Position{ buy_order: %Binance.OrderResponse{ order_id: order_id }, sell_order: nil - } + }, + _positions, + _settings ) when is_number(order_id) do :fetch_buy_order @@ -62,7 +136,7 @@ defmodule Naive.Strategy do def generate_decision( %TradeEvent{}, - %State{ + %Position{ buy_order: %Binance.OrderResponse{ status: "FILLED", price: buy_price @@ -70,7 +144,9 @@ defmodule Naive.Strategy do sell_order: nil, profit_interval: profit_interval, tick_size: tick_size - } + }, + _positions, + _settings ) do sell_price = calculate_sell_price(buy_price, profit_interval, tick_size) @@ -79,42 +155,53 @@ defmodule Naive.Strategy do def generate_decision( %TradeEvent{}, - %State{ + %Position{ sell_order: %Binance.OrderResponse{status: "FILLED"} - } + }, + _positions, + settings ) do - :exit + if settings.status != "shutdown" do + :finished + else + :exit + end end def generate_decision( %TradeEvent{ seller_order_id: order_id }, - %State{ + %Position{ sell_order: %Binance.OrderResponse{ order_id: order_id } - } + }, + _positions, + _settings ) do :fetch_sell_order end def generate_decision( %TradeEvent{price: current_price}, - %State{ + %Position{ buy_order: %Binance.OrderResponse{price: buy_price}, rebuy_interval: rebuy_interval, rebuy_notified: false - } + }, + positions, + settings ) do - if trigger_rebuy?(buy_price, current_price, rebuy_interval) do + if trigger_rebuy?(buy_price, current_price, rebuy_interval) and settings.status != "shutdown" and + length(positions) < settings.chunks do :rebuy else :skip end end - def generate_decision(%TradeEvent{}, _state) do + def generate_decision(%TradeEvent{}, %Position{}, _positions, _settings) do :skip end @@ -162,14 +249,15 @@ defmodule Naive.Strategy do defp execute_decision( {:place_buy_order, price, quantity}, - %State{ + %Position{ id: id, symbol: symbol - } = state + } = position, + _settings ) do @logger.info( - "The trader(#{id}) is placing a BUY order " <> - "for #{symbol} @ #{price}, quantity: #{quantity}" + "Position (#{symbol}/#{id}): " <> + "Placing a BUY order @ #{price}, quantity: #{quantity}" ) {:ok, %Binance.OrderResponse{} = order} = @@ -177,25 +265,23 @@ defmodule Naive.Strategy do :ok = broadcast_order(order) - new_state = %{state | buy_order: order} - @leader.notify(:trader_state_updated, new_state) - - {:ok, new_state} + {:ok, %{position | buy_order: order}} end defp execute_decision( {:place_sell_order, sell_price}, - %State{ + %Position{ id: id, symbol: symbol, buy_order: %Binance.OrderResponse{ orig_qty: quantity } - } = state + } = position, + _settings ) do @logger.info( - "The trader(#{id}) is placing a SELL order for " <> - "#{symbol} @ #{sell_price}, quantity: #{quantity}." + "Position (#{symbol}/#{id}): The BUY order is now filled. " <> + "Placing a SELL order @ #{sell_price}, quantity: #{quantity}" ) {:ok, %Binance.OrderResponse{} = order} = @@ -203,16 +289,12 @@ defmodule Naive.Strategy do :ok = broadcast_order(order) - new_state = %{state | sell_order: order} - - @leader.notify(:trader_state_updated, new_state) - - {:ok, new_state} + {:ok, %{position | sell_order: order}} end defp execute_decision( :fetch_buy_order, - %State{ + %Position{ id: id, symbol: symbol, buy_order: @@ -220,37 +302,35 @@ defmodule Naive.Strategy do order_id: order_id, transact_time: timestamp } = buy_order - } = state + } = position, + _settings ) do - @logger.info("Trader's(#{id} #{symbol} buy order got partially filled") + @logger.info("Position (#{symbol}/#{id}): The BUY order is now partially filled") {:ok, %Binance.Order{} = current_buy_order} = @binance_client.get_order(symbol, timestamp, order_id) :ok = broadcast_order(current_buy_order) - buy_order = %{buy_order | status: current_buy_order.status} - new_state = %{state | buy_order: buy_order} - - @leader.notify(:trader_state_updated, new_state) - - {:ok, new_state} + {:ok, %{position | buy_order: buy_order}} end defp execute_decision( - :exit, - %State{ + :finished, + %Position{ id: id, symbol: symbol - } + }, + settings ) do - @logger.info("Trader(#{id}) finished trade cycle for #{symbol}") - :exit + new_position = generate_fresh_position(settings) + @logger.info("Position (#{symbol}/#{id}): Trade cycle finished") + {:ok, new_position} end defp execute_decision( :fetch_sell_order, - %State{ + %Position{ id: id, symbol: symbol, sell_order: @@ -258,34 +338,33 @@ defmodule Naive.Strategy do order_id: order_id, transact_time: timestamp } = sell_order - } = state + } = position, + _settings ) do - @logger.info("Trader's(#{id} #{symbol} SELL order got partially filled") + @logger.info("Position (#{symbol}/#{id}): The SELL order is now partially filled") {:ok, %Binance.Order{} = current_sell_order} = @binance_client.get_order(symbol, timestamp, order_id) :ok = broadcast_order(current_sell_order) - - new_state = %{state | sell_order: sell_order} - @leader.notify(:trader_state_updated, new_state) - {:ok, new_state} + sell_order = %{sell_order | status: current_sell_order.status} + {:ok, %{position | sell_order: sell_order}} end defp execute_decision( :rebuy, - %State{ + %Position{ id: id, symbol: symbol - } = state + }, + settings ) do - @logger.info("Rebuy triggered for #{symbol} by the trader(#{id})") - new_state = %{state | rebuy_notified: true} - @leader.notify(:rebuy_triggered, new_state) - {:ok, new_state} + new_position = generate_fresh_position(settings) + @logger.info("Position (#{symbol}/#{id}): Rebuy triggered. Starting a new position") + {:ok, new_position} end - defp execute_decision(:skip, state) do + defp execute_decision(:skip, state, _settings) do {:ok, state} end @@ -311,4 +390,55 @@ defmodule Naive.Strategy do is_working: true }) end + + def fetch_symbol_settings(symbol) do + exchange_info = + @binance_client.get_exchange_info() + + db_settings = @repo.get_by!(Settings, symbol: symbol) + + merge_filters_into_settings(exchange_info, db_settings, symbol) + end + + def merge_filters_into_settings(exchange_info, db_settings, symbol) do + symbol_filters = + exchange_info + |> elem(1) + |> Map.get(:symbols) + |> Enum.find(&(&1["symbol"] == symbol)) + |> Map.get("filters") + + tick_size = + symbol_filters + |> Enum.find(&(&1["filterType"] == "PRICE_FILTER")) + |> Map.get("tickSize") + + step_size = + symbol_filters + |> Enum.find(&(&1["filterType"] == "LOT_SIZE")) + |> Map.get("stepSize") + + Map.merge( + %{ + tick_size: tick_size, + step_size: step_size + }, + Map.from_struct(db_settings) + ) + end + + def generate_fresh_position(settings, id \\ :os.system_time(:millisecond)) do + %{ + struct(Position, settings) + | id: id, + budget: Decimal.div(settings.budget, settings.chunks), + rebuy_notified: false + } + end + + def update_status(symbol, status) when is_binary(symbol) and is_binary(status) do + @repo.get_by(Settings, symbol: symbol) + |> Ecto.Changeset.change(%{status: status}) + |> @repo.update() + end end diff --git a/apps/naive/lib/naive/supervisor.ex b/apps/naive/lib/naive/supervisor.ex index a4e7a6a..762965c 100644 --- a/apps/naive/lib/naive/supervisor.ex +++ b/apps/naive/lib/naive/supervisor.ex @@ -5,9 +5,9 @@ defmodule Naive.Supervisor do """ use Supervisor - alias Naive.DynamicSymbolSupervisor + alias Naive.DynamicTraderSupervisor - @registry :naive_symbol_supervisors + @registry :naive_traders def start_link(init_arg) do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) @@ -16,8 +16,8 @@ defmodule Naive.Supervisor do def init(_init_arg) do children = [ {Registry, [keys: :unique, name: @registry]}, - DynamicSymbolSupervisor, - {Task, fn -> DynamicSymbolSupervisor.autostart_workers() end} + {DynamicTraderSupervisor, []}, + {Task, fn -> DynamicTraderSupervisor.autostart_workers() end} ] Supervisor.init(children, strategy: :rest_for_one) diff --git a/apps/naive/lib/naive/trader.ex b/apps/naive/lib/naive/trader.ex index 3b620a6..7e220ad 100644 --- a/apps/naive/lib/naive/trader.ex +++ b/apps/naive/lib/naive/trader.ex @@ -6,59 +6,80 @@ defmodule Naive.Trader do use GenServer, restart: :temporary alias Core.Struct.TradeEvent + alias Naive.Strategy require Logger @logger Application.compile_env(:core, :logger) @pubsub_client Application.compile_env(:core, :pubsub_client) + @registry :naive_traders defmodule State do @moduledoc """ Trader State """ - @enforce_keys [ - :id, - :symbol, - :budget, - :buy_down_interval, - :profit_interval, - :rebuy_interval, - :rebuy_notified, - :tick_size, - :step_size - ] - defstruct [ - :id, - :symbol, - :budget, - :buy_order, - :sell_order, - :buy_down_interval, - :profit_interval, - :rebuy_interval, - :rebuy_notified, - :tick_size, - :step_size - ] + @enforce_keys [:settings, :positions] + defstruct [:settings, positions: []] end - def start_link(%State{} = state) do - GenServer.start_link(__MODULE__, state) - end - - def init(%State{id: id, symbol: symbol} = state) do + def start_link(symbol) do symbol = String.upcase(symbol) + GenServer.start_link(__MODULE__, symbol, name: via_tuple(symbol)) + end - @logger.info("Initializing a new trader(#{id}) for #{symbol}") + def init(symbol) do + @logger.info("Initializing new trader for #{symbol}") @pubsub_client.subscribe(Core.PubSub, "TRADE_EVENTS:#{symbol}") - {:ok, state} + {:ok, nil, {:continue, {:start_position, symbol}}} + end + + def handle_continue({:start_position, symbol}, _state) do + settings = Strategy.fetch_symbol_settings(symbol) + positions = [Strategy.generate_fresh_position(settings)] + {:noreply, %State{settings: settings, positions: positions}} + end + + def notify(:settings_updated, settings) do + call_trader(settings.symbol, {:update_settings, settings}) + end + + def get_positions(symbol) do + call_trader(symbol, {:get_positions, symbol}) + end + + def handle_call({:update_settings, new_settings}, _, state) do + {:reply, :ok, %{state | settings: new_settings}} + end + + def handle_call({:get_positions, _symbol}, _, state) do + {:reply, state.positions, state} end def handle_info(%TradeEvent{} = trade_event, %State{} = state) do - case Naive.Strategy.execute(trade_event, state) do - {:ok, new_state} -> {:noreply, new_state} - :exit -> {:stop, :normal, state} + case Naive.Strategy.execute(trade_event, state.positions, state.settings) do + {:ok, updated_positions} -> + {:noreply, %{state | positions: updated_positions}} + + :exit -> + {:ok, _settings} = Strategy.update_status(trade_event.symbol, "off") + Logger.info("Trading for #{trade_event.symbol} stopped") + {:stop, :normal, state} + end + end + + defp call_trader(symbol, data) do + case Registry.lookup(@registry, symbol) do + [{pid, _}] -> + GenServer.call(pid, data) + + _ -> + Logger.warning("Unable to locate trader process assigned to #{symbol}") + {:error, :unable_to_locate_trader} end end + + defp via_tuple(symbol) do + {:via, Registry, {@registry, symbol}} + end end diff --git a/apps/naive/test/naive/trader_test.exs b/apps/naive/test/naive/trader_test.exs index 509e640..457f970 100644 --- a/apps/naive/test/naive/trader_test.exs +++ b/apps/naive/test/naive/trader_test.exs @@ -41,7 +41,7 @@ defmodule Naive.TraderTest do end defp dummy_trader_state() do - %Naive.Trader.State{ + %Naive.Strategy.Position{ id: 100_000_000, symbol: "XRPUSDT", budget: "200", diff --git a/apps/naive/test/support/mocks.ex b/apps/naive/test/support/mocks.ex index d8a8753..5953a86 100644 --- a/apps/naive/test/support/mocks.ex +++ b/apps/naive/test/support/mocks.ex @@ -4,7 +4,6 @@ defmodule Naive.Support.Mocks do """ Mox.defmock(Test.BinanceMock, for: BinanceMock) - Mox.defmock(Test.Naive.LeaderMock, for: Naive.Leader) Mox.defmock(Test.LoggerMock, for: Core.Test.Logger) Mox.defmock(Test.PubSubMock, for: Core.Test.PubSub) end diff --git a/config/config.exs b/config/config.exs index ee3238d..f17d5ba 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,6 +30,7 @@ config :naive, binance_client: BinanceMock, ecto_repos: [Naive.Repo], leader: Naive.Leader, + repo: Naive.Repo, trading: %{ defaults: %{ chunks: 5, diff --git a/config/test.exs b/config/test.exs index 4ca7f04..475191f 100644 --- a/config/test.exs +++ b/config/test.exs @@ -6,4 +6,5 @@ config :core, config :naive, binance_client: Test.BinanceMock, - leader: Test.Naive.LeaderMock + leader: Test.Naive.LeaderMock, + repo: Test.Naive.RepoMock diff --git a/notes/notes.md b/notes/notes.md index 2cb26f1..18a0004 100644 --- a/notes/notes.md +++ b/notes/notes.md @@ -148,4 +148,15 @@ the main idea is to keep the code simply, and only use process when needed. the procces must not be used for code organization. +## Chapter 20 +a pattern that I see a lot is that first we do the "impure part first , that is basically +network calls, going to the database, whatever that implies something impure, and once we got all +the data we go to do the pure part, so we dont have to mock a lot in testing +it comes to my mind that recursion, and mappign and reducing is a building block for +programming in fp, also this about what the guy for Grox Io saids. map reduce convert + +build, convert or transforms, CRC -> construct reduce convert + +is better to not overoptimize, sometime a simple proccess with a tasks can handle the load, do +not optimize ahead of time