-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathapi_query_handler.go
281 lines (243 loc) · 8.94 KB
/
api_query_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package handlers
import (
"context"
"fmt"
gomath "math"
"strings"
"time"
"golang.org/x/sync/errgroup"
"go.uber.org/zap"
"github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/pkg/math"
"github.com/skip-mev/connect/v2/providers/base/api/metrics"
providertypes "github.com/skip-mev/connect/v2/providers/types"
)
// APIQueryHandler is an interface that encapsulates querying a data provider for info.
// The handler must respect the context timeout and cancel the request if the context
// is cancelled. All responses must be sent to the response channel. These are processed
// asynchronously by the provider.
//
//go:generate mockery --name APIQueryHandler --output ./mocks/ --case underscore
type APIQueryHandler[K providertypes.ResponseKey, V providertypes.ResponseValue] interface {
Query(
ctx context.Context,
ids []K,
responseCh chan<- providertypes.GetResponse[K, V],
)
}
// APIFetcher is an interface that encapsulates fetching data from a provider. This interface
// is meant to abstract over the various processes of interacting w/ GRPC, JSON-RPC, REST, etc. APIs.
//
//go:generate mockery --name APIFetcher --output ./mocks/ --case underscore
type APIFetcher[K providertypes.ResponseKey, V providertypes.ResponseValue] interface {
// Fetch fetches data from the API for the given IDs. The response is returned as a map of IDs to
// their respective responses. The request should respect the context timeout and cancel the request
// if the context is cancelled.
Fetch(
ctx context.Context,
ids []K,
) providertypes.GetResponse[K, V]
}
// APIQueryHandlerImpl is the default API implementation of the QueryHandler interface.
// This is used to query using http requests. It manages querying the data provider
// by using the APIDataHandler and RequestHandler. All responses are sent to the
// response channel. In the case where the APIQueryHandler is atomic, the handler
// will make a single request for all IDs. If the APIQueryHandler is not
// atomic, the handler will make a request for each ID in a separate go routine.
type APIQueryHandlerImpl[K providertypes.ResponseKey, V providertypes.ResponseValue] struct {
logger *zap.Logger
metrics metrics.APIMetrics
config config.APIConfig
// fetcher is responsible for fetching data from the API.
fetcher APIFetcher[K, V]
}
// NewAPIQueryHandler creates a new APIQueryHandler. It manages querying the data
// provider by using the APIDataHandler and RequestHandler.
func NewAPIQueryHandler[K providertypes.ResponseKey, V providertypes.ResponseValue](
logger *zap.Logger,
cfg config.APIConfig,
requestHandler RequestHandler,
apiHandler APIDataHandler[K, V],
metrics metrics.APIMetrics,
) (APIQueryHandler[K, V], error) {
fetcher, err := NewRestAPIFetcher(requestHandler, apiHandler, metrics, cfg, logger)
if err != nil {
return nil, fmt.Errorf("failed to create api fetcher: %w", err)
}
return &APIQueryHandlerImpl[K, V]{
logger: logger.With(zap.String("api_query_handler", cfg.Name)),
config: cfg,
metrics: metrics,
fetcher: fetcher,
}, nil
}
// NewAPIQueryHandlerWithFetcher creates a new APIQueryHandler with a custom api fetcher.
func NewAPIQueryHandlerWithFetcher[K providertypes.ResponseKey, V providertypes.ResponseValue](
logger *zap.Logger,
cfg config.APIConfig,
fetcher APIFetcher[K, V],
metrics metrics.APIMetrics,
) (APIQueryHandler[K, V], error) {
if err := cfg.ValidateBasic(); err != nil {
return nil, fmt.Errorf("invalid provider config: %w", err)
}
if !cfg.Enabled {
return nil, fmt.Errorf("api query handler is not enabled for the provider")
}
if logger == nil {
return nil, fmt.Errorf("no logger specified for api query handler")
}
if metrics == nil {
return nil, fmt.Errorf("no metrics specified for api query handler")
}
if fetcher == nil {
return nil, fmt.Errorf("no fetcher specified for api query handler")
}
return &APIQueryHandlerImpl[K, V]{
logger: logger.With(zap.String("api_query_handler", cfg.Name)),
config: cfg,
metrics: metrics,
fetcher: fetcher,
}, nil
}
// Query is used to query the API data provider for the given IDs. This method blocks
// until all responses have been sent to the response channel. Query will only
// make N concurrent requests at a time, where N is the capacity of the response channel.
func (h *APIQueryHandlerImpl[K, V]) Query(
ctx context.Context,
ids []K,
responseCh chan<- providertypes.GetResponse[K, V],
) {
if len(ids) == 0 {
h.logger.Debug("no ids to query")
return
}
h.logger.Debug("starting api query handler")
defer func() {
if r := recover(); r != nil {
h.logger.Error("panic in api query handler", zap.Any("panic", r))
}
h.logger.Debug("finished api query handler")
}()
// Set the concurrency limit based on the maximum number of queries allowed for a single
// interval.
wg := errgroup.Group{}
limit := math.Min(h.config.MaxQueries, len(ids))
wg.SetLimit(limit)
h.logger.Debug("setting concurrency limit", zap.Int("limit", limit))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// If our task is atomic, we can make a single request for all the IDs. Otherwise,
// we need to make a request for each ID.
var tasks []func() error
if h.config.Atomic {
tasks = append(tasks, h.subTask(ctx, ids, responseCh))
} else {
// Calculate the batch size based on the configuration.
batchSize := math.Max(1, h.config.BatchSize)
// determine the number of queries we need to make based on the batch size.
threads := int(gomath.Ceil(float64(len(ids)) / float64(batchSize)))
// update limit in accordance with # of threads necessary, we want to avoid unnecessary go routines
// if the number of threads (tasks) is less than the limit.
limit = math.Min(limit, threads)
for i := 0; i < threads; i++ {
// Calculate the start and end indices for the batch.
start := i * batchSize
end := math.Min(len(ids), (i+1)*batchSize)
// Create a new task for the batch.
tasks = append(tasks, h.subTask(ctx, ids[start:end], responseCh))
}
h.logger.Debug(
"created sub-tasks",
zap.Int("threads", threads),
zap.Int("limit", limit),
zap.Int("batch_size", batchSize),
)
}
// Block each task until the wait group has capacity to accept a new response.
index := 0
ticker, stop := tickerWithImmediateFirstTick(h.config.Interval)
defer stop()
MainLoop:
for {
select {
case <-ctx.Done():
h.logger.Debug("context cancelled, stopping queries")
break MainLoop
case <-ticker:
// spin up limit number of tasks
for i := 0; i < limit; i++ {
wg.Go(tasks[index%len(tasks)])
index++
}
h.logger.Debug("interval complete", zap.Duration("interval", h.config.Interval), zap.Int("index", index))
}
}
// Wait for all tasks to complete.
h.logger.Debug("waiting for api sub-tasks to complete")
if err := wg.Wait(); err != nil {
h.logger.Debug("error querying ids", zap.Error(err))
}
h.logger.Debug("all api sub-tasks completed")
}
// tickerWithImmediateFirstTick creates a ticker that sends an initial tick immediately, and then ticks
// at the specified interval.
func tickerWithImmediateFirstTick(d time.Duration) (<-chan struct{}, func()) {
ticker := time.NewTicker(d)
ch := make(chan struct{})
// Send an initial tick immediately.
go func() {
ch <- struct{}{}
for range ticker.C { // send ticks at the specified interval
ch <- struct{}{}
}
close(ch)
}()
return ch, func() {
ticker.Stop()
}
}
// subTask is the subtask that is used to query the data provider for the given IDs,
// parse the response, and write the response to the response channel.
func (h *APIQueryHandlerImpl[K, V]) subTask(
ctx context.Context,
ids []K,
responseCh chan<- providertypes.GetResponse[K, V],
) func() error {
return func() error {
// Recover from any panics that occur.
defer func() {
if r := recover(); r != nil {
h.logger.Error("panic occurred in subtask", zap.Any("panic", r), zap.Any("ids", ids))
}
h.logger.Debug("finished subtask", zap.Any("ids", ids))
}()
h.logger.Debug("starting subtask", zap.Any("ids", ids))
h.writeResponse(ctx, responseCh, h.fetcher.Fetch(ctx, ids))
return nil
}
}
// writeResponse is used to write the response to the response channel.
func (h *APIQueryHandlerImpl[K, V]) writeResponse(
ctx context.Context,
responseCh chan<- providertypes.GetResponse[K, V],
response providertypes.GetResponse[K, V],
) {
// Write the response to the response channel. We only do so if the
// context has not been cancelled. Otherwise, we risk writing to a
// channel that is not being read from.
select {
case <-ctx.Done():
h.logger.Debug("context cancelled, stopping write response")
return
case responseCh <- response:
h.logger.Debug("wrote response", zap.String("response", response.String()))
}
// Update the metrics.
for id := range response.Resolved {
h.metrics.AddProviderResponse(h.config.Name, strings.ToLower(id.String()), providertypes.OK)
}
for id, unresolvedResult := range response.UnResolved {
h.metrics.AddProviderResponse(h.config.Name, strings.ToLower(id.String()), unresolvedResult.Code())
}
}