Skip to content

Commit

Permalink
Fix transaction state race condition #3
Browse files Browse the repository at this point in the history
Fix transaction state race condition
  • Loading branch information
yuasatakayuki authored Oct 23, 2021
2 parents 1279722 + 3b4c065 commit 396dc5c
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 99 deletions.
66 changes: 36 additions & 30 deletions includes/RMAPEngine.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
#include "SpaceWireIF.hh"
#include "SpaceWireUtilities.hh"

#include <memory>
#include <mutex>

class RMAPEngineStoppedAction: public CxxUtilities::Action<void> {
public:
virtual ~RMAPEngineStoppedAction() {
Expand Down Expand Up @@ -134,19 +137,20 @@ public:
class RMAPTargetProcessThread: public CxxUtilities::Thread {
private:
RMAPTargetAccessAction* rmapTargetAcessAction;
RMAPTransaction rmapTransaction;
std::unique_ptr<RMAPTransaction> rmapTransaction;
RMAPEngine* rmapEngine;

private:
bool isCompleted_;

public:
RMAPTargetProcessThread(RMAPEngine* rmapEngine, RMAPTransaction rmapTransaction,
RMAPTargetProcessThread(RMAPEngine* rmapEngine, std::unique_ptr<RMAPTransaction>&& rmapTransaction,
RMAPTargetAccessAction* rmapTargetAcessAction) :
CxxUtilities::Thread() {
this->rmapEngine = rmapEngine;
this->rmapTargetAcessAction = rmapTargetAcessAction;
this->rmapTransaction = rmapTransaction;
CxxUtilities::Thread(),
rmapTargetAcessAction(rmapTargetAcessAction),
rmapTransaction(std::move(rmapTransaction)),
rmapEngine(rmapEngine)
{
isCompleted_ = false;
}

Expand All @@ -155,28 +159,28 @@ public:
using namespace std;
isCompleted_ = false;
try {
rmapTargetAcessAction->processTransaction(&rmapTransaction);
rmapTransaction.setState(RMAPTransaction::ReplySet);
rmapTargetAcessAction->processTransaction(rmapTransaction.get());
rmapTransaction->setState(RMAPTransaction::ReplySet);
} catch (...) {
delete rmapTransaction.commandPacket;
delete rmapTransaction->commandPacket;
rmapEngine->receivedCommandPacketDiscarded();
isCompleted_ = true;
return;
}
try {
rmapTransaction.replyPacket->constructPacket();
rmapEngine->sendPacket(rmapTransaction.replyPacket->getPacketBufferPointer());
rmapTransaction.setState(RMAPTransaction::ReplySent);
rmapTransaction->replyPacket->constructPacket();
rmapEngine->sendPacket(rmapTransaction->replyPacket->getPacketBufferPointer());
rmapTransaction->setState(RMAPTransaction::ReplySent);
} catch (...) {
rmapTargetAcessAction->transactionReplyCouldNotBeSent(&rmapTransaction);
rmapTargetAcessAction->transactionReplyCouldNotBeSent(rmapTransaction.get());
rmapEngine->replyToReceivedCommandPacketCouldNotBeSent();
delete rmapTransaction.commandPacket;
delete rmapTransaction->commandPacket;
isCompleted_ = true;
return;
}
rmapTargetAcessAction->transactionWillComplete(&rmapTransaction);
rmapTransaction.setState(RMAPTransaction::ReplyCompleted);
delete rmapTransaction.commandPacket;
rmapTargetAcessAction->transactionWillComplete(rmapTransaction.get());
rmapTransaction->setState(RMAPTransaction::ReplyCompleted);
delete rmapTransaction->commandPacket;
isCompleted_ = true;
}

Expand Down Expand Up @@ -354,14 +358,15 @@ private:
rmapTargetProcessThreads = newRMAPTargetProcessThreads;

//find an RMAPTarget instance which can accept the accessed address range
RMAPTransaction rmapTransaction;
rmapTransaction.commandPacket = commandPacket;
rmapTransaction.setState(RMAPTransaction::CommandPacketReceived);
auto rmapTransaction = std::unique_ptr<RMAPTransaction>(new RMAPTransaction);
rmapTransaction->commandPacket = commandPacket;
rmapTransaction->setState(RMAPTransaction::CommandPacketReceived);
for (size_t i = 0; i < rmapTargets.size(); i++) {
RMAPTargetAccessAction* rmapTargetAcessAction = rmapTargets[i]->getCorrespondingRMAPTargetAccessAction(
&rmapTransaction);
rmapTransaction.get());
if (rmapTargetAcessAction != NULL) {
RMAPTargetProcessThread* aThread = new RMAPTargetProcessThread(this, rmapTransaction, rmapTargetAcessAction);
RMAPTargetProcessThread* aThread =
new RMAPTargetProcessThread(this, std::move(rmapTransaction), rmapTargetAcessAction);
aThread->start();
rmapTargetProcessThreads.push_back(aThread);
return;
Expand Down Expand Up @@ -405,12 +410,10 @@ private:
//register reply packet to the resolved transaction
transaction->replyPacket = packet;
//update transaction state
/*while(transaction->getState()!=RMAPTransaction::CommandSent and transaction->getState()!=RMAPTransaction::Initiated){
cout << "RMAPEngine::rmapReplyPacketReceived(): Waiting" << endl;
cout << transaction->getState() << endl;
c.wait(100);
}*/
transaction->setState(RMAPTransaction::ReplyReceived);
{
std::lock_guard<std::mutex> stateGuard(transaction->stateMutex);
transaction->setState(RMAPTransaction::ReplyReceived);
}
if (!transaction->isNonblockingMode) {
transaction->getCondition()->signal();
}
Expand Down Expand Up @@ -537,8 +540,11 @@ public:
commandPacket->setTransactionID(transactionID);
commandPacket->constructPacket();
if (isStarted()) {
sendPacket(commandPacket->getPacketBufferPointer());
transaction->state = RMAPTransaction::Initiated;
{
std::lock_guard<std::mutex> stateGuard(transaction->stateMutex);
sendPacket(commandPacket->getPacketBufferPointer());
transaction->state = RMAPTransaction::Initiated;
}
} else {
throw RMAPEngineException(RMAPEngineException::RMAPEngineIsNotStarted);
}
Expand Down
56 changes: 14 additions & 42 deletions includes/RMAPInitiator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ private:
RMAPEngine* rmapEngine;
RMAPPacket* commandPacket;
RMAPPacket* replyPacket;
CxxUtilities::Mutex mutex;

CxxUtilities::Mutex deleteReplyPacketMutex;
std::mutex transactionMutex;
std::mutex deleteReplyPacketMutex;

private:
//optional DB
Expand Down Expand Up @@ -192,27 +191,12 @@ public:

public:
void deleteReplyPacket() {
deleteReplyPacketMutex.lock();
std::lock_guard<std::mutex> guard(deleteReplyPacketMutex);
if (replyPacket == NULL) {
deleteReplyPacketMutex.unlock();
return;
}
delete replyPacket;
replyPacket = NULL;
deleteReplyPacketMutex.unlock();
}

public:
void lock() {
using namespace std;
// cout << "RMAPInitiator Locking a mutex." << endl;
mutex.lock();
}

void unlock() {
using namespace std;
// cout << "RMAPInitiator UnLocking a mutex." << endl;
mutex.unlock();
}

public:
Expand Down Expand Up @@ -300,7 +284,7 @@ public:
double timeoutDuration = DefaultTimeoutDuration) throw (RMAPEngineException, RMAPInitiatorException,
RMAPReplyException) {
using namespace std;
lock();
std::lock_guard<std::mutex> guard(transactionMutex);
transaction.isNonblockingMode = false;
if (replyPacket != NULL) {
deleteReplyPacket();
Expand Down Expand Up @@ -330,11 +314,9 @@ public:
try {
rmapEngine->initiateTransaction(transaction);
} catch (RMAPEngineException& e) {
unlock();
transaction.state = RMAPTransaction::NotInitiated;
throw RMAPInitiatorException(RMAPInitiatorException::RMAPTransactionCouldNotBeInitiated);
} catch (...) {
unlock();
transaction.state = RMAPTransaction::NotInitiated;
throw RMAPInitiatorException(RMAPInitiatorException::RMAPTransactionCouldNotBeInitiated);
}
Expand All @@ -344,28 +326,24 @@ public:
transaction.replyPacket = NULL;
if (replyPacket->getStatus() != RMAPReplyStatus::CommandExcecutedSuccessfully) {
uint8_t replyStatus = replyPacket->getStatus();
unlock();
transaction.state = RMAPTransaction::NotInitiated;
deleteReplyPacket();
throw RMAPReplyException(replyStatus);
}
if (length < replyPacket->getDataBuffer()->size()) {
unlock();
transaction.state = RMAPTransaction::NotInitiated;
deleteReplyPacket();
throw RMAPInitiatorException(RMAPInitiatorException::ReadReplyWithInsufficientData);
}
replyPacket->getData(buffer, length);
transaction.state = RMAPTransaction::NotInitiated;
unlock();
//when successful, replay packet is retained until next transaction for inspection by user application
//deleteReplyPacket();
return;
} else {
//cancel transaction (return transaction ID)
rmapEngine->cancelTransaction(&transaction);
transaction.state = RMAPTransaction::NotInitiated;
unlock();
deleteReplyPacket();
throw RMAPInitiatorException(RMAPInitiatorException::Timeout);
}
Expand Down Expand Up @@ -427,7 +405,7 @@ public:
void nonblockingRead(RMAPTargetNode* rmapTargetNode, uint32_t memoryAddress, uint32_t length)
throw (RMAPEngineException, RMAPInitiatorException, RMAPReplyException) {
using namespace std;
lock();
std::lock_guard<std::mutex> guard(transactionMutex);
transaction.isNonblockingMode = true;
if (replyPacket != NULL) {
deleteReplyPacket();
Expand Down Expand Up @@ -456,15 +434,12 @@ public:
}
try {
rmapEngine->initiateTransaction(transaction);
unlock();
return;
} catch (RMAPEngineException& e) {
transaction.state = RMAPTransaction::NotInitiated;
unlock();
throw RMAPInitiatorException(RMAPInitiatorException::RMAPTransactionCouldNotBeInitiated);
} catch (...) {
transaction.state = RMAPTransaction::NotInitiated;
unlock();
throw RMAPInitiatorException(RMAPInitiatorException::RMAPTransactionCouldNotBeInitiated);
}
}
Expand Down Expand Up @@ -589,7 +564,7 @@ public:
void write(RMAPTargetNode *rmapTargetNode, uint32_t memoryAddress, uint8_t *data, uint32_t length,
double timeoutDuration = DefaultTimeoutDuration) throw (RMAPEngineException, RMAPInitiatorException,
RMAPReplyException) {
lock();
std::lock_guard<std::mutex> guard(transactionMutex);
transaction.isNonblockingMode = false;
if (replyPacket != NULL) {
deleteReplyPacket();
Expand Down Expand Up @@ -625,11 +600,9 @@ public:
if (!replyMode) { //if reply is not expected
if (transaction.state == RMAPTransaction::Initiated) {
transaction.state = RMAPTransaction::Initiated;
unlock();
return;
} else {
transaction.state = RMAPTransaction::NotInitiated;
unlock();
//command was not sent successfully
throw RMAPInitiatorException(RMAPInitiatorException::RMAPTransactionCouldNotBeInitiated);
}
Expand All @@ -638,9 +611,9 @@ public:

//if reply is expected
transaction.condition.wait(timeoutDuration);
if (transaction.state == RMAPTransaction::CommandSent) {
switch(transaction.state){
case RMAPTransaction::CommandSent:
if (replyMode) {
unlock();
//cancel transaction (return transaction ID)
rmapEngine->cancelTransaction(&transaction);
//reply packet is not created, and therefore the line below is not necessary
Expand All @@ -649,39 +622,38 @@ public:
throw RMAPInitiatorException(RMAPInitiatorException::Timeout);
} else {
transaction.state = RMAPTransaction::NotInitiated;
unlock();
return;
}
} else if (transaction.state == RMAPTransaction::ReplyReceived) {
break;
case RMAPTransaction::ReplyReceived:
replyPacket = transaction.replyPacket;
transaction.replyPacket = NULL;
if (replyPacket->getStatus() != RMAPReplyStatus::CommandExcecutedSuccessfully) {
uint8_t replyStatus = replyPacket->getStatus();
unlock();
deleteReplyPacket();
transaction.state = RMAPTransaction::NotInitiated;
throw RMAPReplyException(replyStatus);
}
if (replyPacket->getStatus() == RMAPReplyStatus::CommandExcecutedSuccessfully) {
unlock();
//When successful, replay packet is retained until next transaction for inspection by user application
//deleteReplyPacket();
transaction.state = RMAPTransaction::NotInitiated;
return;
} else {
uint8_t replyStatus = replyPacket->getStatus();
unlock();
deleteReplyPacket();
transaction.state = RMAPTransaction::NotInitiated;
throw RMAPReplyException(replyStatus);
}
} else if (transaction.state == RMAPTransaction::Timeout) {
unlock();
break;
case RMAPTransaction::Timeout: // fallthrough
default:
//cancel transaction (return transaction ID)
rmapEngine->cancelTransaction(&transaction);
deleteReplyPacket();
transaction.state = RMAPTransaction::NotInitiated;
throw RMAPInitiatorException(RMAPInitiatorException::Timeout);
break;
}
}

Expand Down
41 changes: 14 additions & 27 deletions includes/RMAPTransaction.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,28 @@
#include "CxxUtilities/CxxUtilities.hh"
#include "RMAPPacket.hh"

#include <mutex>

class RMAPTransaction {
public:
uint8_t targetLogicalAddress;
uint8_t initiatorLogicalAddress;
uint16_t transactionID;
uint32_t transactionIDMode;
static constexpr double DefaultTimeoutDuration = 1000.0;

uint8_t targetLogicalAddress{};
uint8_t initiatorLogicalAddress{};
uint16_t transactionID{};
uint32_t transactionIDMode = AutoTransactionID;
CxxUtilities::Condition condition;
double timeoutDuration;
uint32_t state;
double timeoutDuration = DefaultTimeoutDuration;
uint32_t state{};
std::mutex stateMutex;
bool isNonblockingMode = false;
RMAPPacket* commandPacket{};
RMAPPacket* replyPacket{};

public:
enum {
AutoTransactionID = 0x00, ManualTransactionID = 0x01
};

public:
bool isNonblockingMode;

public:
RMAPPacket* commandPacket;
RMAPPacket* replyPacket;

public:
RMAPTransaction() {
timeoutDuration = DefaultTimeoutDuration;
transactionIDMode = AutoTransactionID;
replyPacket = NULL;
commandPacket = NULL;
isNonblockingMode = false;
}

public:
enum {
//for RMAPInitiator-related transaction
NotInitiated = 0x00,
Expand All @@ -84,9 +74,6 @@ public:
ReplyCompleted = 0x14
};

public:
static constexpr double DefaultTimeoutDuration = 1000;

public:
RMAPPacket* getCommandPacket() const {
return commandPacket;
Expand Down
1 change: 1 addition & 0 deletions test/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build
Loading

0 comments on commit 396dc5c

Please sign in to comment.