Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.credentials.Credential;

public class PlanTableScanResponse extends BaseScanTaskResponse {
private final PlanStatus planStatus;
private final String planId;
private final List<Credential> credentials;

private PlanTableScanResponse(
PlanStatus planStatus,
String planId,
List<String> planTasks,
List<FileScanTask> fileScanTasks,
List<DeleteFile> deleteFiles,
Map<Integer, PartitionSpec> specsById) {
Map<Integer, PartitionSpec> specsById,
List<Credential> credentials) {
super(planTasks, fileScanTasks, deleteFiles, specsById);
this.planStatus = planStatus;
this.planId = planId;
this.credentials = credentials;
validate();
}

Expand All @@ -52,6 +58,10 @@ public String planId() {
return planId;
}

public List<Credential> credentials() {
return credentials != null ? credentials : ImmutableList.of();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down Expand Up @@ -90,6 +100,7 @@ public static Builder builder() {
public static class Builder extends BaseScanTaskResponse.Builder<Builder, PlanTableScanResponse> {
private PlanStatus planStatus;
private String planId;
private final List<Credential> credentials = Lists.newArrayList();

public Builder withPlanStatus(PlanStatus status) {
this.planStatus = status;
Expand All @@ -101,10 +112,21 @@ public Builder withPlanId(String id) {
return this;
}

public Builder withCredentials(List<Credential> credentialsToAdd) {
credentials.addAll(credentialsToAdd);
return this;
}

@Override
public PlanTableScanResponse build() {
return new PlanTableScanResponse(
planStatus, planId, planTasks(), fileScanTasks(), deleteFiles(), specsById());
planStatus,
planId,
planTasks(),
fileScanTasks(),
deleteFiles(),
specsById(),
credentials);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.TableScanResponseParser;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.CredentialParser;
import org.apache.iceberg.util.JsonUtil;

public class PlanTableScanResponseParser {
private static final String PLAN_STATUS = "plan-status";
private static final String PLAN_ID = "plan-id";
private static final String PLAN_TASKS = "plan-tasks";
private static final String STORAGE_CREDENTIALS = "storage-credentials";

private PlanTableScanResponseParser() {}

Expand Down Expand Up @@ -64,6 +68,15 @@ public static void toJson(PlanTableScanResponse response, JsonGenerator gen) thr
JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen);
}

if (!response.credentials().isEmpty()) {
gen.writeArrayFieldStart(STORAGE_CREDENTIALS);
for (Credential credential : response.credentials()) {
CredentialParser.toJson(credential, gen);
}

gen.writeEndArray();
}

TableScanResponseParser.serializeScanTasks(
response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen);

Expand Down Expand Up @@ -92,13 +105,28 @@ public static PlanTableScanResponse fromJson(
List<FileScanTask> fileScanTasks =
TableScanResponseParser.parseFileScanTasks(json, deleteFiles, specsById, caseSensitive);

return PlanTableScanResponse.builder()
.withPlanId(planId)
.withPlanStatus(planStatus)
.withPlanTasks(planTasks)
.withFileScanTasks(fileScanTasks)
.withDeleteFiles(deleteFiles)
.withSpecsById(specsById)
.build();
PlanTableScanResponse.Builder builder =
PlanTableScanResponse.builder()
.withPlanId(planId)
.withPlanStatus(planStatus)
.withPlanTasks(planTasks)
.withFileScanTasks(fileScanTasks)
.withDeleteFiles(deleteFiles)
.withSpecsById(specsById);

if (json.hasNonNull(STORAGE_CREDENTIALS)) {
JsonNode credsNode = JsonUtil.get(STORAGE_CREDENTIALS, json);
Preconditions.checkArgument(
credsNode.isArray(), "Cannot parse credentials from non-array: %s", credsNode);

List<Credential> credentials = Lists.newArrayList();
for (JsonNode credential : credsNode) {
credentials.add(CredentialParser.fromJson(credential));
}

builder.withCredentials(credentials);
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.junit.jupiter.api.Test;

public class TestPlanTableScanResponseParser {
Expand All @@ -53,7 +57,6 @@ public void nullAndEmptyCheck() {

@Test
public void roundTripSerdeWithEmptyObject() {

assertThatThrownBy(() -> PlanTableScanResponse.builder().build())
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid response: plan status must be defined");
Expand Down Expand Up @@ -229,4 +232,231 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() {

assertThat(PlanTableScanResponseParser.toJson(copyResponse)).isEqualTo(expectedToJson);
}

@Test
public void emptyOrInvalidCredentials() {
assertThat(
PlanTableScanResponseParser.fromJson(
"{\"plan-status\": \"completed\",\"storage-credentials\": null}",
PARTITION_SPECS_BY_ID,
false)
.credentials())
.isEmpty();

assertThat(
PlanTableScanResponseParser.fromJson(
"{\"plan-status\": \"completed\",\"storage-credentials\": []}",
PARTITION_SPECS_BY_ID,
false)
.credentials())
.isEmpty();

assertThatThrownBy(
() ->
PlanTableScanResponseParser.fromJson(
"{\"plan-status\": \"completed\",\"storage-credentials\": \"invalid\"}",
PARTITION_SPECS_BY_ID,
false))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot parse credentials from non-array: \"invalid\"");
}

@Test
public void roundTripSerdeWithCredentials() {
PlanStatus planStatus = PlanStatus.fromName("completed");
List<Credential> credentials =
ImmutableList.of(
ImmutableCredential.builder()
.prefix("s3://custom-uri")
.config(
ImmutableMap.of(
"s3.access-key-id",
"keyId",
"s3.secret-access-key",
"accessKey",
"s3.session-token",
"sessionToken"))
.build(),
ImmutableCredential.builder()
.prefix("gs://custom-uri")
.config(
ImmutableMap.of(
"gcs.oauth2.token", "gcsToken1", "gcs.oauth2.token-expires-at", "1000"))
.build(),
ImmutableCredential.builder()
.prefix("gs")
.config(
ImmutableMap.of(
"gcs.oauth2.token", "gcsToken2", "gcs.oauth2.token-expires-at", "2000"))
.build());

PlanTableScanResponse response =
PlanTableScanResponse.builder()
.withPlanStatus(planStatus)
.withCredentials(credentials)
.withSpecsById(PARTITION_SPECS_BY_ID)
.build();

String expectedJson =
"{\n"
+ " \"plan-status\" : \"completed\",\n"
+ " \"storage-credentials\" : [ {\n"
+ " \"prefix\" : \"s3://custom-uri\",\n"
+ " \"config\" : {\n"
+ " \"s3.access-key-id\" : \"keyId\",\n"
+ " \"s3.secret-access-key\" : \"accessKey\",\n"
+ " \"s3.session-token\" : \"sessionToken\"\n"
+ " }\n"
+ " }, {\n"
+ " \"prefix\" : \"gs://custom-uri\",\n"
+ " \"config\" : {\n"
+ " \"gcs.oauth2.token\" : \"gcsToken1\",\n"
+ " \"gcs.oauth2.token-expires-at\" : \"1000\"\n"
+ " }\n"
+ " }, {\n"
+ " \"prefix\" : \"gs\",\n"
+ " \"config\" : {\n"
+ " \"gcs.oauth2.token\" : \"gcsToken2\",\n"
+ " \"gcs.oauth2.token-expires-at\" : \"2000\"\n"
+ " }\n"
+ " } ]\n"
+ "}";

String json = PlanTableScanResponseParser.toJson(response, true);
assertThat(json).isEqualTo(expectedJson);

PlanTableScanResponse fromResponse =
PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false);
PlanTableScanResponse copyResponse =
PlanTableScanResponse.builder()
.withPlanStatus(fromResponse.planStatus())
.withPlanId(fromResponse.planId())
.withSpecsById(PARTITION_SPECS_BY_ID)
.withCredentials(credentials)
.build();

assertThat(PlanTableScanResponseParser.toJson(copyResponse, true)).isEqualTo(expectedJson);
}

@Test
public void roundTripSerdeWithValidStatusAndFileScanTasksAndCredentials() {
ResidualEvaluator residualEvaluator =
ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true);
FileScanTask fileScanTask =
new BaseFileScanTask(
FILE_A,
new DeleteFile[] {FILE_A_DELETES},
SchemaParser.toJson(SCHEMA),
PartitionSpecParser.toJson(SPEC),
residualEvaluator);

PlanStatus planStatus = PlanStatus.fromName("completed");
List<Credential> credentials =
ImmutableList.of(
ImmutableCredential.builder()
.prefix("s3://custom-uri")
.config(
ImmutableMap.of(
"s3.access-key-id",
"keyId",
"s3.secret-access-key",
"accessKey",
"s3.session-token",
"sessionToken"))
.build(),
ImmutableCredential.builder()
.prefix("gs://custom-uri")
.config(
ImmutableMap.of(
"gcs.oauth2.token", "gcsToken1", "gcs.oauth2.token-expires-at", "1000"))
.build(),
ImmutableCredential.builder()
.prefix("gs")
.config(
ImmutableMap.of(
"gcs.oauth2.token", "gcsToken2", "gcs.oauth2.token-expires-at", "2000"))
.build());
PlanTableScanResponse response =
PlanTableScanResponse.builder()
.withPlanStatus(planStatus)
.withFileScanTasks(List.of(fileScanTask))
.withDeleteFiles(List.of(FILE_A_DELETES))
.withSpecsById(PARTITION_SPECS_BY_ID)
.withCredentials(credentials)
.build();

String expectedJson =
"{\n"
+ " \"plan-status\" : \"completed\",\n"
+ " \"storage-credentials\" : [ {\n"
+ " \"prefix\" : \"s3://custom-uri\",\n"
+ " \"config\" : {\n"
+ " \"s3.access-key-id\" : \"keyId\",\n"
+ " \"s3.secret-access-key\" : \"accessKey\",\n"
+ " \"s3.session-token\" : \"sessionToken\"\n"
+ " }\n"
+ " }, {\n"
+ " \"prefix\" : \"gs://custom-uri\",\n"
+ " \"config\" : {\n"
+ " \"gcs.oauth2.token\" : \"gcsToken1\",\n"
+ " \"gcs.oauth2.token-expires-at\" : \"1000\"\n"
+ " }\n"
+ " }, {\n"
+ " \"prefix\" : \"gs\",\n"
+ " \"config\" : {\n"
+ " \"gcs.oauth2.token\" : \"gcsToken2\",\n"
+ " \"gcs.oauth2.token-expires-at\" : \"2000\"\n"
+ " }\n"
+ " } ],\n"
+ " \"delete-files\" : [ {\n"
+ " \"spec-id\" : 0,\n"
+ " \"content\" : \"POSITION_DELETES\",\n"
+ " \"file-path\" : \"/path/to/data-a-deletes.parquet\",\n"
+ " \"file-format\" : \"PARQUET\",\n"
+ " \"partition\" : {\n"
+ " \"1000\" : 0\n"
+ " },\n"
+ " \"file-size-in-bytes\" : 10,\n"
+ " \"record-count\" : 1\n"
+ " } ],\n"
+ " \"file-scan-tasks\" : [ {\n"
+ " \"data-file\" : {\n"
+ " \"spec-id\" : 0,\n"
+ " \"content\" : \"DATA\",\n"
+ " \"file-path\" : \"/path/to/data-a.parquet\",\n"
+ " \"file-format\" : \"PARQUET\",\n"
+ " \"partition\" : {\n"
+ " \"1000\" : 0\n"
+ " },\n"
+ " \"file-size-in-bytes\" : 10,\n"
+ " \"record-count\" : 1,\n"
+ " \"sort-order-id\" : 0\n"
+ " },\n"
+ " \"delete-file-references\" : [ 0 ],\n"
+ " \"residual-filter\" : {\n"
+ " \"type\" : \"eq\",\n"
+ " \"term\" : \"id\",\n"
+ " \"value\" : 1\n"
+ " }\n"
+ " } ]\n"
+ "}";

String json = PlanTableScanResponseParser.toJson(response, true);
assertThat(json).isEqualTo(expectedJson);

PlanTableScanResponse fromResponse =
PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false);
PlanTableScanResponse copyResponse =
PlanTableScanResponse.builder()
.withPlanStatus(fromResponse.planStatus())
.withPlanId(fromResponse.planId())
.withPlanTasks(fromResponse.planTasks())
.withDeleteFiles(fromResponse.deleteFiles())
.withFileScanTasks(fromResponse.fileScanTasks())
.withSpecsById(PARTITION_SPECS_BY_ID)
.withCredentials(credentials)
.build();

assertThat(PlanTableScanResponseParser.toJson(copyResponse, true)).isEqualTo(expectedJson);
}
}