-
Notifications
You must be signed in to change notification settings - Fork 0
/
trade_algorithm.py
171 lines (138 loc) · 6.62 KB
/
trade_algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from talib.abstract import EMA, MACD
import talib.abstract as ta
import talib
import numpy as np
from indicators import mtf_ssl, pvt_with_divergence, adx_di, cross_rsi_line, macd_indicator_line\
, ak_macd_bb, ssl_hybrid, vwap
from decorators import log_errors
def _last_value(values):
    """Return the final element of *values* by position.

    pandas objects are read through ``.iloc`` because a plain ``series[-1]``
    is a label lookup rather than a positional one; lists and NumPy arrays
    are indexed directly.
    """
    positional = getattr(values, 'iloc', None)
    if positional is not None:
        return positional[-1]
    return values[-1]


class indicators:
    """Indicator-based trading signals.

    Every signal method takes a price DataFrame ``df`` and a ``config`` dict
    and returns an integer score: ``2`` for a buy signal, ``-1`` for a sell
    signal and ``0`` when neither condition holds.
    """

    def __init__(self):
        pass

    @log_errors
    def cross_ema(self, df, config):
        """Score the latest bar by comparing two EMAs of ``df['close']``.

        Reads ``config['low_span']`` and ``config['long_span']``. The two EMA
        series are stored on ``df`` as the ``low_df`` and ``long_df`` columns.
        Returns 2 when the ``low_span`` EMA is above the ``long_span`` EMA,
        -1 when it is below, 0 otherwise.
        """
        low_span = config['low_span']
        long_span = config['long_span']
        df['low_df'] = EMA(df['close'], low_span)
        df['long_df'] = EMA(df['close'], long_span)

        # Only the most recent bar matters; no need to copy whole columns.
        low_now = df['low_df'].iloc[-1]
        long_now = df['long_df'].iloc[-1]

        if low_now > long_now:  # buy
            return 2
        if low_now < long_now:  # sell
            return -1
        return 0

    @log_errors
    def cross_rsi(self, df, config):
        """Score the latest bar by comparing two RSIs of ``df['close']``.

        Reads ``config['rsi_short']`` and ``config['rsi_long']``. Returns 2
        when the short RSI is above the long RSI, -1 when below, 0 otherwise.
        """
        short_now = talib.RSI(df['close'], config['rsi_short']).iloc[-1]
        long_now = talib.RSI(df['close'], config['rsi_long']).iloc[-1]

        if short_now > long_now:
            return 2
        if short_now < long_now:
            return -1
        return 0

    @log_errors
    def macd(self, df, config):
        """Score the latest bar by comparing the MACD line with its signal.

        Reads ``config['fastperiod']``, ``config['slowperiod']`` and
        ``config['signalperiod']``. Returns 2 when the MACD line is above the
        signal line, -1 when below, 0 otherwise.
        """
        macd_line, signal_line, _ = MACD(
            df['close'],
            config['fastperiod'],
            config['slowperiod'],
            config['signalperiod'],
        )
        # Positional access that works for both arrays and pandas Series.
        macd_now = _last_value(macd_line)
        signal_now = _last_value(signal_line)

        if macd_now > signal_now:
            return 2
        if macd_now < signal_now:
            return -1
        return 0

    @log_errors
    def ichimoku_cloud(self, df, config):
        """Score the latest close against the two Ichimoku leading spans.

        Reads ``config['tenkansen_value']``, ``config['kinjunsen_value']``,
        ``config['senkou_b_value']`` and ``config['shift_value']``. The spans
        are stored on ``df`` as ``cloud_green_line_a`` and
        ``cloud_red_line_b``.

        Returns 2 when the close is above both spans and span A is above
        span B, -1 when the close is below both spans and span A is below
        span B, 0 otherwise.
        """
        # Tenkan-sen (conversion line): midpoint of the rolling high/low.
        tenkan_high = df['high'].rolling(window=config['tenkansen_value']).max()
        tenkan_low = df['low'].rolling(window=config['tenkansen_value']).min()
        tenkan_sen = (tenkan_high + tenkan_low) / 2

        # Kijun-sen (base line): same midpoint over the base-line window.
        kijun_high = df['high'].rolling(window=config['kinjunsen_value']).max()
        kijun_low = df['low'].rolling(window=config['kinjunsen_value']).min()
        kijun_sen = (kijun_high + kijun_low) / 2

        # Senkou Span A: mean of conversion and base lines, shifted.
        senkou_span_a = ((tenkan_sen + kijun_sen) / 2).shift(config['shift_value'])

        # Senkou Span B: midpoint over the span-B window, shifted.
        span_b_high = df['high'].rolling(window=config['senkou_b_value']).max()
        span_b_low = df['low'].rolling(window=config['senkou_b_value']).min()
        senkou_span_b = ((span_b_high + span_b_low) / 2).shift(config['shift_value'])

        df['cloud_green_line_a'] = senkou_span_a
        df['cloud_red_line_b'] = senkou_span_b

        close = float(df['close'].iloc[-1])
        span_a = float(df['cloud_green_line_a'].iloc[-1])
        span_b = float(df['cloud_red_line_b'].iloc[-1])

        # buy
        if close > span_a and close > span_b and span_a > span_b:
            return 2
        # sell
        if close < span_a and close < span_b and span_a < span_b:
            return -1
        return 0

    @log_errors
    def collect_mtfssl_pvtdiver(self, df, config):
        """Combine the MTF SSL channel with the PVT divergence oscillator.

        Reads ``config['low_span']``, ``config['long_span']``,
        ``config['length_mtf_ssl']`` and ``config['day']``.

        A buy (2) needs the SSL up line above the down line, the PVT
        oscillator above its upper level and the close to have risen by at
        least 3 relative to the reference bar. A sell (-1) is only evaluated
        when ``config['day'] == 1`` and needs the SSL up line below the down
        line and the oscillator below its lower level. Otherwise returns 0.

        ``config`` is never modified.
        """
        pvtup, pvtdown, pvt_osc = pvt_with_divergence(
            df, config['low_span'], config['long_span']
        )
        pvt_osc = pvt_osc[~np.isnan(pvt_osc)]
        pvtup = pvtup.dropna()
        pvtdown = pvtdown.dropna()

        sslup, ssldown = mtf_ssl(df, config['length_mtf_ssl'])
        sslup = sslup.dropna()
        ssldown = ssldown.dropna()

        # The current close must have improved on a past close by this much.
        min_gain = 3
        day = config['day']
        single_day = day == 1
        # Use a local negated offset instead of flipping config['day'] in
        # place: mutating the caller's dict made the look-back alternate
        # between iloc[-day] and iloc[+day] on successive calls.
        reference_pos = -2 if single_day else -day
        price_gain = df['close'].iloc[-1] - df['close'].iloc[reference_pos]

        ssl_up_now = sslup.iloc[-1]
        ssl_down_now = ssldown.iloc[-1]

        # The PVT level series may be sparse, so they are only read when the
        # SSL condition already holds (short-circuit evaluation).
        # buy
        if (
            ssl_up_now > ssl_down_now
            and _last_value(pvt_osc) > pvtup.iloc[-1]
            and price_gain >= min_gain
        ):
            return 2
        # sell
        if (
            single_day
            and ssl_up_now < ssl_down_now
            and _last_value(pvt_osc) < pvtdown.iloc[-1]
        ):
            return -1
        return 0

    @log_errors
    def adxdi_crossrsi(self, df, config):
        """Combine ADX/DI with the short/long RSI comparison.

        Reads ``config['rsi_short']`` and ``config['rsi_long']``. Returns 2
        when ADX is above 25, DI+ is above DI- and the short RSI is above the
        long RSI; -1 when all three comparisons are reversed; 0 otherwise.
        """
        adx, di_plus, di_minus = adx_di(df)
        rsi_short_line, rsi_long_line = cross_rsi_line(
            df, config['rsi_short'], config['rsi_long']
        )

        adx_now = _last_value(adx)
        plus_now = _last_value(di_plus)
        minus_now = _last_value(di_minus)
        rsi_short_now = rsi_short_line.iloc[-1]
        rsi_long_now = rsi_long_line.iloc[-1]

        if adx_now > 25 and plus_now > minus_now and rsi_short_now > rsi_long_now:
            return 2
        # NOTE(review): the sell branch requires ADX *below* 25; confirm this
        # is intended rather than "ADX above 25 with DI- leading".
        if adx_now < 25 and plus_now < minus_now and rsi_short_now < rsi_long_now:
            return -1
        return 0

    @log_errors
    def macd_crossrsi(self, df, config):
        """Combine the MACD/signal comparison with the short/long RSI one.

        Reads ``config['rsi_short']``, ``config['rsi_long']``,
        ``config['fastperiod']``, ``config['slowperiod']`` and
        ``config['signalperiod']``. Returns 2 when both the MACD line and the
        short RSI are above their counterparts, -1 when both are below,
        0 otherwise.
        """
        rsi_short_line, rsi_long_line = cross_rsi_line(
            df, config['rsi_short'], config['rsi_long']
        )
        macd_line, signal_line = macd_indicator_line(
            df, config['fastperiod'], config['slowperiod'], config['signalperiod']
        )

        macd_now = _last_value(macd_line)
        signal_now = _last_value(signal_line)
        rsi_short_now = rsi_short_line.iloc[-1]
        rsi_long_now = rsi_long_line.iloc[-1]

        if macd_now > signal_now and rsi_short_now > rsi_long_now:
            return 2
        if macd_now < signal_now and rsi_short_now < rsi_long_now:
            return -1
        return 0

    @log_errors
    def macd_ssl_vwap(self, df, config):
        """Combine AK MACD BB, SSL hybrid and VWAP into a buy-only signal.

        Reads ``config['length_akmacdbb']``, ``config['dev']``,
        ``config['fastlength']``, ``config['slowlength']``,
        ``config['len1_sslhy']``, ``config['type_ssl']`` and
        ``config['n_vwap']``.

        Returns 2 when all buy conditions hold, otherwise 0; this method
        never emits a sell (-1).
        """
        up, down = ak_macd_bb(
            df,
            config['length_akmacdbb'],
            config['dev'],
            config['fastlength'],
            config['slowlength'],
        )
        bbmc = ssl_hybrid(df, config['len1_sslhy'], config['type_ssl'])
        df = vwap(df, config['n_vwap'])

        # Share of bars (in percent) closing above / below the VWAP.
        buyers = df[df["close"] > df["VWAP"]].shape[0] / df.shape[0] * 100
        sellers = df[df["close"] < df["VWAP"]].shape[0] / df.shape[0] * 100

        up_now = up.iloc[-1]
        down_now = down.iloc[-1]
        # Take the scalar close: a one-element Series here would make the
        # boolean chain below raise "truth value of a Series is ambiguous".
        close_now = df['close'].iloc[-1]

        if (
            (down_now - up_now) >= 1.5
            and down_now >= 1
            and (_last_value(bbmc) - close_now) >= 2
            and buyers > sellers
        ):
            return 2
        return 0

    @log_errors
    def process(self, name, df, config):
        """Run the signal method called *name* once per entry of *config*.

        ``config`` is an iterable of per-run config dicts. Returns the sum of
        the individual scores (0 for an empty iterable).
        """
        # Attribute lookup instead of eval(): same method resolution without
        # executing an arbitrary expression built from ``name``.
        return sum(getattr(self, name)(df, cf) for cf in config)