-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKinesisConfig.java
165 lines (134 loc) · 5.3 KB
/
KinesisConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.xing.connectors;
import java.util.Properties;
/**
* Configuration for Kinesis Worker.
*/
public class KinesisConfig {
private Properties props;
public static final String REGION = "region";
public static final String STREAM_NAME = "streamName";
public static final String MAX_BUFFERED_TIME = "maxBufferedTime";
public static final String MAX_CONNECTIONS = "maxConnections";
public static final String RATE_LIMIT = "rateLimit";
public static final String RECORD_TTL = "ttl";
public static final String METRICS_LEVEL = "metricsLevel";
public static final String METRICS_GRANULARITY = "metricsGranularity";
public static final String METRICS_NAMESPACE = "metricsNamespace";
public static final String AGGREGATION_ENABLED = "aggregation";
public static final String USE_PARTITION_AS_HASH_KEY = "usePartitionAsHashKey";
public static final String FLUSH_SYNC = "flushSync";
public static final String
SINGLE_KINESIS_PRODUCER_PER_PARTITION = "singleKinesisProducerPerPartition";
public static final String PAUSE_CONSUMPTION = "pauseConsumption";
public static final String OUTSTANDING_RECORDS_THRESHOLD = "outstandingRecordsThreshold";
public static final String SLEEP_PERIOD = "sleepPeriod";
public static final String SLEEP_CYCLES = "sleepCycles";
// AWS Role args
public static final String ROLE_ARN = "roleARN";
public static final String ROLE_SESSION_NAME = "roleSessionName";
public static final String ROLE_EXTERNAL_ID = "roleExternalID";
public static final String ROLE_DURATION_SECONDS = "roleDurationSeconds";
public static final String KINESIS_ENDPOINT = "kinesisEndpoint";
public KinesisConfig() {
this.props = new Properties();
}
/**
* Builder for config.
*/
public static class Builder {
private final KinesisConfig config = new KinesisConfig();
public KinesisConfig.Builder fromProperties(Properties props) {
this.config.props.putAll(props);
return this;
}
public Builder withRegion(String region) {
config.props.setProperty(REGION, region);
return this;
}
public Builder withStreamName(String streamName) {
config.props.setProperty(STREAM_NAME, streamName);
return this;
}
public Builder withMaxBufferedTime(String maxBufferedTime) {
config.props.setProperty(MAX_BUFFERED_TIME, maxBufferedTime);
return this;
}
public Builder withMaxConnections(String maxConnections) {
config.props.setProperty(MAX_CONNECTIONS, maxConnections);
return this;
}
public Builder withRateLimit(String rateLimit) {
config.props.setProperty(RATE_LIMIT, rateLimit);
return this;
}
public Builder withTtl(String ttl) {
config.props.setProperty(RECORD_TTL, ttl);
return this;
}
public Builder withMetricsLevel(String metricsLevel) {
config.props.setProperty(METRICS_LEVEL, metricsLevel);
return this;
}
public Builder withMetricsGranularity(String metricsGranularity) {
config.props.setProperty(METRICS_GRANULARITY, metricsGranularity);
return this;
}
public Builder withMetricsNamespace(String metricsNamespace) {
config.props.setProperty(METRICS_NAMESPACE, metricsNamespace);
return this;
}
public Builder withAggregation(String aggregation) {
config.props.setProperty(AGGREGATION_ENABLED, aggregation);
return this;
}
public Builder withUsePartitionAsHashKey(String usePartitionAsHashKey) {
config.props.setProperty(USE_PARTITION_AS_HASH_KEY, usePartitionAsHashKey);
return this;
}
public Builder withFlushSync(String flushSync) {
config.props.setProperty(FLUSH_SYNC, flushSync);
return this;
}
public Builder withSingleKinesisProducerPerPartition(String singleKinesisProducerPerPartition) {
config.props.setProperty(SINGLE_KINESIS_PRODUCER_PER_PARTITION,
singleKinesisProducerPerPartition);
return this;
}
public Builder withPauseConsumption(String pauseConsumption) {
config.props.setProperty(PAUSE_CONSUMPTION, pauseConsumption);
return this;
}
public Builder withOutstandingRecordsThreshold(String outstandingRecordsThreshold) {
config.props.setProperty(OUTSTANDING_RECORDS_THRESHOLD, outstandingRecordsThreshold);
return this;
}
public Builder withSleepPeriod(String sleepPeriod) {
config.props.setProperty(SLEEP_PERIOD, sleepPeriod);
return this;
}
public Builder withSleepCycles(String sleepCycles) {
config.props.setProperty(SLEEP_CYCLES, sleepCycles);
return this;
}
public Builder withRoleArn(String roleArn) {
config.props.setProperty(ROLE_ARN, roleArn);
return this;
}
public Builder withRoleSessionName(String roleSessionName) {
config.props.setProperty(ROLE_SESSION_NAME, roleSessionName);
return this;
}
public Builder withRoleExternalId(String roleExternalId) {
config.props.setProperty(ROLE_EXTERNAL_ID, roleExternalId);
return this;
}
public Builder withRoleDurationSeconds(String roleDurationSeconds) {
config.props.setProperty(ROLE_DURATION_SECONDS, roleDurationSeconds);
return this;
}
public Builder withKinesisEndpoint(String kinesisEndpoint) {
config.props.setProperty(KINESIS_ENDPOINT, kinesisEndpoint);
return this;
}
}
}