From 2d5e348c0f4a365c920a9b220db4486a56f20612 Mon Sep 17 00:00:00 2001
From: sarthak-metron
Date: Tue, 11 Jun 2024 10:42:07 +0530
Subject: [PATCH] updated the code as the incoming data is mock data

---
 src/steps/data-flow/client.ts     | 32 ++++++++++++-
 src/steps/data-flow/constants.ts  | 11 +++--
 src/steps/data-flow/converters.ts |  8 ++--
 src/steps/data-flow/index.ts      | 79 +++++++++++++++----------------
 4 files changed, 80 insertions(+), 50 deletions(-)

diff --git a/src/steps/data-flow/client.ts b/src/steps/data-flow/client.ts
index 2f11fffc..fbdb31e7 100644
--- a/src/steps/data-flow/client.ts
+++ b/src/steps/data-flow/client.ts
@@ -15,12 +15,12 @@ export class dataFlowClient extends Client {
     const auth = await this.getAuthenticatedServiceClient();
 
     // Iterate over each region
-    for (const region of googleCloudRegions) {
+    //for (const region of googleCloudRegions) {
       await this.iterateApi(
         async (nextPageToken) => {
           return this.client.projects.locations.jobs.list({
             projectId: this.projectId,
-            location: region.name,
+            location: 'us-east1',
             auth,
             pageToken: nextPageToken,
           });
@@ -33,6 +33,34 @@ export class dataFlowClient extends Client {
         STEP_GOOGLE_CLOUD_DATAFLOW,
         DataFlowPermissions.STEP_GOOGLE_CLOUD_DATAFLOW_JOB,
       );
+    //}
+  }
+
+  async iterateGoogleCloudDataFlowSnapshot(
+    callback: (data: dataflow_v1b3.Schema$Snapshot) => Promise<void>,
+  ) {
+    const auth = await this.getAuthenticatedServiceClient();
+
+    // Iterate over each region
+    for (const region of googleCloudRegions) {
+      await this.iterateApi(
+        async () => {
+          const req = await this.client.projects.locations.snapshots.list({
+            projectId: this.projectId,
+            location: region.name,
+            auth,
+          });
+          console.log(req + ' .........................');
+          return req;
+        },
+        async (data: dataflow_v1b3.Schema$ListSnapshotsResponse) => {
+          for (const snapshot of data.snapshots || []) {
+            await callback(snapshot);
+          }
+        },
+        STEP_GOOGLE_CLOUD_DATAFLOW,
+        DataFlowPermissions.STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
+      );
     }
   }
 }
diff --git a/src/steps/data-flow/constants.ts b/src/steps/data-flow/constants.ts
index 07d083a4..881d5061 100644
--- a/src/steps/data-flow/constants.ts
+++ b/src/steps/data-flow/constants.ts
@@ -1,9 +1,9 @@
 export const STEP_GOOGLE_CLOUD_DATAFLOW_DATASTORE = 'fetch-google-cloud-dataflow-datastore';
-export const GOOGLE_CLOUD_DATAFLOW_DATASTORE_CLASS = 'Datastore';
+export const GOOGLE_CLOUD_DATAFLOW_DATASTORE_CLASS = ['DataStore'];
 export const GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE = 'google_cloud_dataflow_datastore';
 
 export const STEP_GOOGLE_CLOUD_DATAFLOW = 'fetch-google-cloud-dataflow';
-export const GOOGLE_CLOUD_DATAFLOW_CLASS = 'Service';
+export const GOOGLE_CLOUD_DATAFLOW_CLASS = ['Service'];
 export const GOOGLE_CLOUD_DATAFLOW_TYPE = 'google_cloud_dataflow';
 
 export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB = 'fetch-google-cloud-dataflow-job';
@@ -11,13 +11,13 @@ export const GOOGLE_CLOUD_DATAFLOW_JOB_CLASS = ['Workflow'];
 export const GOOGLE_CLOUD_DATAFLOW_JOB_TYPE = 'google_cloud_dataflow_job';
 
 export const STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT = 'fetch-google-cloud-dataflow-snapshot';
-export const GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS = 'Database, DataStore, Image, Backup';
+export const GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS = ['Database', 'DataStore', 'Image', 'Backup'];
 export const GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_TYPE = 'google_cloud_dataflow_snapshot';
 
 export const STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
   'fetch-project-has-google-cloud-dataflow-datastore';
 export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
-  'google_cloud_project_has_google_cloud_dataflow_datastore';
+  'google_cloud_project_has_dataflow_datastore';
 
 export const STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW =
   'fetch-project-has-google-cloud-dataflow';
@@ -32,7 +32,7 @@ export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_JO
 export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
   'fetch-google-cloud-dataflow-job-uses-google-cloud-dataflow-datastore';
 export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
-  'google_cloud_dataflow_job_uses_google_cloud_dataflow_datastore';
+  'google_cloud_dataflow_job_uses_datastore';
 
 export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT =
   'fetch-google-cloud-dataflow-job-has-google-cloud-dataflow-snapshot';
@@ -123,4 +123,5 @@ export const DataflowIngestionConfig = {
 
 export const DataFlowPermissions = {
   STEP_GOOGLE_CLOUD_DATAFLOW_JOB: ['dataflow.jobs.list'],
+  STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT: ['dataflow.snapshots.list'],
 };
diff --git a/src/steps/data-flow/converters.ts b/src/steps/data-flow/converters.ts
index 3a01478c..38e8bc6f 100644
--- a/src/steps/data-flow/converters.ts
+++ b/src/steps/data-flow/converters.ts
@@ -35,7 +35,7 @@ export function createGoogleCloudDataFlowJobEntity(
         id: data.id as string,
         name: data.name,
         displayName: data.name as string,
-        description: data.environment as string,
+        description: data.jobMetadata?.pubsubDetails as string[],
         projectId: data.projectId,
         type: data.type,
         stepLocation: data.stepsLocation,
@@ -52,7 +52,8 @@
         satisfiesPzs: data.satisfiesPzs,
         satisfiesPzi: data.satisfiesPzi,
         maxNumWorkers: data.runtimeUpdatableParams?.maxNumWorkers,
-        minNumWorkers: data.runtimeUpdatableParams?.minNumWorkers
+        minNumWorkers: data.runtimeUpdatableParams?.minNumWorkers,
+        datastoreDetails: data.jobMetadata?.datastoreDetails as string[],
       },
     },
   });
