Skip to content

Commit

Permalink
refactor(sync.v2): improve readability and debugability
Browse files Browse the repository at this point in the history
1. abstract functions and reshape the code, including error handling
2. extra call to notify CP the state of sync;
3. more debuging logs;
4. sync once retry will not count timeout as retry, allowing DP to sync deltas right after a full sync

KAG-6177
  • Loading branch information
StarlightIbuki committed Jan 16, 2025
1 parent ce15c78 commit 2d55554
Showing 1 changed file with 134 additions and 86 deletions.
220 changes: 134 additions & 86 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")
local events = require("kong.runloop.events")
local yield = require("kong.tools.yield").yield


local insert_entity_for_txn = declarative.insert_entity_for_txn
Expand All @@ -26,6 +27,7 @@ local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_DEBUG = ngx.DEBUG


Expand Down Expand Up @@ -173,34 +175,112 @@ local function is_rpc_ready()
end


local function do_sync()
local do_sync


local function error_handler(err, is_full_sync)
ngx_log(ngx_ERR, err)

if is_full_sync then
return do_sync() -- retry without releasing the mutex as the DP has nothing to do except syncing
end

return nil, err
end


local function lmdb_upsert(db, t, delta, opts)
local delta_type = delta.type
local delta_entity = delta.entity

-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_entity)

if err then
return nil, err
end

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

return { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end


local function lmdb_delete(db, t, delta, opts, is_full_sync)
local delta_type = delta.type

-- NOTE: the first page of full sync MUST NOT contain delete events
-- assert(not t.purged, "unexpected delete event in frist page of full sync")

local old_entity, err = db[delta_type]:select(delta.pk, opts)
if err then
return nil, err
end

-- entity does not exist, no need to delete
if not old_entity then
return true
end

-- delete the entity
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)

if is_full_sync then
local entity_not_found = err and err:match("MDB_NOTFOUND")
-- full sync needs extra tolerance for errors
if not res and not entity_not_found then
return nil, err
end
else
if not res then
return nil, err
end
end

return { delta_type, "delete", old_entity, }
end


function do_sync()
if not is_rpc_ready() then
return nil, "rpc is not ready"
end

local msg = { default = { version = get_current_version(), }, }
local current_version = get_current_version()

local msg = { default = { version = current_version, }, }

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg)
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
return error_handler("sync get_delta error: " .. err)
end

-- ns_deltas should look like:
-- { default = { deltas = { ... }, wipe = true, }, }
-- { default = { deltas = { ... } }, }

local ns_delta = ns_deltas.default
if not ns_delta then
return nil, "default namespace does not exist inside params"
return error_handler("default namespace does not exist inside params")
end

local deltas = ns_delta.deltas

if not deltas then
return nil, "sync get_delta error: deltas is null"
return error_handler("sync get_delta error: deltas is null")
end

if isempty(deltas) then
if isempty(deltas) and not is_full_sync then
-- no delta to sync
return true
end
Expand All @@ -221,14 +301,17 @@ local function do_sync()

local t = txn.begin(512)

local wipe = ns_delta.wipe
if wipe then
local is_full_sync = ns_delta.wipe
if is_full_sync then
-- a sync begins
-- reset the dataplane to a clean state
ngx_log(ngx_INFO, "[kong.sync.v2] sync begins")
t:db_drop(false)
end

local db = kong.db

local version = ""
local version = current_version
local opts = {}
local crud_events = {}
local crud_events_n = 0
Expand All @@ -239,72 +322,26 @@ local function do_sync()
local delta_version = delta.version
local delta_type = delta.type
local delta_entity = delta.entity
local ev
local delta_is_upsert = delta_entity ~= nil and delta_entity ~= ngx_null
local operation_name = delta_is_upsert and "upsert" or "delete"
local operation = delta_is_upsert and lmdb_upsert or lmdb_delete

ngx_log(ngx_DEBUG,
"[kong.sync.v2] ", operation_name, " entity",
", version: ", delta_version,
", type: ", delta_type)

-- delta should have ws_id to generate the correct lmdb key
-- if entity is workspaceable
-- set the correct workspace for item
opts.workspace = delta.ws_id

if delta_entity ~= nil and delta_entity ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

ngx_log(ngx_DEBUG,
"[kong.sync.v2] update entity",
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end

else
-- delete the entity, opts for getting correct lmdb key
local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key
if err then
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
end
end

ngx_log(ngx_DEBUG,
"[kong.sync.v2] delete entity",
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
ev = { delta_type, "delete", old_entity, }
end
end -- if delta_entity ~= nil and delta_entity ~= ngx_null

-- wipe the whole lmdb, should not have events
if not wipe then
local ev, err = operation(db, t, delta, opts)
if not ev then
return error_handler("failed to " .. operation_name .. " entity: " .. err, is_full_sync)
end

if not is_full_sync then
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
end
Expand All @@ -323,16 +360,18 @@ local function do_sync()
-- record the default workspace into LMDB for any of the following case:
-- * wipe is false, but the default workspace has been changed
-- * wipe is true (full sync)
if default_ws_changed or wipe then
if default_ws_changed or is_full_sync then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
end

local ok, err = t:commit()
if not ok then
return nil, err
return error_handler("failed to commit transaction: " .. err, is_full_sync)
end

if wipe then
if is_full_sync then
-- all deltas are applied. Emit events to notify the changes
ngx_log(ngx_INFO, "[kong.sync.v2] sync ends")
kong.core_cache:purge()
kong.cache:purge()

Expand All @@ -341,16 +380,12 @@ local function do_sync()
-- Full sync could rebuild route, plugins and balancer route, so their
-- hashes are nil.
local reconfigure_data = { kong.default_workspace, nil, nil, nil, }
local ok, err = events.declarative_reconfigure_notify(reconfigure_data)
if not ok then
return nil, err
end
return events.declarative_reconfigure_notify(reconfigure_data)
end

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end

return true
Expand All @@ -363,9 +398,12 @@ local function sync_handler(premature)
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync)

if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end

return res, err
end


Expand All @@ -387,7 +425,7 @@ function sync_once_impl(premature, retry_count)
return
end

sync_handler()
local _, err = sync_handler()

-- check if "kong.sync.v2.notify_new_version" updates the latest version

Expand All @@ -400,17 +438,27 @@ function sync_once_impl(premature, retry_count)
local current_version = get_current_version()
if current_version >= latest_notified_version then
ngx_log(ngx_DEBUG, "version already updated")
return
return sync_handler() -- call get_delta once more to report to the CP that we are up-to-date
end

-- retry if the version is not updated
retry_count = retry_count or 0
-- we do not count a timed out sync. just retry
if err ~= "timeout" then
retry_count = retry_count + 1
end

-- in some cases, the new spawned timer will be switched to immediately,
-- preventing the coroutine who possesses the mutex to run
-- to let other coroutines has a chance to run
yield()

if retry_count > MAX_RETRY then
ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count)
return
end

return start_sync_once_timer(retry_count + 1)
return start_sync_once_timer(retry_count)
end


Expand Down

0 comments on commit 2d55554

Please sign in to comment.