Fcfeature/use shared lib #1471

Open
wants to merge 25 commits into base: develop
Commits (25)
b3f3d83
fix e2e-test repo branch
itsankit-google Apr 3, 2023
2698afb
Merge pull request #1225 from data-integrations/fix-e2e-repo-branch
itsankit-google Apr 3, 2023
a2fc0ae
remove snapshot
CuriousVini Apr 11, 2023
a9e861f
Update pom.xml
CuriousVini Apr 11, 2023
014b5dc
Update pom.xml
CuriousVini Apr 12, 2023
e18ee09
Merge pull request #1229 from data-integrations/remove-snapshot-690
CuriousVini Apr 13, 2023
c6ef101
Add BigQuery support of nullable array
TarasSluka Jan 26, 2024
50c6436
Ignore tests that are trying to use real GCS connection
TarasSluka Jan 26, 2024
c147024
change version for custom usage: 0.22.0.1-SNAPSHOT
TarasSluka Jan 26, 2024
a2843f7
change version to v0.22.0.0-SNAPSHOT
TarasSluka Jan 29, 2024
a3f0b51
Merge pull request #2 from festcloud/fcfeature/FCDF-314
TarasSluka Jan 31, 2024
8e59e01
FCGB-71 Add Jenkinsfile
f1reballl Feb 12, 2024
f26b02a
Merge pull request #3 from festcloud/features/FCGB-71
f1reballl Feb 22, 2024
2b685a3
fixed error with union field type
o-turyk Jul 30, 2024
d53a1f1
added logging
fedorovychdev Aug 16, 2024
2e536dc
added more loggings
fedorovychdev Aug 16, 2024
241e9af
added log
fedorovychdev Aug 30, 2024
2ad9806
added logging
fedorovychdev Aug 16, 2024
cebb7ae
Merge remote-tracking branch 'origin/fcfeature/FCDF-588' into fcfeatu…
fedorovychdev Aug 30, 2024
03c665f
deleted redudant logging
fedorovychdev Aug 30, 2024
9455dad
Merge pull request #5 from festcloud/fcfeature/FCDF-588
fedorovychdev Aug 30, 2024
a811581
add BQ integrity plugin
TarasSluka Nov 25, 2024
485964d
uncomment part of pom.xml
TarasSluka Nov 25, 2024
9f4964b
Merge pull request #6 from festcloud/fcfeature/FCDF-725
TarasSluka Nov 25, 2024
f7777b5
updated jenkinsfile
blyzniukoles Nov 28, 2024
5 changes: 3 additions & 2 deletions .github/workflows/e2e.yml
@@ -16,9 +16,9 @@ name: Build e2e tests
 
 on:
   push:
-    branches: [ develop ]
+    branches: [ develop, release/* ]
   pull_request:
-    branches: [ develop]
+    branches: [ develop, release/* ]
     types: [opened, synchronize, reopened, labeled]
   workflow_dispatch:
 
@@ -61,6 +61,7 @@ jobs:
       with:
         repository: cdapio/cdap-e2e-tests
         path: e2e
+        ref: release/6.9
     - name: Cache
       uses: actions/cache@v3
       with:
2 changes: 2 additions & 0 deletions Jenkinsfile
@@ -0,0 +1,2 @@
@Library('shared-lib') _
forkFusionPublicFlow(gitRepo: 'fc-google-cloud')
50 changes: 47 additions & 3 deletions pom.xml
@@ -20,7 +20,7 @@
 
   <groupId>io.cdap.plugin</groupId>
   <artifactId>google-cloud</artifactId>
-  <version>0.22.0-SNAPSHOT</version>
+  <version>0.22.0.0-SNAPSHOT</version>
   <name>Google Cloud Plugins</name>
   <packaging>jar</packaging>
   <description>Plugins for Google Big Query</description>
@@ -72,8 +72,8 @@
     <avro.version>1.8.2</avro.version>
     <bigquery.connector.hadoop2.version>hadoop2-1.0.0</bigquery.connector.hadoop2.version>
     <commons.codec.version>1.4</commons.codec.version>
-    <cdap.version>6.9.0-SNAPSHOT</cdap.version>
-    <cdap.plugin.version>2.11.0-SNAPSHOT</cdap.plugin.version>
+    <cdap.version>6.9.0</cdap.version>
+    <cdap.plugin.version>2.11.0</cdap.plugin.version>
     <dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
     <flogger.system.backend.version>0.3.1</flogger.system.backend.version>
     <gcs.connector.version>hadoop2-2.0.0</gcs.connector.version>
@@ -99,6 +99,12 @@
     <spark3.version>3.1.1</spark3.version>
     <spark-bq-connector.version>0.23.1</spark-bq-connector.version>
     <testSourceLocation>${project.basedir}/src/test/java/</testSourceLocation>
+
+    <!--FC related properties-->
+    <version.cloud-metadata-provider>0.0.2-SNAPSHOT</version.cloud-metadata-provider>
+    <version.cloud-metadata-common>0.0.2-SNAPSHOT</version.cloud-metadata-common>
+    <apache.avro.version>1.11.3</apache.avro.version>
+    <version.commons-lang3>3.17.0</version.commons-lang3>
   </properties>
 
   <dependencyManagement>
@@ -782,6 +788,39 @@
         <version>0.2.0</version>
       </dependency>
       <!-- End: dependency used by the Dataplex connector -->
+      <!-- Start: FC related dependencies -->
+      <dependency>
+        <groupId>ai.festcloud</groupId>
+        <artifactId>cloud-metadata-provider</artifactId>
+        <version>${version.cloud-metadata-provider}</version>
+      </dependency>
+      <dependency>
+        <groupId>ai.festcloud</groupId>
+        <artifactId>cloud-metadata-model</artifactId>
+        <version>${version.cloud-metadata-common}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro</artifactId>
+        <version>${apache.avro.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>${version.commons-lang3}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.h2database</groupId>
+        <artifactId>h2</artifactId>
+        <version>2.2.224</version>
+      </dependency>
+      <dependency>
+        <groupId>ai.festcloud</groupId>
+        <artifactId>cloud-data-plugins-common</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+      </dependency>
+
+      <!-- End: FC related dependencies -->
   </dependencies>
 
   <build>
@@ -867,6 +906,11 @@
             org.apache.hadoop.hbase.mapreduce.*;
             org.apache.hadoop.hbase.security.token.*;
             com.google.cloud.spark.bigquery.*;
+            org.apache.commons.collections4.*;
+            ai.festcloud.model.*;
+            ai.festcloud.datafabric.plugins.common.integrity.*;
+            org.apache.avro.*;
+            org.h2.*;
           </_exportcontents>
         </instructions>
       </configuration>
192 additions & 0 deletions (new file): MdmIntegrityBigQueryTransformer.java, package io.cdap.plugin.gcp.bigquery.fctransform

@@ -0,0 +1,192 @@
package io.cdap.plugin.gcp.bigquery.fctransform;

import ai.festcloud.datafabric.plugins.common.integrity.CDAPUtils;
import ai.festcloud.datafabric.plugins.common.integrity.IntegrityService;
import ai.festcloud.datafabric.plugins.common.integrity.IntegrityServiceBQ;
import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingEntryConfig;
import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingObj;
import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingParsingService;
import ai.festcloud.metadata.model.TypeRecord;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageSubmitterContext;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type = Transform.PLUGIN_TYPE)
@Name("BigQueryMdmIntegrityValidation")
@Description("Verify whether the requested values are present in MDM and add new specified field.")
public class MdmIntegrityBigQueryTransformer extends Transform<StructuredRecord, StructuredRecord> {
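// Overall flow:
//  - initialize() loads MDM type metadata and the field mapping from pipeline arguments,
//    builds a BigQuery-backed IntegrityService, and records whether the output schema
//    declares an "operation" field.
//  - transform() resolves MDM ids for each mapped target field, derives
//    operation = "create"/"update" from whether an existing id was found, and rebuilds
//    the record against the output schema; records that fail are routed to the error
//    port as InvalidEntry entries.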

private static final Logger LOG = LoggerFactory.getLogger(MdmIntegrityBigQueryTransformer.class);

private static final String OPERATION = "operation";
private static final String OPERATION_CREATE = "create";
private static final String OPERATION_UPDATE = "update";

private final MdmIntegrityBigQueryTransformerConfig config;
private Schema outputSchema;

private MappingObj mapping;
private Map<String, TypeRecord> entities;
private IntegrityService integrityService;
private boolean containsOperationField = false;

public MdmIntegrityBigQueryTransformer(MdmIntegrityBigQueryTransformerConfig config) {
this.config = config;
}

@Override
public void initialize(TransformContext context) throws Exception {
LOG.info("Initializing BigQuery integrity validation...");
super.initialize(context);

FailureCollector failureCollector = context.getFailureCollector();
outputSchema = config.getSchema(failureCollector);

String configServerUrl = context.getArguments()
.get(MetadataUtils.CONFIGSERVER_METADATA_SCHEMA_URL);
String metadataRootPath = context.getArguments().get(MetadataUtils.METADATA_ROOT_PATH);
entities = MetadataUtils.getTypeRecordByUrl(configServerUrl,
metadataRootPath);
config.validate(failureCollector, entities, context.getInputSchema());

MappingParsingService mappingParsingService
= new MappingParsingService(config.getMapping(),
config.getFullyQualifiedEntityName(),
failureCollector,
entities,
outputSchema);
Optional<MappingObj> mappingOpt = mappingParsingService.getMapping();
mapping = mappingOpt.orElse(null);

Credentials credentials = config.getConnection().getCredentials(failureCollector);
BigQuery bigQuery = GCPUtils.getBigQuery(config.getConnection().getProject(), credentials);

failureCollector.getOrThrowException();

integrityService = new IntegrityServiceBQ(bigQuery, entities, mapping);
containsOperationField = outputSchema.getFields()
.stream().anyMatch(field -> field.getName().equals(OPERATION));

LOG.info("BigQueryMdmIntegrityValidation initialized.");
}

@Override
public void onRunFinish(boolean succeeded, StageSubmitterContext context) {
super.onRunFinish(succeeded, context);

}

@Override
public void destroy() {
super.destroy();
try {
integrityService.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(collector));
}


@Override
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
throws Exception {
try {
StructuredRecord structuredRecord = fillIds(input);
emitter.emit(structuredRecord);
} catch (Exception e) {
emitter.emitError(new InvalidEntry<>(MetadataUtils.ERROR_CODE, e.getMessage(), input));
}
}

private StructuredRecord fillIds(StructuredRecord input) {

Map<String, Object> result = new HashMap<>();
Map<String, List<MappingEntryConfig>> mappingEntryConfigs = mapping.getMappingEntryConfigs();

mappingEntryConfigs.forEach((targetFieldName, mappingEntryConfig) -> {

for (MappingEntryConfig entryConfig : mappingEntryConfig) {
List<String> ids = integrityService.getIds(entryConfig, input);
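// Entries are tried in order: exactly one match wins and ends the scan for this field;
// more than one match is ambiguous and fails the whole record.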
if (ids.size() > 1) {
throw new RuntimeException(
"More than one id found for request: " + entryConfig.toString());
}
if (ids.size() == 1) {
result.put(targetFieldName, ids.get(0));
break;
}
}
});
if (result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null && config.getFcidRequired()) {
throw new RuntimeException("ID is required but not provided.");
}

if (containsOperationField) {
String operationType = result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null
? OPERATION_CREATE
: OPERATION_UPDATE;
result.put(OPERATION, operationType);
}

return setValuesToTargetFields(input, result);
}


private StructuredRecord setValuesToTargetFields(StructuredRecord input,
Map<String, Object> values) {
StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
setFieldValues(input, values, builder, outputSchema);
return builder.build();
}

private void setFieldValues(StructuredRecord input,
Map<String, Object> values,
StructuredRecord.Builder builder,
Schema schema) {
for (Schema.Field field : schema.getFields()) {
String fieldName = field.getName();
Object fieldValue = input.get(fieldName);

if (CDAPUtils.isRecordType(field) && fieldValue != null) {
StructuredRecord nestedRecord = (StructuredRecord) fieldValue;
Schema nestedSchema = CDAPUtils.getNonNullableSchema(field.getSchema());

StructuredRecord.Builder nestedBuilder = StructuredRecord.builder(nestedSchema);
setFieldValues(nestedRecord, values, nestedBuilder, nestedSchema);
builder.set(fieldName, nestedBuilder.build());
} else {
builder.set(fieldName, values.getOrDefault(fieldName, fieldValue));
}
}
}


}
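For readers unfamiliar with the CDAP record API used above, here is a minimal sketch (not part of this change; the schema and field names such as "name" and "fcid" are invented for illustration, since the real target field names come from MetadataUtils and the mapping config). It shows how resolved lookup values are merged into an output record with StructuredRecord.builder, mirroring the non-nested branch of setFieldValues.

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import java.util.HashMap;
import java.util.Map;

public class FillIdsSketch {
  public static void main(String[] args) {
    // Hypothetical output schema: one pass-through field plus the two fields
    // the transform can populate (an id field and "operation").
    Schema outputSchema = Schema.recordOf("out",
        Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("fcid", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
        Schema.Field.of("operation", Schema.of(Schema.Type.STRING)));

    // Incoming record, carrying only the pass-through field.
    StructuredRecord input = StructuredRecord.builder(
            Schema.recordOf("in", Schema.Field.of("name", Schema.of(Schema.Type.STRING))))
        .set("name", "ACME Ltd")
        .build();

    // Values an integrity lookup might have produced for this record.
    Map<String, Object> resolved = new HashMap<>();
    resolved.put("fcid", "42");
    resolved.put("operation", "update");

    // Copy every output field, preferring a resolved value over the input value.
    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
    for (Schema.Field field : outputSchema.getFields()) {
      String fieldName = field.getName();
      builder.set(fieldName, resolved.getOrDefault(fieldName, input.get(fieldName)));
    }
    StructuredRecord out = builder.build();
    System.out.println(out.get("fcid") + " / " + out.get("operation"));
  }
}

Nested record fields would be handled by recursing with a builder for the non-nullable nested schema, as setFieldValues does in the plugin code above.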