Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstracting EventHubs EventData #740

Merged
merged 6 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/Common/Abstractions/EventDataMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace ServiceBusExplorer.Abstractions
{
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Amqp;
using Microsoft.ServiceBus.Messaging;

public class EventDataMessage : IDisposable
{
readonly EventData eventData;
Stream stream;

public EventDataMessage(EventData eventData)
{
this.eventData = eventData;
stream = eventData.GetBodyStream();
Properties = eventData.Properties;
PartitionKey = eventData.PartitionKey;
SequenceNumber = eventData.SequenceNumber;
Offset = eventData.Offset;
SerializedSizeInBytes = eventData.SerializedSizeInBytes;
EnqueuedTimeUtc = eventData.EnqueuedTimeUtc;
SystemProperties = eventData.SystemProperties;
}

public string PartitionKey { get; private set; }
public long SequenceNumber { get; private set; }
public long SerializedSizeInBytes { get; private set; }
public string Offset { get; private set; }
public DateTime EnqueuedTimeUtc { get; private set; }
public IDictionary<string, object> Properties { get; private set; }
public IDictionary<string, object> SystemProperties { get; private set; }

public Stream GetBodyStream()
{
var memoryStream = new MemoryStream();

stream.CopyTo(memoryStream);
stream.Seek(0L, SeekOrigin.Begin);

return memoryStream;
}

public void Dispose()
{
eventData.Dispose();
}
}
}
98 changes: 50 additions & 48 deletions src/Common/Helpers/ServiceBusHelper.cs

Large diffs are not rendered by default.

27 changes: 13 additions & 14 deletions src/ServiceBus/Helpers/ServiceBusPurger.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#region Copyright
//=======================================================================================
// Microsoft Azure Customer Advisory Team
// Microsoft Azure Customer Advisory Team
//
// This sample is supplemental to the technical guidance published on my personal
// blog at http://blogs.msdn.com/b/paolos/.
//
// blog at http://blogs.msdn.com/b/paolos/.
//
// Author: Paolo Salvatori
//=======================================================================================
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
// http://www.apache.org/licenses/LICENSE-2.0
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// PERMISSIONS AND LIMITATIONS UNDER THE LICENSE.
//=======================================================================================
#endregion
Expand Down Expand Up @@ -100,7 +100,6 @@ private async Task<long> PurgeSessionEntity(TEntity entity)
{
long totalMessagesPurged = 0;
var consecutiveSessionTimeOuts = 0;
ServiceBusSessionReceiver sessionReceiver = null;
long messagesToPurgeCount = await GetMessageCount(entity, deadLetterQueueData: false)
.ConfigureAwait(false);

Expand All @@ -117,9 +116,9 @@ private async Task<long> PurgeSessionEntity(TEntity entity)

while (consecutiveSessionTimeOuts < enoughZeroReceives && totalMessagesPurged < messagesToPurgeCount)
{
sessionReceiver = await CreateServiceBusSessionReceiver(entity,
client,
purgeDeadLetterQueueInstead: false)
var sessionReceiver = await CreateServiceBusSessionReceiver(entity,
client,
purgeDeadLetterQueueInstead: false)
.ConfigureAwait(false);

var consecutiveZeroBatchReceives = 0;
Expand Down Expand Up @@ -236,7 +235,7 @@ private async Task<long> DoPurgeNonSessionEntity(TEntity entity, long messagesTo
await receiver.CloseAsync().ConfigureAwait(false);
}
}
}); // End of lambda
}); // End of lambda
}

await Task.WhenAll(tasks).ConfigureAwait(false);
Expand Down
3 changes: 1 addition & 2 deletions src/ServiceBusExplorer/Controls/HandleQueueControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public partial class HandleQueueControl : UserControl

