
Operations with Network-Based Sub-Operations Lack Atomicity Guarantees #1975

@netsirius


Freenet's high-level operations (PUT, GET, UPDATE, SUBSCRIBE) are not atomic when they spawn network-based sub-operations. A parent operation can complete and respond to the client while its constituent sub-operations are still running, have failed, or haven't started. This breaks fundamental ACID guarantees and creates inconsistent distributed state across the network.

This issue supersedes #1765, which narrowly addresses PUT+subscribe atomicity. The problem is architectural: there is no infrastructure to track sub-operation completion or propagate sub-operation failures to parent operations.


Problem Statement

Current Non-Atomic Behavior

Parent operations complete independently of their sub-operations:

// PUT with subscribe=true (put.rs:562-566)
if subscribe && is_seeding_contract {
    // TODO: Make put operation atomic by linking it to the completion of this subscription request.
    // Currently we can't link one transaction to another transaction's result, which would be needed
    // to make this fully atomic. This should be addressed in a future refactoring.
    super::start_subscription_request(op_manager, key).await;  // ← Fire-and-forget
}

// Mark operation as finished
new_state = Some(PutState::Finished { key });  // ← Parent finishes immediately

Result: the client receives a successful PutResponse { key }, but the subscription may have:

  • Not completed network propagation
  • Failed to establish connections to remote peers
  • Timed out during GET (if contract wasn't locally available)
  • Silently failed with only a warning log

Scope of the Problem

This is not limited to PUT+subscribe. Analysis reveals multiple locations where operations spawn untracked sub-operations:

| Parent Operation | Sub-Operation | Location | Trigger Condition |
|---|---|---|---|
| PUT | SUBSCRIBE | put.rs:355 | Caching peer receives PUT |
| PUT | SUBSCRIBE | put.rs:565 | Client PUT with subscribe: true |
| PUT | SUBSCRIBE | put.rs:672 | Contract seeding during PUT |
| GET | SUBSCRIBE | get.rs:953 | Proximity caching (contract not local) |

Evidence from Codebase

1. Explicit Acknowledgment of the Problem

The codebase contains a TODO comment explicitly recognizing this architectural gap:

// put.rs:562-564
// TODO: Make put operation atomic by linking it to the completion of this subscription request.
// Currently we can't link one transaction to another transaction's result, which would be needed
// to make this fully atomic. This should be addressed in a future refactoring.

2. Sub-Operations Are Fire-and-Forget

All sub-operations are spawned without tracking their completion:

// operations/mod.rs:310-318
async fn start_subscription_request(op_manager: &OpManager, key: ContractKey) {
    let sub_op = subscribe::start_op(key);  // ← New independent transaction
    if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await {
        tracing::warn!(%error, "Error subscribing to contract");  // ← Silent failure!
    }
}

Critical observation: errors are logged but not returned to the caller, so the parent operation has no way to detect the failure.
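Here is a minimal, synchronous sketch in plain Rust that makes the difference concrete. `subscribe`, `SubError`, `put_fire_and_forget` and `put_propagating` are hypothetical stand-ins, not freenet-core APIs. The first wrapper mirrors the current log-and-drop pattern. The second returns the child's error to the caller with `?`.

```rust
/// Hypothetical error type for a failed sub-operation.
#[derive(Debug, Clone, PartialEq)]
pub enum SubError {
    Unreachable,
}

/// Stand-in for a network sub-operation: fails when the peer is unreachable.
fn subscribe(reachable: bool) -> Result<(), SubError> {
    if reachable {
        Ok(())
    } else {
        Err(SubError::Unreachable)
    }
}

/// Current pattern: the error is logged and dropped, so the caller never sees it.
pub fn put_fire_and_forget(reachable: bool) -> Result<&'static str, SubError> {
    if let Err(error) = subscribe(reachable) {
        eprintln!("warn: error subscribing to contract: {error:?}");
    }
    Ok("PutResponse") // always reports success
}

/// Propagating pattern: the sub-operation's error becomes the parent's error.
pub fn put_propagating(reachable: bool) -> Result<&'static str, SubError> {
    subscribe(reachable)?;
    Ok("PutResponse")
}
```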

3. No Parent-Child Transaction Tracking

The Transaction type has no concept of hierarchy:

// message.rs:35-38
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy)]
pub struct Transaction {
    id: Ulid,  // ← No parent field
}

Transaction IDs are independent. A parent cannot query "are my children done?"

4. OpManager Has No Sub-Operation Registry

Operations are tracked flatly without relationships:

// node/op_state_manager.rs:54-63
#[derive(Default)]
struct Ops {
    connect: DashMap<Transaction, ConnectOp>,
    put: DashMap<Transaction, PutOp>,
    get: DashMap<Transaction, GetOp>,
    subscribe: DashMap<Transaction, SubscribeOp>,
    update: DashMap<Transaction, UpdateOp>,
    completed: DashSet<Transaction>,       // ← Flat completion tracking
    under_progress: DashSet<Transaction>,  // ← No hierarchical structure
}

No data structure tracks "Transaction A spawned Transactions B and C."

5. Operation Finalization Ignores Sub-Operations

When an operation finishes, it checks only its own state:

// put.rs:60-62
pub(super) fn finalized(&self) -> bool {
    self.state.is_none() || matches!(self.state, Some(PutState::Finished { .. }))
    // ↑ Only checks PUT state, not whether subscription completed
}

The finalization logic in operations/mod.rs:

// mod.rs:108-112
Ok(OperationResult {
    return_msg: None,
    state: Some(final_state),
}) if final_state.finalized() => {
    // operation finished_completely with result
    op_manager.completed(tx_id);  // ← Parent marked complete
    return Ok(Some(final_state));  // ← Client receives response
}

No check for sub-operation completion. Parent completes as soon as its own state is Finished.

6. Client Responses Sent Immediately

When an operation is marked completed, its result is sent to the client immediately:

// node/mod.rs:413-421
if let Some(transaction) = tx {
    let host_result = op_res.to_host_result();  // ← Convert to client response
    let router_tx_clone = op_manager.result_router_tx.clone();

    tokio::spawn(async move {
        // Send to client via result router
        if let Err(e) = router_tx_clone.send((transaction, host_result)).await {
            // ... send-failure handling elided in this excerpt ...
        }
    });
}

Client receives Ok(PutResponse { key }) before subscription completes.


Architectural Root Causes

1. No Parent-Child Transaction Relationship Model

Current state:

  • Transactions are globally unique ULIDs
  • Transaction type encoded in last byte (Connect=0, Put=1, Get=2, Subscribe=3, Update=4)
  • No field to reference parent transaction

What's missing:

  • Parent transaction reference
  • API to query "what are the children of transaction X?"
  • API to query "has transaction X completed all its children?"

Impact: Impossible to know if a parent operation has pending sub-operations.

2. No Sub-Operation Registry in OpManager

Current state:

  • OpManager maintains separate DashMap<Transaction, Op> for each operation type
  • completed: DashSet<Transaction> tracks completed operations
  • under_progress: DashSet<Transaction> tracks running operations

What's missing:

  • sub_operations: HashMap<Transaction, HashSet<Transaction>> to track parent→children
  • pending_finalization: HashMap<Transaction, OpEnum> for operations waiting on children
  • Logic to defer parent completion until all children complete

Impact: No infrastructure to enforce atomicity.

3. Finalization Logic Has No Atomicity Concept

Current state:

pub(super) fn finalized(&self) -> bool {
    self.state.is_none() || matches!(self.state, Some(PutState::Finished { .. }))
}

Each operation type implements finalized() checking only its own state.

What's missing:

  • finalized() should also check OpManager::all_sub_operations_completed(self.id)
  • Operations need a new state like AwaitingSubOperations { state, pending_children }

Impact: Parent operations finalize prematurely.
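The sketch below shows one way the proposed `AwaitingSubOperations { state, pending_children }` state could behave, modeled on `PutState`. Everything in it is hypothetical: `TxId` stands in for `Transaction`, the key is a `String`, and `on_child_done` is an assumed transition function. A child failure moves the parent straight to `Failed`. The last successful child unwraps the inner `Finished` state. As a result, `finalized()` returns true only for terminal states.

```rust
use std::collections::HashSet;

/// Stand-in for `Transaction` (hypothetical).
pub type TxId = u64;

/// Hypothetical PUT state extended with the proposed waiting variant.
#[derive(Debug, Clone, PartialEq)]
pub enum PutState {
    Finished { key: String },
    /// Own work done; waiting for the listed children.
    AwaitingSubOperations {
        state: Box<PutState>,
        pending_children: HashSet<TxId>,
    },
    Failed { cause: String },
}

impl PutState {
    /// Only terminal states count as finalized; waiting does not.
    pub fn finalized(&self) -> bool {
        matches!(self, PutState::Finished { .. } | PutState::Failed { .. })
    }

    /// Apply one child's outcome to a waiting parent.
    pub fn on_child_done(self, child: TxId, outcome: Result<(), String>) -> PutState {
        match self {
            PutState::AwaitingSubOperations { state, mut pending_children } => match outcome {
                // Any child failure fails the parent immediately.
                Err(e) => PutState::Failed {
                    cause: format!("child {child} failed: {e}"),
                },
                Ok(()) => {
                    pending_children.remove(&child);
                    if pending_children.is_empty() {
                        *state // last child done: expose the inner Finished state
                    } else {
                        PutState::AwaitingSubOperations { state, pending_children }
                    }
                }
            },
            // Not waiting on children: ignore stray notifications.
            other => other,
        }
    }
}
```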

4. Sub-Operation Spawning Has No Completion Callback

Current state:

async fn start_subscription_request(op_manager: &OpManager, key: ContractKey) {
    let sub_op = subscribe::start_op(key);  // ← New independent transaction
    if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await {
        tracing::warn!(%error, "Error subscribing to contract");  // ← Logged and dropped
    }
}

Errors are swallowed. No return value. No notification to parent.

What's missing:

  • Return Result<Transaction, OpError> with child transaction ID
  • Register child with parent: op_manager.register_sub_operation(parent_tx, child_tx)
  • Propagate errors: Err(error) => parent_operation.fail_with_error(error)

Impact: Silent failures. Parents never know children failed.
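A completion callback can be as simple as a channel on which each child reports its outcome. The sketch below uses `std::sync::mpsc` and OS threads as stand-ins for Freenet's async tasks and network messages. `run_parent`, `ChildReport` and `TxId` are hypothetical names. The parent waits for one report per child and fails on the first error instead of logging and dropping it.

```rust
use std::sync::mpsc;
use std::thread;

/// Stand-in for `Transaction` (hypothetical).
pub type TxId = u64;

/// Outcome a child reports back to its parent: the "completion callback".
pub type ChildReport = (TxId, Result<(), String>);

/// Run each child (id, whether it succeeds) and wait for all of them,
/// failing fast on the first reported error.
pub fn run_parent(children: Vec<(TxId, bool)>) -> Result<(), String> {
    let (tx, rx) = mpsc::channel::<ChildReport>();
    let expected = children.len();

    for (id, succeeds) in children {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for the network sub-operation.
            let outcome = if succeeds {
                Ok(())
            } else {
                Err("peer unreachable".to_string())
            };
            // Ignore send errors: the parent may already have failed fast.
            let _ = tx.send((id, outcome));
        });
    }
    // Drop the parent's sender so `recv` errors out if every child
    // hangs up without reporting, instead of blocking forever.
    drop(tx);

    for _ in 0..expected {
        match rx.recv() {
            Ok((_, Ok(()))) => {}
            Ok((id, Err(e))) => return Err(format!("sub-operation {id} failed: {e}")),
            Err(_) => return Err("a sub-operation exited without reporting".to_string()),
        }
    }
    Ok(())
}
```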

5. No Error Propagation Mechanism

Current state:

  • Sub-operations execute in separate async contexts
  • Errors are logged but not returned
  • Parent has no channel to receive child failure notifications

What's missing:

  • When child fails: op_manager.sub_operation_failed(parent_tx, child_tx, error)
  • Parent state transition: Finished { .. } → Failed { cause: ChildOperationFailed }
  • Client notification: Change Ok(PutResponse) to Err(OperationError)

Impact: Clients think operations succeeded when they actually failed.


Distributed Systems Impact

Violated Guarantees

| Guarantee | Current State | Expected State |
|---|---|---|
| Atomicity | ❌ Parent completes while children run | ✅ All-or-nothing completion |
| Consistency | ❌ Contract stored, but subscription not registered | ✅ Consistent state across network |
| Durability | ⚠️ Contract persisted, but updates won't propagate | ✅ Contract + subscription both durable |
| Reliability | ❌ Silent sub-operation failures | ✅ Failures reported to client |

Concrete Failure Scenarios

Scenario 1: PUT with subscribe=true Times Out

1. Client sends: PUT(contract, state, subscribe=true)
2. Peer stores contract locally
3. Peer spawns SUBSCRIBE sub-operation
4. SUBSCRIBE needs to GET contract from remote peer (not locally available yet)
5. GET times out after 10 seconds
6. SUBSCRIBE fails with warning log
7. PUT completes: PutState::Finished { key }
8. Client receives: Ok(PutResponse { key })

Result: Client thinks operation succeeded
Reality: Subscription failed - peer won't receive updates!

Scenario 2: GET Proximity Caching Fails Silently

1. Peer A requests: GET(contract_key)
2. Peer B responds with contract
3. Peer A caches contract locally
4. Peer A spawns SUBSCRIBE to join subscription tree
5. SUBSCRIBE fails (network partition, peer offline, etc.)
6. Error logged: "Error subscribing to contract"
7. GET completes: GetState::Finished
8. Client receives: Ok(GetResponse { contract, state })

Result: Client has contract, unaware of subscription failure
Reality: Peer A won't receive updates - stale data!

Scenario 3: Multi-Hop PUT with Partial Subscription

1. Client PUTs contract, subscribe=true
2. PUT routes through Peer A, Peer B, arrives at Peer C (optimal location)
3. Peer C stores contract
4. Peer C spawns SUBSCRIBE
5. Peer B's subscription succeeds
6. Peer A's subscription fails (network issue)
7. PUT still returns success to client

Result: Broken subscription tree
Reality: Updates broadcast from Peer C won't reach Peer A

Why This Supersedes Issue #1765

| Aspect | #1765 Scope | This Issue Scope |
|---|---|---|
| Problem identification | PUT + subscribe blocks response | Operations lack atomicity for ALL sub-operations |
| Root cause | Subscription blocks PUT | No infrastructure to track sub-operation completion |
| Operations affected | PUT only | PUT, GET, UPDATE (any operation with sub-ops) |
| Sub-operations affected | SUBSCRIBE only | SUBSCRIBE, future sub-ops (e.g., replication) |
| Solution approach | Make PUT+subscribe non-blocking | Build general sub-operation tracking infrastructure |
| Reusability | One-off fix | Reusable for all future operations |
| Architectural improvement | None | Enables composite atomic operations |
| Error handling | Unclear | Explicit propagation from children to parents |

Conclusion: #1765 is a symptom. This issue addresses the disease.


Proposed Solution Architecture

Design Principles

  1. Hierarchical Transactions: Parent transactions track their children
  2. Deferred Completion: Parents finalize only after all children finalize
  3. Explicit Error Propagation: Child failures propagate to parent and client
  4. Backward Compatible: Operations without sub-operations unchanged
  5. Transparent Integration: Minimal changes to operation implementations

Implementation Components

Component 1: Extend Transaction Model

Add parent tracking to Transaction:

// message.rs
#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy)]
pub struct Transaction {
    id: Ulid,
    parent: Option<Ulid>,  // ← NEW: Parent transaction ID (None for top-level operations)
}

impl Transaction {
    pub fn new<T: TxType>() -> Self {
        // Existing implementation, extended to set `parent: None`
    }

    pub fn new_child_of(parent: &Transaction, tx_type: TransactionType) -> Self {
        let id = Ulid::new();
        let updated_id = Self::update(tx_type as u8, id);
        Self {
            id: updated_id,
            parent: Some(parent.id),  // ← Link to parent
        }
    }

    pub fn parent_id(&self) -> Option<&Ulid> {
        self.parent.as_ref()
    }
}

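Rationale: Enables tracing operation hierarchies, debugging, and completion tracking. Because Transaction derives Hash, Eq, and Serialize, the new field also becomes part of every map key and of the serialized message format. Two transactions with the same id but different parents would therefore compare unequal.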
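The extended model can be exercised in isolation with the self-contained sketch below. It makes three assumptions that differ from the real code:

- A plain `u128` replaces `Ulid`.
- The raw id is passed in as an argument rather than generated, so results are deterministic.
- "Last byte" means the least significant byte.

The real `Transaction::update` may encode the type differently.

```rust
/// Stand-in for the ULID-based id (hypothetical: the real type is `Ulid`).
pub type RawId = u128;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum TransactionType {
    Connect = 0,
    Put = 1,
    Get = 2,
    Subscribe = 3,
    Update = 4,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Transaction {
    id: RawId,
    parent: Option<RawId>, // None for top-level operations
}

impl Transaction {
    /// Overwrite the least significant byte with the type tag (assumed encoding).
    fn tag(raw: RawId, ty: TransactionType) -> RawId {
        (raw & !0xFF) | ty as RawId
    }

    pub fn new(raw: RawId, ty: TransactionType) -> Self {
        Self { id: Self::tag(raw, ty), parent: None }
    }

    pub fn new_child_of(parent: &Transaction, raw: RawId, ty: TransactionType) -> Self {
        Self { id: Self::tag(raw, ty), parent: Some(parent.id) }
    }

    pub fn parent_id(&self) -> Option<&RawId> {
        self.parent.as_ref()
    }

    /// Read the type tag back out of the id.
    pub fn transaction_type(&self) -> u8 {
        (self.id & 0xFF) as u8
    }
}
```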

Component 2: Add Sub-Operation Registry to OpManager

Track parent-child relationships:

// node/op_state_manager.rs
use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::Arc;

pub(crate) struct OpManager {
    // ... existing fields ...

    /// Maps parent transaction to set of child transactions
    sub_operations: Arc<DashMap<Transaction, HashSet<Transaction>>>,

    /// Operations that reached Finished state but have pending children
    pending_finalization: Arc<DashMap<Transaction, OpEnum>>,
}

impl OpManager {
    /// Register a sub-operation relationship
    pub fn register_sub_operation(&self, parent: Transaction, child: Transaction) {
        self.sub_operations
            .entry(parent)
            .or_insert_with(HashSet::new)
            .insert(child);

        tracing::debug!(
            parent_tx = %parent,
            child_tx = %child,
            "Registered sub-operation"
        );
    }

    /// Check if all sub-operations of a parent have completed
    pub fn all_sub_operations_completed(&self, parent: Transaction) -> bool {
        match self.sub_operations.get(&parent) {
            None => true,  // No sub-operations
            Some(children) => {
                // Check if all children are in completed set
                children.iter().all(|child| self.ops.completed.contains(child))
            }
        }
    }

    /// Mark a sub-operation as completed, check if parent can now finalize
    pub async fn sub_operation_completed(
        &self,
        child: Transaction,
    ) -> Result<Option<Transaction>, OpError> {
        // Mark child as completed (existing logic)
        self.completed(child);

        // Find parent transaction (linear scan; see the reverse-index optimization below).
        // Copy the key out so the DashMap guard is released before calling back into
        // the map: DashMap may deadlock if it is re-locked while a reference is held.
        let parent_tx = self
            .sub_operations
            .iter()
            .find(|entry| entry.value().contains(&child))
            .map(|entry| *entry.key());

        match parent_tx {
            // Check if ALL siblings are now complete
            Some(parent_tx) if self.all_sub_operations_completed(parent_tx) => {
                tracing::info!(
                    parent_tx = %parent_tx,
                    child_tx = %child,
                    "All sub-operations completed, parent can finalize"
                );
                Ok(Some(parent_tx))
            }
            _ => Ok(None),
        }
    }

    /// Handle sub-operation failure - propagate to parent
    pub async fn sub_operation_failed(
        &self,
        child: Transaction,
        error: OpError,
    ) -> Result<(), OpError> {
        tracing::error!(
            child_tx = %child,
            error = %error,
            "Sub-operation failed, propagating to parent"
        );

        // Find parent; no DashMap guard may be held across the `.await` below
        let Some(parent_tx) = self
            .sub_operations
            .iter()
            .find(|entry| entry.value().contains(&child))
            .map(|entry| *entry.key())
        else {
            return Ok(()); // not a registered sub-operation
        };

        // Discard any parked parent state so it is never reported as a success,
        // then mark the parent as done. `completed()` does not record failure;
        // failure semantics for nested parents remain an open question.
        self.pending_finalization.remove(&parent_tx);
        self.completed(parent_tx);

        // Send error to client
        let error_result = Err(freenet_stdlib::client_api::ErrorKind::OperationError {
            cause: format!("Sub-operation {} failed: {}", child, error).into(),
        }.into());

        self.result_router_tx
            .send((parent_tx, error_result))
            .await
            .map_err(|_| OpError::NotificationError)?;

        Ok(())
    }
}

Optimization: Maintain reverse index for O(1) parent lookup:

/// Reverse index: child -> parent (for faster lookups)
parent_of: Arc<DashMap<Transaction, Transaction>>,
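Below is a sketch of the registry with the reverse index, under simplifying assumptions. `Tx` is a `u64` stand-in for `Transaction`. A single `std::sync::Mutex` guards the forward map, the `parent_of` index and the completed set, in place of separate `DashMap`s. `SubOpRegistry`, `register` and `complete` are hypothetical names. Completing a child looks up its parent in O(1) and checks the siblings. Once the last child is done, it cleans up the parent's bookkeeping.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

/// Stand-in for `Transaction` (hypothetical).
pub type Tx = u64;

#[derive(Default)]
struct Inner {
    children: HashMap<Tx, HashSet<Tx>>, // parent -> children
    parent_of: HashMap<Tx, Tx>,         // child -> parent (reverse index)
    completed: HashSet<Tx>,
}

/// Hypothetical registry: one lock keeps all three structures consistent.
#[derive(Default)]
pub struct SubOpRegistry {
    inner: Mutex<Inner>,
}

impl SubOpRegistry {
    /// Record that `parent` spawned `child` (call before the child starts).
    pub fn register(&self, parent: Tx, child: Tx) {
        let mut g = self.inner.lock().unwrap();
        g.children.entry(parent).or_default().insert(child);
        g.parent_of.insert(child, parent);
    }

    /// Mark `child` done. Returns its parent if this was the last pending child.
    pub fn complete(&self, child: Tx) -> Option<Tx> {
        let mut g = self.inner.lock().unwrap();
        g.completed.insert(child);

        // O(1) parent lookup through the reverse index.
        let parent = *g.parent_of.get(&child)?;
        let all_done = g.children[&parent].iter().all(|c| g.completed.contains(c));
        if !all_done {
            return None;
        }

        // Last child finished: drop the parent's bookkeeping.
        let kids = g.children.remove(&parent).unwrap_or_default();
        for c in &kids {
            g.parent_of.remove(c);
            g.completed.remove(c);
        }
        Some(parent)
    }
}
```

A single lock serializes updates. That costs some concurrency, but it makes "record completion, then check siblings" atomic and avoids holding one map's guard while locking another.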

Component 3: Modify Operation Finalization Logic

Update operation completion handler:

// operations/mod.rs
async fn handle_op_result<CB>(
    op_manager: &OpManager,
    network_bridge: &mut CB,
    result: Result<OperationResult, OpError>,
    tx_id: Transaction,
    sender: Option<PeerId>,
) -> Result<Option<OpEnum>, OpError>
where
    CB: NetworkBridge,
{
    match result {
        // ... error handling unchanged ...

        Ok(OperationResult {
            return_msg: None,
            state: Some(final_state),
        }) if final_state.finalized() => {
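            // ↓ NEW: Check sub-operations before finalizing
            if op_manager.all_sub_operations_completed(tx_id) {
                // All sub-operations done, finalize immediately
                tracing::debug!(%tx_id, "operation completed with all sub-operations");
                op_manager.completed(tx_id);
                return Ok(Some(final_state));
            }

            // Parent reached finished state but children still pending
            tracing::debug!(
                %tx_id,
                "operation finished but awaiting sub-operation completion"
            );

            // Park the state in pending_finalization (moved, so OpEnum needs no Clone)
            op_manager.pending_finalization.insert(tx_id, final_state);

            // Re-check after parking: the last child may have completed between the
            // check above and the insert, in which case `completed()` found nothing to
            // finalize. `remove` is atomic, so only one path takes the parked state.
            if op_manager.all_sub_operations_completed(tx_id) {
                if let Some((_, final_state)) = op_manager.pending_finalization.remove(&tx_id) {
                    op_manager.completed(tx_id);
                    return Ok(Some(final_state));
                }
            }

            // Don't send response to client yet
            return Ok(None);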
        }

        // ... rest unchanged ...
    }
    Ok(None)
}

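When a child completes, check whether its parent can now finalize:

// node/op_state_manager.rs, in the operation completion path
pub fn completed(&self, id: Transaction) {
    self.ring.live_tx_tracker.remove_finished_transaction(id);
    self.ops.completed.insert(id);

    // ↓ NEW: Check if this was a sub-operation. Copy the parent out of the
    // `parent_of` guard so no DashMap reference is held while other maps are touched.
    let Some(parent_tx) = self.parent_of.get(&id).map(|entry| *entry.value()) else {
        return; // top-level operation
    };

    if self.all_sub_operations_completed(parent_tx) {
        // All siblings completed; finalize the parent if it is parked.
        // `remove` is atomic, so only one caller can take the parked state.
        if let Some((_key, parent_op)) = self.pending_finalization.remove(&parent_tx) {
            tracing::info!(
                parent_tx = %parent_tx,
                "All sub-operations completed, finalizing parent"
            );

            // Mark the parent itself completed so a grandparent is checked too
            // (removing finished entries from sub_operations/parent_of is omitted here)
            self.completed(parent_tx);

            // Send result to client (try_send drops it if the channel is full or closed)
            let host_result = parent_op.to_host_result();
            let _ = self.result_router_tx.try_send((parent_tx, host_result));
        }
    }
}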
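Two sequences can race with each other:

- the check-then-park sequence in `handle_op_result`;
- the check-then-unpark sequence in `completed()`.

The sketch below shows one way to rule that out. A single lock covers both the pending-children counters and the parked results, so a parent either finalizes immediately or is parked before any child can look for it. `Finalizer`, `child_started`, `parent_finished` and `child_finished` are hypothetical names. `Tx` is a `u64` stand-in for `Transaction`, and `R` stands in for the parked `OpEnum`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Stand-in for `Transaction` (hypothetical).
pub type Tx = u64;

struct State<R> {
    pending_children: HashMap<Tx, usize>, // parent -> children still running
    parked: HashMap<Tx, R>,               // parent -> finished result awaiting children
}

/// Hypothetical deferred-finalization helper; one lock guards both maps.
pub struct Finalizer<R> {
    inner: Mutex<State<R>>,
}

impl<R> Finalizer<R> {
    pub fn new() -> Self {
        Finalizer {
            inner: Mutex::new(State { pending_children: HashMap::new(), parked: HashMap::new() }),
        }
    }

    /// A child was spawned for `parent` (call before the child can finish).
    pub fn child_started(&self, parent: Tx) {
        *self.inner.lock().unwrap().pending_children.entry(parent).or_insert(0) += 1;
    }

    /// The parent finished its own work. Returns the result to send now,
    /// or None if it was parked until its children finish.
    pub fn parent_finished(&self, parent: Tx, result: R) -> Option<R> {
        let mut s = self.inner.lock().unwrap();
        if s.pending_children.get(&parent).copied().unwrap_or(0) == 0 {
            s.pending_children.remove(&parent);
            Some(result)
        } else {
            s.parked.insert(parent, result);
            None
        }
    }

    /// A child of `parent` finished. Returns the parked result if this was the
    /// last child and the parent has already finished.
    pub fn child_finished(&self, parent: Tx) -> Option<R> {
        let mut s = self.inner.lock().unwrap();
        let count = s.pending_children.get_mut(&parent)?;
        *count -= 1;
        if *count > 0 {
            return None;
        }
        s.pending_children.remove(&parent);
        s.parked.remove(&parent)
    }
}
```

With `DashMap`s instead of one lock, the same guarantee needs the re-check after parking plus an atomic `remove`, so that exactly one path takes the parked state.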

Component 4: Update Sub-Operation Spawning

Modify start_subscription_request to register and track:

// operations/mod.rs
async fn start_subscription_request(
    op_manager: &OpManager,
    parent_tx: Transaction,  // ← NEW: Parent transaction
    key: ContractKey,
) -> Result<Transaction, OpError> {  // ← NEW: Return child transaction
    // Create child transaction
    let child_tx = Transaction::new_child_of(&parent_tx, TransactionType::Subscribe);

    // Register with parent
    op_manager.register_sub_operation(parent_tx, child_tx);

    // Create subscription operation with child transaction
    // (`start_op_with_id` is a NEW constructor that accepts an explicit transaction ID)
    let sub_op = subscribe::start_op_with_id(key, child_tx);

    // Start the subscription. Completion is tracked separately: the parent is
    // finalized only once `op_manager.completed(child_tx)` runs.
    match subscribe::request_subscribe(op_manager, sub_op).await {
        Ok(_) => {
            tracing::debug!(
                parent_tx = %parent_tx,
                child_tx = %child_tx,
                "Subscription sub-operation started"
            );
            Ok(child_tx)
        }
        Err(error) => {
            tracing::error!(
                parent_tx = %parent_tx,
                child_tx = %child_tx,
                error = %error,
                "Subscription sub-operation failed"
            );

            // ↓ NEW: Propagate failure to parent (assumes OpError: Clone)
            op_manager.sub_operation_failed(child_tx, error.clone()).await?;
            Err(error)
        }
    }
}

Update call sites to pass parent transaction:

// put.rs:565
if subscribe && is_seeding_contract {
    // ↓ Changed: Pass parent transaction ID
    super::start_subscription_request(op_manager, id, key).await?;
    //                                              ↑ parent tx
}

Migration Path

Phase 1: Infrastructure (No Behavior Change)

Goal: Add tracking infrastructure without enforcing atomicity yet.

Changes:

  1. Add parent: Option<Ulid> to Transaction
  2. Add sub_operations and pending_finalization to OpManager
  3. Add helper methods: register_sub_operation(), all_sub_operations_completed()
  4. Wire up registration in start_subscription_request() but don't propagate errors yet

Validation:

  • All existing tests pass
  • New metrics show sub-operation relationships being tracked
  • No change to client-visible behavior

Phase 2: Enforce Atomicity for PUT+Subscribe

Goal: Make PUT with subscribe=true fully atomic.

Changes:

  1. Modify handle_op_result() to defer parent completion
  2. Update completed() to trigger parent finalization
  3. Make start_subscription_request() return errors
  4. Update PUT call sites to propagate errors

Validation:

  • New test: test_put_subscribe_atomicity() - verify PUT fails if subscribe fails
  • New test: test_put_subscribe_success() - verify both succeed together
  • River chat room creation succeeds reliably

Phase 3: Extend to GET Proximity Caching

Goal: Make GET proximity caching atomic.

Changes:

  1. Update GET call site in get.rs:953
  2. Ensure GET doesn't complete until caching subscription establishes

Validation:

  • Test: test_get_proximity_caching_atomicity()
  • Verify cached peers receive updates

Phase 4: Generalize for All Operations

Goal: Enable any operation to spawn sub-operations atomically.

Changes:

  1. Document pattern for spawning sub-operations
  2. Add helper: op_manager.spawn_sub_operation(parent, child_op)
  3. Audit all operations for potential sub-operations

Validation:

  • Integration test suite exercises all operation types
  • No regression in existing functionality

Testing Strategy

Unit Tests

#[tokio::test]
async fn test_transaction_parent_child_relationship() {
    let parent_tx = Transaction::new::<PutMsg>();
    let child_tx = Transaction::new_child_of(&parent_tx, TransactionType::Subscribe);

    assert_eq!(child_tx.parent_id(), Some(&parent_tx.id));
    assert_eq!(child_tx.transaction_type(), TransactionType::Subscribe);
}

#[tokio::test]
async fn test_sub_operation_registration() {
    let op_manager = create_test_op_manager().await;
    let parent_tx = Transaction::new::<PutMsg>();
    let child_tx = Transaction::new_child_of(&parent_tx, TransactionType::Subscribe);

    op_manager.register_sub_operation(parent_tx, child_tx);

    // Parent should not be complete yet
    assert!(!op_manager.all_sub_operations_completed(parent_tx));

    // Mark child complete
    op_manager.completed(child_tx);

    // Now parent should be ready to complete
    assert!(op_manager.all_sub_operations_completed(parent_tx));
}

#[tokio::test]
async fn test_put_subscribe_atomicity_success() {
    // Setup multi-node network
    let mut network = TestNetwork::new(3).await;

    // PUT with subscribe=true
    let result = network.nodes[0]
        .put_contract(contract, state, /* subscribe */ true)
        .await;

    // Should succeed
    assert!(result.is_ok());

    // Verify subscription was established
    let subscriptions = network.nodes[0].get_subscriptions(&contract_key);
    assert!(subscriptions.contains(&contract_key));

    // Verify UPDATE can propagate
    let update_result = network.nodes[0]
        .update_contract(contract_key, new_state)
        .await;
    assert!(update_result.is_ok());
}

#[tokio::test]
async fn test_put_subscribe_atomicity_failure() {
    // Setup network where subscription will fail
    let mut network = TestNetwork::new_with_partition(3).await;
    network.partition_node(2);  // Isolate node 2

    // PUT with subscribe=true - should fail atomically
    let result = network.nodes[2]
        .put_contract(contract, state, /* subscribe */ true)
        .await;

    // Should fail because subscription can't establish
    assert!(result.is_err());

    // Contract should NOT be stored (assumes rollback semantics; see Open Question 4)
    let get_result = network.nodes[2].get_contract(&contract_key).await;
    assert!(get_result.is_err() || get_result.unwrap().is_none());
}

#[tokio::test]
async fn test_sub_operation_error_propagation() {
    let op_manager = create_test_op_manager().await;
    let parent_tx = Transaction::new::<PutMsg>();
    let child_tx = Transaction::new_child_of(&parent_tx, TransactionType::Subscribe);

    op_manager.register_sub_operation(parent_tx, child_tx);

    // Simulate child failure
    let error = OpError::RingError(RingError::EmptyRing);
    op_manager.sub_operation_failed(child_tx, error).await.unwrap();

    // Parent should be marked completed (with error)
    assert!(op_manager.ops.completed.contains(&parent_tx));

    // Error should be in result router
    let (tx, result) = timeout(Duration::from_millis(100),
                                op_manager.result_router_rx.recv())
        .await
        .unwrap()
        .unwrap();

    assert_eq!(tx, parent_tx);
    assert!(result.is_err());
}

#[tokio::test]
async fn test_nested_sub_operations() {
    // Parent -> Child1 -> Grandchild1
    //        -> Child2
    let op_manager = create_test_op_manager().await;
    let parent = Transaction::new::<PutMsg>();
    let child1 = Transaction::new_child_of(&parent, TransactionType::Subscribe);
    let child2 = Transaction::new_child_of(&parent, TransactionType::Get);
    let grandchild = Transaction::new_child_of(&child1, TransactionType::Get);

    op_manager.register_sub_operation(parent, child1);
    op_manager.register_sub_operation(parent, child2);
    op_manager.register_sub_operation(child1, grandchild);

    // Complete grandchild
    op_manager.completed(grandchild);
    assert!(op_manager.all_sub_operations_completed(child1));

    // Complete child1
    op_manager.completed(child1);
    assert!(!op_manager.all_sub_operations_completed(parent));  // child2 still pending

    // Complete child2
    op_manager.completed(child2);
    assert!(op_manager.all_sub_operations_completed(parent));  // Now parent can complete
}

Integration Tests

#[tokio::test(flavor = "multi_thread")]
async fn test_river_chat_room_creation_atomic() {
    // Test the actual River use case from #1765
    let gateway = TestGateway::start().await;

    // Create room with PUT + subscribe
    let room_result = gateway.client()
        .create_room("test-room", /* subscribe */ true)
        .await;

    assert!(room_result.is_ok(), "Room creation should succeed atomically");

    // Send message immediately (requires subscription to be working)
    let msg_result = gateway.client()
        .send_message("test-room", "Hello")
        .await;

    assert!(msg_result.is_ok(), "Message send should work immediately after room creation");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_multi_hop_put_subscribe_atomicity() {
    // PUT routes through multiple peers
    let mut network = TestNetwork::new(5).await;

    // Client connected to node 0, optimal location at node 4
    let contract_key = ContractKey::from_params_and_code(...);
    let optimal_peer = network.nodes[4].peer_id();

    // PUT with subscribe from node 0
    let result = network.nodes[0]
        .put_contract(contract, state, /* subscribe */ true)
        .await;

    assert!(result.is_ok());

    // Verify subscription established at all hops
    for i in 0..=4 {
        let subs = network.nodes[i].get_subscriptions(&contract_key);
        assert!(subs.contains(&contract_key),
                "Node {} should have subscription", i);
    }

    // Update from optimal location should reach all peers
    let update_sent = network.nodes[4]
        .broadcast_update(&contract_key, new_state)
        .await;

    assert_eq!(update_sent.len(), 4, "Should broadcast to 4 other peers");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_get_proximity_caching_with_subscription() {
    let mut network = TestNetwork::new(3).await;

    // Node 0 has contract, node 1 gets it
    let contract_key = network.nodes[0].store_contract(contract, state).await;

    // Node 1 performs GET (should trigger proximity caching)
    let result = network.nodes[1].get_contract(&contract_key).await;
    assert!(result.is_ok());

    // Verify node 1 subscribed (for caching)
    let node1_subs = network.nodes[1].get_subscriptions(&contract_key);
    assert!(node1_subs.contains(&contract_key));

    // Update at node 0 should propagate to node 1
    network.nodes[0].update_contract(&contract_key, new_state).await;

    // Wait for propagation
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Node 1 should have updated state
    let cached_state = network.nodes[1].get_cached_state(&contract_key).await;
    assert_eq!(cached_state, new_state);
}

Alternative Approaches Considered

Alternative 1: Make Subscribe Synchronous

Idea: Wait for subscription to complete before returning from PUT.

Problems:

  • Lifetime constraints prevent spawning detached tasks
  • Would block PUT for duration of subscription (multi-hop network traversal)
  • Doesn't solve error propagation
  • Still specific to PUT+subscribe, not general

Verdict: ❌ Rejected - this is the blocking behavior described in #1765, and it causes timeouts

Alternative 2: Two-Phase Operations

Idea: Return preliminary success, send final confirmation when sub-operations complete.

enum OperationStatus {
    Pending,      // Primary operation complete, sub-operations running
    Confirmed,    // All sub-operations complete
    Failed,       // Sub-operation failed
}

Problems:

  • Breaking change to client API
  • Clients must poll or listen for confirmation
  • Doesn't align with existing request-response pattern
  • Complex state management on client side

Verdict: ⚠️ Possible but complex - Defer unless atomicity proves insufficient

Alternative 3: Best-Effort Sub-Operations

Idea: Accept that sub-operations may fail silently, improve observability instead.

Changes:

  • Add metrics for sub-operation success/failure rates
  • Improve logging and tracing
  • Document that operations are eventually consistent

Problems:

  • Doesn't solve the fundamental atomicity issue
  • Still breaks ACID guarantees
  • Poor developer experience (silent failures)
  • Difficult to debug distributed state inconsistencies

Verdict: ❌ Rejected - Doesn't address the core problem

Alternative 4: Explicit Sub-Operation Tracking (Chosen Approach)

Idea: Build infrastructure for parent operations to track and await sub-operations.

Benefits:

  • ✅ General-purpose solution for all operations
  • ✅ Backward compatible
  • ✅ Explicit error propagation
  • ✅ Enables composite atomic operations
  • ✅ Improves observability (transaction hierarchies visible)

Verdict: ✅ Selected - Addresses root cause with reusable infrastructure


Performance Considerations

Overhead Analysis

Memory overhead:

  • HashMap<Transaction, HashSet<Transaction>> for sub-operation tracking
  • Estimated: ~100 bytes per parent operation with sub-operations
  • For 1000 concurrent operations: ~100KB (negligible)

Computation overhead:

  • all_sub_operations_completed(): O(n) where n = number of children
  • Typical n: 1-5 children per parent
  • Called once per child completion: acceptable

Network overhead:

  • No additional network messages
  • Same operations, just tracked differently

Latency impact:

  • Parent operations delayed until children complete
  • This is intentional - enforcing atomicity
  • Clients experience accurate operation completion time

Optimizations

Reverse index for parent lookup:

parent_of: Arc<DashMap<Transaction, Transaction>>

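Reduces the parent lookup in sub_operation_completed() and sub_operation_failed() from O(number of parents) to O(1) expected time; the sibling check remains O(n) in the number of children.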

Batch finalization:
When multiple children complete simultaneously, finalize parent in single operation.

Timeout propagation:
If child times out, immediately fail parent (don't wait for other children).
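Timeout propagation could look like the sketch below. It illustrates one option from Open Question 2, a single time budget for the whole composite operation, rather than a decision. Children are assumed to report on a `std::sync::mpsc` channel, and `await_children` is a hypothetical helper. Each wait uses only the time remaining until the shared deadline. A timeout or child error returns immediately, without waiting for the other children.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Wait for `expected` child reports within one overall `budget`.
pub fn await_children(
    rx: &Receiver<Result<(), String>>,
    expected: usize,
    budget: Duration,
) -> Result<(), String> {
    // One deadline for the whole composite operation.
    let deadline = Instant::now() + budget;
    for _ in 0..expected {
        // Wait only for the time left in the shared budget.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(Ok(())) => {}
            // A failed child fails the parent immediately.
            Ok(Err(e)) => return Err(e),
            Err(RecvTimeoutError::Timeout) => {
                return Err("sub-operation timed out".to_string())
            }
            Err(RecvTimeoutError::Disconnected) => {
                return Err("sub-operation exited without reporting".to_string())
            }
        }
    }
    Ok(())
}
```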


Success Criteria

Functional Requirements

  • PUT with subscribe: true completes atomically (both succeed or both fail)
  • GET proximity caching completes atomically
  • Sub-operation failures propagate to parent and client
  • Transaction hierarchies are traceable for debugging
  • No regression in existing operation functionality

Non-Functional Requirements

  • All existing tests pass
  • New integration tests cover atomic operation scenarios
  • Documentation updated

Validation Tests

  • River chat room creation succeeds reliably (real-world validation)
  • Multi-node integration tests pass
  • Chaos testing: network partitions, peer failures handled correctly
  • Load testing: 1000+ concurrent operations with sub-operations


Open Questions for Discussion

  1. Partial failure semantics: If parent has 3 children and 1 fails, should:

    • All children be cancelled/rolled back? (strong atomicity)
    • Or just parent fails, successful children remain? (weak atomicity)
  2. Timeout behavior: Should the parent's timeout be the sum of all child timeouts?

    • Or should the parent keep a single timeout that covers the entire composite operation?
  3. Nested sub-operations: Should we support arbitrary nesting depth?

    • Or limit to 1-2 levels for simplicity?
  4. Rollback mechanism: Should failed operations undo their effects?

    • E.g., if PUT succeeds but SUBSCRIBE fails, delete the stored contract?
    • Or mark it as "locally cached only" without subscription?
  5. API changes: Should we expose transaction hierarchies in client API?

    • Useful for debugging, but adds complexity

Conclusion

The lack of atomicity for operations with network-based sub-operations is a fundamental architectural gap in Freenet. While issue #1765 correctly identified that PUT+subscribe doesn't work reliably, the problem is more general: the system has no infrastructure to ensure parent operations complete atomically with their sub-operations.

This proposal provides a comprehensive, reusable solution that:

  1. ✅ Tracks parent-child relationships between transactions
  2. ✅ Defers parent completion until all children succeed
  3. ✅ Propagates errors from children to parents and clients
  4. ✅ Enables observable, debuggable operation hierarchies
  5. ✅ Maintains backward compatibility

By building this infrastructure, we enable not just PUT+subscribe atomicity, but a general pattern for composite atomic operations that will serve future features like multi-contract transactions, replicated state, and distributed consensus operations.

Metadata

Labels: A-developer-xp (Area: developer experience), A-networking (Area: Networking, ring protocol, peer discovery), E-hard (Experience needed to fix/implement: Hard / a lot), P-high (High priority), S-needs-design (Status: Needs architectural design or RFC), T-bug (Type: Something is broken)

Project status: Triage

Milestone: none

Development: no branches or pull requests