-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDataStore.py
264 lines (238 loc) · 9.72 KB
/
DataStore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import asyncio
import datetime
import importlib
from fastapi import HTTPException
from RequestObject import AddExRequest, SetExSingalRequest, SetExStatusRequest, SetMartinRequest, SetTrendRequest
import StatisticsDao
from typing import Dict, Iterable, List
import schedule
from Controller import Controller
import ExchangeDao
from OrderInfoDao import OrderInfoDB, OrderInfoDB_Insert, OrderInfoDB_Query_All, OrderInfoDB_Update
from StrategyOrderDao import StrategyOrder
import StrategyOrderDao
from sdk.BinanceSdk import BinanceSdk
from sdk.BitgetSdk import BitgetSdk
from sdk.NexoSdk import NexoSdk
from sdk.OkxSdk import OkxSdk
from sdk.OrderClass import AccountInfo, SwapPostion
import aiomysql
import json
import os
import sys
from log import logger
# ---------------------------------------------------------------------------
# In-memory runtime state shared by the functions of this module.
# The Dict[int, ...] maps below are keyed by the exchange record id.
# ---------------------------------------------------------------------------
# Exchange records; rebound by init() to the database query result and
# appended to / removed from by add_ex() and set_ex_status().
ex_list: List[ExchangeDao.ExchangeDb] = []
# Controller instance per exchange id (registered by add_ex / set_ex_status).
controller_list: Dict[int, Controller] = {}
# Per-exchange position lists; created empty here and in init()/add_ex().
# Presumably filled by the controllers -- the writers are not visible in this file.
spot_positions: Dict[int, List[AccountInfo]] = {}
swap_positions: Dict[int, List[SwapPostion]] = {}
# Per-exchange account snapshots; read by every20minTask() for the statistics rows.
swap_account: Dict[int, AccountInfo] = {}
spot_account:Dict[int,AccountInfo]={}
# Strategy orders held in memory (see insertStrategyOrder / delStrategyOrderById).
strategy_orders:List[StrategyOrder]=[]
# Order records per exchange id; loaded by init(), maintained by
# insert_orderinfo() and del_orderinfo().
order_info:Dict[int,List[OrderInfoDB]]={}
# Parsed contents of conf.json; loaded by init(), persisted by write_json_conf().
json_conf:dict={}
async def create_pool():
    """Create the aiomysql connection pool and publish it as the module global ``db_pool``.

    Connection parameters are read from the ``DB`` section of ``json_conf``,
    so the configuration has to be loaded before this coroutine is awaited.
    """
    global db_pool
    db_settings = json_conf['DB']
    pool_options = dict(
        host=db_settings['DB_HOST'],
        port=db_settings['DB_PORT'],
        user=db_settings['DB_USER'],
        password=db_settings['DB_PASS'],
        db=db_settings['DB_NAME'],
        autocommit=True,
        minsize=5,
        maxsize=10,
        echo=True,
        # NOTE(review): pool_recycle is expressed in seconds in aiomysql, so 3
        # looks very aggressive -- confirm this value is intentional.
        pool_recycle=3,
        loop=asyncio.get_event_loop(),
    )
    db_pool = await aiomysql.create_pool(**pool_options)
def write_json_conf():
    """Write the in-memory ``json_conf`` back to ``conf.json`` beside this module.

    The file is overwritten as UTF-8, non-ASCII characters are kept verbatim
    and the output is indented by four spaces.
    """
    module_dir = os.path.dirname(os.path.realpath(__file__))
    target_path = os.path.join(module_dir, 'conf.json')
    with open(target_path, "w", encoding="utf-8") as out:
        json.dump(json_conf, out, ensure_ascii=False, indent=4)
