Skip to content

Commit

Permalink
chore: current changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ablax committed Aug 22, 2024
1 parent 8663462 commit 7a21b86
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 163 deletions.
6 changes: 3 additions & 3 deletions src/main/java/com/limechain/client/LightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ public LightClient() {
@SneakyThrows
public void start() {
this.network.start();

System.out.println("Network started");
while (true) {
this.network.updateCurrentSelectedPeer();
if (this.network.getKademliaService().getSuccessfulBootNodes() > 0) {
log.log(Level.INFO, "Node successfully connected to a peer! Sync can start!");
this.warpSyncMachine = AppBean.getBean(WarpSyncMachine.class);
System.out.println("Starting warp sync machine...");
this.warpSyncMachine.start();
break;
} else {
this.network.updateCurrentSelectedPeer();
}
log.log(Level.INFO, "Waiting for peer connection...");
Thread.sleep(10000);
Expand Down
26 changes: 15 additions & 11 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.limechain.chain.ChainService;
import com.limechain.config.HostConfig;
import com.limechain.network.kad.KademliaService;
import com.limechain.network.protocol.warp.WarpSyncService;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.rpc.server.AppBean;
import com.limechain.sync.warpsync.WarpSyncState;
import lombok.Getter;
Expand Down Expand Up @@ -50,15 +52,18 @@ private void initializeProtocols(ChainService chainService,
HostConfig hostConfig) {

//
// String chainId = chainService.getChainSpec().getProtocolId();
// String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId);
String chainId = chainService.getChainSpec().getProtocolId();
String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId);
// String lightProtocolId = ProtocolUtils.getLightMessageProtocol(chainId);
// String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(chainId);
// String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(chainId);

kademliaService = new KademliaService();
warpSyncService = new WarpSyncService(warpProtocolId);
}

WarpSyncService warpSyncService;

// private Ed25519PrivateKey loadPrivateKeyFromDB(KVRepository<String, Object> repository) {
// Ed25519PrivateKey privateKey;
//
Expand Down Expand Up @@ -110,6 +115,7 @@ public void updateCurrentSelectedPeer() {
// if (connectionManager.getPeerIds().isEmpty()) return;
// this.currentSelectedPeer = connectionManager.getPeerIds().stream()
// .skip(RANDOM.nextInt(connectionManager.getPeerIds().size())).findAny().orElse(null);
kademliaService.updateSuccessfulBootNodes();
}

// public String getPeerId() {
Expand Down Expand Up @@ -138,7 +144,7 @@ public void findPeers() {
}
if (getPeersCount() >= REPLICATION) {
log.log(Level.INFO,
"Connections have reached replication factor(" + REPLICATION + "). " +
"Connections have reached replication factor(" + REPLICATION + "). " +
"No need to search for new ones yet.");
return;
}
Expand Down Expand Up @@ -177,7 +183,7 @@ public void pingPeers() {
// }
// }

// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) {
// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) {
// this.currentSelectedPeer = peerId;
// // TODO: fields, hash, direction and maxBlocks values not verified
// // TODO: when debugging could not get a value returned
Expand All @@ -202,14 +208,12 @@ public void pingPeers() {
// );
// }
//
// public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
// if (isPeerInvalid()) return null;
//
// return this.warpSyncService.getProtocol().warpSyncRequest(
// this.host,
// this.currentSelectedPeer,
// blockHash);
// }

return this.warpSyncService.getProtocol().warpSyncRequest(
blockHash);
}
//
// public LightClientMessage.Response makeRemoteReadRequest(String blockHash, String[] keys) {
// if (isPeerInvalid()) return null;
Expand Down
58 changes: 46 additions & 12 deletions src/main/java/com/limechain/network/StrictProtocolBinding.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
package com.limechain.network;

public abstract class StrictProtocolBinding<T> {
import com.limechain.network.wrapper.Stream;
import org.teavm.jso.JSBody;
import org.teavm.jso.core.JSPromise;

import java.util.concurrent.atomic.AtomicReference;

public abstract class StrictProtocolBinding {
String protocolId;

protected StrictProtocolBinding(String protocolId/*, T protocol*/) {
this.protocolId = protocolId;
}

public Stream dialPeer(/*PeerId peer*/) {
Object peer1 = getPeer();
System.out.println("Peer: " + peer1);
JSPromise<Object> dial = dial(peer1, protocolId);
final var lock = new Object();
AtomicReference<Stream> stream = new AtomicReference<>();

dial.then((result) -> {
stream.set((Stream) result);
synchronized (lock) {
lock.notify();
}
return null;
});

synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Stream atomicStream = stream.get();

System.out.println("Stream: " + toJson(atomicStream));
return atomicStream;
}

// public T dialPeer(Host us, PeerId peer, AddressBook addrs) {
// Multiaddr[] addr = addrs.get(peer)
// .join().stream()
// .filter(address -> !address.toString().contains("/ws") && !address.toString().contains("/wss"))
// .toList()
// .toArray(new Multiaddr[0]);
// if (addr.length == 0)
// throw new IllegalStateException("No addresses known for peer " + peer);
//
// return dial(us, peer, addr).getController().join();
// }
@JSBody(params = {"object"}, script = "return JSON.stringify(object);")
private static native String toJson(Object object);

@JSBody(params = {"peerId", "protocolId"}, script = "return (async () => ItPbStream.pbStream(await libp.dialProtocol(peerId, protocolId)))()")
private static native JSPromise<Object> dial(Object peerId, String protocolId);

@JSBody(script = "return libp.getConnections()[0].remotePeer;")
private static native Object getPeer();
}
70 changes: 20 additions & 50 deletions src/main/java/com/limechain/network/kad/KademliaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.limechain.network.kad.dto.Host;
import com.limechain.network.kad.dto.PeerId;
import com.limechain.network.protocol.NetworkService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.teavm.interop.Async;
import org.teavm.interop.AsyncCallback;
import org.teavm.jso.JSBody;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
Expand All @@ -19,31 +21,13 @@
*/
@Getter
@Log
public class KademliaService /*extends NetworkService<Kademlia>*/ {
public class KademliaService extends NetworkService {
public static final int REPLICATION = 20;
private static final Random RANDOM = new Random();

@Setter
private Host host;
// private List<PeerId> bootNodePeerIds;
private int successfulBootNodes;


public void addReservedPeer(String multiaddr) throws ExecutionException, InterruptedException {
// final Multiaddr addrWithPeer = Multiaddr.fromString(multiaddr);
//
// CompletableFuture<Stream> peerStream =
// protocol.dial(host, addrWithPeer.getPeerId(), addrWithPeer).getStream();
//
// Stream stream = peerStream.get();
// if (stream == null) {
// log.log(Level.WARNING, "Failed to connect to reserved peer");
// } else {
// ConnectionManager.getInstance().addNewPeer(addrWithPeer.getPeerId());
// log.log(Level.INFO, "Successfully connected to reserved peer");
// }
}

/**
* Connects to boot nodes to the Kademlia dht
*
Expand Down Expand Up @@ -76,49 +60,35 @@ public int connectBootNodes(String[] bootNodes) {
return successfulBootNodes;
}

public void updateSuccessfulBootNodes() {
successfulBootNodes = getPeerStoreSize();
}

@JSBody(params = {"bootNodes"}, script = "start(bootNodes)")
@Async
public static native void startNetwork(String[] bootNodes);

@JSBody(script = "return getPeerId()")
public static native Object getPeerId();

@JSBody(script = "return libp.peerId.privateKey")
public static native byte[] getPeerPrivateKey();

@JSBody(script = "return libp.peerId.publicKey")
public static native byte[] getPeerPublicKey();
@JSBody(script = "return libp.peerStore.store.datastore.data.size")
public static native int getPeerStoreSize();

@JSBody(script = "return libp.getConnections().length")
public static native int getPeerStoreSize();

/**
* Populates Kademlia dht with peers closest in distance to a random id then makes connections with our node
*/
public void findNewPeers() {
// protocol.findClosestPeers(randomPeerId(), REPLICATION, host);
// final var peers =
// protocol.findClosestPeers(Multihash.deserialize(host.getPeerId().getBytes()), REPLICATION, host);
//
// peers.stream().parallel().forEach(p -> {
// boolean isConnected = protocol.connectTo(host, p);
//
// if (!isConnected) {
// protocol.connectTo(host, p);
// }
// });
}
//
// private Multihash randomPeerId() {
// byte[] hash = new byte[32];
// RANDOM.nextBytes(hash);
// return new Multihash(Multihash.Type.sha2_256, hash);
// }
//
// private void setBootNodePeerIds(String[] bootNodes) {
// ArrayList<PeerId> ids = new ArrayList<>();
// for (String bootNode : bootNodes) {
// String peerId = bootNode.substring(bootNode.lastIndexOf('/') + 1, bootNode.length());
// ids.add(PeerId.fromBase58(peerId));
// }
// this.bootNodePeerIds = ids;
// }
@JSBody(script = "libp.peerStore.forEach( async (p) => {" +
" for await (const foundPeer of dht.peerRouting.getClosestPeers(p.id.toBytes())){" +
" if(foundPeer.peer?.multiaddrs?.length > 0){" +
" try{libp.dial(foundPeer.peer)}finally{}" +
" }" +
" }" +
"});")
public static native void findNewPeers();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import lombok.extern.java.Log;

@Log
public class BlockAnnounce extends StrictProtocolBinding<BlockAnnounceController> {
public class BlockAnnounce extends StrictProtocolBinding {
public BlockAnnounce(String protocolId, BlockAnnounceProtocol protocol) {
super(protocolId/*, protocol*/);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/**
* GRANDPA protocol binding
*/
public class Grandpa extends StrictProtocolBinding<GrandpaController> {
public class Grandpa extends StrictProtocolBinding {
public Grandpa(String protocolId, GrandpaProtocol protocol) {
super(protocolId/*, protocol*/);
}
Expand Down
31 changes: 24 additions & 7 deletions src/main/java/com/limechain/network/protocol/warp/WarpSync.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,44 @@
package com.limechain.network.protocol.warp;

import com.limechain.exception.global.ExecutionFailedException;
import com.limechain.exception.global.ThreadInterruptedException;
import com.limechain.network.StrictProtocolBinding;
import com.limechain.network.kad.dto.Host;
import com.limechain.network.kad.dto.PeerId;
import com.limechain.network.protocol.warp.dto.WarpSyncRequest;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.network.wrapper.Stream;
import lombok.extern.java.Log;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;

@Log
public class WarpSync extends StrictProtocolBinding<WarpSyncController> {
public class WarpSync extends StrictProtocolBinding {

private String protocolId;

public WarpSync(String protocolId, WarpSyncProtocol protocol) {
super(protocolId/*, protocol*/);
this.protocolId = protocolId;
}

// public WarpSyncResponse warpSyncRequest(Host us, PeerId peer, String blockHash) {
public WarpSyncResponse warpSyncRequest(/*PeerId peer,*/ String blockHash) {
// try {
// WarpSyncController controller = dialPeer(us, peer, us.getAddressBook());
// WarpSyncResponse resp = controller.warpSyncRequest(blockHash).get(10, TimeUnit.SECONDS);
// log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments");
// return resp;
// Stream stream = dialPeer(/*peer*/);
WarpSyncProtocol.Sender sender = new WarpSyncProtocol.Sender();
System.out.println("Block hash: " + blockHash);
WarpSyncResponse resp = sender.warpSyncRequest(blockHash, protocolId);
log.log(Level.INFO, "Received warp sync response with " + resp/*.getFragments().length*/ + " fragments");
return resp;
// } catch (ExecutionException | TimeoutException | IllegalStateException e) {
// log.log(Level.SEVERE, "Error while sending remote call request: ", e);
// throw new ExecutionFailedException(e);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new ThreadInterruptedException(e);
// }
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import com.limechain.network.protocol.warp.dto.WarpSyncRequest;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;

import java.util.concurrent.CompletableFuture;

public interface WarpSyncController {
CompletableFuture<WarpSyncResponse> send(WarpSyncRequest req);
WarpSyncResponse send(WarpSyncRequest req, String protocolId);

default CompletableFuture<WarpSyncResponse> warpSyncRequest(String blockHash) {
default WarpSyncResponse warpSyncRequest(String blockHash, String protocolId) {
var request = new WarpSyncRequest(blockHash);

return send(request);
return send(request, protocolId);
}
}
Loading

0 comments on commit 7a21b86

Please sign in to comment.