Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add basic implementation to support REST Catalog #4553

Merged
merged 60 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
4d5a1fd
[core] Add implementation to support REST Catalog
jerry-024 Nov 18, 2024
693b1e1
[core] Add implementation to support REST Catalog
jerry-024 Nov 18, 2024
f632a06
[core] Add implementation to support REST Catalog
jerry-024 Nov 18, 2024
0a87c4d
[core] Add implementation to support REST Catalog
jerry-024 Nov 18, 2024
52f07b6
[core] Add implementation to support REST Catalog
jerry-024 Nov 18, 2024
1c3cc24
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
b5c0cdb
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
2ef4bd8
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
1730e06
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
cf5407f
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
3171f0e
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
484da3c
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
5c85fda
[core] Add implementation to support REST Catalog
jerry-024 Nov 19, 2024
6613160
[core] Add implementation to support REST Catalog
jerry-024 Nov 20, 2024
3f39e7e
[core] Add implementation to support REST Catalog
jerry-024 Nov 20, 2024
4f99327
[core] Add implementation to support REST Catalog
jerry-024 Nov 20, 2024
a52eb44
[core] Add implementation to support REST Catalog
jerry-024 Nov 20, 2024
a5886f5
[core] Add implementation to support REST Catalog
jerry-024 Nov 20, 2024
da2a2cf
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 20, 2024
d70fe35
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 20, 2024
289fbb9
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 20, 2024
b0faa00
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
f46cff3
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
fc3fdb5
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
903f71a
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
acb64fc
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
d5f253c
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
6088452
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
cd2ffd6
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
ef7e9c3
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
cb2e7ef
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
ce226e3
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 21, 2024
9a92829
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 22, 2024
084e338
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 22, 2024
6353a89
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
eee9863
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
3186300
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
fdd093c
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
93652b1
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
b884933
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
cb1de26
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
72b7aba
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
599a1ca
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
2484e48
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
4e8598c
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
5167826
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
a65aebd
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
22ebee5
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
6c7c96c
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
41dd0aa
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
ad576cd
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 27, 2024
8ad1624
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
3b865ce
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
ef004a2
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
2b8811c
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
ea28b06
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
72fc7dd
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
da96c82
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
32c3a4c
[core] Add basic implementation to support REST Catalog
jerry-024 Nov 28, 2024
b3db196
update paimon core pom exclude NOTICE
jerry-024 Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -54,13 +55,22 @@ public class ThreadPoolUtils {
* is max thread number.
*/
public static ThreadPoolExecutor createCachedThreadPool(int threadNum, String namePrefix) {
return createCachedThreadPool(threadNum, namePrefix, new LinkedBlockingQueue<>());
}

/**
* Create a thread pool with max thread number and define queue. Inactive threads will
* automatically exit.
*/
public static ThreadPoolExecutor createCachedThreadPool(
int threadNum, String namePrefix, BlockingQueue<Runnable> workQueue) {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
threadNum,
threadNum,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
workQueue,
newDaemonThreadFactory(namePrefix));
executor.allowCoreThreadTimeOut(true);
return executor;
Expand Down
57 changes: 57 additions & 0 deletions paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ under the License.

<properties>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
<okhttp.version>4.12.0</okhttp.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -63,6 +64,14 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- REST Catalog dependencies -->

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down Expand Up @@ -204,6 +213,20 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<type>jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand All @@ -219,6 +242,40 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-paimon</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
jerry-024 marked this conversation as resolved.
Show resolved Hide resolved
jerry-024 marked this conversation as resolved.
Show resolved Hide resolved
<filters>
<filter>
<artifact>*</artifact>
<excludes>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to exclude NOTICE and LICENSE?

<exclude>okhttp3/internal/publicsuffix/NOTICE</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<includes combine.children="append">
<include>com.squareup.okhttp3:okhttp</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>okhttp3</pattern>
<shadedPattern>org.apache.paimon.shade.okhttp3</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
import org.apache.paimon.rest.responses.ErrorResponse;

/** Default error handler. */
public class DefaultErrorHandler extends ErrorHandler {
private static final ErrorHandler INSTANCE = new DefaultErrorHandler();

public static ErrorHandler getInstance() {
return INSTANCE;
}

@Override
public void accept(ErrorResponse error) {
int code = error.code();
switch (code) {
case 400:
throw new BadRequestException(
String.format("Malformed request: %s", error.message()));
case 401:
throw new NotAuthorizedException("Not authorized: %s", error.message());
case 403:
throw new ForbiddenException("Forbidden: %s", error.message());
case 405:
case 406:
break;
case 500:
throw new ServiceFailureException("Server error: %s", error.message());
case 501:
throw new UnsupportedOperationException(error.message());
case 503:
throw new ServiceUnavailableException("Service unavailable: %s", error.message());
}

throw new RESTException("Unable to process: %s", error.message());
}
}
26 changes: 26 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/ErrorHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.rest.responses.ErrorResponse;

import java.util.function.Consumer;

/** Error handler for REST client. */
public abstract class ErrorHandler implements Consumer<ErrorResponse> {}
142 changes: 142 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest;

import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.responses.ErrorResponse;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;

import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
import static okhttp3.ConnectionSpec.MODERN_TLS;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

/** HTTP client for REST catalog. */
public class HttpClient implements RESTClient {

private final OkHttpClient okHttpClient;
private final String uri;
private final ObjectMapper mapper;
private final ErrorHandler errorHandler;

private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");

public HttpClient(HttpClientOptions httpClientOptions) {
this.uri = httpClientOptions.uri();
this.mapper = httpClientOptions.mapper();
this.okHttpClient = createHttpClient(httpClientOptions);
this.errorHandler = httpClientOptions.errorHandler();
}

@Override
public <T extends RESTResponse> T get(
String path, Class<T> responseType, Map<String, String> headers) {
try {
Request request =
new Request.Builder()
.url(uri + path)
.get()
.headers(Headers.of(headers))
.build();
return exec(request, responseType);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public <T extends RESTResponse> T post(
String path, RESTRequest body, Class<T> responseType, Map<String, String> headers) {
try {
RequestBody requestBody = buildRequestBody(body);
Request request =
new Request.Builder()
.url(uri + path)
.post(requestBody)
.headers(Headers.of(headers))
.build();
return exec(request, responseType);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException {
okHttpClient.dispatcher().cancelAll();
okHttpClient.connectionPool().evictAll();
}

private <T extends RESTResponse> T exec(Request request, Class<T> responseType) {
try (Response response = okHttpClient.newCall(request).execute()) {
String responseBodyStr = response.body() != null ? response.body().string() : null;
if (!response.isSuccessful()) {
ErrorResponse error =
new ErrorResponse(
responseBodyStr != null ? responseBodyStr : "response body is null",
response.code());
errorHandler.accept(error);
}
if (responseBodyStr == null) {
throw new RESTException("response body is null.");
}
return mapper.readValue(responseBodyStr, responseType);
} catch (Exception e) {
throw new RESTException(e, "rest exception");
}
}

private RequestBody buildRequestBody(RESTRequest body) throws JsonProcessingException {
return RequestBody.create(mapper.writeValueAsBytes(body), MEDIA_TYPE);
}

private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions) {
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
ExecutorService executorService =
createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue);

OkHttpClient.Builder builder =
new OkHttpClient.Builder()
.dispatcher(new Dispatcher(executorService))
.retryOnConnectionFailure(true)
.connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT));
httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout);
httpClientOptions.readTimeout().ifPresent(builder::readTimeout);

return builder.build();
}
}
Loading
Loading