Stats query framework #2210

Open · phoebusm wants to merge 7 commits into master

Conversation

@phoebusm (Collaborator) commented Feb 27, 2025

Reference Issues/PRs

https://man312219.monday.com/boards/7852509418/pulses/8297768017

What does this implement or fix?

  • Add a basic C++ framework for stats queries
  • Add stats to list_symbols on S3 to demonstrate the framework

Any other comments?

For the Python layer, the change is minimal and only serves to output something that can be verified in the tests.
The missing functions will be handled in later PRs.

Design

By design, each row holds only one piece of non-groupable data.
The columns used for grouping are ["arcticdb_call", "stage", "key_type", "storage_op", "parallelized"].


1. The original data output by C++ is just a simple vector of vectors of strings, each in the form [key, value, key, value, ...]:
[{'arcticdb_call': 'list_streams', 'count': '1', 'key_type': 'l', 'stage': 'list', 'storage_op': 'ListObjectsV2'}, 
{'arcticdb_call': 'list_streams', 'key_type': 'l', 'stage': 'list', 'storage_op': 'ListObjectsV2', 'time': '47'}, 
{'arcticdb_call': 'list_streams', 'count': '1', 'key_type': 'r', 'stage': 'list', 'storage_op': 'ListObjectsV2'}, 
{'arcticdb_call': 'list_streams', 'key_type': 'r', 'stage': 'list', 'storage_op': 'ListObjectsV2', 'time': '43'}, 
{'arcticdb_call': 'list_streams', 'count': '0', 'key_type': 'l', 'stage': 'list', 'storage_op': 'ListObjectsV2'}, 
{'arcticdb_call': 'list_streams', 'key_type': 'l', 'stage': 'list', 'storage_op': 'ListObjectsV2', 'time': '24'}, 
{'arcticdb_call': 'list_streams', 'stage': 'list', 'time': '630'}, {'arcticdb_call': 'list_streams', 'time': '630'}]


2. Load it into pandas:
  arcticdb_call count key_type stage     storage_op time
0  list_streams     1        l  list  ListObjectsV2  NaN
1  list_streams   NaN        l  list  ListObjectsV2   47
2  list_streams     1        r  list  ListObjectsV2  NaN
3  list_streams   NaN        r  list  ListObjectsV2   43
4  list_streams     0        l  list  ListObjectsV2  NaN
5  list_streams   NaN        l  list  ListObjectsV2   24
6  list_streams   NaN      NaN  list            NaN  630
7  list_streams   NaN      NaN   NaN            NaN  630
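
A minimal sketch of steps 1-2, assuming the flat [key, value, ...] vectors arrive as Python lists of strings (the variable names below are illustrative, not the actual implementation):

import pandas as pd

# Raw stats from the C++ layer: each entry is a flat [key, value, key, value, ...] list.
raw = [
    ["arcticdb_call", "list_streams", "count", "1", "key_type", "l",
     "stage", "list", "storage_op", "ListObjectsV2"],
    ["arcticdb_call", "list_streams", "key_type", "l",
     "stage", "list", "storage_op", "ListObjectsV2", "time", "47"],
    ["arcticdb_call", "list_streams", "time", "630"],
]

# Pair up consecutive key/value strings into dicts, then load them into pandas;
# keys missing from a row become NaN, as in the table above.
records = [dict(zip(row[::2], row[1::2])) for row in raw]
df = pd.DataFrame(records)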


3. Add the parallelized column and change the placeholder to __NA__:
  arcticdb_call  count key_type   stage     storage_op time parallelized
0  list_streams    1.0        l    list  ListObjectsV2  NaN        False
1  list_streams    NaN        l    list  ListObjectsV2   47        False
2  list_streams    1.0        r    list  ListObjectsV2  NaN        False
3  list_streams    NaN        r    list  ListObjectsV2   43        False
4  list_streams    0.0        l    list  ListObjectsV2  NaN        False
5  list_streams    NaN        l    list  ListObjectsV2   24        False
6  list_streams    NaN   __NA__    list         __NA__  630       __NA__
7  list_streams    NaN   __NA__  __NA__         __NA__  630       __NA__
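
Continuing the sketch for step 3. How parallelized is really derived is not shown here, so the False default below is only illustrative:

GROUPBY_COLS = ["arcticdb_call", "stage", "key_type", "storage_op", "parallelized"]

# Illustrative only: mark the flag False wherever the row describes a concrete storage
# operation, and leave it missing for the aggregate rows.
df["parallelized"] = df["storage_op"].notna().map({True: False, False: None})

# Make the non-groupable columns numeric so they can be summed in step 4.
for col in ("count", "time"):
    df[col] = pd.to_numeric(df[col], errors="coerce")

# Groupable columns use the __NA__ placeholder instead of NaN so that rows whose key is
# genuinely "not applicable" (e.g. the overall-time rows) still form their own groups.
df[GROUPBY_COLS] = df[GROUPBY_COLS].fillna("__NA__")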


4. Use pandas groupby() on ["arcticdb_call", "stage", "key_type", "storage_op", "parallelized"] to merge the rows in each group into a single row:

name: ('list_streams', '__NA__', '__NA__', '__NA__', '__NA__') # Overall time for the entire API call
group:   arcticdb_call  count key_type   stage storage_op time parallelized
7  list_streams    NaN   __NA__  __NA__     __NA__  630       __NA__

name: ('list_streams', 'list', '__NA__', '__NA__', '__NA__') # Overall time used in this stage; There is only one stage here
group:   arcticdb_call  count key_type stage storage_op time parallelized
6  list_streams    NaN   __NA__  list     __NA__  630       __NA__

name: ('list_streams', 'list', 'l', 'ListObjectsV2', 'False') # S3 SDK API calls for listing symbol list keys. The non-grouped columns here are count and time. One S3 API call always logs one entry with a count (rows 0 and 4) and one entry with a time (rows 1 and 5). Since each row holds only one piece of non-groupable data, a simple sum of the non-groupable columns merges all the rows into one.
group:   arcticdb_call  count key_type stage     storage_op time parallelized
0  list_streams    1.0        l  list  ListObjectsV2  NaN        False
1  list_streams    NaN        l  list  ListObjectsV2   47        False
4  list_streams    0.0        l  list  ListObjectsV2  NaN        False
5  list_streams    NaN        l  list  ListObjectsV2   24        False

name: ('list_streams', 'list', 'r', 'ListObjectsV2', 'False') # S3 SDK API calls for listing vref keys. Merged in the same way as above: row 2 holds only the non-groupable count, and row 3 holds only the non-groupable time.
group:   arcticdb_call  count key_type stage     storage_op time parallelized
2  list_streams    1.0        r  list  ListObjectsV2  NaN        False
3  list_streams    NaN        r  list  ListObjectsV2   43        False
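
A sketch of the step 4 merge, continuing the code above. Because each row carries at most one non-groupable value, a plain sum within each group merges the rows without double counting:

# The name/group dump above is what df.groupby(GROUPBY_COLS) iterates over; summing the
# non-groupable columns collapses each group into a single row.
merged = (
    df.groupby(GROUPBY_COLS, as_index=False)[["count", "time"]]
      .sum(min_count=1)   # min_count=1 keeps NaN when a group has no value for a column
)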


5. After merging the rows, add the time buckets as well (steps 5 and 6 are sketched together after step 6's table):
  arcticdb_call stage key_type     storage_op parallelized  time_count_630  count  time_count_40  time_count_20
0  list_streams  None     None           None         None             1.0    NaN            NaN            NaN
1  list_streams  list     None           None         None             1.0    NaN            NaN            NaN
2  list_streams  list        l  ListObjectsV2        False             NaN    1.0            1.0            1.0
3  list_streams  list        r  ListObjectsV2        False             NaN    1.0            1.0            NaN


6. Reorder the columns and replace NaN in the time bucket columns with 0:
  arcticdb_call stage key_type     storage_op parallelized count  time_count_20  time_count_40  time_count_630
0  list_streams  None     None           None         None  None              0              0               1
1  list_streams  list     None           None         None  None              0              0               1
2  list_streams  list        l  ListObjectsV2        False     1              1              1               0
3  list_streams  list        r  ListObjectsV2        False     1              0              1               0
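
Roughly how steps 5 and 6 could be expressed, continuing the sketch above. The rule that produces the 20/40/630 bucket labels is not spelled out in this description (see the reviewer question about histogram buckets below), so bucket_for is a hypothetical stand-in for whatever the real rule is:

# Hypothetical bucketing rule: a stand-in only, the real bucket edges are not described here.
def bucket_for(t):
    return f"time_count_{int(t)}"

timed = df[df["time"].notna()].copy()
timed["bucket"] = timed["time"].map(bucket_for)

# Count how many timings of each group fell into each bucket, then attach them to the
# merged counts from step 4.
buckets = (
    timed.groupby(GROUPBY_COLS + ["bucket"]).size()
         .unstack("bucket", fill_value=0)
         .reset_index()
)
result = merged.drop(columns="time").merge(buckets, on=GROUPBY_COLS, how="left")

# Step 6: grouping columns first, then count, then the time buckets in ascending order,
# with 0 instead of NaN for empty buckets.
bucket_cols = sorted(
    (c for c in result.columns if c.startswith("time_count_")),
    key=lambda c: int(c.rsplit("_", 1)[1]),
)
result = result[GROUPBY_COLS + ["count"] + bucket_cols]
result[bucket_cols] = result[bucket_cols].fillna(0).astype(int)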

Checklist

Checklist for code changes...
  • Have you updated the relevant docstrings, documentation and copyright notice?
  • Is this contribution tested against all ArcticDB's features?
  • Do all exceptions introduced raise appropriate error messages?
  • Are API changes highlighted in the PR description?
  • Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

@phoebusm phoebusm marked this pull request as draft February 27, 2025 16:34

github-actions bot commented Feb 27, 2025

Label error. Requires exactly 1 of: patch, minor, major. Found: enhancement

@phoebusm phoebusm added the enhancement New feature or request label Feb 28, 2025
def __sub__(self, other):
    return self._populate_stats(other._create_time)

def _populate_stats(self, other_time):
@phoebusm (Collaborator, Author):

Boilerplate code to beautify the output for now. Changes and improvements are pending in later PRs.

@phoebusm phoebusm marked this pull request as ready for review February 28, 2025 14:41
@poodlewars (Collaborator) left a comment

Nice job figuring out how to pass this stuff through Folly. Can I suggest merging a PR to start with that just introduces the custom Folly executors (with a suite of tests showing that the stats calculation works with both our task-based APIs and plain folly::via), and then we can figure out the other discussions after.

It would have been helpful if your PR description had explained your design.


// In Folly, the first overload calls the second one; both have to be overridden because they are overloads.
// Called by the submitter when a task is submitted to an executor
void add(folly::Func func) override {
Collaborator:

I don't quite follow why we need this kind of no-op override?

Collaborator Author:

It's standard C++ behaviour: when overriding a parent function that is overloaded, all the overloads have to be overridden, because overriding one hides the rest.

@@ -174,11 +175,73 @@ inline auto get_default_num_cpus([[maybe_unused]] const std::string& cgroup_fold
* 3/ Priority: How to assign priorities to task in order to treat the most pressing first.
* 4/ Throttling: (similar to priority) how to absorb work spikes and apply memory backpressure
*/

class CustomIOThreadPoolExecutor : public folly::IOThreadPoolExecutor{
Collaborator:

This seems like a use-case for the CRTP rather than one copy of the code for IO and one for CPU.

The name is a bit weird: CustomIOThreadPoolExecutor could apply to any subclass of IOThreadPoolExecutor regardless of its purpose. StatsContextIOThreadPoolExecutor?

class IOSchedulerType : public folly::FutureExecutor<CustomIOThreadPoolExecutor>{
public:
    template<typename... Args>
    IOSchedulerType(Args&&... args) : folly::FutureExecutor<CustomIOThreadPoolExecutor>(std::forward<Args>(args)...){}
Collaborator:

CRTP for this too, I think.

@@ -194,17 +257,27 @@ class TaskScheduler {
auto task = std::forward<decltype(t)>(t);
static_assert(std::is_base_of_v<BaseTask, std::decay_t<Task>>, "Only supports Task derived from BaseTask");
ARCTICDB_DEBUG(log::schedule(), "{} Submitting CPU task {}: {} of {}", uintptr_t(this), typeid(task).name(), cpu_exec_.getTaskQueueSize(), cpu_exec_.kDefaultMaxQueueSize);
// Executor::add will be called before the function below
Collaborator:

Why do we need this here too? Don't your custom executors handle this for us regardless of whether futures are scheduled with normal Folly APIs or our own task-based wrappers?

@@ -194,17 +257,27 @@ class TaskScheduler {
auto task = std::forward<decltype(t)>(t);
static_assert(std::is_base_of_v<BaseTask, std::decay_t<Task>>, "Only supports Task derived from BaseTask");
ARCTICDB_DEBUG(log::schedule(), "{} Submitting CPU task {}: {} of {}", uintptr_t(this), typeid(task).name(), cpu_exec_.getTaskQueueSize(), cpu_exec_.kDefaultMaxQueueSize);
// Executor::add will be called before the function below
auto task_with_stat_query_wrap = [parent_instance = util::stats_query::StatsInstance::instance(), task = std::move(task)]() mutable{
Collaborator:

I don't understand what task_with_stat_query_wrap is supposed to mean


}

#define GROUPABLE_STAT_NAME(x) stats_query_info##x
Collaborator:

I don't understand how I'm meant to use these APIs. A C++ unit test suite would help. How does the grouping work? Am I able to specify a grouping on a composite, like incrementing the counter for objects of this key type seen during this storage operation during this arcticdb operation?

stats = query_stats_tools_end - query_stats_tools_start
"""
Expected output; time values are not deterministic
arcticdb_call stage key_type storage_op parallelized count time_count_20 time_count_510
Collaborator:

What does time_count_{20,510} mean?

Can be done later, but let's have human-readable key types in the output (like TABLE_DATA).

"""
Expected output; time values are not deterministic
arcticdb_call stage key_type storage_op parallelized count time_count_20 time_count_510
0 list_streams None None None None None 0 1
Collaborator:

list_streams isn't a Python API method so won't mean anything to the user. How has it ended up in this output?

query_stats_tools_end = StatsQueryTool()
stats = query_stats_tools_end - query_stats_tools_start
"""
Expected output; time values are not deterministic
@poodlewars (Collaborator) commented Mar 3, 2025:

I think we might need to rethink the idea of the stats output being a dataframe. It seems hard to answer the most important questions, like "how long did I spend running list_symbols in total" or "how much of that was compaction", with the dataframe API. How would you get that information from the proposed dataframe output? We could always have utilities to transform a strongly typed stats output into a dataframe for the subset of measurements where that makes sense (e.g. these breakdowns of storage operations).

Also, the dataframe API forces all the operations to share the same histogram buckets, which probably isn't suitable.

@poodlewars (Collaborator) commented Mar 3, 2025:

I can see why it has this, but this design takes a kind of "no schema" approach to the stats: the schema is generated dynamically based on the macro invocations. I think it may be better to have a defined schema for the stats, just like Prometheus metric names are defined up front. I think the APIs to maintain the stats should be more like the Prometheus APIs for modifying metrics.

I think your design is more similar to the APIs used by tracing libraries, where you can add a hook anywhere you like, but this is quite different because we have to aggregate the metrics together.

This would add an extra chore when adding a new stat, but I think it would make the whole thing clearer to people who don't use these APIs all the time (people may add new stats a couple of times a year, so won't be familiar with this framework).
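
For illustration only (the metric names, label sets, and buckets below are hypothetical, not a concrete proposal), a predefined-schema approach would resemble the Prometheus client API: every metric is declared up front with its labels, call sites can only increment or observe what already exists, and histogram buckets are chosen per metric rather than shared:

from prometheus_client import Counter, Histogram

# Declared once, up front: the schema of what can be recorded.
STORAGE_OPS = Counter(
    "arcticdb_storage_ops_total",
    "Number of storage operations performed",
    ["arcticdb_call", "stage", "key_type", "storage_op"],
)
STORAGE_OP_SECONDS = Histogram(
    "arcticdb_storage_op_seconds",
    "Time spent in storage operations",
    ["arcticdb_call", "stage", "key_type", "storage_op"],
    buckets=(0.02, 0.04, 0.64),  # chosen per metric, not shared across everything
)

# Call sites can only touch metrics that were declared above.
STORAGE_OPS.labels("list_streams", "list", "l", "ListObjectsV2").inc()
STORAGE_OP_SECONDS.labels("list_streams", "list", "l", "ListObjectsV2").observe(0.047)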

@poodlewars (Collaborator):

How are you calculating the histogram buckets?

Labels: enhancement (New feature or request)