Skip to content

Commit

Permalink
feat: check organization subsubscriptions improved
Browse files Browse the repository at this point in the history
fix: removed rpc call exceeding max response
feat: improved logging
  • Loading branch information
wwills2 committed Dec 10, 2024
1 parent e228d5a commit 71a1aa2
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 125 deletions.
6 changes: 3 additions & 3 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,14 @@ const getStoreData = async (storeId, rootHash) => {
const data = response.body;

if (data.success) {
if (!_.isEmpty(data.keys_values)) {
if (_.isEmpty(data.keys_values)) {
logger.warn(
`datalayer get_keys_values returned no data for store ${storeId} at root hash: ${rootHash || 'latest'}`,
);
}
return data;
} else {
throw new Error(`${data}`);
throw new Error(JSON.stringify(data));
}
} catch (error) {
logger.error(
Expand Down Expand Up @@ -396,7 +396,7 @@ const getRoot = async (storeId, ignoreEmptyStore = false) => {

const { confirmed, hash } = response.body;
logger.debug(
`the current root data for store ${storeId} is ${response.body}`,
`the current root data for store ${storeId} is ${JSON.stringify(response.body)}`,
);

if (confirmed && (ignoreEmptyStore || !hash.includes('0x00000000000'))) {
Expand Down
29 changes: 12 additions & 17 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,23 +420,12 @@ class Organization extends Model {
}

logger.debug(`checking registry store singleton for org ${orgUid}`);
let registryData = null;
while (!registryData) {
try {
registryData = await datalayer.getSubscribedStoreData(registryStoreId);
if (!registryData) {
throw new Error(
`failed to get data from registry store ${registryStoreId}`,
);
}
} catch (error) {
if (reachedTimeout()) {
onTimeout(error);
}
logger.debug(`${error.message}. RETRYING`);
} finally {
await new Promise((resolve) => setTimeout(resolve, 300));
}
const subscribedToRegistryStore =
await datalayer.subscribeToStoreOnDataLayer(registryStoreId);
if (!subscribedToRegistryStore) {
throw new Error(
`failed to subscribe to or validate subscription for registry store ${registryStoreId}`,
);
}

if (AUTO_SUBSCRIBE_FILESTORE) {
Expand Down Expand Up @@ -545,8 +534,14 @@ class Organization extends Model {
const exists = await Organization.findOne({
where: { orgUid: org.orgUid },
});
logger.debug(
`sync dafault orgs task checking default org ${org.orgUid}`,
);

if (!exists) {
logger.debug(
`default organization ${org.orgUid} was not found in the organizations table. running the import process to correct`,
);
await Organization.importOrganization(org.orgUid);
}
}
Expand Down
139 changes: 35 additions & 104 deletions src/tasks/check-organization-subscriptions.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import { SimpleIntervalJob, Task } from 'toad-scheduler';
import { getConfig } from '../utils/config-loader.js';
import { Meta, Organization } from '../models/index.js';
import {
getOwnedStores,
getSubscriptions,
subscribeToStoreOnDataLayer,
} from '../datalayer/persistance.js';
import { getOwnedStores } from '../datalayer/persistance.js';
import { logger } from '../config/logger.js';
import datalayer from '../datalayer/index.js';

const CONFIG = getConfig();

Expand All @@ -31,118 +26,54 @@ const task = new Task('check-organization-subscriptions', async () => {

try {
const organizations = await Organization.findAll();
const subscribedStores = await getSubscriptions();
const ownedStores = await getOwnedStores();

if (!subscribedStores?.success) {
throw new Error('failed to get subscriptions from datalayer');
}

for (const organization of organizations) {
const { orgUid, registryId, isHome, name } = organization;
const orgStoreData = await datalayer.getCurrentStoreData(orgUid);
try {
const { orgUid, registryId, isHome, name } = organization;

// note here the registryId is the datamodel version store
const dataModelStoreId = orgStoreData.find(
(record) => record?.key === 'registryId',
)?.value;

if (!dataModelStoreId) {
throw new Error(
`failed to retrieve datamodel version store id for organization ${name} (orgUid ${orgUid})`,
logger.debug(
`running the organization subscription process on organization ${name} (orgUid ${orgUid})`,
);
}

logger.debug(
`validating that datalayer is subscribed to org store ${orgUid}, datamodel version store ${dataModelStoreId}, and registry store ${registryId} belonging to ${name}`,
);

if (isHome) {
const homeOrgStoreOwned = ownedStores.storeIds.includes(orgUid);
const dataModelVersionStoreOwned =
ownedStores.storeIds.includes(dataModelStoreId);
const homeRegistryStoreOwned =
ownedStores.storeIds.includes(registryId);

if (!homeOrgStoreOwned) {
throw new Error(
`your wallet does not own your home organization store ${orgUid}. this is a serious issue that CADT cannot resolve`,
);
}
const datalayerOrganizationStoreIds =
await Organization.subscribeToOrganization(orgUid);

if (!dataModelVersionStoreOwned) {
throw new Error(
`your wallet does not own your home datamodel version store ${dataModelStoreId}. this is a serious issue that CADT cannot resolve`,
if (isHome) {
const homeOrgStoreOwned = ownedStores.storeIds.includes(orgUid);
const dataModelVersionStoreOwned = ownedStores.storeIds.includes(
datalayerOrganizationStoreIds.dataModelVersionStoreId,
);
const homeRegistryStoreOwned =
ownedStores.storeIds.includes(registryId);

if (!homeOrgStoreOwned) {
throw new Error(
`your wallet does not own your home organization store ${orgUid}. this is a serious issue that CADT cannot resolve`,
);
}

if (!dataModelVersionStoreOwned) {
throw new Error(
`your wallet does not own your home datamodel version store ${datalayerOrganizationStoreIds.dataModelVersionStoreId}. this is a serious issue that CADT cannot resolve`,
);
}

if (!homeRegistryStoreOwned) {
throw new Error(
`your wallet does not own your home registry store ${registryId}. this is a serious issue that CADT cannot resolve`,
);
}
}

if (!homeRegistryStoreOwned) {
throw new Error(
`your wallet does not own your home registry store ${registryId}. this is a serious issue that CADT cannot resolve`,
);
}
}

const subscribedToOrgStore = subscribedStores.storeIds.includes(orgUid);
const subscribedToDataModelVersionStore =
subscribedStores.storeIds.includes(dataModelStoreId);
const subscribedToRegistryStore =
subscribedStores.storeIds.includes(registryId);

if (!subscribedToOrgStore) {
logger.info(
`datalayer is not subscribed to orgUid store ${orgUid}, subscribing ...`,
);

const result = await subscribeToStoreOnDataLayer(orgUid);
if (result) {
logger.info(`subscribed to store ${orgUid}`);
} else {
throw new Error(`failed to subscribe to store ${orgUid}`);
}

// wait 5 secs to give RPC a break
await new Promise((resolve) => setTimeout(resolve, 5000));
}

if (!subscribedToDataModelVersionStore) {
logger.info(
`datalayer is not subscribed to datamodel version store ${dataModelStoreId}, subscribing ...`,
);

const result = await subscribeToStoreOnDataLayer(
dataModelStoreId,
true,
} catch (error) {
logger.error(
`check-organization-subscriptions task error while processing org ${organization?.orgUid}. Error: ${error.message}`,
);
if (result) {
logger.info(`subscribed to store ${dataModelStoreId}`);
} else {
throw new Error(`failed to subscribe to store ${dataModelStoreId}`);
}

// wait 5 secs to give RPC a break
await new Promise((resolve) => setTimeout(resolve, 5000));
}

if (!subscribedToRegistryStore) {
logger.info(
`datalayer is not subscribed to registryId store ${registryId}, subscribing ...`,
);

const result = await subscribeToStoreOnDataLayer(registryId);
if (result) {
logger.info(`subscribed to store ${registryId}`);
} else {
throw new Error(`failed to subscribe to store ${registryId}`);
}

// wait 5 secs to give RPC a break
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
} catch (error) {
logger.error(
`check-organization-subscriptions task encountered an error: ${error.message}`,
`check-organization-subscriptions task encountered an error and could not complete: ${error.message}`,
);
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/sync-registries.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ const syncOrganizationAudit = async (organization) => {
rootHistory.length - 1 !== sync_status?.generation
) {
logger.debug(
`the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). pausing the sync for this organization until the root history length and `,
`the root history length does not match the number of synced generations for ${organization.name} (registry store Id ${organization.registryId}). pausing the sync for this organization until the root history length and number of synced generations match`,
);
return;
} else if (
Expand Down

0 comments on commit 71a1aa2

Please sign in to comment.