Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zmq secure connect implementation #641

Merged
merged 21 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
56a07ca
zmq secure connection implement
AionJayT Sep 6, 2018
9fc903a
Merge branch 'native-check' into secure-zmq
AionJayT Sep 6, 2018
49c46a7
implement the curve key loading in the zmq ProtocolProcessor
AionJayT Sep 6, 2018
f994aea
more detail for the log and the config settings
AionJayT Sep 6, 2018
56998ef
Merge branch 'master' into secure-zmq
AionJayT Sep 7, 2018
91c64b1
fix zmq key loading error
AionJayT Sep 10, 2018
3d570d6
fix api secure connect config reset after kernel launch
AionJayT Sep 10, 2018
17fc73c
modify zmq key naming rule for the OS compatibility
AionJayT Sep 10, 2018
8067f6f
Merge branch 'master-pre-merge' into secure-zmq
AionJayT Sep 14, 2018
49c0ea7
add zmq_keystore into gitignore
AionJayT Sep 25, 2018
3f84a20
refactoring the getFiles method to a class
AionJayT Sep 25, 2018
1c75109
refactoring zmq secure connection for logging
AionJayT Sep 25, 2018
33c68bd
refactoring CfgApiZmq logging, ProtocolProcessor construct process an…
AionJayT Sep 25, 2018
0f14ba4
fix merge error
aionjay Oct 1, 2018
04672de
update api repo ref
aionjay Oct 1, 2018
415caea
move zmq key pair generate ouside of Cli. Will be checked after confi…
aionjay Oct 2, 2018
27caad4
undo format Cli and remove comment in Aion class
aionjay Oct 2, 2018
a51c944
fix pack issue
aionjay Oct 2, 2018
117a502
Merge branch 'master-pre-merge' into secure-zmq
aionjay Oct 3, 2018
699080a
fix module-info settings for modAionImpl
aionjay Oct 3, 2018
2208f82
update default config settings for the Java API secure connect
aionjay Oct 3, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
/log/
/rt/
/web3/
/zmq_keystore/
/jars/

# ide
Expand Down
2 changes: 1 addition & 1 deletion aion_api
1 change: 1 addition & 0 deletions modAionBase/src/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
exports org.aion.base.util;
exports org.aion.base.vm;
exports org.aion.base.db;
exports org.aion.base.io;
exports org.aion.base;
}
46 changes: 46 additions & 0 deletions modAionBase/src/org/aion/base/io/File.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2017-2018 Aion foundation.
*
* This file is part of the aion network project.
*
* The aion network project is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or any later version.
*
* The aion network project is distributed in the hope that it will
* be useful, but WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with the aion network project source files.
* If not, see <https://www.gnu.org/licenses/>.
*
* Contributors:
* Aion foundation.
*/

package org.aion.base.io;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class File {
public static List<java.io.File> getFiles(final Path path) {
if (path == null) {
System.out.println("getFiles null path input!");
return Collections.emptyList();
}

try {
java.io.File[] files = path.toFile().listFiles();
return files != null ? Arrays.asList(files) : Collections.emptyList();
} catch (UnsupportedOperationException | NullPointerException e) {
System.out.println("getFiles exception: " + e.toString());
return Collections.emptyList();
}
}
}
2 changes: 2 additions & 0 deletions modAionImpl/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@
<pathelement location="${dir.lib}/guava-25.1-jre.jar"/>
<pathelement location="${dir.lib}/libnsc.jar"/>
<pathelement location="${dir.lib}/slf4j-api-1.7.25.jar"/>
<pathelement location="${dir.lib}/libnzmq.jar"/>
<pathelement location="${dir.lib}/jsr305-3.0.2.jar"/>
</modulepath>
</javac>

