Skip to content

Commit

Permalink
Merge pull request #6808 from jeniawhite/evgb-5.9Backport
Browse files Browse the repository at this point in the history
5.9 Backport to buffer pool and config.js changes
  • Loading branch information
nimrod-becker authored Nov 11, 2021
2 parents 98c7769 + 3d110ff commit cf74fd9
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 35 deletions.
2 changes: 1 addition & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ function load_config_env_overrides() {
config[conf_name] = true;
} else if (val === 'false') {
console.log(`Overriding config.js from ENV with ${conf_name}=false (bool)`);
config[conf_name] = true;
config[conf_name] = false;
} else {
throw new Error(`${val} should be true|false`);
}
Expand Down
36 changes: 23 additions & 13 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class NamespaceFS {

// allocate or reuse buffer
const remain_size = Math.max(0, end - pos);
const { buffer, callback } = await buffers_pool.get_buffer(remain_size);
const { buffer, callback } = await buffers_pool.get_buffer();
buffer_pool_cleanup = callback;

// read from file
Expand All @@ -535,7 +535,11 @@ class NamespaceFS {
count = 0;

const bytesRead = await file.read(fs_account_config, buffer, 0, read_size, pos);
if (!bytesRead) break;
if (!bytesRead) {
buffer_pool_cleanup = null;
callback();
break;
}
const data = buffer.slice(0, bytesRead);

// update stats
Expand Down Expand Up @@ -633,6 +637,8 @@ class NamespaceFS {
fs_xattr = await this._copy_stream(source_file_path, upload_path, fs_account_config, fs_xattr);
}
} else {
// TODO: Take up only as much as we need (requires fine-tune of the semaphore inside the _upload_stream)
// Currently we are taking config.NSFS_BUF_SIZE for any sized upload (1KB upload will take a full buffer from semaphore)
fs_xattr = await buffers_pool_sem.surround_count(
config.NSFS_BUF_SIZE,
async () => this._upload_stream(params.source_stream, upload_path, fs_account_config, object_sdk.rpc_client, fs_xattr)
Expand Down Expand Up @@ -685,10 +691,14 @@ class NamespaceFS {
//Reading the source_file and writing into the target_file
let read_pos = 0;
for (;;) {
const { buffer, callback } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE);
const { buffer, callback } = await buffers_pool.get_buffer();
buffer_pool_cleanup = callback;
const bytesRead = await source_file.read(fs_account_config, buffer, 0, config.NSFS_BUF_SIZE, read_pos);
if (!bytesRead) break;
if (!bytesRead) {
buffer_pool_cleanup = null;
callback();
break;
}
read_pos += bytesRead;
const data = buffer.slice(0, bytesRead);
if (MD5Async) await MD5Async.update(data);
Expand Down Expand Up @@ -883,10 +893,14 @@ class NamespaceFS {
}
let read_pos = 0;
for (;;) {
const { buffer, callback } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE);
const { buffer, callback } = await buffers_pool.get_buffer();
buffer_pool_cleanup = callback;
const bytesRead = await read_file.read(fs_account_config, buffer, 0, config.NSFS_BUF_SIZE, read_pos);
if (!bytesRead) break;
if (!bytesRead) {
buffer_pool_cleanup = null;
callback();
break;
}
read_pos += bytesRead;
const data = buffer.slice(0, bytesRead);
await write_file.write(fs_account_config, data);
Expand All @@ -904,13 +918,9 @@ class NamespaceFS {
);
const { xattr } = JSON.parse(create_params_buffer);
let fs_xattr = to_fs_xattr(xattr);
if (MD5Async) {
fs_xattr = this._assign_md5_to_fs_xattr(((await MD5Async.digest()).toString('hex')) + '-' + multiparts.length, fs_xattr);
}
if (fs_xattr) {
await write_file.setxattr(fs_account_config, fs_xattr);
}
await write_file.fsync(fs_account_config);
if (MD5Async) fs_xattr = this._assign_md5_to_fs_xattr(((await MD5Async.digest()).toString('hex')) + '-' + multiparts.length, fs_xattr);
if (fs_xattr) await write_file.setxattr(fs_account_config, fs_xattr);
if (config.NSFS_TRIGGER_FSYNC) await write_file.fsync(fs_account_config);
const stat = await write_file.stat(fs_account_config);
await write_file.close(fs_account_config);
write_file = null;
Expand Down
1 change: 1 addition & 0 deletions src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require('./test_kmeans');
require('./test_sensitive_wrapper');
// require('./test_debug_module');
require('./test_range_stream');
require('./test_buffer_pool');

// // STORES
require('./test_md_store');
Expand Down
47 changes: 47 additions & 0 deletions src/test/unit_tests/test_buffer_pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* Copyright (C) 2016 NooBaa */
'use strict';
const mocha = require('mocha');
const assert = require('assert');
const buffer_utils = require('../../util/buffer_utils');
const Semaphore = require('../../util/semaphore');

mocha.describe('Test buffers pool', function() {

mocha.it('Work parallel with buf_size and respect semaphore', async function() {
const SEM_LIMIT = 64 * 1024 * 1024;
const BUF_LIMIT = 8 * 1024 * 1024;
const MAX_POOL_ALLOWED = SEM_LIMIT / BUF_LIMIT;
const SLEEP_BEFORE_RELEASE = 264;
const buffers_pool_sem = new Semaphore(SEM_LIMIT);
const buffers_pool = new buffer_utils.BuffersPool({
buf_size: BUF_LIMIT,
sem: buffers_pool_sem,
warning_timeout: false
});
// Allocate the buffers in a lazy fashion and verify
const lazy_fill = new Array(MAX_POOL_ALLOWED).fill(0);
const from_pool_fill = new Array(MAX_POOL_ALLOWED * 2).fill(0);
const lazy_buffer_allocation = lazy_fill.map(async () => {
const { buffer, callback } = await buffers_pool.get_buffer();
await new Promise((resolve, reject) => setTimeout(resolve, SLEEP_BEFORE_RELEASE));
console.log('Lazy allocation', buffer.length, buffers_pool.buffers.length, buffers_pool.sem.value);
callback();
});
const lazy_buffers = await Promise.all(lazy_buffer_allocation);
assert(lazy_buffers.length === MAX_POOL_ALLOWED, 'Allocated more buffers than requested');
assert(buffers_pool.buffers.length === MAX_POOL_ALLOWED, 'Buffer pool allocated more than semaphore allows');
assert(buffers_pool.sem.value === SEM_LIMIT, 'Smepahore did not deallocate after buffers release');
// Re-use buffer pool and verify that we do not allocate new buffers and respect the semaphore
const from_pool_allocation = from_pool_fill.map(async () => {
const { buffer, callback } = await buffers_pool.get_buffer();
await new Promise((resolve, reject) => setTimeout(resolve, SLEEP_BEFORE_RELEASE));
console.log('From pool allocations', buffer.length, buffers_pool.buffers.length, buffers_pool.sem.value);
callback();
});
const from_pool_buffers = await Promise.all(from_pool_allocation);
assert(from_pool_buffers.length === MAX_POOL_ALLOWED * 2, 'Allocated more buffers than requested');
assert(buffers_pool.buffers.length === MAX_POOL_ALLOWED, 'Buffer pool allocated more than semaphore allows');
assert(buffers_pool.sem.value === SEM_LIMIT, 'Smepahore did not deallocate after buffers release');
});

});
29 changes: 8 additions & 21 deletions src/util/buffer_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,29 +184,22 @@ class BuffersPool {
}

/**
* @param {number} len
* @returns {Promise<{
* buffer: Buffer,
* callback: () => void,
* }>}
*/
async get_buffer(len) {
async get_buffer() {
dbg.log1('BufferPool.get_buffer', this);
let buffer = null;
let should_release = 0;
let should_pool = false;
let warning_timer;
if (len < this.buf_size / 4) {
await this.sem.wait(len);
should_release = len;
buffer = Buffer.allocUnsafe(len);
} else if (this.buffers.length) {
// Lazy allocation of buffers pool, first cycle will take up buffers
// Will cause semaphore to be empty with actual buffers allocated and waiting to be used
// Any buffer that is in usage (allocated from this.buffers) will be accounted in the semaphore
await this.sem.wait(this.buf_size);
if (this.buffers.length) {
buffer = this.buffers.shift();
should_pool = true;
} else {
await this.sem.wait(this.buf_size);
should_release = this.buf_size;
should_pool = true;
buffer = Buffer.allocUnsafeSlow(this.buf_size);
}
if (this.warning_timeout) {
Expand All @@ -218,14 +211,8 @@ class BuffersPool {
}
const callback = () => {
if (warning_timer) clearTimeout(warning_timer);
if (should_release) {
this.sem.release(should_release);
should_release = 0;
}
if (should_pool) {
this.buffers.push(buffer);
should_pool = false;
}
this.buffers.push(buffer);
this.sem.release(this.buf_size);
};
return { buffer, callback };
}
Expand Down

0 comments on commit cf74fd9

Please sign in to comment.