From 85dcbcbff92ee5dc3c3489caddf015d1dc492eeb Mon Sep 17 00:00:00 2001 From: Assaf Tzur-El Date: Mon, 4 Mar 2019 19:09:41 +0200 Subject: [PATCH] Make OmsOutput extensible (#306) --- .../OmsOutput.cs | 135 +++++++++++------- 1 file changed, 87 insertions(+), 48 deletions(-) diff --git a/src/Microsoft.Diagnostics.EventFlow.Outputs.Oms/OmsOutput.cs b/src/Microsoft.Diagnostics.EventFlow.Outputs.Oms/OmsOutput.cs index 51eb4e22..0cb1c783 100644 --- a/src/Microsoft.Diagnostics.EventFlow.Outputs.Oms/OmsOutput.cs +++ b/src/Microsoft.Diagnostics.EventFlow.Outputs.Oms/OmsOutput.cs @@ -3,6 +3,11 @@ // Licensed under the MIT License (MIT). See License.txt in the repo root for license information. // ------------------------------------------------------------ +using Microsoft.Diagnostics.EventFlow.Configuration; +using Microsoft.Diagnostics.EventFlow.Utilities; +using Microsoft.Extensions.Configuration; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; using System; using System.Collections.Generic; using System.Diagnostics; @@ -12,24 +17,19 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Configuration; -using Newtonsoft.Json; using Validation; -using Microsoft.Diagnostics.EventFlow.Configuration; -using Microsoft.Diagnostics.EventFlow.Utilities; - namespace Microsoft.Diagnostics.EventFlow.Outputs { public class OmsOutput : IOutput { - const string OmsDataUploadResource = "/api/logs"; - const string OmsDataUploadUrl = OmsDataUploadResource + "?api-version=2016-04-01"; - const string MsDateHeaderName = "x-ms-date"; - const string JsonContentId = "application/json"; + private const string OmsDataUploadResource = "/api/logs"; + private const string OmsDataUploadUrl = OmsDataUploadResource + "?api-version=2016-04-01"; + private const string MsDateHeaderName = "x-ms-date"; + private const string JsonContentId = "application/json"; private readonly IHealthReporter healthReporter; - private OmsConnectionData connectionData; + private readonly OmsConnectionData connectionData; public OmsOutput(IConfiguration configuration, IHealthReporter healthReporter) { @@ -44,7 +44,8 @@ public OmsOutput(IConfiguration configuration, IHealthReporter healthReporter) } catch { - healthReporter.ReportProblem($"Invalid {nameof(OmsOutput)} configuration encountered: '{configuration.ToString()}'", + healthReporter.ReportProblem( + $"Invalid {nameof(OmsOutput)} configuration encountered: '{configuration.ToString()}'", EventFlowContextIdentifiers.Configuration); throw; } @@ -61,9 +62,8 @@ public OmsOutput(OmsOutputConfiguration configuration, IHealthReporter healthRep this.connectionData = CreateConnectionData(configuration); } - private OmsConnectionData CreateConnectionData(OmsOutputConfiguration configuration) + protected virtual OmsConnectionData CreateConnectionData(OmsOutputConfiguration configuration) { - Debug.Assert(this.healthReporter != null); Debug.Assert(configuration != null); string workspaceId = configuration.WorkspaceId; @@ -80,30 +80,35 @@ private OmsConnectionData CreateConnectionData(OmsOutputConfiguration configurat } var hasher = new HMACSHA256(Convert.FromBase64String(omsWorkspaceKeyBase64)); - var retryHandler = new HttpExponentialRetryMessageHandler(); var httpClient = new HttpClient(retryHandler); if (!string.IsNullOrWhiteSpace(configuration.ServiceDomain)) { - httpClient.BaseAddress = new Uri($"https://{workspaceId}." + configuration.ServiceDomain, UriKind.Absolute); + httpClient.BaseAddress = new Uri($"https://{workspaceId}.{configuration.ServiceDomain}", UriKind.Absolute); } else { httpClient.BaseAddress = new Uri($"https://{workspaceId}.ods.opinsights.azure.com", UriKind.Absolute); - } + } string logTypeName = configuration.LogTypeName; if (string.IsNullOrWhiteSpace(logTypeName)) { logTypeName = "Event"; } - httpClient.DefaultRequestHeaders.Add("Log-Type", logTypeName); - return new OmsConnectionData { HttpClient = httpClient, Hasher = hasher, WorkspaceId = workspaceId }; + return new OmsConnectionData + { + HttpClient = httpClient, + Hasher = hasher, + WorkspaceId = workspaceId, + LogTypeName = logTypeName + }; } - public async Task SendEventsAsync(IReadOnlyCollection events, long transmissionSequenceNumber, CancellationToken cancellationToken) + public async Task SendEventsAsync(IReadOnlyCollection events, long transmissionSequenceNumber, + CancellationToken cancellationToken) { if (this.connectionData == null || events == null || events.Count == 0) { @@ -112,47 +117,80 @@ public async Task SendEventsAsync(IReadOnlyCollection events, long tr try { - string jsonData = JsonConvert.SerializeObject(events); - - string dateString = DateTime.UtcNow.ToString("r"); - HttpContent content = new StringContent(jsonData, Encoding.UTF8, JsonContentId); - string signature = this.BuildSignature(content.Headers.ContentLength.Value, dateString); - - content.Headers.ContentType = new MediaTypeHeaderValue(JsonContentId); - HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, OmsDataUploadUrl); - request.Headers.Add("Authorization", signature); - request.Headers.Add(MsDateHeaderName, dateString); - request.Content = content; - - // SendAsync is thread safe - HttpResponseMessage response = await this.connectionData.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); - if (response.IsSuccessStatusCode) - { - this.healthReporter.ReportHealthy(); - } - else + var logTypeToData = PrepareEventData(events); + + foreach (var logTypeAndJsonPair in logTypeToData) { - string responseContent = string.Empty; - try + var logType = logTypeAndJsonPair.Key; + var jsonData = logTypeAndJsonPair.Value; + string dateString = DateTime.UtcNow.ToString("r"); + HttpContent content = new StringContent(jsonData, Encoding.UTF8, JsonContentId); + string signature = this.BuildSignature(content.Headers.ContentLength.Value, dateString); + + content.Headers.ContentType = new MediaTypeHeaderValue(JsonContentId); + HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, OmsDataUploadUrl); + request.Headers.Add("Log-Type", logType); + request.Headers.Add("Authorization", signature); + request.Headers.Add(MsDateHeaderName, dateString); + request.Content = content; + + // SendAsync is thread safe + using (var response = await connectionData.HttpClient.SendAsync(request, cancellationToken) + .ConfigureAwait(false)) { - responseContent = await response.Content.ReadAsStringAsync(); + if (response.IsSuccessStatusCode) + { + this.healthReporter.ReportHealthy(); + } + else + { + string responseContent = string.Empty; + try + { + responseContent = await response.Content.ReadAsStringAsync(); + } + catch { } + + string errorMessage = + $"{nameof(OmsOutput)}: OMS REST API returned an error. Code: {response.StatusCode} Description: {response.ReasonPhrase} {responseContent}"; + this.healthReporter.ReportProblem(errorMessage); + } } - catch { } - - string errorMessage = $"{nameof(OmsOutput)}: OMS REST API returned an error. Code: {response.StatusCode} Description: {response.ReasonPhrase} {responseContent}"; - this.healthReporter.ReportProblem(errorMessage); } } catch (Exception e) { - ErrorHandlingPolicies.HandleOutputTaskError(e, () => + ErrorHandlingPolicies.HandleOutputTaskError(e, () => { - string errorMessage = nameof(OmsOutput) + ": an error occurred while sending data to OMS: " + Environment.NewLine + e.ToString(); + string errorMessage = nameof(OmsOutput) + ": an error occurred while sending data to OMS: " + + Environment.NewLine + e.ToString(); this.healthReporter.ReportWarning(errorMessage, EventFlowContextIdentifiers.Output); }); } } + /// + /// Converts event data objects into log-ready objects. Events are separated into buckets based on their + /// destination table in Log Analytics. + /// + /// Default implementation uses a single bucket, taken from the configuration file. + /// + /// The events to be processed. + /// A collection of "buckets", each containing a table name and the objects that go into it, + /// serialized into a JSON array. + protected virtual IReadOnlyDictionary PrepareEventData(IReadOnlyCollection events) + { + var result = new Dictionary(); + + if ((events != null) && (events.Count > 0)) + { + + result.Add(this.connectionData.LogTypeName, JsonConvert.SerializeObject(events)); + } + + return result; + } + private string BuildSignature(long contentLength, string dateString) { string dateHeader = $"{MsDateHeaderName}:{dateString}"; @@ -167,11 +205,12 @@ private string BuildSignature(long contentLength, string dateString) return signature; } - private class OmsConnectionData + protected class OmsConnectionData { public HttpClient HttpClient { get; set; } public HMACSHA256 Hasher { get; set; } public string WorkspaceId { get; set; } + public string LogTypeName { get; set; } } } }