Skip to content

Commit 693a7e7

Browse files
authored
fix formatter for trace event fields (#28)
* 7702 bundler response changes * correct trace event field formatter
1 parent 3b28b15 commit 693a7e7

File tree

24 files changed

+302
-249
lines changed

24 files changed

+302
-249
lines changed

core/src/rpc_clients/bundler.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,18 @@ pub struct TwExecuteResponse {
7171
}
7272

7373
/// Response from tw_getTransactionHash bundler method
74+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
75+
#[serde(rename_all = "camelCase", tag = "status")]
76+
pub enum TwGetTransactionHashResponse {
77+
Pending,
78+
Success { transaction_hash: String },
79+
}
80+
7481
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
7582
#[serde(rename_all = "camelCase")]
76-
pub struct TwGetTransactionHashResponse {
77-
/// The transaction hash
78-
pub transaction_hash: Option<String>,
83+
pub enum TwGetTransactionHashStatus {
84+
Pending,
85+
Success,
7986
}
8087

8188
impl BundlerClient {
@@ -152,12 +159,12 @@ impl BundlerClient {
152159
pub async fn tw_get_transaction_hash(
153160
&self,
154161
transaction_id: &str,
155-
) -> TransportResult<Option<String>> {
162+
) -> TransportResult<TwGetTransactionHashResponse> {
156163
let params = serde_json::json!([transaction_id]);
157164

158165
let response: TwGetTransactionHashResponse =
159166
self.inner.request("tw_getTransactionHash", params).await?;
160167

161-
Ok(response.transaction_hash)
168+
Ok(response)
162169
}
163170
}

core/src/rpc_clients/transport.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ impl HeaderInjectingTransport {
6464
.map_err(TransportErrorKind::custom)?;
6565

6666
let status = resp.status();
67-
debug!(%status, "received response from server");
67+
debug!(?status, "received response from server");
6868

6969
// Get response body
7070
let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;
7171
debug!(bytes = body.len(), "retrieved response body");
72-
trace!(body = %String::from_utf8_lossy(&body), "response body");
72+
trace!(body = ?String::from_utf8_lossy(&body), "response body");
7373

7474
// Check for HTTP errors
7575
if !status.is_success() {
@@ -100,7 +100,7 @@ impl Service<RequestPacket> for HeaderInjectingTransport {
100100
#[inline]
101101
fn call(&mut self, req: RequestPacket) -> Self::Future {
102102
let this = self.clone(); // Clone is cheap - just clones the Arc inside Client
103-
let span = debug_span!("HeaderInjectingTransport", url = %this.url);
103+
let span = debug_span!("HeaderInjectingTransport", url = ?this.url);
104104
Box::pin(this.do_request(req).instrument(span))
105105
}
106106
}

executors/src/eip7702_executor/confirm.rs

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use alloy::primitives::{Address, TxHash};
22
use alloy::providers::Provider;
33
use alloy::rpc::types::TransactionReceipt;
44
use engine_core::error::{AlloyRpcErrorToEngineError, EngineError};
5+
use engine_core::rpc_clients::TwGetTransactionHashResponse;
56
use engine_core::{
67
chain::{Chain, ChainService, RpcCredentials},
78
execution_options::WebhookOptions,
@@ -31,11 +32,7 @@ pub struct Eip7702ConfirmationJobData {
3132
pub transaction_id: String,
3233
pub chain_id: u64,
3334
pub bundler_transaction_id: String,
34-
/// ! Deprecated todo: remove this field after all jobs are processed
35-
pub eoa_address: Option<Address>,
36-
37-
// TODO: make non-optional after all jobs are processed
38-
pub sender_details: Option<Eip7702Sender>,
35+
pub sender_details: Eip7702Sender,
3936

4037
pub rpc_credentials: RpcCredentials,
4138
#[serde(default)]
@@ -189,7 +186,7 @@ where
189186
let chain = chain.with_new_default_headers(chain_auth_headers);
190187

191188
// 2. Get transaction hash from bundler
192-
let transaction_hash_str = chain
189+
let transaction_hash_res = chain
193190
.bundler_client()
194191
.tw_get_transaction_hash(&job_data.bundler_transaction_id)
195192
.await
@@ -198,16 +195,19 @@ where
198195
})
199196
.map_err_fail()?;
200197

201-
let transaction_hash = match transaction_hash_str {
202-
Some(hash) => hash.parse::<TxHash>().map_err(|e| {
203-
Eip7702ConfirmationError::TransactionHashError {
204-
message: format!("Invalid transaction hash format: {}", e),
205-
}
206-
.fail()
207-
})?,
208-
None => {
198+
let transaction_hash = match transaction_hash_res {
199+
TwGetTransactionHashResponse::Success { transaction_hash } => {
200+
transaction_hash.parse::<TxHash>().map_err(|e| {
201+
Eip7702ConfirmationError::TransactionHashError {
202+
message: format!("Invalid transaction hash format: {}", e),
203+
}
204+
.fail()
205+
})?
206+
}
207+
208+
TwGetTransactionHashResponse::Pending => {
209209
return Err(Eip7702ConfirmationError::TransactionHashError {
210-
message: "Transaction not found".to_string(),
210+
message: "Transaction not yet confirmed".to_string(),
211211
})
212212
.map_err_nack(Some(Duration::from_secs(2)), RequeuePosition::Last);
213213
}
@@ -262,25 +262,11 @@ where
262262
"Transaction confirmed successfully"
263263
);
264264

265-
// todo: remove this after all jobs are processed
266-
let sender_details = job_data
267-
.sender_details
268-
.clone()
269-
.or_else(|| {
270-
job_data
271-
.eoa_address
272-
.map(|eoa_address| Eip7702Sender::Owner { eoa_address })
273-
})
274-
.ok_or_else(|| Eip7702ConfirmationError::InternalError {
275-
message: "No sender details found".to_string(),
276-
})
277-
.map_err_fail()?;
278-
279265
Ok(Eip7702ConfirmationResult {
280266
transaction_id: job_data.transaction_id.clone(),
281267
transaction_hash,
282268
receipt,
283-
sender_details,
269+
sender_details: job_data.sender_details.clone(),
284270
})
285271
}
286272

executors/src/eip7702_executor/send.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,7 @@ pub struct Eip7702SendJobData {
3838
pub transaction_id: String,
3939
pub chain_id: u64,
4040
pub transactions: Vec<InnerTransaction>,
41-
42-
// !IMPORTANT TODO
43-
// To preserve backwards compatibility with pre-existing queued jobs, we continue keeping the eoa_address field until the next release
44-
// However, we make it optional now, and rely on the Eip7702ExecutionOptions instead
45-
pub eoa_address: Option<Address>,
46-
47-
// We must also keep the execution_options as optional to prevent deserialization errors
48-
// when we remove the eoa_address field, we can make execution_options required
49-
// at runtime we resolve from both, with preference to execution_options
50-
// if both are none, we return an error
51-
#[serde(skip_serializing_if = "Option::is_none")]
52-
pub execution_options: Option<Eip7702ExecutionOptions>,
53-
41+
pub execution_options: Eip7702ExecutionOptions,
5442
pub signing_credential: SigningCredential,
5543
#[serde(default)]
5644
pub webhook_options: Vec<WebhookOptions>,
@@ -208,24 +196,17 @@ where
208196

209197
let chain = chain.with_new_default_headers(chain_auth_headers);
210198

211-
let owner_address = job_data
212-
.eoa_address
213-
.or(job_data.execution_options.as_ref().map(|e| match e {
214-
Eip7702ExecutionOptions::Owner(o) => o.from,
215-
Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address,
216-
}))
217-
.ok_or(Eip7702SendError::InternalError {
218-
message: "No owner address found".to_string(),
219-
})
220-
.map_err_fail()?;
199+
let owner_address = match &job_data.execution_options {
200+
Eip7702ExecutionOptions::Owner(o) => o.from,
201+
Eip7702ExecutionOptions::SessionKey(s) => s.session_key_address,
202+
};
221203

222204
let account = DelegatedAccount::new(owner_address, chain);
223205

224-
let session_key_target_address =
225-
job_data.execution_options.as_ref().and_then(|e| match e {
226-
Eip7702ExecutionOptions::Owner(_) => None,
227-
Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address),
228-
});
206+
let session_key_target_address = match &job_data.execution_options {
207+
Eip7702ExecutionOptions::Owner(_) => None,
208+
Eip7702ExecutionOptions::SessionKey(s) => Some(s.account_address),
209+
};
229210

230211
let transactions = match session_key_target_address {
231212
Some(target_address) => {
@@ -343,8 +324,7 @@ where
343324
transaction_id: job.job.data.transaction_id.clone(),
344325
chain_id: job.job.data.chain_id,
345326
bundler_transaction_id: success_data.result.transaction_id.clone(),
346-
eoa_address: None,
347-
sender_details: Some(success_data.result.sender_details.clone()),
327+
sender_details: success_data.result.sender_details.clone(),
348328
rpc_credentials: job.job.data.rpc_credentials.clone(),
349329
webhook_options: job.job.data.webhook_options.clone(),
350330
})

executors/src/eoa/store/atomic.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,30 +105,30 @@ impl AtomicEoaExecutorStore {
105105
{
106106
Ok(()) => {
107107
tracing::debug!(
108-
eoa = %self.eoa(),
109-
chain_id = %self.chain_id(),
110-
worker_id = %self.worker_id(),
108+
eoa = ?self.eoa(),
109+
chain_id = self.chain_id(),
110+
worker_id = self.worker_id(),
111111
"Successfully released EOA lock"
112112
);
113113
Ok(self.store)
114114
}
115115
Err(TransactionStoreError::LockLost { .. }) => {
116116
// Lock was already taken over, which is fine for release
117117
tracing::debug!(
118-
eoa = %self.eoa(),
119-
chain_id = %self.chain_id(),
120-
worker_id = %self.worker_id(),
118+
eoa = ?self.eoa(),
119+
chain_id = self.chain_id(),
120+
worker_id = self.worker_id(),
121121
"Lock already released or taken over by another worker"
122122
);
123123
Ok(self.store)
124124
}
125125
Err(e) => {
126126
// Other errors shouldn't fail the worker, just log
127127
tracing::warn!(
128-
eoa = %self.eoa(),
129-
chain_id = %self.chain_id(),
130-
worker_id = %self.worker_id(),
131-
error = %e,
128+
eoa = ?self.eoa(),
129+
chain_id = self.chain_id(),
130+
worker_id = self.worker_id(),
131+
error = ?e,
132132
"Failed to release EOA lock"
133133
);
134134
Ok(self.store)
@@ -197,7 +197,7 @@ impl AtomicEoaExecutorStore {
197197
tracing::debug!(
198198
retry_count = retry_count,
199199
delay_ms = delay_ms,
200-
eoa = %self.eoa(),
200+
eoa = ?self.eoa(),
201201
chain_id = self.chain_id(),
202202
"Retrying lock check operation"
203203
);
@@ -312,7 +312,7 @@ impl AtomicEoaExecutorStore {
312312
tracing::debug!(
313313
retry_count = retry_count,
314314
delay_ms = delay_ms,
315-
eoa = %self.eoa,
315+
eoa = ?self.eoa,
316316
chain_id = self.chain_id,
317317
operation = safe_tx.name(),
318318
"Retrying atomic operation"

executors/src/eoa/store/borrowed.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
8181
valid_results.push(result.clone());
8282
} else {
8383
tracing::warn!(
84-
transaction_id = %transaction_id,
85-
nonce = %result.transaction.nonce,
84+
transaction_id = ?transaction_id,
85+
nonce = result.transaction.nonce,
8686
"Submission result not found in borrowed state, ignoring"
8787
);
8888
}
@@ -174,6 +174,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
174174
// Update transaction data status
175175
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
176176
pipeline.hset(&tx_data_key, "status", "pending");
177+
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
177178

178179
// Queue webhook event using user_request from SubmissionResult
179180
let event = EoaExecutorEvent {
@@ -206,6 +207,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
206207
pipeline.hset(&tx_data_key, "status", "failed");
207208
pipeline.hset(&tx_data_key, "completed_at", now);
208209
pipeline.hset(&tx_data_key, "failure_reason", err.to_string());
210+
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
209211

210212
// Queue webhook event using user_request from SubmissionResult
211213
let event = EoaExecutorEvent {

executors/src/eoa/store/mod.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,9 @@ impl EoaExecutorStore {
404404
}
405405
// Lock exists, forcefully take it over
406406
tracing::warn!(
407-
eoa = %self.eoa,
408-
chain_id = %self.chain_id,
409-
worker_id = %worker_id,
407+
eoa = ?self.eoa,
408+
chain_id = self.chain_id,
409+
worker_id = worker_id,
410410
"Forcefully taking over EOA lock from stalled worker"
411411
);
412412
// Force set - no expiry, only released by explicit takeover
@@ -504,13 +504,32 @@ impl EoaExecutorStore {
504504
&self,
505505
limit: u64,
506506
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
507+
self.peek_pending_transactions_paginated(0, limit).await
508+
}
509+
510+
/// Peek at pending transactions with pagination support
511+
pub async fn peek_pending_transactions_paginated(
512+
&self,
513+
offset: u64,
514+
limit: u64,
515+
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
516+
if limit == 0 {
517+
return Ok(Vec::new());
518+
}
519+
507520
let pending_key = self.pending_transactions_zset_name();
508521
let mut conn = self.redis.clone();
509522

510-
// Use ZRANGE to peek without removing
511-
let transaction_ids: Vec<PendingTransactionStringWithQueuedAt> = conn
512-
.zrange_withscores(&pending_key, 0, (limit - 1) as isize)
513-
.await?;
523+
// Use ZRANGE to peek without removing, with offset support
524+
let start = offset as isize;
525+
let stop = (offset + limit - 1) as isize;
526+
527+
let transaction_ids: Vec<PendingTransactionStringWithQueuedAt> =
528+
conn.zrange_withscores(&pending_key, start, stop).await?;
529+
530+
if transaction_ids.is_empty() {
531+
return Ok(Vec::new());
532+
}
514533

515534
let mut pipe = twmq::redis::pipe();
516535

executors/src/eoa/store/submitted.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,8 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
475475
.zrange(self.keys.recycled_nonces_zset_name(), 0, -1)
476476
.await?;
477477

478+
// filter out nonces that are higher than the highest submitted nonce
479+
// these don't need to be recycled, they'll be used up by incrementing the nonce
478480
let recycled_nonces = recycled_nonces
479481
.into_iter()
480482
.filter(|nonce| *nonce < highest_submitted_nonce)

0 commit comments

Comments
 (0)