Skip to content

Commit

Permalink
Merge branch '495-setup-kademlia' into 494-sync-state-rpc-calls
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/com/limechain/teavm/HttpRequest.java
  • Loading branch information
Zurcusa committed Aug 28, 2024
2 parents d566d92 + 5918e0f commit a201b48
Show file tree
Hide file tree
Showing 33 changed files with 975 additions and 449 deletions.
1 change: 0 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ teavm.js {
targetFileName = "fruzhin.js"
}


//TODO: Debug only. Remove when doing release build
teavm {
js {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/limechain/chain/lightsyncstate/Authority.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package com.limechain.chain.lightsyncstate;

import com.limechain.teavm.annotation.Reflectable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;
import java.math.BigInteger;

@Getter
@Setter
@Reflectable
@AllArgsConstructor
@NoArgsConstructor
public class Authority implements Serializable {
private final byte[] publicKey;
private final BigInteger weight;
private byte[] publicKey;
private BigInteger weight;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import lombok.Getter;
import lombok.Setter;
import com.limechain.tuple.Pair;
import lombok.ToString;

import java.math.BigInteger;

@Getter
@Setter
@ToString
public class AuthoritySet {
private Authority[] currentAuthorities;
private BigInteger setId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.Getter;
import lombok.Setter;
import com.limechain.tuple.Pair;
import lombok.ToString;

import java.math.BigInteger;
import java.util.Map;

@Getter
@Setter
@ToString
public class EpochChanges {
private ForkTree<PersistedEpochHeader> inner;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
import java.util.Map;

@Getter
@ToString
public class LightSyncState {
private BlockHeader finalizedBlockHeader;
private EpochChanges epochChanges;
private AuthoritySet grandpaAuthoritySet;

public static LightSyncState decode(Map<String, String> lightSyncState) {
String header = lightSyncState.get("finalizedBlockHeader");
String epochChanges = lightSyncState.get("babeEpochChanges");
String grandpaAuthoritySet = lightSyncState.get("grandpaAuthoritySet");
public static LightSyncState decode(Map<String, String> lightSyncStateMap) {
String header = lightSyncStateMap.get("finalizedBlockHeader");
String epochChanges = lightSyncStateMap.get("babeEpochChanges");
String grandpaAuthoritySet = lightSyncStateMap.get("grandpaAuthoritySet");

if (header == null) {
throw new IllegalStateException("finalizedBlockHeader is null");
Expand All @@ -34,18 +35,18 @@ public static LightSyncState decode(Map<String, String> lightSyncState) {
}


var state = new LightSyncState();
LightSyncState lightSyncState = new LightSyncState();
byte[] bytes = StringUtils.hexToBytes(header);
state.finalizedBlockHeader = new BlockHeaderReader()
lightSyncState.finalizedBlockHeader = new BlockHeaderReader()
.read(new ScaleCodecReader(bytes));

byte[] bytes1 = StringUtils.hexToBytes(epochChanges);
state.epochChanges = new EpochChangesReader()
lightSyncState.epochChanges = new EpochChangesReader()
.read(new ScaleCodecReader(bytes1));

state.grandpaAuthoritySet = new AuthoritySetReader()
lightSyncState.grandpaAuthoritySet = new AuthoritySetReader()
.read(new ScaleCodecReader(StringUtils.hexToBytes(grandpaAuthoritySet)));

return state;
return lightSyncState;
}
}
4 changes: 1 addition & 3 deletions src/main/java/com/limechain/client/LightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ public LightClient() {
@SneakyThrows
public void start() {
this.network.start();

while (true) {
this.network.updateCurrentSelectedPeer();
if (this.network.getKademliaService().getSuccessfulBootNodes() > 0) {
log.log(Level.INFO, "Node successfully connected to a peer! Sync can start!");
this.warpSyncMachine = AppBean.getBean(WarpSyncMachine.class);
this.warpSyncMachine.start();
break;
} else {
this.network.updateCurrentSelectedPeer();
}
log.log(Level.INFO, "Waiting for peer connection...");
Thread.sleep(10000);
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.limechain.chain.ChainService;
import com.limechain.config.HostConfig;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.warp.WarpSyncService;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.rpc.server.AppBean;
import com.limechain.sync.warpsync.WarpSyncState;
import lombok.Getter;
Expand Down Expand Up @@ -50,15 +52,18 @@ private void initializeProtocols(ChainService chainService,
HostConfig hostConfig) {

//
// String chainId = chainService.getChainSpec().getProtocolId();
// String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId);
String chainId = chainService.getChainSpec().getProtocolId();
String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId);
// String lightProtocolId = ProtocolUtils.getLightMessageProtocol(chainId);
// String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(chainId);
// String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(chainId);

kademliaService = new KademliaService();
warpSyncService = new WarpSyncService(warpProtocolId);
}

WarpSyncService warpSyncService;

// private Ed25519PrivateKey loadPrivateKeyFromDB(KVRepository<String, Object> repository) {
// Ed25519PrivateKey privateKey;
//
Expand Down Expand Up @@ -110,6 +115,7 @@ public void updateCurrentSelectedPeer() {
// if (connectionManager.getPeerIds().isEmpty()) return;
// this.currentSelectedPeer = connectionManager.getPeerIds().stream()
// .skip(RANDOM.nextInt(connectionManager.getPeerIds().size())).findAny().orElse(null);
kademliaService.updateSuccessfulBootNodes();
}

// public String getPeerId() {
Expand Down Expand Up @@ -138,7 +144,7 @@ public void findPeers() {
}
if (getPeersCount() >= REPLICATION) {
log.log(Level.INFO,
"Connections have reached replication factor(" + REPLICATION + "). " +
"Connections have reached replication factor(" + REPLICATION + "). " +
"No need to search for new ones yet.");
return;
}
Expand Down Expand Up @@ -177,7 +183,7 @@ public void pingPeers() {
// }
// }

// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) {
// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) {
// this.currentSelectedPeer = peerId;
// // TODO: fields, hash, direction and maxBlocks values not verified
// // TODO: when debugging could not get a value returned
Expand All @@ -202,14 +208,10 @@ public void pingPeers() {
// );
// }
//
// public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
// if (isPeerInvalid()) return null;
//
// return this.warpSyncService.getProtocol().warpSyncRequest(
// this.host,
// this.currentSelectedPeer,
// blockHash);
// }
public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
return this.warpSyncService.getProtocol().warpSyncRequest(
blockHash);
}
//
// public LightClientMessage.Response makeRemoteReadRequest(String blockHash, String[] keys) {
// if (isPeerInvalid()) return null;
Expand Down
16 changes: 4 additions & 12 deletions src/main/java/com/limechain/network/StrictProtocolBinding.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package com.limechain.network;

public abstract class StrictProtocolBinding<T> {
public abstract class StrictProtocolBinding {
String protocolId;

protected StrictProtocolBinding(String protocolId/*, T protocol*/) {
this.protocolId = protocolId;
}

// public T dialPeer(Host us, PeerId peer, AddressBook addrs) {
// Multiaddr[] addr = addrs.get(peer)
// .join().stream()
// .filter(address -> !address.toString().contains("/ws") && !address.toString().contains("/wss"))
// .toList()
// .toArray(new Multiaddr[0]);
// if (addr.length == 0)
// throw new IllegalStateException("No addresses known for peer " + peer);
//
// return dial(us, peer, addr).getController().join();
// }
}
70 changes: 20 additions & 50 deletions src/main/java/com/limechain/network/kad/KademliaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.limechain.network.kad.dto.Host;
import com.limechain.network.kad.dto.PeerId;
import com.limechain.network.protocol.NetworkService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.teavm.interop.Async;
import org.teavm.interop.AsyncCallback;
import org.teavm.jso.JSBody;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
Expand All @@ -19,31 +21,13 @@
*/
@Getter
@Log
public class KademliaService /*extends NetworkService<Kademlia>*/ {
public class KademliaService extends NetworkService {
public static final int REPLICATION = 20;
private static final Random RANDOM = new Random();

@Setter
private Host host;
// private List<PeerId> bootNodePeerIds;
private int successfulBootNodes;


public void addReservedPeer(String multiaddr) throws ExecutionException, InterruptedException {
// final Multiaddr addrWithPeer = Multiaddr.fromString(multiaddr);
//
// CompletableFuture<Stream> peerStream =
// protocol.dial(host, addrWithPeer.getPeerId(), addrWithPeer).getStream();
//
// Stream stream = peerStream.get();
// if (stream == null) {
// log.log(Level.WARNING, "Failed to connect to reserved peer");
// } else {
// ConnectionManager.getInstance().addNewPeer(addrWithPeer.getPeerId());
// log.log(Level.INFO, "Successfully connected to reserved peer");
// }
}

/**
* Connects to boot nodes to the Kademlia dht
*
Expand Down Expand Up @@ -76,49 +60,35 @@ public int connectBootNodes(String[] bootNodes) {
return successfulBootNodes;
}

public void updateSuccessfulBootNodes() {
successfulBootNodes = getPeerStoreSize();
}

@JSBody(params = {"bootNodes"}, script = "start(bootNodes)")
@Async
public static native void startNetwork(String[] bootNodes);

@JSBody(script = "return getPeerId()")
public static native Object getPeerId();

@JSBody(script = "return libp.peerId.privateKey")
public static native byte[] getPeerPrivateKey();

@JSBody(script = "return libp.peerId.publicKey")
public static native byte[] getPeerPublicKey();
@JSBody(script = "return libp.peerStore.store.datastore.data.size")
public static native int getPeerStoreSize();

@JSBody(script = "return libp.getConnections().length")
public static native int getPeerStoreSize();

/**
* Populates Kademlia dht with peers closest in distance to a random id then makes connections with our node
*/
public void findNewPeers() {
// protocol.findClosestPeers(randomPeerId(), REPLICATION, host);
// final var peers =
// protocol.findClosestPeers(Multihash.deserialize(host.getPeerId().getBytes()), REPLICATION, host);
//
// peers.stream().parallel().forEach(p -> {
// boolean isConnected = protocol.connectTo(host, p);
//
// if (!isConnected) {
// protocol.connectTo(host, p);
// }
// });
}
//
// private Multihash randomPeerId() {
// byte[] hash = new byte[32];
// RANDOM.nextBytes(hash);
// return new Multihash(Multihash.Type.sha2_256, hash);
// }
//
// private void setBootNodePeerIds(String[] bootNodes) {
// ArrayList<PeerId> ids = new ArrayList<>();
// for (String bootNode : bootNodes) {
// String peerId = bootNode.substring(bootNode.lastIndexOf('/') + 1, bootNode.length());
// ids.add(PeerId.fromBase58(peerId));
// }
// this.bootNodePeerIds = ids;
// }
@JSBody(script = "libp.peerStore.forEach( async (p) => {" +
" for await (const foundPeer of dht.peerRouting.getClosestPeers(p.id.toBytes())){" +
" if(foundPeer.peer?.multiaddrs?.length > 0){" +
" try{libp.dial(foundPeer.peer)}finally{}" +
" }" +
" }" +
"});")
public static native void findNewPeers();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import lombok.extern.java.Log;

@Log
public class BlockAnnounce extends StrictProtocolBinding<BlockAnnounceController> {
public class BlockAnnounce extends StrictProtocolBinding {
public BlockAnnounce(String protocolId, BlockAnnounceProtocol protocol) {
super(protocolId/*, protocol*/);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* GRANDPA protocol binding
*/
public class Grandpa extends StrictProtocolBinding<GrandpaController> {
public class Grandpa extends StrictProtocolBinding {
public Grandpa(String protocolId, GrandpaProtocol protocol) {
super(protocolId/*, protocol*/);
}
Expand Down
32 changes: 15 additions & 17 deletions src/main/java/com/limechain/network/protocol/warp/WarpSync.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package com.limechain.network.protocol.warp;

import com.limechain.network.StrictProtocolBinding;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import lombok.extern.java.Log;

import java.util.logging.Level;

@Log
public class WarpSync extends StrictProtocolBinding<WarpSyncController> {
public class WarpSync extends StrictProtocolBinding {

private final String protocolId;

public WarpSync(String protocolId, WarpSyncProtocol protocol) {
super(protocolId/*, protocol*/);
public WarpSync(String protocolId) {
super(protocolId);
this.protocolId = protocolId;
}

// public WarpSyncResponse warpSyncRequest(Host us, PeerId peer, String blockHash) {
// try {
// WarpSyncController controller = dialPeer(us, peer, us.getAddressBook());
// WarpSyncResponse resp = controller.warpSyncRequest(blockHash).get(10, TimeUnit.SECONDS);
// log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments");
// return resp;
// } catch (ExecutionException | TimeoutException | IllegalStateException e) {
// log.log(Level.SEVERE, "Error while sending remote call request: ", e);
// throw new ExecutionFailedException(e);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new ThreadInterruptedException(e);
// }
// }
public WarpSyncResponse warpSyncRequest(String blockHash) {
WarpSyncProtocol.Sender sender = new WarpSyncProtocol.Sender();
WarpSyncResponse resp = sender.warpSyncRequest(blockHash, protocolId);
log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments");
return resp;
}
}
Loading

0 comments on commit a201b48

Please sign in to comment.