From 668b506f59114744fb7ce0cbc2fbc5904a2917e8 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Fri, 16 Aug 2024 14:41:44 -0500 Subject: [PATCH] fix(elasticsearch): refactor idHashAlgo setting (#11193) --- .../datahub-gms/env/docker-without-neo4j.env | 2 -- docker/datahub-gms/env/docker.env | 2 -- .../env/docker-without-neo4j.env | 2 -- docker/datahub-mae-consumer/env/docker.env | 2 -- .../env/docker-without-neo4j.env | 2 -- docker/datahub-mce-consumer/env/docker.env | 2 -- .../docker-compose-m1.quickstart.yml | 1 - ...er-compose-without-neo4j-m1.quickstart.yml | 1 - ...ocker-compose-without-neo4j.quickstart.yml | 1 - ...ose.consumers-without-neo4j.quickstart.yml | 2 -- .../docker-compose.consumers.quickstart.yml | 2 -- .../quickstart/docker-compose.quickstart.yml | 1 - .../metadata/aspect/models/graph/Edge.java | 6 ++-- metadata-io/build.gradle | 1 - .../elastic/ElasticSearchGraphService.java | 5 ++-- .../service/UpdateIndicesService.java | 9 ++++-- .../TimeseriesAspectTransformer.java | 20 +++++++------ .../search/SearchGraphServiceTestBase.java | 5 ++-- .../search/LineageServiceTestBase.java | 2 +- .../search/SearchServiceTestBase.java | 2 +- .../metadata/search/TestEntityTestBase.java | 3 +- .../metadata/search/query/BrowseDAOTest.java | 2 +- .../SystemMetadataServiceTestBase.java | 2 +- .../TimeseriesAspectServiceTestBase.java | 4 +-- .../SampleDataFixtureConfiguration.java | 4 +-- .../SearchLineageFixtureConfiguration.java | 5 ++-- .../kafka/hook/UpdateIndicesHookTest.java | 6 ++-- .../metadata/context/SearchContext.java | 2 +- .../context/TestOperationContexts.java | 2 +- .../metadata/context/SearchContextTest.java | 18 +++++------ .../search/ElasticSearchConfiguration.java | 1 + .../ElasticSearchGraphServiceFactory.java | 7 +++-- .../common/IndexConventionFactory.java | 5 ++-- .../indices/UpdateIndicesServiceFactory.java | 13 +++++--- .../metadata/resources/usage/UsageStats.java | 30 ++----------------- .../utils/elasticsearch/IndexConvention.java | 3 ++ .../elasticsearch/IndexConventionImpl.java | 9 ++++-- .../IndexConventionImplTest.java | 8 ++--- smoke-test/run-quickstart.sh | 2 -- smoke-test/set-test-env-vars.sh | 3 +- 40 files changed, 90 insertions(+), 109 deletions(-) diff --git a/docker/datahub-gms/env/docker-without-neo4j.env b/docker/datahub-gms/env/docker-without-neo4j.env index 37b7ba1797af5b..cc0dd6b4278b56 100644 --- a/docker/datahub-gms/env/docker-without-neo4j.env +++ b/docker/datahub-gms/env/docker-without-neo4j.env @@ -23,8 +23,6 @@ PE_CONSUMER_ENABLED=true UI_INGESTION_ENABLED=true ENTITY_SERVICE_ENABLE_RETENTION=true -ELASTIC_ID_HASH_ALGO=MD5 - # Uncomment to disable persistence of client-side analytics events # DATAHUB_ANALYTICS_ENABLED=false diff --git a/docker/datahub-gms/env/docker.env b/docker/datahub-gms/env/docker.env index 0ecaa32c4cb123..59fc4bdde02ff4 100644 --- a/docker/datahub-gms/env/docker.env +++ b/docker/datahub-gms/env/docker.env @@ -27,8 +27,6 @@ MCE_CONSUMER_ENABLED=true PE_CONSUMER_ENABLED=true UI_INGESTION_ENABLED=true -ELASTIC_ID_HASH_ALGO=MD5 - # Uncomment to enable Metadata Service Authentication METADATA_SERVICE_AUTH_ENABLED=false diff --git a/docker/datahub-mae-consumer/env/docker-without-neo4j.env b/docker/datahub-mae-consumer/env/docker-without-neo4j.env index 6a82f235b29711..b6899f7e6d63b2 100644 --- a/docker/datahub-mae-consumer/env/docker-without-neo4j.env +++ b/docker/datahub-mae-consumer/env/docker-without-neo4j.env @@ -13,8 +13,6 @@ ES_BULK_REFRESH_POLICY=WAIT_UNTIL GRAPH_SERVICE_IMPL=elasticsearch ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml -ELASTIC_ID_HASH_ALGO=MD5 - # Uncomment to disable persistence of client-side analytics events # DATAHUB_ANALYTICS_ENABLED=false diff --git a/docker/datahub-mae-consumer/env/docker.env b/docker/datahub-mae-consumer/env/docker.env index 1f0ee4b05b3820..5a6daa6eaeaed7 100644 --- a/docker/datahub-mae-consumer/env/docker.env +++ b/docker/datahub-mae-consumer/env/docker.env @@ -17,8 +17,6 @@ NEO4J_PASSWORD=datahub GRAPH_SERVICE_IMPL=neo4j ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml -ELASTIC_ID_HASH_ALGO=MD5 - # Uncomment to disable persistence of client-side analytics events # DATAHUB_ANALYTICS_ENABLED=false diff --git a/docker/datahub-mce-consumer/env/docker-without-neo4j.env b/docker/datahub-mce-consumer/env/docker-without-neo4j.env index b0edfc0a75b669..e7be7d8ed4ddc5 100644 --- a/docker/datahub-mce-consumer/env/docker-without-neo4j.env +++ b/docker/datahub-mce-consumer/env/docker-without-neo4j.env @@ -24,8 +24,6 @@ MAE_CONSUMER_ENABLED=false PE_CONSUMER_ENABLED=false UI_INGESTION_ENABLED=false -ELASTIC_ID_HASH_ALGO=MD5 - # Uncomment to configure kafka topic names # Make sure these names are consistent across the whole deployment # METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1 diff --git a/docker/datahub-mce-consumer/env/docker.env b/docker/datahub-mce-consumer/env/docker.env index c0f85ef667546e..8618f3f5f7af7a 100644 --- a/docker/datahub-mce-consumer/env/docker.env +++ b/docker/datahub-mce-consumer/env/docker.env @@ -24,8 +24,6 @@ MAE_CONSUMER_ENABLED=false PE_CONSUMER_ENABLED=false UI_INGESTION_ENABLED=false -ELASTIC_ID_HASH_ALGO=MD5 - # Uncomment to configure kafka topic names # Make sure these names are consistent across the whole deployment # METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1 diff --git a/docker/quickstart/docker-compose-m1.quickstart.yml b/docker/quickstart/docker-compose-m1.quickstart.yml index a0f60d23710a07..834d55096468f6 100644 --- a/docker/quickstart/docker-compose-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-m1.quickstart.yml @@ -86,7 +86,6 @@ services: - ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true - ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true - ELASTICSEARCH_PORT=9200 - - ELASTIC_ID_HASH_ALGO=MD5 - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml - ENTITY_SERVICE_ENABLE_RETENTION=true - ES_BULK_REFRESH_POLICY=WAIT_UNTIL diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml index 11e33a9950ba9b..47fb50f78e4f0c 100644 --- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml @@ -86,7 +86,6 @@ services: - ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true - ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true - ELASTICSEARCH_PORT=9200 - - ELASTIC_ID_HASH_ALGO=MD5 - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml - ENTITY_SERVICE_ENABLE_RETENTION=true - ES_BULK_REFRESH_POLICY=WAIT_UNTIL diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml index 2efa8959834183..3fa13a9e56b421 100644 --- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml @@ -86,7 +86,6 @@ services: - ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true - ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true - ELASTICSEARCH_PORT=9200 - - ELASTIC_ID_HASH_ALGO=MD5 - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml - ENTITY_SERVICE_ENABLE_RETENTION=true - ES_BULK_REFRESH_POLICY=WAIT_UNTIL diff --git a/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml index 4f47a3da24eb1b..a4211acedcf102 100644 --- a/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml @@ -19,7 +19,6 @@ services: - ES_BULK_REFRESH_POLICY=WAIT_UNTIL - GRAPH_SERVICE_IMPL=elasticsearch - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml - - ELASTIC_ID_HASH_ALGO=MD5 hostname: datahub-mae-consumer image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head} ports: @@ -38,7 +37,6 @@ services: - EBEAN_DATASOURCE_USERNAME=datahub - ELASTICSEARCH_HOST=elasticsearch - ELASTICSEARCH_PORT=9200 - - ELASTIC_ID_HASH_ALGO=MD5 - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml - ENTITY_SERVICE_ENABLE_RETENTION=true - ES_BULK_REFRESH_POLICY=WAIT_UNTIL diff --git a/docker/quickstart/docker-compose.consumers.quickstart.yml b/docker/quickstart/docker-compose.consumers.quickstart.yml index 7dd7388b939884..e7571e4baf8b4e 100644 --- a/docker/quickstart/docker-compose.consumers.quickstart.yml +++ b/docker/quickstart/docker-compose.consumers.quickstart.yml @@ -26,7 +26,6 @@ services: - NEO4J_PASSWORD=datahub - GRAPH_SERVICE_IMPL=neo4j - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml - - ELASTIC_ID_HASH_ALGO=MD5 hostname: datahub-mae-consumer image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head} ports: @@ -48,7 +47,6 @@ services: - EBEAN_DATASOURCE_USERNAME=datahub - ELASTICSEARCH_HOST=elasticsearch - ELASTICSEARCH_PORT=9200 - - ELASTIC_ID_HASH_ALGO=MD5 - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml - ENTITY_SERVICE_ENABLE_RETENTION=true - ES_BULK_REFRESH_POLICY=WAIT_UNTIL diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index f42ed1f40c2467..c63b6d1d61b030 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -86,7 +86,6 @@ services: - ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true - ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true - ELASTICSEARCH_PORT=9200 - - ELASTIC_ID_HASH_ALGO=MD5 - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml - ENTITY_SERVICE_ENABLE_RETENTION=true - ES_BULK_REFRESH_POLICY=WAIT_UNTIL diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java index 3de09e599d99ed..8777be57e1bd8f 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -59,7 +60,7 @@ public Edge( null); } - public String toDocId() { + public String toDocId(@Nonnull String idHashAlgo) { StringBuilder rawDocId = new StringBuilder(); rawDocId .append(getSource().toString()) @@ -72,9 +73,8 @@ public String toDocId() { } try { - String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO"); byte[] bytesOfRawDocID = rawDocId.toString().getBytes(StandardCharsets.UTF_8); - MessageDigest md = MessageDigest.getInstance(hashAlgo); + MessageDigest md = MessageDigest.getInstance(idHashAlgo); byte[] thedigest = md.digest(bytesOfRawDocID); return Base64.getEncoder().encodeToString(thedigest); } catch (NoSuchAlgorithmException e) { diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 17d9cb8cd14fee..ff29cb5fff47d2 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -130,7 +130,6 @@ test { // override, testng controlling parallelization // increasing >1 will merely run all tests extra times maxParallelForks = 1 - environment "ELASTIC_ID_HASH_ALGO", "MD5" } useTestNG() { suites 'src/test/resources/testng.xml' diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 5b0fb554a4f48d..e1532ea4e26c06 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -64,6 +64,7 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd private final ESGraphWriteDAO _graphWriteDAO; private final ESGraphQueryDAO _graphReadDAO; private final ESIndexBuilder _indexBuilder; + private final String idHashAlgo; public static final String INDEX_NAME = "graph_service_v1"; private static final Map EMPTY_HASH = new HashMap<>(); @@ -125,7 +126,7 @@ public LineageRegistry getLineageRegistry() { @Override public void addEdge(@Nonnull final Edge edge) { - String docId = edge.toDocId(); + String docId = edge.toDocId(idHashAlgo); String edgeDocument = toDocument(edge); _graphWriteDAO.upsertDocument(docId, edgeDocument); } @@ -137,7 +138,7 @@ public void upsertEdge(@Nonnull final Edge edge) { @Override public void removeEdge(@Nonnull final Edge edge) { - String docId = edge.toDocId(); + String docId = edge.toDocId(idHashAlgo); _graphWriteDAO.deleteDocument(docId); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java index dff0a99a142b73..2ab9e17f281637 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java @@ -80,6 +80,7 @@ public class UpdateIndicesService implements SearchIndicesService { private final SystemMetadataService _systemMetadataService; private final SearchDocumentTransformer _searchDocumentTransformer; private final EntityIndexBuilders _entityIndexBuilders; + @Nonnull private final String idHashAlgo; @Value("${featureFlags.graphServiceDiffModeEnabled:true}") private boolean _graphDiffMode; @@ -117,13 +118,15 @@ public UpdateIndicesService( TimeseriesAspectService timeseriesAspectService, SystemMetadataService systemMetadataService, SearchDocumentTransformer searchDocumentTransformer, - EntityIndexBuilders entityIndexBuilders) { + EntityIndexBuilders entityIndexBuilders, + @Nonnull String idHashAlgo) { _graphService = graphService; _entitySearchService = entitySearchService; _timeseriesAspectService = timeseriesAspectService; _systemMetadataService = systemMetadataService; _searchDocumentTransformer = searchDocumentTransformer; _entityIndexBuilders = entityIndexBuilders; + this.idHashAlgo = idHashAlgo; } @Override @@ -601,7 +604,9 @@ private void updateTimeseriesFields( SystemMetadata systemMetadata) { Map documents; try { - documents = TimeseriesAspectTransformer.transform(urn, aspect, aspectSpec, systemMetadata); + documents = + TimeseriesAspectTransformer.transform( + urn, aspect, aspectSpec, systemMetadata, idHashAlgo); } catch (JsonProcessingException e) { log.error("Failed to generate timeseries document from aspect: {}", e.toString()); return; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java index cf0a3f1466d254..c353e601a31b70 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java @@ -54,7 +54,8 @@ public static Map transform( @Nonnull final Urn urn, @Nonnull final RecordTemplate timeseriesAspect, @Nonnull final AspectSpec aspectSpec, - @Nullable final SystemMetadata systemMetadata) + @Nullable final SystemMetadata systemMetadata, + @Nonnull final String idHashAlgo) throws JsonProcessingException { ObjectNode commonDocument = getCommonDocument(urn, timeseriesAspect, systemMetadata); Map finalDocuments = new HashMap<>(); @@ -74,7 +75,7 @@ public static Map transform( final Map> timeseriesFieldValueMap = FieldExtractor.extractFields(timeseriesAspect, aspectSpec.getTimeseriesFieldSpecs()); timeseriesFieldValueMap.forEach((k, v) -> setTimeseriesField(document, k, v)); - finalDocuments.put(getDocId(document, null), document); + finalDocuments.put(getDocId(document, null, idHashAlgo), document); // Create new rows for the member collection fields. final Map> timeseriesFieldCollectionValueMap = @@ -83,7 +84,7 @@ public static Map transform( timeseriesFieldCollectionValueMap.forEach( (key, values) -> finalDocuments.putAll( - getTimeseriesFieldCollectionDocuments(key, values, commonDocument))); + getTimeseriesFieldCollectionDocuments(key, values, commonDocument, idHashAlgo))); return finalDocuments; } @@ -216,12 +217,13 @@ private static void setTimeseriesField( private static Map getTimeseriesFieldCollectionDocuments( final TimeseriesFieldCollectionSpec fieldSpec, final List values, - final ObjectNode commonDocument) { + final ObjectNode commonDocument, + @Nonnull final String idHashAlgo) { return values.stream() .map(value -> getTimeseriesFieldCollectionDocument(fieldSpec, value, commonDocument)) .collect( Collectors.toMap( - keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst()), + keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst(), idHashAlgo), Pair::getSecond)); } @@ -257,9 +259,9 @@ private static Pair getTimeseriesFieldCollectionDocument( finalDocument); } - private static String getDocId(@Nonnull JsonNode document, String collectionId) + private static String getDocId( + @Nonnull JsonNode document, String collectionId, @Nonnull String idHashAlgo) throws IllegalArgumentException { - String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO"); String docId = document.get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD).toString(); JsonNode eventGranularity = document.get(MappingsBuilder.EVENT_GRANULARITY); if (eventGranularity != null) { @@ -278,9 +280,9 @@ private static String getDocId(@Nonnull JsonNode document, String collectionId) docId += partitionSpec.toString(); } - if (hashAlgo.equalsIgnoreCase("SHA-256")) { + if (idHashAlgo.equalsIgnoreCase("SHA-256")) { return DigestUtils.sha256Hex(docId); - } else if (hashAlgo.equalsIgnoreCase("MD5")) { + } else if (idHashAlgo.equalsIgnoreCase("MD5")) { return DigestUtils.md5Hex(docId); } throw new IllegalArgumentException("Hash function not handled !"); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java index 06f1369ff0670c..d1a51b1d69b2c3 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java @@ -62,7 +62,7 @@ public abstract class SearchGraphServiceTestBase extends GraphServiceTestBase { @Nonnull protected abstract ESIndexBuilder getIndexBuilder(); - private final IndexConvention _indexConvention = IndexConventionImpl.NO_PREFIX; + private final IndexConvention _indexConvention = IndexConventionImpl.noPrefix("MD5"); private final String _indexName = _indexConvention.getIndexName(INDEX_NAME); private ElasticSearchGraphService _client; @@ -108,7 +108,8 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) { _indexConvention, writeDAO, readDAO, - getIndexBuilder()); + getIndexBuilder(), + "MD5"); } @Override diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java index a9d84ae1f3aea1..99e4923885a41d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageServiceTestBase.java @@ -122,7 +122,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException { operationContext = TestOperationContexts.systemContextNoSearchAuthorization( new SnapshotEntityRegistry(new Snapshot()), - new IndexConventionImpl("lineage_search_service_test")) + new IndexConventionImpl("lineage_search_service_test", "MD5")) .asSession(RequestContext.TEST, Authorizer.EMPTY, TestOperationContexts.TEST_USER_AUTH); settingsBuilder = new SettingsBuilder(null); elasticSearchService = buildEntitySearchService(); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java index 445b71b2eaff62..5e30e01a8ea690 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTestBase.java @@ -79,7 +79,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException { operationContext = TestOperationContexts.systemContextNoSearchAuthorization( new SnapshotEntityRegistry(new Snapshot()), - new IndexConventionImpl("search_service_test")) + new IndexConventionImpl("search_service_test", "MD5")) .asSession(RequestContext.TEST, Authorizer.EMPTY, TestOperationContexts.TEST_USER_AUTH); settingsBuilder = new SettingsBuilder(null); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java index ab5e90f77c21aa..282a3d8e3ea6ae 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/TestEntityTestBase.java @@ -62,7 +62,8 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest public void setup() { opContext = TestOperationContexts.systemContextNoSearchAuthorization( - new SnapshotEntityRegistry(new Snapshot()), new IndexConventionImpl("es_service_test")); + new SnapshotEntityRegistry(new Snapshot()), + new IndexConventionImpl("es_service_test", "MD5")); settingsBuilder = new SettingsBuilder(null); elasticSearchService = buildService(); elasticSearchService.reindexAll(Collections.emptySet()); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/BrowseDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/BrowseDAOTest.java index a0288d019644bd..8044515e3dc6a7 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/BrowseDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/BrowseDAOTest.java @@ -45,7 +45,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException { mockClient = mock(RestHighLevelClient.class); opContext = TestOperationContexts.systemContextNoSearchAuthorization( - new IndexConventionImpl("es_browse_dao_test")); + new IndexConventionImpl("es_browse_dao_test", "MD5")); browseDAO = new ESBrowseDAO(mockClient, searchConfiguration, customSearchConfiguration); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java index af7005c93c46d3..1b9d8c57b4cad3 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/SystemMetadataServiceTestBase.java @@ -32,7 +32,7 @@ public abstract class SystemMetadataServiceTestBase extends AbstractTestNGSpring protected abstract ESIndexBuilder getIndexBuilder(); private final IndexConvention _indexConvention = - new IndexConventionImpl("es_system_metadata_service_test"); + new IndexConventionImpl("es_system_metadata_service_test", "MD5"); private ElasticSearchSystemMetadataService _client; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java index 10c6f09cb8f8d6..414183c8882f9c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java @@ -126,7 +126,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException { opContext = TestOperationContexts.systemContextNoSearchAuthorization( - entityRegistry, new IndexConventionImpl("es_timeseries_aspect_service_test")); + entityRegistry, new IndexConventionImpl("es_timeseries_aspect_service_test", "MD5")); elasticSearchTimeseriesAspectService = buildService(); elasticSearchTimeseriesAspectService.reindexAll(Collections.emptySet()); @@ -152,7 +152,7 @@ private ElasticSearchTimeseriesAspectService buildService() { private void upsertDocument(TestEntityProfile dp, Urn urn) throws JsonProcessingException { Map documents = - TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null); + TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null, "MD5"); assertEquals(documents.size(), 3); documents.forEach( (key, value) -> diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java index 28a4a2b00cd6f1..6a95d16c254370 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java @@ -86,12 +86,12 @@ protected String longTailIndexPrefix() { @Bean(name = "sampleDataIndexConvention") protected IndexConvention indexConvention(@Qualifier("sampleDataPrefix") String prefix) { - return new IndexConventionImpl(prefix); + return new IndexConventionImpl(prefix, "MD5"); } @Bean(name = "longTailIndexConvention") protected IndexConvention longTailIndexConvention(@Qualifier("longTailPrefix") String prefix) { - return new IndexConventionImpl(prefix); + return new IndexConventionImpl(prefix, "MD5"); } @Bean(name = "sampleDataFixtureName") diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java index e783c011de6d0e..33e04af83c0a3a 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java @@ -71,7 +71,7 @@ protected String indexPrefix() { @Bean(name = "searchLineageIndexConvention") protected IndexConvention indexConvention(@Qualifier("searchLineagePrefix") String prefix) { - return new IndexConventionImpl(prefix); + return new IndexConventionImpl(prefix, "MD5"); } @Bean(name = "searchLineageFixtureName") @@ -173,7 +173,8 @@ protected ElasticSearchGraphService graphService( new ESGraphWriteDAO(indexConvention, bulkProcessor, 1), new ESGraphQueryDAO( searchClient, lineageRegistry, indexConvention, getGraphQueryConfiguration()), - indexBuilder); + indexBuilder, + indexConvention.getIdHashAlgo()); graphService.reindexAll(Collections.emptySet()); return graphService; } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java index 411fe02260bb1b..4cd59992eb2f00 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java @@ -125,7 +125,8 @@ public void setupTest() { mockTimeseriesAspectService, mockSystemMetadataService, searchDocumentTransformer, - mockEntityIndexBuilders); + mockEntityIndexBuilders, + "MD5"); OperationContext systemOperationContext = TestOperationContexts.systemContextNoSearchAuthorization(); @@ -235,7 +236,8 @@ public void testInputFieldsEdgesAreAdded() throws Exception { mockTimeseriesAspectService, mockSystemMetadataService, searchDocumentTransformer, - mockEntityIndexBuilders); + mockEntityIndexBuilders, + "MD5"); updateIndicesHook = new UpdateIndicesHook(updateIndicesService, true, false); updateIndicesHook.init( diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/SearchContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/SearchContext.java index c067e91c3524cf..5ad7bdc14820c3 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/SearchContext.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/SearchContext.java @@ -21,7 +21,7 @@ public class SearchContext implements ContextInterface { public static SearchContext EMPTY = - SearchContext.builder().indexConvention(IndexConventionImpl.NO_PREFIX).build(); + SearchContext.builder().indexConvention(IndexConventionImpl.noPrefix("")).build(); public static SearchContext withFlagDefaults( @Nonnull SearchContext searchContext, diff --git a/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java b/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java index e54c040fe13b58..76f58fb4751085 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java @@ -191,7 +191,7 @@ public static OperationContext systemContext( IndexConvention indexConvention = Optional.ofNullable(indexConventionSupplier) .map(Supplier::get) - .orElse(IndexConventionImpl.NO_PREFIX); + .orElse(IndexConventionImpl.noPrefix("MD5")); ServicesRegistryContext servicesRegistryContext = Optional.ofNullable(servicesRegistrySupplier).orElse(() -> null).get(); diff --git a/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/SearchContextTest.java b/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/SearchContextTest.java index 4858bb342258a5..2e0585cc82a4fd 100644 --- a/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/SearchContextTest.java +++ b/metadata-operation-context/src/test/java/io/datahubproject/metadata/context/SearchContextTest.java @@ -12,26 +12,26 @@ public class SearchContextTest { @Test public void searchContextId() { SearchContext testNoFlags = - SearchContext.builder().indexConvention(IndexConventionImpl.NO_PREFIX).build(); + SearchContext.builder().indexConvention(IndexConventionImpl.noPrefix("MD5")).build(); assertEquals( testNoFlags.getCacheKeyComponent(), SearchContext.builder() - .indexConvention(IndexConventionImpl.NO_PREFIX) + .indexConvention(IndexConventionImpl.noPrefix("MD5")) .build() .getCacheKeyComponent(), "Expected consistent context ids across instances"); SearchContext testWithFlags = SearchContext.builder() - .indexConvention(IndexConventionImpl.NO_PREFIX) + .indexConvention(IndexConventionImpl.noPrefix("MD5")) .searchFlags(new SearchFlags().setFulltext(true)) .build(); assertEquals( testWithFlags.getCacheKeyComponent(), SearchContext.builder() - .indexConvention(IndexConventionImpl.NO_PREFIX) + .indexConvention(IndexConventionImpl.noPrefix("MD5")) .searchFlags(new SearchFlags().setFulltext(true)) .build() .getCacheKeyComponent(), @@ -44,7 +44,7 @@ public void searchContextId() { assertNotEquals( testWithFlags.getCacheKeyComponent(), SearchContext.builder() - .indexConvention(IndexConventionImpl.NO_PREFIX) + .indexConvention(IndexConventionImpl.noPrefix("MD5")) .searchFlags(new SearchFlags().setFulltext(true).setIncludeRestricted(true)) .build() .getCacheKeyComponent(), @@ -53,7 +53,7 @@ public void searchContextId() { assertNotEquals( testNoFlags.getCacheKeyComponent(), SearchContext.builder() - .indexConvention(new IndexConventionImpl("Some Prefix")) + .indexConvention(new IndexConventionImpl("Some Prefix", "MD5")) .searchFlags(null) .build() .getCacheKeyComponent(), @@ -61,7 +61,7 @@ public void searchContextId() { assertNotEquals( SearchContext.builder() - .indexConvention(IndexConventionImpl.NO_PREFIX) + .indexConvention(IndexConventionImpl.noPrefix("MD5")) .searchFlags( new SearchFlags() .setFulltext(false) @@ -70,7 +70,7 @@ public void searchContextId() { .build() .getCacheKeyComponent(), SearchContext.builder() - .indexConvention(IndexConventionImpl.NO_PREFIX) + .indexConvention(IndexConventionImpl.noPrefix("MD5")) .searchFlags(new SearchFlags().setFulltext(true).setIncludeRestricted(true)) .build() .getCacheKeyComponent(), @@ -80,7 +80,7 @@ public void searchContextId() { @Test public void testImmutableSearchFlags() { SearchContext initial = - SearchContext.builder().indexConvention(IndexConventionImpl.NO_PREFIX).build(); + SearchContext.builder().indexConvention(IndexConventionImpl.noPrefix("MD5")).build(); assertEquals(initial.getSearchFlags(), new SearchFlags().setSkipCache(false)); SearchContext mutated = initial.withFlagDefaults(flags -> flags.setSkipCache(true)); diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java index 130620a9ab918c..7d68e18940401e 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/ElasticSearchConfiguration.java @@ -8,4 +8,5 @@ public class ElasticSearchConfiguration { private BuildIndicesConfiguration buildIndices; public String implementation; private SearchConfiguration search; + private String idHashAlgo; } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java index eb56e8d42c158e..55eb931625fecc 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java @@ -11,6 +11,7 @@ import javax.annotation.Nonnull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -30,7 +31,8 @@ public class ElasticSearchGraphServiceFactory { @Bean(name = "elasticSearchGraphService") @Nonnull - protected ElasticSearchGraphService getInstance() { + protected ElasticSearchGraphService getInstance( + @Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) { LineageRegistry lineageRegistry = new LineageRegistry(entityRegistry); return new ElasticSearchGraphService( lineageRegistry, @@ -45,6 +47,7 @@ protected ElasticSearchGraphService getInstance() { lineageRegistry, components.getIndexConvention(), configurationProvider.getElasticSearch().getSearch().getGraph()), - components.getIndexBuilder()); + components.getIndexBuilder(), + idHashAlgo); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/IndexConventionFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/IndexConventionFactory.java index 5b76a3f2cb833f..2288c8d4ecd50d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/IndexConventionFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/IndexConventionFactory.java @@ -19,7 +19,8 @@ public class IndexConventionFactory { private String indexPrefix; @Bean(name = INDEX_CONVENTION_BEAN) - protected IndexConvention createInstance() { - return new IndexConventionImpl(indexPrefix); + protected IndexConvention createInstance( + @Value("${elasticsearch.idHashAlgo}") final String isHashAlgo) { + return new IndexConventionImpl(indexPrefix, isHashAlgo); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/update/indices/UpdateIndicesServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/update/indices/UpdateIndicesServiceFactory.java index fad9d0eaf3b45c..38a344f8be8e92 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/update/indices/UpdateIndicesServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/update/indices/UpdateIndicesServiceFactory.java @@ -9,6 +9,7 @@ import com.linkedin.metadata.service.UpdateIndicesService; import com.linkedin.metadata.systemmetadata.SystemMetadataService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -30,7 +31,8 @@ public UpdateIndicesService searchIndicesServiceNonGMS( TimeseriesAspectService timeseriesAspectService, SystemMetadataService systemMetadataService, SearchDocumentTransformer searchDocumentTransformer, - EntityIndexBuilders entityIndexBuilders) { + EntityIndexBuilders entityIndexBuilders, + @Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) { return new UpdateIndicesService( graphService, @@ -38,7 +40,8 @@ public UpdateIndicesService searchIndicesServiceNonGMS( timeseriesAspectService, systemMetadataService, searchDocumentTransformer, - entityIndexBuilders); + entityIndexBuilders, + idHashAlgo); } @Bean @@ -50,7 +53,8 @@ public UpdateIndicesService searchIndicesServiceGMS( final SystemMetadataService systemMetadataService, final SearchDocumentTransformer searchDocumentTransformer, final EntityIndexBuilders entityIndexBuilders, - final EntityService entityService) { + final EntityService entityService, + @Value("${elasticsearch.idHashAlgo}") final String idHashAlgo) { UpdateIndicesService updateIndicesService = new UpdateIndicesService( @@ -59,7 +63,8 @@ public UpdateIndicesService searchIndicesServiceGMS( timeseriesAspectService, systemMetadataService, searchDocumentTransformer, - entityIndexBuilders); + entityIndexBuilders, + idHashAlgo); entityService.setUpdateIndicesService(updateIndicesService); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java index 518dfecd576808..1b003fec82e8b8 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java @@ -2,25 +2,20 @@ import static com.datahub.authorization.AuthUtil.isAPIAuthorized; import static com.datahub.authorization.AuthUtil.isAPIAuthorizedEntityUrns; -import static com.linkedin.metadata.Constants.*; import static com.linkedin.metadata.authorization.ApiOperation.UPDATE; import static com.linkedin.metadata.timeseries.elastic.UsageServiceUtil.USAGE_STATS_ASPECT_NAME; import static com.linkedin.metadata.timeseries.elastic.UsageServiceUtil.USAGE_STATS_ENTITY_NAME; import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; import com.datahub.authentication.Authentication; import com.datahub.authentication.AuthenticationContext; import com.datahub.authorization.EntitySpec; import com.datahub.plugins.auth.authorization.Authorizer; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.common.WindowDuration; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; -import com.linkedin.data.template.StringArray; import com.linkedin.dataset.DatasetFieldUsageCounts; import com.linkedin.dataset.DatasetFieldUsageCountsArray; import com.linkedin.dataset.DatasetUsageStatistics; @@ -29,17 +24,10 @@ import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; -import com.linkedin.metadata.query.filter.Condition; -import com.linkedin.metadata.query.filter.ConjunctiveCriterion; -import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; -import com.linkedin.metadata.query.filter.Criterion; -import com.linkedin.metadata.query.filter.CriterionArray; -import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.restli.RestliUtil; import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.timeseries.elastic.UsageServiceUtil; import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer; -import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.parseq.Task; import com.linkedin.restli.common.HttpStatus; import com.linkedin.restli.server.RestLiServiceException; @@ -47,35 +35,20 @@ import com.linkedin.restli.server.annotations.ActionParam; import com.linkedin.restli.server.annotations.RestLiSimpleResource; import com.linkedin.restli.server.resources.SimpleResourceTemplate; -import com.linkedin.timeseries.AggregationSpec; -import com.linkedin.timeseries.AggregationType; -import com.linkedin.timeseries.CalendarInterval; -import com.linkedin.timeseries.GenericTable; -import com.linkedin.timeseries.GroupingBucket; -import com.linkedin.timeseries.GroupingBucketType; import com.linkedin.timeseries.TimeWindowSize; import com.linkedin.usage.FieldUsageCounts; -import com.linkedin.usage.FieldUsageCountsArray; import com.linkedin.usage.UsageAggregation; -import com.linkedin.usage.UsageAggregationArray; import com.linkedin.usage.UsageAggregationMetrics; import com.linkedin.usage.UsageQueryResult; -import com.linkedin.usage.UsageQueryResultAggregations; import com.linkedin.usage.UsageTimeRange; import com.linkedin.usage.UserUsageCounts; -import com.linkedin.usage.UserUsageCountsArray; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.metadata.context.RequestContext; import io.opentelemetry.extension.annotations.WithSpan; -import java.net.URISyntaxException; -import java.time.Instant; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.inject.Inject; import javax.inject.Named; @@ -255,7 +228,8 @@ private void ingest(@Nonnull OperationContext opContext, @Nonnull UsageAggregati try { documents = TimeseriesAspectTransformer.transform( - bucket.getResource(), datasetUsageStatistics, getUsageStatsAspectSpec(), null); + bucket.getResource(), datasetUsageStatistics, getUsageStatsAspectSpec(), null, + systemOperationContext.getSearchContext().getIndexConvention().getIdHashAlgo()); } catch (JsonProcessingException e) { log.error("Failed to generate timeseries document from aspect: {}", e.toString()); return; diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java index 4a3f78fcef7bd6..87aebabf643666 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java @@ -47,4 +47,7 @@ public interface IndexConvention { * if one cannot be extracted */ Optional> getEntityAndAspectName(String timeseriesAspectIndexName); + + @Nonnull + String getIdHashAlgo(); } diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java index 47801cd2054fa4..2c9c927cd8c347 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java @@ -8,25 +8,30 @@ import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; // Default implementation of search index naming convention public class IndexConventionImpl implements IndexConvention { - public static final IndexConvention NO_PREFIX = new IndexConventionImpl(null); + public static IndexConvention noPrefix(@Nonnull String idHashAlgo) { + return new IndexConventionImpl(null, idHashAlgo); + } // Map from Entity name -> Index name private final Map indexNameMapping = new ConcurrentHashMap<>(); private final Optional _prefix; private final String _getAllEntityIndicesPattern; private final String _getAllTimeseriesIndicesPattern; + @Getter private final String idHashAlgo; private static final String ENTITY_INDEX_VERSION = "v2"; private static final String ENTITY_INDEX_SUFFIX = "index"; private static final String TIMESERIES_INDEX_VERSION = "v1"; private static final String TIMESERIES_ENTITY_INDEX_SUFFIX = "aspect"; - public IndexConventionImpl(@Nullable String prefix) { + public IndexConventionImpl(@Nullable String prefix, String idHashAlgo) { _prefix = StringUtils.isEmpty(prefix) ? Optional.empty() : Optional.of(prefix); + this.idHashAlgo = idHashAlgo; _getAllEntityIndicesPattern = _prefix.map(p -> p + "_").orElse("") + "*" diff --git a/metadata-utils/src/test/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImplTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImplTest.java index 8074f344cd2441..2f6c7138d3c4fb 100644 --- a/metadata-utils/src/test/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImplTest.java +++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImplTest.java @@ -10,7 +10,7 @@ public class IndexConventionImplTest { @Test public void testIndexConventionNoPrefix() { - IndexConvention indexConventionNoPrefix = IndexConventionImpl.NO_PREFIX; + IndexConvention indexConventionNoPrefix = IndexConventionImpl.noPrefix("MD5"); String entityName = "dataset"; String expectedIndexName = "datasetindex_v2"; assertEquals(indexConventionNoPrefix.getEntityIndexName(entityName), expectedIndexName); @@ -25,7 +25,7 @@ public void testIndexConventionNoPrefix() { @Test public void testIndexConventionPrefix() { - IndexConvention indexConventionPrefix = new IndexConventionImpl("prefix"); + IndexConvention indexConventionPrefix = new IndexConventionImpl("prefix", "MD5"); String entityName = "dataset"; String expectedIndexName = "prefix_datasetindex_v2"; assertEquals(indexConventionPrefix.getEntityIndexName(entityName), expectedIndexName); @@ -42,7 +42,7 @@ public void testIndexConventionPrefix() { @Test public void testTimeseriesIndexConventionNoPrefix() { - IndexConvention indexConventionNoPrefix = IndexConventionImpl.NO_PREFIX; + IndexConvention indexConventionNoPrefix = IndexConventionImpl.noPrefix("MD5"); String entityName = "dataset"; String aspectName = "datasetusagestatistics"; String expectedIndexName = "dataset_datasetusagestatisticsaspect_v1"; @@ -64,7 +64,7 @@ public void testTimeseriesIndexConventionNoPrefix() { @Test public void testTimeseriesIndexConventionPrefix() { - IndexConvention indexConventionPrefix = new IndexConventionImpl("prefix"); + IndexConvention indexConventionPrefix = new IndexConventionImpl("prefix", "MD5"); String entityName = "dataset"; String aspectName = "datasetusagestatistics"; String expectedIndexName = "prefix_dataset_datasetusagestatisticsaspect_v1"; diff --git a/smoke-test/run-quickstart.sh b/smoke-test/run-quickstart.sh index 2bf5cdf8ca9c44..eb0d46b3172442 100755 --- a/smoke-test/run-quickstart.sh +++ b/smoke-test/run-quickstart.sh @@ -16,8 +16,6 @@ DATAHUB_SEARCH_TAG="${DATAHUB_SEARCH_TAG:=2.9.0}" XPACK_SECURITY_ENABLED="${XPACK_SECURITY_ENABLED:=plugins.security.disabled=true}" ELASTICSEARCH_USE_SSL="${ELASTICSEARCH_USE_SSL:=false}" USE_AWS_ELASTICSEARCH="${USE_AWS_ELASTICSEARCH:=true}" -ELASTIC_ID_HASH_ALGO="${ELASTIC_ID_HASH_ALGO:=MD5}" - DATAHUB_TELEMETRY_ENABLED=false \ DOCKER_COMPOSE_BASE="file://$( dirname "$DIR" )" \ diff --git a/smoke-test/set-test-env-vars.sh b/smoke-test/set-test-env-vars.sh index dee3af2b68747a..4668721f80de08 100644 --- a/smoke-test/set-test-env-vars.sh +++ b/smoke-test/set-test-env-vars.sh @@ -1,3 +1,2 @@ export DATAHUB_KAFKA_SCHEMA_REGISTRY_URL=http://localhost:8080/schema-registry/api -export DATAHUB_GMS_URL=http://localhost:8080 -export ELASTIC_ID_HASH_ALGO="MD5" \ No newline at end of file +export DATAHUB_GMS_URL=http://localhost:8080 \ No newline at end of file