-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy patherrors.go
211 lines (181 loc) · 6.83 KB
/
errors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright 2022 The incite Authors. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package incite
import (
"errors"
"fmt"
"io"
"strings"
"syscall"
"time"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)
// ErrClosed is the sentinel error returned by a read or query operation
// when the underlying stream or query manager has already been closed.
var ErrClosed = errors.New("incite: operation on a closed object")
// StartQueryError is the error Stream.Read returns when the CloudWatch
// Logs service API reported a fatal error while an attempt was being
// made to start a chunk of the stream's query.
//
// Once Stream.Read has returned a StartQueryError, the stream's query
// is considered failed and every subsequent read on the stream will
// also return an error.
type StartQueryError struct {
	// Text is the text of the query that could not be started.
	Text string

	// Start is the start time of the query chunk that could not be
	// started. For a query with more than one chunk, this may differ
	// from the value originally set in the QuerySpec.
	Start time.Time

	// End is the end time of the query chunk that could not be
	// started. For a query with more than one chunk, this may differ
	// from the value originally set in the QuerySpec.
	End time.Time

	// Cause is the causing error, which will typically be an AWS SDK
	// for Go error type.
	Cause error
}

// Error implements the error interface. The message contains the
// quoted query text, the chunk's half-open time range [Start..End),
// and the text of the causing error.
func (e *StartQueryError) Error() string {
	const format = "incite: CloudWatch Logs failed to start query for chunk %q [%s..%s): %s"
	return fmt.Sprintf(format, e.Text, e.Start, e.End, e.Cause)
}

// Unwrap returns the causing error, which lets errors.Is and errors.As
// inspect the error chain beneath a StartQueryError.
func (e *StartQueryError) Unwrap() error {
	return e.Cause
}
// TerminalQueryStatusError is the error Stream.Read returns when
// CloudWatch Logs Insights indicated that a chunk of the stream's query
// is in a failed status, such as Cancelled, Failed, or Timeout.
//
// Once Stream.Read has returned a TerminalQueryStatusError, the
// stream's query is considered failed and every subsequent read on the
// stream will also return an error.
type TerminalQueryStatusError struct {
	// QueryID is the CloudWatch Logs Insights query ID of the chunk
	// that was reported in a terminal status.
	QueryID string

	// Status is the status string returned by CloudWatch Logs via the
	// GetQueryResults API action.
	Status string

	// Text is the text of the query that was reported in terminal
	// status.
	Text string
}

// Error implements the error interface. The message contains the
// quoted query ID, the quoted terminal status, and the quoted query
// text.
func (e *TerminalQueryStatusError) Error() string {
	const format = "incite: query ID %q has terminal status %q [query text %q]"
	return fmt.Sprintf(format, e.QueryID, e.Status, e.Text)
}
// UnexpectedQueryError is the error Stream.Read returns when the
// CloudWatch Logs Insights API behaved unexpectedly while Incite was
// polling a chunk status via the CloudWatch Logs GetQueryResults API
// action.
//
// Once Stream.Read has returned an UnexpectedQueryError, the stream's
// query is considered failed and every subsequent read on the stream
// will also return an error.
type UnexpectedQueryError struct {
	// QueryID is the CloudWatch Logs Insights query ID of the chunk
	// that experienced an unexpected event.
	QueryID string

	// Text is the text of the query for the chunk.
	Text string

	// Cause is the causing error.
	Cause error
}

// Error implements the error interface. The message contains the
// quoted query ID, the quoted query text, and the text of the causing
// error.
func (e *UnexpectedQueryError) Error() string {
	const format = "incite: query ID %q had unexpected error [query text %q]: %s"
	return fmt.Sprintf(format, e.QueryID, e.Text, e.Cause)
}

// Unwrap returns the causing error, which lets errors.Is and errors.As
// inspect the error chain beneath an UnexpectedQueryError.
func (e *UnexpectedQueryError) Unwrap() error {
	return e.Cause
}
// errNilStatus returns a new error whose message is
// outputMissingStatusMsg. A fresh error value is created on every call,
// so results from separate calls are not equal by identity.
func errNilStatus() error {
	err := errors.New(outputMissingStatusMsg)
	return err
}
// errNilResultField returns a new error reporting that the result field
// at index i is nil.
func errNilResultField(i int) error {
	const format = "incite: result field [%d] is nil"
	return fmt.Errorf(format, i)
}
// errNoKey returns a new error whose message is fieldMissingKeyMsg. A
// fresh error value is created on every call, so results from separate
// calls are not equal by identity.
func errNoKey() error {
	err := errors.New(fieldMissingKeyMsg)
	return err
}
// errNoValue returns a new error reporting that a result field has no
// value for the given key. The key is quoted in the message.
func errNoValue(key string) error {
	const format = "incite: result field missing value for key %q"
	return fmt.Errorf(format, key)
}
// errorClass is the category classifyError assigns to an error.
type errorClass int

