Skip to content

Commit

Permalink
Implement GDAL RFC 101 (#185)
Browse files Browse the repository at this point in the history
* support opening in threadsafe mode

* fix the exception throw

* do not lock threadsafe datasets

* accept all error messages

* add documentation

* add sync reading to the testing mix
  • Loading branch information
mmomtchev authored Dec 4, 2024
1 parent 4c78966 commit 38def27
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 20 deletions.
12 changes: 12 additions & 0 deletions ASYNCIO.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,15 @@ SQL layers present a unique challenge when implementing asynchronous bindings -

Alas, there are no simple solutions for this issue. `gdal-async`prints a warning to stderr when this happens.

## RFC101 thread-safe datasets with GDAL >= 3.10

GDAL 3.10 introduces a major performance improvement when accessing raster datasets in read-only *threadsafe* mode.

If a raster datasets is opened in threadsafe mode, by specifying the `t` flag:

```js
const ds = gdal.open(`${__dirname}/data/sample.tif`, 'rt')
assert(ds.threadSafe)
```

then all asynchronous operations can run in parallel and can be freely mixed with synchronous operations without ever blocking the event loop.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- GDAL 3.10.0
- Node.js 23 support
- Implement RFC101 support, see [`ASYNCIO.md`](https://github.com/mmomtchev/node-gdal-async/blob/main/ASYNCIO.md) for more information

### Removed
- Drop Node.js 16 support
Expand Down
13 changes: 7 additions & 6 deletions src/async.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ class AsyncGuard {
locks = make_shared<vector<AsyncLock>>(object_store.lockDatasets(uids));
}
inline AsyncGuard(vector<long> uids, bool warning) : lock(nullptr), locks(nullptr) {
bool locked = true;
if (uids.size() == 1) {
if (uids[0] == 0) return;
lock = warning ? object_store.tryLockDataset(uids[0]) : object_store.lockDataset(uids[0]);
if (lock == nullptr) { MEASURE_EXECUTION_TIME(eventLoopWarning, lock = object_store.lockDataset(uids[0])); }
lock = warning ? object_store.tryLockDataset(uids[0], locked) : object_store.lockDataset(uids[0]);
if (!locked) { MEASURE_EXECUTION_TIME(eventLoopWarning, lock = object_store.lockDataset(uids[0])); }
} else {
locks = warning ? make_shared<vector<AsyncLock>>(object_store.tryLockDatasets(uids))
locks = warning ? make_shared<vector<AsyncLock>>(object_store.tryLockDatasets(uids, locked))
: make_shared<vector<AsyncLock>>(object_store.lockDatasets(uids));
if (locks->size() == 0) {
if (!locked) {
MEASURE_EXECUTION_TIME(
eventLoopWarning, locks = make_shared<vector<AsyncLock>>(object_store.lockDatasets(uids)));
}
Expand Down Expand Up @@ -392,9 +393,9 @@ template <class GDALType> class GDALAsyncableJob {
GDALRValFunc rval;
Nan::Callback *progress;

GDALAsyncableJob(long ds_uid) : main(), rval(), progress(nullptr), persistent(), ds_uids({ds_uid}), autoIndex(0){};
GDALAsyncableJob(long ds_uid) : main(), rval(), progress(nullptr), persistent(), ds_uids({ds_uid}), autoIndex(0) {};
GDALAsyncableJob(std::vector<long> ds_uids)
: main(), rval(), progress(nullptr), persistent(), ds_uids(ds_uids), autoIndex(0){};
: main(), rval(), progress(nullptr), persistent(), ds_uids(ds_uids), autoIndex(0) {};

inline void persist(const std::string &key, const v8::Local<v8::Object> &obj) {
persistent[key] = obj;
Expand Down
25 changes: 25 additions & 0 deletions src/gdal_dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void Dataset::Initialize(Local<Object> target) {
ATTR(lcons, "layers", layersGetter, READ_ONLY_SETTER);
ATTR_ASYNCABLE(lcons, "rasterSize", rasterSizeGetter, READ_ONLY_SETTER);
ATTR(lcons, "driver", driverGetter, READ_ONLY_SETTER);
ATTR(lcons, "threadSafe", threadSafeGetter, READ_ONLY_SETTER);
ATTR(lcons, "root", rootGetter, READ_ONLY_SETTER);
ATTR_ASYNCABLE(lcons, "srs", srsGetter, srsSetter);
ATTR_ASYNCABLE(lcons, "geoTransform", geoTransformGetter, geoTransformSetter);
Expand Down Expand Up @@ -926,6 +927,30 @@ NAN_GETTER(Dataset::driverGetter) {
if (raw->GetDriver() != nullptr) { info.GetReturnValue().Set(Driver::New(raw->GetDriver())); }
}

/**
* @readonly
* @kind member
* @name threadSafe
* @instance
* @memberof Dataset
* @type {boolean}
*/
NAN_GETTER(Dataset::threadSafeGetter) {
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 10)
Dataset *ds = Nan::ObjectWrap::Unwrap<Dataset>(info.This());

if (!ds->isAlive()) {
Nan::ThrowError("Dataset object has already been destroyed");
return;
}

GDALDataset *raw = ds->get();
if (raw->GetDriver() != nullptr) { info.GetReturnValue().Set(Nan::New<Boolean>(raw->IsThreadSafe(GDAL_OF_RASTER))); }
#else
info.GetReturnValue().Set(Nan::New<Boolean>(false));
#endif
}

NAN_SETTER(Dataset::srsSetter) {
Dataset *ds = Nan::ObjectWrap::Unwrap<Dataset>(info.This());

Expand Down
1 change: 1 addition & 0 deletions src/gdal_dataset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Dataset : public Nan::ObjectWrap {
GDAL_ASYNCABLE_GETTER_DECLARE(rasterSizeGetter);
GDAL_ASYNCABLE_GETTER_DECLARE(srsGetter);
static NAN_GETTER(driverGetter);
static NAN_GETTER(threadSafeGetter);
GDAL_ASYNCABLE_GETTER_DECLARE(geoTransformGetter);
static NAN_GETTER(descriptionGetter);
static NAN_GETTER(layersGetter);
Expand Down
13 changes: 11 additions & 2 deletions src/node_gdal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,21 @@ GDAL_ASYNCABLE_DEFINE(gdal_open) {
} else {
flags |= GDAL_OF_READONLY;
}
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 1)
} else if (mode[i] == 'm') {
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 1)
flags |= GDAL_OF_MULTIDIM_RASTER;
#else
Nan::ThrowError("Multidimensional support requires GDAL 3.1");
#endif
} else if (mode[i] == 't') {
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 10)
flags |= GDAL_OF_THREAD_SAFE | GDAL_OF_RASTER;
#else
Nan::ThrowError("Thread-safe read-only reading requires GDAL 3.10");
return;
#endif
} else {
Nan::ThrowError("Invalid open mode. Must contain only \"r\" or \"r+\" and \"m\" ");
Nan::ThrowError("Invalid open mode. Must contain only \"r\" or \"r+\" and \"m\" or \"t\" ");
return;
}
}
Expand Down
44 changes: 35 additions & 9 deletions src/utils/ptr_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ AsyncLock ObjectStore::lockDataset(long uid) {
while (true) {
auto parent = uidMap<GDALDataset *>.find(uid);
if (parent == uidMap<GDALDataset *>.end()) { throw "Parent Dataset object has already been destroyed"; }
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 10)
// Do not lock thread-safe datasets
if (parent->second->ptr->IsThreadSafe(GDAL_OF_RASTER)) { return nullptr; }
#endif
int r = uv_sem_trywait(parent->second->async_lock.get());
if (r == 0) { return parent->second->async_lock; }
uv_cond_wait(&master_sleep, &master_lock);
Expand All @@ -148,8 +152,9 @@ vector<AsyncLock> ObjectStore::lockDatasets(vector<long> uids) {
uv_scoped_mutex lock(&master_lock);
while (true) {
try {
vector<AsyncLock> locks = _tryLockDatasets(uids);
if (locks.size() > 0) { return locks; }
bool locked;
vector<AsyncLock> locks = _tryLockDatasets(uids, locked);
if (locked) { return locks; }
} catch (const char *msg) { throw msg; }
uv_cond_wait(&master_sleep, &master_lock);
}
Expand All @@ -158,21 +163,38 @@ vector<AsyncLock> ObjectStore::lockDatasets(vector<long> uids) {
/*
* Acquire the lock only if it is free, do not block.
*/
AsyncLock ObjectStore::tryLockDataset(long uid) {
if (uid == 0) return nullptr;
AsyncLock ObjectStore::tryLockDataset(long uid, bool &result) {
if (uid == 0) {
result = true;
return nullptr;
}
uv_scoped_mutex lock(&master_lock);
auto parent = uidMap<GDALDataset *>.find(uid);
if (parent == uidMap<GDALDataset *>.end()) { throw "Parent Dataset object has already been destroyed"; }
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 10)
if (parent->second->ptr->IsThreadSafe(GDAL_OF_RASTER)) {
result = true;
return nullptr;
}
#endif
int r = uv_sem_trywait(parent->second->async_lock.get());
if (r == 0) return parent->second->async_lock;
if (r == 0) {
result = true;
return parent->second->async_lock;
}
result = false;
return nullptr;
}

vector<AsyncLock> ObjectStore::_tryLockDatasets(vector<long> uids) {
vector<AsyncLock> ObjectStore::_tryLockDatasets(vector<long> uids, bool &result) {
vector<AsyncLock> locks;
for (long uid : uids) {
auto parent = uidMap<GDALDataset *>.find(uid);
if (parent == uidMap<GDALDataset *>.end()) { throw "Parent Dataset object has already been destroyed"; }
#if GDAL_VERSION_MAJOR > 3 || (GDAL_VERSION_MAJOR == 3 && GDAL_VERSION_MINOR >= 10)
// Skip locking thread-safe datasets
if (parent->second->ptr->IsThreadSafe(GDAL_OF_RASTER)) continue;
#endif
locks.push_back(parent->second->async_lock);
}
vector<AsyncLock> locked;
Expand All @@ -189,19 +211,23 @@ vector<AsyncLock> ObjectStore::_tryLockDatasets(vector<long> uids) {
break;
}
}
if (r == 0) return locks;
if (r == 0) {
result = true;
return locks;
}
result = false;
return {};
}

/*
* Try to acquire several locks avoiding deadlocks without blocking.
*/
vector<AsyncLock> ObjectStore::tryLockDatasets(vector<long> uids) {
vector<AsyncLock> ObjectStore::tryLockDatasets(vector<long> uids, bool &result) {
// There is lots of copying around here but these vectors are never longer than 3 elements
sortUnique(uids);
if (uids.size() == 0) return {};
uv_scoped_mutex lock(&master_lock);
return _tryLockDatasets(uids);
return _tryLockDatasets(uids, result);
}

// The basic unit of the ObjectStore is the ObjectStoreItem<GDALPTR>
Expand Down
6 changes: 3 additions & 3 deletions src/utils/ptr_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class ObjectStore {
}
AsyncLock lockDataset(long uid);
vector<AsyncLock> lockDatasets(vector<long> uids);
AsyncLock tryLockDataset(long uid);
vector<AsyncLock> tryLockDatasets(vector<long> uids);
AsyncLock tryLockDataset(long uid, bool &result);
vector<AsyncLock> tryLockDatasets(vector<long> uids, bool &result);

template <typename GDALPTR> bool has(GDALPTR ptr);
template <typename GDALPTR> Local<Object> get(GDALPTR ptr);
Expand All @@ -96,7 +96,7 @@ class ObjectStore {
long uid;
uv_mutex_t master_lock;
uv_cond_t master_sleep;
vector<AsyncLock> _tryLockDatasets(vector<long> uids);
vector<AsyncLock> _tryLockDatasets(vector<long> uids, bool &result);
template <typename GDALPTR> void dispose(shared_ptr<ObjectStoreItem<GDALPTR>> item, bool manual);
void do_dispose(long uid, bool manual = false);
};
Expand Down
44 changes: 44 additions & 0 deletions test/api_rasterband-async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,50 @@ describe('gdal.RasterBandAsync', () => {
})
})
})
describe('threadSafe', () => {
it('should support opening in threadsafe mode with GDAL >= 3.10', () => {
const ds = gdal.open(`${__dirname}/data/sample.tif`, 'r')
assert.isFalse(ds.threadSafe)
if (semver.gte(gdal.version, '3.10.0')) {
const ds = gdal.open(`${__dirname}/data/sample.tif`, 'rt')
assert.isTrue(ds.threadSafe)
} else {
assert.throws(() => {
gdal.open(`${__dirname}/data/sample.tif`, 'rt')
}, /requires GDAL 3.10/)
}
})
it('should disallow opening in threadsafe writing mode with GDAL >= 3.10', () => {
const file = `/vsimem/write_threadSafe_test.${String(
Math.random()
).substring(2)}.tmp.tif`
assert.throws(() => {
gdal.open(file, 'r+t', 'GTiff', 64, 64, 1, gdal.GDT_Byte)
})
})
it('should support reading in threadsafe mode with GDAL >= 3.10', function () {
if (semver.gte(gdal.version, '3.10.0')) {
const ds = gdal.open(`${__dirname}/data/sample.tif`, 'rt')
assert.isTrue(ds.threadSafe)
const data: Promise<gdal.TypedArray>[] = []
for (let x = 0; x < 5; x++) {
for (let y = 0; y < 5; y++) {
const array = ds.bands.getAsync(1).then((b) => b.pixels.readAsync(x * 10, y * 10, 10, 10))
data.push(array)
// Synchronously accessing datasets on which async operations are running
// should produce a console warning about blocking the event loop
// if the dataset is actually locked
assert.isNumber(ds.rasterSize.x)
assert.instanceOf(ds.bands.get(1).pixels.read(x * 10, y * 10, 10, 10), Uint8Array)
}
}
assert.lengthOf(data, 25)
return Promise.all(data)
.then((arrays) => arrays.forEach((a) => void assert.instanceOf(a, Uint8Array)))
}
this.skip()
})
})
describe('flushAsync()', () => {
it('should flush the written data', () => {
const file = `/vsimem/write_flushAsync_test.${String(
Expand Down

0 comments on commit 38def27

Please sign in to comment.