Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement message headers #338

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 105 additions & 33 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ CREATE TYPE pgmq.message_record AS (
read_ct INTEGER,
enqueued_at TIMESTAMP WITH TIME ZONE,
vt TIMESTAMP WITH TIME ZONE,
message JSONB
message JSONB,
headers JSONB
);

CREATE TYPE pgmq.queue_record AS (
Expand Down Expand Up @@ -85,7 +86,7 @@ BEGIN
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
Expand Down Expand Up @@ -136,7 +137,7 @@ BEGIN
read_ct = read_ct + 1
FROM cte
WHERE m.msg_id = cte.msg_id
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message;
RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers;
$QUERY$,
qtable, conditional, qtable, make_interval(secs => vt)
);
Expand Down Expand Up @@ -267,23 +268,56 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

-- send
-- sends a message to a queue, optionally with a delay
-- send: 2 args, no delay or headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp());
$$ LANGUAGE sql;

-- send: 3 args with headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
delay INTEGER DEFAULT 0
headers JSONB
) RETURNS SETOF BIGINT AS $$
BEGIN
RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, clock_timestamp() + make_interval(secs => delay));
END;
$$ LANGUAGE plpgsql;
SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp());
$$ LANGUAGE sql;

-- send: 3 args with integer delay
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send: 3 args with timestamp
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, NULL, delay);
$$ LANGUAGE sql;

-- send: 4 args with integer delay
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send_at
-- sends a message to a queue, with a delay as a timestamp
-- send: actual implementation
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
Expand All @@ -292,33 +326,66 @@ DECLARE
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message)
VALUES ($2, $1)
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay;
RETURN QUERY EXECUTE sql USING msg, delay, headers;
END;
$$ LANGUAGE plpgsql;

-- send_batch
-- sends an array of list of messages to a queue, optionally with a delay
-- send batch: 2 args
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[]
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp());
$$ LANGUAGE sql;

-- send batch: 3 args with headers
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[]
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp());
$$ LANGUAGE sql;

-- send batch: 3 args with integer delay
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
delay INTEGER
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send batch: 3 args with timestamp
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay);
$$ LANGUAGE sql;

-- send_batch: 4 args with integer delay
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay INTEGER DEFAULT 0
) RETURNS SETOF BIGINT AS $$
BEGIN
RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay));
END;
$$ LANGUAGE plpgsql;
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;

-- send_batch_at
-- sends an array of list of messages to a queue, with a delay as a timestamp
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
Expand All @@ -327,13 +394,13 @@ DECLARE
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message)
SELECT $2, unnest($1)
INSERT INTO pgmq.%I (vt, message, headers)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay;
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;

Expand Down Expand Up @@ -631,7 +698,8 @@ BEGIN
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
message JSONB,
headers JSONB
)
$QUERY$,
qtable
Expand All @@ -645,7 +713,8 @@ BEGIN
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
message JSONB,
headers JSONB
);
$QUERY$,
atable
Expand Down Expand Up @@ -699,7 +768,8 @@ BEGIN
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
message JSONB,
headers JSONB
)
$QUERY$,
qtable
Expand All @@ -713,7 +783,8 @@ BEGIN
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
message JSONB,
headers JSONB
);
$QUERY$,
atable
Expand Down Expand Up @@ -819,7 +890,8 @@ BEGIN
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
message JSONB,
headers JSONB
) PARTITION BY RANGE (%I)
$QUERY$,
qtable, partition_col
Expand Down Expand Up @@ -895,7 +967,8 @@ BEGIN
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
message JSONB,
headers JSONB
) PARTITION BY RANGE (%I);
$QUERY$,
atable, a_partition_col
Expand Down Expand Up @@ -1034,6 +1107,5 @@ BEGIN
retention_interval,
qualified_a_table_name
);

END;
$$ LANGUAGE plpgsql;
8 changes: 4 additions & 4 deletions pgmq-extension/test/expected/base.out
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ SELECT pgmq.create('test_pop_queue');
(1 row)

SELECT * FROM pgmq.pop('test_pop_queue');
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------+----+---------
msg_id | read_ct | enqueued_at | vt | message | headers
--------+---------+-------------+----+---------+---------
(0 rows)

SELECT send AS first_msg_id from pgmq.send('test_pop_queue', '0') \gset
Expand Down Expand Up @@ -534,8 +534,8 @@ SELECT pgmq.create('test_set_vt_queue');
(1 row)

SELECT * FROM pgmq.set_vt('test_set_vt_queue', 9999, 0);
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------+----+---------
msg_id | read_ct | enqueued_at | vt | message | headers
--------+---------+-------------+----+---------+---------
(0 rows)

SELECT send AS first_msg_id from pgmq.send('test_set_vt_queue', '0') \gset
Expand Down
Loading