Skip to content

Commit

Permalink
Revert changes to beekeeper-path
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamza Jugon committed Nov 26, 2024
1 parent eb13799 commit 812565e
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ public class S3PathCleaner implements PathCleaner {
private IcebergValidator icebergValidator;

public S3PathCleaner(S3Client s3Client, SentinelFilesCleaner sentinelFilesCleaner,
BytesDeletedReporter bytesDeletedReporter, IcebergValidator icebergValidator) {
BytesDeletedReporter bytesDeletedReporter) {
this.s3Client = s3Client;
this.sentinelFilesCleaner = sentinelFilesCleaner;
this.bytesDeletedReporter = bytesDeletedReporter;
this.icebergValidator = icebergValidator;
}

@Override
@TimedTaggable("s3-paths-deleted")
public void cleanupPath(HousekeepingEntity housekeepingEntity) {
icebergValidator.throwExceptionIfIceberg(housekeepingEntity.getDatabaseName(), housekeepingEntity.getTableName());
S3SchemeURI s3SchemeURI = new S3SchemeURI(housekeepingEntity.getPath());
String key = s3SchemeURI.getKey();
String bucket = s3SchemeURI.getBucket();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import com.expediagroup.beekeeper.cleanup.monitoring.BytesDeletedReporter;
import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator;
import com.expediagroup.beekeeper.core.model.HousekeepingPath;
import com.expediagroup.beekeeper.core.model.PeriodDuration;

Expand All @@ -58,8 +57,8 @@ class S3DryRunPathCleanerTest {

private HousekeepingPath housekeepingPath;
private AmazonS3 amazonS3;
@Mock private BytesDeletedReporter bytesDeletedReporter;
@Mock private IcebergValidator icebergValidator;
private @Mock BytesDeletedReporter bytesDeletedReporter;


private boolean dryRunEnabled = true;

Expand All @@ -83,7 +82,7 @@ void setUp() {
.getObjectSummaries()
.forEach(object -> amazonS3.deleteObject(bucket, object.getKey()));
S3Client s3Client = new S3Client(amazonS3, dryRunEnabled);
s3DryRunPathCleaner = new S3PathCleaner(s3Client, new S3SentinelFilesCleaner(s3Client), bytesDeletedReporter, icebergValidator);
s3DryRunPathCleaner = new S3PathCleaner(s3Client, new S3SentinelFilesCleaner(s3Client), bytesDeletedReporter);
housekeepingPath = HousekeepingPath
.builder()
.path(absolutePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
import java.util.List;

import org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider;
import org.junit.Rule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

Expand All @@ -55,10 +55,8 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;

import com.expediagroup.beekeeper.cleanup.monitoring.BytesDeletedReporter;
import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator;
import com.expediagroup.beekeeper.core.config.FileSystemType;
import com.expediagroup.beekeeper.core.error.BeekeeperException;
import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException;
import com.expediagroup.beekeeper.core.model.HousekeepingPath;
import com.expediagroup.beekeeper.core.model.PeriodDuration;

Expand All @@ -80,16 +78,19 @@ class S3PathCleanerTest {
private S3Client s3Client;
private S3SentinelFilesCleaner s3SentinelFilesCleaner;
private @Mock BytesDeletedReporter bytesDeletedReporter;
private @Mock IcebergValidator icebergValidator;

private S3PathCleaner s3PathCleaner;

@Container
@Rule
public static LocalStackContainer awsContainer = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:0.14.2")).withServices(S3);
static {
awsContainer.start();
}
public static String S3_ENDPOINT = awsContainer.getEndpointConfiguration(S3).getServiceEndpoint();

@BeforeEach
void setUp() {
String S3_ENDPOINT = awsContainer.getEndpointConfiguration(S3).getServiceEndpoint();
amazonS3 = AmazonS3ClientBuilder
.standard()
.withCredentials(new BasicAWSCredentialsProvider("accesskey", "secretkey"))
Expand All @@ -103,7 +104,7 @@ void setUp() {
boolean dryRunEnabled = false;
s3Client = new S3Client(amazonS3, dryRunEnabled);
s3SentinelFilesCleaner = new S3SentinelFilesCleaner(s3Client);
s3PathCleaner = new S3PathCleaner(s3Client, s3SentinelFilesCleaner, bytesDeletedReporter, icebergValidator);
s3PathCleaner = new S3PathCleaner(s3Client, s3SentinelFilesCleaner, bytesDeletedReporter);
String tableName = "table";
String databaseName = "database";
housekeepingPath = HousekeepingPath
Expand Down Expand Up @@ -256,7 +257,7 @@ void sentinelFilesCleanerThrowsException() {

amazonS3.putObject(bucket, key1, content);

s3PathCleaner = new S3PathCleaner(s3Client, s3SentinelFilesCleaner, bytesDeletedReporter, icebergValidator);
s3PathCleaner = new S3PathCleaner(s3Client, s3SentinelFilesCleaner, bytesDeletedReporter);
assertThatCode(() -> s3PathCleaner.cleanupPath(housekeepingPath)).doesNotThrowAnyException();
assertThat(amazonS3.doesObjectExist(bucket, key1)).isFalse();
}
Expand Down Expand Up @@ -321,7 +322,7 @@ void sentinelFilesForParentsAndPathWithTrailingSlash() {
@Test
void noBytesDeletedMetricWhenFileDeletionFails() {
S3Client mockS3Client = mock(S3Client.class);
s3PathCleaner = new S3PathCleaner(mockS3Client, s3SentinelFilesCleaner, bytesDeletedReporter, icebergValidator);
s3PathCleaner = new S3PathCleaner(mockS3Client, s3SentinelFilesCleaner, bytesDeletedReporter);
when(mockS3Client.doesObjectExist(bucket, key1)).thenReturn(true);
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(10);
Expand All @@ -337,7 +338,7 @@ void noBytesDeletedMetricWhenFileDeletionFails() {
@Test
void noBytesDeletedMetricWhenDirectoryDeletionFails() {
S3Client mockS3Client = mock(S3Client.class);
s3PathCleaner = new S3PathCleaner(mockS3Client, s3SentinelFilesCleaner, bytesDeletedReporter, icebergValidator);
s3PathCleaner = new S3PathCleaner(mockS3Client, s3SentinelFilesCleaner, bytesDeletedReporter);
doThrow(AmazonServiceException.class).when(mockS3Client).listObjects(bucket, keyRootAsDirectory);

assertThatExceptionOfType(AmazonServiceException.class)
Expand All @@ -350,7 +351,7 @@ void reportBytesDeletedWhenDirectoryDeletionPartiallyFails() {
AmazonS3 mockAmazonS3 = mock(AmazonS3.class);
S3Client mockS3Client = new S3Client(mockAmazonS3, false);
mockOneOutOfTwoObjectsDeleted(mockAmazonS3);
s3PathCleaner = new S3PathCleaner(mockS3Client, s3SentinelFilesCleaner, bytesDeletedReporter, icebergValidator);
s3PathCleaner = new S3PathCleaner(mockS3Client, s3SentinelFilesCleaner, bytesDeletedReporter);
assertThatExceptionOfType(BeekeeperException.class)
.isThrownBy(() -> s3PathCleaner.cleanupPath(housekeepingPath))
.withMessage(format("Not all files could be deleted at path \"%s/%s\"; deleted 1/2 objects. "
Expand All @@ -367,37 +368,6 @@ void extractingURIFails() {
.withMessage(format("'%s' is not an S3 path.", path));
}

@Test
void shouldThrowBeekeeperIcebergExceptionWhenIcebergTableDetected() {
doThrow(new BeekeeperIcebergException("Iceberg tables are not supported"))
.when(icebergValidator)
.throwExceptionIfIceberg(housekeepingPath.getDatabaseName(), housekeepingPath.getTableName());

assertThatExceptionOfType(BeekeeperIcebergException.class)
.isThrownBy(() -> s3PathCleaner.cleanupPath(housekeepingPath))
.withMessage("Iceberg tables are not supported");

verify(icebergValidator).throwExceptionIfIceberg(housekeepingPath.getDatabaseName(), housekeepingPath.getTableName());
verifyNoInteractions(bytesDeletedReporter);
}

@Test
void shouldProceedWithDeletionWhenNotIcebergTable() {
amazonS3.putObject(bucket, key1, content);
amazonS3.putObject(bucket, key2, content);

housekeepingPath.setPath("s3://" + bucket + "/" + keyRoot);

assertThatCode(() -> s3PathCleaner.cleanupPath(housekeepingPath))
.doesNotThrowAnyException();

assertThat(amazonS3.doesObjectExist(bucket, key1)).isFalse();
assertThat(amazonS3.doesObjectExist(bucket, key2)).isFalse();

long expectedBytesDeleted = content.getBytes().length * 2L; // 11 bytes('some content') * 2 = 22 bytes
verify(bytesDeletedReporter).reportTaggable(expectedBytesDeleted, housekeepingPath, FileSystemType.S3);
}

private void mockOneOutOfTwoObjectsDeleted(AmazonS3 mockAmazonS3) {
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setBucketName(bucket);
Expand All @@ -415,4 +385,4 @@ private void mockOneOutOfTwoObjectsDeleted(AmazonS3 mockAmazonS3) {
when(mockAmazonS3.deleteObjects(any(DeleteObjectsRequest.class)))
.thenReturn(new DeleteObjectsResult(List.of(deletedObject)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package com.expediagroup.beekeeper.integration;

import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;
Expand All @@ -28,20 +25,17 @@

import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.awaitility.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.junit.jupiter.MockitoExtension;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
Expand All @@ -53,16 +47,12 @@

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.google.common.collect.ImmutableMap;

import com.expediagroup.beekeeper.cleanup.monitoring.BytesDeletedReporter;
import com.expediagroup.beekeeper.integration.utils.ContainerTestUtils;
import com.expediagroup.beekeeper.integration.utils.HiveTestUtils;
import com.expediagroup.beekeeper.integration.utils.TestAppender;
import com.expediagroup.beekeeper.path.cleanup.BeekeeperPathCleanup;

import com.hotels.beeju.extensions.ThriftHiveMetaStoreJUnitExtension;

@Testcontainers
@ExtendWith(MockitoExtension.class)
public class BeekeeperDryRunPathCleanupIntegrationTest extends BeekeeperIntegrationTestBase {
Expand All @@ -72,12 +62,6 @@ public class BeekeeperDryRunPathCleanupIntegrationTest extends BeekeeperIntegrat
private static final String SCHEDULER_DELAY_MS_PROPERTY = "properties.scheduler-delay-ms";
private static final String DRY_RUN_ENABLED_PROPERTY = "properties.dry-run-enabled";
private static final String AWS_S3_ENDPOINT_PROPERTY = "aws.s3.endpoint";
private static final String METASTORE_URI_PROPERTY = "properties.metastore-uri";
private static final String AWS_DISABLE_GET_VALIDATION_PROPERTY = "com.amazonaws.services.s3.disableGetObjectMD5Validation";
private static final String AWS_DISABLE_PUT_VALIDATION_PROPERTY = "com.amazonaws.services.s3.disablePutObjectMD5Validation";

private static final String S3_ACCESS_KEY = "access";
private static final String S3_SECRET_KEY = "secret";

private static final String BUCKET = "test-path-bucket";
private static final String DB_AND_TABLE_PREFIX = DATABASE_NAME_VALUE + "/" + TABLE_NAME_VALUE;
Expand All @@ -99,37 +83,17 @@ public class BeekeeperDryRunPathCleanupIntegrationTest extends BeekeeperIntegrat

@Container
private static final LocalStackContainer S3_CONTAINER = ContainerTestUtils.awsContainer(S3);
static {
S3_CONTAINER.start();
}
private static AmazonS3 amazonS3;

private static final String S3_ENDPOINT = ContainerTestUtils.awsServiceEndpoint(S3_CONTAINER, S3);

private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private final TestAppender appender = new TestAppender();

private static Map<String, String> metastoreProperties = ImmutableMap
.<String, String>builder()
.put(ENDPOINT, S3_ENDPOINT)
.put(ACCESS_KEY, S3_ACCESS_KEY)
.put(SECRET_KEY, S3_SECRET_KEY)
.build();

@RegisterExtension
public ThriftHiveMetaStoreJUnitExtension thriftHiveMetaStore = new ThriftHiveMetaStoreJUnitExtension(
DATABASE_NAME_VALUE, metastoreProperties);
private HiveTestUtils hiveTestUtils;
private HiveMetaStoreClient metastoreClient;

@BeforeAll
public static void init() {
System.setProperty(SPRING_PROFILES_ACTIVE_PROPERTY, SPRING_PROFILES_ACTIVE);
System.setProperty(SCHEDULER_DELAY_MS_PROPERTY, SCHEDULER_DELAY_MS);
System.setProperty(DRY_RUN_ENABLED_PROPERTY, DRY_RUN_ENABLED);
System.setProperty(AWS_S3_ENDPOINT_PROPERTY, S3_ENDPOINT);
System.setProperty(AWS_DISABLE_GET_VALIDATION_PROPERTY, "true");
System.setProperty(AWS_DISABLE_PUT_VALIDATION_PROPERTY, "true");
System.setProperty(AWS_S3_ENDPOINT_PROPERTY, ContainerTestUtils.awsServiceEndpoint(S3_CONTAINER, S3));

amazonS3 = ContainerTestUtils.s3Client(S3_CONTAINER, AWS_REGION);
amazonS3.createBucket(new CreateBucketRequest(BUCKET, AWS_REGION));
Expand All @@ -141,18 +105,12 @@ public static void teardown() {
System.clearProperty(SCHEDULER_DELAY_MS_PROPERTY);
System.clearProperty(DRY_RUN_ENABLED_PROPERTY);
System.clearProperty(AWS_S3_ENDPOINT_PROPERTY);
System.clearProperty(METASTORE_URI_PROPERTY);

amazonS3.shutdown();
S3_CONTAINER.stop();
}

@BeforeEach
public void setup() {
System.setProperty(METASTORE_URI_PROPERTY, thriftHiveMetaStore.getThriftConnectionUri());
metastoreClient = thriftHiveMetaStore.client();
hiveTestUtils = new HiveTestUtils(metastoreClient);

amazonS3.listObjectsV2(BUCKET)
.getObjectSummaries()
.forEach(object -> amazonS3.deleteObject(BUCKET, object.getKey()));
Expand Down Expand Up @@ -289,4 +247,4 @@ private void assertS3ClientLogs(int expected) {
}
assertThat(logsFromS3Client).isEqualTo(expected);
}
}
}
Loading

0 comments on commit 812565e

Please sign in to comment.