Skip to content
This repository has been archived by the owner on Jun 25, 2024. It is now read-only.

Commit

Permalink
Updated the code, as the incoming data is mock data
Browse files Browse the repository at this point in the history
  • Loading branch information
sarthak-metron committed Jun 11, 2024
1 parent e2107db commit 2d5e348
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 50 deletions.
32 changes: 30 additions & 2 deletions src/steps/data-flow/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ export class dataFlowClient extends Client {
const auth = await this.getAuthenticatedServiceClient();

// Iterate over each region
for (const region of googleCloudRegions) {
//for (const region of googleCloudRegions) {
await this.iterateApi(
async (nextPageToken) => {
return this.client.projects.locations.jobs.list({
projectId: this.projectId,
location: region.name,
location: 'us-east1',
auth,
pageToken: nextPageToken,
});
Expand All @@ -33,6 +33,34 @@ export class dataFlowClient extends Client {
STEP_GOOGLE_CLOUD_DATAFLOW,
DataFlowPermissions.STEP_GOOGLE_CLOUD_DATAFLOW_JOB,
);
//}
}

async iterateGoogleCloudDataFlowSnapshot(
  callback: (data: dataflow_v1b3.Schema$Snapshot) => Promise<void>,
) {
  const auth = await this.getAuthenticatedServiceClient();

  // Snapshots are region-scoped, so query every known Google Cloud region.
  for (const region of googleCloudRegions) {
    await this.iterateApi(
      async () => {
        // projects.locations.snapshots.list returns the full result set for
        // a location in a single response (no pageToken in the request
        // schema), so no pagination token is threaded through here.
        return this.client.projects.locations.snapshots.list({
          projectId: this.projectId,
          location: region.name,
          auth,
        });
      },
      async (data: dataflow_v1b3.Schema$ListSnapshotsResponse) => {
        // Invoke the caller for each snapshot; guard against an absent list.
        for (const snapshot of data.snapshots || []) {
          await callback(snapshot);
        }
      },
      STEP_GOOGLE_CLOUD_DATAFLOW,
      // Use the snapshot-specific permission (dataflow.snapshots.list)
      // rather than the job permission, so permission errors are reported
      // against the correct requirement.
      DataFlowPermissions.STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
    );
  }
}
}
11 changes: 6 additions & 5 deletions src/steps/data-flow/constants.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
export const STEP_GOOGLE_CLOUD_DATAFLOW_DATASTORE = 'fetch-google-cloud-dataflow-datastore';
export const GOOGLE_CLOUD_DATAFLOW_DATASTORE_CLASS = 'Datastore';
export const GOOGLE_CLOUD_DATAFLOW_DATASTORE_CLASS = ['DataStore'];
export const GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE = 'google_cloud_dataflow_datastore';

export const STEP_GOOGLE_CLOUD_DATAFLOW = 'fetch-google-cloud-dataflow';
export const GOOGLE_CLOUD_DATAFLOW_CLASS = 'Service';
export const GOOGLE_CLOUD_DATAFLOW_CLASS = ['Service'];
export const GOOGLE_CLOUD_DATAFLOW_TYPE = 'google_cloud_dataflow';

export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB = 'fetch-google-cloud-dataflow-job';
export const GOOGLE_CLOUD_DATAFLOW_JOB_CLASS = ['Workflow'];
export const GOOGLE_CLOUD_DATAFLOW_JOB_TYPE = 'google_cloud_dataflow_job';

export const STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT = 'fetch-google-cloud-dataflow-snapshot';
export const GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS = 'Database, DataStore, Image, Backup';
export const GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS = ['Database', 'DataStore', 'Image', 'Backup'];
export const GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_TYPE = 'google_cloud_dataflow_snapshot';

export const STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
'fetch-project-has-google-cloud-dataflow-datastore';
export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
'google_cloud_project_has_google_cloud_dataflow_datastore';
'google_cloud_project_has_dataflow_datastore';

export const STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW =
'fetch-project-has-google-cloud-dataflow';
Expand All @@ -32,7 +32,7 @@ export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_JO
export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
'fetch-google-cloud-dataflow-job-uses-google-cloud-dataflow-datastore';
export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE =
'google_cloud_dataflow_job_uses_google_cloud_dataflow_datastore';
'google_cloud_dataflow_job_uses_datastore';

export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT =
'fetch-google-cloud-dataflow-job-has-google-cloud-dataflow-snapshot';
Expand Down Expand Up @@ -123,4 +123,5 @@ export const DataflowIngestionConfig = {

export const DataFlowPermissions = {
STEP_GOOGLE_CLOUD_DATAFLOW_JOB: ['dataflow.jobs.list'],
STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT: ['dataflow.snapshots.list']
};
8 changes: 5 additions & 3 deletions src/steps/data-flow/converters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export function createGoogleCloudDataFlowJobEntity(
id: data.id as string,
name: data.name,
displayName: data.name as string,
description: data.environment as string,
description: data.jobMetadata?.pubsubDetails as string[],
projectId: data.projectId,
type: data.type,
stepLocation: data.stepsLocation,
Expand All @@ -52,7 +52,8 @@ export function createGoogleCloudDataFlowJobEntity(
satisfiesPzs: data.satisfiesPzs,
satisfiesPzi: data.satisfiesPzi,
maxNumWorkers: data.runtimeUpdatableParams?.maxNumWorkers,
minNumWorkers: data.runtimeUpdatableParams?.minNumWorkers
minNumWorkers: data.runtimeUpdatableParams?.minNumWorkers,
datastoreDetails:data.jobMetadata?.datastoreDetails as string[],
},
},
});
Expand All @@ -65,12 +66,13 @@ export function createGoogleCloudDataFlowDataStoreEntity(
entityData: {
source: data,
assign: {
_key: data.namespace as string,
_key: (data.namespace+"/"+data.projectId) as string,
_type: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
_class: GOOGLE_CLOUD_DATAFLOW_DATASTORE_CLASS,
projectId: data.projectId as string,
name: data.namespace,
encrypted: false,
classification: 'true'
},
},
});
Expand Down
79 changes: 39 additions & 40 deletions src/steps/data-flow/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
createDirectRelationship,
getRawData,
IntegrationMissingKeyError,
RelationshipClass,
} from '@jupiterone/integration-sdk-core';
Expand Down Expand Up @@ -32,20 +31,17 @@ import {
GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS,
STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
GOOGLE_CLOUD_DATAFLOW_JOB_CLASS
} from './constants';
import {
createGoogleCloudDataFlowEntity,
createGoogleCloudDataFlowDataStoreEntity,
createGoogleCloudDataFlowJobEntity,
createGoogleCloudDataFlowSnapshotEntity,
} from './converters';
import { PROJECT_ENTITY_TYPE, STEP_RESOURCE_MANAGER_PROJECT } from '../resource-manager';
import { getProjectEntity } from '../../utils/project';
import { dataflow_v1b3 } from 'googleapis';
import { ENTITY_TYPE_PUBSUB_TOPIC, STEP_PUBSUB_TOPICS } from '../pub-sub/constants';
import { ENTITY_TYPE_SPANNER_INSTANCE, STEP_SPANNER_INSTANCES } from '../spanner/constants';

