diff --git a/roxie/ccd/ccdsnmp.cpp b/roxie/ccd/ccdsnmp.cpp index dfcc69ac1c2..63a0785bd61 100644 --- a/roxie/ccd/ccdsnmp.cpp +++ b/roxie/ccd/ccdsnmp.cpp @@ -37,10 +37,10 @@ RoxieQueryStats loQueryStats; RoxieQueryStats hiQueryStats; RoxieQueryStats slaQueryStats; RoxieQueryStats combinedQueryStats; -RelaxedAtomic retriesIgnoredPrm; -RelaxedAtomic retriesIgnoredSec; -RelaxedAtomic retriesNeeded; -RelaxedAtomic retriesReceivedPrm; +RelaxedAtomic retriesIgnoredPrm __attribute__((aligned(CACHE_LINE_SIZE))); +RelaxedAtomic retriesIgnoredSec __attribute__((aligned(CACHE_LINE_SIZE))); +RelaxedAtomic retriesNeeded __attribute__((aligned(CACHE_LINE_SIZE))); +RelaxedAtomic retriesReceivedPrm __attribute__((aligned(CACHE_LINE_SIZE))); RelaxedAtomic retriesReceivedSec; RelaxedAtomic retriesSent; RelaxedAtomic rowsIn; diff --git a/system/jlib/jatomic.hpp b/system/jlib/jatomic.hpp index c32b5db4e6c..700c7dceeb6 100644 --- a/system/jlib/jatomic.hpp +++ b/system/jlib/jatomic.hpp @@ -51,7 +51,7 @@ auto sub_fetch(T & value, decltype(value.load()) delta, std::memory_order order //NOTE: Counts will never be lost, but the values read from another thread may be inconsistent. //E.g., thread 1 updates x than y, thread 2 may read an updated value of y, but an old value of x. template -class RelaxedAtomic : public std::atomic +class alignas(CACHE_LINE_SIZE) RelaxedAtomic : public std::atomic { public: typedef std::atomic BASE; diff --git a/testing/unittests/unittests.cpp b/testing/unittests/unittests.cpp index 21df821f322..a4c7e7e8ca6 100644 --- a/testing/unittests/unittests.cpp +++ b/testing/unittests/unittests.cpp @@ -928,4 +928,129 @@ CPPUNIT_TEST_SUITE_REGISTRATION( PipeRunTest ); CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PipeRunTest, "PipeRunTest" ); #endif +class RelaxedAtomicTimingTest : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE( RelaxedAtomicTimingTest ); + CPPUNIT_TEST(testRun); + CPPUNIT_TEST_SUITE_END(); + + void testRun() + { + testRunner(0); + testRunner(1); + testRunner(2); + testRunner(3); + testRunner(4); + } + + void testRunner(unsigned mode) + { + CCycleTimer timer; + unsigned count = 100000000; + RelaxedAtomic ra[201]; + CriticalSection lock[201]; + + for (int a = 0; a < 201; a++) + ra[a] = 0; + + class T : public CThreaded + { + public: + T(unsigned _count, RelaxedAtomic &_ra, CriticalSection &_lock, unsigned _mode) : CThreaded(""), count(_count), ra(_ra), lock(_lock), mode(_mode) + {} + virtual int run() override + { + switch(mode) + { + case 0: test0(); break; + case 1: test1(); break; + // Disabled next two for now as slow and not especially interesting + // case 2: test2(); break; + // case 3: test3(); break; + case 4: test4(); break; + } + return 0; + } + void test0() + { + RelaxedAtomic &a = ra; + while (count--) + a++; + } + void test1() + { + RelaxedAtomic &a = ra; + while (count--) + a.fastAdd(1); + } + void test2() + { + int &a = (int &) ra; + while (count--) + { + CriticalBlock b(lock); + a++; + } + } + void test3() + { + RelaxedAtomic &a = ra; + while (count--) + { + CriticalBlock b(lock); + a.fastAdd(1); + } + } + void test4() + { + int &a = (int &) ra; + while (count--) + { + if (a != count) + a++; + } + ra = a; + } + + unsigned mode; + unsigned count; + RelaxedAtomic &ra; + CriticalSection &lock; + } t1a(count, ra[0], lock[0], mode), t2a(count, ra[0], lock[0], mode), t3a(count, ra[0], lock[0], mode), + t1b(count, ra[0], lock[0], mode), t2b(count, ra[1], lock[1], mode), t3b(count, ra[2], lock[2], mode), + t1c(count, ra[0], lock[0], mode), t2c(count, ra[100], lock[100], mode), t3c(count, ra[200], lock[200], mode);; + DBGLOG("Testing RelaxedAtomics (test mode %u)", mode); + t1a.start(); + t2a.start(); + t3a.start(); + t1a.join(); + t2a.join(); + t3a.join(); + DBGLOG("Same RAs took %ums, value %d", timer.elapsedMs(), ra[0]+0); + for (int a = 0; a < 201; a++) + ra[a] = 0; + timer.reset(); + t1b.start(); + t2b.start(); + t3b.start(); + t1b.join(); + t2b.join(); + t3b.join(); + DBGLOG("Adjacent RAs took %ums, values %d %d %d", timer.elapsedMs(), ra[0]+0, ra[1]+0, ra[2]+0); + for (int a = 0; a < 201; a++) + ra[a] = 0; + timer.reset(); + t1c.start(); + t2c.start(); + t3c.start(); + t1c.join(); + t2c.join(); + t3c.join(); + DBGLOG("Spaced RAs took %ums, values %d %d %d", timer.elapsedMs(), ra[0]+0, ra[100]+0, ra[200]+0); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( RelaxedAtomicTimingTest ); +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RelaxedAtomicTimingTest, "RelaxedAtomicTimingTest" ); + #endif // _USE_CPPUNIT