Skip to content

Commit

Permalink
Federated Clients: implement various federated consumer methods
Browse files Browse the repository at this point in the history
  • Loading branch information
jonlee2 committed May 17, 2019
1 parent 1db12d5 commit d9f853a
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
public static final String SEGMENT_DESERIALIZER_CLASS_CONFIG = "segment.deserializer.class";
public static final String AUDITOR_CLASS_CONFIG = "auditor.class";
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
public static final String ENABLE_AUTO_COMMIT_CONFIG = ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
Expand Down Expand Up @@ -92,6 +93,10 @@ public class LiKafkaConsumerConfig extends AbstractConfig {

private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";

private static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs" +
" that could block. This configuration is used as the default timeout for all consumer operations that do not" +
" explicitly accept a <code>timeout</code> parameter.";

public static final String METADATA_SERVICE_CLIENT_CLASS_DOC =
LiKafkaCommonClientConfigs.METADATA_SERVICE_CLIENT_CLASS_DOC;

Expand Down Expand Up @@ -187,7 +192,13 @@ public class LiKafkaConsumerConfig extends AbstractConfig {
Type.INT,
Integer.MAX_VALUE,
Importance.MEDIUM,
METADATA_SERVICE_REQUEST_TIMEOUT_MS_DOC);
METADATA_SERVICE_REQUEST_TIMEOUT_MS_DOC)
.define(DEFAULT_API_TIMEOUT_MS_CONFIG,
Type.INT,
60 * 1000,
atLeast(0),
Importance.MEDIUM,
DEFAULT_API_TIMEOUT_MS_DOC);
}

public LiKafkaConsumerConfig(Map<?, ?> props) {
Expand Down
Loading

0 comments on commit d9f853a

Please sign in to comment.