Skip to content

Commit

Permalink
Merge pull request #1 from smonkewitz/qserv
Browse files Browse the repository at this point in the history
Qserv XrdClient fixes
  • Loading branch information
wangd committed Sep 24, 2013
2 parents 18bf11a + e0792ca commit dad842b
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 165 deletions.
12 changes: 6 additions & 6 deletions src/XrdClient/XrdClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const char *XrdClientCVSID = "$Id$";
#include <signal.h>


XrdSysSemWait XrdClient::fConcOpenSem(DFLT_MAXCONCURRENTOPENS);
XrdSysSemaphore XrdClient::fConcOpenSem(DFLT_MAXCONCURRENTOPENS);

//_____________________________________________________________________________
// Calls the Open func in order to parallelize the Open requests
Expand Down Expand Up @@ -1154,8 +1154,7 @@ bool XrdClient::LowOpen(const char *file, kXR_unt16 mode, kXR_unt16 options,
// Send a kXR_open request in order to open the remote file
ClientRequest openFileRequest;

char buf[1024];
struct ServerResponseBody_Open *openresp = (struct ServerResponseBody_Open *)buf;
struct ServerResponseBody_Open *openresp = 0;

memset(&openFileRequest, 0, sizeof(openFileRequest));

Expand All @@ -1174,8 +1173,8 @@ bool XrdClient::LowOpen(const char *file, kXR_unt16 mode, kXR_unt16 options,

// Send request to server and receive response
bool resp = fConnModule->SendGenCommand(&openFileRequest,
(const void *)finalfilename.c_str(),
0, openresp, false, (char *)"Open");
(const void *)finalfilename.c_str(),
(void **)&openresp, 0, true, (char *)"Open");

if (resp && (fConnModule->LastServerResp.status == 0)) {
// Get the file handle to use for future read/write...
Expand Down Expand Up @@ -1206,7 +1205,8 @@ bool XrdClient::LowOpen(const char *file, kXR_unt16 mode, kXR_unt16 options,
}

}

// Free memory allocated by SendGenCommand
free(openresp);

return fOpenPars.opened;
}
Expand Down
4 changes: 2 additions & 2 deletions src/XrdClient/XrdClient.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
#include "XrdClient/XrdClientAbs.hh"
#include "XrdOuc/XrdOucString.hh"
#include "XrdClient/XrdClientThread.hh"
#include "XrdSys/XrdSysSemWait.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdVersion.hh"
#include <vector>
#include <string>
Expand Down Expand Up @@ -114,7 +114,7 @@ private:
XrdClientThread *fOpenerTh;

// Used to limit the maximum number of concurrent opens
static XrdSysSemWait fConcOpenSem;
static XrdSysSemaphore fConcOpenSem;

bool fOpenWithRefresh;

Expand Down
12 changes: 6 additions & 6 deletions src/XrdClient/XrdClientInputBuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ int XrdClientInputBuffer::WipeStreamid(int streamid)
}

