Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the implementation into a consistent namespace libfsst. #28

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 62 additions & 48 deletions fsst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
//
// You can contact the authors via the FSST source repository : https://github.com/cwida/fsst
#ifdef FSST12

#include "fsst12.h" // the official FSST API -- also usable by C mortals

#else
#include "fsst.h" // the official FSST API -- also usable by C mortals
#endif

#include <condition_variable>
#include <iostream>
#include <fstream>
#include <mutex>
#include <vector>
#include <thread>

using namespace std;

// Utility to compress and decompress (-d) data with FSST (using stdin and stdout).
Expand All @@ -45,48 +49,54 @@ using namespace std;
namespace {

class BinarySemaphore {
private:
private:
mutex m;
condition_variable cv;
bool value;

public:
public:
explicit BinarySemaphore(bool initialValue = false) : value(initialValue) {}

void wait() {
unique_lock<mutex> lock(m);
while (!value) cv.wait(lock);
value = false;
}

void post() {
{ unique_lock<mutex> lock(m); value = true; }
{
unique_lock<mutex> lock(m);
value = true;
}
cv.notify_one();
}
};

bool stopThreads = false;
BinarySemaphore srcDoneIO[2], dstDoneIO[2], srcDoneCPU[2], dstDoneCPU[2];
unsigned char *srcBuf[2] = { NULL, NULL };
unsigned char *dstBuf[2] = { NULL, NULL };
unsigned char *dstMem[2] = { NULL, NULL };
size_t srcLen[2] = { 0, 0 };
size_t dstLen[2] = { 0, 0 };
unsigned char *srcBuf[2] = {NULL, NULL};
unsigned char *dstBuf[2] = {NULL, NULL};
unsigned char *dstMem[2] = {NULL, NULL};
size_t srcLen[2] = {0, 0};
size_t dstLen[2] = {0, 0};

#define FSST_MEMBUF (1ULL<<22)
int decompress = 0;
size_t blksz = FSST_MEMBUF-(1+FSST_MAXHEADER/2); // block size of compression (max compressed size must fit 3 bytes)
size_t blksz =
FSST_MEMBUF - (1 + FSST_MAXHEADER / 2); // block size of compression (max compressed size must fit 3 bytes)

#define DESERIALIZE(p) (((unsigned long long) (p)[0]) << 16) | (((unsigned long long) (p)[1]) << 8) | ((unsigned long long) (p)[2])
#define SERIALIZE(l,p) { (p)[0] = ((l)>>16)&255; (p)[1] = ((l)>>8)&255; (p)[2] = (l)&255; }
#define SERIALIZE(l, p) { (p)[0] = ((l)>>16)&255; (p)[1] = ((l)>>8)&255; (p)[2] = (l)&255; }

void reader(ifstream& src) {
for(int swap=0; true; swap = 1-swap) {
void reader(ifstream &src) {
for (int swap = 0; true; swap = 1 - swap) {
srcDoneCPU[swap].wait();
if (stopThreads) break;
src.read((char*) srcBuf[swap], blksz);
src.read((char *) srcBuf[swap], blksz);
srcLen[swap] = (unsigned long) src.gcount();
if (decompress) {
if (blksz && srcLen[swap] == blksz) {
blksz = DESERIALIZE(srcBuf[swap]+blksz-3); // read size of next block
blksz = DESERIALIZE(srcBuf[swap] + blksz - 3); // read size of next block
srcLen[swap] -= 3; // cut off size bytes
} else {
blksz = 0;
Expand All @@ -96,33 +106,33 @@ void reader(ifstream& src) {
}
}

void writer(ofstream& dst) {
for(int swap=0; true; swap = 1-swap) {
void writer(ofstream &dst) {
for (int swap = 0; true; swap = 1 - swap) {
dstDoneCPU[swap].wait();
if (!dstLen[swap]) break;
dst.write((char*) dstBuf[swap], dstLen[swap]);
dst.write((char *) dstBuf[swap], dstLen[swap]);
dstDoneIO[swap].post();
}
for(int swap=0; swap<2; swap++)
for (int swap = 0; swap < 2; swap++)
dstDoneIO[swap].post();
}

}

int main(int argc, char* argv[]) {
int main(int argc, char *argv[]) {
size_t srcTot = 0, dstTot = 0;
if (argc < 2 || argc > 4 || (argc == 4 && (argv[1][0] != '-' || argv[1][1] != 'd' || argv[1][2]))) {
cerr << "usage: " << argv[0] << " -d infile outfile" << endl;
cerr << " " << argv[0] << " infile outfile" << endl;
cerr << " " << argv[0] << " infile" << endl;
cerr << " " << argv[0] << " infile outfile" << endl;
cerr << " " << argv[0] << " infile" << endl;
return -1;
}
decompress = (argc == 4);
string srcfile(argv[1+decompress]), dstfile;
string srcfile(argv[1 + decompress]), dstfile;
if (argc == 2) {
dstfile = srcfile + ".fsst";
} else {
dstfile = argv[2+decompress];
dstfile = argv[2 + decompress];
}
ifstream src;
ofstream dst;
Expand All @@ -132,62 +142,66 @@ int main(int argc, char* argv[]) {
dst.exceptions(ios_base::badbit);
src.exceptions(ios_base::badbit);
if (decompress) {
unsigned char tmp[3];
src.read((char*) tmp, 3);
if (src.gcount() != 3) {
cerr << "failed to open input." << endl;
return -1;
}
blksz = DESERIALIZE(tmp); // read first block size
unsigned char tmp[3];
src.read((char *) tmp, 3);
if (src.gcount() != 3) {
cerr << "failed to open input." << endl;
return -1;
}
blksz = DESERIALIZE(tmp); // read first block size
}
vector<unsigned char> buffer(FSST_MEMBUF*6);
vector<unsigned char> buffer(FSST_MEMBUF * 6);
srcBuf[0] = buffer.data();
srcBuf[1] = srcBuf[0] + (FSST_MEMBUF*(1ULL+decompress));
dstMem[0] = srcBuf[1] + (FSST_MEMBUF*(1ULL+decompress));
dstMem[1] = dstMem[0] + (FSST_MEMBUF*(2ULL-decompress));
srcBuf[1] = srcBuf[0] + (FSST_MEMBUF * (1ULL + decompress));
dstMem[0] = srcBuf[1] + (FSST_MEMBUF * (1ULL + decompress));
dstMem[1] = dstMem[0] + (FSST_MEMBUF * (2ULL - decompress));

for(int swap=0; swap<2; swap++) {
for (int swap = 0; swap < 2; swap++) {
srcDoneCPU[swap].post(); // input buffer is not being processed initially
dstDoneIO[swap].post(); // output buffer is not being written initially
}
thread readerThread([&src]{ reader(src); });
thread writerThread([&dst]{ writer(dst); });
thread readerThread([&src] { reader(src); });
thread writerThread([&dst] { writer(dst); });

for(int swap=0; true; swap = 1-swap) {
for (int swap = 0; true; swap = 1 - swap) {
srcDoneIO[swap].wait(); // wait until input buffer is available (i.e. done reading)
dstDoneIO[swap].wait(); // wait until output buffer is ready writing hence free for use
if (srcLen[swap] == 0) {
dstLen[swap] = 0;
break;
}
if (decompress) {
fsst_decoder_t decoder;
size_t hdr = fsst_import(&decoder, srcBuf[swap]);
dstLen[swap] = fsst_decompress(&decoder, srcLen[swap] - hdr, srcBuf[swap] + hdr, FSST_MEMBUF, dstBuf[swap] = dstMem[swap]);
fsst_decoder_t decoder;
size_t hdr = fsst_import(&decoder, srcBuf[swap]);
dstLen[swap] = fsst_decompress(&decoder, srcLen[swap] - hdr, srcBuf[swap] + hdr, FSST_MEMBUF,
dstBuf[swap] = dstMem[swap]);
} else {
unsigned char tmp[FSST_MAXHEADER];
fsst_encoder_t* encoder = fsst_create(1, &srcLen[swap], const_cast<const unsigned char **>(&srcBuf[swap]), 0);
fsst_encoder_t *encoder = fsst_create(1, &srcLen[swap], const_cast<const unsigned char **>(&srcBuf[swap]),
0);
size_t hdr = fsst_export(encoder, tmp);
if (fsst_compress(encoder, 1, &srcLen[swap], const_cast<const unsigned char **>(&srcBuf[swap]),
FSST_MEMBUF * 2, dstMem[swap] + FSST_MAXHEADER + 3,
&dstLen[swap], &dstBuf[swap]) < 1)
return -1;
dstLen[swap] += 3 + hdr;
dstBuf[swap] -= 3 + hdr;
SERIALIZE(dstLen[swap],dstBuf[swap]); // block starts with size
copy(tmp, tmp+hdr, dstBuf[swap]+3); // then the header (followed by the compressed bytes which are already there)
fsst_destroy(encoder);
dstBuf[swap] -= 3 + hdr;
SERIALIZE(dstLen[swap], dstBuf[swap]); // block starts with size
copy(tmp, tmp + hdr,
dstBuf[swap] + 3); // then the header (followed by the compressed bytes which are already there)
fsst_destroy(encoder);
}
srcTot += srcLen[swap];
dstTot += dstLen[swap];
srcDoneCPU[swap].post(); // input buffer may be re-used by the reader for the next block
dstDoneCPU[swap].post(); // output buffer is ready for writing out
}
cerr << (decompress?"Dec":"C") << "ompressed " << srcTot << " bytes into " << dstTot << " bytes ==> " << (int) ((100*dstTot)/srcTot) << "%" << endl;
cerr << (decompress ? "Dec" : "C") << "ompressed " << srcTot << " bytes into " << dstTot << " bytes ==> "
<< (int) ((100 * dstTot) / srcTot) << "%" << endl;

// force wait until all background writes finished
stopThreads = true;
for(int swap=0; swap<2; swap++) {
for (int swap = 0; swap < 2; swap++) {
srcDoneCPU[swap].post();
dstDoneCPU[swap].post();
}
Expand Down
124 changes: 67 additions & 57 deletions fsst_avx512.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,30 @@
#include <immintrin.h>

#ifdef _WIN32
namespace libfsst {
bool fsst_hasAVX512() {
int info[4];
__cpuidex(info, 0x00000007, 0);
return (info[1]>>16)&1;
}
}
#else
#include <cpuid.h>
bool fsst_hasAVX512() {
int info[4];
__cpuid_count(0x00000007, 0, info[0], info[1], info[2], info[3]);
return (info[1]>>16)&1;
namespace libfsst {
bool fsst_hasAVX512() {
int info[4];
__cpuid_count(0x00000007, 0, info[0], info[1], info[2], info[3]);
return (info[1] >> 16) & 1;
}
}
#endif
#else
namespace libfsst {
bool fsst_hasAVX512() { return false; }
}
#endif

namespace libfsst {
// BULK COMPRESSION OF STRINGS
//
// In one call of this function, we can compress 512 strings, each of maximum length 511 bytes.
Expand Down Expand Up @@ -70,14 +77,15 @@ bool fsst_hasAVX512() { return false; }
// This reduces the effectiveness of unrolling, hence -O2 makes the loop perform worse than -O1 which skips this optimization.
// Assembly inspection confirmed that 3-way unroll with -O1 avoids needless load/stores.

size_t fsst_compressAVX512(SymbolTable &symbolTable, u8* codeBase, u8* symbolBase, SIMDjob *input, SIMDjob *output, size_t n, size_t unroll) {
size_t processed = 0;
// define some constants (all_x means that all 8 lanes contain 64-bits value X)
size_t fsst_compressAVX512(SymbolTable &symbolTable, u8 *codeBase, u8 *symbolBase, SIMDjob *input, SIMDjob *output,
size_t n, size_t unroll) {
size_t processed = 0;
// define some constants (all_x means that all 8 lanes contain 64-bits value X)
#ifdef __AVX512F__
//__m512i all_suffixLim= _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) symbolTable->suffixLim)); -- for variants b,c
__m512i all_MASK = _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) -1));
__m512i all_PRIME = _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) FSST_HASH_PRIME));
__m512i all_ICL_FREE = _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) FSST_ICL_FREE));
//__m512i all_suffixLim= _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) symbolTable->suffixLim)); -- for variants b,c
__m512i all_MASK = _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) -1));
__m512i all_PRIME = _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) FSST_HASH_PRIME));
__m512i all_ICL_FREE = _mm512_broadcastq_epi64(_mm_set1_epi64((__m64) (u64) FSST_ICL_FREE));
#define all_HASH _mm512_srli_epi64(all_MASK, 64-FSST_HASH_LOG2SIZE)
#define all_ONE _mm512_srli_epi64(all_MASK, 63)
#define all_M19 _mm512_srli_epi64(all_MASK, 45)
Expand All @@ -87,54 +95,56 @@ size_t fsst_compressAVX512(SymbolTable &symbolTable, u8* codeBase, u8* symbolBas
#define all_FFFF _mm512_srli_epi64(all_MASK, 48)
#define all_FF _mm512_srli_epi64(all_MASK, 56)

SIMDjob *inputEnd = input+n;
assert(n >= unroll*8 && n <= 512); // should be close to 512
__m512i job1, job2, job3, job4; // will contain current jobs, for each unroll 1,2,3,4
__mmask8 loadmask1 = 255, loadmask2 = 255*(unroll>1), loadmask3 = 255*(unroll>2), loadmask4 = 255*(unroll>3); // 2b loaded new strings bitmask per unroll
u32 delta1 = 8, delta2 = 8*(unroll>1), delta3 = 8*(unroll>2), delta4 = 8*(unroll>3); // #new loads this SIMD iteration per unroll
SIMDjob *inputEnd = input+n;
assert(n >= unroll*8 && n <= 512); // should be close to 512
__m512i job1, job2, job3, job4; // will contain current jobs, for each unroll 1,2,3,4
__mmask8 loadmask1 = 255, loadmask2 = 255*(unroll>1), loadmask3 = 255*(unroll>2), loadmask4 = 255*(unroll>3); // 2b loaded new strings bitmask per unroll
u32 delta1 = 8, delta2 = 8*(unroll>1), delta3 = 8*(unroll>2), delta4 = 8*(unroll>3); // #new loads this SIMD iteration per unroll

if (unroll >= 4) {
while (input+delta1+delta2+delta3+delta4 < inputEnd) {
#include "fsst_avx512_unroll4.inc"
}
} else if (unroll == 3) {
while (input+delta1+delta2+delta3 < inputEnd) {
#include "fsst_avx512_unroll3.inc"
}
} else if (unroll == 2) {
while (input+delta1+delta2 < inputEnd) {
#include "fsst_avx512_unroll2.inc"
}
} else {
while (input+delta1 < inputEnd) {
#include "fsst_avx512_unroll1.inc"
}
}
if (unroll >= 4) {
while (input+delta1+delta2+delta3+delta4 < inputEnd) {
#include "fsst_avx512_unroll4.inc"
}
} else if (unroll == 3) {
while (input+delta1+delta2+delta3 < inputEnd) {
#include "fsst_avx512_unroll3.inc"
}
} else if (unroll == 2) {
while (input+delta1+delta2 < inputEnd) {
#include "fsst_avx512_unroll2.inc"
}
} else {
while (input+delta1 < inputEnd) {
#include "fsst_avx512_unroll1.inc"
}
}

// flush the job states of the unfinished strings at the end of output[]
processed = n - (inputEnd - input);
u32 unfinished = 0;
if (unroll > 1) {
if (unroll > 2) {
if (unroll > 3) {
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask4=~loadmask4, job4);
unfinished += _mm_popcnt_u32((int) loadmask4);
}
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask3=~loadmask3, job3);
unfinished += _mm_popcnt_u32((int) loadmask3);
}
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask2=~loadmask2, job2);
unfinished += _mm_popcnt_u32((int) loadmask2);
}
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask1=~loadmask1, job1);
// flush the job states of the unfinished strings at the end of output[]
processed = n - (inputEnd - input);
u32 unfinished = 0;
if (unroll > 1) {
if (unroll > 2) {
if (unroll > 3) {
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask4=~loadmask4, job4);
unfinished += _mm_popcnt_u32((int) loadmask4);
}
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask3=~loadmask3, job3);
unfinished += _mm_popcnt_u32((int) loadmask3);
}
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask2=~loadmask2, job2);
unfinished += _mm_popcnt_u32((int) loadmask2);
}
_mm512_mask_compressstoreu_epi64(output+unfinished, loadmask1=~loadmask1, job1);
#else
(void) symbolTable;
(void) codeBase;
(void) symbolBase;
(void) input;
(void) output;
(void) n;
(void) unroll;
(void) symbolTable;
(void) codeBase;
(void) symbolBase;
(void) input;
(void) output;
(void) n;
(void) unroll;
#endif
return processed;
return processed;
}
}

Loading