async def add_ex(data: AddExRequest):
    """Persist a new exchange account, build its controller and start it.

    The controller class is resolved dynamically from the exchange name: for
    ``data.ex == "okx"`` the module ``OkxController`` is imported and the
    attribute ``OkxController`` is taken from it.

    Args:
        data: Request carrying the exchange name, account and API credentials.

    Returns:
        The ``ExchangeDao.ExchangeDb`` record that was inserted.

    Raises:
        HTTPException: With ``Status.ParamsError`` when the controller class
            cannot be resolved, or when persisting / registering / starting
            the exchange fails. The two cases carry different ``detail``
            texts so a database or start-up failure is no longer reported as
            "class not found".
    """
    try:
        # NOTE(review): the module name is derived from request data. The fixed
        # "Controller" suffix limits what can be imported, but validating
        # data.ex against the supported exchanges would be safer.
        name = f"{data.ex.lower().capitalize()}Controller"
        module = importlib.import_module(name)
        class_ = getattr(module, name)
    except (ImportError, AttributeError) as e:
        # Imported lazily, as in the rest of this module.
        from HttpListener import Status
        logger.error(f"add_ex: controller class for {data.ex} not found err={e}")
        raise HTTPException(status_code=Status.ParamsError.value,
                            detail=f"Error: Class {data.ex} not found in module") from e

    try:
        db = ExchangeDao.ExchangeDb(ex=data.ex, account=data.account, apikey=data.apikey,
                                    api_secret=data.api_secret, api_password=data.api_password)
        await ExchangeDao.insert(db)
        instance = class_(db)
        # Register the in-memory state before init() so the controller can
        # find its slots while starting up.
        ex_list.append(db)
        controller_list[db.id] = instance
        spot_positions[db.id] = []
        swap_positions[db.id] = []
        swap_account[db.id] = AccountInfo()
        spot_account[db.id] = AccountInfo()
        order_info[db.id] = []
        await instance.init()
        return db
    except Exception as e:
        from HttpListener import Status
        logger.error(f"add_ex: failed to add exchange {data.ex} err={e}")
        # Same exception type and status code as before; only the detail now
        # names the real cause instead of claiming the class is missing.
        raise HTTPException(status_code=Status.ParamsError.value,
                            detail=f"Error: failed to add exchange {data.ex} err={e}") from e
def _unregister_exchange(ex_id):
    """Stop the controller registered for ``ex_id`` and drop its in-memory state.

    Does nothing when no controller is registered for the id.
    """
    if ex_id not in controller_list:
        return
    controller_list[ex_id].cancel_job()
    del controller_list[ex_id]
    # pop() rather than del: a missing entry in one map must not abort the
    # clean-up of the remaining ones.
    for registry in (spot_positions, swap_positions, swap_account, spot_account, order_info):
        registry.pop(ex_id, None)
    for record in ex_list:
        if record.id == ex_id:
            ex_list.remove(record)
            break  # stop right after mutating the list being iterated


async def _restore_exchange(ex_id):
    """Re-enable the exchange ``ex_id``, rebuild its controller and start it.

    Returns the restored exchange record. Raises ``HTTPException`` with
    ``Status.ParamsError`` when the id is unknown or the restore fails.
    """
    import inspect
    from HttpListener import Status

    try:
        # exchange_db_query is a coroutine function (init() awaits it too).
        # NOTE(review): the True argument presumably includes soft-deleted
        # rows -- confirm against ExchangeDao.
        records = await ExchangeDao.exchange_db_query(True)
        db = next((record for record in records if record.id == ex_id), None)
        if db is None:
            raise HTTPException(status_code=Status.ParamsError.value,
                                detail=f"exchange_info表里查不到相关id={ex_id}的信息")

        # The other ExchangeDao calls in this module are awaited; restore() is
        # not visible here, so await it only when it actually returns an awaitable.
        restored = ExchangeDao.restore(db.id)
        if inspect.isawaitable(restored):
            await restored

        name = f"{db.ex.lower().capitalize()}Controller"
        module = importlib.import_module(name)
        class_ = getattr(module, name)
        instance = class_(db)
        ex_list.append(db)
        controller_list[db.id] = instance
        spot_positions[db.id] = []
        swap_positions[db.id] = []
        swap_account[db.id] = AccountInfo()
        spot_account[db.id] = AccountInfo()
        order_info[db.id] = []
        await instance.init()
        return db
    except HTTPException:
        # Keep the specific "id not found" error instead of re-wrapping it.
        raise
    except Exception as e:
        raise HTTPException(status_code=Status.ParamsError.value,
                            detail=f"恢复交易所状态错误 err={e}") from e


