Skip to content

Commit

Permalink
Merge pull request #805 from subspace/improve-ram-usage-on-farmer
Browse files Browse the repository at this point in the history
Reopen database after full initial recommitment to decrease memory usage by ParityDB
  • Loading branch information
nazar-pc authored Sep 10, 2022
2 parents b30ca37 + 2e34218 commit 8f7c62d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 23 deletions.
59 changes: 39 additions & 20 deletions crates/subspace-farmer/src/commitments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ impl Commitments {
salt: None,
};
db_entry.lock().replace(Arc::new(
Db::open_or_create(&options).map_err(CommitmentError::CommitmentDb)?,
Db::open_or_create(&options)
.map(|db| (db, options))
.map_err(CommitmentError::CommitmentDb)?,
));
}

Expand Down Expand Up @@ -179,7 +181,8 @@ impl Commitments {

let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
if let Some(db_with_options) = db_guard.as_ref() {
let (db, _options) = db_with_options.as_ref();
for (tag, offset) in tags.into_iter().zip(batch_start..) {
tags_with_offset.push((tag, offset.to_le_bytes()));
}
Expand All @@ -195,6 +198,11 @@ impl Commitments {
.map_err(CommitmentError::CommitmentDb)?;

tags_with_offset.clear();

// Hack: Reopen database to free memory, see
// https://github.com/paritytech/parity-db/issues/93#issuecomment-1241812705
drop(db_guard);
db_entry.reopen()?;
}
} else {
// Database was already removed, no need to continue
Expand All @@ -207,7 +215,9 @@ impl Commitments {
if let Some(db_entry) = self.get_db_entry(salt) {
let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
if let Some(db_with_options) = db_guard.as_ref() {
let (db, _options) = db_with_options.as_ref();

tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

db.commit(
Expand All @@ -216,6 +226,11 @@ impl Commitments {
.map(|(tag, offset)| (0, tag, Some(offset.to_vec()))),
)
.map_err(CommitmentError::CommitmentDb)?;

// Hack: Reopen database to free memory, see
// https://github.com/paritytech/parity-db/issues/93#issuecomment-1241812705
drop(db_guard);
db_entry.reopen()?;
}
}

Expand Down Expand Up @@ -255,13 +270,15 @@ impl Commitments {
let salt = db_entry.salt();
let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
db.commit(
pieces
.iter()
.map(|piece| (0, create_tag(piece, salt), None)),
)
.map_err(CommitmentError::CommitmentDb)?;
if let Some(db_with_options) = db_guard.as_ref() {
db_with_options
.0
.commit(
pieces
.iter()
.map(|piece| (0, create_tag(piece, salt), None)),
)
.map_err(CommitmentError::CommitmentDb)?;
}
}

Expand All @@ -285,19 +302,21 @@ impl Commitments {
let salt = db_entry.salt();
let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
if let Some(db_with_options) = db_guard.as_ref() {
let mut tags_with_offset: Vec<(Tag, PieceOffset)> = pieces_with_offsets()
.map(|(piece_offset, piece)| (create_tag(piece, salt), piece_offset))
.collect();

tags_with_offset.sort_by(|(tag_a, _), (tag_b, _)| tag_a.cmp(tag_b));

db.commit(
tags_with_offset
.into_iter()
.map(|(tag, offset)| (0, tag, Some(offset.to_le_bytes().to_vec()))),
)
.map_err(CommitmentError::CommitmentDb)?;
db_with_options
.0
.commit(
tags_with_offset
.into_iter()
.map(|(tag, offset)| (0, tag, Some(offset.to_le_bytes().to_vec()))),
)
.map_err(CommitmentError::CommitmentDb)?;
};
}

Expand Down Expand Up @@ -326,13 +345,13 @@ impl Commitments {
return Vec::new();
}
};
let db = match db_guard.as_ref() {
Some(db) => db,
let db_with_options = match db_guard.as_ref() {
Some(db_with_options) => db_with_options,
None => {
return Vec::new();
}
};
let iter = match db.iter(0) {
let iter = match db_with_options.0.iter(0) {
Ok(iter) => iter,
Err(_) => {
return Vec::new();
Expand Down
18 changes: 15 additions & 3 deletions crates/subspace-farmer/src/commitments/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(super) struct CreateDbEntryResult {

pub(super) struct DbEntry {
salt: Salt,
db: Mutex<Option<Arc<Db>>>,
db: Mutex<Option<Arc<(Db, Options)>>>,
}

impl fmt::Debug for DbEntry {
Expand All @@ -30,7 +30,7 @@ impl fmt::Debug for DbEntry {
}

impl Deref for DbEntry {
type Target = Mutex<Option<Arc<Db>>>;
type Target = Mutex<Option<Arc<(Db, Options)>>>;

fn deref(&self) -> &Self::Target {
&self.db
Expand All @@ -41,6 +41,18 @@ impl DbEntry {
pub(super) fn salt(&self) -> Salt {
self.salt
}

pub(super) fn reopen(&self) -> Result<(), CommitmentError> {
let mut inner = self.db.lock();
if let Some(db_with_options) = inner.take() {
let options = db_with_options.1.clone();
drop(db_with_options);
let db = Db::open_or_create(&options).map_err(CommitmentError::CommitmentDb)?;
inner.replace(Arc::new((db, options)));
}

Ok(())
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -94,7 +106,7 @@ impl CommitmentDatabases {
*salt,
Arc::new(DbEntry {
salt: *salt,
db: Mutex::new(Some(Arc::new(db))),
db: Mutex::new(Some(Arc::new((db, options)))),
}),
);
}
Expand Down

0 comments on commit 8f7c62d

Please sign in to comment.