Skip to content

Commit

Permalink
Support for Elasticsearch 7 (#331)
Browse files Browse the repository at this point in the history
  • Loading branch information
karolz-ms authored Sep 7, 2019
1 parent acf96b2 commit f757093
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 74 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,6 @@ All other events will be reported as Application Insights *traces* (telemetry of
#### Elasticsearch
*Nuget Package*: [**Microsoft.Diagnostics.EventFlow.Outputs.ElasticSearch**](https://www.nuget.org/packages/Microsoft.Diagnostics.EventFlow.Outputs.ElasticSearch/)

**Note: Nuget package version 1.x supports Elasticsearch version 2.x. Nuget package version 2.x supports Elasticsearch version 6.x**

This output writes data to the [Elasticsearch](https://www.elastic.co/products/elasticsearch). Here is an example showing all possible settings:
```json
{
Expand All @@ -803,7 +801,6 @@ This output writes data to the [Elasticsearch](https://www.elastic.co/products/e
"connectionPoolType": "Sniffing",
"basicAuthenticationUserName": "esUser1",
"basicAuthenticationUserPassword": "<MyPassword>",
"eventDocumentTypeName": "diagData",
"numberOfShards": 1,
"numberOfReplicas": 1,
"refreshInterval": "15s",
Expand All @@ -818,7 +815,7 @@ This output writes data to the [Elasticsearch](https://www.elastic.co/products/e
| `connectionPoolType` | "Static", "Sniffing", or "Sticky" | No | Specifies the Connection Pool that takes care of registering what nodes there are in the cluster. |
| `basicAuthenticationUserName` | string | No | Specifies the user name used to authenticate with Elasticsearch. To protect the cluster, authentication is often setup on the cluster. |
| `basicAuthenticationUserPassword` | string | No | Specifies the password used to authenticate with Elasticsearch. This field should be used only if basicAuthenticationUserName is specified. |
| `eventDocumentTypeName` | string | Yes | Specifies the document type to be applied when data is written. Elasticsearch allows documents to be typed, so they can be distinguished from other types. This type name is user-defined. |
| `eventDocumentTypeName` | string | Yes (ver < 2.7.0) <br/> N/A (ver >= 2.7.0) | Specifies the document type to be applied when data is written. Elasticsearch allows documents to be typed, so they can be distinguished from other types. This type name is user-defined. <br/> <br/> Starting with Elasticsearch 7.x the [mapping types have been removed](https://www.elastic.co/guide/en/elasticsearch/reference/7.0/removal-of-types.html). Consequently this configuration setting has been removed from Elasticsearch output version 2.7.0 and newer. |
| `numberOfShards` | int | No | Specifies how many shards to create the index with. If not specified, it defaults to 1.|
| `numberOfReplicas` | int | No | Specifies how many replicas the index is created with. If not specified, it defaults to 5.|
| `refreshInterval` | string | No | Specifies what refresh interval the index is created with. If not specified, it defaults to 15s.|
Expand All @@ -845,6 +842,13 @@ Fields injected byt the `request` metadata are:
| `IsSuccess` | Success indicator, read from the event property specified by `isSuccessProperty` (if available). |
| `ResponseCode` | Response code for the request, read from the event property specified by `responseCodeProperty` (if available). |

*Elasticsearch version support*
| Elasticsearch output package version | Supported Elasticsearch server version |
| :---- | :---- |
| 1.x | 2.x |
| 2.6.x | 6.x |
| 2.7.x | 7.x |

#### Azure Monitor Logs

*Nuget package*: [**Microsoft.Diagnostics.EventFlow.Outputs.AzureMonitorLogs**](https://www.nuget.org/packages/Microsoft.Diagnostics.EventFlow.Outputs.AzureMonitorLogs/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public async Task SendEventsAsync(IReadOnlyCollection<EventData> events, long tr
// Note: the NEST client is documented to be thread-safe so it should be OK to just reuse the this.esClient instance
// between different SendEventsAsync callbacks.
// Reference: https://www.elastic.co/blog/nest-and-elasticsearch-net-1-3
IBulkResponse response = await this.connectionData.Client.BulkAsync(request).ConfigureAwait(false);
BulkResponse response = await this.connectionData.Client.BulkAsync(request).ConfigureAwait(false);
if (!response.IsValid)
{
this.ReportEsRequestError(response, "Bulk upload");
Expand Down Expand Up @@ -130,7 +130,7 @@ private IEnumerable<IBulkOperation> GetCreateOperationsForEvent(EventData eventD
{
foreach (var metricMetadata in metadataSet)
{
operation = CreateMetricOperation(eventData, metricMetadata, currentIndexName, documentTypeName);
operation = CreateMetricOperation(eventData, metricMetadata, currentIndexName);
if (operation != null)
{
reportedAsSpecialEvent = true;
Expand All @@ -143,7 +143,7 @@ private IEnumerable<IBulkOperation> GetCreateOperationsForEvent(EventData eventD
{
foreach (var requestMetadata in metadataSet)
{
operation = CreateRequestOperation(eventData, requestMetadata, currentIndexName, documentTypeName);
operation = CreateRequestOperation(eventData, requestMetadata, currentIndexName);
if (operation != null)
{
reportedAsSpecialEvent = true;
Expand All @@ -156,7 +156,7 @@ private IEnumerable<IBulkOperation> GetCreateOperationsForEvent(EventData eventD
{
foreach (var dependencyMetadata in metadataSet)
{
operation = CreateDependencyOperation(eventData, dependencyMetadata, currentIndexName, documentTypeName);
operation = CreateDependencyOperation(eventData, dependencyMetadata, currentIndexName);
if (operation != null)
{
reportedAsSpecialEvent = true;
Expand All @@ -169,7 +169,7 @@ private IEnumerable<IBulkOperation> GetCreateOperationsForEvent(EventData eventD
{
foreach (var dependencyMetadata in metadataSet)
{
operation = CreateDependencyOperation(eventData, dependencyMetadata, currentIndexName, documentTypeName);
operation = CreateDependencyOperation(eventData, dependencyMetadata, currentIndexName);
if (operation != null)
{
reportedAsSpecialEvent = true;
Expand All @@ -182,7 +182,7 @@ private IEnumerable<IBulkOperation> GetCreateOperationsForEvent(EventData eventD
{
foreach (var exceptionMetadata in metadataSet)
{
operation = CreateExceptionOperation(eventData, exceptionMetadata, currentIndexName, documentTypeName);
operation = CreateExceptionOperation(eventData, exceptionMetadata, currentIndexName);
if (operation != null)
{
reportedAsSpecialEvent = true;
Expand All @@ -193,16 +193,12 @@ private IEnumerable<IBulkOperation> GetCreateOperationsForEvent(EventData eventD

if (!reportedAsSpecialEvent)
{
operation = CreateOperation(eventData, currentIndexName, documentTypeName);
operation = CreateOperation(eventData, currentIndexName);
yield return operation;
}
}

private BulkIndexOperation<EventData> CreateMetricOperation(
EventData eventData,
EventMetadata metricMetadata,
string currentIndexName,
string documentTypeName)
private BulkIndexOperation<EventData> CreateMetricOperation(EventData eventData, EventMetadata metricMetadata, string currentIndexName)
{
var result = MetricData.TryGetData(eventData, metricMetadata, out MetricData metricData);
if (result.Status != DataRetrievalStatus.Success)
Expand All @@ -214,15 +210,11 @@ private BulkIndexOperation<EventData> CreateMetricOperation(
var metricEventData = eventData.DeepClone();
metricEventData.Payload[nameof(MetricData.MetricName)] = metricData.MetricName;
metricEventData.Payload[nameof(MetricData.Value)] = metricData.Value;
var operation = CreateOperation(metricEventData, currentIndexName, documentTypeName);
var operation = CreateOperation(metricEventData, currentIndexName);
return operation;
}

private BulkIndexOperation<EventData> CreateRequestOperation(
EventData eventData,
EventMetadata requestMetadata,
string currentIndexName,
string documentTypeName)
private BulkIndexOperation<EventData> CreateRequestOperation(EventData eventData, EventMetadata requestMetadata, string currentIndexName)
{
var result = RequestData.TryGetData(eventData, requestMetadata, out RequestData requestData);
if (result.Status != DataRetrievalStatus.Success)
Expand All @@ -245,15 +237,11 @@ private BulkIndexOperation<EventData> CreateRequestOperation(
{
requestEventData.Payload[nameof(RequestData.ResponseCode)] = requestData.ResponseCode;
}
var operation = CreateOperation(requestEventData, currentIndexName, documentTypeName);
var operation = CreateOperation(requestEventData, currentIndexName);
return operation;
}

private BulkIndexOperation<EventData> CreateDependencyOperation(
EventData eventData,
EventMetadata dependencyMetadata,
string currentIndexName,
string documentTypeName)
private BulkIndexOperation<EventData> CreateDependencyOperation(EventData eventData, EventMetadata dependencyMetadata, string currentIndexName)
{
var result = DependencyData.TryGetData(eventData, dependencyMetadata, out DependencyData dependencyData);
if (result.Status != DataRetrievalStatus.Success)
Expand Down Expand Up @@ -283,15 +271,11 @@ private BulkIndexOperation<EventData> CreateDependencyOperation(
{
dependencyEventData.Payload[nameof(DependencyData.DependencyType)] = dependencyData.DependencyType;
}
var operation = CreateOperation(dependencyEventData, currentIndexName, documentTypeName);
var operation = CreateOperation(dependencyEventData, currentIndexName);
return operation;
}

private BulkIndexOperation<EventData> CreateExceptionOperation(
EventData eventData,
EventMetadata exceptionMetadata,
string currentIndexName,
string documentTypeName)
private BulkIndexOperation<EventData> CreateExceptionOperation(EventData eventData, EventMetadata exceptionMetadata, string currentIndexName)
{
var result = ExceptionData.TryGetData(eventData, exceptionMetadata, out ExceptionData exceptionData);
if (result.Status != DataRetrievalStatus.Success)
Expand All @@ -302,15 +286,14 @@ private BulkIndexOperation<EventData> CreateExceptionOperation(

var exceptionEventData = eventData.DeepClone();
exceptionEventData.Payload[nameof(ExceptionData.Exception)] = exceptionData.Exception.ToString();
var operation = CreateOperation(exceptionEventData, currentIndexName, documentTypeName);
var operation = CreateOperation(exceptionEventData, currentIndexName);
return operation;
}

private static BulkIndexOperation<EventData> CreateOperation(EventData eventData, string currentIndexName, string documentTypeName)
private static BulkIndexOperation<EventData> CreateOperation(EventData eventData, string currentIndexName)
{
BulkIndexOperation<EventData> operation = new BulkIndexOperation<EventData>(eventData);
operation.Index = currentIndexName;
operation.Type = documentTypeName;
return operation;
}

Expand Down Expand Up @@ -371,7 +354,7 @@ private void Initialize(ElasticSearchOutputConfiguration esOutputConfiguration)

private async Task EnsureIndexExists(string indexName, ElasticClient esClient)
{
IExistsResponse existsResult = await esClient.IndexExistsAsync(indexName).ConfigureAwait(false);
ExistsResponse existsResult = await esClient.Indices.ExistsAsync(indexName).ConfigureAwait(false);
if (!existsResult.IsValid)
{
this.ReportEsRequestError(existsResult, "Index exists check");
Expand All @@ -393,7 +376,7 @@ private async Task EnsureIndexExists(string indexName, ElasticClient esClient)
indexSettings.Settings.Add("default_pipeline", this.connectionData.Configuration.DefaultPipeline);
}

ICreateIndexResponse createIndexResult = await esClient.CreateIndexAsync(indexName, c => c.InitializeUsing(indexSettings)).ConfigureAwait(false);
CreateIndexResponse createIndexResult = await esClient.Indices.CreateAsync(indexName, c => c.InitializeUsing(indexSettings)).ConfigureAwait(false);

if (!createIndexResult.IsValid)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
<PropertyGroup>
<Description>Provides an output implementation that sends diagnostics data to Elasticsearch.</Description>
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<VersionPrefix>2.6.1</VersionPrefix>
<VersionPrefix>2.7.0</VersionPrefix>
<Authors>Microsoft</Authors>
<TargetFrameworks>netstandard1.6;net451;netstandard2.0</TargetFrameworks>
<TargetFrameworks>net471;netstandard2.0</TargetFrameworks>
<AssemblyName>Microsoft.Diagnostics.EventFlow.Outputs.ElasticSearch</AssemblyName>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<PackageId>Microsoft.Diagnostics.EventFlow.Outputs.ElasticSearch</PackageId>
<PackageTags>Microsoft;Diagnostics;EventFlow;Outputs;Elasticsearch</PackageTags>
<PackageProjectUrl>https://github.com/Azure/diagnostics-eventflow</PackageProjectUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.6' ">1.6.1</NetStandardImplicitPackageVersion>
<NetStandardImplicitPackageVersion>2.0.0</NetStandardImplicitPackageVersion>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard2.0' ">2.0.0</NetStandardImplicitPackageVersion>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
Expand All @@ -27,11 +26,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Elasticsearch.Net" Version="[6.1.0,7.0)" />
<PackageReference Include="NEST" Version="[6.1.0,7.0)" />
<PackageReference Include="Elasticsearch.Net" Version="[7.1.0,8.0)" />
<PackageReference Include="NEST" Version="[7.1.0,8.0)" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net451' ">
<ItemGroup Condition=" '$(TargetFramework)' == 'net471' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
Expand Down
Loading

0 comments on commit f757093

Please sign in to comment.