Have MergedLinesEnumerable implement IAsyncEnumerable<string> #109

Open: wants to merge 30 commits into base: release-1.7

Commits (30)
3fddb89
Update README.md
madelson Nov 15, 2020
15815ce
Update README.md
madelson Mar 10, 2023
ca60f2a
Update README.md
madelson Mar 10, 2023
da45635
Update README.md
madelson Mar 10, 2023
e45f1f6
Update README.md
madelson Apr 4, 2023
87316df
Close #98: Have MergedLinesEnumerable implement IAsyncEnumerable<string>
Bartleby2718 Feb 26, 2024
7976a96
Merge branch 'release-1.7' into IAsyncEnumerable
Bartleby2718 Mar 7, 2024
d3720b9
Fix spaces, preprocessor directives, pull out common assertions, Add …
Bartleby2718 Mar 7, 2024
990605e
Use var
Bartleby2718 Mar 7, 2024
0ab96b0
Pass CancellationToken to GetAsyncEnumeratorInternal
Bartleby2718 Mar 7, 2024
fdd9137
Unfixed
Bartleby2718 Mar 8, 2024
750b1e0
Fix all bugs to pass all tests (but seeing some transient failures of…
Bartleby2718 Mar 10, 2024
caa9428
Clean up per Visual Studio's suggestions and remove comments
Bartleby2718 Mar 10, 2024
3b99204
Remove System.Linq.Async and use an extension method instead
Bartleby2718 Mar 10, 2024
6a2baae
Fix Condition for Microsoft.Bcl.AsyncInterfaces
Bartleby2718 Mar 10, 2024
983a28f
Revert primary constructor changes to fix CI failures
Bartleby2718 Mar 10, 2024
7104cea
Also revert collection expressions changes
Bartleby2718 Mar 10, 2024
f6a555a
Revert primary constructor in AsyncEnumerableAdapter
Bartleby2718 Mar 10, 2024
9030874
Revert primary constructor in AsyncEnumeratorAdapter
Bartleby2718 Mar 10, 2024
b2b7bbe
Revert collection expression in MergedLinesEnumerableTestBase
Bartleby2718 Mar 10, 2024
f9f0813
Fix spacing and variable names
Bartleby2718 Mar 11, 2024
8d18b9d
Fix preprocessor directives for IAsyncEnumerable/IAsyncEnumerator
Bartleby2718 Mar 11, 2024
5151a25
Replace AsyncEnumerableAdapter with AsAsyncEnumerable
Bartleby2718 Mar 11, 2024
97198a3
Revert to doing Task.WaitAll(task1, task2, consumeTask)
Bartleby2718 Mar 11, 2024
1420db9
Revert AsAsyncEnumerable changes to disallow repeated consumptions
Bartleby2718 Mar 11, 2024
4c68836
Minor style changes
Bartleby2718 Jun 29, 2024
263d7dd
Do not wait consumeTask with other tasks, and revert everything else
Bartleby2718 Jun 29, 2024
16b38df
Merge branch 'release-1.7' into IAsyncEnumerable
Bartleby2718 Jun 29, 2024
7830e3e
Try bumping the timeout, considering the CI pipeline
Bartleby2718 Jun 29, 2024
ff9af41
Try replacing Task.Run with an async method
Bartleby2718 Jul 1, 2024
35 changes: 13 additions & 22 deletions MedallionShell.Tests/Streams/MergedLinesEnumerableTestBase.cs
@@ -97,46 +97,37 @@ void TestOneThrows(bool reverse)
             TestOneThrows(reverse: true);
         }

-        [Test, Timeout(10000)] // something's wrong if it's taking more than 10 seconds
-        public void FuzzTest()
+        [Test, Timeout(5_000)] // something's wrong if it's taking more than 5 seconds
+        public async Task FuzzTest()
         {
-            var pipe1 = new Pipe();
-            var pipe2 = new Pipe();
+            Pipe pipe1 = new(), pipe2 = new();

             var asyncEnumerable = this.Create(new StreamReader(pipe1.OutputStream), new StreamReader(pipe2.OutputStream));

             var strings1 = Enumerable.Range(0, 2000).Select(_ => Guid.NewGuid().ToString()).ToArray();
             var strings2 = Enumerable.Range(0, 2300).Select(_ => Guid.NewGuid().ToString()).ToArray();

-            static void WriteStrings(IReadOnlyList<string> strings, TextWriter writer)
+            static void WriteStrings(IReadOnlyList<string> strings, Pipe pipe)
             {
-                var spinWait = default(SpinWait);
-                var random = new Random(Guid.NewGuid().GetHashCode());
+                SpinWait spinWait = default;
+                Random random = new(Guid.NewGuid().GetHashCode());
+                using StreamWriter writer = new(pipe.InputStream);
                 foreach (var line in strings)
                 {
-                    if (random.Next(110) == 1)
+                    if (random.Next(10) == 1)
                     {
                         spinWait.SpinOnce();
                     }

                     writer.WriteLine(line);
                 }
             }

-            var task1 = Task.Run(() =>
-            {
-                using StreamWriter writer1 = new(pipe1.InputStream);
-                WriteStrings(strings1, writer1);
-            });
-            var task2 = Task.Run(() =>
-            {
-                using StreamWriter writer2 = new(pipe2.InputStream);
-                WriteStrings(strings2, writer2);
-            });
-            var consumeTask = asyncEnumerable.ToListAsync();
-            Task.WaitAll(task1, task2, consumeTask);
-
-            CollectionAssert.AreEquivalent(strings1.Concat(strings2).ToList(), consumeTask.Result);
+            var task1 = Task.Run(() => WriteStrings(strings1, pipe1));
+            var task2 = Task.Run(() => WriteStrings(strings2, pipe2));
+            Task.WaitAll(task1, task2); // need to dispose the writer to end the stream

Owner

@Bartleby2718 can we try await Task.WhenAll(task1, task2, consumeTask); here?

Author

@madelson I've been trying to get that working, but this test fails for the async case if I do that. Not sure if the test logic is flawed (i.e. shouldn't await consumeTask if the input streams may not have been closed?) or there's a bug somewhere else.

Owner

Did you try specifically with the test being async and using await Task.WhenAll instead of Task.WaitAll? If that doesn't work, could be some kind of threading bug in 1.7

Author

@madelson Yes, MergedLinesEnumerableTestAsync's FuzzTest fails but MergedLinesEnumerableTestSync's FuzzTest is fine. It fails even when it's run alone, but I did notice that TestPipeline(2) always fails with it if all tests are run together:
[image]

Author

@madelson I somehow got the test to pass with await Task.WhenAll(task1, task2, consumeTask);! Not sure if this is the fix or it means something else needs to be fixed, but this does look promising.

Let me know what you think! (FWIW the test didn't pass within 10 seconds with if (random.Next(4) == 1).)


Author

I think I had to swap the order for tests to pass. Is this a red flag?

Owner

Yeah let's revert this change and make sure it still passes. Also, does this pass or fail on main? You didn't make any changes to Pipe I think so it may be an issue with the release branch.

Author (Bartleby2718, Mar 11, 2024)

@madelson I looked more into this and gathered some numbers, but I'm lost as to how I should debug this.

Note:

  • Workarounds used in the test project to run tests on master:
    • updated PackageReferences to nunit 3.13.3, NUnit3TestAdapter 3.17.0, Microsoft.NET.Test.Sdk 15.9.0
    • added NU1902,NU1903 to NoWarn and set CheckEolTargetFramework to false

| chance \ Branch | master (net46 / netcoreapp2.2) | release-1.7 (net462 / net6.0) | AsyncEnumerable with Task.WaitAll(task1, task2, consumeTask) (net462 / net6.0) |
| --- | --- | --- | --- |
| 25% (current) | 516ms / 8s | 511ms / 8.4s | 522ms / timeout in the async case, 9.2s in the sync case |
| 20% (new) | 408ms / 6.4s | 374ms / 6.2s | 282ms / timeout in the async case, 5.9s in the sync case |

However, I noticed that a small change makes a difference.

  1. If I start consuming before waiting for the writes, I get a timeout:
            var consumeTask = Task.Run(enumerable.ToListAsync);
            Task.WaitAll(task1, task2);
  2. If I wait for the writes before I start consuming, the performance is similar to the first two columns:
            Task.WaitAll(task1, task2);
            var consumeTask = Task.Run(enumerable.ToListAsync);
  • This suggests that the problem lies in consumeTask, not SpinWait.
  3. If I do the same as master or release-1.7 but use much shorter strings1 and strings2, the test completes within 20ms.
            // originally 2000
            var strings1 = Enumerable.Range(0, 20).Select(_ => Guid.NewGuid().ToString()).ToArray();
            // originally 2300
            var strings2 = Enumerable.Range(0, 23).Select(_ => Guid.NewGuid().ToString()).ToArray();
...
            // same as master or release-1.7
            var consumeTask = Task.Run(enumerable.ToListAsync);
            Task.WaitAll(task1, task2, consumeTask);

            CollectionAssert.AreEquivalent(strings1.Concat(strings2).ToList(), consumeTask.Result);

Therefore, I believe that my IAsyncEnumerable implementation is flawed in a way that somehow "explodes" for bigger inputs (the threshold fluctuates, but it's somewhere between 70 and low 100s).

Any idea how I should debug this? For one thing, I think replacing Guid.NewGuid() with a human-friendly value will help, but that's all I can think of. I can also temporarily comment out SpinWait-related code.

Author (Bartleby2718, Mar 11, 2024)

I also found that consumeTask.Status is WaitingForActivation even after a few seconds (if I don't await it or include consumeTask in the WaitAll).

Author (Bartleby2718, Mar 11, 2024)

Seems like ChatGPT came to the rescue again. (It wasn't helping a few hours ago.)

I lost its message, but it said something along the lines of "StreamWriter wasn't being disposed properly, causing the Pipe's InputStream to wait indefinitely."

Now the test passes, but I can't run spinWait.SpinOnce(); as often. Do you think the frequency should also be a protected virtual value?

Owner

> I lost its message, but it said something along the lines of "StreamWriter wasn't being disposed properly, causing the Pipe's InputStream to wait indefinitely."

Makes sense; we have to dispose the writer to end the stream. Can you point me to the relevant code change?

> Now the test passes, but I can't run spinWait.SpinOnce(); as often. Do you think the frequency should also be a protected virtual value?

I'm not sure I follow here. As often as what? Does it fail when it runs more often? In what way? How does the overall time for this test case compare before and after the changes? (I would expect it to be the same.) Who would be overriding the frequency if it were protected virtual?
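
To illustrate the point about disposal, here is a minimal sketch of why a reader only sees end-of-stream once the writer is disposed. It is not MedallionShell code: it assumes an in-process anonymous pipe from System.IO.Pipes as a stand-in for the library's Pipe, and the class and method names (DisposeEndsStreamDemo, RunAsync) are hypothetical.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading.Tasks;

// Hypothetical demo class; an anonymous pipe stands in for MedallionShell's Pipe.
public static class DisposeEndsStreamDemo
{
    public static async Task<string[]> RunAsync()
    {
        using var writeEnd = new AnonymousPipeServerStream(PipeDirection.Out);
        using var readEnd = new AnonymousPipeClientStream(PipeDirection.In, writeEnd.ClientSafePipeHandle);
        using var reader = new StreamReader(readEnd);

        // Start consuming first. ReadToEndAsync can only complete once the
        // write end has been closed, i.e. at end of stream.
        var consumeTask = Task.Run(() => reader.ReadToEndAsync());

        var writeTask = Task.Run(() =>
        {
            // Disposing the StreamWriter flushes it and closes the write end.
            // Without this using, the consumer would wait indefinitely.
            using var writer = new StreamWriter(writeEnd);
            writer.WriteLine("first");
            writer.WriteLine("second");
        });

        await Task.WhenAll(writeTask, consumeTask);

        return consumeTask.Result.Split(
            new[] { Environment.NewLine },
            StringSplitOptions.RemoveEmptyEntries);
    }
}
```

Calling `await DisposeEndsStreamDemo.RunAsync()` should yield the two lines that were written. Moving the `using` out of the write lambda, so that the writer outlives the task, is the kind of change that would leave the consumer waiting for an end of stream that never arrives.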

Author

@madelson

  1. StreamWriter changes
    FuzzTest is the one where I had to change this. Note the parameter changes of the local function WriteStrings.
  2. spinWait.SpinOnce() changes
    In FuzzTest, specifically https://github.com/madelson/MedallionShell/pull/109/files#diff-68fdbc9634d30b7e1a0bb438ab37b458f0c478766fba188c44ece72f93e41cacR102-R121, I updated the if condition from random.Next(4) == 1 to random.Next(110) == 1, so I'm spinning less often (25% -> 0.91%). If I use a smaller argument for random.Next (i.e. spin more often), then the test never ends (at least for like 30 minutes, after which I stop the test), but only in the async case. Therefore I was wondering if MergedLinesEnumerableTestAsync and MergedLinesEnumerableTestSync should use different frequencies by overriding a protected virtual property.
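
For reference, the spin frequencies quoted in this thread follow from how Random.Next(n) works: it returns an integer in [0, n), so `random.Next(n) == 1` is true with probability 1/n for n >= 2, and a larger argument means spinning less often. A small sketch of that arithmetic (the SpinChance helper is hypothetical, not part of the PR):

```csharp
using System;

// Hypothetical helper illustrating the arithmetic only.
public static class SpinChance
{
    // Random.Next(maxExclusive) is uniform over [0, maxExclusive), so the
    // condition (Next(maxExclusive) == 1) holds with probability
    // 1 / maxExclusive whenever maxExclusive >= 2.
    public static double Percent(int maxExclusive)
    {
        if (maxExclusive < 2)
        {
            throw new ArgumentOutOfRangeException(nameof(maxExclusive));
        }

        return 100.0 / maxExclusive;
    }
}
```

`SpinChance.Percent(4)` is 25, `SpinChance.Percent(10)` is 10, and `SpinChance.Percent(110)` is roughly 0.91, matching the "25% -> 0.91%" figures above.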

Author

@madelson Let me know if the above makes sense!

Author

@madelson Bumping this thread!

Owner

> the test never ends (at least for like 30 minutes, after which I stop the test) only in the async case

This makes me feel like there is a bug somewhere. It could be in the MergedLinesEnumerable changes, it could be in the test code, or it could be in the Pipe code.

What I would suggest is to (temporarily) add some logging statements to the code like this:

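using System;
using System.IO;

// Temporary debugging aid: appends timestamped messages to a log file.
static class TempLogger
{
    // Serializes writes so lines from concurrent threads don't interleave.
    private static readonly object Lock = new();

    public static void Log(string message)
    {
        lock (Lock)
        {
            // Adjust the path as needed for the local machine.
            File.AppendAllLines(@"c:\dev\log.txt", [$"[{DateTime.Now}] {message}"]);
        }
    }
}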

My assumption is that at some point we should stop seeing log statements as the code will enter a hung state. We can then add additional logs to try to get closer and closer to the point where each thread stops.

From there, hopefully we can deduce why it is hanging.

+            CollectionAssert.AreEquivalent(strings1.Concat(strings2), await asyncEnumerable.ToListAsync());
         }
Bartleby2718 marked this conversation as resolved.
}