Move persist into async part of the sweeper #3819

Status: Open · wants to merge 1 commit into base: main
53 changes: 30 additions & 23 deletions lightning/src/util/sweep.rs
@@ -382,7 +382,7 @@ where
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
) -> Self {
let outputs = Vec::new();
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
Self {
sweeper_state,
pending_sweep: AtomicBool::new(false),
@@ -446,7 +446,10 @@ where
}
self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})
})?;
state_lock.dirty = false;

Ok(())
}

/// Returns a list of the currently tracked spendable outputs.
@@ -503,11 +506,19 @@ where

// See if there is anything to sweep before requesting a change address.
{
let sweeper_state = self.sweeper_state.lock().unwrap();
let mut sweeper_state = self.sweeper_state.lock().unwrap();

let cur_height = sweeper_state.best_block.height;
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
if !has_respends {
// If there is nothing to sweep, we still persist the state if it is dirty.
if sweeper_state.dirty {
self.persist_state(&sweeper_state).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})?;
sweeper_state.dirty = false;
}

return Ok(());
}
}
@@ -531,7 +542,8 @@ where
.collect();

if respend_descriptors.is_empty() {
// It could be that a tx confirmed and there is now nothing to sweep anymore.
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state,
Contributor:
If we now persist in the if !has_respends, why aren't we persisting here, if dirty is set? Seems a bit weird to take a different approach in each case?

Contributor Author:
I thought this was an edge case. Normally the state shouldn't have changed in the brief period during which we fetched a change address. For the async KVStore follow-up, the persist call gets more complicated, so I thought we'd just wait for the next round.

If you think it is better to also persist here, I can make that change.

Contributor:
I agree it's an edge case we hopefully never see, but I'd err on keeping it aligned everywhere, which would also keep the door open for future refactorings/DRYing code up more easily without rethinking through the behavior changes. Let's address any Async KVStore concerns in that PR then.

// we'll persist it in the next cycle.
return Ok(());
}

@@ -563,6 +575,7 @@ where
self.persist_state(&sweeper_state).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})?;
sweeper_state.dirty = false;

self.broadcaster.broadcast_transactions(&[&spending_tx]);
}
@@ -588,6 +601,8 @@ where
}
true
});

sweeper_state.dirty = true;
}

fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
@@ -641,13 +656,17 @@ where
}
}
}

sweeper_state.dirty = true;
}

fn best_block_updated_internal(
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
) {
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
self.prune_confirmed_outputs(sweeper_state);

sweeper_state.dirty = true;
}
}

@@ -671,12 +690,8 @@ where
assert_eq!(state_lock.best_block.height, height - 1,
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");

self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
self.best_block_updated_internal(&mut *state_lock, header, height);

let _ = self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
self.best_block_updated_internal(&mut state_lock, header, height);
}

fn block_disconnected(&self, header: &Header, height: u32) {
@@ -698,9 +713,7 @@ where
}
}

self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}
}

@@ -720,9 +733,6 @@ where
) {
let mut state_lock = self.sweeper_state.lock().unwrap();
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
}

fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -743,18 +753,13 @@ where
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
.for_each(|o| o.status.unconfirmed());

self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}
}

fn best_block_updated(&self, header: &Header, height: u32) {
let mut state_lock = self.sweeper_state.lock().unwrap();
self.best_block_updated_internal(&mut *state_lock, header, height);
let _ = self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
self.best_block_updated_internal(&mut state_lock, header, height);
}

fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -783,11 +788,13 @@ where
struct SweeperState {
outputs: Vec<TrackedSpendableOutput>,
best_block: BestBlock,
dirty: bool,
}

impl_writeable_tlv_based!(SweeperState, {
(0, outputs, required_vec),
(2, best_block, required),
(_unused, dirty, (static_value, false)),
});

/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
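Taken together, the diff converts eager persistence (a `persist_state` call in every chain-notification handler) into a lazy dirty-flag scheme: the handlers only mutate state and set `dirty`, and the periodic sweep pass performs the single write and clears the flag. A minimal standalone sketch of that pattern follows; the struct fields and method names are simplified stand-ins for illustration, not the actual LDK `OutputSweeper` API:

```rust
use std::sync::Mutex;

/// Simplified stand-in for the sweeper's persisted state.
struct SweeperState {
    best_block_height: u32,
    dirty: bool, // runtime-only flag; never serialized
}

struct Sweeper {
    state: Mutex<SweeperState>,
}

impl Sweeper {
    /// Chain notifications only mutate state and mark it dirty;
    /// no I/O happens on this (potentially hot) path.
    fn best_block_updated(&self, height: u32) {
        let mut state = self.state.lock().unwrap();
        state.best_block_height = height;
        state.dirty = true;
    }

    /// The periodic sweep pass owns persistence: write once if anything
    /// changed, then clear the flag.
    fn regenerate_and_broadcast_spend_if_necessary(&self) -> Result<(), ()> {
        let mut state = self.state.lock().unwrap();
        if state.dirty {
            self.persist_state(&state)?;
            state.dirty = false;
        }
        Ok(())
    }

    fn persist_state(&self, _state: &SweeperState) -> Result<(), ()> {
        // Stand-in for the KVStore write.
        Ok(())
    }
}
```

Clearing `dirty` only after `persist_state` succeeds mirrors the diff's ordering (flag cleared after the `?`): a failed write leaves the flag set, so the write is retried on the next sweep cycle instead of being silently lost.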