private QueueDescription queueDescription = default!;
private readonly ServiceBusHelper serviceBusHelper = default!;
private readonly ServiceBusHelper2 serviceBusHelper2 = default!;
private readonly WriteToLogDelegate writeToLog = default!;
private readonly string path = default!;
private readonly List<TabPage> hiddenPages = new List<TabPage>();
Expand Down Expand Up @@ -297,9 +296,9 @@ public HandleQueueControl(WriteToLogDelegate writeToLog, ServiceBusHelper servic
{
this.writeToLog = writeToLog;
this.serviceBusHelper = serviceBusHelper;
this.serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();
this.path = path;
this.queueDescription = queueDescription;
var serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();

if (!serviceBusHelper2.ConnectionStringContainsEntityPath())
{
Expand Down
3 changes: 1 addition & 2 deletions src/ServiceBusExplorer/Controls/HandleTopicControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public partial class HandleTopicControl : UserControl
private readonly List<TabPage> hiddenPages = new List<TabPage>();
private TopicDescription topicDescription;
private readonly ServiceBusHelper serviceBusHelper;
private readonly ServiceBusHelper2 serviceBusHelper2 = default!;
private readonly WriteToLogDelegate writeToLog;
private readonly bool premiumNamespace;
private readonly string path;
Expand Down Expand Up @@ -153,7 +152,7 @@ public HandleTopicControl(WriteToLogDelegate writeToLog, ServiceBusHelper servic
{
this.writeToLog = writeToLog;
this.serviceBusHelper = serviceBusHelper;
this.serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();
var serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();

if (!serviceBusHelper2.ConnectionStringContainsEntityPath())
{
Expand Down
77 changes: 47 additions & 30 deletions src/ServiceBusExplorer/Controls/PartitionListenerControl.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#region Copyright
//=======================================================================================
// Microsoft Azure Customer Advisory Team
// Microsoft Azure Customer Advisory Team
//
// This sample is supplemental to the technical guidance published on my personal
// blog at http://blogs.msdn.com/b/paolos/.
//
// blog at http://blogs.msdn.com/b/paolos/.
//
// Author: Paolo Salvatori
//=======================================================================================
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
// http://www.apache.org/licenses/LICENSE-2.0
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// PERMISSIONS AND LIMITATIONS UNDER THE LICENSE.
//=======================================================================================
#endregion
Expand Down Expand Up @@ -45,6 +45,8 @@

namespace ServiceBusExplorer.Controls
{
using Abstractions;

public partial class PartitionListenerControl : UserControl
{
#region Private Constants
Expand Down Expand Up @@ -115,14 +117,13 @@ public partial class PartitionListenerControl : UserControl
private readonly WriteToLogDelegate writeToLog;
private readonly Func<Task> stopLog;
private readonly Action startLog;
private EventData currentEventData;
private int grouperEventDataCustomPropertiesWidth;
private EventDataMessage currentEventData;
private int currentMessageRowIndex;
private readonly int partitionCount;
private bool sorting;
private readonly SortableBindingList<EventData> eventDataBindingList = new SortableBindingList<EventData> { AllowNew = false, AllowEdit = false, AllowRemove = false };
private readonly SortableBindingList<EventDataMessage> eventDataBindingList = new SortableBindingList<EventDataMessage> { AllowNew = false, AllowEdit = false, AllowRemove = false };
private readonly IList<PartitionRuntimeInformation> partitionRuntumeInformationList = new List<PartitionRuntimeInformation>();
private BlockingCollection<EventData> eventDataCollection = new BlockingCollection<EventData>();
private BlockingCollection<EventDataMessage> eventDataCollection = new BlockingCollection<EventDataMessage>();
private System.Timers.Timer timer;
private long receiverMessageNumber;
private long receiverMessageSizeTotal;
Expand Down Expand Up @@ -151,6 +152,7 @@ public partial class PartitionListenerControl : UserControl
private bool clearing;
private bool cleared;
private readonly string iotHubConnectionString;
int grouperEventDataCustomPropertiesWidth;
public Task AsyncTrackEventDataTask { get; private set; }

#endregion
Expand Down Expand Up @@ -315,12 +317,12 @@ private void InitializeControls()
eventDataDataGridView.DefaultCellStyle.SelectionBackColor = Color.FromArgb(92, 125, 150);
eventDataDataGridView.DefaultCellStyle.SelectionForeColor = SystemColors.Window;

// Set RowHeadersDefaultCellStyle.SelectionBackColor so that its default
// Set RowHeadersDefaultCellStyle.SelectionBackColor so that its default
// value won't override DataGridView.DefaultCellStyle.SelectionBackColor.
eventDataDataGridView.RowHeadersDefaultCellStyle.SelectionBackColor = Color.FromArgb(153, 180, 209);

// Set the background color for all rows and for alternating rows.
// The value for alternating rows overrides the value for all rows.
// Set the background color for all rows and for alternating rows.
// The value for alternating rows overrides the value for all rows.
eventDataDataGridView.RowsDefaultCellStyle.BackColor = SystemColors.Window;
eventDataDataGridView.RowsDefaultCellStyle.ForeColor = SystemColors.ControlText;
//eventDataDataGridView.AlternatingRowsDefaultCellStyle.BackColor = Color.White;
Expand Down Expand Up @@ -631,7 +633,7 @@ private void CalculateLastColumnWidth(object sender)

private void eventDataDataGridView_RowEnter(object sender, DataGridViewCellEventArgs e)
{
var bindingList = eventDataBindingSource.DataSource as BindingList<EventData>;
var bindingList = eventDataBindingSource.DataSource as BindingList<EventDataMessage>;
currentMessageRowIndex = e.RowIndex;
if (bindingList == null)
{
Expand All @@ -644,11 +646,26 @@ private void eventDataDataGridView_RowEnter(object sender, DataGridViewCellEvent
currentEventData = bindingList[e.RowIndex];
eventDataPropertyGrid.SelectedObject = currentEventData;

LanguageDetector.SetFormattedMessage(serviceBusHelper, currentEventData.Clone(), txtMessageText);
try
{
//var eventData = currentEventData.Clone();
LanguageDetector.SetFormattedMessage(serviceBusHelper, currentEventData, txtMessageText);
}
catch (Exception exception)
{
HandleException(exception);
}

var listViewItems = currentEventData.Properties.Select(p => new ListViewItem(new[] { p.Key, (p.Value ?? string.Empty).ToString() })).ToArray();
eventDataPropertyListView.Items.Clear();
eventDataPropertyListView.Items.AddRange(listViewItems);
try
{
var listViewItems = currentEventData.Properties.Select(p => new ListViewItem(new[] { p.Key, (p.Value ?? string.Empty).ToString() })).ToArray();
eventDataPropertyListView.Items.Clear();
eventDataPropertyListView.Items.AddRange(listViewItems);
}
catch (Exception exception)
{
HandleException(exception);
}
}

private void tabPageMessages_Resize(object sender, EventArgs e)
Expand Down Expand Up @@ -691,7 +708,7 @@ private void eventDataDataGridView_CellDoubleClick(object sender, DataGridViewCe
{
return;
}
var bindingList = eventDataBindingSource.DataSource as BindingList<EventData>;
var bindingList = eventDataBindingSource.DataSource as BindingList<EventDataMessage>;
if (bindingList == null)
{
return;
Expand Down Expand Up @@ -831,7 +848,7 @@ private async void btnStart_Click(object sender, EventArgs e)
checkBoxCheckpoint,
cancellationTokenSource.Token)
{
TrackEvent = ev => Invoke(new Action<EventData>(m => eventDataCollection.Add(m)), ev),
TrackEvent = ev => Invoke(new Action<EventData>(m => eventDataCollection.Add(new EventDataMessage(m))), ev),
GetElapsedTime = GetElapsedTime,
UpdateStatistics = UpdateStatistics,
WriteToLog = writeToLog,
Expand Down Expand Up @@ -953,7 +970,7 @@ private void btnClear_Click(object sender, EventArgs e)
clearing = true;
cleared = true;
eventDataCollection.Dispose();
eventDataCollection = new BlockingCollection<EventData>();
eventDataCollection = new BlockingCollection<EventDataMessage>();
ClearTrackedMessages();
ClearStatistics();
ClearCharts();
Expand Down Expand Up @@ -1166,8 +1183,8 @@ private void RefreshGraph()
if (InvokeRequired)
{
Invoke(new Action<long, long, long, bool>(InternalUpdateStatistics),
new object[] { receiveTuple.Item1,
receiveTuple.Item2,
new object[] { receiveTuple.Item1,
receiveTuple.Item2,
receiveTuple.Item3,
graph});
}
Expand Down Expand Up @@ -1338,7 +1355,7 @@ private void PartitionListenerControl_Paint(object sender, PaintEventArgs e)
cboReceiverInspector.Size.Height + 1);
}

/// <summary>
/// <summary>
/// Clean up any resources being used.
/// </summary>
/// <param name="disposing">true if managed resources should be disposed; otherwise, false.</param>
Expand Down Expand Up @@ -1501,7 +1518,7 @@ private void saveSelectedEventToolStripMenuItem_Click(object sender, EventArgs e
{
return;
}
var bindingList = eventDataBindingSource.DataSource as BindingList<EventData>;
var bindingList = eventDataBindingSource.DataSource as BindingList<EventDataMessage>;
if (bindingList == null)
{
return;
Expand Down Expand Up @@ -1543,8 +1560,8 @@ private void saveSelectedEventsToolStripMenuItem_Click(object sender, EventArgs
{
return;
}
var messages = eventDataDataGridView.SelectedRows.Cast<DataGridViewRow>().Select(r => r.DataBoundItem as EventData);
IEnumerable<EventData> events = messages as EventData[] ?? messages.ToArray();
var messages = eventDataDataGridView.SelectedRows.Cast<DataGridViewRow>().Select(r => r.DataBoundItem as EventDataMessage);
IEnumerable<EventDataMessage> events = messages as EventDataMessage[] ?? messages.ToArray();
if (!events.Any())
{
return;
Expand Down
Loading