Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Headers were not being properly sent in OtlpHttpLogExporter #513

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,66 @@
*/

import Foundation
import Logging
import GRPC
import Logging
import NIO
import NIOHPACK
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk

public class OtlpLogExporter : LogRecordExporter {
let channel : GRPCChannel
var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions
public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}
else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
public class OtlpLogExporter: LogRecordExporter {
let channel: GRPCChannel
var logClient: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config: OtlpConfiguration
var callOptions: CallOptions

public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
}
}

public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)

public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}


let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure

public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
.success
}
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}

public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
.success
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@
*/

import Foundation
import Logging
import GRPC
import Logging
import NIO
import NIOHPACK
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk

public class OtlpMetricExporter: MetricExporter {
let channel: GRPCChannel
var metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions? = nil


let config: OtlpConfiguration
var callOptions: CallOptions?

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.channel = channel
self.config = config
self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
Expand All @@ -33,38 +31,37 @@ public class OtlpMetricExporter: MetricExporter {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}
else {
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest
.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: metrics)
}

if config.timeout > 0 {
metricClient.defaultCallOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}

let export = metricClient.export(exportRequest, callOptions: callOptions)

do {
_ = try export.response.wait()
return .success
} catch {
return .failureRetryable
}
}

public func flush() -> SpanExporterResultCode {
return .success
}

public func shutdown() {
_ = channel.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ public class StableOtlpMetricExporter: StableMetricExporter {
public func getAggregationTemporality(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.AggregationTemporality {
return aggregationTemporalitySelector.getAggregationTemporality(for: instrument)
}

let channel: GRPCChannel
var metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient
let config: OtlpConfiguration
var callOptions: CallOptions?
var aggregationTemporalitySelector: AggregationTemporalitySelector
var defaultAggregationSelector: DefaultAggregationSelector

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(),
defaultAggregationSelector: DefaultAggregationSelector = AggregationSelector.instance,
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes)
Expand All @@ -31,7 +31,7 @@ public class StableOtlpMetricExporter: StableMetricExporter {
self.aggregationTemporalitySelector = aggregationTemporalitySelector
self.channel = channel
self.config = config
self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel)
if let headers = envVarHeaders {
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
} else if let headers = config.headers {
Expand All @@ -40,7 +40,7 @@ public class StableOtlpMetricExporter: StableMetricExporter {
callOptions = CallOptions(logger: logger)
}
}

public func export(metrics: [OpenTelemetrySdk.StableMetricData]) -> OpenTelemetrySdk.ExportResult {
let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with {
$0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: metrics)
Expand All @@ -56,14 +56,14 @@ public class StableOtlpMetricExporter: StableMetricExporter {
return .failure
}
}

public func flush() -> OpenTelemetrySdk.ExportResult {
return .success
}

public func shutdown() -> OpenTelemetrySdk.ExportResult {
_ = channel.close()

return .success
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,65 @@
*/

import Foundation
import Logging
import GRPC
import Logging
import NIO
import NIOHPACK
import OpenTelemetryApi
import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon
import OpenTelemetrySdk

public class OtlpTraceExporter: SpanExporter {

let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config: OtlpConfiguration
var callOptions: CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
}
}


public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)

public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}
}

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}
}
Loading
Loading