-
Notifications
You must be signed in to change notification settings - Fork 345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce FlownodePeerCache
#4254
Conversation
WalkthroughThe changes introduced a new module Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant MetaSrv
participant Cache
participant ClusterInfo
Client->>MetaSrv: Request Peer Info for FlownodeId
MetaSrv->>Cache: Check FlownodePeerCache
Cache-->>MetaSrv: Cache Miss
MetaSrv->>ClusterInfo: Retrieve Flownode Peer Info
ClusterInfo-->>MetaSrv: Returned Peer Info
MetaSrv->>Cache: Update FlownodePeerCache
Cache-->>MetaSrv: Cache Updated
MetaSrv-->>Client: Return Peer Info
Poem
Tip AI model upgrade
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (13)
- src/common/meta/src/cache.rs (1 hunks)
- src/common/meta/src/cache/cluster.rs (1 hunks)
- src/common/meta/src/cache/cluster/flownode.rs (1 hunks)
- src/common/meta/src/cache_invalidator.rs (1 hunks)
- src/common/meta/src/cluster.rs (7 hunks)
- src/common/meta/src/error.rs (3 hunks)
- src/common/meta/src/instruction.rs (1 hunks)
- src/frontend/src/instance/builder.rs (1 hunks)
- src/meta-client/src/client.rs (6 hunks)
- src/meta-srv/src/metasrv.rs (3 hunks)
- src/meta-srv/src/metasrv/builder.rs (2 hunks)
- src/meta-srv/src/service/heartbeat.rs (5 hunks)
- src/operator/src/insert.rs (3 hunks)
Files skipped from review due to trivial changes (1)
- src/common/meta/src/cache_invalidator.rs
Additional comments not posted (28)
src/common/meta/src/cache/cluster.rs (1)
15-17
: Introduceflownode
module and export related entities.The new
flownode
module is added and related entities are exported. This looks good and is consistent with the objective of the PR.src/common/meta/src/cache.rs (1)
21-21
: ExportFlownodePeerCache
related entities.The new
FlownodePeerCache
related entities are exported. This looks good and is consistent with the objective of the PR.src/common/meta/src/cache/cluster/flownode.rs (5)
28-29
: DefineFlownodePeerCache
type alias.The
FlownodePeerCache
type alias is defined correctly. This looks good and is consistent with the objective of the PR.
31-31
: DefineFlownodePeerCacheRef
type alias.The
FlownodePeerCacheRef
type alias is defined correctly. This looks good and is consistent with the objective of the PR.
33-42
: ConstructFlownodePeerCache
.The function constructs a
FlownodePeerCache
correctly. This looks good and is consistent with the objective of the PR.
44-56
: Create initializer forFlownodePeerCache
.The function creates an initializer for the
FlownodePeerCache
correctly. The error handling is appropriate. This looks good and is consistent with the objective of the PR.
59-73
: Defineinvalidator
andfilter
functions.The
invalidator
andfilter
functions are defined correctly. They handle cache invalidation and filtering appropriately. This looks good and is consistent with the objective of the PR.src/frontend/src/instance/builder.rs (1)
130-136
: Addflownode_peer_cache
toFrontendBuilder
.The
flownode_peer_cache
is added to theFrontendBuilder
and passed to theInserter
constructor. This looks good and is consistent with the objective of the PR.src/meta-srv/src/service/heartbeat.rs (2)
50-50
: Verify the correct usage ofcache_invalidator
.Ensure that the
cache_invalidator
is correctly cloned and used in theregister_pusher
function.
Line range hint
177-208
: LGTM! Verify the correct context and cache identifier usage.The code changes are approved.
However, ensure that the
CacheContext
andCacheIdent::FlownodeId
are correctly used.Verification successful
Verification successful: The
CacheContext
andCacheIdent::FlownodeId
are correctly defined and used in the code.
CacheContext
is defined insrc/common/meta/src/cache_invalidator.rs
.CacheIdent::FlownodeId
is part of theCacheIdent
enum insrc/common/meta/src/instruction.rs
.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the correct context and cache identifier usage in `register_pusher`. # Test: Search for the `CacheContext` and `CacheIdent::FlownodeId` usage. Expect: Correct initialization and usage. rg --type rust $'CacheContext' rg --type rust $'CacheIdent::FlownodeId'Length of output: 647
Script:
#!/bin/bash # Description: Verify the definitions and implementations of `CacheContext` and `CacheIdent::FlownodeId`. # Search for the definition of `CacheContext`. rg --type rust --pattern 'struct CacheContext' # Search for the definition of `CacheIdent::FlownodeId`. rg --type rust --pattern 'enum CacheIdent'Length of output: 917
Script:
#!/bin/bash # Description: Verify the definitions and implementations of `CacheContext` and `CacheIdent::FlownodeId`. # Search for the definition of `CacheContext`. rg --type rust 'struct CacheContext' # Search for the definition of `CacheIdent`. rg --type rust 'enum CacheIdent'Length of output: 130
Script:
#!/bin/bash # Description: Verify the definition of `CacheContext`. # Search for the definition of `CacheContext`. rg --type rust 'CacheContext'Length of output: 225
Script:
#!/bin/bash # Description: Verify the definition of `Context` in `common_meta::cache_invalidator`. # Search for the definition of `Context` in `common_meta::cache_invalidator`. rg --type rust 'struct Context' src/common/meta/src/cache_invalidator.rsLength of output: 95
src/common/meta/src/cluster.rs (2)
194-195
: LGTM! Verify the correct usage ofFromUtf8Snafu
.The code changes are approved.
However, ensure that the
FromUtf8Snafu
context is correctly used in theTryFrom
implementations.Also applies to: 227-228
Verification successful
Verification successful!
The
FromUtf8Snafu
context is correctly used in theTryFrom
implementations insrc/common/meta/src/cluster.rs
.
src/common/meta/src/cluster.rs
: Lines 194-195src/common/meta/src/cluster.rs
: Lines 227-228Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the correct usage of `FromUtf8Snafu`. # Test: Search for the `FromUtf8Snafu` context usage. Expect: Correct usage in the `TryFrom` implementations. rg --type rust $'FromUtf8Snafu'Length of output: 737
51-52
: LGTM! Verify the correct implementation ofget_flownode
.The code changes are approved.
However, ensure that the
get_flownode
method is correctly implemented in all structs that implement theClusterInfo
trait.Verification successful
Verified the correct implementation of
get_flownode
.The
get_flownode
method inMetaClient
is correctly implemented, adhering to the expected logic and signature.
src/meta-client/src/client.rs
:
- Correctly implements
get_flownode
method.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the correct implementation of `get_flownode`. # Test: Search for the `get_flownode` method implementation. Expect: Correct implementation in all structs that implement the `ClusterInfo` trait. rg --type rust $'impl ClusterInfo'Length of output: 99
Script:
#!/bin/bash # Description: Verify the correct implementation of `get_flownode` in MetaClient. # Test: Search for the `get_flownode` method implementation within src/meta-client/src/client.rs. rg 'async fn get_flownode' src/meta-client/src/client.rsLength of output: 158
Script:
#!/bin/bash # Description: Extract the full implementation of the `get_flownode` method in MetaClient. # Test: Extract the implementation of the `get_flownode` method within src/meta-client/src/client.rs. ast-grep --lang rust --pattern $'impl ClusterInfo for MetaClient { $$$ async fn get_flownode(&self, id: FlownodeId) -> std::result::Result<Option<Peer>, BoxedError> { $$$ } $$$ }' src/meta-client/src/client.rsLength of output: 6191
src/common/meta/src/instruction.rs (1)
167-167
: LGTM! Verify the correct usage ofFlownodeId
in the enum.The code changes are approved.
However, ensure that the
FlownodeId
variant is correctly used in theCacheIdent
enum.Verification successful
LGTM! The
FlownodeId
variant is correctly used in theCacheIdent
enum.The code changes are approved.
src/meta-srv/src/service/heartbeat.rs
: Used in cache invalidation process.src/common/meta/src/cache_invalidator.rs
: Explicitly handled with no action needed.src/common/meta/src/cache/cluster/flownode.rs
: Used in cache invalidation and filter functions.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the correct usage of `FlownodeId` in the `CacheIdent` enum. # Test: Search for the `FlownodeId` variant usage in the `CacheIdent` enum. Expect: Correct usage in all relevant places. rg --type rust $'CacheIdent::FlownodeId'Length of output: 421
Script:
#!/bin/bash # Extract and review the relevant lines of code around each usage of `CacheIdent::FlownodeId`. # Extract lines around the usage in src/meta-srv/src/service/heartbeat.rs echo "File: src/meta-srv/src/service/heartbeat.rs" rg --context 5 'CacheIdent::FlownodeId' src/meta-srv/src/service/heartbeat.rs # Extract lines around the usage in src/common/meta/src/cache_invalidator.rs echo "File: src/common/meta/src/cache_invalidator.rs" rg --context 5 'CacheIdent::FlownodeId' src/common/meta/src/cache_invalidator.rs # Extract lines around the usage in src/common/meta/src/cache/cluster/flownode.rs echo "File: src/common/meta/src/cache/cluster/flownode.rs" rg --context 5 'CacheIdent::FlownodeId' src/common/meta/src/cache/cluster/flownode.rsLength of output: 1797
src/meta-srv/src/metasrv/builder.rs (1)
349-349
: LGTM! Verify the correct initialization and usage ofcache_invalidator
.The code changes are approved.
However, ensure that the
cache_invalidator
is correctly initialized and used in theDdlContext
.Also applies to: 413-413
src/meta-srv/src/metasrv.rs (3)
26-26
: Import Addition Approved.The import of
CacheInvalidatorRef
is necessary for the new cache invalidation functionality.
331-332
: Field Addition Approved.The addition of the
cache_invalidator
field to theMetasrv
struct is necessary for the new cache invalidation functionality.
542-544
: Method Addition Approved.The
cache_invalidator
method correctly returns a reference to thecache_invalidator
field.src/common/meta/src/error.rs (3)
639-643
: Error Variant Addition Approved.The addition of the
GetCache
error variant is necessary for handling cache-related errors.
645-650
: Error Variant Addition Approved.The addition of the
GetClusterInfo
error variant is necessary for handling cluster information retrieval errors.
737-737
: Method Update Approved.The
status_code
method correctly handles the newGetClusterInfo
variant.src/operator/src/insert.rs (4)
28-28
: Import Addition Approved.The import of
FlownodePeerCacheRef
is necessary for the new flownode peer cache functionality.
65-67
: Field Addition Approved.The addition of the
flownode_peer_cache
field to theInserter
struct is necessary for the new flownode peer cache functionality.
94-101
: Constructor Update Approved.The
Inserter::new
constructor correctly initializes the newflownode_peer_cache
field.
Line range hint
185-211
: Method Update Approved.The
do_request
method correctly implements the functionality to mirror requests to flownodes.src/meta-client/src/client.rs (4)
327-344
: Ensure Correct Error Handling.The function returns a
BoxedError
for error handling. Ensure that all potential error sources are correctly boxed and propagated.Verification successful
Ensure Correct Error Handling.
The
get_flownode
function correctly boxes and propagates all potential error sources usingmap_err(BoxedError::new)
andcontext
.
- Lines 328, 332, and 337:
map_err(BoxedError::new)
- Line 337:
context(ConvertMetaResponseSnafu)
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that all error sources in the `get_flownode` function are correctly boxed and propagated. # Test: Search for error handling within the function. Expect: All errors are boxed. rg --type rust 'map_err(BoxedError::new)' src/meta-client/src/client.rsLength of output: 73
Script:
#!/bin/bash # Description: Verify that all error sources in the `get_flownode` function are correctly boxed and propagated. # Test: Search for error handling within the `get_flownode` function. Expect: All errors are boxed. # Locate the `get_flownode` function and extract its content. rg -A 20 'async fn get_flownode' src/meta-client/src/client.rs # Check for error handling patterns within the function. rg 'map_err|context|?;' src/meta-client/src/client.rsLength of output: 990
Script:
#!/bin/bash # Description: Verify that all error sources in the `get_flownode` function are correctly boxed and propagated. # Test: Search for error handling within the `get_flownode` function. Expect: All errors are boxed. # Locate the `get_flownode` function and extract its content. rg -A 20 'async fn get_flownode' src/meta-client/src/client.rs # Check for error handling patterns within the function. rg 'map_err|context' src/meta-client/src/client.rsLength of output: 2436
Line range hint
264-320
: Ensure Correct Error Handling.The function now returns a
BoxedError
for error handling. Ensure that all potential error sources are correctly boxed and propagated.Verification successful
Error Handling Verified
The
list_nodes
function correctly boxes errors usingBoxedError::new
throughout its implementation.
cluster_client().map_err(BoxedError::new)?
cluster_client.get_metasrv_peers().await.map_err(BoxedError::new)?
cluster_client.range(req).await.map_err(BoxedError::new)?
NodeInfo::try_from(kv.value.as_slice()).map_err(BoxedError::new)?
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that all error sources in the `list_nodes` function are correctly boxed and propagated. # Test: Search for error handling within the function. Expect: All errors are boxed. rg --type rust 'map_err(BoxedError::new)' src/meta-client/src/client.rsLength of output: 73
Script:
#!/bin/bash # Description: Verify that all error sources in the `list_nodes` function are correctly boxed and propagated. # Test: Search for the entire `list_nodes` function and review all error handling instances. ast-grep --lang rust --pattern 'async fn list_nodes($_) -> $_ { $$$ }' --debug-query src/meta-client/src/client.rsLength of output: 221
Script:
#!/bin/bash # Description: Locate the `list_nodes` function and verify all error handling instances are correctly boxed. # Step 1: Locate the `list_nodes` function definition. rg 'async fn list_nodes' src/meta-client/src/client.rs # Step 2: Search for error handling within the `list_nodes` function. rg 'map_err(BoxedError::new)' src/meta-client/src/client.rs -A 50Length of output: 149
Script:
#!/bin/bash # Description: Locate all instances of `map_err` within the `list_nodes` function to ensure they are correctly boxed. # Step 1: Search for all instances of `map_err` within the `list_nodes` function. rg 'map_err' src/meta-client/src/client.rs -A 50Length of output: 5205
329-334
: Handle Potential Errors incluster_client.range
.Ensure that the call to
cluster_client.range
correctly handles potential errors and edge cases.Verification successful
Handle Potential Errors in
cluster_client.range
.The
range
method in bothsrc/meta-client/src/client.rs
andsrc/meta-client/src/client/cluster.rs
handles errors by using thewith_retry
mechanism and propagating errors with appropriate context. This ensures that potential errors and edge cases are managed properly.
src/meta-client/src/client.rs
:
- Method:
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse>
- Error Handling: Uses
self.store_client()?.range(req.into()).await?
and maps errors with.context(ConvertMetaResponseSnafu)
.
src/meta-client/src/client/cluster.rs
:
- Method:
async fn range(&self, request: RangeRequest) -> Result<RangeResponse>
- Error Handling: Uses
self.with_retry
to retry the request and handles specific errors using utility functions likeutil::is_not_leader
andutil::is_unreachable
.These implementations show proper handling of potential errors and edge cases.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that `cluster_client.range` handles potential errors and edge cases. # Test: Search for error handling and edge cases in the `cluster_client.range` function. Expect: Proper error handling. rg --type rust 'cluster_client.range' src/meta-client/src/client.rsLength of output: 225
Script:
#!/bin/bash # Locate the definition of `range` method in the `cluster_client` to inspect its implementation. ast-grep --lang rust --pattern 'impl $_ { $$$ async fn range($_) -> $_ { $$$ } $$$ }'Length of output: 99
Script:
#!/bin/bash # Search for the definition of `range` method in the entire codebase to inspect its implementation. rg 'async fn range\('Length of output: 2108
Script:
#!/bin/bash # Extract the full implementations of the `range` methods in `src/meta-client/src/client.rs` and `src/meta-client/src/client/cluster.rs` to inspect error handling. ast-grep --lang rust --pattern 'impl $_ { $$$ pub async fn range(&self, $_) -> $_ { $$$ } $$$ }' src/meta-client/src/client.rs ast-grep --lang rust --pattern 'impl $_ { $$$ async fn range(&self, $_) -> $_ { $$$ } $$$ }' src/meta-client/src/client/cluster.rsLength of output: 25937
285-288
: Handle Potential Errors inget_metasrv_peers
.Ensure that the call to
get_metasrv_peers
correctly handles potential errors and edge cases.Verification successful
Handle Potential Errors in
get_metasrv_peers
.The
get_metasrv_peers
function correctly handles potential errors and edge cases through its use of thewith_retry
function, which includes retry logic and error handling mechanisms.
src/meta-client/src/client/cluster.rs
:get_metasrv_peers
andwith_retry
implementations.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that `get_metasrv_peers` handles potential errors and edge cases. # Test: Search for error handling and edge cases in the `get_metasrv_peers` function. Expect: Proper error handling. rg --type rust 'get_metasrv_peers' src/meta-client/src/client.rsLength of output: 101
Script:
#!/bin/bash # Description: Locate the definition of `get_metasrv_peers` and inspect its implementation for error handling and edge cases. # Step 1: Find the file containing the definition of `get_metasrv_peers`. rg --type rust 'fn get_metasrv_peers' # Step 2: Extract the implementation of `get_metasrv_peers` to analyze error handling. ast-grep --lang rust --pattern 'fn get_metasrv_peers($$_) { $$$ }'Length of output: 321
Script:
#!/bin/bash # Description: Extract the full implementation of `get_metasrv_peers` function to verify error handling and edge cases. # Extract the implementation of `get_metasrv_peers` from the specified file. ast-grep --lang rust --pattern 'async fn get_metasrv_peers(&self) -> Result<(Option<MetasrvNodeInfo>, Vec<MetasrvNodeInfo>)> { $$$ }' src/meta-client/src/client/cluster.rsLength of output: 1539
Script:
#!/bin/bash # Description: Locate and extract the implementation of `self.with_retry` to verify error handling and edge case management. # Step 1: Find the file containing the definition of `with_retry`. rg --type rust 'fn with_retry' # Step 2: Extract the implementation of `with_retry` to analyze error handling. ast-grep --lang rust --pattern 'fn with_retry($$_) { $$$ }'Length of output: 364
Script:
#!/bin/bash # Description: Extract the full implementation of `with_retry` function in `src/meta-client/src/client/cluster.rs` to verify error handling and edge cases. # Extract the implementation of `with_retry` from the specified file. ast-grep --lang rust --pattern 'async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T> { $$$ }' src/meta-client/src/client/cluster.rsLength of output: 173
Script:
#!/bin/bash # Description: Extract the full implementation of `with_retry` function in `src/meta-client/src/client/procedure.rs` to verify error handling and edge cases. # Extract the implementation of `with_retry` from the specified file. ast-grep --lang rust --pattern 'async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T> { $$$ }' src/meta-client/src/client/procedure.rsLength of output: 175
Script:
#!/bin/bash # Description: Use `rg` to search for the `with_retry` function definition and extract a larger context to verify error handling and edge cases. # Search for the `with_retry` function definition and extract 20 lines of context around it. rg --context=20 'async fn with_retry' src/meta-client/src/client/procedure.rs src/meta-client/src/client/cluster.rsLength of output: 6229
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4254 +/- ##
==========================================
- Coverage 84.96% 84.66% -0.31%
==========================================
Files 1054 1055 +1
Lines 186950 187313 +363
==========================================
- Hits 158850 158591 -259
- Misses 28100 28722 +622 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/common/meta/src/cache/cluster/flownode.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/common/meta/src/cache/cluster/flownode.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- src/cache/src/lib.rs (3 hunks)
- src/cmd/src/frontend.rs (2 hunks)
- src/frontend/src/instance.rs (1 hunks)
Additional comments not posted (4)
src/cache/src/lib.rs (2)
108-118
: LGTM!The
build_flownode_peer_cache
function correctly initializes the flownode peer cache.
120-129
: LGTM!The
build_fundamental_cache_registry_with
function correctly builds the fundamental cache registry with additional caches.src/cmd/src/frontend.rs (1)
303-307
: LGTM!The modifications to the
build
function correctly include the flownode peer cache in the fundamental cache registry.src/frontend/src/instance.rs (1)
155-155
: LGTM!The modification to the
create_meta_client
function correctly enables access to cluster info.
Sorry for the late review., we cannot use cluster info for routing since it is unreliable to query peers by id from cluster info.
|
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
get_flownode
method inClusterInfo
trait.FlownodePeerCache
Option<FlownodePeerCache>
toInserter
Checklist
Summary by CodeRabbit
New Features
FlownodeId
toPeer
mappings.Peer
information for Flownodes.Improvements
Refactor