Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: setup kademlia and warp sync #8

Merged
merged 15 commits into from
Aug 30, 2024
1 change: 0 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ teavm.js {
targetFileName = "fruzhin.js"
}


//TODO: Debug only. Remove when doing release build
teavm {
js {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/limechain/chain/lightsyncstate/Authority.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package com.limechain.chain.lightsyncstate;

import com.limechain.teavm.annotation.Reflectable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;
import java.math.BigInteger;

@Getter
@Setter
@Reflectable
@AllArgsConstructor
@NoArgsConstructor
public class Authority implements Serializable {
private final byte[] publicKey;
private final BigInteger weight;
private byte[] publicKey;
private BigInteger weight;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import lombok.Getter;
import lombok.Setter;
import com.limechain.tuple.Pair;
import lombok.ToString;

import java.math.BigInteger;

@Getter
@Setter
@ToString
public class AuthoritySet {
private Authority[] currentAuthorities;
private BigInteger setId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.Getter;
import lombok.Setter;
import com.limechain.tuple.Pair;
import lombok.ToString;

import java.math.BigInteger;
import java.util.Map;

@Getter
@Setter
@ToString
public class EpochChanges {
private ForkTree<PersistedEpochHeader> inner;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
import java.util.Map;

@Getter
@ToString
public class LightSyncState {
ablax marked this conversation as resolved.
Show resolved Hide resolved
private BlockHeader finalizedBlockHeader;
private EpochChanges epochChanges;
private AuthoritySet grandpaAuthoritySet;

public static LightSyncState decode(Map<String, String> lightSyncState) {
String header = lightSyncState.get("finalizedBlockHeader");
String epochChanges = lightSyncState.get("babeEpochChanges");
String grandpaAuthoritySet = lightSyncState.get("grandpaAuthoritySet");
public static LightSyncState decode(Map<String, String> lightSyncStateMap) {
String header = lightSyncStateMap.get("finalizedBlockHeader");
String epochChanges = lightSyncStateMap.get("babeEpochChanges");
String grandpaAuthoritySet = lightSyncStateMap.get("grandpaAuthoritySet");

if (header == null) {
throw new IllegalStateException("finalizedBlockHeader is null");
Expand All @@ -34,18 +35,18 @@ public static LightSyncState decode(Map<String, String> lightSyncState) {
}


var state = new LightSyncState();
LightSyncState lightSyncState = new LightSyncState();
byte[] bytes = StringUtils.hexToBytes(header);
state.finalizedBlockHeader = new BlockHeaderReader()
lightSyncState.finalizedBlockHeader = new BlockHeaderReader()
.read(new ScaleCodecReader(bytes));

byte[] bytes1 = StringUtils.hexToBytes(epochChanges);
state.epochChanges = new EpochChangesReader()
lightSyncState.epochChanges = new EpochChangesReader()
.read(new ScaleCodecReader(bytes1));

state.grandpaAuthoritySet = new AuthoritySetReader()
lightSyncState.grandpaAuthoritySet = new AuthoritySetReader()
.read(new ScaleCodecReader(StringUtils.hexToBytes(grandpaAuthoritySet)));

return state;
return lightSyncState;
}
}
4 changes: 1 addition & 3 deletions src/main/java/com/limechain/client/LightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ public LightClient() {
@SneakyThrows
public void start() {
this.network.start();

while (true) {
this.network.updateCurrentSelectedPeer();
if (this.network.getKademliaService().getSuccessfulBootNodes() > 0) {
log.log(Level.INFO, "Node successfully connected to a peer! Sync can start!");
this.warpSyncMachine = AppBean.getBean(WarpSyncMachine.class);
this.warpSyncMachine.start();
break;
} else {
this.network.updateCurrentSelectedPeer();
}
log.log(Level.INFO, "Waiting for peer connection...");
Thread.sleep(10000);
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.limechain.chain.ChainService;
import com.limechain.config.HostConfig;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.warp.WarpSyncService;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.rpc.server.AppBean;
import com.limechain.sync.warpsync.WarpSyncState;
import lombok.Getter;
Expand Down Expand Up @@ -50,15 +52,18 @@ private void initializeProtocols(ChainService chainService,
HostConfig hostConfig) {

//
// String chainId = chainService.getChainSpec().getProtocolId();
// String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId);
String chainId = chainService.getChainSpec().getProtocolId();
String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId);
// String lightProtocolId = ProtocolUtils.getLightMessageProtocol(chainId);
// String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(chainId);
// String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(chainId);

kademliaService = new KademliaService();
warpSyncService = new WarpSyncService(warpProtocolId);
}

WarpSyncService warpSyncService;

// private Ed25519PrivateKey loadPrivateKeyFromDB(KVRepository<String, Object> repository) {
// Ed25519PrivateKey privateKey;
//
Expand Down Expand Up @@ -110,6 +115,7 @@ public void updateCurrentSelectedPeer() {
// if (connectionManager.getPeerIds().isEmpty()) return;
// this.currentSelectedPeer = connectionManager.getPeerIds().stream()
// .skip(RANDOM.nextInt(connectionManager.getPeerIds().size())).findAny().orElse(null);
kademliaService.updateSuccessfulBootNodes();
}

// public String getPeerId() {
Expand Down Expand Up @@ -138,7 +144,7 @@ public void findPeers() {
}
if (getPeersCount() >= REPLICATION) {
log.log(Level.INFO,
"Connections have reached replication factor(" + REPLICATION + "). " +
"Connections have reached replication factor(" + REPLICATION + "). " +
"No need to search for new ones yet.");
return;
}
Expand Down Expand Up @@ -177,7 +183,7 @@ public void pingPeers() {
// }
// }

// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) {
// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) {
// this.currentSelectedPeer = peerId;
// // TODO: fields, hash, direction and maxBlocks values not verified
// // TODO: when debugging could not get a value returned
Expand All @@ -202,14 +208,10 @@ public void pingPeers() {
// );
// }
//
// public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
// if (isPeerInvalid()) return null;
//
// return this.warpSyncService.getProtocol().warpSyncRequest(
// this.host,
// this.currentSelectedPeer,
// blockHash);
// }
public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
return this.warpSyncService.getProtocol().warpSyncRequest(
blockHash);
}
//
// public LightClientMessage.Response makeRemoteReadRequest(String blockHash, String[] keys) {
// if (isPeerInvalid()) return null;
Expand Down
16 changes: 4 additions & 12 deletions src/main/java/com/limechain/network/StrictProtocolBinding.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package com.limechain.network;

public abstract class StrictProtocolBinding<T> {
public abstract class StrictProtocolBinding {
String protocolId;

protected StrictProtocolBinding(String protocolId/*, T protocol*/) {
this.protocolId = protocolId;
}

// public T dialPeer(Host us, PeerId peer, AddressBook addrs) {
// Multiaddr[] addr = addrs.get(peer)
// .join().stream()
// .filter(address -> !address.toString().contains("/ws") && !address.toString().contains("/wss"))
// .toList()
// .toArray(new Multiaddr[0]);
// if (addr.length == 0)
// throw new IllegalStateException("No addresses known for peer " + peer);
//
// return dial(us, peer, addr).getController().join();
// }
}
70 changes: 20 additions & 50 deletions src/main/java/com/limechain/network/kad/KademliaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.limechain.network.kad.dto.Host;
import com.limechain.network.kad.dto.PeerId;
import com.limechain.network.protocol.NetworkService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.teavm.interop.Async;
import org.teavm.interop.AsyncCallback;
import org.teavm.jso.JSBody;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
Expand All @@ -19,31 +21,13 @@
*/
@Getter
@Log
public class KademliaService /*extends NetworkService<Kademlia>*/ {
public class KademliaService extends NetworkService {
public static final int REPLICATION = 20;
private static final Random RANDOM = new Random();

@Setter
private Host host;
// private List<PeerId> bootNodePeerIds;
private int successfulBootNodes;


public void addReservedPeer(String multiaddr) throws ExecutionException, InterruptedException {
// final Multiaddr addrWithPeer = Multiaddr.fromString(multiaddr);
//
// CompletableFuture<Stream> peerStream =
// protocol.dial(host, addrWithPeer.getPeerId(), addrWithPeer).getStream();
//
// Stream stream = peerStream.get();
// if (stream == null) {
// log.log(Level.WARNING, "Failed to connect to reserved peer");
// } else {
// ConnectionManager.getInstance().addNewPeer(addrWithPeer.getPeerId());
// log.log(Level.INFO, "Successfully connected to reserved peer");
// }
}

/**
* Connects to boot nodes to the Kademlia dht
*
Expand Down Expand Up @@ -76,49 +60,35 @@ public int connectBootNodes(String[] bootNodes) {
return successfulBootNodes;
}

public void updateSuccessfulBootNodes() {
successfulBootNodes = getPeerStoreSize();
}

@JSBody(params = {"bootNodes"}, script = "start(bootNodes)")
@Async
public static native void startNetwork(String[] bootNodes);

@JSBody(script = "return getPeerId()")
public static native Object getPeerId();

@JSBody(script = "return libp.peerId.privateKey")
public static native byte[] getPeerPrivateKey();

@JSBody(script = "return libp.peerId.publicKey")
public static native byte[] getPeerPublicKey();
@JSBody(script = "return libp.peerStore.store.datastore.data.size")
public static native int getPeerStoreSize();

@JSBody(script = "return libp.getConnections().length")
public static native int getPeerStoreSize();

/**
* Populates Kademlia dht with peers closest in distance to a random id then makes connections with our node
*/
public void findNewPeers() {
// protocol.findClosestPeers(randomPeerId(), REPLICATION, host);
// final var peers =
// protocol.findClosestPeers(Multihash.deserialize(host.getPeerId().getBytes()), REPLICATION, host);
//
// peers.stream().parallel().forEach(p -> {
// boolean isConnected = protocol.connectTo(host, p);
//
// if (!isConnected) {
// protocol.connectTo(host, p);
// }
// });
}
//
// private Multihash randomPeerId() {
// byte[] hash = new byte[32];
// RANDOM.nextBytes(hash);
// return new Multihash(Multihash.Type.sha2_256, hash);
// }
//
// private void setBootNodePeerIds(String[] bootNodes) {
// ArrayList<PeerId> ids = new ArrayList<>();
// for (String bootNode : bootNodes) {
// String peerId = bootNode.substring(bootNode.lastIndexOf('/') + 1, bootNode.length());
// ids.add(PeerId.fromBase58(peerId));
// }
// this.bootNodePeerIds = ids;
// }
@JSBody(script = "libp.peerStore.forEach( async (p) => {" +
" for await (const foundPeer of dht.peerRouting.getClosestPeers(p.id.toBytes())){" +
" if(foundPeer.peer?.multiaddrs?.length > 0){" +
" try{libp.dial(foundPeer.peer)}finally{}" +
" }" +
" }" +
"});")
public static native void findNewPeers();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import lombok.extern.java.Log;

@Log
public class BlockAnnounce extends StrictProtocolBinding<BlockAnnounceController> {
public class BlockAnnounce extends StrictProtocolBinding {
public BlockAnnounce(String protocolId, BlockAnnounceProtocol protocol) {
super(protocolId/*, protocol*/);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* GRANDPA protocol binding
*/
public class Grandpa extends StrictProtocolBinding<GrandpaController> {
public class Grandpa extends StrictProtocolBinding {
public Grandpa(String protocolId, GrandpaProtocol protocol) {
super(protocolId/*, protocol*/);
}
Expand Down
32 changes: 15 additions & 17 deletions src/main/java/com/limechain/network/protocol/warp/WarpSync.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package com.limechain.network.protocol.warp;

import com.limechain.network.StrictProtocolBinding;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import lombok.extern.java.Log;

import java.util.logging.Level;

@Log
public class WarpSync extends StrictProtocolBinding<WarpSyncController> {
public class WarpSync extends StrictProtocolBinding {

private final String protocolId;

public WarpSync(String protocolId, WarpSyncProtocol protocol) {
super(protocolId/*, protocol*/);
public WarpSync(String protocolId) {
super(protocolId);
this.protocolId = protocolId;
}

// public WarpSyncResponse warpSyncRequest(Host us, PeerId peer, String blockHash) {
// try {
// WarpSyncController controller = dialPeer(us, peer, us.getAddressBook());
// WarpSyncResponse resp = controller.warpSyncRequest(blockHash).get(10, TimeUnit.SECONDS);
// log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments");
// return resp;
// } catch (ExecutionException | TimeoutException | IllegalStateException e) {
// log.log(Level.SEVERE, "Error while sending remote call request: ", e);
// throw new ExecutionFailedException(e);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new ThreadInterruptedException(e);
// }
// }
public WarpSyncResponse warpSyncRequest(String blockHash) {
WarpSyncProtocol.Sender sender = new WarpSyncProtocol.Sender();
WarpSyncResponse resp = sender.warpSyncRequest(blockHash, protocolId);
log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments");
return resp;
}
}
Loading
Loading