Skip to content

Commit

Permalink
Merge pull request #641 from aionnetwork/secure-zmq
Browse files Browse the repository at this point in the history
Zmq secure connect implementation
  • Loading branch information
AionJayT authored Oct 3, 2018
2 parents fd5721f + 2208f82 commit 4aed4ca
Show file tree
Hide file tree
Showing 18 changed files with 331 additions and 87 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
/log/
/rt/
/web3/
/zmq_keystore/
/jars/

# ide
Expand Down
2 changes: 1 addition & 1 deletion aion_api
1 change: 1 addition & 0 deletions modAionBase/src/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
exports org.aion.base.util;
exports org.aion.base.vm;
exports org.aion.base.db;
exports org.aion.base.io;
exports org.aion.base;
}
46 changes: 46 additions & 0 deletions modAionBase/src/org/aion/base/io/File.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2017-2018 Aion foundation.
*
* This file is part of the aion network project.
*
* The aion network project is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or any later version.
*
* The aion network project is distributed in the hope that it will
* be useful, but WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with the aion network project source files.
* If not, see <https://www.gnu.org/licenses/>.
*
* Contributors:
* Aion foundation.
*/

package org.aion.base.io;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class File {
public static List<java.io.File> getFiles(final Path path) {
if (path == null) {
System.out.println("getFiles null path input!");
return Collections.emptyList();
}

try {
java.io.File[] files = path.toFile().listFiles();
return files != null ? Arrays.asList(files) : Collections.emptyList();
} catch (UnsupportedOperationException | NullPointerException e) {
System.out.println("getFiles exception: " + e.toString());
return Collections.emptyList();
}
}
}
2 changes: 2 additions & 0 deletions modAionImpl/build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@
<pathelement location="${dir.lib}/guava-25.1-jre.jar"/>
<pathelement location="${dir.lib}/libnsc.jar"/>
<pathelement location="${dir.lib}/slf4j-api-1.7.25.jar"/>
<pathelement location="${dir.lib}/libnzmq.jar"/>
<pathelement location="${dir.lib}/jsr305-3.0.2.jar"/>
<pathelement location="${dir.lib}/picocli-3.5.1.jar"/>
</modulepath>
</javac>
Expand Down
6 changes: 6 additions & 0 deletions modAionImpl/src/module-info.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
module aion.zero.impl {
uses org.aion.evtmgr.EventMgrModule;
uses org.aion.txpool.TxPoolModule;
requires aion.base;
requires aion.mcf;
requires aion.log;
Expand All @@ -17,6 +19,9 @@
requires jdk.management;
requires java.xml;
requires slf4j.api;
requires com.google.common;
requires info.picocli;
requires commons.lang3;

exports org.aion.equihash;
exports org.aion.zero.impl.blockchain;
Expand All @@ -25,6 +30,7 @@
exports org.aion.zero.impl.types;
exports org.aion.zero.impl.config;
exports org.aion.zero.impl.cli;
opens org.aion.zero.impl.cli;
exports org.aion.zero.impl.db;
exports org.aion.zero.impl.sync;
exports org.aion.zero.impl.config.dynamic;
Expand Down
4 changes: 2 additions & 2 deletions modApiServer/src/org/aion/api/server/ApiAion.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class ApiAion extends Api {
// 2. underlying datastructure provides concurrency guarntees

// delegate concurrency to underlying object
protected static NrgOracle NRG_ORACLE;
private static NrgOracle NRG_ORACLE;
protected IAionChain ac; // assumption: blockchainImpl et al. provide concurrency guarantee

// using java.util.concurrent library objects
Expand Down Expand Up @@ -224,7 +224,7 @@ public AionBlock getBlock(long blkNr) {
}
}

public Map.Entry<AionBlock, BigInteger> getBlockWithTotalDifficulty(long blkNr) {
protected Map.Entry<AionBlock, BigInteger> getBlockWithTotalDifficulty(long blkNr) {
if (blkNr > 0) {
return ((AionBlockStore) this.ac.getBlockchain().getBlockStore())
.getChainBlockByNumberWithTotalDifficulty(blkNr);
Expand Down
15 changes: 7 additions & 8 deletions modApiServer/src/org/aion/api/server/zmq/HdlrZmq.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*******************************************************************************
/*
* Copyright (c) 2017-2018 Aion foundation.
*
* This file is part of the aion network project.
Expand All @@ -19,8 +19,7 @@
*
* Contributors:
* Aion foundation.
*
******************************************************************************/
*/

package org.aion.api.server.zmq;

Expand Down Expand Up @@ -70,7 +69,7 @@ public byte[] process(byte[] request, byte[] socketId) {
}
}

public void getTxWait() {
void getTxWait() {
TxWaitingMappingUpdate txWait = null;
try {
txWait = this.api.takeTxWait();
Expand Down Expand Up @@ -107,15 +106,15 @@ public Map<Long, Fltr> getFilter() {
return this.api.getFilter();
}

public BlockingQueue<TxPendingStatus> getTxStatusQueue() {
BlockingQueue<TxPendingStatus> getTxStatusQueue() {
return this.api.getPendingStatus();
}

public byte[] toRspMsg(byte[] msgHash, int txCode, String error) {
byte[] toRspMsg(byte[] msgHash, int txCode, String error) {
return ApiUtil.toReturnHeader(this.api.getApiVersion(), txCode, msgHash, error.getBytes());
}

public byte[] toRspMsg(byte[] msgHash, int txCode, String error, byte[] result) {
byte[] toRspMsg(byte[] msgHash, int txCode, String error, byte[] result) {
return ApiUtil.toReturnHeader(this.api.getApiVersion(), txCode, msgHash, error.getBytes(), result);
}

Expand All @@ -124,7 +123,7 @@ public byte[] process(byte[] request) {
return null;
}

public byte[] toRspEvtMsg(byte[] ecb) {
byte[] toRspEvtMsg(byte[] ecb) {
return ApiUtil.toReturnEvtHeader(this.api.getApiVersion(), ecb);
}

Expand Down
105 changes: 90 additions & 15 deletions modApiServer/src/org/aion/api/server/zmq/ProtocolProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
import static org.zeromq.ZMQ.DEALER;
import static org.zeromq.ZMQ.ROUTER;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -53,22 +58,32 @@
public class ProtocolProcessor implements Runnable {

protected static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.API.name());
private final IHdlr handler;
private static final String AION_ZMQ_WK_TH = "inproc://aionZmqWkTh";
private static final String AION_ZMQ_CB_TH = "inproc://aionZmqCbTh";
private static final String AION_ZMQ_EV_TH = "inproc://aionZmqEvTh";
private static final String AION_ZMQ_HB_TH = "inproc://aionZmqHbTh";

private CfgApiZmq cfgApi;
private AtomicBoolean shutDown = new AtomicBoolean();

private final Path PATH;
private static final long zmqHWM = 100_000;
private static final int SOCKETID_LEN = 5;
private static final int SOCKET_RECV_TIMEOUT = 3000;

private final IHdlr handler;
private CfgApiZmq cfgApi;
private AtomicBoolean shutDown = new AtomicBoolean();
private byte[] curvePubKey;
private byte[] curveSecKey;

public ProtocolProcessor(IHdlr _handler, final CfgApiZmq cfg) {
this.handler = _handler;
this.cfgApi = cfg;

String storageDir = System.getProperty("local.storage.dir");
if (storageDir == null || storageDir.equalsIgnoreCase("")) {
storageDir = System.getProperty("user.dir");
}

String curveKeyPath = storageDir + "/" + CfgApiZmq.ZMQ_KEY_DIR;
PATH = Paths.get(curveKeyPath);
}

public void shutdown() throws InterruptedException {
Expand All @@ -90,6 +105,23 @@ public void run() {

// create router sock.
Socket feSock = ctx.socket(ROUTER);
if (cfgApi.isSecureConnectEnabledEnabled()) {
// Currently the system will only load the first pair of the key files.
loadCurveKeyPair();

if (curveSecKey != null && curvePubKey != null) {
feSock.setZAPDomain("global".getBytes());
feSock.setCurveServer(true);
feSock.setCurvePublicKey(curvePubKey);
feSock.setCurveSecretKey(curveSecKey);
LOG.info("Secure connection enabled!");
} else {
LOG.info("Can't find the keyfile for setup the connection. Secure connection disabled!");
}
} else {
LOG.info("Secure connection disabled!");
}

feSock.setSndHWM(zmqHWM);
feSock.bind(bindAddr);

Expand Down Expand Up @@ -144,6 +176,39 @@ public void run() {
}
}

private void loadCurveKeyPair() {
List<File> files = org.aion.base.io.File.getFiles(PATH);
String nextLoad = "";
for (File f : files) {
if (f.getName().contains("zmqCurvePubkey")) {
try {
curvePubKey = Files.readAllBytes(f.toPath());
nextLoad = f.getName().replace("zmqCurvePubkey", "zmqCurveSeckey");
} catch (IOException e) {
LOG.error("Get zmqCurvePubkey exception! {}", e.toString());
}
} else if (f.getName().contains("zmqCurveSeckey")) {
try {
curveSecKey = Files.readAllBytes(f.toPath());
nextLoad = f.getName().replace("zmqCurveSeckey", "zmqCurvePubkey");
} catch (IOException e) {
LOG.error("Get zmqCurveSeckey exception! {}", e.toString());
}
} else if (nextLoad.contentEquals(f.getName())){
try {
if (nextLoad.contains("zmqCurveSeckey")) {
curveSecKey = Files.readAllBytes(f.toPath());
} else {
curvePubKey = Files.readAllBytes(f.toPath());
}
} catch (IOException e) {
LOG.error("Get zmqCurveSeckey exception! {}", e.toString());
}
break;
}
}
}

private void eventRun(Context ctx) {
Socket sock = ctx.socket(ZMQ.DEALER);
sock.connect(AION_ZMQ_EV_TH);
Expand All @@ -168,15 +233,18 @@ private void eventRun(Context ctx) {
}

if (!al.isEmpty()) {
Message.rsp_EventCtCallback ecb = Message.rsp_EventCtCallback.newBuilder().addAllEc(al).build();
Message.rsp_EventCtCallback ecb = Message.rsp_EventCtCallback.newBuilder()
.addAllEc(al).build();
byte[] rsp = ((HdlrZmq) this.handler).toRspEvtMsg(ecb.toByteArray());

try {
byte[] socketId = ByteBuffer.allocate(5).put(ByteUtil.longToBytes(i), 3, 5).array();
byte[] socketId = ByteBuffer.allocate(5)
.put(ByteUtil.longToBytes(i), 3, 5).array();
sock.send(socketId, ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e
.getMessage());
}
}
}
Expand Down Expand Up @@ -220,19 +288,25 @@ private void callbackRun(Context ctx) {
}

byte[] rsp = tps.toTxReturnCode() != 105
? ((HdlrZmq) this.handler).toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError())
: ((HdlrZmq) this.handler).toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError(), tps.getTxResult());
? ((HdlrZmq) this.handler)
.toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError())
: ((HdlrZmq) this.handler)
.toRspMsg(tps.getMsgHash(), tps.toTxReturnCode(), tps.getError(),
tps.getTxResult());
if (LOG.isTraceEnabled()) {
LOG.trace("callbackRun send. socketID: [{}], msgHash: [{}], txReturnCode: [{}]/n rspMsg: [{}]",
Hex.toHexString(tps.getSocketId()), Hex.toHexString(tps.getMsgHash()), tps.toTxReturnCode(),
Hex.toHexString(rsp));
LOG.trace(
"callbackRun send. socketID: [{}], msgHash: [{}], txReturnCode: [{}]/n rspMsg: [{}]",
Hex.toHexString(tps.getSocketId()), Hex.toHexString(tps.getMsgHash()),
tps.toTxReturnCode(),
Hex.toHexString(rsp));
}
try {
sock.send(tps.getSocketId(), ZMQ.SNDMORE);
sock.send(rsp, ZMQ.DONTWAIT);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
LOG.error(
"ProtocolProcessor.callbackRun sock.send exception: " + e.getMessage());
}
}
}
Expand All @@ -251,7 +325,8 @@ private void workerRun(ZMQ.Context ctx) {
try {
byte[] socketId = sock.recv(0);
if (LOG.isTraceEnabled()) {
LOG.trace("ProtocolProcessor.workerRun socketID: [{}]", Hex.toHexString(socketId));
LOG.trace("ProtocolProcessor.workerRun socketID: [{}]",
Hex.toHexString(socketId));
}
if (socketId != null && socketId.length == SOCKETID_LEN) {
byte[] req = sock.recv(0);
Expand Down
4 changes: 3 additions & 1 deletion modBoot/resource/custom/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
<!--size of thread pool allocated for rpc requests-->
<threads>1</threads>
</rpc>
<java active="false" ip="127.0.0.1" port="8547"></java>
<java active="false" ip="127.0.0.1" port="8547">
<secure-connect>true</secure-connect>
</java>
<nrg-recommendation>
<!--default NRG price used by api if oracle disabled, minimum price recommended by oracle-->
<default>10E9</default>
Expand Down
7 changes: 5 additions & 2 deletions modBoot/resource/mainnet/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
<id>[NODE-ID-PLACEHOLDER]</id>
<api>
<!-- rpc config docs: https://github.com/aionnetwork/aion/wiki/JSON-RPC-API-Docs -->
<rpc active="true" ip="127.0.0.1" port="8545">
<rpc active="false" ip="127.0.0.1" port="8545">
<cors-enabled>false</cors-enabled>
<!--comma-separated list, APIs available: web3,net,debug,personal,eth,stratum-->
<apis-enabled>web3,eth,personal,stratum,ops</apis-enabled>
</rpc>
<java active="true" ip="127.0.0.1" port="8547"></java>
<java active="false" ip="127.0.0.1" port="8547">
<secure-connect>true</secure-connect>
</java>

<nrg-recommendation>
<!--default NRG price used by api if oracle disabled, minimum price recommended by oracle-->
<default>10E9</default>
Expand Down
4 changes: 3 additions & 1 deletion modBoot/resource/mastery/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
<!--comma-separated list, APIs available: web3,net,debug,personal,eth,stratum-->
<apis-enabled>web3,eth,personal,stratum,ops</apis-enabled>
</rpc>
<java active="false" ip="127.0.0.1" port="8547"></java>
<java active="false" ip="127.0.0.1" port="8547">
<secure-connect>true</secure-connect>
</java>
<nrg-recommendation>
<!--default NRG price used by api if oracle disabled, minimum price recommended by oracle-->
<default>10E9</default>
Expand Down
Loading

0 comments on commit 4aed4ca

Please sign in to comment.