fog-view: Limit number of user events, and tell client this happened #3151

Draft: wants to merge 5 commits into base: main
Changes from 2 commits
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

The crates in this repository do not adhere to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) at this time.

## [5.0.0]

### Changed

- Fog-view now imposes a limit on how many user events it will return to the user, to avoid exceeding
  grpc message-size maximums. The new flag `may_have_more_user_events` is set when this limit is reached, so that clients know to retry. ([#3151])

## [4.1.0]

### Changed
5 changes: 5 additions & 0 deletions fog/api/proto/view.proto
@@ -145,6 +145,11 @@ message QueryResponse {
/// This can be used by the client as a hint when choosing cryptonote mixin indices.
/// This field doesn't have the same "cursor" semantics as the other fields.
uint64 last_known_block_cumulative_txo_count = 9;

/// If true, this means that due to limits, we could not return all the requested
/// user events in one response. Clients cannot compute an accurate balance check
/// until they have received all relevant user events.
bool may_have_more_user_events = 10;
}

Contributor:

In the PR description you write: "I believe that this is a backwards compatible change, because old clients will ignore this flag".

But if it's true that "clients cannot compute an accurate balance check until they have received all relevant user events," then how can an (old) client that ignores this flag accurately compute a balance check?

Contributor Author (@cbeck88, Feb 22, 2023):

Good question:

  • Today, the amount of user events in postgres is not so large that we can't return all the events, so all the existing clients are fine, and will be fine with servers before and after this change.
  • After this change, we can ship clients that will be okay even when that is no longer the case, and the servers will have to do some sort of pagination.
  • The backwards-compatible part is: new clients that talk to old servers, in the time period before there are too many user events, will get all the events, and the flag will default to false when they deserialize, so they will correctly decide that they got all the events.
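The deserialization point in the last bullet can be checked in isolation. Below is a minimal sketch (hypothetical struct names; prost, which these fog types already use) showing that a message encoded without field 10 decodes with the flag at its proto3 default of false:

```rust
use prost::Message;

// An old server's response type: no field 10.
#[derive(Clone, PartialEq, Message)]
struct OldQueryResponse {
    #[prost(uint64, tag = "9")]
    last_known_block_cumulative_txo_count: u64,
}

// A new client's response type: field 10 added.
#[derive(Clone, PartialEq, Message)]
struct NewQueryResponse {
    #[prost(uint64, tag = "9")]
    last_known_block_cumulative_txo_count: u64,
    #[prost(bool, tag = "10")]
    may_have_more_user_events: bool,
}

fn main() {
    // Encode a response the way an old server would.
    let old = OldQueryResponse { last_known_block_cumulative_txo_count: 7 };
    let bytes = old.encode_to_vec();

    // The new client decodes it; the absent bool takes its default, false,
    // so the client correctly concludes it received all the events.
    let new = NewQueryResponse::decode(bytes.as_slice()).unwrap();
    assert!(!new.may_have_more_user_events);
}
```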

/// A record of an Rng created by a fog ingest enclave.
3 changes: 3 additions & 0 deletions fog/api/tests/fog_types.rs
@@ -134,6 +134,7 @@ fn fog_view_query_response_round_trip() {
.collect(),
last_known_block_count: rng.next_u32() as u64,
last_known_block_cumulative_txo_count: rng.next_u32() as u64,
may_have_more_user_events: true,
};
round_trip_message::<mc_fog_types::view::QueryResponse, mc_fog_api::view::QueryResponse>(
&test_val,
@@ -157,6 +158,7 @@
.collect(),
last_known_block_count: rng.next_u32() as u64,
last_known_block_cumulative_txo_count: rng.next_u32() as u64,
may_have_more_user_events: true,
};
round_trip_message::<mc_fog_types::view::QueryResponse, mc_fog_api::view::QueryResponse>(
&test_val,
@@ -187,6 +189,7 @@
.collect(),
last_known_block_count: rng.next_u32() as u64,
last_known_block_cumulative_txo_count: rng.next_u32() as u64,
may_have_more_user_events: true,
};
round_trip_message::<mc_fog_types::view::QueryResponse, mc_fog_api::view::QueryResponse>(
&test_val,
7 changes: 4 additions & 3 deletions fog/recovery_db_iface/src/lib.rs
@@ -206,14 +206,15 @@ pub trait RecoveryDb {
///
/// Arguments:
/// * start_after_event_id: The last event id the user has received.
/// * max_num_events: The maximum number of user events to return.
///
/// Returns:
/// * List of found events, and highest event id in the database (to be used
/// as start_after_event_id in the next query).
fn search_user_events(
&self,
start_from_user_event_id: i64,
max_num_events: usize,
) -> Result<(Vec<FogUserEvent>, i64), Self::Error>;

/// Get any TxOutSearchResults corresponding to given search keys.
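Together, the new max_num_events argument and the returned cursor support straightforward paging. Here is a sketch of a caller-side drain loop (a hypothetical helper, not part of this PR; the import path is assumed):

```rust
use mc_fog_recovery_db_iface::{FogUserEvent, RecoveryDb};

// Page through all pending user events: feed the returned highest event id
// back in as the cursor until a page comes back shorter than the limit.
fn drain_user_events<DB: RecoveryDb>(
    db: &DB,
    mut cursor: i64,
    max_num_events: usize,
) -> Result<(Vec<FogUserEvent>, i64), DB::Error> {
    let mut all_events = Vec::new();
    loop {
        let (page, next_cursor) = db.search_user_events(cursor, max_num_events)?;
        let page_len = page.len();
        all_events.extend(page);
        // The returned id becomes the cursor for the next query.
        cursor = next_cursor;
        // A page shorter than the limit means nothing is left to fetch right now.
        if page_len < max_num_events {
            return Ok((all_events, cursor));
        }
    }
}
```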
26 changes: 18 additions & 8 deletions fog/sql_recovery_db/src/lib.rs
@@ -713,6 +713,7 @@ impl SqlRecoveryDb {
fn search_user_events_retriable(
&self,
start_from_user_event_id: i64,
max_num_events: usize,
) -> Result<(Vec<FogUserEvent>, i64), Error> {
// Early return if start_from_user_event_id is max
if start_from_user_event_id == i64::MAX {
@@ -737,6 +738,10 @@
// NOTE: sql auto increment columns start from 1, so "start_from_user_event_id = 0"
// will capture everything
.filter(schema::user_events::dsl::id.gt(start_from_user_event_id))
// Limit the number of rows returned, so the response stays under grpc maximums
.limit(max_num_events as i64)
// Order by id ascending, so the limit keeps the earliest pending events
// and the id cursor never skips any
.order(schema::user_events::dsl::id.asc())
// Get only the fields that we need
.select((
// Fields for every event type
@@ -1400,9 +1405,10 @@ impl RecoveryDb for SqlRecoveryDb {
fn search_user_events(
&self,
start_from_user_event_id: i64,
max_num_events: usize,
) -> Result<(Vec<FogUserEvent>, i64), Self::Error> {
our_retry(self.get_retries(), || {
self.search_user_events_retriable(start_from_user_event_id, max_num_events)
})
}

@@ -1633,6 +1639,8 @@ mod tests {
use mc_util_from_random::FromRandom;
use rand::{rngs::StdRng, thread_rng, SeedableRng};

const MAX_USER_EVENTS: usize = 10_000;

#[test_with_logger]
fn test_new_ingest_invocation(logger: Logger) {
let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]);
@@ -1824,7 +1832,8 @@
assert_eq!(ranges[1].last_ingested_block, None);

// Ensure we do not have any decommissioning events.
let (events, next_start_from_user_event_id) =
db.search_user_events(0, MAX_USER_EVENTS).unwrap();
assert_eq!(
events
.iter()
@@ -1851,7 +1860,7 @@

// We should have one decommissioning event.
let (events, next_start_from_user_event_id) = db
.search_user_events(next_start_from_user_event_id, MAX_USER_EVENTS)
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(
@@ -1899,7 +1908,7 @@

// We should have one decommissioning event and one new ingest invocation event.
let (events, _next_start_from_user_event_id) = db
.search_user_events(next_start_from_user_event_id, MAX_USER_EVENTS)
.unwrap();
assert_eq!(events.len(), 2);
assert_eq!(
@@ -2144,7 +2153,7 @@
db.report_lost_ingress_key(ingress_key2).unwrap();

// Search for events and verify the results.
let (events, _) = db.search_user_events(0, MAX_USER_EVENTS).unwrap();
assert_eq!(
events,
vec![
@@ -2182,10 +2191,11 @@

// Searching with a start_from_user_event_id that is higher than the highest available
// one should return nothing.
let (_events, next_start_from_user_event_id) =
db.search_user_events(0, MAX_USER_EVENTS).unwrap();

let (events, next_start_from_user_event_id2) = db
.search_user_events(next_start_from_user_event_id, MAX_USER_EVENTS)
.unwrap();
assert_eq!(events.len(), 0);
assert_eq!(
@@ -2194,7 +2204,7 @@
);

let (events, next_start_from_user_event_id2) = db
.search_user_events(next_start_from_user_event_id + 1, MAX_USER_EVENTS)
.unwrap();
assert_eq!(events.len(), 0);
assert_eq!(
44 changes: 28 additions & 16 deletions fog/test_infra/src/db_tests.rs
@@ -21,6 +21,8 @@ pub fn get_num_blocks(db: &impl RecoveryDb) -> u64 {
.unwrap_or(0)
}

const USER_EVENT_LIMIT: usize = 1000;

/// Exercise new recovery db apis and check the results
/// - Add random blocks and get tx's using new get txs API, check for NotFound
/// result with junk queries
@@ -49,8 +51,9 @@ pub fn recovery_db_smoke_tests_new_apis<DB: RecoveryDb>(
);

// Test that they have no rng records when the cursor value is up-to-date
let (user_events, _next_start_from_user_event_id) = db
.search_user_events(start_from_user_event_id, USER_EVENT_LIMIT)
.unwrap();
let has_rng_events = user_events
.iter()
.any(|event| matches!(event, FogUserEvent::NewRngRecord(_)));
@@ -74,8 +77,9 @@

// Test that the user can see them
{
let (user_events, next_start_from_user_event_id) = db
.search_user_events(start_from_user_event_id, USER_EVENT_LIMIT)
.unwrap();
let num_rng_events = user_events
.iter()
.filter(|event| matches!(event, FogUserEvent::NewRngRecord(_)))
@@ -113,8 +117,9 @@
// Test that the user can still see those rng records at
// start_from_user_event_id.
{
let (user_events, next_start_from_user_event_id) = db
.search_user_events(start_from_user_event_id, USER_EVENT_LIMIT)
.unwrap();
assert_rng_record_rows_were_recovered(
&user_events[..],
&invoc_ids_with_kex_rng_pubkeys[..],
@@ -128,8 +133,9 @@
// Test that the user cannot see those rng records at the updated
// start_from_user_event_id
{
let (user_events, next_start_from_user_event_id) = db
.search_user_events(start_from_user_event_id, USER_EVENT_LIMIT)
.unwrap();
assert_eq!(user_events.len(), 0);
assert_eq!(
next_start_from_user_event_id, start_from_user_event_id,
@@ -140,7 +146,8 @@

// Test that if user tries full recovery (cursor = 0) they get 10 rounds worth
// of rng records
let (user_events, _next_start_from_user_event_id) =
db.search_user_events(0, USER_EVENT_LIMIT).unwrap();
let num_rng_events = user_events
.iter()
.filter(|event| matches!(event, FogUserEvent::NewRngRecord(_)))
@@ -238,7 +245,8 @@
db.new_ingress_key(&ingress_key, 0).unwrap();

// We start without any rng record events.
let (user_events, _next_start_from_user_event_id) =
db.search_user_events(0, USER_EVENT_LIMIT).unwrap();
let has_rng_events = user_events
.iter()
.any(|event| matches!(event, FogUserEvent::NewRngRecord(_)));
Expand All @@ -253,7 +261,8 @@ pub fn recovery_db_rng_records_decommissioning<DB: RecoveryDb>(
// Test that user has rng record event now
let test_rows0 = vec![kex_rng_pubkey1];

let (user_events, next_start_from_user_event_id) =
db.search_user_events(0, USER_EVENT_LIMIT).unwrap();
let rng_records: Vec<RngRecord> = user_events
.iter()
.filter_map(|event| {
@@ -274,7 +283,7 @@

// Test that user has no new rngs after cursor update
let (user_events, _next_start_from_user_event_id) = db
.search_user_events(next_start_from_user_event_id, USER_EVENT_LIMIT)
.unwrap();
assert_eq!(user_events, vec![]);

@@ -294,7 +303,7 @@
let test_rows1 = vec![kex_rng_pubkey2];

let (user_events, _next_start_from_user_event_id) = db
.search_user_events(next_start_from_user_event_id, USER_EVENT_LIMIT)
.unwrap();
let rng_records: Vec<RngRecord> = user_events
.iter()
@@ -315,7 +324,8 @@
assert_eq!(10, rng_records[0].start_block);

// Check that if starting at 0 we see both rngs
let (user_events, _next_start_from_user_event_id) =
db.search_user_events(0, USER_EVENT_LIMIT).unwrap();
let rng_records: Vec<RngRecord> = user_events
.iter()
.filter_map(|event| {
@@ -393,7 +403,8 @@
assert_eq!(ingestable_ranges[1].last_ingested_block, None);

// Check if we can see an event for that.
let (user_events, _next_start_from_user_event_id) =
db.search_user_events(0, USER_EVENT_LIMIT).unwrap();
let decommissioned_invocs: Vec<_> = user_events
.iter()
.filter_map(|event| {
@@ -459,7 +470,8 @@
assert!(!ingestable_ranges[2].decommissioned);
assert_eq!(ingestable_ranges[2].last_ingested_block, None);

let (user_events, _next_start_from_user_event_id) =
db.search_user_events(0, USER_EVENT_LIMIT).unwrap();
let decommissioned_invocs: Vec<_> = user_events
.iter()
.filter_map(|event| {
8 changes: 6 additions & 2 deletions fog/test_infra/src/mock_client.rs
@@ -31,8 +31,11 @@ impl<R: RecoveryDb> FogViewConnection for PassThroughViewClient<R> {
start_from_block_index: u64,
search_keys: Vec<Vec<u8>>,
) -> Result<QueryResponse, Self::Error> {
const USER_EVENT_LIMIT: usize = 10_000;
let (user_events, next_start_from_user_event_id) = self
.db
.search_user_events(start_from_user_event_id, USER_EVENT_LIMIT)?;
let may_have_more_user_events = user_events.len() >= USER_EVENT_LIMIT;
Contributor:

It appears that we're returning all the user_events in the query_response below. If there are 12_000 user_events, then we return them all to the client AND we set may_have_more_user_events to true.

However, the logic for fn search_user_events has this line: .limit(max_num_events as i64), and max_num_events == USER_EVENT_LIMIT. How then can user_events.len() ever be greater than USER_EVENT_LIMIT?
let highest_known_block_count = self
.db
@@ -75,6 +78,7 @@
tx_out_search_results: Default::default(),
last_known_block_count: highest_known_block_count,
last_known_block_cumulative_txo_count: cumulative_txo_count,
may_have_more_user_events,
};

resp.tx_out_search_results = self.db.get_tx_outs(start_from_block_index, &search_keys)?;
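On the reviewer's question above: since the query is capped at the limit, a page longer than USER_EVENT_LIMIT is indeed impossible, so the `>=` degenerates to `==`. A page of exactly USER_EVENT_LIMIT events sets the flag even when nothing is left, which is why the field is named may_have_more_user_events; the client's follow-up query then simply returns an empty page. A small illustration (not code from this PR):

```rust
// With a page size capped at `limit`, `page_len > limit` cannot happen, so
// `>=` behaves as `==`. A full page is only a *hint* that more events may
// exist -- a harmless false positive when exactly `limit` events remained.
fn may_have_more(page_len: usize, limit: usize) -> bool {
    debug_assert!(page_len <= limit, "search_user_events never exceeds its limit");
    page_len >= limit
}

fn main() {
    assert!(may_have_more(10_000, 10_000)); // full page: possibly more events
    assert!(!may_have_more(9_999, 10_000)); // short page: definitely caught up
}
```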
7 changes: 7 additions & 0 deletions fog/types/src/view.rs
@@ -93,6 +93,13 @@ pub struct QueryResponse {
/// clients sample for mixins.
#[prost(uint64, tag = "9")]
pub last_known_block_cumulative_txo_count: u64,

/// If true, this means that due to limits, we could not return all the
/// requested user events in one response. Clients cannot compute an
/// accurate balance check until they have received all relevant user
/// events.
#[prost(bool, tag = "10")]
pub may_have_more_user_events: bool,
}

/// A record that can be used by the user to produce an Rng shared with fog
3 changes: 3 additions & 0 deletions fog/view/enclave/api/src/lib.rs
@@ -49,6 +49,9 @@ pub struct UntrustedQueryResponse {

/// The cumulative txo count of the last known block.
pub last_known_block_cumulative_txo_count: u64,

/// If true, there may be more user events in the database than were returned.
pub may_have_more_user_events: bool,
}

/// Represents a serialized request for the view enclave to service
1 change: 1 addition & 0 deletions fog/view/enclave/impl/src/lib.rs
@@ -154,6 +154,7 @@ where
last_known_block_count: untrusted_query_response.last_known_block_count,
last_known_block_cumulative_txo_count: untrusted_query_response
.last_known_block_cumulative_txo_count,
may_have_more_user_events: untrusted_query_response.may_have_more_user_events,
};

// Do the txos part, scope lock of e_tx_out_store
7 changes: 5 additions & 2 deletions fog/view/protocol/src/polling.rs
@@ -75,7 +75,7 @@ pub trait FogViewConnection {
let mut missed_block_ranges = Vec::<BlockRange>::new();

// Update seeds, get block count
let mut new_highest_processed_block_count = loop {
match self
.request(
user_rng_set.get_next_start_from_user_event_id(),
@@ -105,7 +105,10 @@
user_rng_set
.set_next_start_from_user_event_id(result.next_start_from_user_event_id);

if result.may_have_more_user_events {
continue;
}
break result.highest_processed_block_count;
}
}
};
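Condensed, the new control flow advances the event cursor and then loops while the server reports more events. A self-contained toy of that contract (hypothetical types, not the real client API):

```rust
// A toy model of the retry contract created by `may_have_more_user_events`.
struct QueryResult {
    next_start_from_user_event_id: i64,
    may_have_more_user_events: bool,
    highest_processed_block_count: u64,
}

fn fetch_all_events(mut cursor: i64, mut request: impl FnMut(i64) -> QueryResult) -> u64 {
    loop {
        let result = request(cursor);
        // Persist the advanced cursor before deciding whether to loop, so a
        // follow-up request asks for the *next* page rather than the same one.
        cursor = result.next_start_from_user_event_id;
        if result.may_have_more_user_events {
            continue;
        }
        break result.highest_processed_block_count;
    }
}

fn main() {
    // Simulate a server that needs two pages to return everything.
    let mut pages_left = 2;
    let highest = fetch_all_events(0, |cursor| {
        pages_left -= 1;
        QueryResult {
            next_start_from_user_event_id: cursor + 10_000,
            may_have_more_user_events: pages_left > 0,
            highest_processed_block_count: 1234,
        }
    });
    assert_eq!(highest, 1234);
}
```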
7 changes: 7 additions & 0 deletions fog/view/server/src/config.rs
@@ -77,4 +77,11 @@ pub struct MobileAcctViewConfig {
/// and should not much harm performance otherwise when loading the DB.
#[clap(long, default_value = "1000", env = "MC_BLOCK_QUERY_BATCH_SIZE")]
pub block_query_batch_size: usize,

/// The maximum number of user events to fetch from postgres in one query.
/// This limit caps the maximum possible size of a grpc response from the
/// server.
#[clap(long, default_value = "10000", env = "MC_MAX_USER_EVENTS")]
pub max_user_events: usize,
}