A client id assignment error occurred in a multi-computer parallel test, causing the message to fail to be forwarded #2428

Open
AR-Wang opened this issue Jul 20, 2024 · 0 comments
Labels
bug Something isn't working

AR-Wang commented Jul 20, 2024


overview

When testing on multiple machines in parallel, there is one broker, one client responsible for interacting with the remote broker, and several clients that actually do the fuzzing.

When ids are assigned, the number 1 is assigned by default both to the client that interacts with the remote broker and to the first client actually used for fuzzing.

As a result, the ids conflict: when messages are forwarded, the messages of the first real fuzzing client are ignored.

problem description

I used two machines, machine A and machine B, for a multi-machine parallel test.

I first started a test on machine A, with one real fuzzing client and a broker listening on port 38105:

Launcher::builder()
    .shmem_provider(shmem_provider.clone())
    .configuration(EventConfig::from_name("default"))
    .monitor(monitor)
    .run_client(&mut run_client)
    .cores(&Cores::from_cmdline("1").expect("no core"))
    // no remote broker on machine A:
    //.remote_broker_addr(Some(SocketAddr::new(IpAddr::V4(my_ip), 38105)))
    .broker_port(38105)
    .launch_delay(2000)
    .stdout_file(Some(
        self.wfuzz_test_dir
            .join("output")
            .to_str()
            .expect("join output path fail..."),
    ))
    .build()
    .launch()

I then started another test on machine B, with one real fuzzing client and a broker listening on port 15633 that connected remotely to the broker on machine A:

Launcher::builder()
    .shmem_provider(shmem_provider.clone())
    .configuration(EventConfig::from_name("default"))
    .monitor(monitor)
    .run_client(&mut run_client)
    .cores(&Cores::from_cmdline("2").expect("no core"))
    // machineA_ip: the Ipv4Addr of machine A
    .remote_broker_addr(Some(SocketAddr::new(IpAddr::V4(machineA_ip), 38105)))
    .broker_port(15633)
    .launch_delay(2000)
    .stdout_file(Some(
        self.wfuzz_test_dir
            .join("output")
            .to_str()
            .expect("join output path fail..."),
    ))
    .build()
    .launch()

Logically, machine A should then be able to exchange testcases with machine B: machine A can send the testcases it finds to machine B, and machine B can likewise send its testcases to machine A.

However, I found that the exchange was unidirectional: machine A could send its own seeds to machine B, but could not receive machine B's seeds; machine B could only receive seeds from machine A and could not actively send seeds to machine A.

I printed the run log on machine B, shown below. Client-1 is clearly responsible for the communication with machine A, while client-2 is the real fuzzing client on machine B. Client-2's messages should be forwarded to machine A, but instead of being sent they kept producing:

“Ignored message we probably sent earlier (same id), TAG: Tag(2B0741)”

client-1  message is LlmpMsg { tag: Tag(2B0741), sender: ClientId(1), broker: BrokerId(0), flags: Flags2( FROM_B2B ), message_id: MessageId(3297), buf_len: 142, buf_len_padded: 152, buf: [] }


client-2  message is LlmpMsg { tag: Tag(2B0741), sender: ClientId(1), broker: BrokerId(0), flags: Flags0(  ), message_id: MessageId(1371), buf_len: 19, buf_len_padded: 24, buf: [] }

When I looked at the code, I saw that machine B was trying to send the message, but it was blocked by this check:

if client_id == b2b_client_id {
    log::info!(
        "Ignored message we probably sent earlier (same id), TAG: {tag:?}"
    );
    continue;
}

I understand that this check is needed to avoid messages being sent and received repeatedly.

But in principle, client-2's sender id should be ClientId(2), not ClientId(1).

So in the end, I think there is a problem with your client id allocation.
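The effect of the collision can be reduced to a minimal sketch (hypothetical types and names, not LibAFL's actual structs): because the b2b client and the first real fuzzing client were both assigned ClientId(1), the forwarding loop drops the fuzz client's message as if it were our own b2b echo.

```rust
// Hypothetical sketch of the misfiring dedup filter; `ClientId`, `Msg`
// and `forward` are illustration-only stand-ins.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ClientId(u32);

struct Msg {
    sender: ClientId,
    payload: &'static str,
}

// Mimics the `client_id == b2b_client_id` check from the forwarding loop.
fn forward(msgs: &[Msg], b2b_client_id: ClientId) -> Vec<&'static str> {
    let mut forwarded = Vec::new();
    for m in msgs {
        if m.sender == b2b_client_id {
            // "Ignored message we probably sent earlier (same id)"
            continue;
        }
        forwarded.push(m.payload);
    }
    forwarded
}

fn main() {
    let b2b = ClientId(1); // the client talking to machine A
    let msgs = [
        Msg { sender: ClientId(1), payload: "b2b echo from machine A" },
        // Due to the bug, the real fuzz client was also given ClientId(1):
        Msg { sender: ClientId(1), payload: "new testcase from machine B" },
    ];
    // Both messages are dropped, including the legitimate testcase.
    assert!(forward(&msgs, b2b).is_empty());

    // With the expected ClientId(2), the testcase would be forwarded:
    let fixed = [Msg { sender: ClientId(2), payload: "new testcase from machine B" }];
    assert_eq!(forward(&fixed, b2b), vec!["new testcase from machine B"]);
}
```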

why the problem occurs

I analyzed how you do client id assignment, and I think it goes wrong when remote_broker_addr is present.

When the test is started on machine B, a broker bound to the specified port is created as follows, and a client 0 is started by default. Then a background listener thread is started. This thread handles new client connections: it assigns an id to each client and writes the client information to client 0's message-sending area; the client is only actually registered once the broker polls that area.

let broker = LlmpBroker::create_attach_to_tcp(
    self.shmem_provider.clone(),
    tuple_list!(llmp_hook),
    self.broker_port,
)?;

// Eventually we reach this point: a listener thread is created to accept new
// connections, and all connections are handled by this single thread.
match tcp_bind(port) {
            Ok(listener) => {
                let mut broker =
                    LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?;
                let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
                Ok(broker)
            }
            Err(e) => Err(e),
        }

// launch_listener creates a client 0 by default.
// By default, 1 is used as the id of the first connecting client.
pub fn launch_listener(&mut self, listener: Listener) -> Result<thread::JoinHandle<()>, Error> {
        // A client 0 is launched to handle client connections (num_clients_seen is initially 0)
        let llmp_tcp_id = self.peek_next_client_id();

        // Tcp out map sends messages from background thread tcp server to foreground client
        let tcp_out_shmem = LlmpSharedMap::new(
            llmp_tcp_id,
            self.shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?,
        );
        let tcp_out_shmem_description = tcp_out_shmem.shmem.description();
        let listener_id = self.register_client(tcp_out_shmem);

        let ret = thread::spawn(move || {
            // Create a new ShMemProvider for this background thread.
            let mut shmem_provider_bg = SP::new().unwrap();

            // By default, the first connecting client gets id 1, and ids increase sequentially.
            // Connections are handled by a single thread, so that part is fine on its own.
            // But a separate variable holds a snapshot of num_clients_seen here, so later
            // changes to num_clients_seen are not visible to this thread.
            let mut current_client_id = ClientId(llmp_tcp_id.0 + 1);

            let mut tcp_incoming_sender = LlmpSender {
                id: llmp_tcp_id,
                last_msg_sent: ptr::null_mut(),
                out_shmems: vec![LlmpSharedMap::existing(
                    shmem_provider_bg
                        .shmem_from_description(tcp_out_shmem_description)
                        .unwrap(),
                )],
                // drop pages to the broker, if it already read them.
                keep_pages_forever: false,
                has_unsent_message: false,
                shmem_provider: shmem_provider_bg.clone(),
                unused_shmem_cache: vec![],
            };

            loop {
                match listener.accept() {
                    ListenerStream::Tcp(mut stream, addr) => {
                        // Here, 1 is used by default as the id of the next client.
                        Self::handle_tcp_request(
                            stream,
                            &req,
                            &mut current_client_id,
                            &mut tcp_incoming_sender,
                            &broker_shmem_description,
                        );
                    }
                    ListenerStream::Empty() => {
                        continue;
                    }
                };
            }
        });
        
        self.listeners.push(listener_id);

        Ok(ret)
    }


 fn handle_tcp_request(
        mut stream: TcpStream,
        request: &TcpRequest,
        current_client_id: &mut ClientId,
        sender: &mut LlmpSender<SP>,
        broker_shmem_description: &ShMemDescription,
    ) {
        match request {
            TcpRequest::ClientQuit { client_id } => {
                // todo search the ancestor_id and remove it.
                match Self::announce_client_exit(sender, client_id.0) {
                    Ok(()) => (),
                    Err(e) => log::info!("Error announcing client exit: {e:?}"),
                }
            }
            TcpRequest::LocalClientHello { shmem_description } => {
                match Self::announce_new_client(sender, shmem_description) {
                    Ok(()) => (),
                    Err(e) => log::info!("Error forwarding client on map: {e:?}"),
                };
                // Here, 1 is used as the id of the first fuzz client, and that id is
                // sent to the client, which will use it to create its sender.
                if let Err(e) = send_tcp_msg(
                    &mut stream,
                    &TcpResponse::LocalClientAccepted {
                        client_id: *current_client_id,
                    },
                ) {
                    log::info!("An error occurred sending via tcp {e}");
                };
                //increase
                current_client_id.0 += 1;
            }
            TcpRequest::RemoteBrokerHello { hostname } => {
                log::info!("B2B new client: {hostname}");

                // TODO: Clean up broker ids.
                if send_tcp_msg(
                    &mut stream,
                    &TcpResponse::RemoteBrokerAccepted {
                        broker_id: BrokerId(current_client_id.0),
                    },
                )
                .is_err()
                {
                    log::info!("Error accepting broker, ignoring.");
                    return;
                }

                if let Ok(shmem_description) =
                    Self::b2b_thread_on(stream, *current_client_id, broker_shmem_description)
                {
                    if Self::announce_new_client(sender, &shmem_description).is_err() {
                        log::info!("B2B: Error announcing client {shmem_description:?}");
                    };
                    current_client_id.0 += 1;
                }
            }
        };
    }
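The registration flow described above (listener assigns an id, writes an announcement to client 0's outgoing area, broker registers on its next poll) can be sketched as follows. All types here are illustration-only stand-ins, not LibAFL's actual structs:

```rust
// Hypothetical sketch of deferred client registration via client 0's
// message area; `Announcement`, `Client0Out` and `Broker` are stand-ins.

enum Announcement {
    NewClient { id: u32 },
}

struct Client0Out {
    pending: Vec<Announcement>,
}

struct Broker {
    registered: Vec<u32>,
}

impl Broker {
    // Broker-side poll: drain client 0's area and register the clients.
    fn poll(&mut self, out: &mut Client0Out) {
        for a in out.pending.drain(..) {
            let Announcement::NewClient { id } = a;
            self.registered.push(id);
        }
    }
}

fn main() {
    let mut out = Client0Out { pending: Vec::new() };
    let mut broker = Broker { registered: vec![0] }; // client 0 exists

    // Listener thread announces a new client; nothing is registered yet.
    out.pending.push(Announcement::NewClient { id: 1 });
    assert_eq!(broker.registered, vec![0]);

    // Registration only happens once the broker polls.
    broker.poll(&mut out);
    assert_eq!(broker.registered, vec![0, 1]);
}
```

This deferral is exactly why, while connect_b2b runs, the broker still believes only client 0 exists.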

The problem, however, is that with multiple machines in parallel, there is an additional step before any real client registers: connecting to the specified remote broker.

if let Some(remote_broker_addr) = remote_broker_addr {
    log::info!("B2b: Connecting to {:?}", &remote_broker_addr);
    broker.inner_mut().connect_b2b(remote_broker_addr)?;
};


// The logic that connects to the remote broker creates an additional client that
// handles the message exchange with the remote broker.
// Its id defaults to self.peek_next_client_id().
// self.peek_next_client_id() must be 1 here, because only client 0 has registered
// with the broker. At most, other clients have registered with client 0, i.e. their
// information has been written to client 0's sender; only after this method
// completes does the broker poll all clients for messages.
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
    where
        A: ToSocketAddrs,
    {
        let mut stream = TcpStream::connect(addr)?;
        log::info!("B2B: Connected to {stream:?}");

        match recv_tcp_msg(&mut stream)?.try_into()? {
            TcpResponse::BrokerConnectHello {
                broker_shmem_description: _,
                hostname,
            } => log::info!("B2B: Connected to {hostname}"),
            _ => {
                return Err(Error::illegal_state(
                    "Unexpected response from B2B server received.".to_string(),
                ))
            }
        };

        let hostname = hostname::get()
            .unwrap_or_else(|_| "<unknown>".into())
            .to_string_lossy()
            .into();

        send_tcp_msg(&mut stream, &TcpRequest::RemoteBrokerHello { hostname })?;

        let broker_id = match recv_tcp_msg(&mut stream)?.try_into()? {
            TcpResponse::RemoteBrokerAccepted { broker_id } => {
                log::info!("B2B: Got Connection Ack, broker_id {broker_id:?}");
                broker_id
            }
            _ => {
                return Err(Error::illegal_state(
                    "Unexpected response from B2B server received.".to_string(),
                ));
            }
        };

        // TODO: use broker ids!
        log::info!("B2B: We are broker {broker_id:?}");

        // TODO: handle broker_ids properly/at all.
        let map_description = Self::b2b_thread_on(
            stream,
            // self.peek_next_client_id() is used by default, and it must be 1:
            // at this point only client 0 exists, and the other clients can only
            // be registered after connect_b2b returns.
            self.peek_next_client_id(),
            &self
                .llmp_out
                .out_shmems
                .first()
                .unwrap()
                .shmem
                .description(),
        )?;

        let new_shmem = LlmpSharedMap::existing(
            self.shmem_provider
                .shmem_from_description(map_description)?,
        );

        {
            self.register_client(new_shmem);
        }

        Ok(())
    }
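Putting the two code paths side by side shows the collision. Both the listener thread's snapshot (llmp_tcp_id.0 + 1) and connect_b2b's peek_next_client_id() are computed while only client 0 is registered, so both evaluate to 1. A minimal sketch (the function below is a hypothetical stand-in, not LibAFL's actual peek_next_client_id):

```rust
// Hypothetical stand-in: with n clients registered, the next free id is n.
fn peek_next_client_id(num_clients_seen: u32) -> u32 {
    num_clients_seen
}

fn main() {
    // Only client 0 (the listener's tcp sender) is registered so far.
    let num_clients_seen = 1;

    // launch_listener snapshots its starting id at spawn time...
    let listener_first_client_id = peek_next_client_id(num_clients_seen);

    // ...and connect_b2b later peeks the broker counter, which the fuzz
    // clients have not yet advanced (they register only after connect_b2b).
    let b2b_client_id = peek_next_client_id(num_clients_seen);

    // Both compute 1: the collision the logs show.
    assert_eq!(listener_first_client_id, b2b_client_id);
    assert_eq!(b2b_client_id, 1);
}
```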

proposed solution

pub fn launch_listener(&mut self, listener: Listener) -> Result<thread::JoinHandle<()>, Error> {
        // ... (llmp_tcp_id / tcp_out_shmem setup unchanged) ...

        let ret = thread::spawn(move || {
            // Create a new ShMemProvider for this background thread.
            let mut shmem_provider_bg = SP::new().unwrap();

            // Change: the first connecting client now gets id 2 by default;
            // id 1 is reserved for the client that communicates with the remote broker.
            let mut current_client_id = ClientId(llmp_tcp_id.0 + 2);

            let mut tcp_incoming_sender = LlmpSender {
                id: llmp_tcp_id,
                last_msg_sent: ptr::null_mut(),
                out_shmems: vec![LlmpSharedMap::existing(
                    shmem_provider_bg
                        .shmem_from_description(tcp_out_shmem_description)
                        .unwrap(),
                )],
                // drop pages to the broker, if it already read them.
                keep_pages_forever: false,
                has_unsent_message: false,
                shmem_provider: shmem_provider_bg.clone(),
                unused_shmem_cache: vec![],
            };
        });

        self.listeners.push(listener_id);

        Ok(ret)
}

Of course, this approach may not be ideal; perhaps you have a better one.
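One alternative worth considering (a sketch under my own assumptions, not a tested patch for LibAFL): instead of reserving id 1 unconditionally, share a single atomic counter between the listener thread and the b2b connection path, so every id is handed out exactly once regardless of ordering.

```rust
// Hypothetical sketch: a shared AtomicU32 replaces the two independent
// "next client id" counters, so the b2b client and the fuzz clients can
// never receive the same id.

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Shared counter; client 0 is already taken, so the next free id is 1.
    let next_id = Arc::new(AtomicU32::new(1));

    // The connect_b2b path claims an id...
    let b2b_id = next_id.fetch_add(1, Ordering::SeqCst);

    // ...while the listener thread claims ids for incoming fuzz clients.
    let counter = Arc::clone(&next_id);
    let handle = thread::spawn(move || counter.fetch_add(1, Ordering::SeqCst));
    let first_fuzz_id = handle.join().unwrap();

    // No collision: the two ids are distinct under any interleaving.
    assert_ne!(b2b_id, first_fuzz_id);
    assert_eq!(b2b_id, 1);
    assert_eq!(first_fuzz_id, 2);
}
```

Reserving id 2 (as in the snippet above) fixes this particular ordering, while a shared counter would also stay correct if the startup order ever changes.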

@AR-Wang AR-Wang added the bug Something isn't working label Jul 20, 2024