Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: archive state using BufferPool if provided #7042

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";
import {serializeState} from "../serializeState.js";
import {AllocSource, BufferPool} from "../../util/bufferPool.js";

/**
* Minimum number of epochs between single temp archived states
Expand All @@ -30,7 +32,8 @@ export class StatesArchiver {
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
) {}

/**
Expand Down Expand Up @@ -95,8 +98,13 @@ export class StatesArchiver {
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// serialize state using BufferPool if provided
await serializeState(
finalizedStateOrBytes,
AllocSource.ARCHIVE_STATE,
(stateBytes) => this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes),
this.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class Archiver {
opts: ArchiverOpts
) {
this.archiveBlobEpochs = opts.archiveBlobEpochs;
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts);
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export class BeaconChain implements IBeaconChain {
readonly config: BeaconConfig;
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;

readonly anchorStateLatestBlockSlot: Slot;

Expand Down Expand Up @@ -266,6 +267,9 @@ export class BeaconChain implements IBeaconChain {
const blockStateCache = this.opts.nHistoricalStates
? new FIFOBlockStateCache(this.opts, {metrics})
: new BlockStateCacheImpl({metrics});
this.bufferPool = this.opts.nHistoricalStates
? new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics)
: null;
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
Expand All @@ -274,7 +278,7 @@ export class BeaconChain implements IBeaconChain {
clock,
shufflingCache: this.shufflingCache,
blockStateCache,
bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics),
bufferPool: this.bufferPool,
datastore: fileDataStore
? // debug option if we want to investigate any issues with the DB
new FileCPStateDatastore()
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Metrics} from "../metrics/metrics.js";
import {IClock} from "../util/clock.js";
import {BufferPool} from "../util/bufferPool.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {IBlsVerifier} from "./bls/index.js";
Expand Down Expand Up @@ -86,6 +87,7 @@ export interface IBeaconChain {
readonly config: BeaconConfig;
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;

/** The initial slot that the chain is started with */
readonly anchorStateLatestBlockSlot: Slot;
Expand Down
33 changes: 33 additions & 0 deletions packages/beacon-node/src/chain/serializeState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {AllocSource, BufferPool} from "../util/bufferPool.js";

/** Consumer of the serialized state bytes; the bytes are only valid until the returned promise settles. */
type ProcessStateBytesFn<T> = (stateBytes: Uint8Array) => Promise<T>;

/**
 * Serialize `state` and pass the resulting bytes to `processFn`, using the BufferPool if provided.
 *
 * - If `bufferPool` is provided and its buffer is free, the state is serialized into the pooled
 *   buffer. The buffer is held while `processFn` runs and is released back to the pool once the
 *   promise returned by `processFn` settles, whether it resolves or rejects.
 * - If there is no pool, or the pool's buffer is already in use (`alloc()` returns null),
 *   fall back to `state.serialize()` which allocates fresh bytes.
 *
 * `processFn` must not retain `stateBytes` after its promise settles: a pooled buffer is reused
 * by the next allocation.
 *
 * @param state state to serialize
 * @param source caller identifier forwarded to `bufferPool.alloc()`
 * @param processFn async consumer of the serialized bytes
 * @param bufferPool optional pool to borrow the serialization buffer from
 * @returns whatever `processFn` resolves to
 */
