feat: process the queue on demand #199

Open · wants to merge 3 commits into `master`
9 changes: 7 additions & 2 deletions README.md
@@ -90,10 +90,15 @@ The extension introduces a new `net` schema, which contains two unlogged tables,

When any of the three request functions (`http_get`, `http_post`, `http_delete`) are invoked, they create an entry in the `net.http_request_queue` table.

The extension employs C's [libCurl](https://curl.se/libcurl/c/) library within a PostgreSQL [background worker](https://www.postgresql.org/docs/current/bgworker.html) to manage HTTP requests. This background worker regularly checks the `http_request_queue` table and executes the requests it finds there.
The extension employs C's [libCurl](https://curl.se/libcurl/c/) library within a PostgreSQL [background worker](https://www.postgresql.org/docs/current/bgworker.html) to manage HTTP requests.
This background worker waits for a signal from the aforementioned request functions and executes the requests it finds in the `net.http_request_queue` table.

Once a response is received, it gets stored in the `_http_response` table. By monitoring this table, you can keep track of response statuses and messages.

> [!IMPORTANT]
> Inserting directly into `net.http_request_queue` won't cause the worker to process requests; you must use the request functions.
> We do it this way to avoid polling the `net.http_request_queue` table, which would pollute `pg_stat_statements` and cause unnecessary activity from the worker.
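
A quick sketch of the flow above (illustrative only: the URL is a placeholder, and the `status_code`/`content` columns of `net._http_response` are assumptions, not documented in this snippet):

```sql
-- enqueue an async GET; this inserts into net.http_request_queue
-- and signals the background worker via net.wake()
select net.http_get('https://example.com') as request_id;

-- once the worker has processed the request, read the result from
-- the response table (column names assumed for illustration)
select status_code, content
from net._http_response
where id = 1; -- the request_id returned above
```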

---

# Installation
@@ -145,7 +150,7 @@ create extension pg_net;
the extension creates 3 configurable variables:

1. **pg_net.batch_size** _(default: 200)_: An integer that limits the max number of rows that the extension will process from _`net.http_request_queue`_ during each read
2. **pg_net.ttl** _(default: 6 hours)_: An interval that defines the max time a row in the _`net.http_response`_ will live before being deleted
2. **pg_net.ttl** _(default: 6 hours)_: An interval that defines the max time a row in the _`net.http_response`_ will live before being deleted. Note that deletion won't happen exactly when the TTL expires; the worker performs it while processing requests (see the sketch after this list).
3. **pg_net.database_name** _(default: 'postgres')_: A string that defines which database the extension is applied to
4. **pg_net.username** _(default: NULL)_: A string that defines which user the background worker will connect as. If not set (`NULL`), it assumes the bootstrap user.
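
A minimal sketch for changing these settings (assuming `batch_size` and `ttl` take effect on reload, since the worker re-reads the config file on SIGHUP; `database_name` and `username` affect the worker's connection and may require a restart):

```sql
alter system set pg_net.batch_size to 500; -- illustrative value
alter system set pg_net.ttl to '1 hour';   -- illustrative value

-- signal running backends (and the pg_net worker) to re-read the config
select pg_reload_conf();
```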

11 changes: 11 additions & 0 deletions sql/pg_net.sql
@@ -101,6 +101,11 @@ create or replace function net.wait_until_running()
as 'pg_net';
comment on function net.wait_until_running() is 'waits until the worker is running';

create or replace function net.wake()
returns void
language 'c'
as 'pg_net';

-- Interface to make an async request
-- API: Public
create or replace function net.http_get(
@@ -136,6 +141,8 @@ begin
returning id
into request_id;

perform net.wake();

return request_id;
end
$$;
@@ -204,6 +211,8 @@ begin
returning id
into request_id;

perform net.wake();

return request_id;
end
$$;
@@ -246,6 +255,8 @@ begin
returning id
into request_id;

perform net.wake();

return request_id;
end
$$;
185 changes: 111 additions & 74 deletions src/worker.c
@@ -25,6 +25,7 @@ typedef enum {

typedef struct {
pg_atomic_uint32 should_restart;
pg_atomic_uint32 should_work;
pg_atomic_uint32 status;
Latch latch;
ConditionVariable cv;
@@ -33,13 +34,16 @@ typedef struct {
WorkerState *worker_state = NULL;

static const int curl_handle_event_timeout_ms = 1000;
static const long queue_processing_wait_timeout_ms = 1000;
static const int net_worker_restart_time_sec = 1;
static const long no_timeout = -1L;

static char* guc_ttl;
static int guc_batch_size;
static char* guc_database_name;
static char* guc_username;
static MemoryContext CurlMemContext = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static long latch_timeout = 1000;
static volatile sig_atomic_t got_sighup = false;

void _PG_init(void);
@@ -73,6 +77,19 @@ Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){
PG_RETURN_VOID();
}

PG_FUNCTION_INFO_V1(wake);
Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) {
uint32 expected = 0;

bool success = pg_atomic_compare_exchange_u32(&worker_state->should_work, &expected, 1);
pg_write_barrier();

if (success) // only wake the worker on first put
SetLatch(&worker_state->latch);

PG_RETURN_VOID();
}

static void
handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS)
{
@@ -94,22 +111,27 @@ handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS)
errno = save_errno;
}

static bool is_extension_loaded(){
Oid extensionOid;
static void publish_state(WorkerStatus s) {
pg_atomic_write_u32(&worker_state->status, (uint32)s);
pg_write_barrier();
ConditionVariableBroadcast(&worker_state->cv);
}

StartTransactionCommand();
static bool process_interrupts(){
bool restart = false;

extensionOid = get_extension_oid("pg_net", true);
CHECK_FOR_INTERRUPTS();

CommitTransactionCommand();
if (got_sighup) {
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}

return OidIsValid(extensionOid);
}
if (pg_atomic_read_u32(&worker_state->should_restart) == 1){ // if a restart is issued, make sure we do it again after waiting
restart = true;
}

static void publish_state(WorkerStatus s) {
pg_atomic_write_u32(&worker_state->status, (uint32)s);
pg_write_barrier();
ConditionVariableBroadcast(&worker_state->cv);
return restart;
}

void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {
@@ -144,96 +166,110 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) {

set_curl_mhandle(lstate.curl_mhandle, &lstate);

while (!pg_atomic_read_u32(&worker_state->should_restart)) {
WaitLatch(&worker_state->latch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
latch_timeout,
PG_WAIT_EXTENSION);
ResetLatch(&worker_state->latch);

while (true) {
publish_state(WS_RUNNING);

CHECK_FOR_INTERRUPTS();
uint32 expected = 1;
if (!pg_atomic_compare_exchange_u32(&worker_state->should_work, &expected, 0)){
elog(DEBUG1, "pg_net_worker waiting for wake");
// this also waits for `create extension pg_net` to run, since the signal can only come from the request functions inside the `net` schema
WaitLatch(&worker_state->latch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
no_timeout,
PG_WAIT_EXTENSION);
ResetLatch(&worker_state->latch);
if(process_interrupts())
goto restart;

if(!is_extension_loaded()){
elog(DEBUG1, "pg_net worker: extension not yet loaded");
continue;
}

if (got_sighup) {
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}

if (pg_atomic_read_u32(&worker_state->should_restart) == 1){ // if a restart is issued, make sure we do it again after waiting
break;
}
uint64 requests_consumed = 0;
uint64 expired_responses = 0;

uint64 expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);
do {
expired_responses = delete_expired_responses(guc_ttl, guc_batch_size);

elog(DEBUG1, "Deleted %zu expired rows", expired_responses);
elog(DEBUG1, "Deleted %zu expired rows", expired_responses);

uint64 requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);
requests_consumed = consume_request_queue(lstate.curl_mhandle, guc_batch_size, CurlMemContext);

elog(DEBUG1, "Consumed %zu request rows", requests_consumed);
elog(DEBUG1, "Consumed %zu request rows", requests_consumed);

if(requests_consumed == 0)
continue;
if(requests_consumed == 0)
continue;

int running_handles = 0;
int maxevents = guc_batch_size + 1; // 1 extra for the timer
event *events = palloc0(sizeof(event) * maxevents);
int running_handles = 0;
int maxevents = guc_batch_size + 1; // 1 extra for the timer
event *events = palloc0(sizeof(event) * maxevents);

do {
int nfds = wait_event(lstate.epfd, events, maxevents, curl_handle_event_timeout_ms);
if (nfds < 0) {
int save_errno = errno;
if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
continue;
do {
int nfds = wait_event(lstate.epfd, events, maxevents, curl_handle_event_timeout_ms);
if (nfds < 0) {
int save_errno = errno;
if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case.
continue;
}
else {
ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
break;
}
}
else {
ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno)));
break;
}
}

for (int i = 0; i < nfds; i++) {
if (is_timer(events[i])) {
EREPORT_MULTI(
curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
);
} else {
int curl_event = get_curl_event(events[i]);
int sockfd = get_socket_fd(events[i]);

EREPORT_MULTI(
curl_multi_socket_action(
lstate.curl_mhandle,
sockfd,
curl_event,
&running_handles)
);
for (int i = 0; i < nfds; i++) {
if (is_timer(events[i])) {
EREPORT_MULTI(
curl_multi_socket_action(lstate.curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles)
);
} else {
int curl_event = get_curl_event(events[i]);
int sockfd = get_socket_fd(events[i]);

EREPORT_MULTI(
curl_multi_socket_action(
lstate.curl_mhandle,
sockfd,
curl_event,
&running_handles)
);
}

insert_curl_responses(&lstate, CurlMemContext);
}

insert_curl_responses(&lstate, CurlMemContext);
}
elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
} while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout

pfree(events);

elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
} while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout
MemoryContextReset(CurlMemContext);

pfree(events);
// slow down queue processing to avoid using too much CPU
WaitLatch(&worker_state->latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, queue_processing_wait_timeout_ms, PG_WAIT_EXTENSION);
ResetLatch(&worker_state->latch);
if(process_interrupts()){
if(requests_consumed > 0 || expired_responses > 0) // if we have to restart, ensure the remaining work will continue
pg_atomic_write_u32(&worker_state->should_work, 1);

MemoryContextReset(CurlMemContext);
goto restart;
}

} while (requests_consumed > 0 || expired_responses > 0);
}

restart:

publish_state(WS_EXITED);

elog(DEBUG1, "pg_net_worker restarting, will it process the queue when started: %s", pg_atomic_read_u32(&worker_state->should_work)?"Yes":"No");

pg_atomic_write_u32(&worker_state->should_restart, 0);

ev_monitor_close(&lstate);

curl_multi_cleanup(lstate.curl_mhandle);
curl_global_cleanup();

publish_state(WS_EXITED);
DisownLatch(&worker_state->latch);

// causing a failure on exit will make the postmaster process restart the bg worker
@@ -251,6 +287,7 @@ static void net_shmem_startup(void) {
if (!found) { // only at worker initialization, once worker restarts it will be found
pg_atomic_init_u32(&worker_state->should_restart, 0);
pg_atomic_init_u32(&worker_state->status, WS_NOT_YET);
pg_atomic_init_u32(&worker_state->should_work, 0);
InitSharedLatch(&worker_state->latch);
ConditionVariableInit(&worker_state->cv);
}
@@ -273,7 +310,7 @@ void _PG_init(void) {
.bgw_library_name = "pg_net",
.bgw_function_name = "pg_net_worker",
.bgw_name = "pg_net " EXTVERSION " worker",
.bgw_restart_time = 1,
.bgw_restart_time = net_worker_restart_time_sec,
});

prev_shmem_startup_hook = shmem_startup_hook;
6 changes: 4 additions & 2 deletions test/test_http_requests_deleted_after_ttl.py
@@ -4,7 +4,7 @@
from sqlalchemy import text

def test_http_requests_deleted_after_ttl(sess):
"""Check that http requests are deleted within a few seconds of their ttl"""
"""Check that http requests will be deleted when they reach their ttl, not immediately but when the worker starts again"""

# commit to avoid "cannot run inside a transaction block" error, see https://stackoverflow.com/a/75757326/4692662
sess.execute(text("COMMIT"))
@@ -38,6 +38,9 @@ def test_http_requests_deleted_after_ttl(sess):
# Sleep until after request should have been deleted
time.sleep(5)

# Wake the worker manually; under normal operation this happens when new requests arrive
sess.execute(text("select net.wake()"))

# Ensure collecting the response now results in an error
response = sess.execute(
text(
@@ -47,7 +50,6 @@
),
{"request_id": request_id},
).fetchone()
# TODO an ERROR status doesn't seem correct here
assert response[0] == "ERROR"

sess.execute(text("COMMIT"))
5 changes: 3 additions & 2 deletions test/test_stat_statements.py
@@ -4,7 +4,7 @@
from sqlalchemy import text

def test_query_stat_statements(sess):
"""Check that the background worker executes queries despite no new requests arriving"""
"""Check that the background worker doesn't execute queries when no new requests arrive"""

(pg_version,) = sess.execute(text(
"""
@@ -39,6 +39,7 @@ def test_query_stat_statements(sess):
"""
)).fetchone()

# sleep for some time; if the worker were still polling, new queries would show up meanwhile
time.sleep(3)

(new_calls,) = sess.execute(text(
@@ -55,4 +56,4 @@
"""
)).fetchone()

assert new_calls > old_calls
assert new_calls == old_calls
**Member Author** commented:

> This proves #164 is solved