Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement nullability in streams Buffer #7496

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Feb 5, 2025

Changes

  • Implement nullability in Akka.Streams.Implementation.Buffer
  • Propagate and implement nullability to stream classes that uses Buffer:
    • ActorRefSourceActor
    • Buffer
    • SelectAsync
    • SelectAsyncUnordered
    • Delay
    • FlattenMerge
    • SourceRefStageImpl
    • QueueSink
    • QueueSource

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-review

}
}

if (_terminating && _buffer.IsEmpty)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an actual bug, _buffer can be null if _stage._maxBuffer is less than 1. Moved the code to inside the previous if block

@@ -54,7 +55,7 @@ internal interface IBuffer<T>
/// TBD
/// </summary>
/// <returns>TBD</returns>
T Peek();
T? Peek();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original behavior is that Peek() can return null

@Aaronontheweb Aaronontheweb added this to the 1.5.38 milestone Feb 5, 2025
Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not done reviewing yet but left a comment

@@ -90,7 +90,7 @@ protected bool DefaultReceive(object message)
Context.Stop(Self);
else if (message is Status.Success)
{
if (BufferSize == 0 || Buffer.IsEmpty)
if (Buffer is null || Buffer.IsEmpty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -63,13 +63,13 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
{
BufferSize = bufferSize;
OverflowStrategy = overflowStrategy;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Buffer = bufferSize > 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, should we throw an exception here if the bufferSize is less than 0 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should, I just tried to preserve the old behavior

@Aaronontheweb
Copy link
Member

Another thing I'm wondering - is there a compelling reason to have a null buffer at all?

@Arkatufus
Copy link
Contributor Author

In QueueSource case, maybe, though I don't know what the use case is and what the thought behind it, its a 1 to 1 behavior port from scala Akka.

@Arkatufus
Copy link
Contributor Author

Though for QueueSource, there is a difference in scala Akka, in their version, _pendingOffer is another buffer, so you have 2 buffers before the stage backpressure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants