Skip to content

Commit

Permalink
Fix #3167: avoid infinite wait on failure to init curl handle (#3200)
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyRyabinin authored Nov 20, 2024
1 parent 09da88b commit aa4382d
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 1 deletion.
29 changes: 29 additions & 0 deletions src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,34 @@ namespace Aws
return resource;
}

/**
* Returns a resource with exclusive ownership or a nullptr.
* If resource is available within the wait timeout then the resource is returned.
* otherwise (if the timeout has expired or container is shutdown) the nullptr is returned.
* You must call Release on the resource when you are finished.
* This method is enabled only for pointer RESOURCE_TYPE type.
*
* @return instance of RESOURCE_TYPE, nullptr if the resource manager is being shutdown
*/
template <typename std::enable_if<std::is_pointer<RESOURCE_TYPE>::value>::type* = nullptr>
RESOURCE_TYPE TryAcquire(const uint64_t timeoutMs) {
std::unique_lock<std::mutex> locker(m_queueLock);
bool hasResource = m_shutdown.load() || !m_resources.empty();
if (!hasResource) {
hasResource = m_semaphore.wait_for(locker, std::chrono::milliseconds(timeoutMs),
[&]() { return m_shutdown.load() || !m_resources.empty(); });
}

if (m_shutdown || !hasResource) {
return nullptr;
}

RESOURCE_TYPE resource = m_resources.back();
m_resources.pop_back();

return resource;
}

/**
* Returns whether or not resources are currently available for acquisition
*
Expand Down Expand Up @@ -122,6 +150,7 @@ namespace Aws
{
std::unique_lock<std::mutex> locker(m_queueLock);
m_shutdown = true;
m_semaphore.notify_all();

//wait for all acquired resources to be released.
while (m_resources.size() < resourceCount)
Expand Down
22 changes: 21 additions & 1 deletion src/aws-cpp-sdk-core/source/http/curl/CurlHandleContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ CurlHandleContainer::~CurlHandleContainer()
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Cleaning up " << handle);
curl_easy_cleanup(handle);
}
m_poolSize = 0;
}

CURL* CurlHandleContainer::AcquireCurlHandle()
Expand All @@ -44,7 +45,20 @@ CURL* CurlHandleContainer::AcquireCurlHandle()
CheckAndGrowPool();
}

CURL* handle = m_handleContainer.TryAcquire();
// TODO: 1.12: start to fail instead of infinite loop, possibly introduce another timeout config field
CURL* handle = nullptr;
bool errorLogged = false; // avoid log explosion on legacy app behavior
while (!handle) {
constexpr unsigned long ACQUIRE_TIMEOUT = 1000l; // some big enough arbitrary value, possibly need a user config or just fail ASAP.
handle = m_handleContainer.TryAcquire(ACQUIRE_TIMEOUT);
if (!handle && !errorLogged) {
AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG,
"Unable to Acquire a curl handle within 1 second. "
"Waiting further, this method will start failing in 1.12.x. "
"Please increase the pool size.");
errorLogged = true;
}
}
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Connection has been released. Continuing.");
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Returning connection handle " << handle);
return handle;
Expand Down Expand Up @@ -80,10 +94,16 @@ void CurlHandleContainer::DestroyCurlHandle(CURL* handle)
// If the handle is not released back to the pool, it could create a deadlock
// Create a new handle and release that into the pool
handle = CreateCurlHandleInPool();
if (!handle && m_poolSize) {
m_poolSize -= 1;
}
}
if (handle)
{
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Created replacement handle and released to pool: " << handle);
} else {
AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG,
"Failed to create a replacement handle. The handle pool size reduced to " << m_poolSize);
}
}

Expand Down
107 changes: 107 additions & 0 deletions tests/aws-cpp-sdk-core-tests/utils/ResourceManagerTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/core/utils/ResourceManager.h>
#include <aws/testing/AwsCppSdkGTestSuite.h>

#include <future>
#include <random>

using namespace Aws::Utils;

class ExclusiveOwnershipResourceManagerTest : public Aws::Testing::AwsCppSdkGTestSuite {};

TEST_F(ExclusiveOwnershipResourceManagerTest, TestAcquire) {
class TestResource {
public:
TestResource(int startVal) : m_value(startVal) {}

int GetValue() { return m_value; }

void DoSomething() {
std::unique_lock<std::mutex> lock(m_lock, std::defer_lock);

if (!lock.try_lock()) {
m_fail = true; // concurrent access detected
}
m_value += 1;
}

bool Fail() { return m_fail; }

protected:
int m_value = 0;
std::mutex m_lock;
bool m_fail = false;
};

ExclusiveOwnershipResourceManager<TestResource*> resourceManagerInTest;

ASSERT_FALSE(resourceManagerInTest.HasResourcesAvailable());
ASSERT_EQ(nullptr, resourceManagerInTest.TryAcquire(1l));

static const size_t RESOURCES = 6;
TestResource* resources[RESOURCES] = {};
for (int i = 0; i < (int)RESOURCES; ++i) {
resources[i] = new TestResource(i);
resourceManagerInTest.Release(resources[i]);
}

static const size_t THREADS = 8;
Aws::Vector<std::future<bool>> futures(THREADS);

auto useResourceFn = [](ExclusiveOwnershipResourceManager<TestResource*>& resourceManager) -> bool {
std::random_device rd;
std::mt19937_64 randGen(rd());
std::uniform_int_distribution<int64_t> randDist(0, 100);

for (size_t j = 0; j < 50; ++j) {
TestResource* resource = nullptr;
while (!resource) {
resource = resourceManager.TryAcquire(randDist(randGen));
}
// "do something" for rand ms
int64_t sleepFor = randDist(randGen);
std::this_thread::sleep_for(std::chrono::milliseconds(sleepFor));
const int before = resource->GetValue();
resource->DoSomething();
EXPECT_NE(before, resource->GetValue());
resourceManager.Release(resource);
}
return true;
};

for (size_t i = 0; i < THREADS; ++i) {
futures[i] = std::async([&resourceManagerInTest, &useResourceFn]() -> bool { return useResourceFn(resourceManagerInTest); });
}

for (size_t i = 0; i < THREADS; ++i) {
bool batchResult = futures[i].get();
ASSERT_TRUE(batchResult);
}

// single thread test
Aws::Vector<TestResource*> inUse;
for (int i = 0; i < (int)RESOURCES; ++i) {
TestResource* resource = resourceManagerInTest.TryAcquire(0l);
ASSERT_TRUE(resource);
const int before = resource->GetValue();
resource->DoSomething();
EXPECT_NE(before, resource->GetValue());
inUse.push_back(resource);
}
for (auto resource : inUse) {
resourceManagerInTest.Release(resource);
}

Aws::Vector<TestResource*> released = resourceManagerInTest.ShutdownAndWait(RESOURCES);
ASSERT_EQ(RESOURCES, released.size());

for (int i = 0; i < (int)RESOURCES; ++i) {
ASSERT_FALSE(resources[i]->Fail());
ASSERT_NE(i, resources[i]->GetValue());
delete resources[i];
}
}

0 comments on commit aa4382d

Please sign in to comment.