Thread Safety
The CardinalityEstimation library provides thread-safe cardinality estimation through the ConcurrentCardinalityEstimator class and related utilities. This guide covers everything you need to know about using thread-safe cardinality estimation in your applications.
The ConcurrentCardinalityEstimator provides all the functionality of the regular CardinalityEstimator while ensuring thread safety for concurrent access scenarios. It's designed for applications that need to:
- Process data from multiple threads simultaneously
- Merge cardinality estimates in parallel
- Scale cardinality estimation across distributed workloads
- Maintain accuracy while maximizing throughput
using CardinalityEstimation;
// Create a thread-safe cardinality estimator
using var estimator = new ConcurrentCardinalityEstimator(b: 14);
// Add elements from multiple threads safely
var elements = GetDataFromMultipleSources();
Parallel.ForEach(elements, element => estimator.Add(element));
// Get the estimated count (thread-safe)
var uniqueCount = estimator.Count();
Console.WriteLine($"Estimated unique elements: {uniqueCount}");// Convert an existing estimator to thread-safe
var regularEstimator = new CardinalityEstimator();
regularEstimator.Add("some data");
using var concurrentEstimator = regularEstimator.ToConcurrent();
// Now safe for concurrent access
- ✅ All operations are thread-safe: Add(), Count(), Merge()
- ✅ Lock-free counting: Atomic operations for performance counters
- ✅ Optimized locking: Reader-writer locks minimize contention
- ✅ Deadlock prevention: Consistent lock ordering prevents deadlocks
- ✅ Resource management: Proper disposal pattern with IDisposable
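As a quick illustration of these guarantees, here is a minimal sketch using only the members documented in this guide (Add, Count, Merge, Dispose). The synthetic user IDs are made up for the example; substitute your own data source.
using CardinalityEstimation;

using var primary = new ConcurrentCardinalityEstimator(b: 14);
using var secondary = new ConcurrentCardinalityEstimator(b: 14);

// Writers add overlapping ranges of synthetic IDs from separate tasks
var writers = new[]
{
    Task.Run(() => { for (var i = 0; i < 100_000; i++) primary.Add($"user-{i}"); }),
    Task.Run(() => { for (var i = 50_000; i < 150_000; i++) primary.Add($"user-{i}"); }),
    Task.Run(() => { for (var i = 100_000; i < 200_000; i++) secondary.Add($"user-{i}"); })
};

// A reader polls the estimate while the writers are still running
var reader = Task.Run(() =>
{
    while (!Task.WaitAll(writers, millisecondsTimeout: 50))
    {
        Console.WriteLine($"In-flight estimate: {primary.Count()}");
    }
});

Task.WaitAll(writers);
reader.Wait();

// Merge the second estimator into the first and read the combined estimate
primary.Merge(secondary);
Console.WriteLine($"Combined estimate: {primary.Count()} (true unique count: 200000)");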
- Add Operations: Lock-free for most cases, minimal locking overhead
- Count Operations: Highly concurrent read operations
- Merge Operations: Optimized with batching to reduce memory pressure
- Memory Usage: Same memory footprint as regular estimator
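If profiling ever shows a single shared estimator becoming a write hotspot, these performance points suggest a sharded pattern: give each worker its own estimator and pay the merge cost once at the end. The sketch below assumes a hypothetical GetPartitions() helper and uses the static Merge overload shown later in this guide.
// GetPartitions() is a hypothetical helper returning one chunk of input per worker
IReadOnlyList<IEnumerable<string>> partitions = GetPartitions();

// One estimator per partition, so writers never touch the same instance
var shardTasks = partitions
    .Select(partition => Task.Run(() =>
    {
        var shard = new ConcurrentCardinalityEstimator(b: 14);
        foreach (var item in partition)
        {
            shard.Add(item);
        }
        return shard;
    }))
    .ToArray();

var shards = Task.WhenAll(shardTasks).GetAwaiter().GetResult();

// A single merge at the end combines the per-worker sketches
using var combined = ConcurrentCardinalityEstimator.Merge(shards);
Console.WriteLine($"Unique elements: {combined.Count()}");

foreach (var shard in shards)
{
    shard.Dispose();
}
Compared with one shared estimator, this trades roughly one estimator's memory footprint per shard for zero write contention, and the single final Merge is where the batching noted above pays off.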
// Basic constructor with defaults
var estimator = new ConcurrentCardinalityEstimator();
// Custom accuracy and hash function
var customEstimator = new ConcurrentCardinalityEstimator(
hashFunction: myHashFunction,
b: 12, // Accuracy parameter (4-16)
useDirectCounting: true // Exact counting for small sets
);
// Copy constructor from regular estimator
var fromRegular = new ConcurrentCardinalityEstimator(regularEstimator);
// Copy constructor from another concurrent estimator
var copy = new ConcurrentCardinalityEstimator(existingConcurrentEstimator);
The ConcurrentCardinalityEstimator supports the same types as the regular estimator:
using var estimator = new ConcurrentCardinalityEstimator();
// Primitive types
estimator.Add("string value");
estimator.Add(42); // int
estimator.Add(42u); // uint
estimator.Add(42L); // long
estimator.Add(42UL); // ulong
estimator.Add(3.14f); // float
estimator.Add(3.14); // double
// Binary data
estimator.Add(new byte[] { 1, 2, 3, 4 });
Merge multiple estimators efficiently using parallel processing:
var estimators = new List<ConcurrentCardinalityEstimator>
{
estimator1, estimator2, estimator3, estimator4
};
// Merge in parallel with default parallelism
using var merged = ConcurrentCardinalityEstimator.ParallelMerge(estimators);
// Merge with custom parallelism degree
using var merged = ConcurrentCardinalityEstimator.ParallelMerge(
estimators,
parallelismDegree: 4
);
Process large datasets by distributing work across multiple estimators:
// Create multiple estimators for parallel processing
var estimators = CardinalityEstimatorExtensions.CreateMultiple(
count: 4, // Number of estimators
b: 14, // Accuracy parameter
useDirectCounting: true
);
// Distribute elements across estimators using different strategies
var largeDataset = GetLargeDataset();
// Round-robin distribution (default)
estimators.ParallelAdd(largeDataset, PartitionStrategy.RoundRobin);
// Hash-based distribution (better locality)
estimators.ParallelAdd(largeDataset, PartitionStrategy.Hash);
// Chunked distribution (sequential blocks)
estimators.ParallelAdd(largeDataset, PartitionStrategy.Chunked);
// Merge final results
using var finalEstimator = ConcurrentCardinalityEstimator.Merge(estimators);
var totalUniqueCount = finalEstimator.Count();
// Clean up
foreach (var est in estimators) est.Dispose();
// Convert regular estimator to concurrent
using var concurrent = regularEstimator.ToConcurrent();
// Safe merge with automatic type conversion
var result = CardinalityEstimatorExtensions.SafeMerge(
regularEstimator1,
concurrentEstimator1,
regularEstimator2,
null // null values are safely ignored
);
// Parallel merge for regular estimators
var regularEstimators = new[] { est1, est2, est3 };
using var merged = regularEstimators.ParallelMerge(parallelismDegree: 2);
// Create multiple estimators for distributed processing
var estimators = CardinalityEstimatorExtensions.CreateMultiple(
count: Environment.ProcessorCount,
b: 12,
useDirectCounting: true
);
Always dispose of estimators to free resources:
// Using statement (recommended)
using var estimator = new ConcurrentCardinalityEstimator();
// Or explicit disposal
var estimator = new ConcurrentCardinalityEstimator();
try
{
// Use estimator
}
finally
{
estimator.Dispose();
}
// Memory vs. Accuracy tradeoffs
var lowMemory = new ConcurrentCardinalityEstimator(b: 4); // ~1KB, ~100% error
var balanced = new ConcurrentCardinalityEstimator(b: 14); // ~16KB, ~3% error
var highAccuracy = new ConcurrentCardinalityEstimator(b: 16); // ~64KB, ~1% error
// Use processor count for CPU-bound work
var parallelDegree = Environment.ProcessorCount;
// For I/O-bound work, consider higher values
var ioBoundParallelism = Environment.ProcessorCount * 2;
var merged = ConcurrentCardinalityEstimator.ParallelMerge(
estimators,
parallelismDegree: parallelDegree
);
// Round-robin: Best for uniform data distribution
estimators.ParallelAdd(data, PartitionStrategy.RoundRobin);
// Hash-based: Best when data locality matters
estimators.ParallelAdd(data, PartitionStrategy.Hash);
// Chunked: Best for ordered/sequential data
estimators.ParallelAdd(data, PartitionStrategy.Chunked);
try
{
var estimator = new ConcurrentCardinalityEstimator();
// This will throw ArgumentOutOfRangeException
var invalid = new ConcurrentCardinalityEstimator(b: 20); // Max is 16
}
catch (ArgumentOutOfRangeException ex)
{
Console.WriteLine($"Invalid accuracy parameter: {ex.Message}");
}
try
{
estimator1.Merge(estimator2); // Different accuracy parameters
}
catch (ArgumentOutOfRangeException ex)
{
Console.WriteLine($"Cannot merge estimators: {ex.Message}");
}
try
{
estimator.Dispose();
estimator.Add("data"); // Will throw ObjectDisposedException
}
catch (ObjectDisposedException ex)
{
Console.WriteLine($"Estimator disposed: {ex.Message}");
}
// High-throughput scenario
using var estimator = new ConcurrentCardinalityEstimator(b: 12);
// Many writers, few readers (optimal)
Parallel.ForEach(data, item => estimator.Add(item));
var count = estimator.Count(); // Infrequent reads
// Many readers, few writers (still efficient)
var readerTasks = Enumerable.Range(0, 10)
    .Select(_ => Task.Run(async () =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var currentCount = estimator.Count(); // Concurrent reads
            await Task.Delay(100);
        }
    }))
    .ToArray();
// For memory-constrained environments
using var estimator = new ConcurrentCardinalityEstimator(
b: 8, // Lower memory usage
useDirectCounting: false // Skip direct counting phase
);
// For accuracy-critical scenarios
using var estimator = new ConcurrentCardinalityEstimator(
b: 16, // Higher accuracy
useDirectCounting: true // Exact counting for small sets
);
[ApiController]
[Route("api/[controller]")]
public class AnalyticsController : ControllerBase
{
    // Register the shared estimator as a singleton so every request sees the same instance, e.g.
    // builder.Services.AddSingleton(_ => new ConcurrentCardinalityEstimator(b: 14));
    private readonly ConcurrentCardinalityEstimator _estimator;
    public AnalyticsController(ConcurrentCardinalityEstimator estimator)
    {
        _estimator = estimator;
    }
    [HttpPost("track")]
    public IActionResult TrackEvent([FromBody] EventData eventData)
    {
        // Thread-safe addition
        _estimator.Add(eventData.UserId);
        return Ok();
    }
    [HttpGet("unique-users")]
    public IActionResult GetUniqueUsers()
    {
        var count = _estimator.Count();
        return Ok(new { UniqueUsers = count });
    }
}
public class CardinalityAnalysisService : BackgroundService
{
private readonly ConcurrentCardinalityEstimator[] _estimators;
private readonly ILogger<CardinalityAnalysisService> _logger;
public CardinalityAnalysisService(ILogger<CardinalityAnalysisService> logger)
{
_logger = logger;
_estimators = CardinalityEstimatorExtensions.CreateMultiple(
count: Environment.ProcessorCount,
b: 14
);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var data = await FetchDataAsync();
// Process data in parallel across estimators
_estimators.ParallelAdd(data, PartitionStrategy.Hash);
// Periodically merge and report
if (ShouldReport())
{
using var merged = ConcurrentCardinalityEstimator.Merge(_estimators);
var uniqueCount = merged.Count();
_logger.LogInformation("Unique items processed: {Count}", uniqueCount);
}
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
public override void Dispose()
{
foreach (var estimator in _estimators)
{
estimator.Dispose();
}
base.Dispose();
}
}
public class StreamProcessor : IDisposable
{
private readonly ConcurrentCardinalityEstimator _estimator;
public StreamProcessor()
{
_estimator = new ConcurrentCardinalityEstimator(b: 14);
}
public async Task ProcessStreamAsync(IAsyncEnumerable<string> dataStream)
{
var semaphore = new SemaphoreSlim(Environment.ProcessorCount);
var tasks = new List<Task>();
await foreach (var item in dataStream)
{
await semaphore.WaitAsync();
var task = Task.Run(() =>
{
try
{
_estimator.Add(item);
}
finally
{
semaphore.Release();
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
public ulong GetUniqueCount() => _estimator.Count();
public void Dispose() => _estimator.Dispose();
}
// Before (not thread-safe)
var estimator = new CardinalityEstimator();
lock (lockObject)
{
estimator.Add(item); // Manual locking required
}
// After (thread-safe)
using var estimator = new ConcurrentCardinalityEstimator();
estimator.Add(item); // No manual locking needed
public class HybridService
{
private readonly CardinalityEstimator _legacyEstimator;
private ConcurrentCardinalityEstimator _newEstimator;
public void MigrateToThreadSafe()
{
// Create concurrent version from existing data
_newEstimator = new ConcurrentCardinalityEstimator(_legacyEstimator);
// Gradually switch traffic to new estimator
// Both can coexist during transition
}
}
Issue: ObjectDisposedException when accessing estimator
// Solution: Ensure proper lifetime management
using var estimator = new ConcurrentCardinalityEstimator();
// Don't access estimator after using block
Issue: Poor performance with high contention
// Solution: Use distributed processing
var estimators = CardinalityEstimatorExtensions.CreateMultiple(
Environment.ProcessorCount
);
// Process in parallel, merge results
Issue: Memory usage higher than expected
// Solution: Adjust accuracy parameter
var lowerMemoryEstimator = new ConcurrentCardinalityEstimator(b: 10);
// Monitor additions and accuracy
var estimator = new ConcurrentCardinalityEstimator();
// Track operation counts
Console.WriteLine($"Total additions: {estimator.CountAdditions}");
Console.WriteLine($"Estimated count: {estimator.Count()}");
// Convert to regular estimator for detailed inspection
var snapshot = estimator.ToCardinalityEstimator();
// Use existing debugging tools on snapshot
| Method | Description | Thread Safety |
|---|---|---|
| Add(T element) | Add element to the set | ✅ Thread-safe |
| Count() | Get estimated cardinality | ✅ Thread-safe |
| Merge(other) | Merge another estimator | ✅ Thread-safe |
| ToCardinalityEstimator() | Create non-thread-safe snapshot | ✅ Thread-safe |
| Dispose() | Release resources | ✅ Thread-safe |
| Method | Description |
|---|---|
| ToConcurrent() | Convert to concurrent estimator |
| ParallelMerge() | Merge collection in parallel |
| SafeMerge() | Merge with type conversion |
| CreateMultiple() | Factory for multiple estimators |
| ParallelAdd<T>() | Distribute elements across estimators |
| Enum | Values | Description |
|---|---|---|
| PartitionStrategy | RoundRobin, Chunked, Hash | Element distribution strategy |
For questions, issues, or contributions, please visit the CardinalityEstimation GitHub repository.
- .NET 8.0 and later
- .NET 9.0 and later
- Compatible with existing serialization formats
- Backward compatible with regular CardinalityEstimator (see the round-trip sketch below)
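The conversion helpers shown earlier in this guide make the backward-compatibility point concrete; a minimal round-trip sketch:
using CardinalityEstimation;

// Start from an existing, non-thread-safe estimator
var legacy = new CardinalityEstimator();
legacy.Add("existing data");

// Promote it for concurrent use without losing the data already added
using var concurrent = legacy.ToConcurrent();
concurrent.Add("new data");

// Hand a non-thread-safe snapshot back to code that still expects CardinalityEstimator
var snapshot = concurrent.ToCardinalityEstimator();
Console.WriteLine($"Snapshot estimate: {snapshot.Count()}");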