
[QUESTION] Abandon a pipeline after exec has been invoked #585

Open
jbrezina opened this issue Aug 6, 2024 · 7 comments

Comments

@jbrezina
Contributor

jbrezina commented Aug 6, 2024

Describe the problem
I have code that is called in a loop and needs to use the connection pool on every iteration. If a single iteration does not finish within the specified time span, I need to abandon the results, and I'm not sure how to do that.
The pipeline destructor returns the connection to the pool, but in the next iteration I get an access violation. I cannot use discard either, as it also results in an access violation.
Is there a safe way to abandon the in-progress commands on connections from the connection pool?

std::future<void> task;
{
	auto pipeline{ _redis->pipeline(false) };

	task = std::async(std::launch::async, [&]() {
		// filling the pipe with data
		pipeline.set(...);

		auto replies{ pipeline.exec() };
	});

	if (const auto status{ task.wait_for(std::chrono::milliseconds(500)) };
		status == std::future_status::timeout) {
		// abandon the pipeline if it didn't finish within the required time span
		//pipeline.discard();
	}
}

Environment:

  • OS: Windows 11
  • Compiler: Visual Studio MSVC Version 17.10.4
  • hiredis version: v1.2.0
  • redis-plus-plus version: 1.3.12
@sewenew
Owner

sewenew commented Aug 8, 2024

Operations on Pipeline are not thread-safe.

In your case, i.e. calling set and exec in one thread while calling discard in another, you need to use a mutex to protect these operations.

Regards

@jbrezina
Contributor Author

jbrezina commented Aug 8, 2024

Calling discard caused an access violation while writing to the connection in the next iteration.
Would setting a socket timeout solve this issue? Although if I set too small a value, I cannot establish the connection.

@sewenew
Owner

sewenew commented Aug 8, 2024

Calling discard caused an access violation while writing to the connection in the next iteration.

I think this is caused by a race condition, and as I mentioned, you need to use a mutex to protect it.

Would setting a socket timeout solve this issue?

No, I don't think this is the right solution.

Regards

@jbrezina
Contributor Author

jbrezina commented Aug 8, 2024

What is there to protect? The exec is executed within the task almost immediately.
When the task does not finish within 500 ms, it is discarded (the commented-out code; I think there was a problem with swapping the connection options when creating the new connection) and the (new) connection is returned to the pool.
In the next iteration, when the connection is received, there is again an access violation while using set on the pipeline.
There is no race condition between calling exec and discard.

@sewenew
Owner

sewenew commented Aug 11, 2024

What is there to protect? The exec is executed within the task almost immediately.

How can you ensure that all operations on the pipeline in the async thread finish in 500ms?

When the task does not finish within 500 ms, it is discarded

If the set or exec runs after discard, the behavior is undefined.

Also, once discard is called and the enclosing block exits, the pipeline object goes out of scope, so if set or exec runs after that, the behavior is again undefined.

You can try the following code to test if it's working:

std::future<void> task;
sw::redis::Pipeline *pipe = nullptr;
std::mutex mtx;
{
	auto pipeline{ _redis->pipeline(false) };
	pipe = &pipeline;

	task = std::async(std::launch::async, [&]() {
		{
			std::lock_guard<std::mutex> lock(mtx);
			if (pipe == nullptr) {
				return;	// pipeline has been discarded.
			}
			// filling the pipe with data
			pipe->set(...);
		}

		{
			std::lock_guard<std::mutex> lock(mtx);
			if (pipe == nullptr) {
				return;	// pipeline has been discarded.
			}
			auto replies{ pipe->exec() };
		}
	});

	if (const auto status{ task.wait_for(std::chrono::milliseconds(500)) };
		status == std::future_status::timeout) {
		// abandon the pipeline if it didn't finish within the required time span
		std::lock_guard<std::mutex> lock(mtx);
		pipe->discard();
		pipe = nullptr;
	}
}

Regards

@jbrezina
Contributor Author

How can you ensure that all operations on the pipeline in the async thread finish in 500ms?

I know they finish, and that is not the issue here, even though I get your point. All set and execute operations are done long before 500ms.

If the set or exec runs after discard, the behavior is undefined.

So that's the question: how can I abandon the pipeline? discard should create a new connection and return that to the pool, so in the next iteration, when I call pipeline on the redis instance, I should get the new connection from the pool.

The problem with your code sample is that it blocks until exec finishes because of the mutex, and that's exactly what I'm trying to avoid. I want to abandon the connection the moment wait_for times out, and I don't care about the results anymore because the next iteration overwrites them all.
Furthermore, keeping a pointer to the connection does not make much sense, as the underlying Connection object is swapped in the reconnect function and the local temporary object, which holds the previous operations in progress after the swap, is destroyed.

@sewenew
Owner

sewenew commented Aug 14, 2024

discard should create a new connection and return that to the pool, so in the next iteration, when I call pipeline on the redis instance, I should get the new connection from the pool.

Yes, that's the current behavior.

The problem with your code sample is that it blocks until exec finishes because of the mutex, and that's exactly what I'm trying to avoid.

If you don't block until exec finishes, how can you ensure that exec runs before discard? If you cannot ensure it, you'll get a race condition.

keeping a pointer to the connection does not make much sense, as the underlying Connection object is swapped in the reconnect function and the local temporary object, which holds the previous operations in progress after the swap, is destroyed.

Yes, the previous connection has been destroyed, and that's why I reset the pipe pointer after I call discard.

If you don't want to wait for 500ms, you can set ConnectionOptions::socket_timeout to 500ms, or try the async interface.
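
If it helps, here is a minimal sketch of the socket_timeout approach (the address, key, pool size, and timeout values below are placeholders; connect_timeout is a separate option, so a short socket_timeout does not affect establishing the connection):

#include <chrono>
#include <sw/redis++/redis++.h>

int main() {
	sw::redis::ConnectionOptions opts;
	opts.host = "127.0.0.1";	// placeholder address
	opts.port = 6379;
	// Socket reads/writes that take longer than 500ms throw sw::redis::TimeoutError,
	// and the broken connection is re-created before it is handed out from the pool again.
	opts.socket_timeout = std::chrono::milliseconds(500);
	// Connection establishment has its own timeout, so a short socket_timeout
	// does not prevent connecting.
	opts.connect_timeout = std::chrono::milliseconds(100);

	sw::redis::ConnectionPoolOptions pool_opts;
	pool_opts.size = 3;	// placeholder pool size

	sw::redis::Redis redis(opts, pool_opts);

	auto pipe = redis.pipeline(false);
	pipe.set("key", "value");	// placeholder command
	try {
		auto replies = pipe.exec();	// each socket operation waits at most ~500ms
	} catch (const sw::redis::TimeoutError &err) {
		// The pipeline's connection is considered broken; the next call to
		// pipeline() gets a re-created connection from the pool.
	}

	return 0;
}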

Regards
