Skip to content

Commit d2d4188

Browse files
committed
Core: Add storage credentials to PlanTableScanResponse
1 parent caa42f6 commit d2d4188

File tree

3 files changed

+292
-11
lines changed

3 files changed

+292
-11
lines changed

core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,28 @@
2525
import org.apache.iceberg.PartitionSpec;
2626
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
2727
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
28+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
29+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
2830
import org.apache.iceberg.rest.PlanStatus;
31+
import org.apache.iceberg.rest.credentials.Credential;
2932

3033
public class PlanTableScanResponse extends BaseScanTaskResponse {
3134
private final PlanStatus planStatus;
3235
private final String planId;
36+
private final List<Credential> credentials;
3337

3438
private PlanTableScanResponse(
3539
PlanStatus planStatus,
3640
String planId,
3741
List<String> planTasks,
3842
List<FileScanTask> fileScanTasks,
3943
List<DeleteFile> deleteFiles,
40-
Map<Integer, PartitionSpec> specsById) {
44+
Map<Integer, PartitionSpec> specsById,
45+
List<Credential> credentials) {
4146
super(planTasks, fileScanTasks, deleteFiles, specsById);
4247
this.planStatus = planStatus;
4348
this.planId = planId;
49+
this.credentials = credentials;
4450
validate();
4551
}
4652

@@ -52,6 +58,10 @@ public String planId() {
5258
return planId;
5359
}
5460

61+
public List<Credential> credentials() {
62+
return credentials != null ? credentials : ImmutableList.of();
63+
}
64+
5565
@Override
5666
public String toString() {
5767
return MoreObjects.toStringHelper(this)
@@ -90,6 +100,7 @@ public static Builder builder() {
90100
public static class Builder extends BaseScanTaskResponse.Builder<Builder, PlanTableScanResponse> {
91101
private PlanStatus planStatus;
92102
private String planId;
103+
private List<Credential> credentials = Lists.newArrayList();
93104

94105
public Builder withPlanStatus(PlanStatus status) {
95106
this.planStatus = status;
@@ -101,10 +112,26 @@ public Builder withPlanId(String id) {
101112
return this;
102113
}
103114

115+
public Builder withCredential(Credential credential) {
116+
this.credentials.add(credential);
117+
return this;
118+
}
119+
120+
public Builder withCredentials(List<Credential> credentialsToAdd) {
121+
this.credentials = credentialsToAdd;
122+
return this;
123+
}
124+
104125
@Override
105126
public PlanTableScanResponse build() {
106127
return new PlanTableScanResponse(
107-
planStatus, planId, planTasks(), fileScanTasks(), deleteFiles(), specsById());
128+
planStatus,
129+
planId,
130+
planTasks(),
131+
fileScanTasks(),
132+
deleteFiles(),
133+
specsById(),
134+
credentials);
108135
}
109136
}
110137
}

core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponseParser.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@
3030
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3131
import org.apache.iceberg.rest.PlanStatus;
3232
import org.apache.iceberg.rest.TableScanResponseParser;
33+
import org.apache.iceberg.rest.credentials.Credential;
34+
import org.apache.iceberg.rest.credentials.CredentialParser;
3335
import org.apache.iceberg.util.JsonUtil;
3436

3537
public class PlanTableScanResponseParser {
3638
private static final String PLAN_STATUS = "plan-status";
3739
private static final String PLAN_ID = "plan-id";
3840
private static final String PLAN_TASKS = "plan-tasks";
41+
private static final String STORAGE_CREDENTIALS = "storage-credentials";
3942

4043
private PlanTableScanResponseParser() {}
4144

@@ -64,6 +67,15 @@ public static void toJson(PlanTableScanResponse response, JsonGenerator gen) thr
6467
JsonUtil.writeStringArray(PLAN_TASKS, response.planTasks(), gen);
6568
}
6669

70+
if (!response.credentials().isEmpty()) {
71+
gen.writeArrayFieldStart(STORAGE_CREDENTIALS);
72+
for (Credential credential : response.credentials()) {
73+
CredentialParser.toJson(credential, gen);
74+
}
75+
76+
gen.writeEndArray();
77+
}
78+
6779
TableScanResponseParser.serializeScanTasks(
6880
response.fileScanTasks(), response.deleteFiles(), response.specsById(), gen);
6981

@@ -92,13 +104,25 @@ public static PlanTableScanResponse fromJson(
92104
List<FileScanTask> fileScanTasks =
93105
TableScanResponseParser.parseFileScanTasks(json, deleteFiles, specsById, caseSensitive);
94106

95-
return PlanTableScanResponse.builder()
96-
.withPlanId(planId)
97-
.withPlanStatus(planStatus)
98-
.withPlanTasks(planTasks)
99-
.withFileScanTasks(fileScanTasks)
100-
.withDeleteFiles(deleteFiles)
101-
.withSpecsById(specsById)
102-
.build();
107+
PlanTableScanResponse.Builder builder =
108+
PlanTableScanResponse.builder()
109+
.withPlanId(planId)
110+
.withPlanStatus(planStatus)
111+
.withPlanTasks(planTasks)
112+
.withFileScanTasks(fileScanTasks)
113+
.withDeleteFiles(deleteFiles)
114+
.withSpecsById(specsById);
115+
116+
if (json.hasNonNull(STORAGE_CREDENTIALS)) {
117+
JsonNode credentials = JsonUtil.get(STORAGE_CREDENTIALS, json);
118+
Preconditions.checkArgument(
119+
credentials.isArray(), "Cannot parse credentials from non-array: %s", credentials);
120+
121+
for (JsonNode credential : credentials) {
122+
builder.withCredential(CredentialParser.fromJson(credential));
123+
}
124+
}
125+
126+
return builder.build();
103127
}
104128
}

core/src/test/java/org/apache/iceberg/rest/responses/TestPlanTableScanResponseParser.java

Lines changed: 231 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@
3434
import org.apache.iceberg.SchemaParser;
3535
import org.apache.iceberg.expressions.Expressions;
3636
import org.apache.iceberg.expressions.ResidualEvaluator;
37+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
38+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3739
import org.apache.iceberg.rest.PlanStatus;
40+
import org.apache.iceberg.rest.credentials.Credential;
41+
import org.apache.iceberg.rest.credentials.ImmutableCredential;
3842
import org.junit.jupiter.api.Test;
3943

4044
public class TestPlanTableScanResponseParser {
@@ -53,7 +57,6 @@ public void nullAndEmptyCheck() {
5357

5458
@Test
5559
public void roundTripSerdeWithEmptyObject() {
56-
5760
assertThatThrownBy(() -> PlanTableScanResponse.builder().build())
5861
.isInstanceOf(IllegalArgumentException.class)
5962
.hasMessage("Invalid response: plan status must be defined");
@@ -229,4 +232,231 @@ public void roundTripSerdeWithValidStatusAndFileScanTasks() {
229232

230233
assertThat(PlanTableScanResponseParser.toJson(copyResponse)).isEqualTo(expectedToJson);
231234
}
235+
236+
@Test
237+
public void emptyOrInvalidCredentials() {
238+
assertThat(
239+
PlanTableScanResponseParser.fromJson(
240+
"{\"plan-status\": \"completed\",\"storage-credentials\": null}",
241+
PARTITION_SPECS_BY_ID,
242+
false)
243+
.credentials())
244+
.isEmpty();
245+
246+
assertThat(
247+
PlanTableScanResponseParser.fromJson(
248+
"{\"plan-status\": \"completed\",\"storage-credentials\": []}",
249+
PARTITION_SPECS_BY_ID,
250+
false)
251+
.credentials())
252+
.isEmpty();
253+
254+
assertThatThrownBy(
255+
() ->
256+
PlanTableScanResponseParser.fromJson(
257+
"{\"plan-status\": \"completed\",\"storage-credentials\": \"invalid\"}",
258+
PARTITION_SPECS_BY_ID,
259+
false))
260+
.isInstanceOf(IllegalArgumentException.class)
261+
.hasMessage("Cannot parse credentials from non-array: \"invalid\"");
262+
}
263+
264+
@Test
265+
public void roundTripSerdeWithCredentials() {
266+
PlanStatus planStatus = PlanStatus.fromName("completed");
267+
List<Credential> credentials =
268+
ImmutableList.of(
269+
ImmutableCredential.builder()
270+
.prefix("s3://custom-uri")
271+
.config(
272+
ImmutableMap.of(
273+
"s3.access-key-id",
274+
"keyId",
275+
"s3.secret-access-key",
276+
"accessKey",
277+
"s3.session-token",
278+
"sessionToken"))
279+
.build(),
280+
ImmutableCredential.builder()
281+
.prefix("gs://custom-uri")
282+
.config(
283+
ImmutableMap.of(
284+
"gcs.oauth2.token", "gcsToken1", "gcs.oauth2.token-expires-at", "1000"))
285+
.build(),
286+
ImmutableCredential.builder()
287+
.prefix("gs")
288+
.config(
289+
ImmutableMap.of(
290+
"gcs.oauth2.token", "gcsToken2", "gcs.oauth2.token-expires-at", "2000"))
291+
.build());
292+
293+
PlanTableScanResponse response =
294+
PlanTableScanResponse.builder()
295+
.withPlanStatus(planStatus)
296+
.withCredentials(credentials)
297+
.withSpecsById(PARTITION_SPECS_BY_ID)
298+
.build();
299+
300+
String expectedJson =
301+
"{\n"
302+
+ " \"plan-status\" : \"completed\",\n"
303+
+ " \"storage-credentials\" : [ {\n"
304+
+ " \"prefix\" : \"s3://custom-uri\",\n"
305+
+ " \"config\" : {\n"
306+
+ " \"s3.access-key-id\" : \"keyId\",\n"
307+
+ " \"s3.secret-access-key\" : \"accessKey\",\n"
308+
+ " \"s3.session-token\" : \"sessionToken\"\n"
309+
+ " }\n"
310+
+ " }, {\n"
311+
+ " \"prefix\" : \"gs://custom-uri\",\n"
312+
+ " \"config\" : {\n"
313+
+ " \"gcs.oauth2.token\" : \"gcsToken1\",\n"
314+
+ " \"gcs.oauth2.token-expires-at\" : \"1000\"\n"
315+
+ " }\n"
316+
+ " }, {\n"
317+
+ " \"prefix\" : \"gs\",\n"
318+
+ " \"config\" : {\n"
319+
+ " \"gcs.oauth2.token\" : \"gcsToken2\",\n"
320+
+ " \"gcs.oauth2.token-expires-at\" : \"2000\"\n"
321+
+ " }\n"
322+
+ " } ]\n"
323+
+ "}";
324+
325+
String json = PlanTableScanResponseParser.toJson(response, true);
326+
assertThat(json).isEqualTo(expectedJson);
327+
328+
PlanTableScanResponse fromResponse =
329+
PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false);
330+
PlanTableScanResponse copyResponse =
331+
PlanTableScanResponse.builder()
332+
.withPlanStatus(fromResponse.planStatus())
333+
.withPlanId(fromResponse.planId())
334+
.withSpecsById(PARTITION_SPECS_BY_ID)
335+
.withCredentials(credentials)
336+
.build();
337+
338+
assertThat(PlanTableScanResponseParser.toJson(copyResponse, true)).isEqualTo(expectedJson);
339+
}
340+
341+
@Test
342+
public void roundTripSerdeWithValidStatusAndFileScanTasksAndCredentials() {
343+
ResidualEvaluator residualEvaluator =
344+
ResidualEvaluator.of(SPEC, Expressions.equal("id", 1), true);
345+
FileScanTask fileScanTask =
346+
new BaseFileScanTask(
347+
FILE_A,
348+
new DeleteFile[] {FILE_A_DELETES},
349+
SchemaParser.toJson(SCHEMA),
350+
PartitionSpecParser.toJson(SPEC),
351+
residualEvaluator);
352+
353+
PlanStatus planStatus = PlanStatus.fromName("completed");
354+
List<Credential> credentials =
355+
ImmutableList.of(
356+
ImmutableCredential.builder()
357+
.prefix("s3://custom-uri")
358+
.config(
359+
ImmutableMap.of(
360+
"s3.access-key-id",
361+
"keyId",
362+
"s3.secret-access-key",
363+
"accessKey",
364+
"s3.session-token",
365+
"sessionToken"))
366+
.build(),
367+
ImmutableCredential.builder()
368+
.prefix("gs://custom-uri")
369+
.config(
370+
ImmutableMap.of(
371+
"gcs.oauth2.token", "gcsToken1", "gcs.oauth2.token-expires-at", "1000"))
372+
.build(),
373+
ImmutableCredential.builder()
374+
.prefix("gs")
375+
.config(
376+
ImmutableMap.of(
377+
"gcs.oauth2.token", "gcsToken2", "gcs.oauth2.token-expires-at", "2000"))
378+
.build());
379+
PlanTableScanResponse response =
380+
PlanTableScanResponse.builder()
381+
.withPlanStatus(planStatus)
382+
.withFileScanTasks(List.of(fileScanTask))
383+
.withDeleteFiles(List.of(FILE_A_DELETES))
384+
.withSpecsById(PARTITION_SPECS_BY_ID)
385+
.withCredentials(credentials)
386+
.build();
387+
388+
String expectedJson =
389+
"{\n"
390+
+ " \"plan-status\" : \"completed\",\n"
391+
+ " \"storage-credentials\" : [ {\n"
392+
+ " \"prefix\" : \"s3://custom-uri\",\n"
393+
+ " \"config\" : {\n"
394+
+ " \"s3.access-key-id\" : \"keyId\",\n"
395+
+ " \"s3.secret-access-key\" : \"accessKey\",\n"
396+
+ " \"s3.session-token\" : \"sessionToken\"\n"
397+
+ " }\n"
398+
+ " }, {\n"
399+
+ " \"prefix\" : \"gs://custom-uri\",\n"
400+
+ " \"config\" : {\n"
401+
+ " \"gcs.oauth2.token\" : \"gcsToken1\",\n"
402+
+ " \"gcs.oauth2.token-expires-at\" : \"1000\"\n"
403+
+ " }\n"
404+
+ " }, {\n"
405+
+ " \"prefix\" : \"gs\",\n"
406+
+ " \"config\" : {\n"
407+
+ " \"gcs.oauth2.token\" : \"gcsToken2\",\n"
408+
+ " \"gcs.oauth2.token-expires-at\" : \"2000\"\n"
409+
+ " }\n"
410+
+ " } ],\n"
411+
+ " \"delete-files\" : [ {\n"
412+
+ " \"spec-id\" : 0,\n"
413+
+ " \"content\" : \"POSITION_DELETES\",\n"
414+
+ " \"file-path\" : \"/path/to/data-a-deletes.parquet\",\n"
415+
+ " \"file-format\" : \"PARQUET\",\n"
416+
+ " \"partition\" : {\n"
417+
+ " \"1000\" : 0\n"
418+
+ " },\n"
419+
+ " \"file-size-in-bytes\" : 10,\n"
420+
+ " \"record-count\" : 1\n"
421+
+ " } ],\n"
422+
+ " \"file-scan-tasks\" : [ {\n"
423+
+ " \"data-file\" : {\n"
424+
+ " \"spec-id\" : 0,\n"
425+
+ " \"content\" : \"DATA\",\n"
426+
+ " \"file-path\" : \"/path/to/data-a.parquet\",\n"
427+
+ " \"file-format\" : \"PARQUET\",\n"
428+
+ " \"partition\" : {\n"
429+
+ " \"1000\" : 0\n"
430+
+ " },\n"
431+
+ " \"file-size-in-bytes\" : 10,\n"
432+
+ " \"record-count\" : 1,\n"
433+
+ " \"sort-order-id\" : 0\n"
434+
+ " },\n"
435+
+ " \"delete-file-references\" : [ 0 ],\n"
436+
+ " \"residual-filter\" : {\n"
437+
+ " \"type\" : \"eq\",\n"
438+
+ " \"term\" : \"id\",\n"
439+
+ " \"value\" : 1\n"
440+
+ " }\n"
441+
+ " } ]\n"
442+
+ "}";
443+
444+
String json = PlanTableScanResponseParser.toJson(response, true);
445+
assertThat(json).isEqualTo(expectedJson);
446+
447+
PlanTableScanResponse fromResponse =
448+
PlanTableScanResponseParser.fromJson(json, PARTITION_SPECS_BY_ID, false);
449+
PlanTableScanResponse copyResponse =
450+
PlanTableScanResponse.builder()
451+
.withPlanStatus(fromResponse.planStatus())
452+
.withPlanId(fromResponse.planId())
453+
.withPlanTasks(fromResponse.planTasks())
454+
.withDeleteFiles(fromResponse.deleteFiles())
455+
.withFileScanTasks(fromResponse.fileScanTasks())
456+
.withSpecsById(PARTITION_SPECS_BY_ID)
457+
.withCredentials(credentials)
458+
.build();
459+
460+
assertThat(PlanTableScanResponseParser.toJson(copyResponse, true)).isEqualTo(expectedJson);
461+
}
232462
}

0 commit comments

Comments
 (0)