diff --git a/Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift b/Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift index 41b9c9f2..a1a26550 100644 --- a/Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift @@ -4,68 +4,66 @@ */ import Foundation -import Logging import GRPC +import Logging import NIO import NIOHPACK import OpenTelemetryApi -import OpenTelemetrySdk import OpenTelemetryProtocolExporterCommon +import OpenTelemetrySdk -public class OtlpLogExporter : LogRecordExporter { - let channel : GRPCChannel - var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient - let config : OtlpConfiguration - var callOptions : CallOptions - - public init(channel: GRPCChannel, - config: OtlpConfiguration = OtlpConfiguration(), - logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), - envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){ - self.channel = channel - logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel) - self.config = config - let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader()) - if let headers = envVarHeaders { - var updatedHeaders = headers - updatedHeaders.append(userAgentHeader) - callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) - } else if let headers = config.headers { - var updatedHeaders = headers - updatedHeaders.append(userAgentHeader) - callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) - } - else { - var headers = [(String, String)]() - headers.append(userAgentHeader) - callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger) +public class OtlpLogExporter: LogRecordExporter { + let channel: GRPCChannel + var logClient: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient + let config: OtlpConfiguration + var callOptions: CallOptions + + public init(channel: GRPCChannel, + config: OtlpConfiguration = OtlpConfiguration(), + logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), + envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { + self.channel = channel + logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel) + self.config = config + let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader()) + if let headers = envVarHeaders { + var updatedHeaders = headers + updatedHeaders.append(userAgentHeader) + callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) + } else if let headers = config.headers { + var updatedHeaders = headers + updatedHeaders.append(userAgentHeader) + callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) + } else { + var headers = [(String, String)]() + headers.append(userAgentHeader) + callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger) + } } - } - - public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult { - let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in - request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords) + + public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult { + let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in + request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords) + } + let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout) + if timeout > 0 { + callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds))) + } + + let export = logClient.export(logRequest, callOptions: callOptions) + do { + _ = try export.response.wait() + return .success + } catch { + return .failure + } } - let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout) - if timeout > 0 { - callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds))) + + public func shutdown(explicitTimeout: TimeInterval? = nil) { + _ = channel.close() } - - - let export = logClient.export(logRequest, callOptions: callOptions) - do { - _ = try export.response.wait() - return .success - } catch { - return .failure + + public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult { + .success } - } - - public func shutdown(explicitTimeout: TimeInterval? = nil) { - _ = channel.close() - } - - public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult { - .success - } } diff --git a/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/OtlpMetricExporter.swift b/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/OtlpMetricExporter.swift index c44c6c73..51188c0c 100644 --- a/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/OtlpMetricExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/OtlpMetricExporter.swift @@ -4,26 +4,24 @@ */ import Foundation -import Logging import GRPC +import Logging import NIO import NIOHPACK import OpenTelemetryApi -import OpenTelemetrySdk import OpenTelemetryProtocolExporterCommon +import OpenTelemetrySdk public class OtlpMetricExporter: MetricExporter { let channel: GRPCChannel var metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient - let config : OtlpConfiguration - var callOptions : CallOptions? = nil - - + let config: OtlpConfiguration + var callOptions: CallOptions? - public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) { + public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { self.channel = channel self.config = config - self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel) + metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel) let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader()) if let headers = envVarHeaders { var updatedHeaders = headers @@ -33,26 +31,25 @@ public class OtlpMetricExporter: MetricExporter { var updatedHeaders = headers updatedHeaders.append(userAgentHeader) callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) - } - else { + } else { var headers = [(String, String)]() headers.append(userAgentHeader) callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger) } } - + public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode { let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest .with { $0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(metricDataList: metrics) } - + if config.timeout > 0 { metricClient.defaultCallOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds))) } - + let export = metricClient.export(exportRequest, callOptions: callOptions) - + do { _ = try export.response.wait() return .success @@ -60,11 +57,11 @@ public class OtlpMetricExporter: MetricExporter { return .failureRetryable } } - + public func flush() -> SpanExporterResultCode { return .success } - + public func shutdown() { _ = channel.close() } diff --git a/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/StableOtlpMetricExporter.swift b/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/StableOtlpMetricExporter.swift index b139e77d..17a7d0ae 100644 --- a/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/StableOtlpMetricExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolGrpc/metric/StableOtlpMetricExporter.swift @@ -15,14 +15,14 @@ public class StableOtlpMetricExporter: StableMetricExporter { public func getAggregationTemporality(for instrument: OpenTelemetrySdk.InstrumentType) -> OpenTelemetrySdk.AggregationTemporality { return aggregationTemporalitySelector.getAggregationTemporality(for: instrument) } - + let channel: GRPCChannel var metricClient: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient let config: OtlpConfiguration var callOptions: CallOptions? var aggregationTemporalitySelector: AggregationTemporalitySelector var defaultAggregationSelector: DefaultAggregationSelector - + public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(), defaultAggregationSelector: DefaultAggregationSelector = AggregationSelector.instance, logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) @@ -31,7 +31,7 @@ public class StableOtlpMetricExporter: StableMetricExporter { self.aggregationTemporalitySelector = aggregationTemporalitySelector self.channel = channel self.config = config - self.metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel) + metricClient = Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceNIOClient(channel: self.channel) if let headers = envVarHeaders { callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger) } else if let headers = config.headers { @@ -40,7 +40,7 @@ public class StableOtlpMetricExporter: StableMetricExporter { callOptions = CallOptions(logger: logger) } } - + public func export(metrics: [OpenTelemetrySdk.StableMetricData]) -> OpenTelemetrySdk.ExportResult { let exportRequest = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { $0.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(stableMetricData: metrics) @@ -56,14 +56,14 @@ public class StableOtlpMetricExporter: StableMetricExporter { return .failure } } - + public func flush() -> OpenTelemetrySdk.ExportResult { return .success } - + public func shutdown() -> OpenTelemetrySdk.ExportResult { _ = channel.close() - + return .success } } diff --git a/Sources/Exporters/OpenTelemetryProtocolGrpc/trace/OtlpTraceExporter.swift b/Sources/Exporters/OpenTelemetryProtocolGrpc/trace/OtlpTraceExporter.swift index 5078038f..a7352e9a 100644 --- a/Sources/Exporters/OpenTelemetryProtocolGrpc/trace/OtlpTraceExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolGrpc/trace/OtlpTraceExporter.swift @@ -4,67 +4,65 @@ */ import Foundation -import Logging import GRPC +import Logging import NIO import NIOHPACK import OpenTelemetryApi -import OpenTelemetrySdk import OpenTelemetryProtocolExporterCommon +import OpenTelemetrySdk public class OtlpTraceExporter: SpanExporter { - - let channel: GRPCChannel - var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient - let config : OtlpConfiguration - var callOptions : CallOptions - - public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) { - self.channel = channel - traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel) - self.config = config - let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader()) - if let headers = envVarHeaders { - var updatedHeaders = headers - updatedHeaders.append(userAgentHeader) - callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) - } else if let headers = config.headers { - var updatedHeaders = headers - updatedHeaders.append(userAgentHeader) - callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) - } else { - var headers = [(String, String)]() - headers.append(userAgentHeader) - callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger) + let channel: GRPCChannel + var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient + let config: OtlpConfiguration + var callOptions: CallOptions + + public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { + self.channel = channel + traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel) + self.config = config + let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader()) + if let headers = envVarHeaders { + var updatedHeaders = headers + updatedHeaders.append(userAgentHeader) + callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) + } else if let headers = config.headers { + var updatedHeaders = headers + updatedHeaders.append(userAgentHeader) + callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger) + } else { + var headers = [(String, String)]() + headers.append(userAgentHeader) + callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger) + } } - } - - - public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { - let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { - $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans) + + public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { + let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { + $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans) + } + let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout) + if timeout > 0 { + callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds))) + } + + let export = traceClient.export(exportRequest, callOptions: callOptions) + + do { + // wait() on the response to stop the program from exiting before the response is received. + _ = try export.response.wait() + return .success + } catch { + return .failure + } } - let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout) - if timeout > 0 { - callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds))) + + public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { + return .success } - - let export = traceClient.export(exportRequest, callOptions: callOptions) - - do { - // wait() on the response to stop the program from exiting before the response is received. - _ = try export.response.wait() - return .success - } catch { - return .failure + + public func shutdown(explicitTimeout: TimeInterval? = nil) { + _ = channel.close() } - } - - public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { - return .success - } - - public func shutdown(explicitTimeout: TimeInterval? = nil) { - _ = channel.close() - } } diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/StableOtlpHTTPExporterBase.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/StableOtlpHTTPExporterBase.swift index 4e01f44b..e63738c5 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/StableOtlpHTTPExporterBase.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/StableOtlpHTTPExporterBase.swift @@ -4,46 +4,53 @@ // import Foundation -import SwiftProtobuf import OpenTelemetryProtocolExporterCommon +import SwiftProtobuf public class StableOtlpHTTPExporterBase { - let endpoint: URL - let httpClient: HTTPClient - let envVarHeaders: [(String, String)]? - let config: OtlpConfiguration - - // MARK: - Init - - public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { - self.envVarHeaders = envVarHeaders - self.endpoint = endpoint - self.config = config - if let providedSession = useSession { - self.httpClient = HTTPClient(session: providedSession) - } else { - self.httpClient = HTTPClient() - } - } - - public func createRequest(body: Message, endpoint: URL) -> URLRequest { - var request = URLRequest(url: endpoint) - - for header in config.headers ?? [] { - request.addValue(header.1, forHTTPHeaderField: header.0) + let endpoint: URL + let httpClient: HTTPClient + let envVarHeaders: [(String, String)]? + let config: OtlpConfiguration + + // MARK: - Init + + public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { + self.envVarHeaders = envVarHeaders + self.endpoint = endpoint + self.config = config + if let providedSession = useSession { + httpClient = HTTPClient(session: providedSession) + } else { + httpClient = HTTPClient() + } } - - do { - request.httpMethod = "POST" - request.httpBody = try body.serializedData() - request.setValue(Headers.getUserAgentHeader(), forHTTPHeaderField: Constants.HTTP.userAgent) - request.setValue("application/x-protobuf", forHTTPHeaderField: "Content-Type") - } catch { - print("Error serializing body: \(error)") + + public func createRequest(body: Message, endpoint: URL) -> URLRequest { + var request = URLRequest(url: endpoint) + + if let headers = envVarHeaders { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + + } else if let headers = config.headers { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + } + + do { + request.httpMethod = "POST" + request.httpBody = try body.serializedData() + request.setValue(Headers.getUserAgentHeader(), forHTTPHeaderField: Constants.HTTP.userAgent) + request.setValue("application/x-protobuf", forHTTPHeaderField: "Content-Type") + } catch { + print("Error serializing body: \(error)") + } + + return request } - - return request - } - - public func shutdown(explicitTimeout: TimeInterval? = nil) { } + + public func shutdown(explicitTimeout: TimeInterval? = nil) { } } diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift index cc81dda9..2f15b39d 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/logs/OtlpHttpLogExporter.swift @@ -4,85 +4,102 @@ // import Foundation -import OpenTelemetrySdk import OpenTelemetryProtocolExporterCommon +import OpenTelemetrySdk public func defaultOltpHttpLoggingEndpoint() -> URL { - URL(string: "http://localhost:4318/v1/logs")! + URL(string: "http://localhost:4318/v1/logs")! } -public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter { - - var pendingLogRecords: [ReadableLogRecord] = [] - private let exporterLock = Lock() - override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(), - config: OtlpConfiguration = OtlpConfiguration(), - useSession: URLSession? = nil, - envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes){ - super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders) - } - - public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult { - var sendingLogRecords: [ReadableLogRecord] = [] - exporterLock.withLockVoid { - pendingLogRecords.append(contentsOf: logRecords) - sendingLogRecords = pendingLogRecords - pendingLogRecords = [] - } - - let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in - request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: sendingLogRecords) +public class OtlpHttpLogExporter: OtlpHttpExporterBase, LogRecordExporter { + var pendingLogRecords: [ReadableLogRecord] = [] + private let exporterLock = Lock() + override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(), + config: OtlpConfiguration = OtlpConfiguration(), + useSession: URLSession? = nil, + envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { + super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders) } - - var request = createRequest(body: body, endpoint: endpoint) - request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude , config.timeout) - httpClient.send(request: request) { [weak self] result in - switch result { - case .success(_): - break - case .failure(let error): - self?.exporterLock.withLockVoid { - self?.pendingLogRecords.append(contentsOf: sendingLogRecords) + + public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult { + var sendingLogRecords: [ReadableLogRecord] = [] + exporterLock.withLockVoid { + pendingLogRecords.append(contentsOf: logRecords) + sendingLogRecords = pendingLogRecords + pendingLogRecords = [] + } + + let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in + request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: sendingLogRecords) + } + + var request = createRequest(body: body, endpoint: endpoint) + if let headers = envVarHeaders { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + + } else if let headers = config.headers { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + } + request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout) + httpClient.send(request: request) { [weak self] result in + switch result { + case .success: + break + case let .failure(error): + self?.exporterLock.withLockVoid { + self?.pendingLogRecords.append(contentsOf: sendingLogRecords) + } + print(error) + } } - print(error) - } + + return .success } - - return .success - } - - public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult { - self.flush(explicitTimeout: explicitTimeout) - } - - public func flush(explicitTimeout: TimeInterval? = nil) -> ExportResult { - var exporterResult: ExportResult = .success - var pendingLogRecords: [ReadableLogRecord] = [] - exporterLock.withLockVoid { - pendingLogRecords = self.pendingLogRecords + + public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult { + flush(explicitTimeout: explicitTimeout) } - - if !pendingLogRecords.isEmpty { - let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in - request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: pendingLogRecords) - } - let semaphore = DispatchSemaphore(value: 0) - var request = createRequest(body: body, endpoint: endpoint) - request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude , config.timeout) - - httpClient.send(request: request) { result in - switch result { - case .success(_): - exporterResult = ExportResult.success - case .failure(let error): - print(error) - exporterResult = ExportResult.failure + + public func flush(explicitTimeout: TimeInterval? = nil) -> ExportResult { + var exporterResult: ExportResult = .success + var pendingLogRecords: [ReadableLogRecord] = [] + exporterLock.withLockVoid { + pendingLogRecords = self.pendingLogRecords + } + + if !pendingLogRecords.isEmpty { + let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in + request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: pendingLogRecords) + } + let semaphore = DispatchSemaphore(value: 0) + var request = createRequest(body: body, endpoint: endpoint) + request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout) + if let headers = envVarHeaders { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + } else if let headers = config.headers { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + } + httpClient.send(request: request) { result in + switch result { + case .success: + exporterResult = ExportResult.success + case let .failure(error): + print(error) + exporterResult = ExportResult.failure + } + semaphore.signal() + } + semaphore.wait() } - semaphore.signal() - } - semaphore.wait() + + return exporterResult } - - return exporterResult - } } diff --git a/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift b/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift index ceee8362..fab82651 100644 --- a/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift +++ b/Sources/Exporters/OpenTelemetryProtocolHttp/trace/OtlpHttpTraceExporter.swift @@ -4,85 +4,83 @@ // import Foundation -import OpenTelemetrySdk import OpenTelemetryProtocolExporterCommon +import OpenTelemetrySdk public func defaultOltpHttpTracesEndpoint() -> URL { - URL(string: "http://localhost:4318/v1/traces")! + URL(string: "http://localhost:4318/v1/traces")! } public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter { - - - var pendingSpans: [SpanData] = [] - private let exporterLock = Lock() - override - public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), config: OtlpConfiguration = OtlpConfiguration(), - useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) { - super.init(endpoint: endpoint, config: config, useSession: useSession) - } - - public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { - var sendingSpans: [SpanData] = [] - exporterLock.withLockVoid { - pendingSpans.append(contentsOf: spans) - sendingSpans = pendingSpans - pendingSpans = [] - } - - let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { - $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: sendingSpans) - } - var request = createRequest(body: body, endpoint: endpoint) - if let headers = envVarHeaders { - headers.forEach { (key, value) in - request.addValue(value, forHTTPHeaderField: key) - } - - } else if let headers = config.headers { - headers.forEach { (key, value) in - request.addValue(value, forHTTPHeaderField: key) - } + var pendingSpans: [SpanData] = [] + private let exporterLock = Lock() + override + public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), config: OtlpConfiguration = OtlpConfiguration(), + useSession: URLSession? = nil, envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) { + super.init(endpoint: endpoint, config: config, useSession: useSession) } - httpClient.send(request: request) { [weak self] result in - switch result { - case .success: - break - case .failure(let error): - self?.exporterLock.withLockVoid { - self?.pendingSpans.append(contentsOf: sendingSpans) + + public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { + var sendingSpans: [SpanData] = [] + exporterLock.withLockVoid { + pendingSpans.append(contentsOf: spans) + sendingSpans = pendingSpans + pendingSpans = [] } - print(error) - } - } - return .success - } - - public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { - var resultValue: SpanExporterResultCode = .success - var pendingSpans: [SpanData] = [] - exporterLock.withLockVoid { - pendingSpans = self.pendingSpans + + let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { + $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: sendingSpans) + } + var request = createRequest(body: body, endpoint: endpoint) + if let headers = envVarHeaders { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + + } else if let headers = config.headers { + headers.forEach { key, value in + request.addValue(value, forHTTPHeaderField: key) + } + } + httpClient.send(request: request) { [weak self] result in + switch result { + case .success: + break + case let .failure(error): + self?.exporterLock.withLockVoid { + self?.pendingSpans.append(contentsOf: sendingSpans) + } + print(error) + } + } + return .success } - if !pendingSpans.isEmpty { - let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { - $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: pendingSpans) - } - let semaphore = DispatchSemaphore(value: 0) - let request = createRequest(body: body, endpoint: endpoint) - - httpClient.send(request: request) { result in - switch result { - case .success: - break - case .failure(let error): - print(error) - resultValue = .failure + + public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode { + var resultValue: SpanExporterResultCode = .success + var pendingSpans: [SpanData] = [] + exporterLock.withLockVoid { + pendingSpans = self.pendingSpans + } + if !pendingSpans.isEmpty { + let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with { + $0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: pendingSpans) + } + let semaphore = DispatchSemaphore(value: 0) + let request = createRequest(body: body, endpoint: endpoint) + + httpClient.send(request: request) { result in + switch result { + case .success: + break + case let .failure(error): + print(error) + resultValue = .failure + } + semaphore.signal() + } + semaphore.wait() } - semaphore.signal() - } - semaphore.wait() + return resultValue } - return resultValue - } } diff --git a/Tests/ExportersTests/OpenTelemetryProtocol/OtlpLogRecordExporterTests.swift b/Tests/ExportersTests/OpenTelemetryProtocol/OtlpLogRecordExporterTests.swift index aa079fd5..f5b29b0e 100644 --- a/Tests/ExportersTests/OpenTelemetryProtocol/OtlpLogRecordExporterTests.swift +++ b/Tests/ExportersTests/OpenTelemetryProtocol/OtlpLogRecordExporterTests.swift @@ -4,8 +4,8 @@ // import Foundation -import Logging import GRPC +import Logging import NIO import OpenTelemetryApi import OpenTelemetryProtocolExporterCommon @@ -13,156 +13,148 @@ import OpenTelemetryProtocolExporterCommon @testable import OpenTelemetrySdk import XCTest - class OtlpLogRecordExporterTests: XCTestCase { - let traceIdBytes: [UInt8] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4] - let spanIdBytes: [UInt8] = [0, 0, 0, 0, 4, 3, 2, 1] - var traceId: TraceId! - var spanId: SpanId! - let tracestate = TraceState() - var spanContext: SpanContext! - - - - - var fakeCollector: FakeLogCollector! - var server: EventLoopFuture! - var channel: ClientConnection! - - let channelGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - let serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - - func startServer() -> EventLoopFuture { - // Start the server and print its address once it has started. - let server = Server.insecure(group: serverGroup) - .withServiceProviders([fakeCollector]) - .bind(host: "localhost", port: 4317) - - server.map { - $0.channel.localAddress - }.whenSuccess { address in - print("server started on port \(address!.port!)") + let traceIdBytes: [UInt8] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4] + let spanIdBytes: [UInt8] = [0, 0, 0, 0, 4, 3, 2, 1] + var traceId: TraceId! + var spanId: SpanId! + let tracestate = TraceState() + var spanContext: SpanContext! + + var fakeCollector: FakeLogCollector! + var server: EventLoopFuture! + var channel: ClientConnection! + + let channelGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let serverGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + + func startServer() -> EventLoopFuture { + // Start the server and print its address once it has started. + let server = Server.insecure(group: serverGroup) + .withServiceProviders([fakeCollector]) + .bind(host: "localhost", port: 4317) + + server.map { + $0.channel.localAddress + }.whenSuccess { address in + print("server started on port \(address!.port!)") + } + return server + } + + func startChannel() -> ClientConnection { + let channel = ClientConnection.insecure(group: channelGroup) + .connect(host: "localhost", port: 4317) + return channel + } + + override func setUp() { + fakeCollector = FakeLogCollector() + server = startServer() + channel = startChannel() + traceId = TraceId(fromBytes: traceIdBytes) + spanId = SpanId(fromBytes: spanIdBytes) + spanContext = SpanContext.create(traceId: traceId, spanId: spanId, traceFlags: TraceFlags(), traceState: tracestate) + } + + override func tearDown() { + try! serverGroup.syncShutdownGracefully() + try! channelGroup.syncShutdownGracefully() + } + + func testExport() { + let logRecord = ReadableLogRecord(resource: Resource(), + instrumentationScopeInfo: InstrumentationScopeInfo(name: "scope"), + timestamp: Date(), + observedTimestamp: Date.distantPast, + spanContext: spanContext, + severity: .fatal, + body: AttributeValue.string("Hello, world"), + attributes: ["event.name": AttributeValue.string("name"), "event.domain": AttributeValue.string("domain")]) + + let exporter = OtlpLogExporter(channel: channel) + let result = exporter.export(logRecords: [logRecord]) + XCTAssertEqual(result, ExportResult.success) + XCTAssertEqual(fakeCollector.receivedLogs, LogRecordAdapter.toProtoResourceRecordLog(logRecordList: [logRecord])) + exporter.shutdown() + } + + func testImplicitGrpcLoggingConfig() throws { + let exporter = OtlpLogExporter(channel: channel) + let logger = exporter.callOptions.logger + XCTAssertEqual(logger.label, "io.grpc") + } + + func testExplicitGrpcLoggingConfig() throws { + let exporter = OtlpLogExporter(channel: channel, logger: Logger(label: "my.grpc.logger")) + let logger = exporter.callOptions.logger + XCTAssertEqual(logger.label, "my.grpc.logger") + } + + func testConfigHeadersIsNil_whenDefaultInitCalled() throws { + let exporter = OtlpLogExporter(channel: channel) + XCTAssertNil(exporter.config.headers) + } + + func testConfigHeadersAreSet_whenInitCalledWithCustomConfig() throws { + let config: OtlpConfiguration = OtlpConfiguration(timeout: TimeInterval(10), headers: [("FOO", "BAR")]) + let exporter = OtlpLogExporter(channel: channel, config: config) + XCTAssertNotNil(exporter.config.headers) + XCTAssertEqual(exporter.config.headers?[0].0, "FOO") + XCTAssertEqual(exporter.config.headers?[0].1, "BAR") + XCTAssertEqual("BAR", exporter.callOptions.customMetadata.first(name: "FOO")) + } + + func testConfigHeadersAreSet_whenInitCalledWithExplicitHeaders() throws { + let exporter = OtlpLogExporter(channel: channel, envVarHeaders: [("FOO", "BAR")]) + XCTAssertNil(exporter.config.headers) + XCTAssertEqual("BAR", exporter.callOptions.customMetadata.first(name: "FOO")) + } + + func testExportAfterShutdown() { + let logRecord = ReadableLogRecord(resource: Resource(), + instrumentationScopeInfo: InstrumentationScopeInfo(name: "scope"), + timestamp: Date(), + observedTimestamp: Date.distantPast, + spanContext: spanContext, + severity: .fatal, + body: AttributeValue.string("Hello, world"), + attributes: ["event.name": AttributeValue.string("name"), "event.domain": AttributeValue.string("domain")]) + let exporter = OtlpLogExporter(channel: channel) + exporter.shutdown() + let result = exporter.export(logRecords: [logRecord]) + XCTAssertEqual(result, ExportResult.failure) } - return server - } - - func startChannel() -> ClientConnection { - let channel = ClientConnection.insecure(group: channelGroup) - .connect(host: "localhost", port: 4317) - return channel - } - - override func setUp() { - fakeCollector = FakeLogCollector() - server = startServer() - channel = startChannel() - traceId = TraceId(fromBytes: traceIdBytes) - spanId = SpanId(fromBytes: spanIdBytes) - spanContext = SpanContext.create(traceId: traceId, spanId: spanId, traceFlags: TraceFlags(), traceState: tracestate) - - } - - override func tearDown() { - try! serverGroup.syncShutdownGracefully() - try! channelGroup.syncShutdownGracefully() - } - - func testExport() { - let logRecord = ReadableLogRecord(resource: Resource(), - instrumentationScopeInfo: InstrumentationScopeInfo(name: "scope"), - timestamp: Date(), - observedTimestamp: Date.distantPast, - spanContext: spanContext, - severity: .fatal, - body: AttributeValue.string("Hello, world"), - attributes: ["event.name":AttributeValue.string("name"), "event.domain": AttributeValue.string("domain")]) - - let exporter = OtlpLogExporter(channel: channel) - let result = exporter.export(logRecords: [logRecord]) - XCTAssertEqual(result, ExportResult.success) - XCTAssertEqual(fakeCollector.receivedLogs, LogRecordAdapter.toProtoResourceRecordLog(logRecordList: [logRecord])) - exporter.shutdown() - } - - func testImplicitGrpcLoggingConfig() throws { - let exporter = OtlpLogExporter(channel: channel) - let logger = exporter.callOptions.logger - XCTAssertEqual(logger.label, "io.grpc") - } - func testExplicitGrpcLoggingConfig() throws { - let exporter = OtlpLogExporter(channel: channel, logger: Logger(label: "my.grpc.logger")) - let logger = exporter.callOptions.logger - XCTAssertEqual(logger.label, "my.grpc.logger") - } - - func testConfigHeadersIsNil_whenDefaultInitCalled() throws { - let exporter = OtlpLogExporter(channel: channel) - XCTAssertNil(exporter.config.headers) - } - - func testConfigHeadersAreSet_whenInitCalledWithCustomConfig() throws { - let config: OtlpConfiguration = OtlpConfiguration(timeout: TimeInterval(10), headers: [("FOO", "BAR")]) - let exporter = OtlpLogExporter(channel: channel, config: config) - XCTAssertNotNil(exporter.config.headers) - XCTAssertEqual(exporter.config.headers?[0].0, "FOO") - XCTAssertEqual(exporter.config.headers?[0].1, "BAR") - XCTAssertEqual("BAR", exporter.callOptions.customMetadata.first(name: "FOO")) - } - - func testConfigHeadersAreSet_whenInitCalledWithExplicitHeaders() throws { - let exporter = OtlpLogExporter(channel: channel, envVarHeaders: [("FOO", "BAR")]) - XCTAssertNil(exporter.config.headers) - XCTAssertEqual("BAR", exporter.callOptions.customMetadata.first(name: "FOO")) - } - - func testExportAfterShutdown() { - let logRecord = ReadableLogRecord(resource: Resource(), - instrumentationScopeInfo: InstrumentationScopeInfo(name: "scope"), - timestamp: Date(), - observedTimestamp: Date.distantPast, - spanContext: spanContext, - severity: .fatal, - body: AttributeValue.string("Hello, world"), - attributes: ["event.name":AttributeValue.string("name"), "event.domain": AttributeValue.string("domain")]) - let exporter = OtlpLogExporter(channel: channel) - exporter.shutdown() - let result = exporter.export(logRecords: [logRecord]) - XCTAssertEqual(result, ExportResult.failure) - } - - func testExportCancelled() { - fakeCollector.returnedStatus = GRPCStatus(code: .cancelled, message: nil) - let exporter = OtlpLogExporter(channel: channel) - let logRecord = ReadableLogRecord(resource: Resource(), - instrumentationScopeInfo: InstrumentationScopeInfo(name: "scope"), - timestamp: Date(), - observedTimestamp: Date.distantPast, - spanContext: spanContext, - severity: .fatal, - body: AttributeValue.string("Hello, world"), - attributes: ["event.name":AttributeValue.string("name"), - "event.domain": AttributeValue.string("domain")]) - let result = exporter.export(logRecords: [logRecord]) - XCTAssertEqual(result, ExportResult.failure) - exporter.shutdown() - } - -} + func testExportCancelled() { + fakeCollector.returnedStatus = GRPCStatus(code: .cancelled, message: nil) + let exporter = OtlpLogExporter(channel: channel) + let logRecord = ReadableLogRecord(resource: Resource(), + instrumentationScopeInfo: InstrumentationScopeInfo(name: "scope"), + timestamp: Date(), + observedTimestamp: Date.distantPast, + spanContext: spanContext, + severity: .fatal, + body: AttributeValue.string("Hello, world"), + attributes: ["event.name": AttributeValue.string("name"), + "event.domain": AttributeValue.string("domain")]) + let result = exporter.export(logRecords: [logRecord]) + XCTAssertEqual(result, ExportResult.failure) + exporter.shutdown() + } +} +class FakeLogCollector: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceProvider { + var interceptors: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceServerInterceptorFactoryProtocol? -class FakeLogCollector: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceProvider { - var interceptors: Opentelemetry_Proto_Collector_Logs_V1_LogsServiceServerInterceptorFactoryProtocol? - - func export(request: Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest, context: GRPC.StatusOnlyCallContext) -> NIOCore.EventLoopFuture { - receivedLogs.append(contentsOf: request.resourceLogs) - if returnedStatus != GRPCStatus.ok { - return context.eventLoop.makeFailedFuture(returnedStatus) + func export(request: Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest, context: GRPC.StatusOnlyCallContext) -> NIOCore.EventLoopFuture { + receivedLogs.append(contentsOf: request.resourceLogs) + if returnedStatus != GRPCStatus.ok { + return context.eventLoop.makeFailedFuture(returnedStatus) + } + return context.eventLoop.makeSucceededFuture(Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceResponse()) } - return context.eventLoop.makeSucceededFuture(Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceResponse()) - } - - var receivedLogs = [Opentelemetry_Proto_Logs_V1_ResourceLogs]() - var returnedStatus = GRPCStatus.ok - + + var receivedLogs = [Opentelemetry_Proto_Logs_V1_ResourceLogs]() + var returnedStatus = GRPCStatus.ok } diff --git a/Tests/ExportersTests/OpenTelemetryProtocol/OtlpMetricExporterTests.swift b/Tests/ExportersTests/OpenTelemetryProtocol/OtlpMetricExporterTests.swift index 60285dff..d567539d 100644 --- a/Tests/ExportersTests/OpenTelemetryProtocol/OtlpMetricExporterTests.swift +++ b/Tests/ExportersTests/OpenTelemetryProtocol/OtlpMetricExporterTests.swift @@ -4,8 +4,8 @@ */ import Foundation -import Logging import GRPC +import Logging import NIO import OpenTelemetryApi import OpenTelemetryProtocolExporterCommon @@ -185,7 +185,6 @@ class OtlpMetricExproterTests: XCTestCase { } XCTFail("User-Agent header was not set correctly") } - } class FakeMetricCollector: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServiceProvider { @@ -194,8 +193,7 @@ class FakeMetricCollector: Opentelemetry_Proto_Collector_Metrics_V1_MetricsServi var receivedMetrics = [Opentelemetry_Proto_Metrics_V1_ResourceMetrics]() var returnedStatus = GRPCStatus.ok func export(request: Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest, context: StatusOnlyCallContext) -> - EventLoopFuture - { + EventLoopFuture { receivedMetrics.append(contentsOf: request.resourceMetrics) if returnedStatus != GRPCStatus.ok { return context.eventLoop.makeFailedFuture(returnedStatus) diff --git a/Tests/ExportersTests/OpenTelemetryProtocol/OtlpTraceExporterTests.swift b/Tests/ExportersTests/OpenTelemetryProtocol/OtlpTraceExporterTests.swift index ee7befb6..e8531e99 100644 --- a/Tests/ExportersTests/OpenTelemetryProtocol/OtlpTraceExporterTests.swift +++ b/Tests/ExportersTests/OpenTelemetryProtocol/OtlpTraceExporterTests.swift @@ -4,8 +4,8 @@ */ import Foundation -import Logging import GRPC +import Logging import NIO import OpenTelemetryApi import OpenTelemetryProtocolExporterCommon @@ -66,7 +66,7 @@ class OtlpTraceExporterTests: XCTestCase { if customMetadata.contains(name: Constants.HTTP.userAgent) && customMetadata.first(name: Constants.HTTP.userAgent) == userAgent { return } - + XCTFail("User-Agent header was not set correctly") } @@ -169,7 +169,7 @@ class OtlpTraceExporterTests: XCTestCase { class FakeCollector: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceProvider { var receivedSpans = [Opentelemetry_Proto_Trace_V1_ResourceSpans]() var returnedStatus = GRPCStatus.ok - var interceptors: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceServerInterceptorFactoryProtocol? = nil + var interceptors: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceServerInterceptorFactoryProtocol? func export(request: Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest, context: StatusOnlyCallContext) -> EventLoopFuture { receivedSpans.append(contentsOf: request.resourceSpans)