diff --git a/kinesis.go b/kinesis.go index e6777a4..d528f08 100644 --- a/kinesis.go +++ b/kinesis.go @@ -91,9 +91,41 @@ type kinesis struct { } func (k *kinesis) initWithStartTime(stream, shard, shardIteratorType, accessKey, secretKey, region string, endpoint string, startTime *time.Time) (*kinesis, error) { - k.startTime = startTime + httpClient := &http.Client{ + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: DialTimeout, + }).Dial, + TLSHandshakeTimeout: HandShakeTimeout}, + Timeout: HTTPTimeout, + } + sess, err := authenticate(accessKey, secretKey) + awsConf := aws.NewConfig().WithRegion(region).WithHTTPClient(httpClient) + if conf.Debug.Verbose { + awsConf = awsConf.WithLogLevel(aws.LogDebugWithRequestRetries | aws.LogDebugWithRequestErrors) + } + + if k.endPoint != "" { + awsConf = awsConf.WithEndpoint(k.endPoint) + } else if endpoint != "" { + awsConf = awsConf.WithEndpoint(endpoint) + } + + client := awsKinesis.New(sess, awsConf) + + k = &kinesis{ + stream: stream, + shard: shard, + shardIteratorType: "AT_TIMESTAMP", + client: client, + startTime: startTime, + } + if err != nil { + return k, err + } - return k.init(stream, shard, "AT_TIMESTAMP", accessKey, secretKey, region, endpoint) + err = k.initShardIterator() + return k, err } func (k *kinesis) init(stream, shard, shardIteratorType, accessKey, secretKey, region string, endpoint string) (*kinesis, error) {