From 7a21b86018e48a9ce967bd2053de4a2f5e8a9dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9Cur=D0=B0d=20H=D0=B0mz=D0=B0?= Date: Thu, 22 Aug 2024 09:48:26 +0300 Subject: [PATCH] chore: current changes --- .../com/limechain/client/LightClient.java | 6 +- .../java/com/limechain/network/Network.java | 26 ++--- .../network/StrictProtocolBinding.java | 58 +++++++++--- .../network/kad/KademliaService.java | 70 ++++---------- .../protocol/blockannounce/BlockAnnounce.java | 2 +- .../network/protocol/grandpa/Grandpa.java | 2 +- .../network/protocol/warp/WarpSync.java | 31 ++++-- .../protocol/warp/WarpSyncController.java | 8 +- .../protocol/warp/WarpSyncProtocol.java | 94 ++++++++++++++----- .../warp/scale/reader/BlockHeaderReader.java | 6 ++ .../com/limechain/network/wrapper/Stream.java | 29 ++++++ .../polkaj/reader/ScaleCodecReader.java | 2 +- .../sync/warpsync/WarpSyncMachine.java | 7 +- .../action/RequestFragmentsAction.java | 48 +++++----- .../java/com/limechain/utils/StringUtils.java | 35 +++---- src/main/webapp/index.html | 8 +- 16 files changed, 269 insertions(+), 163 deletions(-) create mode 100644 src/main/java/com/limechain/network/wrapper/Stream.java diff --git a/src/main/java/com/limechain/client/LightClient.java b/src/main/java/com/limechain/client/LightClient.java index 0e5bcc969..bdcf1e934 100644 --- a/src/main/java/com/limechain/client/LightClient.java +++ b/src/main/java/com/limechain/client/LightClient.java @@ -33,15 +33,15 @@ public LightClient() { @SneakyThrows public void start() { this.network.start(); - + System.out.println("Network started"); while (true) { + this.network.updateCurrentSelectedPeer(); if (this.network.getKademliaService().getSuccessfulBootNodes() > 0) { log.log(Level.INFO, "Node successfully connected to a peer! Sync can start!"); this.warpSyncMachine = AppBean.getBean(WarpSyncMachine.class); + System.out.println("Starting warp sync machine..."); this.warpSyncMachine.start(); break; - } else { - this.network.updateCurrentSelectedPeer(); } log.log(Level.INFO, "Waiting for peer connection..."); Thread.sleep(10000); diff --git a/src/main/java/com/limechain/network/Network.java b/src/main/java/com/limechain/network/Network.java index 8abe59c6a..2c78cf58d 100644 --- a/src/main/java/com/limechain/network/Network.java +++ b/src/main/java/com/limechain/network/Network.java @@ -4,6 +4,8 @@ import com.limechain.chain.ChainService; import com.limechain.config.HostConfig; import com.limechain.network.kad.KademliaService; +import com.limechain.network.protocol.warp.WarpSyncService; +import com.limechain.network.protocol.warp.dto.WarpSyncResponse; import com.limechain.rpc.server.AppBean; import com.limechain.sync.warpsync.WarpSyncState; import lombok.Getter; @@ -50,15 +52,18 @@ private void initializeProtocols(ChainService chainService, HostConfig hostConfig) { // -// String chainId = chainService.getChainSpec().getProtocolId(); -// String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId); + String chainId = chainService.getChainSpec().getProtocolId(); + String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(chainId); // String lightProtocolId = ProtocolUtils.getLightMessageProtocol(chainId); // String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(chainId); // String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(chainId); kademliaService = new KademliaService(); + warpSyncService = new WarpSyncService(warpProtocolId); } + WarpSyncService warpSyncService; + // private Ed25519PrivateKey loadPrivateKeyFromDB(KVRepository repository) { // Ed25519PrivateKey privateKey; // @@ -110,6 +115,7 @@ public void updateCurrentSelectedPeer() { // if (connectionManager.getPeerIds().isEmpty()) return; // this.currentSelectedPeer = connectionManager.getPeerIds().stream() // .skip(RANDOM.nextInt(connectionManager.getPeerIds().size())).findAny().orElse(null); + kademliaService.updateSuccessfulBootNodes(); } // public String getPeerId() { @@ -138,7 +144,7 @@ public void findPeers() { } if (getPeersCount() >= REPLICATION) { log.log(Level.INFO, - "Connections have reached replication factor(" + REPLICATION + "). " + + "Connections have reached replication factor(" + REPLICATION + "). " + "No need to search for new ones yet."); return; } @@ -177,7 +183,7 @@ public void pingPeers() { // } // } -// public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) { + // public BlockResponse syncBlock(PeerId peerId, BigInteger lastBlockNumber) { // this.currentSelectedPeer = peerId; // // TODO: fields, hash, direction and maxBlocks values not verified // // TODO: when debugging could not get a value returned @@ -202,14 +208,12 @@ public void pingPeers() { // ); // } // -// public WarpSyncResponse makeWarpSyncRequest(String blockHash) { + public WarpSyncResponse makeWarpSyncRequest(String blockHash) { // if (isPeerInvalid()) return null; -// -// return this.warpSyncService.getProtocol().warpSyncRequest( -// this.host, -// this.currentSelectedPeer, -// blockHash); -// } + + return this.warpSyncService.getProtocol().warpSyncRequest( + blockHash); + } // // public LightClientMessage.Response makeRemoteReadRequest(String blockHash, String[] keys) { // if (isPeerInvalid()) return null; diff --git a/src/main/java/com/limechain/network/StrictProtocolBinding.java b/src/main/java/com/limechain/network/StrictProtocolBinding.java index 6c7d8b075..a33898dc9 100644 --- a/src/main/java/com/limechain/network/StrictProtocolBinding.java +++ b/src/main/java/com/limechain/network/StrictProtocolBinding.java @@ -1,18 +1,52 @@ package com.limechain.network; -public abstract class StrictProtocolBinding { +import com.limechain.network.wrapper.Stream; +import org.teavm.jso.JSBody; +import org.teavm.jso.core.JSPromise; + +import java.util.concurrent.atomic.AtomicReference; + +public abstract class StrictProtocolBinding { + String protocolId; + protected StrictProtocolBinding(String protocolId/*, T protocol*/) { + this.protocolId = protocolId; + } + + public Stream dialPeer(/*PeerId peer*/) { + Object peer1 = getPeer(); + System.out.println("Peer: " + peer1); + JSPromise dial = dial(peer1, protocolId); + final var lock = new Object(); + AtomicReference stream = new AtomicReference<>(); + + dial.then((result) -> { + stream.set((Stream) result); + synchronized (lock) { + lock.notify(); + } + return null; + }); + + synchronized (lock) { + try { + lock.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Stream atomicStream = stream.get(); + + System.out.println("Stream: " + toJson(atomicStream)); + return atomicStream; } -// public T dialPeer(Host us, PeerId peer, AddressBook addrs) { -// Multiaddr[] addr = addrs.get(peer) -// .join().stream() -// .filter(address -> !address.toString().contains("/ws") && !address.toString().contains("/wss")) -// .toList() -// .toArray(new Multiaddr[0]); -// if (addr.length == 0) -// throw new IllegalStateException("No addresses known for peer " + peer); -// -// return dial(us, peer, addr).getController().join(); -// } + @JSBody(params = {"object"}, script = "return JSON.stringify(object);") + private static native String toJson(Object object); + + @JSBody(params = {"peerId", "protocolId"}, script = "return (async () => ItPbStream.pbStream(await libp.dialProtocol(peerId, protocolId)))()") + private static native JSPromise dial(Object peerId, String protocolId); + + @JSBody(script = "return libp.getConnections()[0].remotePeer;") + private static native Object getPeer(); } diff --git a/src/main/java/com/limechain/network/kad/KademliaService.java b/src/main/java/com/limechain/network/kad/KademliaService.java index 9ff049188..3e424564b 100644 --- a/src/main/java/com/limechain/network/kad/KademliaService.java +++ b/src/main/java/com/limechain/network/kad/KademliaService.java @@ -2,6 +2,7 @@ import com.limechain.network.kad.dto.Host; import com.limechain.network.kad.dto.PeerId; +import com.limechain.network.protocol.NetworkService; import lombok.Getter; import lombok.Setter; import lombok.extern.java.Log; @@ -9,6 +10,7 @@ import org.teavm.interop.AsyncCallback; import org.teavm.jso.JSBody; +import java.util.ArrayList; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ExecutionException; @@ -19,31 +21,13 @@ */ @Getter @Log -public class KademliaService /*extends NetworkService*/ { +public class KademliaService extends NetworkService { public static final int REPLICATION = 20; - private static final Random RANDOM = new Random(); @Setter private Host host; - // private List bootNodePeerIds; private int successfulBootNodes; - - public void addReservedPeer(String multiaddr) throws ExecutionException, InterruptedException { -// final Multiaddr addrWithPeer = Multiaddr.fromString(multiaddr); -// -// CompletableFuture peerStream = -// protocol.dial(host, addrWithPeer.getPeerId(), addrWithPeer).getStream(); -// -// Stream stream = peerStream.get(); -// if (stream == null) { -// log.log(Level.WARNING, "Failed to connect to reserved peer"); -// } else { -// ConnectionManager.getInstance().addNewPeer(addrWithPeer.getPeerId()); -// log.log(Level.INFO, "Successfully connected to reserved peer"); -// } - } - /** * Connects to boot nodes to the Kademlia dht * @@ -76,49 +60,35 @@ public int connectBootNodes(String[] bootNodes) { return successfulBootNodes; } + public void updateSuccessfulBootNodes() { + successfulBootNodes = getPeerStoreSize(); + } + @JSBody(params = {"bootNodes"}, script = "start(bootNodes)") - @Async public static native void startNetwork(String[] bootNodes); @JSBody(script = "return getPeerId()") public static native Object getPeerId(); + @JSBody(script = "return libp.peerId.privateKey") public static native byte[] getPeerPrivateKey(); + @JSBody(script = "return libp.peerId.publicKey") public static native byte[] getPeerPublicKey(); - @JSBody(script = "return libp.peerStore.store.datastore.data.size") - public static native int getPeerStoreSize(); + @JSBody(script = "return libp.getConnections().length") + public static native int getPeerStoreSize(); /** * Populates Kademlia dht with peers closest in distance to a random id then makes connections with our node */ - public void findNewPeers() { -// protocol.findClosestPeers(randomPeerId(), REPLICATION, host); -// final var peers = -// protocol.findClosestPeers(Multihash.deserialize(host.getPeerId().getBytes()), REPLICATION, host); -// -// peers.stream().parallel().forEach(p -> { -// boolean isConnected = protocol.connectTo(host, p); -// -// if (!isConnected) { -// protocol.connectTo(host, p); -// } -// }); - } -// -// private Multihash randomPeerId() { -// byte[] hash = new byte[32]; -// RANDOM.nextBytes(hash); -// return new Multihash(Multihash.Type.sha2_256, hash); -// } -// -// private void setBootNodePeerIds(String[] bootNodes) { -// ArrayList ids = new ArrayList<>(); -// for (String bootNode : bootNodes) { -// String peerId = bootNode.substring(bootNode.lastIndexOf('/') + 1, bootNode.length()); -// ids.add(PeerId.fromBase58(peerId)); -// } -// this.bootNodePeerIds = ids; -// } + @JSBody(script = "libp.peerStore.forEach( async (p) => {" + + " for await (const foundPeer of dht.peerRouting.getClosestPeers(p.id.toBytes())){" + + " if(foundPeer.peer?.multiaddrs?.length > 0){" + + " try{libp.dial(foundPeer.peer)}finally{}" + + " }" + + " }" + + "});") + public static native void findNewPeers(); + } \ No newline at end of file diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounce.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounce.java index 7d5fd5a95..a9f26513c 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounce.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounce.java @@ -4,7 +4,7 @@ import lombok.extern.java.Log; @Log -public class BlockAnnounce extends StrictProtocolBinding { +public class BlockAnnounce extends StrictProtocolBinding { public BlockAnnounce(String protocolId, BlockAnnounceProtocol protocol) { super(protocolId/*, protocol*/); } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/Grandpa.java b/src/main/java/com/limechain/network/protocol/grandpa/Grandpa.java index 24e62eeb5..1bc1fee4d 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/Grandpa.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/Grandpa.java @@ -5,7 +5,7 @@ /** * GRANDPA protocol binding */ -public class Grandpa extends StrictProtocolBinding { +public class Grandpa extends StrictProtocolBinding { public Grandpa(String protocolId, GrandpaProtocol protocol) { super(protocolId/*, protocol*/); } diff --git a/src/main/java/com/limechain/network/protocol/warp/WarpSync.java b/src/main/java/com/limechain/network/protocol/warp/WarpSync.java index bf0e07b03..5e6075a56 100644 --- a/src/main/java/com/limechain/network/protocol/warp/WarpSync.java +++ b/src/main/java/com/limechain/network/protocol/warp/WarpSync.java @@ -1,21 +1,38 @@ package com.limechain.network.protocol.warp; +import com.limechain.exception.global.ExecutionFailedException; +import com.limechain.exception.global.ThreadInterruptedException; import com.limechain.network.StrictProtocolBinding; +import com.limechain.network.kad.dto.Host; +import com.limechain.network.kad.dto.PeerId; +import com.limechain.network.protocol.warp.dto.WarpSyncRequest; +import com.limechain.network.protocol.warp.dto.WarpSyncResponse; +import com.limechain.network.wrapper.Stream; import lombok.extern.java.Log; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; + @Log -public class WarpSync extends StrictProtocolBinding { +public class WarpSync extends StrictProtocolBinding { + + private String protocolId; public WarpSync(String protocolId, WarpSyncProtocol protocol) { super(protocolId/*, protocol*/); + this.protocolId = protocolId; } -// public WarpSyncResponse warpSyncRequest(Host us, PeerId peer, String blockHash) { + public WarpSyncResponse warpSyncRequest(/*PeerId peer,*/ String blockHash) { // try { -// WarpSyncController controller = dialPeer(us, peer, us.getAddressBook()); -// WarpSyncResponse resp = controller.warpSyncRequest(blockHash).get(10, TimeUnit.SECONDS); -// log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments"); -// return resp; +// Stream stream = dialPeer(/*peer*/); + WarpSyncProtocol.Sender sender = new WarpSyncProtocol.Sender(); + System.out.println("Block hash: " + blockHash); + WarpSyncResponse resp = sender.warpSyncRequest(blockHash, protocolId); + log.log(Level.INFO, "Received warp sync response with " + resp/*.getFragments().length*/ + " fragments"); + return resp; // } catch (ExecutionException | TimeoutException | IllegalStateException e) { // log.log(Level.SEVERE, "Error while sending remote call request: ", e); // throw new ExecutionFailedException(e); @@ -23,5 +40,5 @@ public WarpSync(String protocolId, WarpSyncProtocol protocol) { // Thread.currentThread().interrupt(); // throw new ThreadInterruptedException(e); // } -// } + } } diff --git a/src/main/java/com/limechain/network/protocol/warp/WarpSyncController.java b/src/main/java/com/limechain/network/protocol/warp/WarpSyncController.java index 9a2c3c234..f8fb14d42 100644 --- a/src/main/java/com/limechain/network/protocol/warp/WarpSyncController.java +++ b/src/main/java/com/limechain/network/protocol/warp/WarpSyncController.java @@ -3,14 +3,12 @@ import com.limechain.network.protocol.warp.dto.WarpSyncRequest; import com.limechain.network.protocol.warp.dto.WarpSyncResponse; -import java.util.concurrent.CompletableFuture; - public interface WarpSyncController { - CompletableFuture send(WarpSyncRequest req); + WarpSyncResponse send(WarpSyncRequest req, String protocolId); - default CompletableFuture warpSyncRequest(String blockHash) { + default WarpSyncResponse warpSyncRequest(String blockHash, String protocolId) { var request = new WarpSyncRequest(blockHash); - return send(request); + return send(request, protocolId); } } diff --git a/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java b/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java index ec288eb6a..dc704f88b 100644 --- a/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java +++ b/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java @@ -1,5 +1,19 @@ package com.limechain.network.protocol.warp; +import com.limechain.network.protocol.warp.dto.WarpSyncRequest; +import com.limechain.network.protocol.warp.dto.WarpSyncResponse; +import com.limechain.network.protocol.warp.scale.reader.WarpSyncResponseScaleReader; +import com.limechain.network.wrapper.Stream; +import com.limechain.polkaj.reader.ScaleCodecReader; +import com.limechain.utils.StringUtils; +import org.teavm.jso.JSBody; +import org.teavm.jso.core.JSArray; +import org.teavm.jso.core.JSPromise; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; + public class WarpSyncProtocol /*extends ProtocolHandler*/ { // Sizes taken from smoldot public static final int MAX_REQUEST_SIZE = 32; @@ -21,39 +35,71 @@ protected CompletableFuture onStartInitiator(Stream stream) return CompletableFuture.completedFuture(handler); }*/ - /*static class Sender implements ProtocolMessageHandler, WarpSyncController { + static class Sender implements WarpSyncController { public static final int MAX_QUEUE_SIZE = 50; - private final LinkedBlockingDeque> queue = - new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); - private final Stream stream; +// private final LinkedBlockingDeque> queue = +// new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); - public Sender(Stream stream) { - this.stream = stream; + public Sender() { } - @Override + // @Override public void onMessage(Stream stream, WarpSyncResponse msg) { - Objects.requireNonNull(queue.poll()).complete(msg); - stream.closeWrite(); +// Objects.requireNonNull(queue.poll()).complete(msg); +// stream.closeWrite(); } @Override - public CompletableFuture send(WarpSyncRequest req) { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { - writer.write(new WarpSyncRequestWriter(), req); - } catch (IOException e) { - throw new ScaleEncodingException(e); + public WarpSyncResponse send(WarpSyncRequest req, String protocolId) { + System.out.println("Request: " + req.getBlockHash()); + final var lock = new Object(); + + JSPromise> objectJSPromise = sendRequest(StringUtils.toHex(req.getBlockHash().getBytes()), protocolId); + + objectJSPromise.then((ttt) -> { + System.out.println(ttt); + System.out.println(ttt.get(0)); + System.out.println(ttt.get(ttt.getLength() - 1)); + byte[] bytes = new byte[ttt.getLength()]; + for (int i = 0; i < ttt.getLength(); i++) { + //bytes[i] = (byte) ((int) ttt.get(i)); //fails here + } +// byte[] bytes = StringUtils.fromHex(ttt); + System.out.println("Received response: " + " " + bytes); + System.out.println("Received response len: " + bytes.length); + + synchronized (lock) { +// System.out.println("Received response: " + bytes.length + " " + bytes); + ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes); + WarpSyncResponse responseaa = new WarpSyncResponseScaleReader().read(scaleCodecReader); + System.out.println(responseaa); +// response.set(result); + lock.notify(); + } + return null; + }); + + synchronized (lock) { + try { + lock.wait(); +// byte[] bytes = response.get(); +// System.out.println("Received response: " + /*bytes.length +*/ " " + bytes); +// ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes); +// WarpSyncResponse responseaa = new WarpSyncResponseScaleReader().read(scaleCodecReader); +// System.out.println(responseaa); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - CompletableFuture res = new CompletableFuture<>(); - queue.add(res); - stream.writeAndFlush(buf.toByteArray()); - return res; + return null; } - @Override - public void onException(Throwable cause) { - Objects.requireNonNull(queue.poll()).completeExceptionally(cause); - } - }*/ + @JSBody(params = {"blockHash", "protocolId"}, script = "return (async () => {" + + "let peer = libp.getConnections()[0].remotePeer;" + + "let stream = await ItPbStream.pbStream(await libp.dialProtocol(peer, protocolId));" + + "stream.writeLP(new Uint8Array([...blockHash.matchAll(/../g)].map(m => parseInt(m[0], 16))));" + + "return Array.from((await stream.readLP()).subarray());})()") + private static native JSPromise> sendRequest(String blockHash, String protocolId); + + } } diff --git a/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java b/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java index b40df4c4a..0c9974ea8 100644 --- a/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java +++ b/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java @@ -12,12 +12,17 @@ public class BlockHeaderReader implements ScaleReader { @Override public BlockHeader read(ScaleCodecReader reader) { BlockHeader blockHeader = new BlockHeader(); + System.out.println("BlockHeaderReader.read"); blockHeader.setParentHash(new Hash256(reader.readUint256())); + System.out.println("BlockHeaderReader.setParentHash"); // NOTE: Usage of BlockNumberReader is intentionally omitted here, // since we want this to be a compact int, not a var size int blockHeader.setBlockNumber(BigInteger.valueOf(reader.readCompactInt())); + System.out.println("BlockHeaderReader.setBlockNumber"); blockHeader.setStateRoot(new Hash256(reader.readUint256())); + System.out.println("BlockHeaderReader.setStateRoot"); blockHeader.setExtrinsicsRoot(new Hash256(reader.readUint256())); + System.out.println("BlockHeaderReader.setExtrinsicsRoot"); var digestCount = reader.readCompactInt(); HeaderDigest[] digests = new HeaderDigest[digestCount]; @@ -26,6 +31,7 @@ public BlockHeader read(ScaleCodecReader reader) { } blockHeader.setDigest(digests); + System.out.println("BlockHeaderReader.setDigest"); return blockHeader; } diff --git a/src/main/java/com/limechain/network/wrapper/Stream.java b/src/main/java/com/limechain/network/wrapper/Stream.java new file mode 100644 index 000000000..590049edd --- /dev/null +++ b/src/main/java/com/limechain/network/wrapper/Stream.java @@ -0,0 +1,29 @@ +package com.limechain.network.wrapper; + +import org.teavm.jso.JSObject; + +public class Stream implements JSObject { + public void write() { + + } + + public void writeLP(byte[] data) { + + } + + public void writePB() { + + } + + public void read() { + + } + + public Object readLP() { + return null; + } + + public void readPB() { + + } +} diff --git a/src/main/java/com/limechain/polkaj/reader/ScaleCodecReader.java b/src/main/java/com/limechain/polkaj/reader/ScaleCodecReader.java index 7ab6c8d3c..6348a2310 100644 --- a/src/main/java/com/limechain/polkaj/reader/ScaleCodecReader.java +++ b/src/main/java/com/limechain/polkaj/reader/ScaleCodecReader.java @@ -29,7 +29,7 @@ public ScaleCodecReader(byte[] source) { * @return true if has more elements */ public boolean hasNext() { - return pos < source.length; + return true;//pos < source.length; } /** diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java index 2edeecf22..f7b0a6ed2 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java @@ -54,6 +54,7 @@ public void nextState() { } public void handleState() { + System.out.println("Warp sync action" + warpSyncAction.getClass().getSimpleName()); warpSyncAction.handle(this); } @@ -72,14 +73,14 @@ public void start() { final Hash256 initStateHash = this.syncState.getLastFinalizedBlockHash(); // Always start with requesting fragments - log.log(Level.INFO, "Requesting fragments..."); + log.log(Level.INFO, "Requesting fragments... " + initStateHash.toString()); this.networkService.updateCurrentSelectedPeerWithNextBootnode(); this.warpSyncAction = new RequestFragmentsAction(initStateHash); // new Thread(() -> { // while (this.warpSyncAction.getClass() != FinishedAction.class) { -// this.handleState(); -// this.nextState(); + this.handleState(); + this.nextState(); // } // // finishWarpSync(); diff --git a/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java b/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java index b5861ad2b..671f035dc 100644 --- a/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java +++ b/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java @@ -1,5 +1,7 @@ package com.limechain.sync.warpsync.action; +import com.limechain.exception.global.MissingObjectException; +import com.limechain.network.protocol.warp.dto.WarpSyncResponse; import com.limechain.polkaj.Hash256; import com.limechain.rpc.server.AppBean; import com.limechain.sync.warpsync.WarpSyncMachine; @@ -13,7 +15,7 @@ public class RequestFragmentsAction implements WarpSyncAction { private final WarpSyncState warpSyncState; private final Hash256 blockHash; -// private WarpSyncResponse result; + // private WarpSyncResponse result; private Exception error; public RequestFragmentsAction(Hash256 blockHash) { @@ -37,7 +39,7 @@ public void next(WarpSyncMachine sync) { } } // if (this.result != null) { - sync.setWarpSyncAction(new VerifyJustificationAction()); + sync.setWarpSyncAction(new VerifyJustificationAction()); // return; // } log.log(Level.WARNING, "RequestFragmentsState.next() called without result or error set."); @@ -45,10 +47,10 @@ public void next(WarpSyncMachine sync) { @Override public void handle(WarpSyncMachine sync) { -// WarpSyncResponse resp = null; + WarpSyncResponse resp = null; // for (int i = 0; i < sync.getNetworkService().getKademliaService().getBootNodePeerIds().size(); i++) { // try { -// resp = sync.getNetworkService().makeWarpSyncRequest(blockHash.toString()); + resp = sync.getNetworkService().makeWarpSyncRequest(blockHash.toString()); // break; // } catch (Exception e) { // if (!sync.getNetworkService().updateCurrentSelectedPeerWithBootnode(i)) { @@ -57,28 +59,28 @@ public void handle(WarpSyncMachine sync) { // } // } // } -// try { -// if (resp == null) { -// throw new MissingObjectException("No response received."); -// } -// -// log.log(Level.INFO, "Successfully received fragments from peer " -// + sync.getNetworkService().getCurrentSelectedPeer()); -// if (resp.getFragments().length == 0) { -// log.log(Level.WARNING, "No fragments received."); -// return; -// } -// warpSyncState.setWarpSyncFragmentsFinished(resp.isFinished()); + try { + if (resp == null) { + throw new MissingObjectException("No response received."); + } + + log.log(Level.INFO, "Successfully received fragments from peer " + /* + sync.getNetworkService().getCurrentSelectedPeer()*/); + if (resp.getFragments().length == 0) { + log.log(Level.WARNING, "No fragments received."); + return; + } + warpSyncState.setWarpSyncFragmentsFinished(resp.isFinished()); // sync.setFragmentsQueue(new LinkedBlockingQueue<>( // Arrays.stream(resp.getFragments()).toList()) // ); -// + // this.result = resp; -// } catch (Exception e) { -// // TODO: Set error state, next() will use to transition to correct next state. -// // This error state could be either recoverable or irrecoverable. -// log.log(Level.WARNING, "Error while requesting fragments: " + e.getMessage()); -// this.error = e; -// } + } catch (Exception e) { + // TODO: Set error state, next() will use to transition to correct next state. + // This error state could be either recoverable or irrecoverable. + log.log(Level.WARNING, "Error while requesting fragments: " + e.getMessage()); + this.error = e; + } } } diff --git a/src/main/java/com/limechain/utils/StringUtils.java b/src/main/java/com/limechain/utils/StringUtils.java index ba21a1768..f9f135641 100644 --- a/src/main/java/com/limechain/utils/StringUtils.java +++ b/src/main/java/com/limechain/utils/StringUtils.java @@ -3,8 +3,6 @@ import lombok.experimental.UtilityClass; import org.teavm.jso.JSBody; -import java.util.regex.Pattern; - @UtilityClass public class StringUtils { public static final String HEX_PREFIX = "0x"; @@ -29,10 +27,10 @@ public static byte[] hexToBytes(String hex) { return fromHex(hex); } - @JSBody(params = { "hex" }, script = " let bytes = [];" + - " for (let c = 0; c < hex.length; c += 2)" + - " bytes.push(parseInt(hex.substr(c, 2), 16));" + - " return bytes;") + @JSBody(params = {"hex"}, script = " let bytes = [];" + + " for (let c = 0; c < hex.length; c += 2)" + + " bytes.push(parseInt(hex.substr(c, 2), 16));" + + " return bytes;") public static native byte[] fromHex(String hex); /** @@ -49,21 +47,16 @@ public static String remove0xPrefix(String hex) { return hex; } - /** - * Converts a string to its hexadecimal representation. - * Each character of the input string is converted to its corresponding two-digit hex value. - * - * @param key the string to convert to hexadecimal - * @return the hexadecimal representation of the input string - */ - public static String toHex(String key) { - StringBuilder sb = new StringBuilder(); - char[] ch = key.toCharArray(); - for (char c : ch) { - String hexString = Integer.toHexString(c); - sb.append(hexString); + private final String hexArray = "0123456789abcdef"; + + public static String toHex(byte[] bytes) { + StringBuffer buf = new StringBuffer(); + for (int i = 0; i != bytes.length; i++) { + int v = bytes[i] & 0xff; + buf.append(hexArray.charAt(v >> 4)); + buf.append(hexArray.charAt(v & 0xf)); } - return sb.toString(); + return buf.toString(); } /** @@ -74,6 +67,6 @@ public static String toHex(String key) { * @return the "0x" prefixed hexadecimal string */ public static String toHexWithPrefix(byte[] bytes) { - return HEX_PREFIX + "Hex.toHexString(bytes)"; + return HEX_PREFIX + toHex(bytes); } } diff --git a/src/main/webapp/index.html b/src/main/webapp/index.html index a94bf6767..d2ce5d208 100644 --- a/src/main/webapp/index.html +++ b/src/main/webapp/index.html @@ -15,6 +15,8 @@ + + @@ -47,7 +49,7 @@ services: { identify: Libp2PIdentify.identify(), ping: Libp2PPing.ping(), - dht: Libp2PKadDht.kadDHT({protocol: "dot/dht/1.0.0"}), + dht: Libp2PKadDht.kadDHT({protocol: "/dot/kad"}), pubsub: ChainsafeLibp2PGossipsub.gossipsub(), } }; @@ -55,6 +57,10 @@ libp = await Libp2P.createLibp2p(test); libp.start(); + // libp.addEventListener('peer:discovery', (evt) => console.log('Discovered:', evt.detail.id.toString())) + // libp.addEventListener('peer:connect', (evt) => console.log('Connected:', evt.detail.toString())) + // libp.addEventListener('peer:disconnect', (evt) => console.log('Disconnected:', evt.detail.toString())) + // let pbStream = ItProtobufStream.pbStream; // // libp.handle('/dot/sync/warp', ({stream}) => {