Skip to content

[Fix] Integer overflow in network frontend causes premature termination of simulation with empty end-to-end results #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class ASTRASimNetwork : public AstraSim::AstraNetworkAPI {
AstraSim::RecvPacketEventHadndlerData* ehd = (AstraSim::RecvPacketEventHadndlerData*) t.fun_arg;
AstraSim::EventType event = ehd->event;
tag = ehd->flowTag.tag_id;
NcclLog->writeLog(NcclLogLevel::DEBUG,"接收事件注册 src %d sim_recv on rank %d tag_id %d channdl id %d",src,rank,tag,ehd->flowTag.channel_id);
NcclLog->writeLog(NcclLogLevel::DEBUG,"[Receive event registration] src %d sim_recv on rank %d tag_id %d channdl id %d",src,rank,tag,ehd->flowTag.channel_id);

if (recvHash.find(make_pair(tag, make_pair(t.src, t.dest))) !=
recvHash.end()) {
Expand Down Expand Up @@ -190,12 +190,12 @@ class ASTRASimNetwork : public AstraSim::AstraNetworkAPI {
if (expeRecvHash.find(make_pair(tag, make_pair(t.src, t.dest))) ==
expeRecvHash.end()) {
expeRecvHash[make_pair(tag, make_pair(t.src, t.dest))] = t;
NcclLog->writeLog(NcclLogLevel::DEBUG," 网络包后到,先进行注册 recvHash do not find expeRecvHash.new make src %d dest %d t.count: %d channel_id %d current_flow_id %d",t.src,t.dest,t.count,tag,flowTag.current_flow_id);
NcclLog->writeLog(NcclLogLevel::DEBUG," [Packet arrived late, registering first] recvHash do not find expeRecvHash.new make src %d dest %d t.count: %llu channel_id %d current_flow_id %d",t.src,t.dest,t.count,tag,flowTag.current_flow_id);

} else {
uint64_t expecount =
expeRecvHash[make_pair(tag, make_pair(t.src, t.dest))].count;
NcclLog->writeLog(NcclLogLevel::DEBUG," 网络包后到,重复注册 recvHash do not find expeRecvHash.add make src %d dest %d expecount: %d t.count: %d tag_id %d current_flow_id %d",t.src,t.dest,expecount,t.count,tag,flowTag.current_flow_id);
NcclLog->writeLog(NcclLogLevel::DEBUG," [Packet arrived late, re-registering] recvHash do not find expeRecvHash.add make src %d dest %d expecount: %d t.count: %d tag_id %d current_flow_id %d",t.src,t.dest,expecount,t.count,tag,flowTag.current_flow_id);

}
}
Expand Down
22 changes: 11 additions & 11 deletions astra-sim-alibabacloud/astra-sim/network_frontend/ns3/entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct task1 {
double schTime;
};
map<std::pair<int, std::pair<int, int>>, struct task1> expeRecvHash;
map<std::pair<int, std::pair<int, int>>, int> recvHash;
map<std::pair<int, std::pair<int, int>>, uint64_t> recvHash;
map<std::pair<int, std::pair<int, int>>, struct task1> sentHash;
map<std::pair<int, int>, int64_t> nodeHash;
map<std::pair<int,std::pair<int,int>>,int> waiting_to_sent_callback;
Expand Down Expand Up @@ -139,8 +139,8 @@ void SendFlow(int src, int dst, uint64_t maxPacketCount,
flow_input.idx++;
if(real_PacketCount == 0) real_PacketCount = 1;
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG," 发包事件 %dSendFlow to %d channelid: %d flow_id %d srcip %d dstip %d size: %d at the tick: %d",src,dst,tag,flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick());
NcclLog->writeLog(NcclLogLevel::DEBUG," request->flowTag 发包事件 %dSendFlow to %d tag_id: %d flow_id %d srcip %d dstip %d size: %d at the tick: %d",request->flowTag.sender_node,request->flowTag.receiver_node,request->flowTag.tag_id,request->flowTag.current_flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick());
NcclLog->writeLog(NcclLogLevel::DEBUG," [Packet sending event] %dSendFlow to %d channelid: %d flow_id %d srcip %d dstip %d size: %llu at the tick: %d",src,dst,tag,flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick());
NcclLog->writeLog(NcclLogLevel::DEBUG," request->flowTag [Packet sending event] %dSendFlow to %d tag_id: %d flow_id %d srcip %d dstip %d size: %llu at the tick: %d",request->flowTag.sender_node,request->flowTag.receiver_node,request->flowTag.tag_id,request->flowTag.current_flow_id,serverAddress[src],serverAddress[dst],maxPacketCount,AstraSim::Sys::boostedTick());
RdmaClientHelper clientHelper(
pg, serverAddress[src], serverAddress[dst], port, dport, real_PacketCount,
has_win ? (global_t == 1 ? maxBdp : pairBdp[n.Get(src)][n.Get(dst)]) : 0,
Expand Down Expand Up @@ -170,17 +170,17 @@ void notify_receiver_receive_data(int sender_node, int receiver_node,
MtpInterface::explicitCriticalSection cs;
#endif
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %d",sender_node,receiver_node,message_size);
NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %llu",sender_node,receiver_node,message_size);
int tag = flowTag.tag_id;
if (expeRecvHash.find(make_pair(
tag, make_pair(sender_node, receiver_node))) != expeRecvHash.end()) {
task1 t2 =
expeRecvHash[make_pair(tag, make_pair(sender_node, receiver_node))];
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %d t2.count: %d channle id: %d",sender_node,receiver_node,message_size,t2.count,flowTag.channel_id);
NcclLog->writeLog(NcclLogLevel::DEBUG," %d notify recevier: %d message size: %llu t2.count: %llu channle id: %d",sender_node,receiver_node,message_size,t2.count,flowTag.channel_id);
AstraSim::RecvPacketEventHadndlerData* ehd = (AstraSim::RecvPacketEventHadndlerData*) t2.fun_arg;
if (message_size == t2.count) {
NcclLog->writeLog(NcclLogLevel::DEBUG," message_size = t2.count expeRecvHash.erase %d notify recevier: %d message size: %d channel_id %d",sender_node,receiver_node,message_size,tag);
NcclLog->writeLog(NcclLogLevel::DEBUG," message_size = t2.count expeRecvHash.erase %d notify recevier: %d message size: %llu channel_id %d",sender_node,receiver_node,message_size,tag);
expeRecvHash.erase(make_pair(tag, make_pair(sender_node, receiver_node)));
#ifdef NS3_MTP
cs.ExitSection();
Expand All @@ -192,7 +192,7 @@ void notify_receiver_receive_data(int sender_node, int receiver_node,
} else if (message_size > t2.count) {
recvHash[make_pair(tag, make_pair(sender_node, receiver_node))] =
message_size - t2.count;
NcclLog->writeLog(NcclLogLevel::DEBUG,"message_size > t2.count expeRecvHash.erase %d notify recevier: %d message size: %d channel_id %d",sender_node,receiver_node,message_size,tag);
NcclLog->writeLog(NcclLogLevel::DEBUG,"message_size > t2.count expeRecvHash.erase %d notify recevier: %d message size: %llu channel_id %d",sender_node,receiver_node,message_size,tag);
expeRecvHash.erase(make_pair(tag, make_pair(sender_node, receiver_node)));
#ifdef NS3_MTP
cs.ExitSection();
Expand Down Expand Up @@ -299,7 +299,7 @@ void notify_sender_packet_arrivered_receiver(int sender_node, int receiver_node,
void qp_finish(FILE *fout, Ptr<RdmaQueuePair> q) {
uint32_t sid = ip_to_node_id(q->sip), did = ip_to_node_id(q->dip);
uint64_t base_rtt = pairRtt[sid][did], b = pairBw[sid][did];
uint32_t total_bytes =
uint64_t total_bytes =
q->m_size +
((q->m_size - 1) / packet_payload_size + 1) *
(CustomHeader::GetStaticWholeHeaderSize() -
Expand All @@ -320,7 +320,7 @@ void qp_finish(FILE *fout, Ptr<RdmaQueuePair> q) {
Ptr<RdmaDriver> rdma = dstNode->GetObject<RdmaDriver>();
rdma->m_rdma->DeleteRxQp(q->sip.Get(), q->m_pg, q->sport);
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG,"qp finish, src: %d did: %d port: %d total bytes: %d at the tick: %d",sid,did,q->sport,q->m_size,AstraSim::Sys::boostedTick());
NcclLog->writeLog(NcclLogLevel::DEBUG,"qp finish, src: %d did: %d port: %d total bytes: %llu at the tick: %d",sid,did,q->sport,q->m_size,AstraSim::Sys::boostedTick());
if (sender_src_port_map.find(make_pair(q->sport, make_pair(sid, did))) ==
sender_src_port_map.end()) {
NcclLog->writeLog(NcclLogLevel::ERROR,"could not find the tag, there must be something wrong");
Expand Down Expand Up @@ -348,8 +348,8 @@ void send_finish(FILE *fout, Ptr<RdmaQueuePair> q) {
uint32_t sid = ip_to_node_id(q->sip), did = ip_to_node_id(q->dip);
AstraSim::ncclFlowTag flowTag;
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(NcclLogLevel::DEBUG,"数据包出发送网卡 send finish, src: %d did: %d port: %d srcip %d dstip %d total bytes: %d at the tick: %d",sid,did,q->sport,q->sip,q->dip,q->m_size,AstraSim::Sys::boostedTick());
int all_sent_chunksize;
NcclLog->writeLog(NcclLogLevel::DEBUG,"[Packet sent from NIC] send finish, src: %d did: %d port: %d srcip %d dstip %d total bytes: %llu at the tick: %d",sid,did,q->sport,q->sip,q->dip,q->m_size,AstraSim::Sys::boostedTick());
uint64_t all_sent_chunksize;
{
#ifdef NS3_MTP
MtpInterface::explicitCriticalSection cs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ simai_send_finish(AstraSim::ncclFlowTag flowTag) {
MockNcclLog* NcclLog = MockNcclLog::getInstance();
NcclLog->writeLog(
NcclLogLevel::DEBUG,
" 数据包出网卡队列, src %d did %d total_bytes %lu channel_id %d flow_id %d tag_id %d",
" [Packet dequeued from NIC TX queue], src %d did %d total_bytes %lu channel_id %d flow_id %d tag_id %d",
sid,
did,
flowTag.flow_size,
Expand Down