
Commit

cleaning up
javsanbel2 committed Nov 27, 2024
1 parent 95e6c64 commit 804be2f
Showing 10 changed files with 11 additions and 182 deletions.
@@ -126,23 +126,6 @@ public Map<String, String> getTableProperties(String databaseName, String tableName) {
     }
   }
 
-  @Override
-  public String getOutputFormat(String databaseName, String tableName) {
-    String result = null;
-    try {
-      Table table = client.getTable(databaseName, tableName);
-      if (table.getSd() != null) {
-        result = table.getSd().getOutputFormat();
-      }
-    } catch (NoSuchObjectException e) {
-      log.warn("Table {}.{} does not exist", databaseName, tableName);
-    } catch (TException e) {
-      throw new BeekeeperException(
-          "Unexpected exception when getting output format for \"" + databaseName + "." + tableName + ".", e);
-    }
-    return result;
-  }
-
   @Override
   public void close() {
     client.close();

@@ -27,6 +27,4 @@ public interface CleanerClient extends Closeable {
   boolean tableExists(String databaseName, String tableName);
 
   Map<String, String> getTableProperties(String databaseName, String tableName);
-
-  String getOutputFormat(String databaseName, String tableName);
 }

@@ -49,9 +49,8 @@ public void throwExceptionIfIceberg(String databaseName, String tableName) {
     Map<String, String> parameters = client.getTableProperties(databaseName, tableName);
     String tableType = parameters.getOrDefault("table_type", "").toLowerCase();
     String format = parameters.getOrDefault("format", "").toLowerCase();
-    String outputFormat = client.getOutputFormat(databaseName, tableName);
-    if (tableType.contains("iceberg") || format.contains("iceberg") || (outputFormat != null
-        && outputFormat.toLowerCase().contains("iceberg"))) {
+    String metadataLocation = parameters.getOrDefault("metadata_location", "").toLowerCase();
+    if (tableType.contains("iceberg") || format.contains("iceberg") || !metadataLocation.isEmpty()) {
       throw new BeekeeperIcebergException(
           format("Iceberg table %s.%s is not currently supported in Beekeeper.", databaseName, tableName));
     }

@@ -36,7 +36,6 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
@@ -46,7 +45,6 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -56,7 +54,6 @@
 import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
 
 import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
 import com.amazonaws.services.sqs.model.PurgeQueueRequest;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 
@@ -67,72 +64,50 @@
 import com.expediagroup.beekeeper.integration.model.AlterTableSqsMessage;
 import com.expediagroup.beekeeper.integration.model.CreateTableSqsMessage;
 import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils;
-import com.expediagroup.beekeeper.integration.utils.HiveTestUtils;
 import com.expediagroup.beekeeper.scheduler.apiary.BeekeeperSchedulerApiary;
 
-import com.hotels.beeju.extensions.ThriftHiveMetaStoreJUnitExtension;
-
 @Testcontainers
 public class BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase {
 
   private static final int TIMEOUT = 30;
   private static final String DRY_RUN_ENABLED_PROPERTY = "properties.dry-run-enabled";
   private static final String APIARY_QUEUE_URL_PROPERTY = "properties.apiary.queue-url";
-  private static final String METASTORE_URI_PROPERTY = "properties.metastore-uri";
 
   private static final String QUEUE = "apiary-receiver-queue";
   private static final String SCHEDULED_EXPIRED_METRIC = "metadata-scheduled";
   private static final String HEALTHCHECK_URI = "http://localhost:8080/actuator/health";
   private static final String PROMETHEUS_URI = "http://localhost:8080/actuator/prometheus";
 
   private static final String S3_ACCESS_KEY = "access";
   private static final String S3_SECRET_KEY = "secret";
 
   private static final String PARTITION_KEYS = "{ \"event_date\": \"date\", \"event_hour\": \"smallint\"}";
   private static final String PARTITION_A_VALUES = "[ \"2020-01-01\", \"0\" ]";
   private static final String PARTITION_B_VALUES = "[ \"2020-01-01\", \"1\" ]";
   private static final String PARTITION_A_NAME = "event_date=2020-01-01/event_hour=0";
   private static final String PARTITION_B_NAME = "event_date=2020-01-01/event_hour=1";
   private static final String LOCATION_A = "s3://bucket/table1/partition";
   private static final String LOCATION_B = "s3://bucket/table2/partition";
   private static final String TABLE_PATH = "/tmp/bucket/" + DATABASE_NAME_VALUE + "/" + TABLE_NAME_VALUE + "/";
 
   @Container
   private static final LocalStackContainer SQS_CONTAINER = ContainerTestUtils.awsContainer(SQS);
   private static AmazonSQS amazonSQS;
-  private static String queueUrl;
 
-  @RegisterExtension
-  public ThriftHiveMetaStoreJUnitExtension thriftHiveMetaStore = new ThriftHiveMetaStoreJUnitExtension(
-      DATABASE_NAME_VALUE);
-
-  private HiveTestUtils hiveTestUtils;
-  private HiveMetaStoreClient metastoreClient;
-
   @BeforeAll
   public static void init() {
     System.setProperty(DRY_RUN_ENABLED_PROPERTY, "false");
-    amazonSQS = ContainerTestUtils.sqsClient(SQS_CONTAINER, AWS_REGION);
-    CreateQueueResult queue = amazonSQS.createQueue(QUEUE);
-    queueUrl = queue.getQueueUrl();
+    String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE);
     System.setProperty(APIARY_QUEUE_URL_PROPERTY, queueUrl);
+
+    amazonSQS = ContainerTestUtils.sqsClient(SQS_CONTAINER, AWS_REGION);
+    amazonSQS.createQueue(QUEUE);
   }
 
   @AfterAll
   public static void teardown() {
     System.clearProperty(APIARY_QUEUE_URL_PROPERTY);
     System.clearProperty(DRY_RUN_ENABLED_PROPERTY);
 
     amazonSQS.shutdown();
   }
 
   @BeforeEach
   public void setup() {
-    System.setProperty(METASTORE_URI_PROPERTY, thriftHiveMetaStore.getThriftConnectionUri());
-    metastoreClient = thriftHiveMetaStore.client();
-    hiveTestUtils = new HiveTestUtils(metastoreClient);
-
-    amazonSQS.purgeQueue(new PurgeQueueRequest(queueUrl));
+    amazonSQS.purgeQueue(new PurgeQueueRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE)));
     executorService.execute(() -> BeekeeperSchedulerApiary.main(new String[] {}));
     await().atMost(Duration.ONE_MINUTE).until(BeekeeperSchedulerApiary::isRunning);
   }
@@ -255,7 +230,7 @@ public void prometheus() {
   }
 
   private SendMessageRequest sendMessageRequest(String payload) {
-    return new SendMessageRequest(queueUrl, payload);
+    return new SendMessageRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE), payload);
   }
 
   private void assertExpiredMetadata(HousekeepingMetadata actual, String expectedPath, String partitionName) {

@@ -45,7 +45,6 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 import org.testcontainers.containers.localstack.LocalStackContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -67,14 +66,11 @@
 import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils;
 import com.expediagroup.beekeeper.scheduler.apiary.BeekeeperSchedulerApiary;
 
-import com.hotels.beeju.extensions.ThriftHiveMetaStoreJUnitExtension;
-
 @Testcontainers
 public class BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase {
 
   private static final int TIMEOUT = 5;
   private static final String APIARY_QUEUE_URL_PROPERTY = "properties.apiary.queue-url";
-  private static final String DRY_RUN_ENABLED_PROPERTY = "properties.dry-run-enabled";
 
   private static final String QUEUE = "apiary-receiver-queue";
   private static final String SCHEDULED_ORPHANED_METRIC = "paths-scheduled";
@@ -85,10 +81,6 @@ public class BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest extends BeekeeperIntegrationTestBase {
   private static final LocalStackContainer SQS_CONTAINER = ContainerTestUtils.awsContainer(SQS);
   private static AmazonSQS amazonSQS;
 
-  @RegisterExtension
-  public ThriftHiveMetaStoreJUnitExtension thriftHiveMetaStore = new ThriftHiveMetaStoreJUnitExtension(
-      DATABASE_NAME_VALUE);
-
   @BeforeAll
   public static void init() {
     String queueUrl = ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE);
@@ -101,17 +93,12 @@ public static void init() {
   @AfterAll
   public static void teardown() {
     System.clearProperty(APIARY_QUEUE_URL_PROPERTY);
-    System.clearProperty("properties.metastore-uri");
-    System.clearProperty("properties.dry-run-enabled");
 
     amazonSQS.shutdown();
   }
 
   @BeforeEach
   public void setup() {
-    System.setProperty("properties.metastore-uri", thriftHiveMetaStore.getThriftConnectionUri());
-    System.setProperty("properties.dry-run-enabled", "false");
-
     amazonSQS.purgeQueue(new PurgeQueueRequest(ContainerTestUtils.queueUrl(SQS_CONTAINER, QUEUE)));
     executorService.execute(() -> BeekeeperSchedulerApiary.main(new String[] {}));
     await().atMost(Duration.ONE_MINUTE).until(BeekeeperSchedulerApiary::isRunning);
@@ -121,9 +108,6 @@ public void setup() {
   public void stop() throws InterruptedException {
     BeekeeperSchedulerApiary.stop();
     executorService.awaitTermination(5, TimeUnit.SECONDS);
-
-    System.clearProperty("properties.metastore-uri");
-    System.clearProperty("properties.dry-run-enabled");
   }
 
   @Test
@@ -173,7 +157,7 @@ public void unreferencedAlterPartitionEvent() throws SQLException, IOException, URISyntaxException {
   public void unreferencedMultipleAlterPartitionEvent() throws IOException, SQLException, URISyntaxException {
     List
         .of(new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation",
-                "s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true),
+            "s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true),
             new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation2",
                 "s3://bucket/table/partitionLocation2", "s3://bucket/table/partitionLocation", true, true))
         .forEach(msg -> amazonSQS.sendMessage(sendMessageRequest(msg.getFormattedString())));
28 changes: 0 additions & 28 deletions beekeeper-scheduler-apiary/pom.xml
@@ -11,31 +11,8 @@
 
   <artifactId>beekeeper-scheduler-apiary</artifactId>
 
-  <properties>
-    <hadoop.version>2.8.1</hadoop.version>
-    <hive.version>2.3.7</hive.version>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
   <dependencies>
 
-    <!-- Hive -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk-sts</artifactId>
@@ -46,11 +23,6 @@
       <artifactId>beekeeper-scheduler</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>com.expediagroup</groupId>
-      <artifactId>beekeeper-cleanup</artifactId>
-      <version>${project.version}</version>
-    </dependency>
 
     <dependency>
       <groupId>ch.qos.logback</groupId>

@@ -17,9 +17,7 @@
 
 import java.util.EnumMap;
 import java.util.List;
-import java.util.function.Supplier;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
@@ -39,9 +37,6 @@
 import com.expedia.apiary.extensions.receiver.common.messaging.MessageReader;
 import com.expedia.apiary.extensions.receiver.sqs.messaging.SqsMessageReader;
 
-import com.expediagroup.beekeeper.cleanup.hive.HiveClientFactory;
-import com.expediagroup.beekeeper.cleanup.metadata.CleanerClientFactory;
-import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator;
 import com.expediagroup.beekeeper.core.model.LifecycleEventType;
 import com.expediagroup.beekeeper.scheduler.apiary.filter.EventTypeListenerEventFilter;
 import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter;
@@ -57,10 +52,6 @@
 import com.expediagroup.beekeeper.scheduler.apiary.messaging.RetryingMessageReader;
 import com.expediagroup.beekeeper.scheduler.service.SchedulerService;
 
-import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
-import com.hotels.hcommon.hive.metastore.client.closeable.CloseableMetaStoreClientFactory;
-import com.hotels.hcommon.hive.metastore.client.supplier.HiveMetaStoreClientSupplier;
-
 @Configuration
 @ComponentScan(basePackages = { "com.expediagroup.beekeeper.core", "com.expediagroup.beekeeper.scheduler" })
 @EntityScan(basePackages = { "com.expediagroup.beekeeper.core" })
@@ -148,35 +139,4 @@ public BeekeeperEventReader eventReader(
 
     return new MessageReaderAdapter(messageReader, handlers);
   }
-
-  @Bean
-  public HiveConf hiveConf(@Value("${properties.metastore-uri}") String metastoreUri) {
-    HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
-    return conf;
-  }
-
-  @Bean
-  public CloseableMetaStoreClientFactory metaStoreClientFactory() {
-    return new CloseableMetaStoreClientFactory();
-  }
-
-  @Bean
-  Supplier<CloseableMetaStoreClient> metaStoreClientSupplier(
-      CloseableMetaStoreClientFactory metaStoreClientFactory, HiveConf hiveConf) {
-    String name = "beekeeper-scheduler-apiary";
-    return new HiveMetaStoreClientSupplier(metaStoreClientFactory, hiveConf, name);
-  }
-
-  @Bean(name = "hiveClientFactory")
-  public CleanerClientFactory clientFactory(
-      Supplier<CloseableMetaStoreClient> metaStoreClientSupplier,
-      @Value("${properties.dry-run-enabled}") boolean dryRunEnabled) {
-    return new HiveClientFactory(metaStoreClientSupplier, dryRunEnabled);
-  }
-
-  @Bean
-  public IcebergValidator icebergValidator(CleanerClientFactory clientFactory) {
-    return new IcebergValidator(clientFactory);
-  }
 }

@@ -37,4 +37,3 @@ public boolean isFiltered(ListenerEvent listenerEvent, LifecycleEventType lifecycleEventType) {
     return !Boolean.parseBoolean(tableParameters.get(lifecycleEventType.getTableParameterName()));
   }
 }
-

@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.EnumMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
 import org.slf4j.Logger;
@@ -29,16 +28,12 @@
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
-import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator;
 import com.expediagroup.beekeeper.core.error.BeekeeperException;
-import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException;
 import com.expediagroup.beekeeper.core.model.HousekeepingEntity;
 import com.expediagroup.beekeeper.core.model.LifecycleEventType;
 import com.expediagroup.beekeeper.scheduler.apiary.messaging.BeekeeperEventReader;
-import com.expediagroup.beekeeper.scheduler.apiary.messaging.MessageReaderAdapter;
 import com.expediagroup.beekeeper.scheduler.apiary.model.BeekeeperEvent;
 import com.expediagroup.beekeeper.scheduler.service.SchedulerService;
-import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
 
 @Component
 public class SchedulerApiary {
@@ -47,17 +42,14 @@ public class SchedulerApiary {
 
   private final BeekeeperEventReader beekeeperEventReader;
   private final EnumMap<LifecycleEventType, SchedulerService> schedulerServiceMap;
-  private final IcebergValidator icebergValidator;
 
   @Autowired
   public SchedulerApiary(
       BeekeeperEventReader beekeeperEventReader,
-      EnumMap<LifecycleEventType, SchedulerService> schedulerServiceMap,
-      IcebergValidator icebergValidator
+      EnumMap<LifecycleEventType, SchedulerService> schedulerServiceMap
   ) {
     this.beekeeperEventReader = beekeeperEventReader;
     this.schedulerServiceMap = schedulerServiceMap;
-    this.icebergValidator = icebergValidator;
   }
 
   @Transactional
@@ -69,13 +61,9 @@ public void scheduleBeekeeperEvent() {
 
     for (HousekeepingEntity entity : housekeepingEntities) {
       try {
-        icebergValidator.throwExceptionIfIceberg(entity.getDatabaseName(), entity.getTableName());
-
         LifecycleEventType eventType = LifecycleEventType.valueOf(entity.getLifecycleType());
         SchedulerService scheduler = schedulerServiceMap.get(eventType);
         scheduler.scheduleForHousekeeping(entity);
-      } catch (BeekeeperIcebergException e) {
-        log.warn("Iceberg table are not supported in Beekeeper. Deleting message from queue", e);
       } catch (Exception e) {
         throw new BeekeeperException(format(
             "Unable to schedule %s deletion for entity, this message will go back on the queue",