Skip to content

Commit

Permalink
Make OmsOutput extensible (#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
AssafTzurEl authored and karolz-ms committed Mar 4, 2019
1 parent f28fa7d commit 85dcbcb
Showing 1 changed file with 87 additions and 48 deletions.
135 changes: 87 additions & 48 deletions src/Microsoft.Diagnostics.EventFlow.Outputs.Oms/OmsOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------

using Microsoft.Diagnostics.EventFlow.Configuration;
using Microsoft.Diagnostics.EventFlow.Utilities;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -12,24 +17,19 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Validation;

using Microsoft.Diagnostics.EventFlow.Configuration;
using Microsoft.Diagnostics.EventFlow.Utilities;

namespace Microsoft.Diagnostics.EventFlow.Outputs
{
public class OmsOutput : IOutput
{
const string OmsDataUploadResource = "/api/logs";
const string OmsDataUploadUrl = OmsDataUploadResource + "?api-version=2016-04-01";
const string MsDateHeaderName = "x-ms-date";
const string JsonContentId = "application/json";
private const string OmsDataUploadResource = "/api/logs";
private const string OmsDataUploadUrl = OmsDataUploadResource + "?api-version=2016-04-01";
private const string MsDateHeaderName = "x-ms-date";
private const string JsonContentId = "application/json";

private readonly IHealthReporter healthReporter;
private OmsConnectionData connectionData;
private readonly OmsConnectionData connectionData;

public OmsOutput(IConfiguration configuration, IHealthReporter healthReporter)
{
Expand All @@ -44,7 +44,8 @@ public OmsOutput(IConfiguration configuration, IHealthReporter healthReporter)
}
catch
{
healthReporter.ReportProblem($"Invalid {nameof(OmsOutput)} configuration encountered: '{configuration.ToString()}'",
healthReporter.ReportProblem(
$"Invalid {nameof(OmsOutput)} configuration encountered: '{configuration.ToString()}'",
EventFlowContextIdentifiers.Configuration);
throw;
}
Expand All @@ -61,9 +62,8 @@ public OmsOutput(OmsOutputConfiguration configuration, IHealthReporter healthRep
this.connectionData = CreateConnectionData(configuration);
}

private OmsConnectionData CreateConnectionData(OmsOutputConfiguration configuration)
protected virtual OmsConnectionData CreateConnectionData(OmsOutputConfiguration configuration)
{
Debug.Assert(this.healthReporter != null);
Debug.Assert(configuration != null);

string workspaceId = configuration.WorkspaceId;
Expand All @@ -80,30 +80,35 @@ private OmsConnectionData CreateConnectionData(OmsOutputConfiguration configurat
}

var hasher = new HMACSHA256(Convert.FromBase64String(omsWorkspaceKeyBase64));

var retryHandler = new HttpExponentialRetryMessageHandler();
var httpClient = new HttpClient(retryHandler);

if (!string.IsNullOrWhiteSpace(configuration.ServiceDomain))
{
httpClient.BaseAddress = new Uri($"https://{workspaceId}." + configuration.ServiceDomain, UriKind.Absolute);
httpClient.BaseAddress = new Uri($"https://{workspaceId}.{configuration.ServiceDomain}", UriKind.Absolute);
}
else
{
httpClient.BaseAddress = new Uri($"https://{workspaceId}.ods.opinsights.azure.com", UriKind.Absolute);
}
}

string logTypeName = configuration.LogTypeName;
if (string.IsNullOrWhiteSpace(logTypeName))
{
logTypeName = "Event";
}
httpClient.DefaultRequestHeaders.Add("Log-Type", logTypeName);

return new OmsConnectionData { HttpClient = httpClient, Hasher = hasher, WorkspaceId = workspaceId };
return new OmsConnectionData
{
HttpClient = httpClient,
Hasher = hasher,
WorkspaceId = workspaceId,
LogTypeName = logTypeName
};
}

public async Task SendEventsAsync(IReadOnlyCollection<EventData> events, long transmissionSequenceNumber, CancellationToken cancellationToken)
public async Task SendEventsAsync(IReadOnlyCollection<EventData> events, long transmissionSequenceNumber,
CancellationToken cancellationToken)
{
if (this.connectionData == null || events == null || events.Count == 0)
{
Expand All @@ -112,47 +117,80 @@ public async Task SendEventsAsync(IReadOnlyCollection<EventData> events, long tr

try
{
string jsonData = JsonConvert.SerializeObject(events);

string dateString = DateTime.UtcNow.ToString("r");
HttpContent content = new StringContent(jsonData, Encoding.UTF8, JsonContentId);
string signature = this.BuildSignature(content.Headers.ContentLength.Value, dateString);

content.Headers.ContentType = new MediaTypeHeaderValue(JsonContentId);
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, OmsDataUploadUrl);
request.Headers.Add("Authorization", signature);
request.Headers.Add(MsDateHeaderName, dateString);
request.Content = content;

// SendAsync is thread safe
HttpResponseMessage response = await this.connectionData.HttpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
if (response.IsSuccessStatusCode)
{
this.healthReporter.ReportHealthy();
}
else
var logTypeToData = PrepareEventData(events);

foreach (var logTypeAndJsonPair in logTypeToData)
{
string responseContent = string.Empty;
try
var logType = logTypeAndJsonPair.Key;
var jsonData = logTypeAndJsonPair.Value;
string dateString = DateTime.UtcNow.ToString("r");
HttpContent content = new StringContent(jsonData, Encoding.UTF8, JsonContentId);
string signature = this.BuildSignature(content.Headers.ContentLength.Value, dateString);

content.Headers.ContentType = new MediaTypeHeaderValue(JsonContentId);
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, OmsDataUploadUrl);
request.Headers.Add("Log-Type", logType);
request.Headers.Add("Authorization", signature);
request.Headers.Add(MsDateHeaderName, dateString);
request.Content = content;

// SendAsync is thread safe
using (var response = await connectionData.HttpClient.SendAsync(request, cancellationToken)
.ConfigureAwait(false))
{
responseContent = await response.Content.ReadAsStringAsync();
if (response.IsSuccessStatusCode)
{
this.healthReporter.ReportHealthy();
}
else
{
string responseContent = string.Empty;
try
{
responseContent = await response.Content.ReadAsStringAsync();
}
catch { }

string errorMessage =
$"{nameof(OmsOutput)}: OMS REST API returned an error. Code: {response.StatusCode} Description: {response.ReasonPhrase} {responseContent}";
this.healthReporter.ReportProblem(errorMessage);
}
}
catch { }

string errorMessage = $"{nameof(OmsOutput)}: OMS REST API returned an error. Code: {response.StatusCode} Description: {response.ReasonPhrase} {responseContent}";
this.healthReporter.ReportProblem(errorMessage);
}
}
catch (Exception e)
{
ErrorHandlingPolicies.HandleOutputTaskError(e, () =>
ErrorHandlingPolicies.HandleOutputTaskError(e, () =>
{
string errorMessage = nameof(OmsOutput) + ": an error occurred while sending data to OMS: " + Environment.NewLine + e.ToString();
string errorMessage = nameof(OmsOutput) + ": an error occurred while sending data to OMS: "
+ Environment.NewLine + e.ToString();
this.healthReporter.ReportWarning(errorMessage, EventFlowContextIdentifiers.Output);
});
}
}

/// <summary>
/// Converts event data objects into log-ready objects. Events are separated into buckets based on their
/// destination table in Log Analytics.
///
/// Default implementation uses a single bucket, taken from the configuration file.
/// </summary>
/// <param name="events">The events to be processed.</param>
/// <returns>A collection of "buckets", each containing a table name and the objects that go into it,
/// serialized into a JSON array.</returns>
protected virtual IReadOnlyDictionary<string, string> PrepareEventData(IReadOnlyCollection<EventData> events)
{
var result = new Dictionary<string, string>();

if ((events != null) && (events.Count > 0))
{

result.Add(this.connectionData.LogTypeName, JsonConvert.SerializeObject(events));
}

return result;
}

private string BuildSignature(long contentLength, string dateString)
{
string dateHeader = $"{MsDateHeaderName}:{dateString}";
Expand All @@ -167,11 +205,12 @@ private string BuildSignature(long contentLength, string dateString)
return signature;
}

private class OmsConnectionData
protected class OmsConnectionData
{
public HttpClient HttpClient { get; set; }
public HMACSHA256 Hasher { get; set; }
public string WorkspaceId { get; set; }
public string LogTypeName { get; set; }
}
}
}

0 comments on commit 85dcbcb

Please sign in to comment.