Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
leandromoh committed Dec 23, 2023
1 parent 8213cc2 commit 08c370a
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 5 deletions.
6 changes: 3 additions & 3 deletions RecordParser/Engines/ExpressionHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ public static Expression Call(Delegate f, params Expression[] args) =>
Expression.Call(f.Target is null ? null : Expression.Constant(f.Target), f.Method, args);

public static Expression StringAsSpan(Expression str) =>
Expression.Call(typeof(MemoryExtensions), "AsSpan", Type.EmptyTypes, str);
Expression.Call(typeof(MemoryExtensions), nameof(MemoryExtensions.AsSpan), Type.EmptyTypes, str);

public static Expression SpanAsString(Expression span) =>
Expression.Call(span, "ToString", Type.EmptyTypes);

public static Expression Trim(Expression str) =>
Expression.Call(typeof(MemoryExtensions), "Trim", Type.EmptyTypes, str);
Expression.Call(typeof(MemoryExtensions), nameof(MemoryExtensions.Trim), Type.EmptyTypes, str);

public static Expression Slice(Expression span, Expression start) =>
Expression.Call(span, "Slice", Type.EmptyTypes, start);
Expand All @@ -30,6 +30,6 @@ public static Expression Slice(Expression span, Expression start, Expression len
Expression.Call(span, "Slice", Type.EmptyTypes, start, length);

public static Expression IsWhiteSpace(Expression valueText) =>
Expression.Call(typeof(MemoryExtensions), "IsWhiteSpace", Type.EmptyTypes, valueText);
Expression.Call(typeof(MemoryExtensions), nameof(MemoryExtensions.IsWhiteSpace), Type.EmptyTypes, valueText);
}
}
2 changes: 1 addition & 1 deletion RecordParser/Engines/Reader/PrimitiveTypeReaderEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private static Expression GetEnumFromSpanParseExpression(Type type, Expression s
{
var enumText = color.ToString();

var compareTo = Expression.Call(typeof(MemoryExtensions), "CompareTo", Type.EmptyTypes,
var compareTo = Expression.Call(typeof(MemoryExtensions), nameof(MemoryExtensions.CompareTo), Type.EmptyTypes,
StringAsSpan(Expression.Constant(enumText)),
trim,
Expression.Constant(StringComparison.OrdinalIgnoreCase));
Expand Down
14 changes: 14 additions & 0 deletions RecordParser/Extensions/FileReader/ReaderCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RecordParser.Extensions
{
Expand Down Expand Up @@ -44,6 +45,19 @@ private static IEnumerable<T> Skip<T>(this IEnumerable<T> source, bool hasHeader
? source.Skip(1)
: source;

public static ParallelOptions AsParallel(this ParallelismOptions option)
{
var query = new ParallelOptions();

if (option.MaxDegreeOfParallelism is { } degree)
query.MaxDegreeOfParallelism = degree;

if (option.CancellationToken is { } token)
query.CancellationToken = token;

return query;
}

public static ParallelQuery<T> AsParallel<T>(this IEnumerable<T> source, ParallelismOptions option)
{
var query = source.AsParallel();
Expand Down
49 changes: 48 additions & 1 deletion RecordParser/Extensions/FileWriter/WriterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace RecordParser.Extensions
{
Expand Down Expand Up @@ -47,14 +48,60 @@ public static void WriteRecords<T>(this TextWriter textWriter, IEnumerable<T> it
{
if (options.Enabled)
{
WriteParallel(textWriter, items, tryFormat, options);
if (options.EnsureOriginalOrdering)
WriteParallel(textWriter, items, tryFormat, options);
else
WriteParallelUnordered(textWriter, items, tryFormat, options);
}
else
{
WriteSequential(textWriter, items, tryFormat);
}
}

private class BufferContext
{
public char[] buffer;
public object lockObj;
}

private static void WriteParallelUnordered<T>(TextWriter textWriter, IEnumerable<T> items, TryFormat<T> tryFormat, ParallelismOptions options)
{
var poolSize = 1_000;
var textWriterLock = new object();
var opt = options.AsParallel();
var pool = Enumerable
.Range(0, poolSize)
.Select(_ => new BufferContext
{
buffer = new char[(int)Math.Pow(2, initialPow)],
lockObj = new object()
})
.ToArray();

Parallel.ForEach(items, opt, (item, _, i) =>
{
var x = pool[i % poolSize];

lock (x.lockObj)
{
x = pool[i % poolSize];
retry:

if (tryFormat(item, x.buffer, out var charsWritten))
{
lock (textWriterLock)
textWriter.WriteLine(x.buffer, 0, charsWritten);
}
else
{
x.buffer = new char[x.buffer.Length * 2];
goto retry;
}
}
});
}

private static void WriteParallel<T>(TextWriter textWriter, IEnumerable<T> items, TryFormat<T> tryFormat, ParallelismOptions options)
{
var poolSize = 10_000;
Expand Down

0 comments on commit 08c370a

Please sign in to comment.