Skip to content

Commit

Permalink
Reopen database after full initial recommitment to decrease memory us…
Browse files Browse the repository at this point in the history
…age by ParityDB (see paritytech/parity-db#93 (comment))
  • Loading branch information
nazar-pc committed Sep 10, 2022
1 parent b30ca37 commit 2e34218
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 23 deletions.
59 changes: 39 additions & 20 deletions crates/subspace-farmer/src/commitments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ impl Commitments {
salt: None,
};
db_entry.lock().replace(Arc::new(
Db::open_or_create(&options).map_err(CommitmentError::CommitmentDb)?,
Db::open_or_create(&options)
.map(|db| (db, options))
.map_err(CommitmentError::CommitmentDb)?,
));
}

Expand Down Expand Up @@ -179,7 +181,8 @@ impl Commitments {

let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
if let Some(db_with_options) = db_guard.as_ref() {
let (db, _options) = db_with_options.as_ref();
for (tag, offset) in tags.into_iter().zip(batch_start..) {
tags_with_offset.push((tag, offset.to_le_bytes()));
}
Expand All @@ -195,6 +198,11 @@ impl Commitments {
.map_err(CommitmentError::CommitmentDb)?;

tags_with_offset.clear();

// Hack: Reopen database to free memory, see
// https://github.com/paritytech/parity-db/issues/93#issuecomment-1241812705
drop(db_guard);
db_entry.reopen()?;
}
} else {
// Database was already removed, no need to continue
Expand All @@ -207,7 +215,9 @@ impl Commitments {
if let Some(db_entry) = self.get_db_entry(salt) {
let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
if let Some(db_with_options) = db_guard.as_ref() {
let (db, _options) = db_with_options.as_ref();

tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

db.commit(
Expand All @@ -216,6 +226,11 @@ impl Commitments {
.map(|(tag, offset)| (0, tag, Some(offset.to_vec()))),
)
.map_err(CommitmentError::CommitmentDb)?;

// Hack: Reopen database to free memory, see
// https://github.com/paritytech/parity-db/issues/93#issuecomment-1241812705
drop(db_guard);
db_entry.reopen()?;
}
}

Expand Down Expand Up @@ -255,13 +270,15 @@ impl Commitments {
let salt = db_entry.salt();
let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
db.commit(
pieces
.iter()
.map(|piece| (0, create_tag(piece, salt), None)),
)
.map_err(CommitmentError::CommitmentDb)?;
if let Some(db_with_options) = db_guard.as_ref() {
db_with_options
.0
.commit(
pieces
.iter()
.map(|piece| (0, create_tag(piece, salt), None)),
)
.map_err(CommitmentError::CommitmentDb)?;
}
}

Expand All @@ -285,19 +302,21 @@ impl Commitments {
let salt = db_entry.salt();
let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
if let Some(db_with_options) = db_guard.as_ref() {
let mut tags_with_offset: Vec<(Tag, PieceOffset)> = pieces_with_offsets()
.map(|(piece_offset, piece)| (create_tag(piece, salt), piece_offset))
.collect();

tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

db.commit(
tags_with_offset
.into_iter()
.map(|(tag, offset)| (0, tag, Some(offset.to_le_bytes().to_vec()))),
)
.map_err(CommitmentError::CommitmentDb)?;
db_with_options
.0
.commit(
tags_with_offset
.into_iter()
.map(|(tag, offset)| (0, tag, Some(offset.to_le_bytes().to_vec()))),
)
.map_err(CommitmentError::CommitmentDb)?;
};
}

Expand Down Expand Up @@ -326,13 +345,13 @@ impl Commitments {
return Vec::new();
}
};
let db = match db_guard.as_ref() {
Some(db) => db,
let db_with_options = match db_guard.as_ref() {
Some(db_with_options) => db_with_options,
None => {
return Vec::new();
}
};
let iter = match db.iter(0) {
let iter = match db_with_options.0.iter(0) {
Ok(iter) => iter,
Err(_) => {
return Vec::new();
Expand Down
18 changes: 15 additions & 3 deletions crates/subspace-farmer/src/commitments/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(super) struct CreateDbEntryResult {

pub(super) struct DbEntry {
salt: Salt,
db: Mutex<Option<Arc<Db>>>,
db: Mutex<Option<Arc<(Db, Options)>>>,
}

impl fmt::Debug for DbEntry {
Expand All @@ -30,7 +30,7 @@ impl fmt::Debug for DbEntry {
}

impl Deref for DbEntry {
type Target = Mutex<Option<Arc<Db>>>;
type Target = Mutex<Option<Arc<(Db, Options)>>>;

fn deref(&self) -> &Self::Target {
&self.db
Expand All @@ -41,6 +41,18 @@ impl DbEntry {
pub(super) fn salt(&self) -> Salt {
self.salt
}

pub(super) fn reopen(&self) -> Result<(), CommitmentError> {
let mut inner = self.db.lock();
if let Some(db_with_options) = inner.take() {
let options = db_with_options.1.clone();
drop(db_with_options);
let db = Db::open_or_create(&options).map_err(CommitmentError::CommitmentDb)?;
inner.replace(Arc::new((db, options)));
}

Ok(())
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -94,7 +106,7 @@ impl CommitmentDatabases {
*salt,
Arc::new(DbEntry {
salt: *salt,
db: Mutex::new(Some(Arc::new(db))),
db: Mutex::new(Some(Arc::new((db, options)))),
}),
);
}
Expand Down

0 comments on commit 2e34218

Please sign in to comment.