diff --git a/rtk/IO.cpp b/rtk/IO.cpp index b33afbd..4765688 100644 --- a/rtk/IO.cpp +++ b/rtk/IO.cpp @@ -483,7 +483,8 @@ cerr<<"fini"; void smplVec::shuffle_singl() { //auto engine = std::default_random_engine{}; - auto engine = std::mt19937_64{}; + std::random_device rd; + auto engine = std::mt19937_64{rd()}; std::shuffle(std::begin(arr), std::end(arr), engine); } diff --git a/rtk/Rare.cpp b/rtk/Rare.cpp index 94028ac..c342804 100644 --- a/rtk/Rare.cpp +++ b/rtk/Rare.cpp @@ -1,29 +1,33 @@ #include "Rare.h" -const char* rar_ver="0.91"; +const char* rar_ver="0.92"; rareStruct* calcDivRar(int i, Matrix* Mo, DivEsts* div, long rareDep, vector>* abundInRow, vector>* occuencesInRow, string outF, int repeats, int writeFiles){ - smplVec* cur = Mo->getSampleVec(i); - string curS = Mo->getSampleName(i); - div->SampleName = curS; + + smplVec* cur = Mo->getSampleVec(i); + string curS = Mo->getSampleName(i); + div->SampleName = curS; std::vector> cnts; vector cntsMap; string cntsName; string skippedNames; bool wrAtAll(writeFiles > 0); + cur->rarefy(rareDep, outF, repeats, div, cntsMap, cntsName, skippedNames, abundInRow, occuencesInRow, writeFiles, false, wrAtAll); - //delete cur; - //return div; - rareStruct* tmpRS = new rareStruct();// = {*div, retCnts}; - tmpRS->div = div; - tmpRS->cnts = cntsMap; - tmpRS->cntsName = cntsName; - tmpRS->skippedNames = skippedNames; + + // put everything in our nice return container + rareStruct* tmpRS = new rareStruct(); + tmpRS->div = div; + tmpRS->cnts = cntsMap; + tmpRS->cntsName = cntsName; + tmpRS->skippedNames = skippedNames; + tmpRS->i = i; + delete cur; return tmpRS; } @@ -44,12 +48,13 @@ rareStruct* calcDivRarVec(int i, vector fileNames, DivEsts* div, long ra div, cntsMap, cntsName, skippedNames, abundInRow, occuencesInRow, writeFiles, false, wrAtAll); - rareStruct* tmpRS = new rareStruct();// = {*div, retCnts}; - tmpRS->div = div; - tmpRS->cnts = cntsMap; - tmpRS->cntsName = cntsName; - tmpRS->skippedNames = skippedNames; - tmpRS->IDs = cur->getRowNames(); + rareStruct* tmpRS = new rareStruct();// = {*div, retCnts}; + tmpRS->div = div; + tmpRS->cnts = cntsMap; + tmpRS->cntsName = cntsName; + tmpRS->skippedNames = skippedNames; + tmpRS->IDs = cur->getRowNames(); + tmpRS->i = i; delete cur; @@ -230,7 +235,7 @@ void options::print_rare_details(){ -void rareExtremLowMem(string inF, string outF, int writeFiles, string arg4, int repeats, int numThr = 1, bool storeBinary = false){ +void rareExtremLowMem(options * opts, string inF, string outF, int writeFiles, string arg4, int repeats, int numThr = 1, bool storeBinary = false){ // this mode takes the file, reads it in memory // prints the columns to their own files // then it loads those files again and @@ -266,85 +271,105 @@ void rareExtremLowMem(string inF, string outF, int writeFiles, string arg4, int vector< vector< rare_map > > MaRare (NoOfMatrices); std::vector cntsNames; vector < vector < string > > tmpMatFiles (NoOfMatrices ); - int done = 0; // number of samples processed for multithreading - uint i = 0; - std::future *tt = new std::future[numThr - 1]; - - - //rarefection code - vector divvs(fileNames.size(),NULL); - while(i < fileNames.size()){ - - // allow multithreading - int thirds = (int) floor((fileNames.size()-3)/3); - if(i < 3 || i % thirds == 0 ){ - cout << "At Sample " << i+1 << " of " << fileNames.size() << " Samples" << std::endl ; - if(i > 3 && i % thirds == 0 ){ - cout << "..." << std::endl ; - } - }else if( i == 3){ - cout << "..." << std::endl ; - } - - uint toWhere = done + numThr - 1; - if ((uint)((uint)fileNames.size() - 2 ) < toWhere){ - toWhere = fileNames.size() - 2; - } - // launch samples in threads - for (; i < toWhere; i++){ - DivEsts * div = new DivEsts(); - div->SampleName = SampleNames[i]; - tt[i - done] = async(std::launch::async, calcDivRarVec, i, fileNames, div, rareDep, &abundInRow, &occuencesInRow, outF, repeats, writeFiles); - } - // launch one in the mainthread - DivEsts * div = new DivEsts(); - div->SampleName = SampleNames[i]; - rareStruct* tmpRS; - tmpRS = calcDivRarVec(i, fileNames, div, rareDep, &abundInRow, &occuencesInRow, outF, repeats, writeFiles); - i++; - - - // process created data, first threads, then main thread - i = done; - for (; i < toWhere; i++){ - rareStruct* tmpRSas; - tmpRSas = tt[i-done].get(); - divvs[i] = tmpRSas->div; - string curS = SampleNames[i]; - //divvs[i-done]->print2file(outF + curS + "_alpha_div.tsv"); - - // add the matrices to the container - if(NoOfMatrices > 0){ - if(storeBinary == true){ - binaryStoreSample(tmpMatFiles, tmpRSas, rowNames,outF, cntsNames, true); - }else{ - memoryStoreSample(tmpRSas, MaRare, cntsNames, true); - } - } - - delete tmpRSas; + vector divvs(fileNames.size(),NULL); + vector < job > slots(opts->threads); + size_t smpls = Mo->smplNum(); + // now start a async in each slot + uint i = 0; + bool breakpoint(true); + while (breakpoint) { + + // check for any finished jobs + for( uint j = 0; j < slots.size(); j++ ){ + if( i >= smpls){ + breakpoint = false; + // break in case we have more slots than work + break; + } + + // open new slots + if( slots[j].inUse == false){ + + slots[j].inUse = true; + // launch an async task + DivEsts * div = new DivEsts(); + div->SampleName = SampleNames[i]; + slots[j].fut = async(std::launch::async, calcDivRarVec, i, fileNames, div, rareDep, &abundInRow, &occuencesInRow, outF, opts->repeats, opts->write); + + // send user some feedback + int thirds = 1; + if(smpls > 4){ + thirds = (int) floor((smpls-3)/3); + } + + if(i < 3 || i % thirds == 0 ){ + cout << "At Sample " << i+1 << " of " << smpls << " Samples" << std::endl; + if(i > 3 && i % thirds == 0 ){ + cout << "..." << std::endl ; + } + }else if( i == 3){ + cout << "..." << std::endl ; + } + + i++; + + }else if(slots[j].fut.wait_for(std::chrono::milliseconds(20)) == std::future_status::ready){ + + // move the information + rareStruct* tmpRS; + tmpRS = slots[j].fut.get(); + divvs[tmpRS->i] = tmpRS->div; + string curS = SampleNames[tmpRS->i]; + cout << curS << std::endl; + // add the matrices to the container + if (NoOfMatrices > 0) { + if (opts->writeSwap) { + binaryStoreSample(tmpMatFiles, tmpRS, rowNames, outF, cntsNames, true); + } + else { + memoryStoreSample(tmpRS, MaRare, cntsNames, true); + } + } + + delete tmpRS; + // free slot + slots[j].inUse = false; + } + } + + } - // main thread divv push back - divvs[i] = tmpRS->div; - string curS = SampleNames[i]; - //divvs[i]->print2file(outF + curS + "_alpha_div.tsv"); - - - // add the matrices to the container - if(NoOfMatrices > 0){ - if(storeBinary == true){ - binaryStoreSample(tmpMatFiles, tmpRS, rowNames,outF, cntsNames, true); - }else{ - memoryStoreSample(tmpRS, MaRare, cntsNames, true); - } + // wait for the threads to finish up. + for(uint j = 0; j < slots.size(); j++){ + if(slots[j].inUse == false ){ + // only copy if there is work to be done + continue; + } + slots[j].fut.wait(); + // move the information + rareStruct* tmpRS; + tmpRS = slots[j].fut.get(); + divvs[tmpRS->i] = tmpRS->div; + string curS = SampleNames[tmpRS->i]; + + // add the matrices to the container + if (NoOfMatrices > 0) { + if (opts->writeSwap) { + binaryStoreSample(tmpMatFiles, tmpRS, rowNames, outF, cntsNames, true); + } + else { + memoryStoreSample(tmpRS, MaRare, cntsNames, true); + } + } + + delete tmpRS; + // free slot + slots[j].inUse = false; } - - delete tmpRS; - i++; - done = i; - } - + + + // print the div estimates out into a file printDivMat(outF , divvs, true); for (size_t i = 0; i < divvs.size(); i++){ @@ -589,7 +614,7 @@ int main(int argc, char* argv[]) } else if (mode == "swap") { vector < vector < string > > tmpMatFiles(opts->write); - rareExtremLowMem(inF, outF, opts->write, arg4, opts->repeats, numThr, opts->writeSwap); + rareExtremLowMem(opts, inF, outF, opts->write, arg4, opts->repeats, numThr, opts->writeSwap); printf("Time taken: %.2fs\n", (double)(clock() - tStart) / CLOCKS_PER_SEC); std::exit(0); } @@ -626,69 +651,106 @@ int main(int argc, char* argv[]) //object to keep matrices vector < vector < string > > tmpMatFiles(opts->write); //cerr << "TH"; - std::future *tt = new std::future[numThr - 1]; - uint i = 0; uint done = 0; - while (i < Mo->smplNum()) { - int thirds = (int)floor((Mo->smplNum() - 3) / 3); - if (i < 3 || i % thirds == 0) { - cout << "At Sample " << i + 1 << " of " << Mo->smplNum() << " Samples" << std::endl; - if (i > 3 && i % thirds == 0) { - cout << "..." << std::endl; - } - } - else if (i == 3) { - cout << "..." << std::endl; - } - uint toWhere = done + numThr - 1; if ((uint)((uint)Mo->smplNum() - 2) < toWhere) { toWhere = Mo->smplNum() - 2; } - for (; i < toWhere; i++) { - DivEsts * div = new DivEsts(); - tt[i - done] = async(std::launch::async, calcDivRar, i, Mo, div, rareDep, &abundInRow, &occuencesInRow, outF, opts->repeats, opts->write); - } - //use main thread to calc one sample as well - DivEsts * div = new DivEsts(); - rareStruct* tmpRS; - tmpRS = calcDivRar(i, Mo, div, rareDep, &abundInRow, &occuencesInRow, outF, opts->repeats, opts->write); - - - i++; - i = done; - for (; i < toWhere; i++) { - rareStruct* tmpRS; - tmpRS = tt[i - done].get(); - divvs[i] = tmpRS->div; - string curS = Mo->getSampleName(i); - //divvs[i-done]->print2file(outF + curS + "_alpha_div.tsv"); - - // add the matrices to the container - if (NoOfMatrices > 0) { - if (opts->writeSwap) { - binaryStoreSample(tmpMatFiles, tmpRS, rowNames, outF, cntsNames, false); - } - else { - memoryStoreSample(tmpRS, MaRare, cntsNames, false); - } - } + // vector keeping all the slots + vector < job > slots(opts->threads); + + // now start a async in each slot + uint i = 0; + + + size_t smpls = Mo->smplNum(); + bool breakpoint(true); + while (breakpoint) { + + + // check for any finished jobs + for( uint j = 0; j < slots.size(); j++ ){ + if( i >= smpls){ + breakpoint = false; + // break in case we have more slots than work + break; + } + + // open new slots + if( slots[j].inUse == false){ + + slots[j].inUse = true; + // launch an async task + DivEsts * div = new DivEsts(); + slots[j].fut = async(std::launch::async, calcDivRar, i, Mo, div, rareDep, &abundInRow, &occuencesInRow, outF, opts->repeats, opts->write); + + // send user some feedback + int thirds = 1; + if(smpls > 4){ + thirds = (int) floor((smpls-3)/3); + } + if(i < 3 || i % thirds == 0 ){ + cout << "At Sample " << i+1 << " of " << smpls << " Samples" << std::endl ; + if(i > 3 && i % thirds == 0 ){ + cout << "..." << std::endl ; + } + }else if( i == 3){ + cout << "..." << std::endl ; + } + + i++; + + }else if(slots[j].fut.wait_for(std::chrono::milliseconds(20)) == std::future_status::ready){ + + // move the information + rareStruct* tmpRS; + tmpRS = slots[j].fut.get(); + divvs[tmpRS->i] = tmpRS->div; + string curS = Mo->getSampleName(tmpRS->i); + + // add the matrices to the container + if (NoOfMatrices > 0) { + if (opts->writeSwap) { + binaryStoreSample(tmpMatFiles, tmpRS, rowNames, outF, cntsNames, false); + } + else { + memoryStoreSample(tmpRS, MaRare, cntsNames, false); + } + } + + delete tmpRS; + // free slot + slots[j].inUse = false; + } + } + + + } - delete tmpRS; - } - // main thread divv push back - divvs[i] = tmpRS->div; - string curS = Mo->getSampleName(i); - //divvs[i]->print2file(outF + curS + "_alpha_div.tsv"); - - // add the matrices to the container - if (NoOfMatrices > 0) { - if (opts->writeSwap) { - binaryStoreSample(tmpMatFiles, tmpRS, rowNames, outF, cntsNames, false); - } - else { - memoryStoreSample(tmpRS, MaRare, cntsNames, false); - } - } - delete tmpRS; - i++; - done = i; + // wait for the threads to finish up. + for(uint j = 0; j < slots.size(); j++){ + if(slots[j].inUse == false ){ + // only copy if there is work to be done + continue; + } + slots[j].fut.wait(); + // move the information + rareStruct* tmpRS; + tmpRS = slots[j].fut.get(); + divvs[tmpRS->i] = tmpRS->div; + string curS = Mo->getSampleName(tmpRS->i); + + // add the matrices to the container + if (NoOfMatrices > 0) { + if (opts->writeSwap) { + binaryStoreSample(tmpMatFiles, tmpRS, rowNames, outF, cntsNames, false); + } + else { + memoryStoreSample(tmpRS, MaRare, cntsNames, false); + } + } + + delete tmpRS; + // free slot + slots[j].inUse = false; } + + // output matrix printDivMat(outF, divvs, true); for (size_t i = 0; i < divvs.size(); i++) { delete divvs[i]; @@ -717,7 +779,7 @@ int main(int argc, char* argv[]) computeCE(ACE, occuencesInRow); writeGlobalDiv(ICE, ACE, chao2, outF + "_gDiv.tsv"); - printf("Time taken: %.2fs\n", (double)(clock() - tStart) / CLOCKS_PER_SEC); + printf("CPU time taken: %.2fs\n", (double)(clock() - tStart) / CLOCKS_PER_SEC); //cout << "Finished\n"; std::exit(0); } diff --git a/rtk/Rare.h b/rtk/Rare.h index 2283274..5a261b5 100644 --- a/rtk/Rare.h +++ b/rtk/Rare.h @@ -3,12 +3,19 @@ #include "options.h" struct rareStruct{ + int i; DivEsts* div; string cntsName; //std::vector> cnts; vector cnts; string skippedNames; vector IDs; + +}; + +struct job { + std::future fut; + bool inUse = false; }; void binaryStoreSample(vector< vector< string > > & tmpMatFiles, rareStruct* tmpRS,