export async function serializeState<T>(
  state: CachedBeaconStateAllForks,
  source: AllocSource,
  processFn: ProcessStateBytesFn<T>,
  bufferPool?: BufferPool | null
): Promise<T> {
  const size = state.type.tree_serializedSize(state.node);
  const bufferWithKey = bufferPool ? bufferPool.alloc(size, source) : null;

  if (!bufferWithKey) {
    // we already have metrics in BufferPool so no need to do it here
    return processFn(state.serialize());
  }

  try {
    const stateBytes = bufferWithKey.buffer;
    const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
    state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
    // must await here: the buffer has to stay reserved until processFn is done reading it
    return await processFn(stateBytes);
  } finally {
    // release the buffer back to the pool, otherwise every later alloc() would miss
    bufferWithKey[Symbol.dispose]();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import {INTERVALS_PER_SLOT} from "@lodestar/params";
import {Metrics} from "../../metrics/index.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {StateCloneOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
import {MapTracker} from "./mapMetrics.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
import {CheckpointHex, CacheItemType, CheckpointStateCache, BlockStateCache} from "./types.js";
Expand All @@ -29,7 +30,7 @@ type PersistentCheckpointStateCacheModules = {
shufflingCache: ShufflingCache;
datastore: CPStateDatastore;
blockStateCache: BlockStateCache;
bufferPool?: BufferPool;
bufferPool?: BufferPool | null;
};

/** checkpoint serialized as a string */
Expand Down Expand Up @@ -106,7 +107,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private readonly datastore: CPStateDatastore;
private readonly shufflingCache: ShufflingCache;
private readonly blockStateCache: BlockStateCache;
private readonly bufferPool?: BufferPool;
private readonly bufferPool?: BufferPool | null;

constructor(
{
Expand Down Expand Up @@ -686,19 +687,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// persist and do not update epochIndex
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
const cpPersist = {epoch: epoch, root: fromHexString(rootHex)};
{
const timer = this.metrics?.stateSerializeDuration.startTimer();
// automatically free the buffer pool after this scope
using stateBytesWithKey = this.serializeState(state);
let stateBytes = stateBytesWithKey?.buffer;
if (stateBytes == null) {
// fallback logic to use regular way to get state ssz bytes
this.metrics?.persistedStateAllocCount.inc();
stateBytes = state.serialize();
}
timer?.();
persistedKey = await this.datastore.write(cpPersist, stateBytes);
}
// It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
// As monitored on holesky as of Jan 2024:
// - This does not increase heap allocation while gc time is the same
// - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
// - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
// - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
const timer = this.metrics?.stateSerializeDuration.startTimer();
persistedKey = await serializeState(
state,
AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE,
(stateBytes) => this.datastore.write(cpPersist, stateBytes),
this.bufferPool
);
timer?.();
persistCount++;
this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", {
...logMeta,
Expand Down Expand Up @@ -755,29 +757,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
});
}

/*
* It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
* As monitored on holesky as of Jan 2024:
* - This does not increase heap allocation while gc time is the same
* - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
* - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
* - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
*/
private serializeState(state: CachedBeaconStateAllForks): BufferWithKey | null {
const size = state.type.tree_serializedSize(state.node);
if (this.bufferPool) {
const bufferWithKey = this.bufferPool.alloc(size);
if (bufferWithKey) {
const stateBytes = bufferWithKey.buffer;
const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
return bufferWithKey;
}
}

return null;
}

/**
* Serialize validators to bytes leveraging the buffer pool to save memory allocation.
* - As monitored on holesky as of Jan 2024, it helps save ~500ms state reload time (4.3s vs 3.8s)
Expand All @@ -788,7 +767,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const type = state.type.fields.validators;
const size = type.tree_serializedSize(state.validators.node);
if (this.bufferPool) {
const bufferWithKey = this.bufferPool.alloc(size);
const bufferWithKey = this.bufferPool.alloc(size, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS);
if (bufferWithKey) {
const validatorsBytes = bufferWithKey.buffer;
const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength);
Expand Down
11 changes: 5 additions & 6 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {LodestarMetadata} from "../options.js";
import {RegistryMetricCreator} from "../utils/registryMetricCreator.js";
import {OpSource} from "../validatorMonitor.js";
import {CacheItemType} from "../../chain/stateCache/types.js";
import {AllocSource} from "../../util/bufferPool.js";

export type LodestarMetrics = ReturnType<typeof createLodestarMetrics>;

Expand Down Expand Up @@ -1150,13 +1151,15 @@ export function createLodestarMetrics(
name: "lodestar_buffer_pool_length",
help: "Buffer pool length",
}),
hits: register.counter({
hits: register.counter<{source: AllocSource}>({
name: "lodestar_buffer_pool_hits_total",
help: "Total number of buffer pool hits",
labelNames: ["source"],
}),
misses: register.counter({
misses: register.counter<{source: AllocSource}>({
name: "lodestar_buffer_pool_misses_total",
help: "Total number of buffer pool misses",
labelNames: ["source"],
}),
grows: register.counter({
name: "lodestar_buffer_pool_grows_total",
Expand Down Expand Up @@ -1251,10 +1254,6 @@ export function createLodestarMetrics(
name: "lodestar_cp_state_cache_persisted_state_remove_count",
help: "Total number of persisted states removed",
}),
persistedStateAllocCount: register.counter({
name: "lodestar_cp_state_cache_persisted_state_alloc_count",
help: "Total number time to allocate memory for persisted state",
}),
},

balancesCache: {
Expand Down
20 changes: 13 additions & 7 deletions packages/beacon-node/src/util/bufferPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import {Metrics} from "../metrics/metrics.js";
*/
const GROW_RATIO = 1.1;

/**
 * Identifies which caller is requesting a buffer from the BufferPool.
 * The value is passed to `alloc()` / `allocUnsafe()` and used as the `source` label
 * of the buffer pool hits/misses counters.
 */
export enum AllocSource {
// serializing only the validators of a state in the persistent checkpoint state cache
PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS = "persistent_checkpoints_cache_validators",
// serializing a whole state to persist it from the persistent checkpoint state cache
PERSISTENT_CHECKPOINTS_CACHE_STATE = "persistent_checkpoints_cache_state",
// serializing a finalized state in the states archiver
ARCHIVE_STATE = "archive_state",
}

/**
* A simple implementation to manage a single buffer.
* This is initially used for state serialization at every epoch and for state reload.
Expand Down Expand Up @@ -36,24 +42,24 @@ export class BufferPool {
* If the buffer is already in use, return null.
* Grow the buffer if the requested size is larger than the current buffer.
*/
alloc(size: number): BufferWithKey | null {
return this.doAlloc(size, false);
alloc(size: number, source: AllocSource): BufferWithKey | null {
return this.doAlloc(size, source, false);
}

/**
* Same as alloc() but the buffer is not zeroed.
*/
allocUnsafe(size: number): BufferWithKey | null {
return this.doAlloc(size, true);
allocUnsafe(size: number, source: AllocSource): BufferWithKey | null {
return this.doAlloc(size, source, true);
}

private doAlloc(size: number, isUnsafe = false): BufferWithKey | null {
private doAlloc(size: number, source: AllocSource, isUnsafe = false): BufferWithKey | null {
if (this.inUse) {
this.metrics?.misses.inc();
this.metrics?.misses.inc({source});
return null;
}
this.inUse = true;
this.metrics?.hits.inc();
this.metrics?.hits.inc({source});
this.currentKey += 1;
if (size > this.buffer.length) {
this.metrics?.grows.inc();
Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/test/unit/util/bufferPool.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {describe, it, expect} from "vitest";
import {BufferPool} from "../../../src/util/bufferPool.js";
import {AllocSource, BufferPool} from "../../../src/util/bufferPool.js";

describe("BufferPool", () => {
const pool = new BufferPool(100);

it("should increase length", () => {
expect(pool.length).toEqual(110);
using mem = pool.alloc(200);
using mem = pool.alloc(200, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE);
if (mem === null) {
throw Error("Expected non-null mem");
}
Expand All @@ -15,15 +15,15 @@ describe("BufferPool", () => {

it("should not allow alloc if in use", () => {
{
using mem = pool.alloc(20);
using mem = pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE);
if (mem === null) {
throw Error("Expected non-null mem");
}
// in the same scope we can't allocate again
expect(pool.alloc(20)).toEqual(null);
expect(pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE)).toEqual(null);
}

// out of the scope we can allocate again
expect(pool.alloc(20)).not.toEqual(null);
expect(pool.alloc(20, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE)).not.toEqual(null);
});
});
Loading