@@ -65,12 +66,13 @@
     entityData: {
       source: data,
       assign: {
-        _key: data.namespace as string,
+        _key: (data.namespace + '/' + data.projectId) as string,
         _type: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
         _class: GOOGLE_CLOUD_DATAFLOW_DATASTORE_CLASS,
         projectId: data.projectId as string,
         name: data.namespace,
         encrypted: false,
+        classification: 'true',
       },
     },
   });
diff --git a/src/steps/data-flow/index.ts b/src/steps/data-flow/index.ts
index 3d0cca20..610a0f6e 100644
--- a/src/steps/data-flow/index.ts
+++ b/src/steps/data-flow/index.ts
@@ -1,6 +1,5 @@
 import {
   createDirectRelationship,
-  getRawData,
   IntegrationMissingKeyError,
   RelationshipClass,
 } from '@jupiterone/integration-sdk-core';
@@ -32,20 +31,17 @@
   GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS,
   STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
   RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
-  STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
-  RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
   STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
-  RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE
+  RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
+  GOOGLE_CLOUD_DATAFLOW_JOB_CLASS
 } from './constants';
 import {
   createGoogleCloudDataFlowEntity,
   createGoogleCloudDataFlowDataStoreEntity,
   createGoogleCloudDataFlowJobEntity,
-  createGoogleCloudDataFlowSnapshotEntity,
 } from './converters';
 import { PROJECT_ENTITY_TYPE, STEP_RESOURCE_MANAGER_PROJECT } from '../resource-manager';
 import { getProjectEntity } from '../../utils/project';
-import { dataflow_v1b3 } from 'googleapis';
 import { ENTITY_TYPE_PUBSUB_TOPIC, STEP_PUBSUB_TOPICS } from '../pub-sub/constants';
 import { ENTITY_TYPE_SPANNER_INSTANCE, STEP_SPANNER_INSTANCES } from '../spanner/constants';
