From 71a1aa2b76916a2cfd1374ba0322acdf093820eb Mon Sep 17 00:00:00 2001 From: William Wills Date: Tue, 10 Dec 2024 17:07:40 -0500 Subject: [PATCH] feat: check organization subsubscriptions improved fix: removed rpc call exceeding max response feat: improved logging --- src/datalayer/persistance.js | 6 +- .../organizations/organizations.model.js | 29 ++-- src/tasks/check-organization-subscriptions.js | 139 +++++------------- src/tasks/sync-registries.js | 2 +- 4 files changed, 51 insertions(+), 125 deletions(-) diff --git a/src/datalayer/persistance.js b/src/datalayer/persistance.js index 730d951a..e0b8f1b1 100644 --- a/src/datalayer/persistance.js +++ b/src/datalayer/persistance.js @@ -357,14 +357,14 @@ const getStoreData = async (storeId, rootHash) => { const data = response.body; if (data.success) { - if (!_.isEmpty(data.keys_values)) { + if (_.isEmpty(data.keys_values)) { logger.warn( `datalayer get_keys_values returned no data for store ${storeId} at root hash: ${rootHash || 'latest'}`, ); } return data; } else { - throw new Error(`${data}`); + throw new Error(JSON.stringify(data)); } } catch (error) { logger.error( @@ -396,7 +396,7 @@ const getRoot = async (storeId, ignoreEmptyStore = false) => { const { confirmed, hash } = response.body; logger.debug( - `the current root data for store ${storeId} is ${response.body}`, + `the current root data for store ${storeId} is ${JSON.stringify(response.body)}`, ); if (confirmed && (ignoreEmptyStore || !hash.includes('0x00000000000'))) { diff --git a/src/models/organizations/organizations.model.js b/src/models/organizations/organizations.model.js index e7dd670e..bcb57ecb 100644 --- a/src/models/organizations/organizations.model.js +++ b/src/models/organizations/organizations.model.js @@ -420,23 +420,12 @@ class Organization extends Model { } logger.debug(`checking registry store singleton for org ${orgUid}`); - let registryData = null; - while (!registryData) { - try { - registryData = await datalayer.getSubscribedStoreData(registryStoreId); - if (!registryData) { - throw new Error( - `failed to get data from registry store ${registryStoreId}`, - ); - } - } catch (error) { - if (reachedTimeout()) { - onTimeout(error); - } - logger.debug(`${error.message}. RETRYING`); - } finally { - await new Promise((resolve) => setTimeout(resolve, 300)); - } + const subscribedToRegistryStore = + await datalayer.subscribeToStoreOnDataLayer(registryStoreId); + if (!subscribedToRegistryStore) { + throw new Error( + `failed to subscribe to or validate subscription for registry store ${registryStoreId}`, + ); } if (AUTO_SUBSCRIBE_FILESTORE) { @@ -545,8 +534,14 @@ class Organization extends Model { const exists = await Organization.findOne({ where: { orgUid: org.orgUid }, }); + logger.debug( + `sync dafault orgs task checking default org ${org.orgUid}`, + ); if (!exists) { + logger.debug( + `default organization ${org.orgUid} was not found in the organizations table. running the import process to correct`, + ); await Organization.importOrganization(org.orgUid); } } diff --git a/src/tasks/check-organization-subscriptions.js b/src/tasks/check-organization-subscriptions.js index 9fab5efa..865543d5 100644 --- a/src/tasks/check-organization-subscriptions.js +++ b/src/tasks/check-organization-subscriptions.js @@ -1,13 +1,8 @@ import { SimpleIntervalJob, Task } from 'toad-scheduler'; import { getConfig } from '../utils/config-loader.js'; import { Meta, Organization } from '../models/index.js'; -import { - getOwnedStores, - getSubscriptions, - subscribeToStoreOnDataLayer, -} from '../datalayer/persistance.js'; +import { getOwnedStores } from '../datalayer/persistance.js'; import { logger } from '../config/logger.js'; -import datalayer from '../datalayer/index.js'; const CONFIG = getConfig(); @@ -31,118 +26,54 @@ const task = new Task('check-organization-subscriptions', async () => { try { const organizations = await Organization.findAll(); - const subscribedStores = await getSubscriptions(); const ownedStores = await getOwnedStores(); - if (!subscribedStores?.success) { - throw new Error('failed to get subscriptions from datalayer'); - } - for (const organization of organizations) { - const { orgUid, registryId, isHome, name } = organization; - const orgStoreData = await datalayer.getCurrentStoreData(orgUid); + try { + const { orgUid, registryId, isHome, name } = organization; - // note here the registryId is the datamodel version store - const dataModelStoreId = orgStoreData.find( - (record) => record?.key === 'registryId', - )?.value; - - if (!dataModelStoreId) { - throw new Error( - `failed to retrieve datamodel version store id for organization ${name} (orgUid ${orgUid})`, + logger.debug( + `running the organization subscription process on organization ${name} (orgUid ${orgUid})`, ); - } - logger.debug( - `validating that datalayer is subscribed to org store ${orgUid}, datamodel version store ${dataModelStoreId}, and registry store ${registryId} belonging to ${name}`, - ); - - if (isHome) { - const homeOrgStoreOwned = ownedStores.storeIds.includes(orgUid); - const dataModelVersionStoreOwned = - ownedStores.storeIds.includes(dataModelStoreId); - const homeRegistryStoreOwned = - ownedStores.storeIds.includes(registryId); - - if (!homeOrgStoreOwned) { - throw new Error( - `your wallet does not own your home organization store ${orgUid}. this is a serious issue that CADT cannot resolve`, - ); - } + const datalayerOrganizationStoreIds = + await Organization.subscribeToOrganization(orgUid); - if (!dataModelVersionStoreOwned) { - throw new Error( - `your wallet does not own your home datamodel version store ${dataModelStoreId}. this is a serious issue that CADT cannot resolve`, + if (isHome) { + const homeOrgStoreOwned = ownedStores.storeIds.includes(orgUid); + const dataModelVersionStoreOwned = ownedStores.storeIds.includes( + datalayerOrganizationStoreIds.dataModelVersionStoreId, ); + const homeRegistryStoreOwned = + ownedStores.storeIds.includes(registryId); + + if (!homeOrgStoreOwned) { + throw new Error( + `your wallet does not own your home organization store ${orgUid}. this is a serious issue that CADT cannot resolve`, + ); + } + + if (!dataModelVersionStoreOwned) { + throw new Error( + `your wallet does not own your home datamodel version store ${datalayerOrganizationStoreIds.dataModelVersionStoreId}. this is a serious issue that CADT cannot resolve`, + ); + } + + if (!homeRegistryStoreOwned) { + throw new Error( + `your wallet does not own your home registry store ${registryId}. this is a serious issue that CADT cannot resolve`, + ); + } } - - if (!homeRegistryStoreOwned) { - throw new Error( - `your wallet does not own your home registry store ${registryId}. this is a serious issue that CADT cannot resolve`, - ); - } - } - - const subscribedToOrgStore = subscribedStores.storeIds.includes(orgUid); - const subscribedToDataModelVersionStore = - subscribedStores.storeIds.includes(dataModelStoreId); - const subscribedToRegistryStore = - subscribedStores.storeIds.includes(registryId); - - if (!subscribedToOrgStore) { - logger.info( - `datalayer is not subscribed to orgUid store ${orgUid}, subscribing ...`, - ); - - const result = await subscribeToStoreOnDataLayer(orgUid); - if (result) { - logger.info(`subscribed to store ${orgUid}`); - } else { - throw new Error(`failed to subscribe to store ${orgUid}`); - } - - // wait 5 secs to give RPC a break - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - - if (!subscribedToDataModelVersionStore) { - logger.info( - `datalayer is not subscribed to datamodel version store ${dataModelStoreId}, subscribing ...`, - ); - - const result = await subscribeToStoreOnDataLayer( - dataModelStoreId, - true, + } catch (error) { + logger.error( + `check-organization-subscriptions task error while processing org ${organization?.orgUid}. Error: ${error.message}`, ); - if (result) { - logger.info(`subscribed to store ${dataModelStoreId}`); - } else { - throw new Error(`failed to subscribe to store ${dataModelStoreId}`); - } - - // wait 5 secs to give RPC a break - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - - if (!subscribedToRegistryStore) { - logger.info( - `datalayer is not subscribed to registryId store ${registryId}, subscribing ...`, - ); - - const result = await subscribeToStoreOnDataLayer(registryId); - if (result) { - logger.info(`subscribed to store ${registryId}`); - } else { - throw new Error(`failed to subscribe to store ${registryId}`); - } - - // wait 5 secs to give RPC a break - await new Promise((resolve) => setTimeout(resolve, 5000)); } } } catch (error) { logger.error( - `check-organization-subscriptions task encountered an error: ${error.message}`, + `check-organization-subscriptions task encountered an error and could not complete: ${error.message}`, ); } }); diff --git a/src/tasks/sync-registries.js b/src/tasks/sync-registries.js index 871ff1c2..9c9e0664 100644 --- a/src/tasks/sync-registries.js +++ b/src/tasks/sync-registries.js @@ -250,7 +250,7 @@ const syncOrganizationAudit = async (organization) => { rootHistory.length - 1 !== sync_status?.generation ) { logger.debug( - `the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). pausing the sync for this organization until the root history length and `, + `the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). pausing the sync for this organization until the root history length and number of synced generations match`, ); return; } else if (