Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Registry][WIP] Connecting to the ZooKeeper with SSL&ACL #16271

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~


metrics:
enabled: true
Expand Down
3 changes: 2 additions & 1 deletion dolphinscheduler-api/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~

api:
audit-enable: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ registry:
session-timeout: 60s
connection-timeout: 15s
block-until-connected: 15s
digest: ~
authorization:
digest: ~

master:
listen-port: 5678
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ registry:
connection-timeout: 9s
block-until-connected: 600ms
# The following options are set according to personal needs
digest: ~
authorization:
digest: ~
```

After do this config, you can start your DolphinScheduler cluster, your cluster will use zookeeper as registry center to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import com.google.common.base.Strings;

@Slf4j
final class ZookeeperRegistry implements Registry {

Expand All @@ -80,9 +78,10 @@ final class ZookeeperRegistry implements Registry {
.sessionTimeoutMs(DurationUtils.toMillisInt(properties.getSessionTimeout()))
.connectionTimeoutMs(DurationUtils.toMillisInt(properties.getConnectionTimeout()));

final String digest = properties.getDigest();
if (!Strings.isNullOrEmpty(digest)) {
builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
if (properties.getAuthorization().size() > 0) {
final String schema = properties.getAuthorization().keySet().stream().findFirst().get();
final String schemaValue = properties.getAuthorization().get(schema);
builder.authorization(schema.toLowerCase(), schemaValue.getBytes(StandardCharsets.UTF_8))
.aclProvider(new ACLProvider() {

@Override
Expand All @@ -96,6 +95,7 @@ public List<ACL> getAclForPath(final String path) {
}
});
}

client = builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand Down Expand Up @@ -79,6 +81,9 @@ public void validate(Object target, Errors errors) {
|| zookeeper.getBlockUntilConnected().isNegative()) {
errors.rejectValue("zookeeper.blockUntilConnected", "", "zookeeper.blockUntilConnected should be positive");
}
if (zookeeper.getAuthorization() != null && zookeeper.getAuthorization().size() != 1) {
errors.rejectValue("zookeeper.authorization", "", "zookeeper.authorization should be unique");
}
printConfig();
}

Expand All @@ -88,10 +93,11 @@ private void printConfig() {
"\n namespace -> " + zookeeper.getNamespace() +
"\n connectString -> " + zookeeper.getConnectString() +
"\n retryPolicy -> " + zookeeper.getRetryPolicy() +
"\n digest -> " + zookeeper.getDigest() +
"\n authorization -> " + zookeeper.getAuthorization() +
"\n sessionTimeout -> " + zookeeper.getSessionTimeout() +
"\n connectionTimeout -> " + zookeeper.getConnectionTimeout() +
"\n blockUntilConnected -> " + zookeeper.getBlockUntilConnected() +
"\n authorization -> " + zookeeper.getAuthorization() +
"\n****************************ZookeeperRegistryProperties**************************************";
log.info(config);
}
Expand All @@ -102,7 +108,7 @@ public static final class ZookeeperProperties {
private String namespace = "dolphinscheduler";
private String connectString;
private RetryPolicy retryPolicy = new RetryPolicy();
private String digest;
private Map<String, String> authorization = new HashMap<>();
private Duration sessionTimeout = Duration.ofSeconds(60);
private Duration connectionTimeout = Duration.ofSeconds(15);
private Duration blockUntilConnected = Duration.ofSeconds(15);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.zookeeper;

import org.apache.dolphinscheduler.plugin.registry.RegistryTestCase;

import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.DumbWatcher;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.util.Collections;
import java.util.stream.Stream;

import lombok.SneakyThrows;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

@ActiveProfiles("digest")
@SpringBootTest(classes = ZookeeperRegistryProperties.class)
@SpringBootApplication(scanBasePackageClasses = ZookeeperRegistryProperties.class)
public class ZookeeperRegistryDigestTestCase extends RegistryTestCase<ZookeeperRegistry> {

@Autowired
private ZookeeperRegistryProperties zookeeperRegistryProperties;

private static GenericContainer<?> zookeeperContainer;

private static final Network NETWORK = Network.newNetwork();

private static ZooKeeper zk;

private static final String ROOT_USER = "root";

private static final String ROOT_PASSWORD = "root_passwd";

private static final String ID_PASSWORD = String.format("%s:%s", ROOT_USER, ROOT_PASSWORD);

private static void setupRootACLForDigest(final ZooKeeper zk) throws Exception {
final String digest = DigestAuthenticationProvider.generateDigest(ID_PASSWORD);
final ACL acl = new ACL(ZooDefs.Perms.ALL, new Id("digest", digest));
zk.setACL("/", Collections.singletonList(acl), -1);
}

@SneakyThrows
@BeforeAll
public static void setUpTestingServer() {
zookeeperContainer = new GenericContainer<>(DockerImageName.parse("zookeeper:3.8"))
.withNetwork(NETWORK)
.withExposedPorts(2181);
Startables.deepStart(Stream.of(zookeeperContainer)).join();
System.clearProperty("registry.zookeeper.connect-string");
System.setProperty("registry.zookeeper.connect-string", "localhost:" + zookeeperContainer.getMappedPort(2181));
zk = new ZooKeeper("localhost:" + zookeeperContainer.getMappedPort(2181),
30000, new DumbWatcher(), new ZKClientConfig());
setupRootACLForDigest(zk);
}

@SneakyThrows
@Override
public ZookeeperRegistry createRegistry() {
return new ZookeeperRegistry(zookeeperRegistryProperties);
}

@SneakyThrows
@AfterAll
public static void tearDownTestingServer() {
zk.close();
zookeeperContainer.close();
}
}
Loading
Loading