-
Notifications
You must be signed in to change notification settings - Fork 4
/
data.go
201 lines (162 loc) · 4.05 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package market
import (
"encoding/json"
"strconv"
"sync"
"time"
)
//深度数据格式
type Depth [][2]string
//格式化浮点深度数据
//最大格式化float64
func (d Depth) formatFloat(params [][2]float64) Depth {
val := make(Depth, len(params))
for k, v := range params {
val[k][0] = strconv.FormatFloat(v[0], 'g', -1, 64)
val[k][1] = strconv.FormatFloat(v[1], 'g', -1, 64)
}
return val
}
//基础行情结构
type Marketer struct {
Organize Organize `json:"organize"` //交易所
Symbol string `json:"symbol"` //订阅币对
BuyFirst string `json:"buy_first,omitempty"` //买一价格
SellFirst string `json:"sell_first,omitempty"` //卖一价格
BuyDepth Depth `json:"buy_depth,omitempty"` //市场买深度
SellDepth Depth `json:"sell_depth,omitempty"` //市场卖深度
Timestamp time.Duration `json:"timestamp,omitempty"` //数据更新时间(毫秒)
Temporize time.Duration `json:"temporize,omitempty"` //网络延迟(毫秒)
}
//序列化为json
func (m *Marketer) MarshalJson() []byte {
j, _ := json.Marshal(m)
return j
}
//基础的lister类型
//主要为了实现主动查询
type Lister struct {
data map[string]*Marketer
lock sync.RWMutex
}
func newList() *Lister {
return &Lister{
data: make(map[string]*Marketer),
}
}
//序列化为json
func (l *Lister) MarshalJson() string {
l.lock.RLock()
defer l.lock.RUnlock()
j, _ := json.Marshal(l.data)
return string(j)
}
//追加一条数据
//如果数据已经存在, 则更新数据
func (l *Lister) Add(k string, m *Marketer) {
l.lock.Lock()
defer l.lock.Unlock()
l.data[k] = m
}
//删除一条数据
func (l *Lister) Del(k string) {
l.lock.Lock()
defer l.lock.Unlock()
delete(l.data, k)
}
//查找一个或者多个key
//返回一个新的lister结构体
func (l *Lister) Find(s ...string) *Lister {
l.lock.RLock()
defer l.lock.RUnlock()
newL := newList()
for _, k := range s {
if v, ok := l.data[k]; ok {
newL.data[k] = v
}
}
return newL
}
func (l *Lister) ToMap() map[string]*Marketer {
return l.data
}
//lister gc机制
//exs单位是秒. 表示数据过期的时间
//数据过期后删除
func (l *Lister) gc(exs time.Duration) {
t := time.Duration(time.Now().UnixNano() / 1e6)
exs = exs / time.Millisecond
l.lock.Lock()
defer l.lock.Unlock()
for k, v := range l.data {
if (t - v.Timestamp) > exs {
delete(l.data, k)
}
}
}
//交易类型
type MarketType int
//币币交易/现货交易类型
const SpotMarket MarketType = 1
//期货交易/交割交易类型
const FuturesMarket MarketType = 2
//永续交易类型
const WapMarket MarketType = 3
//期权交易类型
const OptionMarket MarketType = 4
//平台常量类型
type Organize string
//火币平台常量
const HuoBi Organize = "huobi"
//okex平台常量
const OkEx Organize = "okex"
//外部订阅时的结构体
type Subscriber struct {
Symbol string
Organize Organize
MarketType MarketType
}
//只允许写入Subscriber channel
//暴露给外部使用
var WriteSubscribing chan<- *Subscriber
//只允许读取Subscriber channel
//不允许外部使用
var readSubscribing <-chan *Subscriber
func init() {
var subscribing = make(chan *Subscriber, 2)
WriteSubscribing = subscribing
readSubscribing = subscribing
}
//只允许读取market channel
type readMarketer <-chan *Marketer
//只允许写入market channel
type writeMarketer struct {
buffer chan<- *Marketer
lock sync.Mutex
}
var readWriteMarketer = make(chan *Marketer, 1000)
//读取暴露给外部使用
var ReadMarketPool readMarketer = readWriteMarketer
//写入数据只能内部使用
var writeMarketPool struct {
writeMarketer
}
func init() {
writeMarketPool.buffer = readWriteMarketer
}
//使用channel对market实现环形数据结构
//超过channel缓存时, 删除过期的值
//主动停止timer, 防止可能的内存泄露
func (w writeMarketer) writeRingBuffer(m *Marketer) {
w.lock.Lock()
defer func() {
w.lock.Unlock()
}()
if len(w.buffer) == cap(w.buffer) {
select {
case <-ReadMarketPool:
default:
}
}
w.buffer <- m
}