async def set_ex_status(data: SetExStatusRequest):
    """Change the lifecycle status of an exchange.

    Any running controller and in-memory state for ``data.id`` is torn down
    first. Then, depending on ``data.status``:

    * ``2`` -- ``ExchangeDao.del_physical`` is called and its result returned;
    * ``1`` -- ``ExchangeDao.delete_soft`` is called and its result returned;
    * anything else -- the exchange is restored, a fresh controller is
      started and the exchange record is returned.

    Raises:
        HTTPException: With ``Status.ParamsError`` on any failure.
    """
    try:
        _unregister_exchange(data.id)
        if data.status == 2:
            return await ExchangeDao.del_physical(data.id)
        if data.status == 1:
            return await ExchangeDao.delete_soft(data.id)
        return await _restore_exchange(data.id)
    except HTTPException:
        # Already a well-formed API error; do not bury it in a second wrapper.
        raise
    except Exception as e:
        from HttpListener import Status
        raise HTTPException(status_code=Status.ParamsError.value,
                            detail=f"设置交易所状态信息错误 err={e}") from e
async def set_ex_tvsingal(data: SetExSingalRequest):
    """Store the ``no_open`` / ``no_close`` / ``no_move_asset`` flags of an exchange.

    The flags are written through ``ExchangeDao.set_tv_singal`` first. Only
    when that call reports a truthy result and a controller is registered for
    ``data.id`` are the same flags mirrored onto the controller's ``exdata``.

    Returns:
        Whatever ``ExchangeDao.set_tv_singal`` returned.
    """
    updated = await ExchangeDao.set_tv_singal(
        data.id, data.no_open, data.no_close, data.no_move_asset
    )
    if not updated or data.id not in controller_list:
        return updated

    exdata = controller_list[data.id].exdata
    exdata.no_close = data.no_close
    exdata.no_open = data.no_open
    exdata.no_move_asset = data.no_move_asset
    return updated
def update_martin_setting(data: SetMartinRequest):
    """Update the ``Martin`` section of ``json_conf`` and write it to disk.

    When ``data.use_ratio`` is truthy the ratio mode is stored together with
    the absolute value of ``data.ratio``. Otherwise ``data.fixed`` must hold
    exactly four entries and is stored as the fixed investment amounts.
    ``max_count`` and ``except_num`` are stored in both modes.

    Raises:
        HTTPException: With ``Status.ParamsError`` when fixed mode is
            requested and ``data.fixed`` does not have length 4. Nothing is
            modified in that case.
    """
    import math

    # Guard clause: reject an invalid fixed-amount array before touching the config.
    if not data.use_ratio and len(data.fixed) != 4:
        logger.error('设置马丁的固定投入金额数组长度不为4')
        from HttpListener import Status
        raise HTTPException(status_code=Status.ParamsError.value,
                            detail='设置马丁的固定投入金额数组长度不为4')

    martin = json_conf['Martin']
    if data.use_ratio:
        martin['HUOXING_INVEST_USE_RATIO'] = data.use_ratio
        martin['HUOXING_RATIO_INVEST'] = math.fabs(data.ratio)
    else:
        martin['HUOXING_FIXED_INVERST'] = data.fixed
        martin['HUOXING_INVEST_USE_RATIO'] = data.use_ratio

    martin['MAX_HUOXING_COUNT'] = data.max_count
    martin['HUOXING_EXCEPT_NUM'] = data.except_num
    write_json_conf()
