Skip to content

Commit

Permalink
use record types for bundle params, wire up checks and pool rpc test
Browse files Browse the repository at this point in the history
Signed-off-by: garyschulte <[email protected]>
  • Loading branch information
garyschulte committed Jan 29, 2025
1 parent 104bbc7 commit 67bfbe1
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,26 @@
*/
package net.consensys.linea.rpc.methods;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.extern.slf4j.Slf4j;
import net.consensys.linea.rpc.methods.parameters.BundleParameter;
import net.consensys.linea.rpc.services.LineaLimitedBundlePool;
import net.consensys.linea.rpc.services.LineaLimitedBundlePool.TransactionBundle;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.PendingTransaction;
import org.hyperledger.besu.datatypes.Transaction;
import org.hyperledger.besu.datatypes.parameters.UnsignedLongParameter;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.parameters.JsonRpcParameter;
import org.hyperledger.besu.ethereum.api.util.DomainObjectDecodeUtils;
import org.hyperledger.besu.plugin.services.RpcEndpointService;
import org.hyperledger.besu.plugin.services.exception.PluginRpcEndpointException;
import org.hyperledger.besu.plugin.services.rpc.PluginRpcRequest;
Expand Down Expand Up @@ -52,11 +65,57 @@ public BundleResponse execute(final PluginRpcRequest request) {
final int logId = log.isDebugEnabled() ? LOG_SEQUENCE.incrementAndGet() : -1;

try {
final var bundleParams = parseRequest(logId, request.getParams());
final BundleParameter bundleParams = parseRequest(logId, request.getParams());

if (bundleParams.maxTimestamp.isPresent()
&& bundleParams.maxTimestamp.get() < Instant.now().toEpochMilli()) {
throw new Exception("bundle max timestamp is in the past");
}

// TODO: pre-validate the bundle transactions via selectors?

var optBundleUUID = bundleParams.replacementUUID.map(UUID::fromString);

// use replacement UUID hashed if present, otherwise the hash of the transactions themselves
var optBundleHash =
optBundleUUID
.map(
uuid ->
Bytes.concatenate(
Bytes.ofUnsignedLong(uuid.getMostSignificantBits()),
Bytes.ofUnsignedLong(uuid.getLeastSignificantBits())))
.map(Hash::hash)
.or(
() ->
bundleParams.txs().stream()
.map(Bytes::fromHexString)
.reduce(Bytes::concatenate)
.map(Hash::hash));

if (optBundleHash.isPresent()) {
Hash bundleHash = optBundleHash.get();
List<PendingTransaction> txs =
bundleParams.txs.stream()
.map(DomainObjectDecodeUtils::decodeRawTransaction)
.map(tx -> new PendingBundleTx(tx, true, true, System.currentTimeMillis()))
.collect(Collectors.toList());

if (!txs.isEmpty()) {
bundlePool.put(
bundleHash,
new TransactionBundle(
bundleHash,
txs,
bundleParams.blockNumber,
bundleParams.minTimestamp,
bundleParams.maxTimestamp,
bundleParams.revertingTxHashes()));
return new BundleResponse(bundleHash);
}
}
// otherwise boom.
throw new RuntimeException("Malformed bundle, no bundle transactions present");

// TODO: pre-validate the bundle and add to the bundle pool.

return new BundleResponse(Bytes32.random());
} catch (final Exception e) {
throw new PluginRpcEndpointException(new LineaSendBundleError(e.getMessage()));
}
Expand All @@ -72,7 +131,7 @@ private BundleParameter parseRequest(final int logId, final Object[] params) {
.addArgument(logId)
.setCause(e)
.log();
throw new RuntimeException(e);
throw new RuntimeException("malformed linea_sendBundle json param");
}
}

Expand All @@ -96,4 +155,46 @@ public String getMessage() {
return errMessage;
}
}

public record BundleParameter(
/* array of signed transactions to execute in a bundle */
List<String> txs,
/* block number for which this bundle is valid */
Long blockNumber,
/* Optional minimum timestamp from which this bundle is valid */
Optional<Long> minTimestamp,
/* Optional max timestamp for which this bundle is valid */
Optional<Long> maxTimestamp,
/* Optional list of transaction hashes which are allowed to revert */
Optional<List<Hash>> revertingTxHashes,
/* Optional UUID which can be used to replace or cancel this bundle */
Optional<String> replacementUUID,
/* Optional list of builders to share this bundle with */
Optional<List<String>> builders) {
@JsonCreator
public BundleParameter(
@JsonProperty("txs") final List<String> txs,
@JsonProperty("blockNumber") final UnsignedLongParameter blockNumber,
@JsonProperty("minTimestamp") final Optional<Long> minTimestamp,
@JsonProperty("maxTimestamp") final Optional<Long> maxTimestamp,
@JsonProperty("revertingTxHashes") final Optional<List<Hash>> revertingTxHashes,
@JsonProperty("replacementUUID") final Optional<String> replacementUUID,
@JsonProperty("builders") final Optional<List<String>> builders) {
this(
txs,
blockNumber.getValue(),
minTimestamp,
maxTimestamp,
revertingTxHashes,
replacementUUID,
builders);
}
}

public record PendingBundleTx(
Transaction getTransaction,
boolean isReceivedFromLocalSource,
boolean hasPriority,
long getAddedAt)
implements PendingTransaction {}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.datatypes.PendingTransaction;
import org.hyperledger.besu.datatypes.parameters.UnsignedLongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -154,7 +153,7 @@ public void removeByBlockNumber(long blockNumber) {
* @param bundle The TransactionBundle to add.
*/
private void addToBlockIndex(TransactionBundle bundle) {
long blockNumber = bundle.blockNumber().getValue();
long blockNumber = bundle.blockNumber();
blockIndex.computeIfAbsent(blockNumber, k -> new ArrayList<>()).add(bundle);
}

Expand All @@ -164,7 +163,7 @@ private void addToBlockIndex(TransactionBundle bundle) {
* @param bundle The TransactionBundle to remove.
*/
private void removeFromBlockIndex(TransactionBundle bundle) {
long blockNumber = bundle.blockNumber().getValue();
long blockNumber = bundle.blockNumber();
List<TransactionBundle> bundles = blockIndex.get(blockNumber);
if (bundles != null) {
bundles.remove(bundle);
Expand All @@ -182,7 +181,7 @@ private int calculateWeight(TransactionBundle bundle) {
public record TransactionBundle(
Hash bundleIdentifier,
List<PendingTransaction> pendingTransactions,
UnsignedLongParameter blockNumber,
Long blockNumber,
Optional<Long> minTimestamp,
Optional<Long> maxTimestamp,
Optional<List<Hash>> revertingTxHashes) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class LineaSendBundleEndpointPlugin extends AbstractLineaRequiredPlugin {
private RpcEndpointService rpcEndpointService;
private LineaSendBundle lineaSendBundleMethod;

// TODO: rational default?
// TODO: rational default? config?
private final long DEFAULT_MAX_POOL_SIZE_IN_MB = 4L;

/**
Expand All @@ -43,6 +43,7 @@ public void doRegister(final ServiceManager serviceManager) {
new RuntimeException(
"Failed to obtain RpcEndpointService from the ServiceManager."));

// TODO: where do I come from? need a shared instance
var lineaBundlePool = new LineaLimitedBundlePool(DEFAULT_MAX_POOL_SIZE_IN_MB * 1024 * 1024);

lineaSendBundleMethod = new LineaSendBundle(rpcEndpointService, lineaBundlePool);
Expand Down
Loading

0 comments on commit 67bfbe1

Please sign in to comment.