Skip to content

Commit

Permalink
updated cache function
Browse files Browse the repository at this point in the history
  • Loading branch information
hectormachin authored Jun 9, 2023
2 parents d112b96 + 5cdbf12 commit dfba543
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 48 deletions.
45 changes: 21 additions & 24 deletions src/swoop/db/migrations/00000_base_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -438,19 +438,20 @@ $$;

CREATE FUNCTION swoop.check_cache(plhash bytea, wf_version smallint, wf_name text, invalid timestamptz)
RETURNS RECORD
LANGUAGE plpgsql VOLATILE
AS $$
DECLARE
rec RECORD;
BEGIN
IF EXISTS (SELECT * FROM swoop.payload_cache WHERE payload_hash = plhash) THEN
-- cache already exists
-- An entry exists in the cache
DECLARE
_status text;
_jobid uuid;
_payloadid uuid;
v_status text;
v_job_id uuid;
v_payload_id uuid;
BEGIN
SELECT t.status, t.action_uuid, p.payload_uuid
INTO _status,_jobid, _payloadid
INTO v_status, v_job_id, v_payload_id
FROM swoop.payload_cache p
INNER JOIN swoop.action a
ON p.payload_uuid = a.payload_uuid
Expand All @@ -460,45 +461,41 @@ BEGIN
ORDER BY t.created_at DESC
LIMIT 1;

IF _status IN ('RUNNING', 'PENDING', 'QUEUED', 'BACKOFF', 'SUCCESSFUL', 'INVALID') THEN
-- redirect to job details for that workflow, do not process
SELECT FALSE, _jobid INTO rec;
IF v_status IN ('RUNNING', 'PENDING', 'QUEUED', 'BACKOFF', 'SUCCESSFUL', 'INVALID') THEN
-- Redirect to job details for that workflow, and do not process
SELECT FALSE, v_job_id INTO rec;
ELSE
-- -- this means there is already a record in both payload_cache and action tables for this
-- -- need to reprocess
-- Reprocess payload
DECLARE
_ver smallint;
_invalid timestamptz;
n_version smallint;
d_invalid timestamptz;
BEGIN
SELECT workflow_version, invalid_after
INTO _ver, _invalid
INTO n_version, d_invalid
FROM swoop.payload_cache
WHERE payload_hash = plhash;

IF wf_version > _ver OR _invalid < NOW() THEN
IF wf_version > _ver AND _invalid < NOW() THEN
-- Check workflow version and invalidation
IF wf_version > n_version OR d_invalid < NOW() THEN
IF wf_version > n_version AND d_invalid < NOW() THEN
UPDATE swoop.payload_cache SET workflow_version = wf_version, invalid_after = NULL WHERE payload_hash = plhash;
ELSIF wf_version > _ver THEN
ELSIF wf_version > n_version THEN
UPDATE swoop.payload_cache SET workflow_version = wf_version WHERE payload_hash = plhash;
ELSE
UPDATE swoop.payload_cache SET invalid_after = NULL WHERE payload_hash = plhash;
END IF;

-- reprocess the payload
SELECT TRUE, _payloadid, gen_random_uuid() INTO rec;
ELSE
SELECT FALSE, _jobid INTO rec;
END IF;
-- Reprocess payload with a new action_uuid
SELECT TRUE, v_payload_id, gen_random_uuid() INTO rec;
END;
END IF;
END;
ELSE
-- Insert a new entry into cache table and process payload with a new action_uuid
INSERT INTO swoop.payload_cache(payload_hash, workflow_version, workflow_name, invalid_after)
VALUES (plhash, wf_version, wf_name, invalid)
RETURNING TRUE, payload_uuid, gen_random_uuid() INTO rec;
-- process the payload
END IF;
RETURN rec;
END;
$$
LANGUAGE plpgsql;
$$;
45 changes: 21 additions & 24 deletions src/swoop/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -446,19 +446,20 @@ $$;

CREATE FUNCTION swoop.check_cache(plhash bytea, wf_version smallint, wf_name text, invalid timestamptz)
RETURNS RECORD
LANGUAGE plpgsql VOLATILE
AS $$
DECLARE
rec RECORD;
BEGIN
IF EXISTS (SELECT * FROM swoop.payload_cache WHERE payload_hash = plhash) THEN
-- cache already exists
-- An entry exists in the cache
DECLARE
_status text;
_jobid uuid;
_payloadid uuid;
v_status text;
v_job_id uuid;
v_payload_id uuid;
BEGIN
SELECT t.status, t.action_uuid, p.payload_uuid
INTO _status,_jobid, _payloadid
INTO v_status, v_job_id, v_payload_id
FROM swoop.payload_cache p
INNER JOIN swoop.action a
ON p.payload_uuid = a.payload_uuid
Expand All @@ -468,45 +469,41 @@ BEGIN
ORDER BY t.created_at DESC
LIMIT 1;

IF _status IN ('RUNNING', 'PENDING', 'QUEUED', 'BACKOFF', 'SUCCESSFUL', 'INVALID') THEN
-- redirect to job details for that workflow, do not process
SELECT FALSE, _jobid INTO rec;
IF v_status IN ('RUNNING', 'PENDING', 'QUEUED', 'BACKOFF', 'SUCCESSFUL', 'INVALID') THEN
-- Redirect to job details for that workflow, and do not process
SELECT FALSE, v_job_id INTO rec;
ELSE
-- -- this means there is already a record in both payload_cache and action tables for this
-- -- need to reprocess
-- Reprocess payload
DECLARE
_ver smallint;
_invalid timestamptz;
n_version smallint;
d_invalid timestamptz;
BEGIN
SELECT workflow_version, invalid_after
INTO _ver, _invalid
INTO n_version, d_invalid
FROM swoop.payload_cache
WHERE payload_hash = plhash;

IF wf_version > _ver OR _invalid < NOW() THEN
IF wf_version > _ver AND _invalid < NOW() THEN
-- Check workflow version and invalidation
IF wf_version > n_version OR d_invalid < NOW() THEN
IF wf_version > n_version AND d_invalid < NOW() THEN
UPDATE swoop.payload_cache SET workflow_version = wf_version, invalid_after = NULL WHERE payload_hash = plhash;
ELSIF wf_version > _ver THEN
ELSIF wf_version > n_version THEN
UPDATE swoop.payload_cache SET workflow_version = wf_version WHERE payload_hash = plhash;
ELSE
UPDATE swoop.payload_cache SET invalid_after = NULL WHERE payload_hash = plhash;
END IF;

-- reprocess the payload
SELECT TRUE, _payloadid, gen_random_uuid() INTO rec;
ELSE
SELECT FALSE, _jobid INTO rec;
END IF;
-- Reprocess payload with a new action_uuid
SELECT TRUE, v_payload_id, gen_random_uuid() INTO rec;
END;
END IF;
END;
ELSE
-- Insert a new entry into cache table and process payload with a new action_uuid
INSERT INTO swoop.payload_cache(payload_hash, workflow_version, workflow_name, invalid_after)
VALUES (plhash, wf_version, wf_name, invalid)
RETURNING TRUE, payload_uuid, gen_random_uuid() INTO rec;
-- process the payload
END IF;
RETURN rec;
END;
$$
LANGUAGE plpgsql;
$$;

0 comments on commit dfba543

Please sign in to comment.