Skip to content

Commit

Permalink
Start one background worker on first DuckDB query (#544)
Browse files Browse the repository at this point in the history
Up until now, we were starting the background worker as part of the
extension.

The problem with this approach is that it imposes a single worker, and
thus cannot update multiple databases.

In preparation to have one BGW per database, this PR triggers the start
of a background worker when `IsExtensionRegistered` is called and the
cache is populated.

We've prevented potential race conditions that would cause two BGWs to
start by taking a lock on a temporary file.

Note: a [following PR](#545)
will actually start one BGW per database

---------

Co-authored-by: Jelte Fennema-Nio <[email protected]>
  • Loading branch information
Y-- and JelteF authored Mar 10, 2025
1 parent a3b0bc3 commit 1454c14
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 17 deletions.
3 changes: 2 additions & 1 deletion include/pgduckdb/pgduckdb_background_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

namespace pgduckdb {

void InitBackgroundWorker(void);
void InitBackgroundWorkersShmem(void);
void StartBackgroundWorkerIfNeeded(void);
void TriggerActivity(void);

extern bool is_background_worker;
Expand Down
2 changes: 1 addition & 1 deletion src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ _PG_init(void) {
DuckdbInitGUC();
DuckdbInitHooks();
DuckdbInitNode();
pgduckdb::InitBackgroundWorker();
pgduckdb::InitBackgroundWorkersShmem();
pgduckdb::RegisterDuckdbXactCallback();
}
} // extern "C"
Expand Down
121 changes: 106 additions & 15 deletions src/pgduckdb_background_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "pgduckdb/utility/cpp_wrapper.hpp"
#include <string>
#include <unordered_map>
#include <sys/file.h>
#include <fcntl.h>
#include <unistd.h>

extern "C" {
#include "postgres.h"
Expand All @@ -25,6 +27,7 @@ extern "C" {
#include "miscadmin.h"
#include "pgstat.h"
#include "executor/spi.h"
#include "common/file_utils.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
Expand Down Expand Up @@ -91,20 +94,28 @@ BackgroundWorkerCheck(duckdb::Connection *connection, int64 *last_activity_count
pgduckdb::SyncMotherDuckCatalogsWithPg_Cpp(false, connection->context.get());
}

bool CanTakeBgwLockForDatabase(Oid database_oid);

} // namespace pgduckdb

extern "C" {

PGDLLEXPORT void
pgduckdb_background_worker_main(Datum /* main_arg */) {
elog(LOG, "started pg_duckdb background worker");
if (!pgduckdb::CanTakeBgwLockForDatabase(0)) {
elog(LOG, "pg_duckdb background worker: could not take lock for database '%u'. Will exit.", 0);
return;
}
// Set up a signal handler for SIGTERM
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();

BackgroundWorkerInitializeConnection(duckdb_motherduck_postgres_database, NULL, 0);

SpinLockAcquire(&pgduckdb::BgwShmemStruct->lock);
pgduckdb::BgwShmemStruct->bgw_latch = MyLatch;
int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count;
int64 last_activity_count = pgduckdb::BgwShmemStruct->activity_count - 1; // force a check on the first iteration
SpinLockRelease(&pgduckdb::BgwShmemStruct->lock);

pgduckdb::doing_motherduck_sync = true;
Expand Down Expand Up @@ -220,14 +231,101 @@ ShmemStartup(void) {
*/
MemSet(BgwShmemStruct, 0, size);
SpinLockInit(&BgwShmemStruct->lock);
BgwShmemStruct->bgw_latch = nullptr;
}

LWLockRelease(AddinShmemInitLock);
}

constexpr const char *PGDUCKDB_SYNC_WORKER_NAME = "pg_duckdb sync worker";

bool
HasBgwRunningForMyDatabase() {
const auto num_backends = pgstat_fetch_stat_numbackends();
for (int backend_idx = 1; backend_idx <= num_backends; ++backend_idx) {
#if PG_VERSION_NUM >= 140000 && PG_VERSION_NUM < 160000
PgBackendStatus *beentry = pgstat_fetch_stat_beentry(backend_idx);
#else
LocalPgBackendStatus *local_beentry = pgstat_get_local_beentry_by_index(backend_idx);
PgBackendStatus *beentry = &local_beentry->backendStatus;
#endif
if (beentry->st_databaseid == InvalidOid) {
continue; // backend is not connected to a database
}

auto datid = ObjectIdGetDatum(beentry->st_databaseid);
if (datid != MyDatabaseId) {
continue; // backend is connected to a different database
}

auto backend_type = GetBackgroundWorkerTypeByPid(beentry->st_procpid);
if (!backend_type || strcmp(backend_type, PGDUCKDB_SYNC_WORKER_NAME) != 0) {
continue; // backend is not a pg_duckdb sync worker
}

return true;
}

return false;
}

/*
Attempts to take an exclusive, non-blocking flock() on the per-database lock
file '<DataDir>/<PG_TEMP_FILE_PREFIX>.pgduckdb_worker.<database_oid>', creating
the file if it does not exist yet.

Returns true when the lock was acquired. In that case the file descriptor is
deliberately left open: closing it would release the lock.

Returns false when the lock is currently held elsewhere (flock() fails with
EWOULDBLOCK / EAGAIN). The descriptor is closed before returning.

Raises ERROR when the file cannot be opened, or when flock() fails for any
other reason. The descriptor is closed before the error is raised.
*/
bool
CanTakeBgwLockForDatabase(Oid database_oid) {
	char lock_file_name[MAXPGPATH];
	// Oid is unsigned, hence %u rather than %d.
	snprintf(lock_file_name, MAXPGPATH, "%s/%s.pgduckdb_worker.%u", DataDir, PG_TEMP_FILE_PREFIX, database_oid);

	const int fd = open(lock_file_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
	if (fd < 0) {
		auto err = strerror(errno);
		elog(ERROR, "Could not open file '%s': %s", lock_file_name, err);
	}

	// Take exclusive lock on the file. flock() returns 0 on success and -1 on
	// failure, with the reason reported through errno (not the return value).
	if (flock(fd, LOCK_EX | LOCK_NB) == 0) {
		return true;
	}

	// Save errno before close() gets a chance to overwrite it.
	const int lock_errno = errno;
	close(fd);

	if (lock_errno == EWOULDBLOCK || lock_errno == EAGAIN) {
		return false;
	}

	elog(ERROR, "Could not take lock on file '%s': %s", lock_file_name, strerror(lock_errno));
	return false; // not reached: elog(ERROR) does not return
}

/*
Installs the shared-memory hooks for the background worker code.

- PostgreSQL >= 15: the current shmem_request_hook is saved in
  prev_shmem_request_hook and replaced with ShmemRequest.
- Older versions: ShmemRequest() is called directly instead.

In both cases the current shmem_startup_hook is saved in
prev_shmem_startup_hook and replaced with ShmemStartup.
*/
void
InitBackgroundWorker(void) {
InitBackgroundWorkersShmem(void) {
/* Set up the shared memory hooks */
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = ShmemRequest;
#else
ShmemRequest();
#endif

prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = ShmemStartup;
}

/*
Will start the background worker if:
- MotherDuck is enabled (TODO: should be database-specific)
- it is not already running for the current PG database
*/
void
StartBackgroundWorkerIfNeeded(void) {
if (!pgduckdb::IsMotherDuckEnabledAnywhere()) {
elog(DEBUG3, "pg_duckdb background worker not started because MotherDuck is not enabled");
return;
}

if (HasBgwRunningForMyDatabase()) {
elog(DEBUG3, "pg_duckdb background worker already running for database %u", MyDatabaseId);
return;
}

Expand All @@ -238,22 +336,12 @@ InitBackgroundWorker(void) {
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_duckdb");
snprintf(worker.bgw_function_name, BGW_MAXLEN, "pgduckdb_background_worker_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_duckdb sync worker");
snprintf(worker.bgw_name, BGW_MAXLEN, PGDUCKDB_SYNC_WORKER_NAME);
worker.bgw_restart_time = 1;
worker.bgw_main_arg = (Datum)0;

// Register the worker
RegisterBackgroundWorker(&worker);

/* Set up the shared memory hooks */
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = ShmemRequest;
#else
ShmemRequest();
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = ShmemStartup;
RegisterDynamicBackgroundWorker(&worker, NULL);
}

void
Expand All @@ -265,7 +353,9 @@ TriggerActivity(void) {
SpinLockAcquire(&BgwShmemStruct->lock);
BgwShmemStruct->activity_count++;
/* Force wake up the background worker */
SetLatch(BgwShmemStruct->bgw_latch);
if (BgwShmemStruct->bgw_latch) {
SetLatch(BgwShmemStruct->bgw_latch);
}
SpinLockRelease(&BgwShmemStruct->lock);
}

Expand Down Expand Up @@ -721,6 +811,7 @@ SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade, duckdb::ClientContext *
if (current_motherduck_catalog_version) {
pfree(current_motherduck_catalog_version);
}

current_motherduck_catalog_version = pstrdup(catalog_version.c_str());
MemoryContextSwitchTo(old_context);

Expand Down
6 changes: 6 additions & 0 deletions src/pgduckdb_metadata_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "pgduckdb/pgduckdb.h"
#include "pgduckdb/vendor/pg_list.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_guc.h"

namespace pgduckdb {
Expand Down Expand Up @@ -87,9 +88,11 @@ InvalidateCaches(Datum /*arg*/, int /*cache_id*/, uint32 hash_value) {
if (hash_value != schema_hash_value) {
return;
}

if (!cache.valid) {
return;
}

cache.valid = false;
if (cache.installed) {
list_free(cache.duckdb_only_functions);
Expand Down Expand Up @@ -211,6 +214,9 @@ IsExtensionRegistered() {
if (cache.installed) {
/* If the extension is installed we can build the rest of the cache */
BuildDuckdbOnlyFunctions();

StartBackgroundWorkerIfNeeded();

cache.table_am_oid = GetSysCacheOid1(AMNAME, Anum_pg_am_oid, CStringGetDatum("duckdb"));

cache.schema_oid = get_namespace_oid("duckdb", false);
Expand Down

0 comments on commit 1454c14

Please sign in to comment.