//________________________________________________________________________
XrdSysSemWait *XrdClientInputBuffer::GetSyncObjOrMakeOne(int streamid) {
XrdSysSemaphore *XrdClientInputBuffer::GetSyncObjOrMakeOne(int streamid) {
// Gets the right sync obj to wait for messages for a given streamid
// If the semaphore is not available, it creates one.

XrdSysSemWait *sem;
XrdSysSemaphore *sem;

{
XrdSysMutexHelper mtx(fMutex);
Expand All @@ -88,7 +88,7 @@ XrdSysSemWait *XrdClientInputBuffer::GetSyncObjOrMakeOne(int streamid) {
sem = fSyncobjRepo.Find(buf);

if (!sem) {
sem = new XrdSysSemWait(0);
sem = new XrdSysSemaphore(0);

fSyncobjRepo.Rep(buf, sem);
return sem;
Expand All @@ -111,7 +111,7 @@ XrdClientInputBuffer::XrdClientInputBuffer() {


//_______________________________________________________________________
int DeleteHashItem(const char *key, XrdSysSemWait *sem, void *Arg) {
int DeleteHashItem(const char *key, XrdSysSemaphore *sem, void *Arg) {

// This makes the Apply method delete the entry
return -1;
Expand Down Expand Up @@ -146,7 +146,7 @@ int XrdClientInputBuffer::PutMsg(XrdClientMessage* m)
{
// Put message in the list
int sz;
XrdSysSemWait *sem = 0;
XrdSysSemaphore *sem = 0;

{
XrdSysMutexHelper mtx(fMutex);
Expand Down Expand Up @@ -175,7 +175,7 @@ XrdClientMessage *XrdClientInputBuffer::GetMsg(int streamid, int secstimeout)
// If there are no XrdClientMessages for the streamid, it waits for a number
// of seconds for something to come

XrdSysSemWait *sem = 0;
XrdSysSemaphore *sem = 0;
XrdClientMessage *res = 0, *m = 0;

// Find the sem where to wait for a msg
Expand Down
7 changes: 3 additions & 4 deletions src/XrdClient/XrdClientInputBuffer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include "XrdClient/XrdClientMessage.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysSemWait.hh"
#include "XrdOuc/XrdOucHash.hh"
#include "XrdClient/XrdClientVector.hh"

Expand All @@ -35,15 +34,15 @@ private:
XrdClientVector<XrdClientMessage*> fMsgQue; // queue for incoming messages
int fMsgIter; // an iterator on it

XrdSysRecMutex fMutex; // mutex to protect data structures
XrdSysRecMutex fMutex; // mutex to protect data structures

XrdOucHash<XrdSysSemWait> fSyncobjRepo;
XrdOucHash<XrdSysSemaphore> fSyncobjRepo;
// each streamid counts on a condition
// variable to make the caller wait
// until some data is available


XrdSysSemWait *GetSyncObjOrMakeOne(int streamid);
XrdSysSemaphore *GetSyncObjOrMakeOne(int streamid);

int MsgForStreamidCnt(int streamid);

Expand Down
21 changes: 3 additions & 18 deletions src/XrdClient/XrdClientPhyConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,7 @@ void XrdClientPhyConnection::Disconnect()

//____________________________________________________________________________
bool XrdClientPhyConnection::CheckAutoTerm() {
bool doexit = FALSE;

{
XrdSysMutexHelper l(fMutex);

// Parametric asynchronous stuff
Expand All @@ -298,26 +296,13 @@ bool XrdClientPhyConnection::CheckAutoTerm() {
Info(XrdClientDebug::kHIDEBUG,
"CheckAutoTerm", "Self-Cancelling reader thread.");

{
XrdSysMutexHelper l(fMutex);
fReaderthreadrunning--;
}

//delete fSocket;
//fSocket = 0;

doexit = TRUE;
}

}
fReaderthreadrunning--;

return true;

if (doexit) {
UnlockChannel();
return true;
}

return false;
return false;
}


Expand Down
3 changes: 1 addition & 2 deletions src/XrdClient/XrdClientPhyConnection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "XrdClient/XrdClientUrlInfo.hh"
#include "XrdClient/XrdClientThread.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysSemWait.hh"

#include <time.h> // for time_t data type

Expand Down Expand Up @@ -74,7 +73,7 @@ private:

UnsolRespProcResult HandleUnsolicited(XrdClientMessage *m);

XrdSysSemWait fReaderCV;
XrdSysSemaphore fReaderCV;

short fLogConnCnt; // Number of logical connections using this phyconn

Expand Down
17 changes: 8 additions & 9 deletions src/XrdClient/XrdCpMthrQueue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#include "XrdSys/XrdSysPthread.hh"
#include "XrdClient/XrdClientVector.hh"
#include "XrdSys/XrdSysSemWait.hh"
#include "XrdSys/XrdSysHeaders.hh"

using namespace std;
Expand All @@ -28,16 +27,16 @@ struct XrdCpMessage {
class XrdCpMthrQueue {
private:
long fTotSize;
XrdClientVector<XrdCpMessage*> fMsgQue; // queue for incoming messages
int fMsgIter; // an iterator on it
int fWrWait; // Write waiters
XrdClientVector<XrdCpMessage*> fMsgQue; // queue for incoming messages
int fMsgIter; // an iterator on it
int fWrWait; // Write waiters

XrdSysRecMutex fMutex; // mutex to protect data structures
XrdSysRecMutex fMutex; // mutex to protect data structures

XrdSysSemWait fReadSem; // variable to make the reader wait
// until some data is available
XrdSysSemaphore fWriteSem; // variable to make the writer wait
// if the queue is full
XrdSysSemaphore fReadSem; // variable to make the reader wait
// until some data is available
XrdSysSemaphore fWriteSem; // variable to make the writer wait
// if the queue is full
public:

XrdCpMthrQueue();
Expand Down
2 changes: 1 addition & 1 deletion src/XrdNet/XrdNetSocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ int XrdNetSocket::setOpts(int xfd, int opts, XrdSysError *eDest)
const int one = 1;
const SOCKLEN_t szone = (SOCKLEN_t)sizeof(one);
static int tcpprotid = XrdSysDNS::getProtoID("tcp");
static struct linger liopts = {1, XRDNETSOCKET_LINGER};
struct linger liopts = {1, XRDNETSOCKET_LINGER};
const SOCKLEN_t szlio = (SOCKLEN_t)sizeof(liopts);

if (!(opts & XRDNET_NOCLOSEX) && fcntl(xfd, F_SETFD, FD_CLOEXEC))
Expand Down
73 changes: 63 additions & 10 deletions src/XrdSys/XrdSysPthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ int XrdSysCondVar::Wait(int sec)

// Wait for the condition or timeout
//
do {retc = pthread_cond_timedwait(&cvar, &cmut, &tval);}
while (retc && (retc != ETIMEDOUT));
retc = pthread_cond_timedwait(&cvar, &cmut, &tval);
if (retc && retc != ETIMEDOUT) {throw "pthread_cond_timedwait() failed";}

if (relMutex) UnLock();
return retc == ETIMEDOUT;
Expand Down Expand Up @@ -151,8 +151,8 @@ int XrdSysCondVar::WaitMS(int msec)

// Now wait for the condition or timeout
//
do {retc = pthread_cond_timedwait(&cvar, &cmut, &tval);}
while (retc && (retc != ETIMEDOUT));
retc = pthread_cond_timedwait(&cvar, &cmut, &tval);
if (retc && retc != ETIMEDOUT) {throw "pthread_cond_timedwait() failed";}

if (relMutex) UnLock();
return retc == ETIMEDOUT;
Expand Down Expand Up @@ -205,18 +205,71 @@ void XrdSysSemaphore::Wait()
// free is the OS implements an unfair mutex;
//
semVar.Lock();
if (semVal < 1 || semWait)
while(semVal < 1)
{semWait++;
semVar.Wait();
semWait--;
}

semWait++;
while(semVal < 1) semVar.Wait();
semWait--;

// Decrement the semaphore value and return
//
semVal--;
semVar.UnLock();
}

void XrdSysSemaphore::Wait(int sec)
{
struct timespec tval;
int retc;

semVar.Lock();

retc = semVal < 1;
if (retc)
{
// Adjust the time in seconds
tval.tv_sec = time(0) + sec;
tval.tv_nsec = 0;

semWait++;
// Wait until the semaphore value is positive or the deadline expires.
do {retc = pthread_cond_timedwait(&semVar.cvar, &semVar.cmut, &tval);
if (retc && retc != ETIMEDOUT) {throw "pthread_cond_timedwait() failed";}
}
while(semVal < 1 && retc == 0);
semWait--;
retc = semVal < 1;
}
// Decrement the semaphore value if it became positive,
// unlock the underlying cond var and return
//
if (!retc) semVal--;
semVar.UnLock();
return retc;
}

#else

/******************************************************************************/
/* W a i t */
/******************************************************************************/

int XrdSysSemaphore::Wait(int sec)
{
struct timespec tval;

// Simply adjust the time in seconds
//
tval.tv_sec = time(0) + sec;
tval.tv_nsec = 0;

// Wait for the semaphore or timeout
//
while(sem_timedwait(&h_semaphore, &tval))
{if (errno == ETIMEDOUT) return 1;
if (errno != EINTR) {throw "sem_timedwait() failed";}
}
return 0;
}
#endif

/******************************************************************************/
Expand Down
12 changes: 12 additions & 0 deletions src/XrdSys/XrdSysPthread.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

#include "XrdSys/XrdSysError.hh"


class XrdSysSemaphore;

/******************************************************************************/
/* X r d S y s C o n d V a r */
/******************************************************************************/
Expand Down Expand Up @@ -68,6 +71,12 @@ pthread_cond_t cvar;
pthread_mutex_t cmut;
int relMutex;
const char *condID;

#ifdef __macos__
// Allow pthread_cond_timedwait to be called directly so that the Wait(int)
// duration need not be recomputed on every wakeup.
friend class XrdSysSemaphore;
#endif
};


Expand Down Expand Up @@ -285,6 +294,7 @@ public:
void Post();

void Wait();
int Wait(int sec);

XrdSysSemaphore(int semval=1,const char *cid=0) : semVar(0, cid)
{semVal = semval; semWait = 0;}
Expand Down Expand Up @@ -321,6 +331,8 @@ inline void Wait() {while (sem_wait(&h_semaphore))
}
}

int Wait(int sec);

XrdSysSemaphore(int semval=1, const char * =0)
{if (sem_init(&h_semaphore, 0, semval))
{throw "sem_init() failed";}
Expand Down
Loading

0 comments on commit dad842b

Please sign in to comment.