Skip to content

Commit f4dc3e9

Browse files
committed
[FLINK-25866][statefun] Support additional TLS configuration
1 parent 547554a commit f4dc3e9

File tree

42 files changed

+1603
-116
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1603
-116
lines changed

docs/content/docs/modules/http-endpoint.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,22 @@ spec:
191191
pool_ttl: 15s
192192
pool_size: 1024
193193
payload_max_bytes: 33554432
194+
trust_cacerts: ~/trustedCAs.pem
195+
client_cert: classpath:clientPublic.crt
196+
client_key: ~/clientPrivate.key
197+
client_key_password: /tmp/password.txt
194198
```
195199

196200
* `call`: total duration of a single request (including retries, and backoffs). After this duration, the call is considered failed.
197201
* `connect`: the total amount of time to wait for a successful TCP connection. After that amount of time, an attempt is considered failed, and if the total call time has not elapsed, an additional attempt will be scheduled (after a backoff).
198202
* `pool_ttl`: the amount of time a connection will live in the connection pool. Set to 0 to disable, otherwise the connection will be evicted from the pool after (approximately) that time. If a connection is evicted while it is serving a request, that connection will be only marked for eviction and will be dropped from the pool once the request returns.
199203
* `pool_size`: the maximum pool size.
200204
* `payload_max_bytes`: the maximum size for a request or response payload size. The default is set to 32MB.
205+
* `trust_cacerts`: Trusted public certificate authority certificates in a pem format. If none are provided, but the function uses https, the default jre truststore will be used. If you need to provide more than one CA cert, concat them with a newline in between. This can be taken from a classpath (e.g. classpath:file.pem) or a path.
206+
* `client_cert`: Client public certificate used for mutual tls authentication. This can be taken from a classpath (e.g. classpath:file.crt) or a path
207+
* `client_key`: PKCS8 client private key used for mutual tls authentication. This can be taken from a classpath (e.g. classpath:file.key) or a path
208+
* `client_key_password`: The location of a file containing the client key password (if required). This can be taken from a classpath (e.g. classpath:file.key) or a path
209+
201210

202211
{{< hint info >}}
203212
We highly recommend setting `statefun.async.max-per-task` to a much higher value (see [Configurations]({{< ref "docs/deployment/configurations">}}))

statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,28 @@ under the License.
2828

2929
<artifactId>statefun-smoke-e2e-java</artifactId>
3030

31+
<properties>
32+
<netty.shaded.version>4.1.65.Final-14.0</netty.shaded.version>
33+
</properties>
34+
3135
<dependencies>
3236
<!-- Remote Java function dependencies -->
3337
<dependency>
3438
<groupId>org.apache.flink</groupId>
3539
<artifactId>statefun-sdk-java</artifactId>
3640
<version>${project.version}</version>
3741
</dependency>
38-
<dependency>
39-
<groupId>io.undertow</groupId>
40-
<artifactId>undertow-core</artifactId>
41-
<version>1.4.18.Final</version>
42-
</dependency>
4342
<dependency>
4443
<groupId>com.google.protobuf</groupId>
4544
<artifactId>protobuf-java</artifactId>
4645
<version>${protobuf.version}</version>
4746
</dependency>
47+
<dependency>
48+
<groupId>org.apache.flink</groupId>
49+
<artifactId>flink-shaded-netty</artifactId>
50+
<version>${netty.shaded.version}</version>
51+
<scope>compile</scope>
52+
</dependency>
4853
</dependencies>
4954

5055
<build>

statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/CommandInterpreterAppServer.java

Lines changed: 108 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,120 @@
2020

2121
import static org.apache.flink.statefun.e2e.smoke.java.Constants.CMD_INTERPRETER_FN;
2222

23-
import io.undertow.Undertow;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.util.Objects;
26+
import java.util.concurrent.CompletableFuture;
27+
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
28+
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
29+
import org.apache.flink.shaded.netty4.io.netty.channel.*;
30+
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
31+
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
32+
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.*;
33+
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.ClientAuth;
34+
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContext;
35+
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
36+
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslProvider;
2437
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
2538
import org.apache.flink.statefun.sdk.java.StatefulFunctions;
26-
import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
39+
import org.apache.flink.statefun.sdk.java.slice.Slice;
40+
import org.apache.flink.statefun.sdk.java.slice.Slices;
2741

2842
public class CommandInterpreterAppServer {
29-
public static final int PORT = 8000;
30-
31-
public static void main(String[] args) {
32-
final CommandInterpreter interpreter = new CommandInterpreter();
33-
final StatefulFunctionSpec FN_SPEC =
34-
StatefulFunctionSpec.builder(CMD_INTERPRETER_FN)
35-
.withSupplier(() -> new CommandInterpreterFn(interpreter))
36-
.withValueSpec(CommandInterpreterFn.STATE)
37-
.build();
38-
final StatefulFunctions functions = new StatefulFunctions();
43+
private static final int PORT = 8000;
44+
private static final String A_SERVER_KEY_PASSWORD = "test";
45+
private static final CommandInterpreter commandInterpreter = new CommandInterpreter();
46+
private static final StatefulFunctionSpec FN_SPEC =
47+
StatefulFunctionSpec.builder(CMD_INTERPRETER_FN)
48+
.withSupplier(() -> new CommandInterpreterFn(commandInterpreter))
49+
.withValueSpec(CommandInterpreterFn.STATE)
50+
.build();
51+
52+
public static void main(String[] args) throws IOException, InterruptedException {
53+
final InputStream trustCaCerts =
54+
Objects.requireNonNull(
55+
CommandInterpreter.class.getClassLoader().getResource("certs/a_ca.pem"))
56+
.openStream();
57+
final InputStream aServerCert =
58+
Objects.requireNonNull(
59+
CommandInterpreter.class.getClassLoader().getResource("certs/a_server.crt"))
60+
.openStream();
61+
final InputStream aServerKey =
62+
Objects.requireNonNull(
63+
CommandInterpreter.class.getClassLoader().getResource("certs/a_server.key.p8"))
64+
.openStream();
65+
66+
ServerBootstrap httpsMutualTlsBootstrap =
67+
getServerBootstrap(getChannelInitializer(trustCaCerts, aServerCert, aServerKey));
68+
69+
httpsMutualTlsBootstrap.bind(PORT).sync();
70+
}
71+
72+
private static ChannelInitializer<Channel> getChannelInitializer(
73+
InputStream trustInputStream, InputStream certInputStream, InputStream keyInputStream) {
74+
return getTlsEnabledInitializer(
75+
SslContextBuilder.forServer(certInputStream, keyInputStream, A_SERVER_KEY_PASSWORD)
76+
.trustManager(trustInputStream));
77+
}
78+
79+
private static ChannelInitializer<Channel> getTlsEnabledInitializer(
80+
SslContextBuilder sslContextBuilder) {
81+
return new ChannelInitializer<Channel>() {
82+
@Override
83+
protected void initChannel(Channel channel) throws IOException {
84+
ChannelPipeline pipeline = channel.pipeline();
85+
SslContext sslContext =
86+
sslContextBuilder.sslProvider(SslProvider.JDK).clientAuth(ClientAuth.REQUIRE).build();
87+
pipeline.addLast(sslContext.newHandler(channel.alloc()));
88+
addResponseHandlerToPipeline(pipeline);
89+
}
90+
};
91+
}
92+
93+
private static ServerBootstrap getServerBootstrap(ChannelInitializer<Channel> childHandler) {
94+
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
95+
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
96+
97+
return new ServerBootstrap()
98+
.group(eventLoopGroup, workerGroup)
99+
.channel(NioServerSocketChannel.class)
100+
.childHandler(childHandler)
101+
.option(ChannelOption.SO_BACKLOG, 128)
102+
.childOption(ChannelOption.SO_KEEPALIVE, true);
103+
}
104+
105+
private static void addResponseHandlerToPipeline(ChannelPipeline pipeline) {
106+
pipeline.addLast(new HttpServerCodec());
107+
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
108+
pipeline.addLast(getStatefunInboundHandler());
109+
}
110+
111+
private static SimpleChannelInboundHandler<FullHttpRequest> getStatefunInboundHandler() {
112+
StatefulFunctions functions = new StatefulFunctions();
39113
functions.withStatefulFunction(FN_SPEC);
40114

41-
final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
115+
return new SimpleChannelInboundHandler<FullHttpRequest>() {
116+
@Override
117+
protected void channelRead0(
118+
ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
119+
CompletableFuture<Slice> res =
120+
functions
121+
.requestReplyHandler()
122+
.handle(Slices.wrap(fullHttpRequest.content().nioBuffer()));
123+
res.whenComplete(
124+
(r, e) -> {
125+
FullHttpResponse response =
126+
new DefaultFullHttpResponse(
127+
HttpVersion.HTTP_1_1,
128+
HttpResponseStatus.OK,
129+
Unpooled.copiedBuffer(r.toByteArray()));
130+
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
131+
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, r.readableBytes());
42132

43-
// Use the request-reply handler along with your favorite HTTP web server framework
44-
// to serve the functions!
45-
final Undertow httpServer =
46-
Undertow.builder()
47-
.addHttpListener(PORT, "0.0.0.0")
48-
.setHandler(new UndertowHttpHandler(requestReplyHandler))
49-
.build();
50-
httpServer.start();
133+
channelHandlerContext.write(response);
134+
channelHandlerContext.flush();
135+
});
136+
}
137+
};
51138
}
52139
}

statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/java/UndertowHttpHandler.java

Lines changed: 0 additions & 62 deletions
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
For instructions on how these cert/key files got created, see statefun-flink/statefun-flink-core/src/test/resources/certs/README.md
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIDhTCCAm2gAwIBAgIUTRXcSpygsZsWmuX4QMz9ey0rPBYwDQYJKoZIhvcNAQEL
3+
BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
4+
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAyMjIx
5+
MzA2MDJaGA8yMTIyMDEyOTEzMDYwMlowUTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
6+
ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEK
7+
MAgGA1UEAwwBYTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKBR0E/f
8+
fcTFxs8Oo4ItZm9HYuPR6U52OuBDN8TdzjMfA+FKoTvqvt/RUg7y6QFCdVEFwMS1
9+
uyjxoMlGDX1MF9SfDTH9oZlND+DL4YatsqpEXGRfjYSheBPUWusoQ6BbXvjiUNf4
10+
2bEdTFHnb08pdt0ixtymPp3vLsoUWAqscqYKOj4DW0AMRq2fAMdt0NlJsvW9hUQr
11+
I8wvU90SJlZ/YNOMv4uae8q67YQKZziRpFaLTM5aFjXnsIYAA6Z+PByFjpLTofgc
12+
BNuGM5WIk7O88UdZOwd5OCZhkaXrPCkkCtyyPDO8kWiiSmdTOnQ2sjkOSKgNTXkd
13+
+6K7REk5gZ8WVv8CAwEAAaNTMFEwHQYDVR0OBBYEFFvccwv5Cufwh+0kLG0vmQCF
14+
6JbLMB8GA1UdIwQYMBaAFFvccwv5Cufwh+0kLG0vmQCF6JbLMA8GA1UdEwEB/wQF
15+
MAMBAf8wDQYJKoZIhvcNAQELBQADggEBADfgGnHZ42Cxoe5gNoWIUdgFjBqeiWOe
16+
NHNDLgXkx/jcOsbAO6H3+tz3Dz8QL3JIYTrwRKv0vX+5GAkkRz2R2ZaN4xHz73Co
17+
SuFfIqq4MjjrymVcnNA347vk50FjOMgfHrxpS3UQeTvlb1iuA88Zk8ewzhY376+t
18+
MoFmT1/ocb2E7jvxR0kDNCK5XsJIGzJmCBq8nIAD6wxrPXU3HJ/GLBlL5sL5kRrN
19+
x7l/DnL2oqN2DuyFmf4g03+DVmuu87XrbDrGHnn57CVnUe7z4jCFE82vA4u/tppK
20+
VgI17uYA714s3Jw6Iw+u38cmmSb79AVG05D2b2+TsqICVet3bc9MkQ8=
21+
-----END CERTIFICATE-----
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIDjTCCAnWgAwIBAgIUOfy6Qa7zTjHWOwUTdNZrzoPVVjIwDQYJKoZIhvcNAQEL
3+
BQAwUTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
4+
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEKMAgGA1UEAwwBYTAgFw0yMjAzMDIx
5+
MjE4MTRaGA8yMTIyMDIwNjEyMTgxNFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgM
6+
ClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCC
7+
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANj0NXPAatJmpQZSqdoNU/aI
8+
bs+b7WXHJ7if2pRU+sdbAdIqU9+eGK+9lzkyin8YExvZokDXV2cHVQCW2kO8qnZE
9+
pksKk9ZtHMe8RoksbRN2uNq174GXLlku52+IleJE6SiuHDWZXU0s0tZWl3LL5yei
10+
oKaWLrF4bKxdZqm2bnWe3VZJVgiIJOuHMMoSQPE8BYnUe6n0YjpZR7vHifBLf0wM
11+
w70PuZrzcADRiokqVFS5dwZYhW1xErbNA0/pYg3MsnQeNuWyJ517KkYSUxCAnb8q
12+
LHWGBgcqJ7CSmGGpcLDIDEXJZg4lT/SQp08n99+EoTy7rkncoG+pG5IBmyJKLtUC
13+
AwEAAaNnMGUwHwYDVR0jBBgwFoAUW9xzC/kK5/CH7SQsbS+ZAIXolsswCQYDVR0T
14+
BAIwADALBgNVHQ8EBAMCBPAwKgYDVR0RBCMwIYIJbG9jYWxob3N0ghRyZW1vdGUt
15+
ZnVuY3Rpb24taG9zdDANBgkqhkiG9w0BAQsFAAOCAQEAVo2WzynblomqTGqUss9S
16+
eCOTrKkySETvGLK/GLLKaxn1Du/JRgEpIXCo5Ri3MQ8MBtpi748kN3z6Jh7ouoeu
17+
tG1I/hll0naxUzBDBzx25ZxSxGCQG/eoOFgRLF5OQ9L5BKRMW1T4/XNJqLtUVoiw
18+
qiulkNYJs2tKD/swAkM3LwAGuXl/p9KbGoYmxozwDYrW9PohL2zS0qzBRriJQGhM
19+
x65SeX50KhiDXx6mXTVUfFU91lh5H6er8SOHbS103jt0B7Y3403mhDmq2NcmY2Ln
20+
P3d6AZlhXwzsFDxD45U4lxYxMWTy6rOZkcCqrV9LGHYbmZYQPVgmiseWWo6qjLiI
21+
pw==
22+
-----END CERTIFICATE-----
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-----BEGIN ENCRYPTED PRIVATE KEY-----
2+
MIIE4zAcBgoqhkiG9w0BDAEBMA4ECMk3nj4DFVXtAgIIAASCBMFgWecNkVuMByRI
3+
P3qAOsQxC+Tmf1eamSe4jn+WS022jUTaqO8c+T/9yyj+UJE9w1VKb1+RYAcnijAY
4+
JJghl82TTIZgzQlIcD1MwiV2E6Fiz/HolaCeEIOHhNEZDdI5INke92uon++TG/ue
5+
aW2dhpy7AFL138BUr1GfeYZgnpRWh+VuR9tgR4CRbINFqohD0nein5XUMHUZr+cY
6+
pFho0f6P5+AGhkNJJK7b/NMQULUimehnFU+wiKavEkCPcLQ3ZOd6WwadOLJMRpiK
7+
jrqP5KB6YoB/W83L9qMscZnA6gaLnfBwP42EQOtKVaPVLF3p0vb19SjQ30XNJgwG
8+
O5snRRuFQJ+IeWDklZMlEGcDrnZrRh6CtOtszBPxwmX2TNjh7XxP3KRUDj2a1dg0
9+
WHLp9osJuBkHmCa5Yb7EnX/eMWfrGRnJ8h3gIPP237rgyGkkdgC2uh4KSzZ1m0f7
10+
hUow3sArKzg3zdMbNIaV7IP/9GjJbSIHVSta7LinWgygHUtgApjKsGn4AMdZsTRO
11+
NE5cN7M4S7ersvufRF2/1EH485A5HPP4UpD4xtq6v6dEX6hcN+mBoKCwVoeClxSC
12+
avvWbI6z1LWg+m3vwNywFWJ07tDV+iLQdBk197xeNZ687qjgaxHEG9hgP2neJ7Jg
13+
lhAtyW0SbgJG/Ux5D7/wc2beQ8VhgsNUuX5XDZd09RsgGtSh28qUb2lhP6jCHH6v
14+
FDRkWtZDmQDwvofyY9cRWvRxph2GE8P+/KVV36xs1c/SJmaPevM0rI8aRP1ocLXo
15+
yx+eeQqPPg7gxSx0v9yLQ6j6pj1yhq3CPOnG2vp8MKSjc3U1sTNYGn1mc6Et5bbG
16+
cLiTIiJv566g2je2xub74Ylcj0dZdispp20M3AYHOLNsO/JNY5rKJgfiuJNbhbNV
17+
VLMIcB5yr8hhZIE+AoE9FWxFGLhoPwmGoQbvXqlvj8YvL7uqRduMxxwYSULTnINn
18+
ghG9Gt8E5ha6FSk+neOd2upj9kJng+RT2xlKLTdeaQ4T/w9Uq5Av9tqD9Q8xuUFQ
19+
Dh1weFH8HjgiJD67ozqZa0u2AbE60QXbTj8ygSOADTEdTNox97NeDRjb9hdyFCbn
20+
7T5ISYB3ZnughKN1CdwNAz+OEnID0ydidcDvH5DHMkh2Cey4ghMd1LdHH61cswcB
21+
awWjBCC8MB6tEPeWO49q0Jdq2nXeCWIBvi+0Bsde4BY0yk+S283pu78r6/NZC9y+
22+
fyNemrFZKmnBIwLQ5TLAxiK6fSo+fZPXRESGzsCCezLp5y1WoMmkEQpyYbw/XWZs
23+
9WmbldJuvNW2YHfmZ3n8ijtpmyzUlzZrXIPLB9TVqNGINj/bgSG4OjyMT0jmSQ55
24+
uGOceaG02fQjl1culVIe+axyox0UFQP3g/B5bnCDbOv9J8Nohw3OZo8Me6ThnwsB
25+
I55krAmytF6vr9FtlOqv+eF9Ahf/RRnr4IrklkgoZpcK21itk5SLPoW/hhfZnvgg
26+
Lraeycf28uaVknxVZ9gEVGcxa2Ldh9sF4u7+lhTZ/ax22X9QN0fl76pIY8zoRYSL
27+
o2maqbjH9q8bkOpX7MzXJ349k4h3QrduF+7bC//zoUTLo/rV/7b+LX2s2nWLvBhy
28+
T4+PpNAkuA==
29+
-----END ENCRYPTED PRIVATE KEY-----

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void runWith() throws Throwable {
4747
StatefulFunctionsAppContainers.Builder builder =
4848
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
4949
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
50+
.withBuildContextFileFromClasspath("certs", "/certs/")
5051
.dependsOn(remoteFunction);
5152

5253
SmokeRunner.run(parameters, builder);

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ FROM flink-statefun:3.3-SNAPSHOT
1818
RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e
1919
COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/
2020
COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/
21+
COPY certs/ /opt/statefun/modules/statefun-smoke-e2e/certs/
2122
COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml

0 commit comments

Comments
 (0)