From a4f8e9aae9303d35bfa122bbff0a0996d4a36943 Mon Sep 17 00:00:00 2001 From: Artem Date: Wed, 5 May 2021 17:03:37 +0300 Subject: [PATCH] Linter. Fix: open orders updates --- .golangci.yml | 11 ++ examples/auth/main.go | 5 +- examples/trades/main.go | 11 +- rest/methods_test.go | 26 ++-- rest/private_methods.go | 8 +- rest/private_methods_test.go | 24 ++-- rest/responses.go | 2 +- websocket/auth.go | 22 +++- websocket/client.go | 110 ++++++++-------- websocket/client_test.go | 247 ++++++++++++----------------------- websocket/consts.go | 17 +-- websocket/factories.go | 82 +++++++----- websocket/factories_test.go | 16 +-- websocket/messages.go | 28 ++-- websocket/options.go | 13 ++ websocket/parameters.go | 36 +++-- websocket/parse_utils.go | 6 +- websocket/transport.go | 24 ++-- 18 files changed, 337 insertions(+), 351 deletions(-) create mode 100644 .golangci.yml create mode 100644 websocket/options.go diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..462df10 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,11 @@ +linters: + enable: + - bodyclose + - goconst + - gocritic + - gofmt + - interfacer + - maligned + - prealloc + - unconvert + - unparam \ No newline at end of file diff --git a/examples/auth/main.go b/examples/auth/main.go index 93c45cb..2b346bc 100644 --- a/examples/auth/main.go +++ b/examples/auth/main.go @@ -7,7 +7,10 @@ import ( ) func main() { - c := ws.NewAuth("", "") + c := ws.NewAuth( + "", "", + ws.WithParams(ws.NewDefaultSandboxAuthParameters()), + ) if err := c.Connect(); err != nil { log.Fatal("Error connecting to web socket : ", err) diff --git a/examples/trades/main.go b/examples/trades/main.go index 9188e48..b0a34a2 100644 --- a/examples/trades/main.go +++ b/examples/trades/main.go @@ -9,24 +9,23 @@ import ( func main() { c := ws.New() - err := c.Connect() - if err != nil { + + if err := c.Connect(); err != nil { log.Fatal("Error connecting to web socket : ", err) } pairs := []string{ws.BTCUSD} // subscribe to BTCUSD trades - err = c.SubscribeTrades(pairs) - if err != nil { + if err := c.SubscribeTrades(pairs); err != nil { log.Fatal(err) } go func() { time.Sleep(time.Second * 30) log.Print("Unsubsribing...") - err = c.Unsubscribe(ws.ChanTrades, pairs) - if err != nil { + + if err := c.Unsubscribe(ws.ChanTrades, pairs); err != nil { log.Fatal(err) } log.Print("Success!") diff --git a/rest/methods_test.go b/rest/methods_test.go index 7dc69e9..92b869f 100644 --- a/rest/methods_test.go +++ b/rest/methods_test.go @@ -107,7 +107,7 @@ func TestKraken_Assets(t *testing.T) { assets: nil, }, want: map[string]Asset{ - "ADA": Asset{ + "ADA": { AlternateName: "ADA", AssetClass: "currency", Decimals: 8, @@ -127,7 +127,7 @@ func TestKraken_Assets(t *testing.T) { assets: []string{"ADA"}, }, want: map[string]Asset{ - "ADA": Asset{ + "ADA": { AlternateName: "ADA", AssetClass: "currency", Decimals: 8, @@ -190,7 +190,7 @@ func TestKraken_AssetPairs(t *testing.T) { pairs: []string{"ADACAD"}, }, want: map[string]AssetPair{ - "ADACAD": AssetPair{ + "ADACAD": { Altname: "ADACAD", WSName: "ADA/CAD", AssetClassBase: "currency", @@ -222,7 +222,7 @@ func TestKraken_AssetPairs(t *testing.T) { pairs: nil, }, want: map[string]AssetPair{ - "ADACAD": AssetPair{ + "ADACAD": { Altname: "ADACAD", WSName: "ADA/CAD", AssetClassBase: "currency", @@ -307,7 +307,7 @@ func TestKraken_Ticker(t *testing.T) { pairs: []string{"ADACAD"}, }, want: map[string]Ticker{ - "ADACAD": Ticker{ + "ADACAD": { Ask: Level{ Price: 0.108312, WholeLotVolume: 6418., @@ -373,7 +373,7 @@ func TestKraken_Candles(t *testing.T) { response := OHLCResponse{ Last: 1554222360, Candles: map[string][]Candle{ - "ADACAD": []Candle{{ + "ADACAD": {{ Time: 1554179640, Open: 0.0005000, High: 0.0005000, @@ -496,26 +496,26 @@ func TestKraken_GetOrderBook(t *testing.T) { depth: 2, }, want: map[string]OrderBook{ - "ADACAD": OrderBook{ + "ADACAD": { Asks: []OrderBookItem{ - OrderBookItem{ + { Price: 0.109441, Volume: 6741.072, Timestamp: 1554223624, }, - OrderBookItem{ + { Price: 0.109442, Volume: 4950.724, Timestamp: 1554223614, }, }, Bids: []OrderBookItem{ - OrderBookItem{ + { Price: 0.090494, Volume: 2789.652, Timestamp: 1554223622, }, - OrderBookItem{ + { Price: 0.090493, Volume: 6379.886, Timestamp: 1554223620, @@ -584,7 +584,7 @@ func TestKraken_GetTrades(t *testing.T) { want: TradeResponse{ Last: "1554221914617956627", ADACAD: []Trade{ - Trade{ + { Price: 0.093280, Volume: 2968.26413227, Time: 1553959154.2509, @@ -655,7 +655,7 @@ func TestKraken_GetSpread(t *testing.T) { want: SpreadResponse{ Last: 1554224725, ADACAD: []Spread{ - Spread{ + { Time: 1554224145, Ask: 0.109331, Bid: 0.091118, diff --git a/rest/private_methods.go b/rest/private_methods.go index bd9abc0..171bbf8 100644 --- a/rest/private_methods.go +++ b/rest/private_methods.go @@ -80,11 +80,13 @@ func (api *Kraken) QueryOrders(needTrades bool, userRef string, txIDs ...string) if userRef != "" { data.Set("userref", userRef) } - if len(txIDs) > 50 { + + switch { + case len(txIDs) > 50: return nil, fmt.Errorf("Maximum count of requested orders is 50") - } else if len(txIDs) == 0 { + case len(txIDs) == 0: return nil, fmt.Errorf("txIDs is required") - } else { + default: data.Set("txid", strings.Join(txIDs, ",")) } diff --git a/rest/private_methods_test.go b/rest/private_methods_test.go index 98346f2..c3e4b41 100644 --- a/rest/private_methods_test.go +++ b/rest/private_methods_test.go @@ -46,7 +46,7 @@ func TestKraken_GetDepositMethods(t *testing.T) { StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(depositMethodsJSON)), }, - want: []DepositMethods{DepositMethods{Method: "Ether (Hex)", Limit: false, Fee: "0.0000000000", GenAddress: true}}, + want: []DepositMethods{{Method: "Ether (Hex)", Limit: false, Fee: "0.0000000000", GenAddress: true}}, wantErr: false, }, } @@ -91,7 +91,7 @@ func TestKraken_GetDepositStatuses(t *testing.T) { StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(depositStatusesJSON)), }, - want: []DepositStatuses{DepositStatuses{Method: "Ether (Hex)", Aclass: "currency", Asset: "XETH", Refid: "sometest1", + want: []DepositStatuses{{Method: "Ether (Hex)", Aclass: "currency", Asset: "XETH", Refid: "sometest1", Txid: "sometest2", Info: "sometest3", Amount: "6.91", Fee: "0.0000000000", Time: 1617014556, Status: "Success"}, }, wantErr: false, @@ -244,7 +244,7 @@ func TestKraken_GetOpenOrders(t *testing.T) { }, want: OpenOrdersResponse{ Orders: map[string]OrderInfo{ - "OR3XZM-5EN2R-LS5X51": OrderInfo{ + "OR3XZM-5EN2R-LS5X51": { RefID: nil, UserRef: nil, Status: "open", @@ -320,7 +320,7 @@ func TestKraken_GetClosedOrders(t *testing.T) { want: ClosedOrdersResponse{ Count: 20, Orders: map[string]OrderInfo{ - "OK46ER-A2BXK-YOLKE1": OrderInfo{ + "OK46ER-A2BXK-YOLKE1": { RefID: nil, UserRef: nil, Status: "canceled", @@ -396,7 +396,7 @@ func TestKraken_QueryOrders(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader(queryOrdersJSON)), }, want: map[string]OrderInfo{ - "OLNYE1-H3BBJ-JD2LGC": OrderInfo{ + "OLNYE1-H3BBJ-JD2LGC": { RefID: nil, UserRef: nil, Status: "canceled", @@ -473,7 +473,7 @@ func TestKraken_GetTradesHistory(t *testing.T) { want: TradesHistoryResponse{ Count: 1, Trades: map[string]PrivateTrade{ - "TO3MMA-BSBGV-XUV4A1": PrivateTrade{ + "TO3MMA-BSBGV-XUV4A1": { OrderID: "OSQQQ5-MBKL6-O4YYE1", PositionID: "TYE7IH-QCG76-BVMCM1", Pair: "XXBTZUSD", @@ -534,7 +534,7 @@ func TestKraken_QueryTrades(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader(queryTradesJSON)), }, want: map[string]PrivateTrade{ - "TO3MMA-BSBGV-XUV4A1": PrivateTrade{ + "TO3MMA-BSBGV-XUV4A1": { OrderID: "OSQQQ5-MBKL6-O4YYE1", PositionID: "TYE7IH-QCG76-BVMCM1", Pair: "XXBTZUSD", @@ -594,7 +594,7 @@ func TestKraken_GetOpenPositions(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader(openPositionsJSON)), }, want: map[string]Position{ - "TYE7IH-QCG76-BVMCM1": Position{ + "TYE7IH-QCG76-BVMCM1": { OrderID: "OK7SOC-SGF3O-F54S51", Status: "open", Pair: "XXBTZUSD", @@ -658,7 +658,7 @@ func TestKraken_GetLedgersInfo(t *testing.T) { }, want: LedgerInfoResponse{ Ledgers: map[string]Ledger{ - "LGPNZQ-2SLSA-C7QCT1": Ledger{ + "LGPNZQ-2SLSA-C7QCT1": { RefID: "TI2NBU-IICD2-BAVYO1", Time: 1570623111.9096, LedgerType: "rollover", @@ -715,7 +715,7 @@ func TestKraken_QueryLedgers(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader(queryLedgerJSON)), }, want: map[string]Ledger{ - "LTCH4T-LG5FS-MKGVD1": Ledger{ + "LTCH4T-LG5FS-MKGVD1": { RefID: "TYE7IH-QCG76-BVMCM1", Time: 1570551111.2568, LedgerType: "rollover", @@ -774,7 +774,7 @@ func TestKraken_GetTradeVolume(t *testing.T) { Currency: "ZUSD", Volume: 1000, Fees: map[string]Fees{ - "XXBTZUSD": Fees{ + "XXBTZUSD": { Fee: 0.16, MinFee: 0.1, MaxFee: 0.26, @@ -784,7 +784,7 @@ func TestKraken_GetTradeVolume(t *testing.T) { }, }, FeesMaker: map[string]Fees{ - "XXBTZUSD": Fees{ + "XXBTZUSD": { Fee: 0.06, MinFee: 0, MaxFee: 0.16, diff --git a/rest/responses.go b/rest/responses.go index bcfe788..72c3eb2 100644 --- a/rest/responses.go +++ b/rest/responses.go @@ -651,8 +651,8 @@ type TradesHistoryResponse struct { // DepositMethods - respons on GetDepositMethods request type DepositMethods struct { Method string `json:"method"` - Limit bool `json:"limit"` Fee string `json:"fee"` + Limit bool `json:"limit"` GenAddress bool `json:"gen-address"` } diff --git a/websocket/auth.go b/websocket/auth.go index 0dac723..7e1df92 100644 --- a/websocket/auth.go +++ b/websocket/auth.go @@ -17,26 +17,20 @@ type AuthClient struct { } // NewAuth - constructor for AuthClient -func NewAuth(key, secret string) *AuthClient { +func NewAuth(key, secret string, opts ...AuthOption) *AuthClient { api := rest.New(key, secret) data, err := api.GetWebSocketsToken() if err != nil { panic(err) } - params := NewDefaultAuthParameters() c := &AuthClient{ Client: &Client{ - asyncFactory: &websocketAsynchronousFactory{ - parameters: params, - }, isConnected: false, - parameters: params, listener: make(chan interface{}), terminal: false, shutdown: make(chan struct{}), asynchronous: nil, - heartbeat: time.Now().Add(params.HeartbeatTimeout), hbChannel: make(chan error), subscriptions: make(map[int64]*SubscriptionStatus), factories: make(map[string]ParseFactory), @@ -45,6 +39,20 @@ func NewAuth(key, secret string) *AuthClient { token: data.Token, tokenExpiresTimer: time.NewTimer(time.Duration(data.Expires)), } + + for i := range opts { + opts[i](c) + } + + if c.parameters == nil { + c.parameters = NewDefaultAuthParameters() + } + + c.asyncFactory = &websocketAsynchronousFactory{ + parameters: c.parameters, + } + c.heartbeat = time.Now().Add(c.parameters.HeartbeatTimeout) + c.createFactories() c.createAuthFactories() return c diff --git a/websocket/client.go b/websocket/client.go index 9f03043..d45121a 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -35,9 +35,6 @@ type Client struct { timeout int64 // read timeout asynchronous asynchronous - isConnected bool - terminal bool - init bool heartbeat time.Time hbChannel chan error @@ -50,12 +47,16 @@ type Client struct { // downstream listener channel to deliver API objects listener chan interface{} - // race management - waitGroup sync.WaitGroup - subscriptions map[int64]*SubscriptionStatus factories map[string]ParseFactory + isConnected bool + terminal bool + init bool + + // race management + waitGroup sync.WaitGroup + isConnectedMux sync.Mutex heartbeatMux sync.Mutex } @@ -145,26 +146,26 @@ func (c *Client) listenDisconnect() { if e != nil { log.Printf("socket disconnect: %s", e.Error()) } - c.setIsConnected(false) - err := c.reconnect(e) - if err != nil { + c.setIsConnected() + + if err := c.reconnect(e); err != nil { log.Printf("socket disconnect: %s", err.Error()) } case <-c.shutdown: log.Printf("Shutdown listen disconnect") - c.setIsConnected(false) + c.setIsConnected() return case e := <-c.listenHeartbeat(): log.Printf("Heartbeat") if e != nil { c.closeAsyncAndWait(c.parameters.ShutdownTimeout) - err := c.reconnect(nil) - if err != nil { + + if err := c.reconnect(nil); err != nil { log.Printf("socket disconnect: %s", err.Error()) } } - c.setIsConnected(false) + c.setIsConnected() } } @@ -195,12 +196,11 @@ func (c *Client) reset() { func (c *Client) connect() error { err := c.asynchronous.Connect() - c.setIsConnected(err == nil) + c.setIsConnected() return err } func (c *Client) resubscribe() error { - ctx := context.Background() for _, sub := range c.subscriptions { s := SubscriptionRequest{ Event: EventSubscribe, @@ -208,8 +208,7 @@ func (c *Client) resubscribe() error { Subscription: sub.Subscription, } - err := c.asynchronous.Send(ctx, s) - if err != nil { + if err := c.asynchronous.Send(context.Background(), s); err != nil { log.Println(err) return err } @@ -236,8 +235,7 @@ func (c *Client) reconnect(err error) error { c.reset() if err = c.connect(); err == nil { - err = c.resubscribe() - if err == nil { + if err = c.resubscribe(); err == nil { log.Print("reconnect OK") return nil } @@ -268,8 +266,7 @@ func (c *Client) listenUpstream() { case msg := <-c.asynchronous.Listen(): if msg != nil { // log.Printf("[DEBUG]: %s\n", msg) - err := c.handleMessage(msg) - if err != nil { + if err := c.handleMessage(msg); err != nil { log.Printf("[WARN]: %s\n", err) } } @@ -312,17 +309,21 @@ func (c *Client) closeAsyncAndWait(t time.Duration) { c.waitGroup.Wait() } -func (c *Client) handleMessage(msg []byte) (err error) { +func (c *Client) handleMessage(msg []byte) error { t := bytes.TrimLeftFunc(msg, unicode.IsSpace) c.updateHeartbeat() - if bytes.HasPrefix(t, []byte("[")) { - err = c.handleChannel(msg) - } else if bytes.HasPrefix(t, []byte("{")) { - err = c.handleEvent(msg) - } else { - return fmt.Errorf("unexpected message: %s", msg) + + if len(t) == 0 { + return fmt.Errorf("Empty response: %s", string(msg)) + } + switch t[0] { + case '[': + return c.handleChannel(msg) + case '{': + return c.handleEvent(msg) + default: + return fmt.Errorf("Unexpected message: %s", string(msg)) } - return err } func (c *Client) createFactories() { @@ -338,25 +339,22 @@ func (c *Client) createFactory(name string, factory ParseFactory) { } func (c *Client) handleEvent(msg []byte) error { - event := &EventType{} - err := json.Unmarshal(msg, event) - if err != nil { + var event EventType + if err := json.Unmarshal(msg, &event); err != nil { return err } - switch event.Event { + switch event.Event { case EventPong: - pong := PongResponse{} - err = json.Unmarshal(msg, &pong) - if err != nil { + var pong PongResponse + if err := json.Unmarshal(msg, &pong); err != nil { return err } log.Print("Pong received") case EventSystemStatus: - systemStatus := SystemStatus{} - err = json.Unmarshal(msg, &systemStatus) - if err != nil { + var systemStatus SystemStatus + if err := json.Unmarshal(msg, &systemStatus); err != nil { return err } log.Printf("Status: %s", systemStatus.Status) @@ -364,9 +362,8 @@ func (c *Client) handleEvent(msg []byte) error { log.Printf("Version: %s", systemStatus.Version) case EventSubscriptionStatus: - status := SubscriptionStatus{} - err = json.Unmarshal(msg, &status) - if err != nil { + var status SubscriptionStatus + if err := json.Unmarshal(msg, &status); err != nil { return err } @@ -386,39 +383,39 @@ func (c *Client) handleEvent(msg []byte) error { } } case EventCancelOrderStatus: - cancelOrderResponse := CancelOrderResponse{} - err = json.Unmarshal(msg, &cancelOrderResponse) - if err != nil { + var cancelOrderResponse CancelOrderResponse + if err := json.Unmarshal(msg, &cancelOrderResponse); err != nil { return err } - if cancelOrderResponse.Status == StatusError { + switch cancelOrderResponse.Status { + case StatusError: log.Printf("[ERROR] %s", cancelOrderResponse.ErrorMessage) - } else if cancelOrderResponse.Status == StatusOK { + case StatusOK: log.Print("[INFO] Order successfully cancelled") c.listener <- DataUpdate{ ChannelName: EventCancelOrder, Data: cancelOrderResponse, } - } else { + default: log.Printf("[ERROR] Unknown status: %s", cancelOrderResponse.Status) } case EventAddOrderStatus: - addOrderResponse := AddOrderResponse{} - err = json.Unmarshal(msg, &addOrderResponse) - if err != nil { + var addOrderResponse AddOrderResponse + if err := json.Unmarshal(msg, &addOrderResponse); err != nil { return err } - if addOrderResponse.Status == StatusError { + switch addOrderResponse.Status { + case StatusError: log.Printf("[ERROR] %s", addOrderResponse.ErrorMessage) - } else if addOrderResponse.Status == StatusOK { + case StatusOK: log.Print("[INFO] Order successfully sent") c.listener <- DataUpdate{ ChannelName: EventAddOrder, Data: addOrderResponse, } - } else { + default: log.Printf("[ERROR] Unknown status: %s", addOrderResponse.Status) } case EventHeartbeat: @@ -430,8 +427,7 @@ func (c *Client) handleEvent(msg []byte) error { func (c *Client) handleChannel(msg []byte) error { var data DataUpdate - err := json.Unmarshal(msg, &data) - if err != nil { + if err := json.Unmarshal(msg, &data); err != nil { return err } @@ -458,7 +454,7 @@ func (c *Client) IsConnected() bool { return c.isConnected } -func (c *Client) setIsConnected(value bool) { +func (c *Client) setIsConnected() { c.isConnectedMux.Lock() defer c.isConnectedMux.Unlock() diff --git a/websocket/client_test.go b/websocket/client_test.go index 09b1abc..fe82b83 100644 --- a/websocket/client_test.go +++ b/websocket/client_test.go @@ -197,7 +197,7 @@ func TestClient_handleChannel(t *testing.T) { name: "Test `not found factory`", fields: fields{ subscriptions: map[int64]*SubscriptionStatus{ - 1: &SubscriptionStatus{ + 1: { Subscription: Subscription{ Name: ChanTicker, }, @@ -214,7 +214,7 @@ func TestClient_handleChannel(t *testing.T) { name: "Test invalid parse message", fields: fields{ subscriptions: map[int64]*SubscriptionStatus{ - 1: &SubscriptionStatus{ + 1: { Subscription: Subscription{ Name: ChanTicker, }, @@ -233,7 +233,7 @@ func TestClient_handleChannel(t *testing.T) { name: "Test valid message", fields: fields{ subscriptions: map[int64]*SubscriptionStatus{ - 1: &SubscriptionStatus{ + 1: { Subscription: Subscription{ Name: ChanSpread, }, @@ -305,7 +305,7 @@ func TestClient_handleMessage(t *testing.T) { fields: fields{ heartbeat: time.Now(), subscriptions: map[int64]*SubscriptionStatus{ - 1: &SubscriptionStatus{ + 1: { ChannelID: 1, Subscription: Subscription{ Name: ChanSpread, @@ -520,51 +520,37 @@ func TestClient_Ping(t *testing.T) { } func TestClient_Unsubscribe(t *testing.T) { - type fields struct { - isError bool - } - type args struct { + tests := []struct { + name string channelType string pairs []string - } - tests := []struct { - name string - fields fields - args args - wantErr bool + isError bool + wantErr bool }{ { - name: "Test unsubscribe method: success", - fields: fields{ - isError: false, - }, - args: args{ - channelType: ChanTicker, - pairs: []string{BTCCAD}, - }, - wantErr: false, + name: "Test unsubscribe method: success", + isError: false, + channelType: ChanTicker, + pairs: []string{BTCCAD}, + wantErr: false, }, { - name: "Test unsubscribe method: failed", - fields: fields{ - isError: true, - }, - args: args{ - channelType: ChanTicker, - pairs: []string{BTCCAD}, - }, - wantErr: true, + name: "Test unsubscribe method: failed", + isError: true, + channelType: ChanTicker, + pairs: []string{BTCCAD}, + wantErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &Client{ asynchronous: &mockAsynchronous{ - isSendError: tt.fields.isError, + isSendError: tt.isError, }, parameters: NewDefaultSandboxParameters(), } - if err := c.Unsubscribe(tt.args.channelType, tt.args.pairs); (err != nil) != tt.wantErr { + if err := c.Unsubscribe(tt.channelType, tt.pairs); (err != nil) != tt.wantErr { t.Errorf("Client.Unsubscribe() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -572,39 +558,26 @@ func TestClient_Unsubscribe(t *testing.T) { } func TestClient_SubscribeBook(t *testing.T) { - type fields struct { - isError bool - } - type args struct { - pairs []string - depth int64 - } tests := []struct { name string - fields fields - args args + pairs []string + depth int64 + isError bool wantErr bool }{ { - name: "Test SubscribeBook method: success", - fields: fields{ - isError: false, - }, - args: args{ - depth: Depth10, - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeBook method: success", + isError: false, + depth: Depth10, + pairs: []string{BTCCAD}, wantErr: false, }, { - name: "Test SubscribeBook method: failed", - fields: fields{ - isError: true, - }, - args: args{ - depth: Depth10, - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeBook method: failed", + isError: true, + depth: Depth10, + pairs: []string{BTCCAD}, + wantErr: true, }, } @@ -612,11 +585,11 @@ func TestClient_SubscribeBook(t *testing.T) { t.Run(tt.name, func(t *testing.T) { c := &Client{ asynchronous: &mockAsynchronous{ - isSendError: tt.fields.isError, + isSendError: tt.isError, }, parameters: NewDefaultSandboxParameters(), } - if err := c.SubscribeBook(tt.args.pairs, tt.args.depth); (err != nil) != tt.wantErr { + if err := c.SubscribeBook(tt.pairs, tt.depth); (err != nil) != tt.wantErr { t.Errorf("Client.SubscribeBook() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -624,36 +597,22 @@ func TestClient_SubscribeBook(t *testing.T) { } func TestClient_SubscribeSpread(t *testing.T) { - type fields struct { - isError bool - } - type args struct { - pairs []string - } tests := []struct { name string - fields fields - args args + pairs []string + isError bool wantErr bool }{ { - name: "Test SubscribeSpread method: success", - fields: fields{ - isError: false, - }, - args: args{ - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeSpread method: success", + isError: false, + pairs: []string{BTCCAD}, wantErr: false, }, { - name: "Test SubscribeSpread method: failed", - fields: fields{ - isError: true, - }, - args: args{ - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeSpread method: failed", + isError: true, + pairs: []string{BTCCAD}, wantErr: true, }, } @@ -661,11 +620,11 @@ func TestClient_SubscribeSpread(t *testing.T) { t.Run(tt.name, func(t *testing.T) { c := &Client{ asynchronous: &mockAsynchronous{ - isSendError: tt.fields.isError, + isSendError: tt.isError, }, parameters: NewDefaultSandboxParameters(), } - if err := c.SubscribeSpread(tt.args.pairs); (err != nil) != tt.wantErr { + if err := c.SubscribeSpread(tt.pairs); (err != nil) != tt.wantErr { t.Errorf("Client.SubscribeSpread() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -673,36 +632,22 @@ func TestClient_SubscribeSpread(t *testing.T) { } func TestClient_SubscribeTrades(t *testing.T) { - type fields struct { - isError bool - } - type args struct { - pairs []string - } tests := []struct { name string - fields fields - args args + pairs []string + isError bool wantErr bool }{ { - name: "Test SubscribeTrades method: success", - fields: fields{ - isError: false, - }, - args: args{ - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeTrades method: success", + isError: false, + pairs: []string{BTCCAD}, wantErr: false, }, { - name: "Test SubscribeTrades method: failed", - fields: fields{ - isError: true, - }, - args: args{ - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeTrades method: failed", + isError: true, + pairs: []string{BTCCAD}, wantErr: true, }, } @@ -710,11 +655,11 @@ func TestClient_SubscribeTrades(t *testing.T) { t.Run(tt.name, func(t *testing.T) { c := &Client{ asynchronous: &mockAsynchronous{ - isSendError: tt.fields.isError, + isSendError: tt.isError, }, parameters: NewDefaultSandboxParameters(), } - if err := c.SubscribeTrades(tt.args.pairs); (err != nil) != tt.wantErr { + if err := c.SubscribeTrades(tt.pairs); (err != nil) != tt.wantErr { t.Errorf("Client.SubscribeTrades() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -722,51 +667,37 @@ func TestClient_SubscribeTrades(t *testing.T) { } func TestClient_SubscribeCandles(t *testing.T) { - type fields struct { - isError bool - } - type args struct { + tests := []struct { + name string pairs []string interval int64 - } - tests := []struct { - name string - fields fields - args args - wantErr bool + isError bool + wantErr bool }{ { - name: "Test SubscribeCandles method: success", - fields: fields{ - isError: false, - }, - args: args{ - pairs: []string{BTCCAD}, - interval: Interval10080, - }, - wantErr: false, + name: "Test SubscribeCandles method: success", + isError: false, + pairs: []string{BTCCAD}, + interval: Interval10080, + wantErr: false, }, { - name: "Test SubscribeCandles method: failed", - fields: fields{ - isError: true, - }, - args: args{ - pairs: []string{BTCCAD}, - interval: Interval10080, - }, - wantErr: true, + name: "Test SubscribeCandles method: failed", + isError: true, + pairs: []string{BTCCAD}, + interval: Interval10080, + wantErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &Client{ asynchronous: &mockAsynchronous{ - isSendError: tt.fields.isError, + isSendError: tt.isError, }, parameters: NewDefaultSandboxParameters(), } - if err := c.SubscribeCandles(tt.args.pairs, tt.args.interval); (err != nil) != tt.wantErr { + if err := c.SubscribeCandles(tt.pairs, tt.interval); (err != nil) != tt.wantErr { t.Errorf("Client.SubscribeCandles() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -774,36 +705,22 @@ func TestClient_SubscribeCandles(t *testing.T) { } func TestClient_SubscribeTicker(t *testing.T) { - type fields struct { - isError bool - } - type args struct { - pairs []string - } tests := []struct { name string - fields fields - args args + pairs []string + isError bool wantErr bool }{ { - name: "Test SubscribeTicker method: success", - fields: fields{ - isError: false, - }, - args: args{ - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeTicker method: success", + isError: false, + pairs: []string{BTCCAD}, wantErr: false, }, { - name: "Test SubscribeTicker method: failed", - fields: fields{ - isError: true, - }, - args: args{ - pairs: []string{BTCCAD}, - }, + name: "Test SubscribeTicker method: failed", + isError: true, + pairs: []string{BTCCAD}, wantErr: true, }, } @@ -811,11 +728,11 @@ func TestClient_SubscribeTicker(t *testing.T) { t.Run(tt.name, func(t *testing.T) { c := &Client{ asynchronous: &mockAsynchronous{ - isSendError: tt.fields.isError, + isSendError: tt.isError, }, parameters: NewDefaultSandboxParameters(), } - if err := c.SubscribeTicker(tt.args.pairs); (err != nil) != tt.wantErr { + if err := c.SubscribeTicker(tt.pairs); (err != nil) != tt.wantErr { t.Errorf("Client.SubscribeTicker() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -993,7 +910,7 @@ func TestClient_resubscribe(t *testing.T) { isSendError: tt.fields.isError, }, subscriptions: map[int64]*SubscriptionStatus{ - 1: &SubscriptionStatus{ + 1: { Pair: BTCCAD, Subscription: Subscription{}, }, @@ -1264,7 +1181,7 @@ func TestClient_reconnect(t *testing.T) { isConnectError: false, isSendError: true, subscriptions: map[int64]*SubscriptionStatus{ - 1: &SubscriptionStatus{ + 1: { Pair: BTCCAD, Subscription: Subscription{ Name: ChanTicker, diff --git a/websocket/consts.go b/websocket/consts.go index 762720b..293ab3a 100644 --- a/websocket/consts.go +++ b/websocket/consts.go @@ -2,9 +2,10 @@ package websocket // URLs const ( - ProdBaseURL = "wss://ws.kraken.com" - AuthBaseURL = "wss://ws-auth.kraken.com" - SandboxBaseURL = "wss://beta-ws.kraken.com" + ProdBaseURL = "wss://ws.kraken.com" + AuthBaseURL = "wss://ws-auth.kraken.com" + SandboxBaseURL = "wss://beta-ws.kraken.com" + AuthSandboxBaseURL = "wss://beta-ws-auth.kraken.com" ) // Available channels @@ -36,11 +37,11 @@ const ( // Intervals const ( - Interval1 = 1 - Interval5 = 5 - Interval15 = 15 - Interval30 = 30 - Interval60 = 60 + Interval1 = 1 + Interval5 = 5 + Interval15 = 15 + Interval30 = 30 + Interval60 = 60 Interval240 = 240 Interval1440 = 1440 Interval10080 = 10080 diff --git a/websocket/factories.go b/websocket/factories.go index a7dd3ca..34d8900 100644 --- a/websocket/factories.go +++ b/websocket/factories.go @@ -231,40 +231,58 @@ func (f *openOrdersFactory) Parse(data interface{}, pair string) (interface{}, e return upd, fmt.Errorf("Can't parse data %#v", data) } - if len(body) != 2 { - return upd, fmt.Errorf("Can't parse data %#v", data) - } - - for key, value := range body[0].(map[string]map[string]interface{}) { - upd.Order[key] = OpenOrder{ - Cost: valToFloat64(value["cost"]), - Fee: valToFloat64(value["fee"]), - LimitPrice: valToFloat64(value["limitprice"]), - Misc: value["misc"].(string), - Oflags: value["oflags"].(string), - OpenTime: valToFloat64(value["opentm"]), - StartTime: valToFloat64(value["starttm"]), - ExpireTime: valToFloat64(value["expiretm"]), - Price: valToFloat64(value["price"]), - Refid: value["refid"].(string), - Status: value["status"].(string), - StopPrice: valToFloat64(value["stopprice"]), - UserRef: int(value["userref"].(float64)), - Vol: valToFloat64(value["vol"]), - VolExec: valToFloat64(value["vol_exec"]), - - Descr: OpenOrderDescr{ - Close: value["close"].(string), - Leverage: value["leverage"].(string), - Order: value["order"].(string), - Ordertype: value["ordertype"].(string), - Pair: value["pair"].(string), - Price: valToFloat64(value["price"]), - Price2: valToFloat64(value["price2"]), - Type: value["type"].(string), - }, + for i := range body { + order, ok := body[i].(map[string]interface{}) + if !ok { + return upd, fmt.Errorf("Can't parse order %#v", body[i]) + } + for orderID, orderBody := range order { + value, ok := orderBody.(map[string]interface{}) + if !ok { + return upd, fmt.Errorf("Can't parse order body %#v", body[i]) + } + upd.Order[orderID] = OpenOrder{ + Cost: valToFloat64(value["cost"]), + Fee: valToFloat64(value["fee"]), + LimitPrice: valToFloat64(value["limitprice"]), + Misc: getString(value, "misc"), + Oflags: getString(value, "oflags"), + OpenTime: valToFloat64(value["opentm"]), + StartTime: valToFloat64(value["starttm"]), + ExpireTime: valToFloat64(value["expiretm"]), + Price: valToFloat64(value["price"]), + Refid: getString(value, "refid"), + Status: getString(value, "status"), + StopPrice: valToFloat64(value["stopprice"]), + UserRef: int(value["userref"].(float64)), + Vol: valToFloat64(value["vol"]), + VolExec: valToFloat64(value["vol_exec"]), + + Descr: OpenOrderDescr{ + Close: getString(value, "close"), + Leverage: getString(value, "leverage"), + Order: getString(value, "order"), + Ordertype: getString(value, "ordertype"), + Pair: getString(value, "pair"), + Price: valToFloat64(value["price"]), + Price2: valToFloat64(value["price2"]), + Type: getString(value, "type"), + }, + } } } return upd, nil } + +func getString(data map[string]interface{}, key string) string { + value, ok := data[key] + if !ok { + return "" + } + str, ok := value.(string) + if !ok { + return "" + } + return str +} diff --git a/websocket/factories_test.go b/websocket/factories_test.go index edac85f..58fa6fe 100644 --- a/websocket/factories_test.go +++ b/websocket/factories_test.go @@ -246,7 +246,7 @@ func Test_tradesFactory_Parse(t *testing.T) { }, f: &tradesFactory{}, want: []TradeUpdate{ - TradeUpdate{ + { Pair: BTCCAD, Time: float64(t1.Unix()), Price: 5541.2, @@ -255,7 +255,7 @@ func Test_tradesFactory_Parse(t *testing.T) { OrderType: Limit, Misc: "", }, - TradeUpdate{ + { Pair: BTCCAD, Time: float64(t2.Unix()), Price: 6060., @@ -415,13 +415,13 @@ func Test_bookFactory_Parse(t *testing.T) { IsSnapshot: true, Pair: BTCCAD, Asks: []OrderBookItem{ - OrderBookItem{ + { Price: 5541.3, Volume: 2.50700000, Time: float64(t1.Unix()), Republish: false, }, - OrderBookItem{ + { Price: 5541.8, Volume: 0.33000000, Time: float64(t2.Unix()), @@ -429,13 +429,13 @@ func Test_bookFactory_Parse(t *testing.T) { }, }, Bids: []OrderBookItem{ - OrderBookItem{ + { Price: 5541.2, Volume: 1.52900000, Time: float64(t1.Unix()), Republish: false, }, - OrderBookItem{ + { Price: 5539.9, Volume: 0.30000000, Time: float64(t2.Unix()), @@ -472,7 +472,7 @@ func Test_bookFactory_Parse(t *testing.T) { IsSnapshot: false, Pair: BTCCAD, Asks: []OrderBookItem{ - OrderBookItem{ + { Price: 5541.3, Volume: 2.50700000, Time: float64(t1.Unix()), @@ -480,7 +480,7 @@ func Test_bookFactory_Parse(t *testing.T) { }, }, Bids: []OrderBookItem{ - OrderBookItem{ + { Price: 5541.2, Volume: 1.52900000, Time: float64(t1.Unix()), diff --git a/websocket/messages.go b/websocket/messages.go index 0371490..55dab32 100644 --- a/websocket/messages.go +++ b/websocket/messages.go @@ -87,25 +87,25 @@ func (u *DataUpdate) UnmarshalJSON(data []byte) error { } if len(raw) == 3 { - var ok bool u.Data = raw[0] - - if u.ChannelName, ok = raw[1].(string); !ok { + chanName, ok := raw[1].(string) + if !ok { return fmt.Errorf("expected message to have channel name as 2nd element but got %#v instead", raw[1]) } + u.ChannelName = chanName - var sequenceMap map[string]interface{} - if sequenceMap, ok = raw[2].(map[string]interface{}); !ok { + sequenceMap, ok := raw[2].(map[string]interface{}) + if !ok { return fmt.Errorf("expected message to have JSON object as 3rd element but got %#v instead", raw[2]) } - var sequenceRaw interface{} - if sequenceRaw, ok = sequenceMap["sequence"]; !ok { + sequenceRaw, ok := sequenceMap["sequence"] + if !ok { return fmt.Errorf("expected message to have sequence in JSON object as 3rd element but got %#v instead", raw[2]) } - var seq float64 - if seq, ok = sequenceRaw.(float64); !ok { + seq, ok := sequenceRaw.(float64) + if !ok { return fmt.Errorf("expected message to have sequence integer in JSON object as 3rd element but got %#v instead", raw[2]) } @@ -119,15 +119,15 @@ func (u *DataUpdate) UnmarshalJSON(data []byte) error { } u.ChannelID = int64(chID) - u.ChannelName, ok = raw[len(raw)-2].(string) + u.ChannelName, ok = raw[2].(string) if !ok { return fmt.Errorf("expected message with (n - 2) element channel name but got %#v instead", raw[len(raw)-2]) } - u.Pair, ok = raw[len(raw)-1].(string) + u.Pair, ok = raw[3].(string) if !ok { return fmt.Errorf("expected message with (n - 2) element pair but got %#v instead", raw[len(raw)-1]) } - u.Data = raw[1 : len(raw)-2][0] + u.Data = raw[1] return nil } @@ -305,7 +305,7 @@ type AddOrderResponse struct { Event string `json:"event"` Status string `json:"status"` TxID string `json:"txid"` - ErrorMessage string `json:"errorMessage,omiempty"` + ErrorMessage string `json:"errorMessage,omitempty"` } // CancelOrderRequest - @@ -316,7 +316,7 @@ type CancelOrderRequest struct { // CancelOrderResponse - type CancelOrderResponse struct { - ErrorMessage string `json:"errorMessage,omiempty"` + ErrorMessage string `json:"errorMessage,omitempty"` Event string `json:"event"` Status string `json:"status"` } diff --git a/websocket/options.go b/websocket/options.go new file mode 100644 index 0000000..6f54f01 --- /dev/null +++ b/websocket/options.go @@ -0,0 +1,13 @@ +package websocket + +// AuthOption - option function for `AuthClient` +type AuthOption func(*AuthClient) + +// WithParams - add custom params to `AuthClient` +func WithParams(params *Parameters) AuthOption { + return func(auth *AuthClient) { + if params != nil { + auth.parameters = params + } + } +} diff --git a/websocket/parameters.go b/websocket/parameters.go index 1f75654..6e8c4de 100644 --- a/websocket/parameters.go +++ b/websocket/parameters.go @@ -6,20 +6,19 @@ import ( // Parameters defines adapter behavior. type Parameters struct { - AutoReconnect bool - ReconnectInterval time.Duration - ReconnectAttempts int - reconnectTry int - ShutdownTimeout time.Duration - ContextTimeout time.Duration - - ResubscribeOnReconnect bool + URL string + ReconnectInterval time.Duration + ShutdownTimeout time.Duration + ContextTimeout time.Duration HeartbeatCheckPeriod time.Duration HeartbeatTimeout time.Duration - LogTransport bool - URL string + ReconnectAttempts int + reconnectTry int + LogTransport bool + AutoReconnect bool + ResubscribeOnReconnect bool } // NewDefaultParameters - create default Parameters object for prod @@ -72,3 +71,20 @@ func NewDefaultAuthParameters() *Parameters { ContextTimeout: time.Second * 5, } } + +// NewDefaultSandboxAuthParameters - create default Parameters object for sanbox auth +func NewDefaultSandboxAuthParameters() *Parameters { + return &Parameters{ + AutoReconnect: true, + ReconnectInterval: time.Second, + reconnectTry: 0, + ReconnectAttempts: 5, + URL: AuthSandboxBaseURL, + ShutdownTimeout: time.Second * 5, + ResubscribeOnReconnect: true, + HeartbeatTimeout: time.Second * 3, // HB = 3s + HeartbeatCheckPeriod: time.Millisecond * 100, + LogTransport: false, // log transport send/recv, + ContextTimeout: time.Second * 5, + } +} diff --git a/websocket/parse_utils.go b/websocket/parse_utils.go index 4795bf2..cf2ab91 100644 --- a/websocket/parse_utils.go +++ b/websocket/parse_utils.go @@ -41,7 +41,7 @@ func parseLevel(data []interface{}) Level { } func parseValues(data []interface{}) Values { - switch data[0].(type) { + switch val := data[0].(type) { case string: return Values{ Today: valToFloat64(data[0]), @@ -53,7 +53,7 @@ func parseValues(data []interface{}) Values { last24h = v } return Values{ - Today: data[0].(int), + Today: val, Last24: last24h, } @@ -63,7 +63,7 @@ func parseValues(data []interface{}) Values { last24h = v } return Values{ - Today: data[0].(float64), + Today: val, Last24: last24h, } default: diff --git a/websocket/transport.go b/websocket/transport.go index d4e62eb..70dfc77 100644 --- a/websocket/transport.go +++ b/websocket/transport.go @@ -36,16 +36,16 @@ func newWs(baseURL string, logTransport bool) *ws { } type ws struct { - ws connInterface + BaseURL string + ws connInterface + downstream chan []byte + shutdown chan struct{} // signal to kill looping goroutines + finished chan error // signal to parent with error, if applicable + wsLock sync.Mutex - BaseURL string TLSSkipVerify bool - downstream chan []byte userShutdown bool logTransport bool - - shutdown chan struct{} // signal to kill looping goroutines - finished chan error // signal to parent with error, if applicable } func (w *ws) Connect() error { @@ -68,10 +68,12 @@ func (w *ws) Connect() error { ws, resp, err := d.Dial(w.BaseURL, nil) if err != nil { if err == websocket.ErrBadHandshake { - log.Printf("bad handshake: status code %d", resp.StatusCode) + log.Printf("bad handshake") } return err } + defer resp.Body.Close() + w.ws = ws go w.listenWs() return nil @@ -86,11 +88,10 @@ func (w *ws) Send(ctx context.Context, msg interface{}) error { } bs, err := json.Marshal(msg) - - // log.Printf("[DEBUG]: %s\n", bs) if err != nil { return err } + // log.Printf("[DEBUG]: %s\n", bs) select { case <-ctx.Done(): @@ -102,11 +103,12 @@ func (w *ws) Send(ctx context.Context, msg interface{}) error { w.wsLock.Lock() defer w.wsLock.Unlock() + if w.logTransport { log.Printf("ws->srv: %s", string(bs)) } - err = w.ws.WriteMessage(websocket.TextMessage, bs) - if err != nil { + + if err := w.ws.WriteMessage(websocket.TextMessage, bs); err != nil { w.cleanup(err) return err }