-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
211 lines (170 loc) · 8.24 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# version 3
from trade_algorithm import indicators
from tvDatafeed import TvDatafeed
import pandas as pd
import os
import datetime
import numpy as np
from sklearn import preprocessing
import pythainav as nav
from binance.client import Client
from utils import get_data, load_data, read_config
pd.set_option('display.max_rows', 30)
pd.set_option('display.max_columns', 30)
pd.set_option('display.width', 1000)
tv = TvDatafeed(username=None,password=None)
# list stock config
json_stock = read_config(os.path.join(os.getcwd(),'config','list_stock','stock_config.json'))
assert json_stock is not None, "error read stock config"
# indicator config for long term
indicator_config_long = read_config(os.path.join(os.getcwd(),'config','indicator','long_indicator.json'))
assert indicator_config_long is not None, "error read indicator config (long term)"
# indicator config for short term
indicator_config_short = read_config(os.path.join(os.getcwd(),'config','indicator','short_indicator.json'))
assert indicator_config_short is not None, "error read indicator config (short term)"
# config
config = read_config(os.path.join(os.getcwd(),'config','config.json'))
assert config is not None, "error read config"
# find current date
current_date = datetime.datetime.today().date()
indicator_engine = indicators()
# prepare config
mode = config['mode']
max_days = config[mode]['len_data']
max_days = max(max_days)
for key_exc in json_stock[f'list_{mode}']:
print(f'exchange: {key_exc}')
dict_min_value_1 = {}
dict_min_value_2 = {}
dict_remaining_date = {}
dict_pair_daywithscore = {
"stock_score":[],
"min_date":[]
}
dict_stock_name_score = {}
for namest2dict in json_stock[f'list_{mode}'][key_exc]:
dict_stock_name_score.update({namest2dict:0})
dict_remaining_date.update({namest2dict:{}})
for t in config[mode]['len_data']:
dict_remaining_date[namest2dict].update({f"{t}":0})
for namest in json_stock[f'list_{mode}'][key_exc]:
if config['fetch_newdata'] == "on":
try:
data = get_data(tv, nav, Client, key_exc, namest, max_days, mode, config[mode])
if len(data) == 0:
continue
except Exception as e:
print(f'stock name: {namest}')
print(f'error: {e}')
continue
else:
data = load_data(namest, mode, key_exc, max_days)
score = 0
# first indicators
for indicator in config[mode]['long_indicators']:
# print('indicator:',indicator)
score += indicator_engine.process(f"{indicator}", data, indicator_config_long[f'{indicator}'])
# print('score:',score)
dict_stock_name_score[namest] = score
for t in config[mode]['len_data']:
data_follow_time = load_data(namest, mode, key_exc, t)
# find min value in dataframe
data_cal_number_days = data_follow_time[data_follow_time['close'] == min(list(data_follow_time['close']))]
# convert timestamp to datetime.date
if len(data_cal_number_days) > 1:
data_cal_number_days = data_cal_number_days.iloc[-1,:]
date_min_value = data_cal_number_days['datetime'].to_pydatetime().date()
else:
# convert numpy.datetime64 to datetime
date_min_value = pd.to_datetime(data_cal_number_days['datetime'].values[0]).to_pydatetime().date()
# date_min_value = data_cal_number_days['datetime'].to_pydatetime().date()
remaining_date = current_date - date_min_value
df_min_value = data_follow_time[data_follow_time['close'] == min(list(data_follow_time['close']))]
df_current_value = data_follow_time.iloc[-2:-1,:]
if t == 62:
dict_remaining_date[namest][f'{t}'] = remaining_date.days
dict_min_value_1[namest] = float(df_current_value['close'].values[0]) - float(min(list(data_follow_time['close'])))
else:
dict_remaining_date[namest][f'{t}'] = remaining_date.days
dict_min_value_2[namest] = float(df_current_value['close'].values[0]) - float(min(list(data_follow_time['close'])))
# sort data by current close value minus the smallest a value in the past
dict_min_value_1_sort = dict(sorted(dict_min_value_1.items(), key=lambda item: item[1], reverse=True))
dict_min_value_2_sort = dict(sorted(dict_min_value_2.items(), key=lambda item: item[1], reverse=True))
# increse score following by date
dict_prepare_sort = {}
score_follow_time = 0.5
for data_v in config[mode]['len_data']:
for key,_ in dict_remaining_date.items():
dict_prepare_sort[key] = dict_remaining_date[key][f"{data_v}"]
dict_remaining_date_sort = dict(sorted(dict_prepare_sort.items(), key=lambda item: item[1],reverse=True))
list_score = [i+score_follow_time for i in range(10)]
reverse_date_tomin_first = dict(sorted(dict_remaining_date_sort.items(), key=lambda item: item[1]))
print(f'List stock name of the lowest close value in the {data_v} days')
for k,v in reverse_date_tomin_first.items():
if int(v) <= 10:
dict_pair_daywithscore['min_date'].append(k)
print(k,v)
print('stock min day:',dict_pair_daywithscore['min_date'])
print('-'*100)
for add_score, key_score in zip(list_score,list(dict_remaining_date_sort.keys())[:10]):
dict_stock_name_score[key_score] = dict_stock_name_score[key_score] + add_score
score_follow_time+=0.5
# loop increase score
for dict_sort_value in [dict_min_value_1_sort, dict_min_value_2_sort]:
# print('dict_sort_value:',dict_sort_value)
# normalize score
list_score = [i*1000 for i in range(1,len(dict_sort_value)+1)]
np_score = np.array(list_score)
normalized_arr = preprocessing.normalize([np_score])
# print('normalize:',normalized_arr.tolist()[0])
dict_score = {}
for score,(k,v) in zip(normalized_arr.tolist()[0],dict_sort_value.items()):
dict_score.update({k:score})
# print('dict_score:',dict_score)
for k,v in dict_score.items():
dict_stock_name_score[k] = dict_stock_name_score[k] + dict_score[k]
# print('score:',dict_stock_name_score)
dict_stock_name_score_sort = dict(sorted(dict_stock_name_score.items(), key=lambda item: item[1], reverse=True))
# print('dict_stock_name_score_sort final:',dict_stock_name_score_sort)
print(f'days: {max_days}')
print('-'*100)
int_check = 0
for k,v in dict_stock_name_score_sort.items():
dict_pair_daywithscore['stock_score'].append(k)
print(k,v)
int_check+=1
if int_check >=10:
break
print('-'*100)
# pair
print(f'The name of the stock that has reached the lowest price in {config[mode]["len_data"]} days')
dict_pair_daywithscore['stock_score'] = dict_pair_daywithscore['stock_score'][:10]
for stock_sc in dict_pair_daywithscore['stock_score']:
if stock_sc in dict_pair_daywithscore['min_date']:
# list_date_less_than_10.append(stock_sc)
print(stock_sc)
# testing
# trend up
dict_stock_trend_up = {}
for tup in dict_pair_daywithscore['min_date']:
dict_stock_trend_up.update({tup:0})
for tup in dict_pair_daywithscore['min_date']:
try:
# data = get_data(tv, key_exc, tup, days)
# data = data.reset_index()
data = load_data(namest, mode, key_exc, t)
except Exception as e:
print(f'stock name: {tup}')
print(f'error: {e}')
continue
score = 0
# first indicators
for indicator in config[mode]['short_indicators']:
# print('indicator:',indicator)
score += indicator_engine.process(f"{indicator}", data, indicator_config_short[f'{indicator}'])
dict_stock_trend_up[tup] = score
dict_stock_trend_up_sort = dict(sorted(dict_stock_trend_up.items(), key=lambda item: item[1], reverse=True))
print('the name of stock is trend up')
for k,v in dict_stock_trend_up_sort.items():
print(k,v)
print()