Expand All @@ -58,13 +54,16 @@ export async function fetchGoogleCloudDataFlowDataStore(

await jobState.iterateEntities(
{ _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE },
async (dataflowEntity) => {
const dataflow = getRawData<dataflow_v1b3.Schema$Job>(dataflowEntity);
async (dataflowEntity) => {

if (dataflow?.jobMetadata?.datastoreDetails) {
await jobState.addEntity(createGoogleCloudDataFlowDataStoreEntity(dataflow));
const dataflowDataStore = dataflowEntity.datastoreDetails as any;
for(const datastore in dataflowDataStore){
if (datastore) {
const dataflow=dataflowEntity.datastoreDetails[datastore]
await jobState.addEntity(createGoogleCloudDataFlowDataStoreEntity(dataflow as any));
}
},
}
}
);
}

Expand Down Expand Up @@ -101,24 +100,16 @@ export async function fetchGoogleCloudDataFlowSnapshot(
context: IntegrationStepContext,
): Promise<void> {
const {
jobState,
instance: { config },
logger,
} = context;

const client = new dataFlowClient({ config }, logger);
await jobState.iterateEntities(
{ _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE },
async (dataflowEntity) => {
const dataflow = getRawData<dataflow_v1b3.Schema$ListSnapshotsResponse>(dataflowEntity);

if (dataflow?.snapshots) {
for (const snapshot of dataflow.snapshots) {
await jobState.addEntity(createGoogleCloudDataFlowSnapshotEntity(snapshot, client.projectId));
}
}
},
);
console.log(";;;;;;;;")

Check failure on line 108 in src/steps/data-flow/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement
await client.iterateGoogleCloudDataFlowSnapshot(async (snapshot) => {

Check failure on line 109 in src/steps/data-flow/index.ts

View workflow job for this annotation

GitHub Actions / test

Async arrow function has no 'await' expression
console.log(snapshot+"''''''''")

Check failure on line 110 in src/steps/data-flow/index.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected console statement
}
)
}


Expand All @@ -133,13 +124,16 @@ export async function buildProjectHasDataflowDatastoreRelationship(

await jobState.iterateEntities(
{ _type: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE },
async (dataflowJob) => {
async (DataStore) => {
const projectId=projectEntity.projectId as string

if(DataStore.projectId === projectId)
await jobState.addRelationship(
createDirectRelationship({
_class: RelationshipClass.HAS,
fromKey: projectEntity._key as string,
fromType: PROJECT_ENTITY_TYPE,
toKey: dataflowJob._key as string,
toKey: DataStore._key as string,
toType: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
}),
);
Expand Down Expand Up @@ -203,30 +197,35 @@ export async function buildDataflowUsesDataflowDatastoreRelationship(
const { jobState } = executionContext;

await jobState.iterateEntities(
{ _type: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE },
async (datastoreEntity) => {
const dataflowJobKey = datastoreEntity.jobId as string
{ _type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE },
async (dataflowEntity) => {
const dataflowDataStore = dataflowEntity.datastoreDetails as any;
for(const datastore in dataflowDataStore){

const hasDataflowJobKey = jobState.hasKey(dataflowJobKey);
const dataflow=dataflowEntity.datastoreDetails[datastore]
const dataStoreKey = (dataflow.namespace+"/"+dataflow.projectId) as string

if (!hasDataflowJobKey) {
const hasDataStoreKey = jobState.hasKey(dataStoreKey);

if (!hasDataStoreKey) {
throw new IntegrationMissingKeyError(
`Cannot build Relationship.
Error: Missing Key.
dataflowJobKey : ${dataflowJobKey}`,
dataflowJobKey : ${dataStoreKey}`,
);
}

else{
await jobState.addRelationship(
createDirectRelationship({
_class: RelationshipClass.HAS,
fromKey: dataflowJobKey,
_class: RelationshipClass.USES,
fromKey: dataflowEntity._key as string,
fromType: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
toKey: datastoreEntity._key,
toKey: dataStoreKey as string,
toType: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
}),
);
},
}}
}
);
}

Expand Down Expand Up @@ -323,7 +322,7 @@ export const dataFlowSteps: GoogleCloudIntegrationStep[] = [
{
resourceName: 'Google Cloud Dataflow Job',
_type: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
_class: ['Workflow'],
_class: GOOGLE_CLOUD_DATAFLOW_JOB_CLASS,
},
],
relationships: [],
Expand Down Expand Up @@ -421,7 +420,7 @@ export const dataFlowSteps: GoogleCloudIntegrationStep[] = [
entities: [],
relationships: [
{
_class: RelationshipClass.HAS,
_class: RelationshipClass.USES,
_type: RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE,
sourceType: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
targetType: GOOGLE_CLOUD_DATAFLOW_DATASTORE_TYPE,
Expand All @@ -447,7 +446,7 @@ export const dataFlowSteps: GoogleCloudIntegrationStep[] = [
},
],
relationships: [],
dependsOn: [STEP_GOOGLE_CLOUD_DATAFLOW_JOB],
dependsOn: [],
executionHandler: fetchGoogleCloudDataFlowSnapshot,
apis: ['dataflow.googleapis.com'],
},
Expand Down

0 comments on commit 2d5e348

Please sign in to comment.