Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into v2.6
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Nov 24, 2021
2 parents 32b5225 + 5923b3d commit 8644d59
Show file tree
Hide file tree
Showing 60 changed files with 414 additions and 205 deletions.
14 changes: 0 additions & 14 deletions LICENSES/CC-1.0.txt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.catalog;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.catalog;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.catalog.factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.catalog.factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.connection;

import java.io.Serializable;

public class CASignParams implements Serializable {

private String caCrtFilePath;
private String crtFilePath;
private String keyFilePath;

public CASignParams(String caCrtFilePath, String crtFilePath, String keyFilePath) {
this.caCrtFilePath = caCrtFilePath;
this.crtFilePath = crtFilePath;
this.keyFilePath = keyFilePath;
}

public String getCaCrtFilePath() {
return caCrtFilePath;
}

public void setCaCrtFilePath(String caCrtFilePath) {
this.caCrtFilePath = caCrtFilePath;
}

public String getCrtFilePath() {
return crtFilePath;
}

public void setCrtFilePath(String crtFilePath) {
this.crtFilePath = crtFilePath;
}

public String getKeyFilePath() {
return keyFilePath;
}

public void setKeyFilePath(String keyFilePath) {
this.keyFilePath = keyFilePath;
}

@Override
public String toString() {
return "CASSLSignParams{"
+ "caCrtFilePath='" + caCrtFilePath + '\''
+ ", crtFilePath='" + crtFilePath + '\''
+ ", keyFilePath='" + keyFilePath + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.connection;

import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -35,18 +32,21 @@ public class NebulaClientOptions implements Serializable {

private final boolean enableMetaSSL;

private final boolean enableStorageSSL;

private final SSLSighType sslSighType;

private final CASignedSSLParam caSignParam;
private final CASignParams caSignParams;

private final SelfSignedSSLParam selfSignParam;
private final SelfSignParams selfSignParams;


private NebulaClientOptions(String metaAddress, String graphAddress, String username,
String password, int timeout, int connectRetry,
boolean enableGraphSSL, boolean enableMetaSSL,
SSLSighType sslSighType, CASignedSSLParam caSignParam,
SelfSignedSSLParam selfSignParam) {
boolean enableStorageSSL,
SSLSighType sslSighType, CASignParams caSignParams,
SelfSignParams selfSignParams) {
this.metaAddress = metaAddress;
this.graphAddress = graphAddress;
this.username = username;
Expand All @@ -55,9 +55,10 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user
this.connectRetry = connectRetry;
this.enableGraphSSL = enableGraphSSL;
this.enableMetaSSL = enableMetaSSL;
this.enableStorageSSL = enableStorageSSL;
this.sslSighType = sslSighType;
this.caSignParam = caSignParam;
this.selfSignParam = selfSignParam;
this.caSignParams = caSignParams;
this.selfSignParams = selfSignParams;
}

public List<HostAddress> getMetaAddress() {
Expand Down Expand Up @@ -97,16 +98,20 @@ public boolean isEnableMetaSSL() {
return enableMetaSSL;
}

public boolean isEnableStorageSSL() {
return enableStorageSSL;
}

public SSLSighType getSSLSighType() {
return sslSighType;
}

public CASignedSSLParam getCaSignParam() {
return caSignParam;
public CASignParams getCaSignParam() {
return caSignParams;
}

public SelfSignedSSLParam getSelfSignParam() {
return selfSignParam;
public SelfSignParams getSelfSignParam() {
return selfSignParams;
}

/**
Expand All @@ -123,9 +128,10 @@ public static class NebulaClientOptionsBuilder {
// ssl options
private boolean enableGraphSSL = false;
private boolean enableMetaSSL = false;
private boolean enableStorageSSL = false;
private SSLSighType sslSighType = null;
private CASignedSSLParam caSignParam = null;
private SelfSignedSSLParam selfSignParam = null;
private CASignParams caSignParams = null;
private SelfSignParams selfSignParams = null;

public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) {
this.metaAddress = metaAddress;
Expand Down Expand Up @@ -167,48 +173,54 @@ public NebulaClientOptionsBuilder setEnableMetaSSL(boolean enableMetaSSL) {
return this;
}

public NebulaClientOptionsBuilder setEnableStorageSSL(boolean enableStorageSSL) {
this.enableStorageSSL = enableStorageSSL;
return this;
}


public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) {
this.sslSighType = sslSighType;
return this;
}

public NebulaClientOptionsBuilder setCaSignParam(String caCrtFilePath, String crtFilePath,
String keyFilePath) {
this.caSignParam = new CASignedSSLParam(caCrtFilePath, crtFilePath,
keyFilePath);
this.caSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath);
return this;
}

public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String keyFilePath,
String password) {
this.selfSignParam = new SelfSignedSSLParam(crtFilePath, keyFilePath, password);
this.selfSignParams = new SelfSignParams(crtFilePath, keyFilePath, password);
return this;
}

public NebulaClientOptions build() {
if (metaAddress == null || metaAddress.trim().isEmpty()) {
throw new IllegalArgumentException("meta address can not be empty.");
}
if (enableMetaSSL || enableGraphSSL) {
// if meta is set to open ssl, then graph must be set to open ssl
if (enableMetaSSL && !enableGraphSSL) {
if (enableMetaSSL || enableGraphSSL || enableStorageSSL) {
// if storage is set to open ssl, then meta must be set to open ssl
if (enableStorageSSL && !enableMetaSSL) {
throw new IllegalArgumentException(
"meta ssl is enable, graph ssl must be enable");
"storage ssl is enabled, meta ssl must be enabled.");
}

if (sslSighType == null) {
throw new IllegalArgumentException("ssl is enable, ssl sign type must not be "
throw new IllegalArgumentException("ssl is enabled, ssl sign type must not be "
+ "null");
}
switch (sslSighType) {
case CA:
if (caSignParam == null) {
throw new IllegalArgumentException("ssl is enable and sign type is "
if (caSignParams == null) {
throw new IllegalArgumentException("ssl is enabled and sign type is "
+ "CA, caSignParam must not be null");
}
break;
case SELF:
if (selfSignParam == null) {
throw new IllegalArgumentException("ssl is enable and sign type is "
if (selfSignParams == null) {
throw new IllegalArgumentException("ssl is enabled and sign type is "
+ "CA, selfSignParam must not be null");
}
break;
Expand All @@ -227,9 +239,10 @@ public NebulaClientOptions build() {
connectRetry,
enableGraphSSL,
enableMetaSSL,
enableStorageSSL,
sslSighType,
caSignParam,
selfSignParam);
caSignParams,
selfSignParams);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.connection;


import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,12 +49,20 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSighType()) {
case CA:
poolConfig.setSslParam(nebulaClientOptions.getCaSignParam());
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
poolConfig.setSslParam(sslParam);
break;
case SELF:
poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam());
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
poolConfig.setSslParam(sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.connection;

import com.facebook.thrift.TException;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
Expand All @@ -16,11 +18,9 @@
import com.vesoft.nebula.meta.Schema;
import com.vesoft.nebula.meta.SpaceItem;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,7 +37,32 @@ public NebulaMetaConnectionProvider(NebulaClientOptions nebulaClientOptions) {

public MetaClient getMetaClient() throws TException, ClientServerIncompatibleException {
List<HostAddress> addresses = nebulaClientOptions.getMetaAddress();
MetaClient metaClient = new MetaClient(addresses);
int timeout = nebulaClientOptions.getTimeout();
int retry = nebulaClientOptions.getConnectRetry();
MetaClient metaClient;
if (nebulaClientOptions.isEnableMetaSSL()) {
switch (nebulaClientOptions.getSSLSighType()) {
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam);
break;
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
} else {
metaClient = new MetaClient(addresses, timeout, retry, retry);
}

metaClient.connect();
return metaClient;
}
Expand Down
Loading

0 comments on commit 8644d59

Please sign in to comment.