def update_trend_setting(data: SetTrendRequest):
    """Update the ``Trend`` section of ``json_conf`` and write it to disk.

    Stores the take-profit (``data.tp``) and stop-loss (``data.sl``) ratios,
    then stores ``data.num`` under ``TREND_RATIO_INVEST`` when
    ``data.use_ratio`` is truthy, or under ``TREND_FIXED_INVEST`` otherwise.
    """
    trend = json_conf['Trend']
    # NOTE(review): the use_ratio flag is written into TREND_RATIO_INVEST, the
    # same key that holds the ratio value below. In fixed mode that key is left
    # holding the (falsy) flag. A dedicated flag key may have been intended --
    # confirm against the readers of the Trend section.
    trend['TREND_RATIO_INVEST'] = data.use_ratio
    trend['TREND_TP_RATIO'] = data.tp
    trend['TREND_SL_RATIO'] = data.sl

    amount_key = 'TREND_RATIO_INVEST' if data.use_ratio else 'TREND_FIXED_INVEST'
    trend[amount_key] = data.num
    write_json_conf()
async def init():
    """Load configuration and persisted state, then schedule the periodic jobs.

    Steps, in order:

    1. Read ``conf.json`` (next to this module) into ``json_conf``; the
       process exits with status 1 when the file cannot be read or parsed.
    2. Create the database pool.
    3. Load the exchange records and all order records, and create the empty
       per-exchange state slots.
    4. Register the ``schedule`` jobs: the statistics snapshot at minutes
       00/20/40 of every hour and a base-info refresh for OKX, Binance and
       Bitget every two hours.
    5. Fetch the base info of the three exchanges once, concurrently.

    The scheduled jobs call ``asyncio.create_task``, which needs a running
    event loop at the moment a job fires.
    """
    global ex_list, order_info, json_conf

    current_dir = os.path.dirname(os.path.realpath(__file__))
    conf_file_path = os.path.join(current_dir, 'conf.json')
    try:
        with open(conf_file_path, 'r', encoding="utf-8") as file:
            json_conf = json.load(file)
    except Exception as e:
        print(f"读取配置文件错误,程序退出 err={e}")
        sys.exit(1)

    await create_pool()
    ex_list = await ExchangeDao.exchange_db_query()
    order_info_list = await OrderInfoDB_Query_All()

    for ex in ex_list:
        order_info[ex.id] = []
        swap_account[ex.id] = AccountInfo()
        spot_account[ex.id] = AccountInfo()
        spot_positions[ex.id] = []
        swap_positions[ex.id] = []

    for info in order_info_list:
        bucket = order_info.get(info.exId)
        if bucket is None:
            # The order belongs to an exchange that was not loaded above.
            # Indexing order_info directly would raise KeyError and abort
            # start-up, so skip it; this matches set_ex_status(), which drops
            # the order list of a deactivated exchange.
            logger.error(f"init: order skipped, exchange id={info.exId} is not loaded")
            continue
        bucket.append(info)

    for minute_mark in (":00", ":20", ":40"):
        schedule.every().hour.at(minute_mark).do(
            lambda: asyncio.create_task(every20minTask()))
    schedule.every(2).hours.do(lambda: asyncio.create_task(OkxSdk.request_baseinfo()))
    schedule.every(2).hours.do(lambda: asyncio.create_task(BinanceSdk.request_baseinfo()))
    schedule.every(2).hours.do(lambda: asyncio.create_task(BitgetSdk.request_baseinfo()))

    await asyncio.gather(
        BinanceSdk.request_baseinfo(),
        BitgetSdk.request_baseinfo(),
        OkxSdk.request_baseinfo()
    )
