forked from rewardStyle/kinetic
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkinetic.go
220 lines (200 loc) · 6.42 KB
/
kinetic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package kinetic
import (
"context"
"net/http"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)
// kineticOptions is a struct that holds all of Kinetic's configurable parameters.
type kineticOptions struct {
awsConfig *aws.Config //
logLevel aws.LogLevelType // log level for configuring the LogHelper's log level
}
// defaultKineticOptions instantiates a kineticOptions with default values.
func defaultKineticOptions() *kineticOptions {
return &kineticOptions{
awsConfig: aws.NewConfig().WithHTTPClient(
&http.Client{
Timeout: 2 * time.Minute,
}),
logLevel: aws.LogOff,
}
}
// AwsConfigOptionsFn is a method signature for defining functional option methods for configuring Kinetic.
type AwsConfigOptionsFn func(*kineticOptions) error
// AwsConfigCredentials is a functional option method for configuring Kinetic's AwsConfig credentials.
func AwsConfigCredentials(accessKey, secretKey, sessionToken string) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.awsConfig.WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, sessionToken))
return nil
}
}
// AwsConfigRegion is a functional option method for configuring Kinetic's AwsConfig region.
func AwsConfigRegion(region string) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.awsConfig.WithRegion(region)
return nil
}
}
// AwsConfigEndpoint is a functional option method for configuring Kinetic's AwsConfig endpoint.
func AwsConfigEndpoint(endpoint string) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.awsConfig.WithEndpoint(endpoint)
return nil
}
}
// AwsConfigLogger is a functional option method for configuring Kinetic's AwsConfig logger.
func AwsConfigLogger(logger aws.Logger) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.awsConfig.WithLogger(logger)
return nil
}
}
// AwsConfigLogLevel is a functional option method for configuring Kinetic's AwsConfig log level.
func AwsConfigLogLevel(logLevel aws.LogLevelType) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.awsConfig.WithLogLevel(logLevel)
return nil
}
}
// AwsConfigHTTPClientTimeout is a functional option method for configuring Kinetic's
// AwsConfig HTTP client timeout.
func AwsConfigHTTPClientTimeout(timeout time.Duration) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.awsConfig.WithHTTPClient(&http.Client{
Timeout: timeout,
})
return nil
}
}
// LogLevel is a functional option method for configuring Kinetic's log level.
func LogLevel(logLevel aws.LogLevelType) AwsConfigOptionsFn {
return func(o *kineticOptions) error {
o.logLevel = logLevel & 0xffff0000
return nil
}
}
// Kinetic provides access to a Kinesis client and provides some utility methods for interacting
// with the AWS Kinesis service.
type Kinetic struct {
*kineticOptions
*LogHelper
clientMu sync.Mutex
kclient kinesisiface.KinesisAPI
Session *session.Session
}
// NewKinetic creates a new instance of Kinetic.
func NewKinetic(optionFns ...AwsConfigOptionsFn) (*Kinetic, error) {
kineticOptions := defaultKineticOptions()
for _, optionFn := range optionFns {
optionFn(kineticOptions)
}
sess, err := session.NewSession(kineticOptions.awsConfig)
if err != nil {
return nil, err
}
return &Kinetic{
kineticOptions: kineticOptions,
LogHelper: &LogHelper{
LogLevel: kineticOptions.logLevel,
Logger: sess.Config.Logger,
},
Session: sess,
}, nil
}
func (k *Kinetic) ensureKinesisClient() {
k.clientMu.Lock()
defer k.clientMu.Unlock()
if k.kclient == nil {
k.kclient = kinesis.New(k.Session)
}
}
// CreateStream creates a new Kinesis stream.
func (k *Kinetic) CreateStream(stream string, shards int) error {
k.ensureKinesisClient()
_, err := k.kclient.CreateStream(&kinesis.CreateStreamInput{
StreamName: aws.String(stream),
ShardCount: aws.Int64(int64(shards)),
})
if err != nil {
k.LogError("Error creating kinesis stream:", err)
}
return err
}
// WaitUntilStreamExists is meant to be used after CreateStream to wait until a
// Kinesis stream is ACTIVE.
func (k *Kinetic) WaitUntilStreamExists(ctx context.Context, stream string, opts ...request.WaiterOption) error {
k.ensureKinesisClient()
return k.kclient.WaitUntilStreamExistsWithContext(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(stream), // Required
}, opts...)
}
// DeleteStream deletes an existing Kinesis stream.
func (k *Kinetic) DeleteStream(stream string) error {
k.ensureKinesisClient()
_, err := k.kclient.DeleteStream(&kinesis.DeleteStreamInput{
StreamName: aws.String(stream),
})
if err != nil {
k.LogError("Error deleting kinesis stream:", err)
}
return err
}
// WaitUntilStreamDeleted is meant to be used after DeleteStream to wait until a
// Kinesis stream no longer exists.
func (k *Kinetic) WaitUntilStreamDeleted(ctx context.Context, stream string, opts ...request.WaiterOption) error {
k.ensureKinesisClient()
w := request.Waiter{
Name: "WaitUntilStreamIsDeleted",
MaxAttempts: 18,
Delay: request.ConstantWaiterDelay(10 * time.Second),
Acceptors: []request.WaiterAcceptor{
{
State: request.SuccessWaiterState,
Matcher: request.ErrorWaiterMatch,
Expected: kinesis.ErrCodeResourceNotFoundException,
},
},
Logger: k.Session.Config.Logger,
NewRequest: func(opts []request.Option) (*request.Request, error) {
req, _ := k.kclient.DescribeStreamRequest(&kinesis.DescribeStreamInput{
StreamName: aws.String(stream), // Required
})
req.SetContext(ctx)
req.ApplyOptions(opts...)
return req, nil
},
}
w.ApplyOptions(opts...)
return w.WaitWithContext(ctx)
}
// GetShards returns a list of the shards in a Kinesis stream.
func (k *Kinetic) GetShards(stream string) ([]string, error) {
k.ensureKinesisClient()
resp, err := k.kclient.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: aws.String(stream),
})
if err != nil {
k.LogError("Error describing kinesis stream:", err)
return nil, err
}
if resp == nil {
return nil, ErrNilDescribeStreamResponse
}
if resp.StreamDescription == nil {
return nil, ErrNilStreamDescription
}
var shards []string
for _, shard := range resp.StreamDescription.Shards {
if shard.ShardId != nil {
shards = append(shards, aws.StringValue(shard.ShardId))
}
}
return shards, nil
}