Expand Down
3 changes: 3 additions & 0 deletions modAionImpl/src/module-info.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
module aion.zero.impl {
uses org.aion.evtmgr.EventMgrModule;
uses org.aion.txpool.TxPoolModule;
requires aion.base;
requires aion.mcf;
requires aion.log;
Expand All @@ -17,6 +19,7 @@
requires jdk.management;
requires java.xml;
requires slf4j.api;
requires com.google.common;

exports org.aion.equihash;
exports org.aion.zero.impl.blockchain;
Expand Down
4 changes: 2 additions & 2 deletions modApiServer/src/org/aion/api/server/ApiAion.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class ApiAion extends Api {
// 2. underlying datastructure provides concurrency guarntees

// delegate concurrency to underlying object
protected static NrgOracle NRG_ORACLE;
private static NrgOracle NRG_ORACLE;
protected IAionChain ac; // assumption: blockchainImpl et al. provide concurrency guarantee

// using java.util.concurrent library objects
Expand Down Expand Up @@ -224,7 +224,7 @@ public AionBlock getBlock(long blkNr) {
}
}

public Map.Entry<AionBlock, BigInteger> getBlockWithTotalDifficulty(long blkNr) {
protected Map.Entry<AionBlock, BigInteger> getBlockWithTotalDifficulty(long blkNr) {
if (blkNr > 0) {
return ((AionBlockStore) this.ac.getBlockchain().getBlockStore())
.getChainBlockByNumberWithTotalDifficulty(blkNr);
Expand Down
15 changes: 7 additions & 8 deletions modApiServer/src/org/aion/api/server/zmq/HdlrZmq.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*******************************************************************************
/*
* Copyright (c) 2017-2018 Aion foundation.
*
* This file is part of the aion network project.
Expand All @@ -19,8 +19,7 @@
*
* Contributors:
* Aion foundation.
*
******************************************************************************/
*/

package org.aion.api.server.zmq;

Expand Down Expand Up @@ -70,7 +69,7 @@ public byte[] process(byte[] request, byte[] socketId) {
}
}

public void getTxWait() {
void getTxWait() {
TxWaitingMappingUpdate txWait = null;
try {
txWait = this.api.takeTxWait();
Expand Down Expand Up @@ -107,15 +106,15 @@ public Map<Long, Fltr> getFilter() {
return this.api.getFilter();
}

public BlockingQueue<TxPendingStatus> getTxStatusQueue() {
BlockingQueue<TxPendingStatus> getTxStatusQueue() {
return this.api.getPendingStatus();
}

public byte[] toRspMsg(byte[] msgHash, int txCode, String error) {
byte[] toRspMsg(byte[] msgHash, int txCode, String error) {
return ApiUtil.toReturnHeader(this.api.getApiVersion(), txCode, msgHash, error.getBytes());
}

public byte[] toRspMsg(byte[] msgHash, int txCode, String error, byte[] result) {
byte[] toRspMsg(byte[] msgHash, int txCode, String error, byte[] result) {
return ApiUtil.toReturnHeader(this.api.getApiVersion(), txCode, msgHash, error.getBytes(), result);
}

Expand All @@ -124,7 +123,7 @@ public byte[] process(byte[] request) {
return null;
}

public byte[] toRspEvtMsg(byte[] ecb) {
byte[] toRspEvtMsg(byte[] ecb) {
return ApiUtil.toReturnEvtHeader(this.api.getApiVersion(), ecb);
}

Expand Down
105 changes: 90 additions & 15 deletions modApiServer/src/org/aion/api/server/zmq/ProtocolProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
import static org.zeromq.ZMQ.DEALER;
import static org.zeromq.ZMQ.ROUTER;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -53,22 +58,32 @@
public class ProtocolProcessor implements Runnable {

protected static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.API.name());
private final IHdlr handler;
private static final String AION_ZMQ_WK_TH = "inproc://aionZmqWkTh";
private static final String AION_ZMQ_CB_TH = "inproc://aionZmqCbTh";
private static final String AION_ZMQ_EV_TH = "inproc://aionZmqEvTh";
private static final String AION_ZMQ_HB_TH = "inproc://aionZmqHbTh";

private CfgApiZmq cfgApi;
private AtomicBoolean shutDown = new AtomicBoolean();

private final Path PATH;
private static final long zmqHWM = 100_000;
private static final int SOCKETID_LEN = 5;
private static final int SOCKET_RECV_TIMEOUT = 3000;

private final IHdlr handler;
private CfgApiZmq cfgApi;
private AtomicBoolean shutDown = new AtomicBoolean();
private byte[] curvePubKey;
private byte[] curveSecKey;

public ProtocolProcessor(IHdlr _handler, final CfgApiZmq cfg) {
this.handler = _handler;
this.cfgApi = cfg;

String storageDir = System.getProperty("local.storage.dir");
if (storageDir == null || storageDir.equalsIgnoreCase("")) {
storageDir = System.getProperty("user.dir");
}

String curveKeyPath = storageDir + "/" + CfgApiZmq.ZMQ_KEY_DIR;
PATH = Paths.get(curveKeyPath);
}

public void shutdown() throws InterruptedException {
Expand All @@ -90,6 +105,23 @@ public void run() {

// create router sock.
Socket feSock = ctx.socket(ROUTER);
if (cfgApi.isSecureConnectEnabledEnabled()) {
// Currently the system will only load the first pair of the key files.
loadCurveKeyPair();

if (curveSecKey != null && curvePubKey != null) {
feSock.setZAPDomain("global".getBytes());
feSock.setCurveServer(true);
feSock.setCurvePublicKey(curvePubKey);
feSock.setCurveSecretKey(curveSecKey);
LOG.info("Secure connection enabled!");
} else {
LOG.info("Can't find the keyfile for setup the connection. Secure connection disabled!");
}
} else {
LOG.info("Secure connection disabled!");
}

feSock.setSndHWM(zmqHWM);
feSock.bind(bindAddr);

Expand Down Expand Up @@ -144,6 +176,39 @@ public void run() {
}
}

private void loadCurveKeyPair() {
List<File> files = org.aion.base.io.File.getFiles(PATH);
String nextLoad = "";
for (File f : files) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a benefit to this fall-back/fuzzy logic? Why not just have two hard-coded file names it looks for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the user will generate several keypairs in case need to switch the keyset. The public key also needs to be updated to the client side, then the connection will be setup.

if (f.getName().contains("zmqCurvePubkey")) {
try {
curvePubKey = Files.readAllBytes(f.toPath());
nextLoad = f.getName().replace("zmqCurvePubkey", "zmqCurveSeckey");
} catch (IOException e) {
LOG.error("Get zmqCurvePubkey exception! {}", e.toString());
}
} else if (f.getName().contains("zmqCurveSeckey")) {
try {
curveSecKey = Files.readAllBytes(f.toPath());
nextLoad = f.getName().replace("zmqCurveSeckey", "zmqCurvePubkey");
} catch (IOException e) {
LOG.error("Get zmqCurveSeckey exception! {}", e.toString());
}
} else if (nextLoad.contentEquals(f.getName())){
try {
if (nextLoad.contains("zmqCurveSeckey")) {
curveSecKey = Files.readAllBytes(f.toPath());
} else {
curvePubKey = Files.readAllBytes(f.toPath());
}
} catch (IOException e) {
LOG.error("Get zmqCurveSeckey exception! {}", e.toString());
}
break;
}
}
}

private void eventRun(Context ctx) {
Socket sock = ctx.socket(ZMQ.DEALER);
sock.connect(AION_ZMQ_EV_TH);
Expand All @@ -168,15 +233,18 @@ private void eventRun(Context ctx) {
}

if (!al.isEmpty()) {
Message.rsp_EventCtCallback ecb = Message.rsp_EventCtCallback.newBuilder().addAllEc(al).build();
Message.rsp_EventCtCallback ecb = Message.rsp_EventCtCallback.newBuilder()
.addAllEc(al).build();
byte[] rsp = ((HdlrZmq) this.handler).toRspEvtMsg(ecb.toByteArray());

try {
byte[] socketId = ByteBuffer.allocate(5).put(ByteUtil.longToBytes(i), 3, 5).array();
byte[] socketId = ByteBuffer.allocate(5)
.put(ByteUtil.longToBytes(i), 3, 5).array();
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e
.getMessage());
}
}
}
Expand Down Expand Up @@ -220,19 +288,25 @@ private void callbackRun(Context ctx) {
}

byte[] rsp = tps.toTxReturnCode() != 105
? ((HdlrZmq) this.handler).toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError())
: ((HdlrZmq) this.handler).toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError(), tps.getTxResult());
? ((HdlrZmq) this.handler)
.toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError())
: ((HdlrZmq) this.handler)
.toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError(),
tps.getTxResult());
if (LOG.isTraceEnabled()) {
LOG.trace("callbackRun send. socketID: [{}], msgHash: [{}], txReturnCode: [{}]/n rspMsg: [{}]",
Hex.toHexString(tps.getSocketId()), Hex.toHexString(tps.getMsgHash()), tps.toTxReturnCode(),
Hex.toHexString(rsp));
LOG.trace(
"callbackRun send. socketID: [{}], msgHash: [{}], txReturnCode: [{}]/n rspMsg: [{}]",
Hex.toHexString(tps.getSocketId()), Hex.toHexString(tps.getMsgHash()),
tps.toTxReturnCode(),
Hex.toHexString(rsp));
}
try {
sock.send(tps.getSocketId(), ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
LOG.error(
"ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
}
}
}
Expand All @@ -251,7 +325,8 @@ private void workerRun(ZMQ.Context ctx) {
try {
byte[] socketId = sock.recv(0);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun socketID: [{}]", Hex.toHexString(socketId));
LOG.trace("ProtocolProcessor.workerRun socketID: [{}]",
Hex.toHexString(socketId));
}
if (socketId != null && socketId.length == SOCKETID_LEN) {
byte[] req = sock.recv(0);
Expand Down
5 changes: 4 additions & 1 deletion modBoot/resource/mainnet/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
<!--comma-separated list, APIs available: web3,net,debug,personal,eth,stratum-->
<apis-enabled>web3,eth,personal,stratum,ops</apis-enabled>
</rpc>
<java active="true" ip="127.0.0.1" port="8547"></java>
<java active="true" ip="127.0.0.1" port="8547">
<secure-connect>true</secure-connect>
</java>

<nrg-recommendation>
<!--default NRG price used by api if oracle disabled, minimum price recommended by oracle-->
<default>10E9</default>
Expand Down
6 changes: 4 additions & 2 deletions modBoot/src/module-info.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
module aion.boot {
uses org.aion.evtmgr.EventMgrModule;
uses org.aion.log.AionLoggerFactory;

requires aion.crypto;
requires aion.apiserver;
Expand All @@ -9,8 +11,8 @@
requires slf4j.api;
requires aion.p2p;
requires aion.fastvm;


requires aion.base;
requires libnzmq;

exports org.aion;
}
Loading