@@ -58,13 +54,16 @@
 
   await jobState.iterateEntities(
     { _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE },
-    async (dataflowEntity) => {
-      const dataflow = getRawData(dataflowEntity);
+    async (dataflowEntity) => {
 
-      if (dataflow?.jobMetadata?.datastoreDetails) {
-        await jobState.addEntity(createGoogleCloudDataFlowDataStoreEntity(dataflow));
+      const dataflowDataStore = dataflowEntity.datastoreDetails as any;
+      for (const datastore in dataflowDataStore) {
+        if (datastore) {
+          const dataflow = dataflowDataStore[datastore];
+          await jobState.addEntity(createGoogleCloudDataFlowDataStoreEntity(dataflow as any));
         }
-      },
+      }
+    }
   );
 }
 
@@ -101,24 +100,16 @@
   context: IntegrationStepContext,
 ): Promise<void> {
   const {
-    jobState,
     instance: { config },
     logger,
   } = context;
   const client = new dataFlowClient({ config }, logger);
 
-  await jobState.iterateEntities(
-    { _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE },
-    async (dataflowEntity) => {
-      const dataflow = getRawData(dataflowEntity);
-
-      if (dataflow?.snapshots) {
-        for (const snapshot of dataflow.snapshots) {
-          await jobState.addEntity(createGoogleCloudDataFlowSnapshotEntity(snapshot, client.projectId));
-        }
-      }
-    },
-  );
+  console.log(';;;;;;;;');
+  await client.iterateGoogleCloudDataFlowSnapshot(async (snapshot) => {
+    console.log(snapshot + "''''''''");
+  });
 }
 
@@ -133,13 +124,16 @@
 
   await jobState.iterateEntities(
     { _type: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE },
-    async (dataflowJob) => {
+    async (DataStore) => {
+      const projectId = projectEntity.projectId as string;
+
+      if (DataStore.projectId === projectId)
       await jobState.addRelationship(
         createDirectRelationship({
           _class: RelationshipClass.HAS,
           fromKey: projectEntity._key as string,
           fromType: PROJECT_ENTITY_TYPE,
-          toKey: dataflowJob._key as string,
+          toKey: DataStore._key as string,
           toType: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
         }),
       );
@@ -203,30 +197,35 @@
   const { jobState } = executionContext;
 
   await jobState.iterateEntities(
-    { _type: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE },
-    async (datastoreEntity) => {
-      const dataflowJobKey = datastoreEntity.jobId as string
+    { _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE },
+    async (dataflowEntity) => {
+      const dataflowDataStore = dataflowEntity.datastoreDetails as any;
+      for (const datastore in dataflowDataStore) {
 
-      const hasDataflowJobKey = jobState.hasKey(dataflowJobKey);
+        const dataflow = dataflowDataStore[datastore];
+        const dataStoreKey = (dataflow.namespace + '/' + dataflow.projectId) as string;
 
-      if (!hasDataflowJobKey) {
+        const hasDataStoreKey = jobState.hasKey(dataStoreKey);
+
+        if (!hasDataStoreKey) {
         throw new IntegrationMissingKeyError(
           `Cannot build Relationship. Error: Missing Key.
-            dataflowJobKey : ${dataflowJobKey}`,
+            dataStoreKey : ${dataStoreKey}`,
         );
       }
-
+      else {
       await jobState.addRelationship(
         createDirectRelationship({
-          _class: RelationshipClass.HAS,
-          fromKey: dataflowJobKey,
+          _class: RelationshipClass.USES,
+          fromKey: dataflowEntity._key as string,
           fromType: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
-          toKey: datastoreEntity._key,
+          toKey: dataStoreKey as string,
           toType: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
         }),
       );
-    },
+      }}
+    }
   );
 }
 
@@ -323,7 +322,7 @@
     {
       resourceName: 'Google Cloud Dataflow Job',
      _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
-      _class: ['Workflow'],
+      _class: GOOGLE_CLOUD_DATAFLOW_JOB_CLASS,
     },
   ],
   relationships: [],
@@ -421,7 +420,7 @@
   entities: [],
   relationships: [
     {
-      _class: RelationshipClass.HAS,
+      _class: RelationshipClass.USES,
       _type: RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE,
       sourceType: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
       targetType: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
@@ -447,7 +446,7 @@
     },
   ],
   relationships: [],
-  dependsOn: [STEP_GOOGLE_CLOUD_DATAFLOW_JOB],
+  dependsOn: [],
   executionHandler: fetchGoogleCloudDataFlowSnapshot,
   apis: ['dataflow.googleapis.com'],
 },