Skip to content

Commit a89a503

Browse files
authored
Revert previous changes to MemoryTracker (#344)
* fix: revert previous changes to MemoryTracker * fix: fix memory tracker for stream query * fix: fix memory tracker for stream query * fix: fix memory tracker for stream query
1 parent c13a5e8 commit a89a503

File tree

4 files changed

+71
-65
lines changed

4 files changed

+71
-65
lines changed

programs/local/chdb.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "chdb.h"
22
#include <cstddef>
3+
#include "Common/MemoryTracker.h"
34
#include "chdb-internal.h"
45
#include "LocalServer.h"
56
#include "QueryResult.h"
@@ -223,13 +224,14 @@ static std::pair<QueryResultPtr, bool> createQueryResult(DB::LocalServer * serve
223224
else if (!req.isIteration())
224225
{
225226
server->streaming_query_context = std::make_shared<DB::StreamingQueryContext>();
227+
/// TODO: support memory tracker for streaming query
228+
server->streaming_query_context->limit = total_memory_tracker.getHardLimit();
229+
total_memory_tracker.setHardLimit(0);
226230
query_result = createStreamingQueryResult(server, req);
227231
is_end = !query_result->getError().empty();
228232

229233
if (!is_end)
230234
server->streaming_query_context->streaming_result = query_result.get();
231-
else
232-
server->streaming_query_context.reset();
233235
}
234236
else
235237
{
@@ -244,7 +246,13 @@ static std::pair<QueryResultPtr, bool> createQueryResult(DB::LocalServer * serve
244246

245247
if (is_end)
246248
{
247-
server->streaming_query_context.reset();
249+
if (server->streaming_query_context)
250+
{
251+
total_memory_tracker.resetCounters();
252+
MemoryTracker::updateRSS(0);
253+
total_memory_tracker.setHardLimit(server->streaming_query_context->limit);
254+
server->streaming_query_context.reset();
255+
}
248256
#if USE_PYTHON
249257
if (auto * local_connection = static_cast<DB::LocalConnection*>(server->connection.get()))
250258
{

src/Client/ClientBase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <Common/QueryFuzzer.h>
1212
#include <Common/ShellCommand.h>
1313
#include <Common/Stopwatch.h>
14+
#include "base/types.h"
1415
#include <Core/ExternalTable.h>
1516
#include <Core/Settings.h>
1617
#include <Interpreters/Context.h>
@@ -83,6 +84,7 @@ struct StreamingQueryContext
8384
ASTPtr parsed_query;
8485
void * streaming_result = nullptr;
8586
bool is_streaming_query = true;
87+
Int64 limit = 0;
8688

8789
StreamingQueryContext() = default;
8890
};

src/Common/CurrentMemoryTracker.cpp

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -92,53 +92,50 @@ AllocationTrace CurrentMemoryTracker::allocImpl(Int64 size, bool throw_if_memory
9292

9393
void CurrentMemoryTracker::check()
9494
{
95-
// if (auto * memory_tracker = getMemoryTracker())
96-
// std::ignore = memory_tracker->allocImpl(0, true);
95+
if (auto * memory_tracker = getMemoryTracker())
96+
std::ignore = memory_tracker->allocImpl(0, true);
9797
}
9898

99-
AllocationTrace CurrentMemoryTracker::alloc(Int64)
99+
AllocationTrace CurrentMemoryTracker::alloc(Int64 size)
100100
{
101-
// bool throw_if_memory_exceeded = true;
102-
// return allocImpl(size, throw_if_memory_exceeded);
103-
return {};
101+
bool throw_if_memory_exceeded = true;
102+
return allocImpl(size, throw_if_memory_exceeded);
104103
}
105104

106-
AllocationTrace CurrentMemoryTracker::allocNoThrow(Int64)
105+
AllocationTrace CurrentMemoryTracker::allocNoThrow(Int64 size)
107106
{
108-
// bool throw_if_memory_exceeded = false;
109-
// return allocImpl(size, throw_if_memory_exceeded);
110-
return {};
107+
bool throw_if_memory_exceeded = false;
108+
return allocImpl(size, throw_if_memory_exceeded);
111109
}
112110

113-
AllocationTrace CurrentMemoryTracker::free(Int64)
111+
AllocationTrace CurrentMemoryTracker::free(Int64 size)
114112
{
115-
// if (auto * memory_tracker = getMemoryTracker())
116-
// {
117-
// if (!current_thread || MemoryTrackerBlockerInThread::isBlockedAny())
118-
// {
119-
// return memory_tracker->free(size);
120-
// }
121-
// else
122-
// {
123-
// current_thread->untracked_memory -= size;
124-
// if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
125-
// {
126-
// Int64 untracked_memory = current_thread->untracked_memory;
127-
// current_thread->untracked_memory = 0;
128-
// return memory_tracker->free(-untracked_memory);
129-
// }
130-
// }
131-
132-
// return AllocationTrace(memory_tracker->getSampleProbability(size));
133-
// }
134-
135-
// return AllocationTrace(0);
136-
return {};
113+
if (auto * memory_tracker = getMemoryTracker())
114+
{
115+
if (!current_thread || MemoryTrackerBlockerInThread::isBlockedAny())
116+
{
117+
return memory_tracker->free(size);
118+
}
119+
else
120+
{
121+
current_thread->untracked_memory -= size;
122+
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
123+
{
124+
Int64 untracked_memory = current_thread->untracked_memory;
125+
current_thread->untracked_memory = 0;
126+
return memory_tracker->free(-untracked_memory);
127+
}
128+
}
129+
130+
return AllocationTrace(memory_tracker->getSampleProbability(size));
131+
}
132+
133+
return AllocationTrace(0);
137134
}
138135

139136
void CurrentMemoryTracker::injectFault()
140137
{
141-
// if (auto * memory_tracker = getMemoryTracker())
142-
// memory_tracker->injectFault();
138+
if (auto * memory_tracker = getMemoryTracker())
139+
memory_tracker->injectFault();
143140
}
144141

src/Common/MemoryTracker.cpp

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -205,28 +205,27 @@ void MemoryTracker::injectFault() const
205205
/// Let's find them.
206206
void MemoryTracker::debugLogBigAllocationWithoutCheck(Int64 size [[maybe_unused]])
207207
{
208-
return;
209-
// if constexpr (MemoryTrackerDebugBlockerInThread::isEnabled())
210-
// {
211-
// if (size < 0)
212-
// return;
213-
214-
// /// The choice is arbitrary (maybe we should decrease it)
215-
// constexpr Int64 threshold = 16 * 1024 * 1024;
216-
// if (size < threshold)
217-
// return;
218-
219-
// if (MemoryTrackerDebugBlockerInThread::isBlocked())
220-
// return;
221-
222-
// MemoryTrackerBlockerInThread blocker(VariableContext::Global);
223-
// LOG_TEST(
224-
// getLogger("MemoryTracker"),
225-
// "Too big allocation ({} bytes) without checking memory limits, "
226-
// "it may lead to OOM. Stack trace: {}",
227-
// size,
228-
// StackTrace().toString());
229-
// }
208+
if constexpr (MemoryTrackerDebugBlockerInThread::isEnabled())
209+
{
210+
if (size < 0)
211+
return;
212+
213+
/// The choice is arbitrary (maybe we should decrease it)
214+
constexpr Int64 threshold = 16 * 1024 * 1024;
215+
if (size < threshold)
216+
return;
217+
218+
if (MemoryTrackerDebugBlockerInThread::isBlocked())
219+
return;
220+
221+
MemoryTrackerBlockerInThread blocker(VariableContext::Global);
222+
LOG_TEST(
223+
getLogger("MemoryTracker"),
224+
"Too big allocation ({} bytes) without checking memory limits, "
225+
"it may lead to OOM. Stack trace: {}",
226+
size,
227+
StackTrace().toString());
228+
}
230229
}
231230

232231
AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker, double _sample_probability)
@@ -421,12 +420,12 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
421420
return AllocationTrace(_sample_probability);
422421
}
423422

424-
void MemoryTracker::adjustWithUntrackedMemory(Int64)
423+
void MemoryTracker::adjustWithUntrackedMemory(Int64 untracked_memory)
425424
{
426-
// if (untracked_memory > 0)
427-
// std::ignore = allocImpl(untracked_memory, /*throw_if_memory_exceeded*/ false);
428-
// else
429-
// std::ignore = free(-untracked_memory);
425+
if (untracked_memory > 0)
426+
std::ignore = allocImpl(untracked_memory, /*throw_if_memory_exceeded*/ false);
427+
else
428+
std::ignore = free(-untracked_memory);
430429
}
431430

432431
bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)

0 commit comments

Comments
 (0)