-
Notifications
You must be signed in to change notification settings - Fork 4
/
okex.go
162 lines (143 loc) · 3.95 KB
/
okex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package market
import (
"context"
"encoding/json"
"errors"
"github.com/gorilla/websocket"
"log"
"strings"
"time"
)
const (
	// okexUrl is the WebSocket endpoint used for the OKEx feed.
	okexUrl = "wss://real.OKEx.com:8443/ws/v3"

	// okexPingCheck is the interval, in seconds, between keep-alive checks.
	okexPingCheck int64 = 5

	// okexWsPingTimeout is the longest time, in seconds, that may pass
	// without the last-pong timestamp being refreshed before the connection
	// is dropped and redialed.
	okexWsPingTimeout int64 = 10
)
// okexHandler carries the OKEx-specific state of a worker: the time at which
// the server was last seen alive.
type okexHandler struct {
// Unix time in seconds of the last pong (or depth message) received.
pongLastTime int64
}
// newOkEx assembles the Worker for the OkEx feed.
//
// Only the Worker's fields are populated here: WsConn is left nil, so no
// WebSocket connection is opened by this function.
func newOkEx(ctx context.Context) *Worker {
	// The handler starts with "now" as its last-pong time.
	handler := &okexHandler{pongLastTime: time.Now().Unix()}

	active := make(map[string][]byte)
	pending := make(map[string][]byte)

	// Current wall-clock time in milliseconds, stored in a time.Duration.
	startedMs := time.Now().UnixNano() / 1e6

	items := newList()

	return &Worker{
		ctx:              ctx,
		wsUrl:            okexUrl,
		handler:          handler,
		Organize:         OkEx,
		Status:           runIng,
		Subscribes:       active,
		Subscribing:      pending,
		LastRunTimestamp: time.Duration(startedMs),
		WsConn:           nil,
		List:             items,
	}
}
// formatSubscribeHandle renders the subscribe request for s.
//
// Only SpotMarket produces a payload (the "spot/depth5" channel for
// s.Symbol). For every other market type the result is nil.
func (h *okexHandler) formatSubscribeHandle(s *Subscriber) (b []byte) {
	const (
		spotDepthPrefix = `{"op": "subscribe", "args": ["spot/depth5:`
		requestSuffix   = `"]}`
	)

	switch s.MarketType {
	case SpotMarket:
		b = []byte(spotDepthPrefix + s.Symbol + requestSuffix)
	case FuturesMarket, OptionMarket, WapMarket:
		// No request format is defined for these markets; b stays nil.
	}
	return b
}
// pingPongHandle runs the keep-alive loop for the connection owned by w.
//
// Every okexPingCheck seconds it compares the current time with the last
// recorded pong. If more than okexWsPingTimeout seconds have elapsed it logs
// the disconnect and calls w.closeRedialSub; otherwise it writes a text
// "ping" frame through w.writeMessage.
//
// The call blocks until the worker's context is cancelled. A worker built
// without a context keeps the previous behaviour and loops forever.
//
// NOTE(review): h.pongLastTime is read here and written by marketerMsg and
// pongMsg; if those run on another goroutine this is a data race — confirm
// and guard with sync/atomic if so.
func (h *okexHandler) pingPongHandle(w *Worker) {
	interval := time.Second * time.Duration(okexPingCheck)

	// A nil channel never becomes ready in a select, so a missing context
	// simply disables the shutdown branch instead of panicking.
	var done <-chan struct{}
	if w.ctx != nil {
		done = w.ctx.Done()
	}

	// One timer is reused for the whole loop instead of allocating a new one
	// on every pass.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-done:
			return
		case <-timer.C:
			if time.Now().Unix()-h.pongLastTime > okexWsPingTimeout {
				log.Printf("%s pingpong断线", OkEx)
				w.closeRedialSub()
			} else {
				w.writeMessage(websocket.TextMessage, []byte("ping"))
			}
			// Re-arm only after the work above has finished, so the next
			// check happens a full interval after this one completed.
			timer.Reset(interval)
		}
	}
}
// formatMsgHandle turns one frame received from OKEx into a Marketer.
//
// Only binary frames are processed; any other frame type yields (nil, nil).
// A binary frame is first passed through decode, whose error is returned
// as-is. If the decoded payload parses as market depth the resulting
// Marketer is returned. Otherwise the payload is offered to pongMsg and
// subscribed, and (nil, nil) is returned.
func (h *okexHandler) formatMsgHandle(msgType int, msg []byte, w *Worker) (*Marketer, error) {
	if msgType != websocket.BinaryMessage {
		return nil, nil
	}

	payload, err := decode(msg)
	if err != nil {
		return nil, err
	}

	if market, parseErr := h.marketerMsg(payload); parseErr == nil {
		return market, nil
	}

	// Not depth data: it may be a pong or a subscription acknowledgement.
	h.pongMsg(payload)
	h.subscribed(payload, w)
	return nil, nil
}
// okexProvider is the JSON structure that OKEx table messages are decoded
// into (see marketerMsg).
type okexProvider struct {
Table string `json:"table"` // subscription type and depth
Data []struct {
Asks Depth `json:"asks"` // sell-side depth
Bids Depth `json:"bids"` // buy-side depth
InstrumentId string `json:"instrument_id"` // contract or currency pair
Timestamp time.Time `json:"timestamp"` // data timestamp (milliseconds, per the original note)
} `json:"data"`
}
// marketerMsg decodes msg as JSON and decides whether it is depth data.
//
// A JSON decoding failure is returned unchanged. A message whose "table"
// field is empty is rejected with an error. For anything else the last-pong
// time is refreshed and the message is converted by newMarketer.
func (h *okexHandler) marketerMsg(msg []byte) (*Marketer, error) {
	var depth okexProvider
	if err := json.Unmarshal(msg, &depth); err != nil {
		return nil, err
	}
	if len(depth.Table) == 0 {
		return nil, errors.New("序列化市场深度错误")
	}

	// Per the original author's note, OKEx does not pong on its own after a
	// reconnect, so incoming depth data also counts as a sign of life.
	h.pongLastTime = time.Now().Unix()

	return h.newMarketer(&depth)
}
// newMarketer converts a decoded OKEx depth message into the unified
// Marketer representation.
//
// Only the first entry of p.Data is used. Its first bid and first ask supply
// BuyFirst and SellFirst, and the full sides are passed through as BuyDepth
// and SellDepth. Timestamp is the entry's time in milliseconds and Temporize
// is the difference between the current time and that timestamp, also in
// milliseconds.
//
// An error is returned, instead of panicking on an out-of-range index, when
// p is nil, p.Data is empty, or either book side has no first level.
func (h *okexHandler) newMarketer(p *okexProvider) (*Marketer, error) {
	if p == nil || len(p.Data) == 0 {
		return nil, errors.New("okex depth message has no data entries")
	}

	entry := &p.Data[0]
	if len(entry.Bids) == 0 || len(entry.Bids[0]) == 0 {
		return nil, errors.New("okex depth message has no bid levels")
	}
	if len(entry.Asks) == 0 || len(entry.Asks[0]) == 0 {
		return nil, errors.New("okex depth message has no ask levels")
	}

	// Milliseconds since the Unix epoch, carried in a time.Duration.
	timestamp := time.Duration(entry.Timestamp.UnixNano() / 1e6)

	return &Marketer{
		Organize:  OkEx,
		Symbol:    entry.InstrumentId,
		BuyFirst:  entry.Bids[0][0],
		SellFirst: entry.Asks[0][0],
		BuyDepth:  entry.Bids,
		SellDepth: entry.Asks,
		Timestamp: timestamp,
		Temporize: time.Duration(time.Now().UnixNano()/1e6) - timestamp,
	}, nil
}
// pongMsg refreshes the last-pong time when msg is exactly the text "pong".
// Any other payload is ignored.
func (h *okexHandler) pongMsg(msg []byte) {
	if string(msg) != "pong" {
		return
	}
	h.pongLastTime = time.Now().Unix()
}
// okexSubscriber is the JSON structure used to recognise subscription
// messages (see subscribed).
type okexSubscriber struct {
Event string `json:"event"` // "subscribe" marks a successful subscription
Channel string `json:"channel"` // channel name; the part after ":" is handed to the worker
}
// subscribed checks whether msg is a subscription-success message and, if
// so, reports it to the worker.
//
// msg is decoded as JSON. When its "event" field equals "subscribe", the
// second ":"-separated segment of "channel" is passed to w.subscribed.
// Payloads that are not valid JSON, carry a different event, or have a
// channel without a ":" separator are ignored. Previously a channel without
// ":" caused an index-out-of-range panic.
func (h *okexHandler) subscribed(msg []byte, w *Worker) {
	var ack okexSubscriber
	if err := json.Unmarshal(msg, &ack); err != nil {
		// formatMsgHandle routes every non-depth payload here, including
		// the plain-text "pong", so a decode failure is expected and not
		// worth reporting.
		return
	}
	if ack.Event != "subscribe" {
		return
	}

	parts := strings.Split(ack.Channel, ":")
	if len(parts) < 2 {
		return
	}
	w.subscribed(parts[1])
}