This repository has been archived by the owner on Oct 25, 2024. It is now read-only.
forked from schwartzmx/gremtune
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathresponse.go
195 lines (165 loc) · 7.35 KB
/
response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package gremcos
import (
"encoding/json"
"fmt"
"github.com/supplyon/gremcos/interfaces"
)
func (c *client) handleResponse(msg []byte) error {
resp, err := marshalResponse(msg)
// ignore the error here in case the response status code tells that an authentication is needed
if resp.Status.Code == interfaces.StatusAuthenticate { //Server request authentication
return c.authenticate(resp.RequestID)
}
c.saveResponse(resp, err)
return err
}
// marshalResponse creates a response struct for every incoming response for further manipulation
func marshalResponse(msg []byte) (interfaces.Response, error) {
resp := interfaces.Response{}
err := json.Unmarshal(msg, &resp)
if err != nil {
return resp, err
}
err = extractError(resp)
return resp, err
}
// saveResponse makes the response available for retrieval by the requester. Mutexes are used for thread safety.
func (c *client) saveResponse(resp interfaces.Response, err error) {
c.mux.Lock()
defer c.mux.Unlock()
var container []interface{}
existingData, ok := c.results.Load(resp.RequestID) // Retrieve old data container (for requests with multiple responses)
if ok {
container = existingData.([]interface{})
}
newdata := append(container, resp) // Create new data container with new data
c.results.Store(resp.RequestID, newdata) // Add new data to buffer for future retrieval
// obtain or create (if needed) the error notification channel for the currently active response
respNotifier, _ := c.responseNotifier.LoadOrStore(resp.RequestID, newSafeCloseErrorChannel(1))
respNotifierChannel := respNotifier.(*safeCloseErrorChannel)
// obtain or create (if needed) the status notification channel for the currently active response
responseStatusNotifier, _ := c.responseStatusNotifier.LoadOrStore(resp.RequestID, newSafeCloseIntChannel(1))
responseStatusNotifierChannel := responseStatusNotifier.(*safeCloseIntChannel)
// FIXME: This looks weird. the status code of the current response is only posted to the responseStatusNotifier channel
// if there is space left on the channel. If not then the status is just silently not posted (ignored).
if cap(responseStatusNotifierChannel.c) > len(responseStatusNotifierChannel.c) {
// Channel is not full so adding the response status to the channel else it will cause the method to wait till the response is read by requester
responseStatusNotifierChannel.c <- resp.Status.Code
}
// post an error in case it is not a partial message.
// note that here the given error can be nil.
// this is the good case that just completes the retrieval of the response
if resp.Status.Code != interfaces.StatusPartialContent {
respNotifierChannel.c <- err
}
}
// retrieveResponseAsync retrieves the response saved by saveResponse and send the retrieved repose to the channel .
func (c *client) retrieveResponseAsync(id string, responseChannel chan interfaces.AsyncResponse) {
var responseProcessedIndex int
responseNotifier, _ := c.responseNotifier.Load(id)
responseNotifierChannel := responseNotifier.(*safeCloseErrorChannel)
responseStatusNotifier, _ := c.responseStatusNotifier.Load(id)
responseStatusNotifierChannel := responseStatusNotifier.(*safeCloseIntChannel)
for status := range responseStatusNotifierChannel.c {
_ = status
// this block retrieves all but the last of the partial responses
// and sends it to the response channel
if dataI, ok := c.results.Load(id); ok {
d := dataI.([]interface{})
// Only retrieve all but one from the partial responses saved in results Map that are not sent to responseChannel
for i := responseProcessedIndex; i < len(d)-1; i++ {
responseProcessedIndex++
var asyncResponse = interfaces.AsyncResponse{}
asyncResponse.Response = d[i].(interfaces.Response)
// Send the Partial response object to the responseChannel
responseChannel <- asyncResponse
}
}
// Checks to see If there was an Error or full response that has been provided by cosmos
// If not, then continue with consuming the other partial messages
if len(responseNotifierChannel.c) <= 0 {
continue
}
//Checks to see If there was an Error or will get nil when final response has been provided by cosmos
err := <-responseNotifierChannel.c
if dataI, ok := c.results.Load(id); ok {
d := dataI.([]interface{})
// Retrieve all the partial responses that are not sent to responseChannel
for i := responseProcessedIndex; i < len(d); i++ {
responseProcessedIndex++
asyncResponse := interfaces.AsyncResponse{}
asyncResponse.Response = d[i].(interfaces.Response)
//when final partial response it sent it also sends the error message if there was an error on the last partial response retrival
if responseProcessedIndex == len(d) && err != nil {
asyncResponse.ErrorMessage = err.Error()
}
// Send the Partial response object to the responseChannel
responseChannel <- asyncResponse
}
}
// All the Partial response object including the final one has been sent to the responseChannel
break
}
// All the Partial response object including the final one has been sent to the responseChannel
// so closing responseStatusNotifierChannel, responseNotifierChannel, responseChannel and removing all the repose stored
responseStatusNotifierChannel.Close()
responseNotifierChannel.Close()
c.responseNotifier.Delete(id)
c.responseStatusNotifier.Delete(id)
c.deleteResponse(id)
close(responseChannel)
}
func emptyIfNilOrError(err error) string {
if err == nil {
return ""
}
return err.Error()
}
// retrieveResponse retrieves the response saved by saveResponse.
func (c *client) retrieveResponse(id string) ([]interfaces.Response, error) {
var responseErrorChannel *safeCloseErrorChannel
var responseStatusNotifierChannel *safeCloseIntChannel
// ensure that the cleanup is done in any case
defer func() {
if responseErrorChannel != nil {
responseErrorChannel.Close()
}
if responseStatusNotifierChannel != nil {
responseStatusNotifierChannel.Close()
}
c.responseNotifier.Delete(id)
c.responseStatusNotifier.Delete(id)
c.deleteResponse(id)
}()
responseErrorChannelUntyped, ok := c.responseNotifier.Load(id)
if !ok {
return nil, fmt.Errorf("response with id %s not found", id)
}
responseErrorChannel = responseErrorChannelUntyped.(*safeCloseErrorChannel)
responseStatusNotifierUntyped, ok := c.responseStatusNotifier.Load(id)
if !ok {
return nil, fmt.Errorf("response with id %s not found", id)
}
responseStatusNotifierChannel = responseStatusNotifierUntyped.(*safeCloseIntChannel)
err := <-responseErrorChannel.c
// Hint: Don't return here immediately in case the obtained error is != nil.
// We don't want to lose the responses obtained so far, especially the
// data stored in the attribute map of each response is useful.
// For example the response contains the request charge for this request.
dataI, ok := c.results.Load(id)
if !ok {
lastErr := c.LastError() // add more information to find out why there was no result
return nil, fmt.Errorf("no result for response with id %s found, err='%s'", id, emptyIfNilOrError(lastErr))
}
// cast the given data into an array of Responses
d := dataI.([]interface{})
data := make([]interfaces.Response, len(d))
for i := range d {
data[i] = d[i].(interfaces.Response)
}
return data, err
}
// deleteResponse deletes the response from the container. Used for cleanup purposes by requester.
func (c *client) deleteResponse(id string) {
c.results.Delete(id)
}