diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPHeaders.java b/core/src/main/java/org/apache/iceberg/rest/HTTPHeaders.java
new file mode 100644
index 000000000000..35710bd9a9b7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPHeaders.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.immutables.value.Value;
+
+/**
+ * Represents a group of HTTP headers.
+ *
+ *
Header name comparison in this class is always case-insensitive, in accordance with RFC 2616.
+ *
+ *
This class exposes methods to convert to and from different representations such as maps and
+ * multimap, for easier access and manipulation – especially when dealing with multiple headers with
+ * the same name.
+ */
+@Value.Style(depluralize = true)
+@Value.Immutable
+@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
+public interface HTTPHeaders {
+
+ HTTPHeaders EMPTY = of();
+
+ /** Returns all the header entries in this group. */
+ Set entries();
+
+ /** Returns all the entries in this group for the given name (case-insensitive). */
+ default Set entries(String name) {
+ return entries().stream()
+ .filter(header -> header.name().equalsIgnoreCase(name))
+ .collect(Collectors.toSet());
+ }
+
+ /** Returns whether this group contains an entry with the given name (case-insensitive). */
+ default boolean contains(String name) {
+ return entries().stream().anyMatch(header -> header.name().equalsIgnoreCase(name));
+ }
+
+ /**
+ * Adds the given header to the current group if no entry with the same name is already present.
+ * Returns a new instance with the added header, or the current instance if the header is already
+ * present.
+ */
+ default HTTPHeaders putIfAbsent(HTTPHeader header) {
+ Preconditions.checkNotNull(header, "header");
+ return contains(header.name())
+ ? this
+ : ImmutableHTTPHeaders.builder().from(this).addEntry(header).build();
+ }
+
+ /**
+ * Adds the given headers to the current group if no entries with same names are already present.
+ * Returns a new instance with the added headers, or the current instance if all headers are
+ * already present.
+ */
+ default HTTPHeaders putIfAbsent(HTTPHeaders headers) {
+ Preconditions.checkNotNull(headers, "headers");
+ List newHeaders =
+ headers.entries().stream().filter(e -> !contains(e.name())).collect(Collectors.toList());
+ return newHeaders.isEmpty()
+ ? this
+ : ImmutableHTTPHeaders.builder().from(this).addAllEntries(newHeaders).build();
+ }
+
+ static HTTPHeaders of(HTTPHeader... headers) {
+ return ImmutableHTTPHeaders.builder().addEntries(headers).build();
+ }
+
+ /** Represents an HTTP header as a name-value pair. */
+ @Value.Style(redactedMask = "****", depluralize = true)
+ @Value.Immutable
+ @SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
+ interface HTTPHeader {
+
+ String name();
+
+ @Value.Redacted
+ String value();
+
+ @Value.Check
+ default void check() {
+ if (name().isEmpty()) {
+ throw new IllegalArgumentException("Header name cannot be empty");
+ }
+ }
+
+ static HTTPHeader of(String name, String value) {
+ return ImmutableHTTPHeader.builder().name(name).value(value).build();
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java b/core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java
new file mode 100644
index 000000000000..41921d946ca8
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/HTTPRequest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.hc.core5.net.URIBuilder;
+import org.apache.iceberg.exceptions.RESTException;
+import org.immutables.value.Value;
+
+/** Represents an HTTP request. */
+@Value.Style(redactedMask = "****", depluralize = true)
+@Value.Immutable
+@SuppressWarnings({"ImmutablesStyle", "SafeLoggingPropagation"})
+public interface HTTPRequest {
+
+ enum HTTPMethod {
+ GET,
+ HEAD,
+ POST,
+ DELETE
+ }
+
+ /**
+ * Returns the base URI configured at the REST client level. The base URI is used to construct the
+ * full {@link #requestUri()}.
+ */
+ URI baseUri();
+
+ /**
+ * Returns the full URI of this request. The URI is constructed from the base URI, path, and query
+ * parameters. It cannot be modified directly.
+ */
+ @Value.Lazy
+ default URI requestUri() {
+ // if full path is provided, use the input path as path
+ String fullPath =
+ (path().startsWith("https://") || path().startsWith("http://"))
+ ? path()
+ : String.format("%s/%s", baseUri(), path());
+ try {
+ URIBuilder builder = new URIBuilder(RESTUtil.stripTrailingSlash(fullPath));
+ queryParameters().forEach(builder::addParameter);
+ return builder.build();
+ } catch (URISyntaxException e) {
+ throw new RESTException(
+ "Failed to create request URI from base %s, params %s", fullPath, queryParameters());
+ }
+ }
+
+ /** Returns the HTTP method of this request. */
+ HTTPMethod method();
+
+ /** Returns the path of this request. */
+ String path();
+
+ /** Returns the query parameters of this request. */
+ Map queryParameters();
+
+ /** Returns the headers of this request. */
+ @Value.Default
+ default HTTPHeaders headers() {
+ return HTTPHeaders.EMPTY;
+ }
+
+ /** Returns the raw, unencoded request body. */
+ @Nullable
+ @Value.Redacted
+ Object body();
+
+ /** Returns the encoded request body as a string. */
+ @Value.Lazy
+ @Nullable
+ @Value.Redacted
+ default String encodedBody() {
+ Object body = body();
+ if (body instanceof Map) {
+ return RESTUtil.encodeFormData((Map, ?>) body);
+ } else if (body != null) {
+ try {
+ return mapper().writeValueAsString(body);
+ } catch (JsonProcessingException e) {
+ throw new RESTException(e, "Failed to encode request body: %s", body);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the {@link ObjectMapper} to use for encoding the request body. The default is {@link
+ * RESTObjectMapper#mapper()}.
+ */
+ @Value.Default
+ default ObjectMapper mapper() {
+ return RESTObjectMapper.mapper();
+ }
+
+ @Value.Check
+ default void check() {
+ if (path().startsWith("/")) {
+ throw new RESTException(
+ "Received a malformed path for a REST request: %s. Paths should not start with /",
+ path());
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPHeaders.java b/core/src/test/java/org/apache/iceberg/rest/TestHTTPHeaders.java
new file mode 100644
index 000000000000..53d37186f7cc
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPHeaders.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.rest.HTTPHeaders.HTTPHeader;
+import org.junit.jupiter.api.Test;
+
+class TestHTTPHeaders {
+
+ private final HTTPHeaders headers =
+ HTTPHeaders.of(
+ HTTPHeader.of("header1", "value1a"),
+ HTTPHeader.of("HEADER1", "value1b"),
+ HTTPHeader.of("header2", "value2"));
+
+ @Test
+ void entries() {
+ assertThat(headers.entries("header1"))
+ .containsExactlyInAnyOrder(
+ HTTPHeader.of("header1", "value1a"), HTTPHeader.of("HEADER1", "value1b"));
+ assertThat(headers.entries("HEADER1"))
+ .containsExactlyInAnyOrder(
+ HTTPHeader.of("header1", "value1a"), HTTPHeader.of("HEADER1", "value1b"));
+ assertThat(headers.entries("header2"))
+ .containsExactlyInAnyOrder(HTTPHeader.of("header2", "value2"));
+ assertThat(headers.entries("HEADER2"))
+ .containsExactlyInAnyOrder(HTTPHeader.of("header2", "value2"));
+ assertThat(headers.entries("header3")).isEmpty();
+ assertThat(headers.entries("HEADER3")).isEmpty();
+ assertThat(headers.entries(null)).isEmpty();
+ }
+
+ @Test
+ void contains() {
+ assertThat(headers.contains("header1")).isTrue();
+ assertThat(headers.contains("HEADER1")).isTrue();
+ assertThat(headers.contains("header2")).isTrue();
+ assertThat(headers.contains("HEADER2")).isTrue();
+ assertThat(headers.contains("header3")).isFalse();
+ assertThat(headers.contains("HEADER3")).isFalse();
+ assertThat(headers.contains(null)).isFalse();
+ }
+
+ @Test
+ void putIfAbsentHTTPHeader() {
+ HTTPHeaders actual = headers.putIfAbsent(HTTPHeader.of("Header1", "value1c"));
+ assertThat(actual).isSameAs(headers);
+
+ actual = headers.putIfAbsent(HTTPHeader.of("header3", "value3"));
+ assertThat(actual.entries())
+ .containsExactly(
+ HTTPHeader.of("header1", "value1a"),
+ HTTPHeader.of("HEADER1", "value1b"),
+ HTTPHeader.of("header2", "value2"),
+ HTTPHeader.of("header3", "value3"));
+
+ assertThatThrownBy(() -> headers.putIfAbsent((HTTPHeader) null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("header");
+ }
+
+ @Test
+ void putIfAbsentHTTPHeaders() {
+ HTTPHeaders actual = headers.putIfAbsent(HTTPHeaders.of(HTTPHeader.of("Header1", "value1c")));
+ assertThat(actual).isSameAs(headers);
+
+ actual =
+ headers.putIfAbsent(
+ ImmutableHTTPHeaders.builder()
+ .addEntry(HTTPHeader.of("Header1", "value1c"))
+ .addEntry(HTTPHeader.of("header3", "value3"))
+ .build());
+ assertThat(actual)
+ .isEqualTo(
+ ImmutableHTTPHeaders.builder()
+ .addEntries(
+ HTTPHeader.of("header1", "value1a"),
+ HTTPHeader.of("HEADER1", "value1b"),
+ HTTPHeader.of("header2", "value2"),
+ HTTPHeader.of("header3", "value3"))
+ .build());
+
+ assertThatThrownBy(() -> headers.putIfAbsent((HTTPHeaders) null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("headers");
+ }
+
+ @Test
+ void invalidHeader() {
+ // invalid input (null name or value)
+ assertThatThrownBy(() -> HTTPHeader.of(null, "value1"))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("name");
+ assertThatThrownBy(() -> HTTPHeader.of("header1", null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("value");
+
+ // invalid input (empty name)
+ assertThatThrownBy(() -> HTTPHeader.of("", "value1"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Header name cannot be empty");
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestHTTPRequest.java b/core/src/test/java/org/apache/iceberg/rest/TestHTTPRequest.java
new file mode 100644
index 000000000000..84e1b0830c9b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestHTTPRequest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URI;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+class TestHTTPRequest {
+
+ @ParameterizedTest
+ @MethodSource("validRequestUris")
+ public void requestUriSuccess(HTTPRequest request, URI expected) {
+ assertThat(request.requestUri()).isEqualTo(expected);
+ }
+
+ public static Stream validRequestUris() {
+ return Stream.of(
+ Arguments.of(
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost:8080/foo"))
+ .method(HTTPRequest.HTTPMethod.GET)
+ .path("v1/namespaces/ns/tables/") // trailing slash should be removed
+ .putQueryParameter("pageToken", "1234")
+ .putQueryParameter("pageSize", "10")
+ .build(),
+ URI.create(
+ "http://localhost:8080/foo/v1/namespaces/ns/tables?pageToken=1234&pageSize=10")),
+ Arguments.of(
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost:8080/foo"))
+ .method(HTTPRequest.HTTPMethod.GET)
+ .path("https://authserver.com/token") // absolute path HTTPS
+ .build(),
+ URI.create("https://authserver.com/token")),
+ Arguments.of(
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost:8080/foo"))
+ .method(HTTPRequest.HTTPMethod.GET)
+ .path("http://authserver.com/token") // absolute path HTTP
+ .build(),
+ URI.create("http://authserver.com/token")));
+ }
+
+ @Test
+ public void malformedPath() {
+ assertThatThrownBy(
+ () ->
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.GET)
+ .path("/v1/namespaces") // wrong leading slash
+ .build())
+ .isInstanceOf(RESTException.class)
+ .hasMessage(
+ "Received a malformed path for a REST request: /v1/namespaces. Paths should not start with /");
+ }
+
+ @Test
+ public void invalidPath() {
+ HTTPRequest request =
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.GET)
+ .path(" not a valid path") // wrong path
+ .build();
+ assertThatThrownBy(request::requestUri)
+ .isInstanceOf(RESTException.class)
+ .hasMessage(
+ "Failed to create request URI from base http://localhost/ not a valid path, params {}");
+ }
+
+ @Test
+ public void encodedBodyJSON() {
+ HTTPRequest request =
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.POST)
+ .path("v1/namespaces/ns")
+ .body(
+ CreateNamespaceRequest.builder()
+ .withNamespace(Namespace.of("ns"))
+ .setProperties(ImmutableMap.of("prop1", "value1"))
+ .build())
+ .build();
+ assertThat(request.encodedBody())
+ .isEqualTo("{\"namespace\":[\"ns\"],\"properties\":{\"prop1\":\"value1\"}}");
+ }
+
+ @Test
+ public void encodedBodyJSONInvalid() throws JsonProcessingException {
+ ObjectMapper mapper = Mockito.mock(ObjectMapper.class);
+ Mockito.when(mapper.writeValueAsString(Mockito.any()))
+ .thenThrow(new JsonMappingException(null, "invalid"));
+ HTTPRequest request =
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.POST)
+ .path("token")
+ .body("invalid")
+ .mapper(mapper)
+ .build();
+ assertThatThrownBy(request::encodedBody)
+ .isInstanceOf(RESTException.class)
+ .hasMessage("Failed to encode request body: invalid");
+ }
+
+ @Test
+ public void encodedBodyFormData() {
+ HTTPRequest request =
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.POST)
+ .path("token")
+ .body(
+ ImmutableMap.of(
+ "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+ "subject_token", "token",
+ "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+ "scope", "catalog"))
+ .build();
+ assertThat(request.encodedBody())
+ .isEqualTo(
+ "grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Atoken-exchange&"
+ + "subject_token=token&"
+ + "subject_token_type=urn%3Aietf%3Aparams%3Aoauth%3Atoken-type%3Aaccess_token&"
+ + "scope=catalog");
+ }
+
+ @Test
+ public void encodedBodyFormDataNullKeysAndValues() {
+ Map body = Maps.newHashMap();
+ body.put(null, "token");
+ body.put("scope", null);
+ HTTPRequest request =
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.POST)
+ .path("token")
+ .body(body)
+ .build();
+ assertThat(request.encodedBody()).isEqualTo("null=token&scope=null");
+ }
+
+ @Test
+ public void encodedBodyNull() {
+ HTTPRequest request =
+ ImmutableHTTPRequest.builder()
+ .baseUri(URI.create("http://localhost"))
+ .method(HTTPRequest.HTTPMethod.POST)
+ .path("token")
+ .build();
+ assertThat(request.encodedBody()).isNull();
+ }
+}