forked from BigAndSweet/HIST
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel.py
154 lines (123 loc) · 6.43 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import torch
import torch.nn as nn
import torch.nn.init as init
import numpy as np
from utils import cal_cos_similarity
class MLP(nn.Module):
def __init__(self, d_feat, hidden_size=512, num_layers=3, dropout=0.0):
super().__init__()
self.mlp = nn.Sequential()
for i in range(num_layers):
if i > 0:
self.mlp.add_module('drop_%d'%i, nn.Dropout(dropout))
self.mlp.add_module('fc_%d'%i, nn.Linear(
360 if i == 0 else hidden_size, hidden_size))
self.mlp.add_module('relu_%d'%i, nn.ReLU())
self.mlp.add_module('fc_out', nn.Linear(hidden_size, 1))
def forward(self, x):
# feature
# [N, F]
return self.mlp(x).squeeze()
class HIST(nn.Module):
def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0, base_model="GRU", K =3):
super().__init__()
self.d_feat = d_feat
self.hidden_size = hidden_size
self.rnn = nn.GRU(
input_size=d_feat,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout,
)
self.fc_es = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_es.weight)
self.fc_is = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_is.weight)
self.fc_es_fore = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_es_fore.weight)
self.fc_is_fore = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_is_fore.weight)
self.fc_es_back = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_es_back.weight)
self.fc_is_back = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_is_back.weight)
self.fc_indi = nn.Linear(hidden_size, hidden_size)
torch.nn.init.xavier_uniform_(self.fc_indi.weight)
self.leaky_relu = nn.LeakyReLU()
self.softmax_s2t = torch.nn.Softmax(dim = 0)
self.softmax_t2s = torch.nn.Softmax(dim = 1)
self.fc_out_es = nn.Linear(hidden_size, 1)
self.fc_out_is = nn.Linear(hidden_size, 1)
self.fc_out_indi = nn.Linear(hidden_size, 1)
self.fc_out = nn.Linear(hidden_size, 1)
self.K = K
def cal_cos_similarity(self, x, y): # the 2nd dimension of x and y are the same
xy = x.mm(torch.t(y))
x_norm = torch.sqrt(torch.sum(x*x, dim =1)).reshape(-1, 1)
y_norm = torch.sqrt(torch.sum(y*y, dim =1)).reshape(-1, 1)
cos_similarity = xy/x_norm.mm(torch.t(y_norm))
cos_similarity[cos_similarity != cos_similarity] = 0
return cos_similarity
def forward(self, x, concept_matrix, market_value):
device = torch.device(torch.get_device(x))
x_hidden = x.reshape(len(x), self.d_feat, -1) # [N, F, T]
x_hidden = x_hidden.permute(0, 2, 1) # [N, T, F]
x_hidden, _ = self.rnn(x_hidden)
x_hidden = x_hidden[:, -1, :]
# Predefined Concept Module
market_value_matrix = market_value.reshape(market_value.shape[0], 1).repeat(1, concept_matrix.shape[1])
stock_to_concept = concept_matrix * market_value_matrix
stock_to_concept_sum = torch.sum(stock_to_concept, 0).reshape(1, -1).repeat(stock_to_concept.shape[0], 1)
stock_to_concept_sum = stock_to_concept_sum.mul(concept_matrix)
stock_to_concept_sum = stock_to_concept_sum + (torch.ones(stock_to_concept.shape[0], stock_to_concept.shape[1]).to(device))
stock_to_concept = stock_to_concept / stock_to_concept_sum
hidden = torch.t(stock_to_concept).mm(x_hidden)
hidden = hidden[hidden.sum(1)!=0]
stock_to_concept = x_hidden.mm(torch.t(hidden))
# stock_to_concept = cal_cos_similarity(x_hidden, hidden)
stock_to_concept = self.softmax_s2t(stock_to_concept)
hidden = torch.t(stock_to_concept).mm(x_hidden)
concept_to_stock = cal_cos_similarity(x_hidden, hidden)
concept_to_stock = self.softmax_t2s(concept_to_stock)
e_shared_info = concept_to_stock.mm(hidden)
e_shared_info = self.fc_es(e_shared_info)
e_shared_back = self.fc_es_back(e_shared_info)
output_es = self.fc_es_fore(e_shared_info)
output_es = self.leaky_relu(output_es)
pred_es = self.fc_out_es(output_es).squeeze()
# Hidden Concept Module
i_shared_info = x_hidden - e_shared_back
hidden = i_shared_info
i_stock_to_concept = cal_cos_similarity(i_shared_info, hidden)
dim = i_stock_to_concept.shape[0]
diag = i_stock_to_concept.diagonal(0)
i_stock_to_concept = i_stock_to_concept * (torch.ones(dim, dim) - torch.eye(dim)).to(device)
# row = torch.linspace(0,dim-1,dim).to(device).long()
# column = i_stock_to_concept.argmax(1)
row = torch.linspace(0, dim-1, dim).reshape([-1, 1]).repeat(1, self.K).reshape(1, -1).long().to(device)
column = torch.topk(i_stock_to_concept, self.K, dim = 1)[1].reshape(1, -1)
mask = torch.zeros([i_stock_to_concept.shape[0], i_stock_to_concept.shape[1]], device = i_stock_to_concept.device)
mask[row, column] = 1
i_stock_to_concept = i_stock_to_concept * mask
i_stock_to_concept = i_stock_to_concept + torch.diag_embed((i_stock_to_concept.sum(0)!=0).float()*diag)
hidden = torch.t(i_shared_info).mm(i_stock_to_concept).t()
hidden = hidden[hidden.sum(1)!=0]
i_concept_to_stock = cal_cos_similarity(i_shared_info, hidden)
i_concept_to_stock = self.softmax_t2s(i_concept_to_stock)
i_shared_info = i_concept_to_stock.mm(hidden)
i_shared_info = self.fc_is(i_shared_info)
i_shared_back = self.fc_is_back(i_shared_info)
output_is = self.fc_is_fore(i_shared_info)
output_is = self.leaky_relu(output_is)
pred_is = self.fc_out_is(output_is).squeeze()
# Individual Information Module
individual_info = x_hidden - e_shared_back - i_shared_back
output_indi = individual_info
output_indi = self.fc_indi(output_indi)
output_indi = self.leaky_relu(output_indi)
pred_indi = self.fc_out_indi(output_indi).squeeze()
# Stock Trend Prediction
all_info = output_es + output_is + output_indi
pred_all = self.fc_out(all_info).squeeze()
return pred_all