Skip to content
This repository has been archived by the owner on Jun 25, 2024. It is now read-only.

Commit

Permalink
added relationships
Browse files Browse the repository at this point in the history
  • Loading branch information
poornima-metron committed May 22, 2024
1 parent 761fff4 commit e2107db
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 28 deletions.
3 changes: 3 additions & 0 deletions src/getStepStartStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ import {
STEP_GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE,
// STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW,
STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE
} from './steps/data-flow/constants';

Expand Down Expand Up @@ -353,6 +354,7 @@ function getDefaultStepStartStates(params: {
// [STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC]: { disabled: false },
[STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW]: { disabled: false },
[STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE]: { disabled: false },
[STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE]: { disabled: false },
[STEP_COMPUTE_DISKS]: { disabled: false },
[STEP_COMPUTE_REGION_DISKS]: { disabled: false },
[STEP_COMPUTE_IMAGES]: { disabled: false },
Expand Down Expand Up @@ -678,6 +680,7 @@ async function getStepStartStatesUsingServiceEnablements(params: {
// [STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC]: createStepStartState(ServiceUsageName.DATA_FLOW),
[STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW]: createStepStartState(ServiceUsageName.DATA_FLOW),
[STEP_GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_DATASTORE]: createStepStartState(ServiceUsageName.DATA_FLOW),
[STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE]: createStepStartState(ServiceUsageName.DATA_FLOW),
[STEP_COMPUTE_DISKS]: createStepStartState(ServiceUsageName.COMPUTE),
[STEP_COMPUTE_REGION_DISKS]: createStepStartState(ServiceUsageName.COMPUTE),
[STEP_COMPUTE_IMAGES]: createStepStartState(ServiceUsageName.COMPUTE),
Expand Down
38 changes: 21 additions & 17 deletions src/steps/data-flow/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { Client } from '../../google-cloud/client';
import {
DataFlowPermissions,
STEP_GOOGLE_CLOUD_DATAFLOW,

} from './constants';
import { googleCloudRegions } from '../../google-cloud/regions'

export class dataFlowClient extends Client {
private client = google.dataflow({ version: 'v1b3', retry: false });
Expand All @@ -14,21 +14,25 @@ export class dataFlowClient extends Client {
) {
const auth = await this.getAuthenticatedServiceClient();

await this.iterateApi(
async (nextPageToken) => {
return this.client.projects.jobs.list({
projectId: this.projectId,
auth,
pageToken: nextPageToken,
});
},
async (data: dataflow_v1b3.Schema$ListJobsResponse) => {
for (const job of data.jobs || []) {
await callback(job);
}
},
STEP_GOOGLE_CLOUD_DATAFLOW,
DataFlowPermissions.STEP_GOOGLE_CLOUD_DATAFLOW_JOB,
);
// Iterate over each region
for (const region of googleCloudRegions) {
await this.iterateApi(
async (nextPageToken) => {
return this.client.projects.locations.jobs.list({
projectId: this.projectId,
location: region.name,
auth,
pageToken: nextPageToken,
});
},
async (data: dataflow_v1b3.Schema$ListJobsResponse) => {
for (const job of data.jobs || []) {
await callback(job);
}
},
STEP_GOOGLE_CLOUD_DATAFLOW,
DataFlowPermissions.STEP_GOOGLE_CLOUD_DATAFLOW_JOB,
);
}
}
}
13 changes: 12 additions & 1 deletion src/steps/data-flow/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ export const STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT =
export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT =
'google_cloud_dataflow_job_uses_google_cloud_dataflow_snapshot';

export const STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE =
'fetch-google-cloud-dataflow-uses-google-spanner-instance';
export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE =
'google_cloud_dataflow_uses_google_spanner_instance';

export const STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC =
'fetch-google-cloud-dataflow-job-uses-google-pubsub-topic';
export const RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC =
Expand All @@ -54,7 +59,8 @@ export const IngestionSources = {
GOOGLE_CLOUD_PROJECT_HAS_GOOGLE_CLOUD_DATAFLOW_JOB: 'google-cloud-project-has-google-cloud-dataflow-job',
GOOGLE_CLOUD_DATAFLOW_JOB_USES_GOOGLE_CLOUD_DATAFLOW_DATASTORE: 'google-cloud-dataflow-job-uses-google-cloud-dataflow-datastore',
GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT: 'google-cloud-dataflow-job-has-google-cloud-dataflow-snapshot',
GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC: 'google-cloud-dataflow-snapshot-uses-google-pubsub-topic'
GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC: 'google-cloud-dataflow-snapshot-uses-google-pubsub-topic',
GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE: 'google-cloud-dataflow-uses-google-spanner-instance'
};

export const DataflowIngestionConfig = {
Expand Down Expand Up @@ -107,6 +113,11 @@ export const DataflowIngestionConfig = {
title: 'Google Cloud Dataflow Snapshot Uses Google Pub/Sub Topic',
description: '',
defaultsToDisabled: false,
},
[IngestionSources.GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE]: {
title: 'Google Cloud Dataflow Uses Google Spanner Instance',
description: '',
defaultsToDisabled: false,
}
};

Expand Down
128 changes: 118 additions & 10 deletions src/steps/data-flow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import {
GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_CLASS,
STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE
} from './constants';
import {
createGoogleCloudDataFlowEntity,
Expand All @@ -42,7 +46,8 @@ import {
import { PROJECT_ENTITY_TYPE, STEP_RESOURCE_MANAGER_PROJECT } from '../resource-manager';
import { getProjectEntity } from '../../utils/project';
import { dataflow_v1b3 } from 'googleapis';
import { ENTITY_TYPE_PUBSUB_TOPIC } from '../pub-sub/constants';
import { ENTITY_TYPE_PUBSUB_TOPIC, STEP_PUBSUB_TOPICS } from '../pub-sub/constants';
import { ENTITY_TYPE_SPANNER_INSTANCE, STEP_SPANNER_INSTANCES } from '../spanner/constants';

export async function fetchGoogleCloudDataFlowDataStore(
context: IntegrationStepContext,
Expand Down Expand Up @@ -191,6 +196,7 @@ export async function buildProjectHasDataflowJobRelationship(
},
);
}

export async function buildDataflowUsesDataflowDatastoreRelationship(
executionContext: IntegrationStepContext,
) {
Expand Down Expand Up @@ -224,6 +230,74 @@ export async function buildDataflowUsesDataflowDatastoreRelationship(
);
}

export async function buildCloudDataflowSpannerInstanceRelation(
executionContext: IntegrationStepContext,
) {
const { jobState } = executionContext;
await jobState.iterateEntities(

{ _type: ENTITY_TYPE_SPANNER_INSTANCE },
async (cloudSpannerInstanceEntity) => {

const dataflowJobKey = cloudSpannerInstanceEntity.name as string

const hasDataflowJobKey = jobState.hasKey(dataflowJobKey);

if (!hasDataflowJobKey) {
throw new IntegrationMissingKeyError(
`Cannot build Relationship.
Error: Missing Key.
dataflowJobKey : ${dataflowJobKey}`,
);
}

await jobState.addRelationship(
createDirectRelationship({
_class: RelationshipClass.USES,
fromKey: dataflowJobKey,
fromType: GOOGLE_CLOUD_DATAFLOW_JOB_TYPE,
toKey: cloudSpannerInstanceEntity._key,
toType: ENTITY_TYPE_SPANNER_INSTANCE,
}),
);
},
);
}

export async function buildCloudDataflowSnapshotPubsubTopicRelation(
executionContext: IntegrationStepContext,
) {
const { jobState } = executionContext;
await jobState.iterateEntities(

{ _type: GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_TYPE },
async (snapshotEntity) => {

const pubsubKey = snapshotEntity.pubsubName as string

const haspubsubKey = jobState.hasKey(pubsubKey);

if (!haspubsubKey) {
throw new IntegrationMissingKeyError(
`Cannot build Relationship.
Error: Missing Key.
haspubsubKey : ${haspubsubKey}`,
);
}

await jobState.addRelationship(
createDirectRelationship({
_class: RelationshipClass.USES,
fromKey: snapshotEntity._key,
fromType: GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_TYPE,
toKey: pubsubKey,
toType: ENTITY_TYPE_PUBSUB_TOPIC,
}),
);
},
);
}

export const dataFlowSteps: GoogleCloudIntegrationStep[] = [
{
id: STEP_GOOGLE_CLOUD_DATAFLOW,
Expand Down Expand Up @@ -377,21 +451,55 @@ export const dataFlowSteps: GoogleCloudIntegrationStep[] = [
executionHandler: fetchGoogleCloudDataFlowSnapshot,
apis: ['dataflow.googleapis.com'],
},
{
id: STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
ingestionSourceId: IngestionSources.GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
name: 'Google Cloud Dataflow Snapshot uses Google pubsub',
entities: [],
relationships: [
{
_class: RelationshipClass.HAS,
_type: RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
sourceType: GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_TYPE,
targetType: ENTITY_TYPE_PUBSUB_TOPIC,
},
],
dependsOn: [STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT, STEP_PUBSUB_TOPICS],
executionHandler: buildCloudDataflowSnapshotPubsubTopicRelation,
apis: ['dataflow.googleapis.com'],
},
{
id: STEP_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
ingestionSourceId: IngestionSources.GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
name: 'Google Cloud Dataflow uses Google Spanner Instance',
entities: [],
relationships: [
{
_class: RelationshipClass.USES,
_type: RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_USES_GOOGLE_SPANNER_INSTANCE,
sourceType: GOOGLE_CLOUD_DATAFLOW_TYPE,
targetType: ENTITY_TYPE_SPANNER_INSTANCE,
},
],
dependsOn: [STEP_SPANNER_INSTANCES, STEP_GOOGLE_CLOUD_DATAFLOW],
executionHandler: buildCloudDataflowSpannerInstanceRelation,
apis: ['dataflow.googleapis.com'],
},
// {
// id: STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
// ingestionSourceId: IngestionSources.GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
// name: 'Google Cloud Dataflow Snapshot uses Google pubsub',
// id: STEP_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
// ingestionSourceId: IngestionSources.GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
// name: 'Google Cloud Dataflow Has Google Cloud DataFlow SNapshot',
// entities: [],
// relationships: [
// {
// _class: RelationshipClass.HAS,
// _type: RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_USES_GOOGLE_PUBSUB_TOPIC,
// sourceType: GOOGLE_CLOUD_DATAFLOW_SNAPSHOT_TYPE,
// targetType: ENTITY_TYPE_PUBSUB_TOPIC,
// _class: RelationshipClass.USES,
// _type: RELATIONSHIP_TYPE_GOOGLE_CLOUD_DATAFLOW_JOB_HAS_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT,
// sourceType: GOOGLE_CLOUD_DATAFLOW_TYPE,
// targetType: ENTITY_TYPE_SPANNER_INSTANCE,
// },
// ],
// dependsOn: [STEP_GOOGLE_CLOUD_DATAFLOW_SNAPSHOT],
// executionHandler: buildCloudDataflow,
// dependsOn: [STEP_SPANNER_INSTANCES, STEP_GOOGLE_CLOUD_DATAFLOW],
// executionHandler: buildCloudDataflowSnapshoteRelation,
// apis: ['dataflow.googleapis.com'],
// },
];
Expand Down

0 comments on commit e2107db

Please sign in to comment.