async def every20minTask():
    """Record a balance snapshot per exchange plus one grand-total row.

    For every entry in ``swap_account`` a ``TradingStatistics`` row is built
    from the swap account's ``total``; when a spot account exists for the same
    id, its ``unrealizedPL + total + funding`` is added. A final row with
    ``exId`` 0 carries the sum over all exchanges. All rows are inserted with
    a single ``StatisticsDao.insert`` call.
    """
    now = datetime.datetime.now()
    # NOTE(review): the job only proceeds when it starts exactly at second 0
    # of a minute divisible by 20; a run that starts even one second late is
    # silently skipped -- confirm this strictness is intended.
    on_mark = now.second == 0 and now.minute % 20 == 0
    if not on_mark:
        return

    rows = []
    grand_total = 0
    for ex_id, swap in swap_account.items():
        row = StatisticsDao.TradingStatistics()
        row.datetime = now
        row.money = swap.total
        if ex_id in spot_account:
            spot = spot_account[ex_id]
            row.money += spot.unrealizedPL + spot.total + spot.funding
        row.exId = ex_id
        rows.append(row)
        grand_total += row.money

    summary = StatisticsDao.TradingStatistics()
    summary.money = grand_total
    summary.exId = 0
    summary.datetime = now
    rows.append(summary)

    await StatisticsDao.insert(rows)
def getController(id: int):
    """Return the controller registered for exchange ``id``, or ``None`` if there is none."""
    # dict.get already yields None for a missing key.
    return controller_list.get(id)
def insertStrategyOrder(so: StrategyOrder) -> None:
    """Append a strategy order to the in-memory ``strategy_orders`` list.

    Only the in-memory list is touched; nothing is persisted here.

    Args:
        so: The strategy order to track. The annotation previously named the
            ``StrategyOrderDao`` module instead of the ``StrategyOrder`` class
            that ``strategy_orders`` is declared to hold.
    """
    strategy_orders.append(so)
def delStrategyOrderById(id: int):
    """Mark the first strategy order with the given ``id`` as deleted and drop it.

    The matching order gets ``deleted = True``, is removed from
    ``strategy_orders`` and is handed to
    ``StrategyOrderDao.update_strategy_order``. Nothing happens when no order
    matches.
    """
    order = next((item for item in strategy_orders if item.id == id), None)
    if not order:
        return
    order.deleted = True
    strategy_orders.remove(order)
    # NOTE(review): the return value is discarded. If update_strategy_order is a
    # coroutine function like the other DAO calls in this module, it is never
    # awaited here and the update would not run -- confirm against StrategyOrderDao.
    StrategyOrderDao.update_strategy_order(order)
async def insert_orderinfo(data: OrderInfoDB):
    """Persist an order record, then add it to the in-memory list of its exchange.

    The database insert runs first, so a failing insert leaves ``order_info``
    untouched. ``order_info`` must already contain ``data.exId``.
    """
    await OrderInfoDB_Insert(data)
    exchange_orders = order_info[data.exId]
    exchange_orders.append(data)
async def update_orderinfo(data):
    """Persist one order record or every record of an iterable.

    Args:
        data: A single ``OrderInfoDB`` or an iterable of them. Any other value
            is ignored.
    """
    if isinstance(data, OrderInfoDB):
        pending = (data,)
    elif isinstance(data, Iterable):
        pending = data
    else:
        return

    # Updates are issued one after another, in input order.
    for record in pending:
        await OrderInfoDB_Update(record)
async def del_orderinfo(data):
    """Flag order records as deleted, drop them from memory and persist the flag.

    For each record ``delete`` is set to ``True``, the record is removed from
    ``order_info[record.exId]`` and the change is written with
    ``OrderInfoDB_Update``.

    Args:
        data: A single ``OrderInfoDB`` or an iterable of them. Any other value
            is ignored.
    """
    if isinstance(data, OrderInfoDB):
        targets = [data]
    elif isinstance(data, Iterable):
        # Snapshot first: callers may pass order_info[ex_id] itself, and
        # removing from a list while iterating it skips every other element.
        targets = list(data)
    else:
        return

    for record in targets:
        record.delete = True
        order_info[record.exId].remove(record)
        await OrderInfoDB_Update(record)