const (
	// permanentClass is the zero value. classifyError returns it when
	// it recognizes nothing retryable about the error.
	permanentClass errorClass = iota

	// throttlingClass is returned by classifyError for AWS errors with
	// HTTP status 429, an error code containing "throttl", or a message
	// containing "rate exceeded" (both matched case-insensitively).
	throttlingClass

	// limitExceededClass is returned by classifyError for the
	// CloudWatch Logs LimitExceededException error code.
	limitExceededClass

	// temporaryClass is returned by classifyError for conditions it
	// treats as transient: HTTP status 502/503/504, the CloudWatch Logs
	// ServiceUnavailableException code, io.EOF, timeouts, and
	// connection refused/reset errnos.
	temporaryClass
)
// classifyError assigns err to an errorClass.
//
// AWS SDK errors (awserr.Error) are examined first: by HTTP status code
// when the error is also an awserr.RequestFailure, then by known
// CloudWatch Logs error codes, then by common throttling patterns in
// the code or message. If none of these match, the error's OrigErr is
// examined the same way. Only the error itself is type-asserted, so an
// AWS error wrapped inside some other error type is not recognized
// here.
//
// Once a non-AWS error (possibly nil) is reached, it is classified as
// temporaryClass if it is io.EOF, reports a timeout, or is a
// connection-refused/connection-reset errno. Anything else, including
// nil, is permanentClass.
func classifyError(err error) errorClass {
	// Walk down the chain of AWS SDK errors via OrigErr until a class
	// is determined or a non-AWS error is reached.
	for {
		awsErr, isAWS := err.(awserr.Error)
		if !isAWS {
			break
		}

		// Short-circuit if the HTTP status code indicates retryability.
		if failure, hasStatus := err.(awserr.RequestFailure); hasStatus {
			switch failure.StatusCode() {
			case 429:
				return throttlingClass
			case 502, 503, 504:
				return temporaryClass
			}
		}

		// Check for known CloudWatch Logs retryability codes.
		code := awsErr.Code()
		switch code {
		case cloudwatchlogs.ErrCodeLimitExceededException:
			return limitExceededClass
		case cloudwatchlogs.ErrCodeServiceUnavailableException:
			return temporaryClass
		}

		// Check for throttling using common AWS service patterns for
		// indicating throttling via exception. The 'e' suffix is
		// omitted from 'throttl' so that both Throttled and Throttling
		// match.
		if strings.Contains(strings.ToLower(code), "throttl") {
			return throttlingClass
		}
		if strings.Contains(strings.ToLower(awsErr.Message()), "rate exceeded") {
			return throttlingClass
		}

		// Nothing conclusive at this level: examine the cause, if any.
		err = awsErr.OrigErr()
	}

	// TODO: We may also want to check for io.ErrUnexpectedEOF.
	if errors.Is(err, io.EOF) {
		return temporaryClass
	}

	// Any error in the chain that reports itself as a timeout is
	// considered temporary. This check must stay ahead of the errno
	// check below so the order of evaluation is unchanged.
	var timeouter interface{ Timeout() bool }
	if errors.As(err, &timeouter) && timeouter.Timeout() {
		return temporaryClass
	}

	// Of the raw system call errors, only connection refused and
	// connection reset are treated as temporary.
	var errno syscall.Errno
	if errors.As(err, &errno) && (errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET) {
		return temporaryClass
	}

	return permanentClass
}
// Message texts for errors created by this package. Only
// outputMissingStatusMsg and fieldMissingKeyMsg are used in this file
// (by errNilStatus and errNoKey); the remaining constants are consumed
// elsewhere in the package.
const (
	// Nil argument messages.
	nilActionsMsg = "incite: nil actions"
	nilStreamMsg  = "incite: nil stream"
	nilContextMsg = "incite: nil context"

	// Messages about query text, time range, log groups, and limits.
	textBlankMsg           = "incite: blank query text"
	startSubMillisecondMsg = "incite: start has sub-millisecond granularity"
	endSubMillisecondMsg   = "incite: end has sub-millisecond granularity"
	endNotAfterStartMsg    = "incite: end not after start"
	noGroupsMsg            = "incite: no log groups"
	exceededMaxLimitMsg    = "incite: exceeded MaxLimit"

	// Messages about chunking and splitting.
	chunkSubMillisecondMsg       = "incite: chunk has sub-millisecond granularity"
	splitUntilSubMillisecondMsg  = "incite: split-until has sub-millisecond granularity"
	splitUntilWithPreviewMsg     = "incite: split-until incompatible with preview"
	splitUntilWithoutMaxLimitMsg = "incite: split-until requires MaxLimit"

	// Messages about malformed output from CloudWatch Logs.
	outputMissingQueryIDMsg = "incite: nil query ID in StartQuery output from CloudWatch Logs"
	outputMissingStatusMsg  = "incite: nil status in GetQueryResults output from CloudWatch Logs"
	fieldMissingKeyMsg      = "incite: result field missing key"
)
// Unexported sentinel errors. Each is a distinct value created once at
// package initialization.
//
// NOTE(review): judging by their messages these signal internal
// conditions (closing, reducing parallelism, and stopping, restarting,
// or splitting a chunk); the code that produces and checks them is
// outside this file, so confirm the exact semantics there.
var (
	errClosing        = errors.New("incite: closing")
	errReduceParallel = errors.New("incite: exceeded concurrency limit, reduce parallelism")
	errStopChunk      = errors.New("incite: owning stream died, cancel chunk")
	errRestartChunk   = errors.New("incite: transient chunk failure, restart chunk")
	errSplitChunk     = errors.New("incite: chunk maxed, split chunk")
)