Skip to content

Commit

Permalink
changed logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabh1007 committed Aug 5, 2024
1 parent 80340c6 commit 740e6c0
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 38 deletions.
26 changes: 13 additions & 13 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ internal class BatchCore : TransactionalBatchInternal

private readonly ContainerInternal container;

private List<ItemBatchOperation> operations;

/// <summary>
/// Initializes a new instance of the <see cref="BatchCore"/> class.
/// </summary>
Expand All @@ -43,7 +41,7 @@ public override TransactionalBatch CreateItem<T>(
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
this.AddOperation(new ItemBatchOperation<T>(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resource: item,
Expand All @@ -62,7 +60,7 @@ public override TransactionalBatch CreateItemStream(
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
this.AddOperation(new ItemBatchOperation(
operationType: OperationType.Create,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
Expand All @@ -81,7 +79,7 @@ public override TransactionalBatch ReadItem(
throw new ArgumentNullException(nameof(id));
}

this.operations.Add(new ItemBatchOperation(
this.AddOperation(new ItemBatchOperation(
operationType: OperationType.Read,
operationIndex: this.operations.Count,
id: id,
Expand All @@ -100,7 +98,7 @@ public override TransactionalBatch UpsertItem<T>(
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
this.AddOperation(new ItemBatchOperation<T>(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resource: item,
Expand All @@ -119,7 +117,7 @@ public override TransactionalBatch UpsertItemStream(
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
this.AddOperation(new ItemBatchOperation(
operationType: OperationType.Upsert,
operationIndex: this.operations.Count,
resourceStream: streamPayload,
Expand All @@ -144,7 +142,7 @@ public override TransactionalBatch ReplaceItem<T>(
throw new ArgumentNullException(nameof(item));
}

this.operations.Add(new ItemBatchOperation<T>(
this.AddOperation(new ItemBatchOperation<T>(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
Expand All @@ -170,7 +168,7 @@ public override TransactionalBatch ReplaceItemStream(
throw new ArgumentNullException(nameof(streamPayload));
}

this.operations.Add(new ItemBatchOperation(
this.AddOperation(new ItemBatchOperation(
operationType: OperationType.Replace,
operationIndex: this.operations.Count,
id: id,
Expand All @@ -190,7 +188,7 @@ public override TransactionalBatch DeleteItem(
throw new ArgumentNullException(nameof(id));
}

this.operations.Add(new ItemBatchOperation(
this.AddOperation(new ItemBatchOperation(
operationType: OperationType.Delete,
operationIndex: this.operations.Count,
id: id,
Expand Down Expand Up @@ -236,7 +234,9 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
return executor.ExecuteAsync(trace, cancellationToken);
},
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response));
responseMessage: response,
operationFlag: this.isHomogenousOperations,
operationName: this.lastItemBatchOperation.OperationType));
}

/// <summary>
Expand All @@ -251,7 +251,7 @@ public virtual TransactionalBatch PatchItemStream(
Stream patchStream,
TransactionalBatchPatchItemRequestOptions requestOptions = null)
{
this.operations.Add(new ItemBatchOperation(
this.AddOperation(new ItemBatchOperation(
operationType: OperationType.Patch,
operationIndex: this.operations.Count,
id: id,
Expand Down Expand Up @@ -287,7 +287,7 @@ public override TransactionalBatch PatchItem(

PatchSpec patchSpec = new PatchSpec(patchOperations, requestOptions);

this.operations.Add(new ItemBatchOperation<PatchSpec>(
this.AddOperation(new ItemBatchOperation<PatchSpec>(
operationType: OperationType.Patch,
operationIndex: this.operations.Count,
id: id,
Expand Down
22 changes: 22 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,34 @@

namespace Microsoft.Azure.Cosmos
{
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

internal abstract class TransactionalBatchInternal : TransactionalBatch
{
protected List<ItemBatchOperation> operations;

internal bool isHomogenousOperations = false;

internal ItemBatchOperation lastItemBatchOperation = null;

protected void AddOperation(ItemBatchOperation itemBatchOperation)
{
this.operations.Add(itemBatchOperation);
if (this.operations.Count == 1)
{
this.lastItemBatchOperation = itemBatchOperation;
}
else
{
this.isHomogenousOperations = this.isHomogenousOperations
&& this.lastItemBatchOperation.OperationType == itemBatchOperation.OperationType;
this.lastItemBatchOperation = itemBatchOperation;
}
}
}
}
22 changes: 0 additions & 22 deletions Microsoft.Azure.Cosmos/src/Batch/TransactionalBatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -401,28 +401,6 @@ internal int GetBatchSize()
return this.Operations.Count;
}

internal OperationType? GetBatchOperationName()
{
HashSet<OperationType> operationNames = new ();

if (this.Operations == null)
{
return null;
}

foreach (ItemBatchOperation operation in this.Operations)
{
operationNames.Add(operation.OperationType);
}

if (operationNames.Count == 1)
{
return this.Operations[0].OperationType;
}

return null;
}

/// <summary>
/// Disposes the disposable members held by this class.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ namespace Microsoft.Azure.Cosmos
using System.IO;
using System.Net;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;
using Telemetry;

internal sealed class OpenTelemetryResponse : OpenTelemetryAttributes
{
internal OpenTelemetryResponse(TransactionalBatchResponse responseMessage)
internal OpenTelemetryResponse(TransactionalBatchResponse responseMessage, bool operationFlag, OperationType? operationName)
: this(
statusCode: responseMessage.StatusCode,
requestCharge: OpenTelemetryResponse.GetHeader(responseMessage)?.RequestCharge,
Expand All @@ -24,7 +25,7 @@ internal OpenTelemetryResponse(TransactionalBatchResponse responseMessage)
activityId: OpenTelemetryResponse.GetHeader(responseMessage)?.ActivityId,
correlationId: OpenTelemetryResponse.GetHeader(responseMessage)?.CorrelatedActivityId,
batchSize: responseMessage.GetBatchSize(),
batchOperationName: responseMessage.GetBatchOperationName())
batchOperationName: operationFlag ? operationName : null )
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ internal class CustomListener :
"db.cosmosdb.connection_mode",
"db.cosmosdb.operation_type",
"db.cosmosdb.regions_contacted",
"db.cosmosdb.batch_size",
"rntbd.sub_status_code",
"rntbd.status_code",
"error.type"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public async Task CheckResponseCompatibility()

if (instance is TransactionalBatchResponse transactionInstance)
{
_ = new OpenTelemetryResponse(transactionInstance);
_ = new OpenTelemetryResponse(transactionInstance, false, null);
}
else if (instance is ResponseMessage responseMessageInstance)
{
Expand Down

0 comments on commit 740e6c0

Please sign in to comment.