Skip to content

Commit

Permalink
supply library versions in instrumented producer (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
radai-rosenblatt authored Sep 11, 2019
1 parent 8ce07a2 commit 848cd91
Showing 1 changed file with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class LiKafkaInstrumentedProducerImpl<K, V> implements DelegatingProducer
private final long initialConnectionTimeoutMs = TimeUnit.SECONDS.toMillis(30);
private final ReadWriteLock delegateLock = new ReentrantReadWriteLock();
private final Properties baseConfig;
private final Map<String, String> libraryVersions;
private final ProducerFactory<K, V> producerFactory;
private final CountDownLatch initialConnectionLatch = new CountDownLatch(1);
private final MetricsProxy metricsProxy = new MetricsProxy() {
Expand All @@ -74,14 +75,28 @@ public class LiKafkaInstrumentedProducerImpl<K, V> implements DelegatingProducer
private Method boundedFlushMethod;
private final AtomicInteger boundFlushThreadCount = new AtomicInteger();

@Deprecated
public LiKafkaInstrumentedProducerImpl(
Properties baseConfig,
ProducerFactory<K, V> producerFactory,
Supplier<String> mdsUrlSupplier
) {
this(baseConfig, null, producerFactory, mdsUrlSupplier);
}

public LiKafkaInstrumentedProducerImpl(
Properties baseConfig,
Map<String, String> libraryVersions,
ProducerFactory<K, V> producerFactory,
Supplier<String> mdsUrlSupplier
) {
List<String> conversionIssues = new ArrayList<>(1);
this.baseConfig = baseConfig;
Map<String, String> translatedBaseConfig = LiKafkaClientsUtils.propertiesToStringMap(baseConfig, conversionIssues);
this.libraryVersions = LiKafkaClientsUtils.getKnownLibraryVersions();
if (libraryVersions != null && !libraryVersions.isEmpty()) {
this.libraryVersions.putAll(libraryVersions); //allow user arguments to override builtins
}
this.producerFactory = producerFactory;

if (!conversionIssues.isEmpty()) {
Expand All @@ -90,9 +105,12 @@ public LiKafkaInstrumentedProducerImpl(
LOG.error("issues translating producer config to strings: {}", csv);
}

mdsClient = new SimpleClient(mdsUrlSupplier,
mdsClient = new SimpleClient(
mdsUrlSupplier,
TimeUnit.MINUTES.toMillis(1),
TimeUnit.HOURS.toMillis(1), translatedBaseConfig,
TimeUnit.HOURS.toMillis(1),
this.libraryVersions,
translatedBaseConfig,
this
);

Expand Down

0 comments on commit 848cd91

Please sign in to comment.