diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java index 0b873184..5dae93f5 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java @@ -23,7 +23,6 @@ public class MetricConfig { private static final Property METRIC_FILE_WRITER_INTERVAL_SECONDS = Property.named("METRIC_FILE_WRITER_INTERVAL_SECONDS", "15", ""); private static final Property METRIC_STREAM_WRITER_INTERVAL_SECONDS = Property.named("METRIC_STREAM_WRITER_INTERVAL_SECONDS", "30", ""); private static final Property METRIC_STREAM_NAME = Property.named("METRIC_STREAM_NAME", "pscmetricsstream", ""); - private static final Property METRIC_SCOPE_NAME = Property.named("METRIC_SCOPE_NAME", "pscmetricsscope", ""); private static final Property METRIC_CONTROLLER_URI = Property.named("PRAVEGA_CONTROLLER_URI", "tcp://localhost:9090", ""); private static final Property METRIC_FILE_PATH = Property.named("METRIC_FILE_PATH", System.getProperty("java.io.tmpdir") + File.separator + "psc_metric.json", ""); @@ -121,7 +120,7 @@ public static MetricConfig getMetricConfigFrom(DeviceDriverConfig ddrConfig) { metricConfig.setStreamWriterIntervalSeconds(Integer.parseInt(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_WRITER_INTERVAL_SECONDS.getName(), METRIC_STREAM_WRITER_INTERVAL_SECONDS.getDefaultValue()))); metricConfig.setMetricStream(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_NAME.getName() + pscId, METRIC_STREAM_NAME.getDefaultValue() + pscId)); metricConfig.setControllerURI(URI.create(ddrConfig.getProperties().getOrDefault(METRIC_CONTROLLER_URI.getName(), METRIC_CONTROLLER_URI.getDefaultValue()))); - metricConfig.setMetricsScope(METRIC_SCOPE_NAME.getDefaultValue()); + metricConfig.setMetricsScope(ddrConfig.getProperties().get("SCOPE")); metricConfig.setMetricFilePath(METRIC_FILE_PATH.getDefaultValue()); return metricConfig; } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java index 57219df8..83484981 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -50,20 +50,18 @@ public PravegaClient(String scope, String streamName) { private EventStreamWriter initializeWriter() { log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString()); - try (StreamManager streamManager = StreamManager.create(controllerURI)) { - final boolean scopeIsNew = streamManager.createScope(scope); + ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build(); + try (StreamManager streamManager = StreamManager.create(clientConfig)) { StreamConfiguration streamConfig = StreamConfiguration.builder() .scalingPolicy(ScalingPolicy.fixed(1)) .build(); final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); } - EventStreamWriter writer; - try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, - ClientConfig.builder().controllerURI(controllerURI).build())) { - writer = clientFactory.createEventWriter(streamName, - new UTF8StringSerializer(), - EventWriterConfig.builder().build()); - } + EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + clientConfig); + EventStreamWriter writer = clientFactory.createEventWriter(streamName, + new UTF8StringSerializer(), + EventWriterConfig.builder().build()); return writer; }