Skip to content

Commit

Permalink
fix data races
Browse files Browse the repository at this point in the history
  • Loading branch information
arvidn authored and Yostra committed Dec 19, 2020
1 parent bfe84cf commit eb96d99
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class WesolowskiCallback :public INUDUPLListener {
virtual void OnIteration(int type, void *data, uint64_t iteration) = 0;

std::unique_ptr<form[]> forms;
int64_t iterations = 0;
std::atomic<int64_t> iterations{0};
integer D;
integer L;
PulmarkReducer* reducer;
Expand Down
49 changes: 15 additions & 34 deletions src/threading.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#ifndef THREADING_H
#define THREADING_H

#include "alloc.hpp"
#include <boost/align/aligned_alloc.hpp>
#include <atomic>

//mp_limb_t is an unsigned integer
static_assert(sizeof(mp_limb_t)==8, "");
Expand Down Expand Up @@ -549,19 +550,13 @@ template<bool is_write, class type> void prefetch(const type& p) {
template<class type> void prefetch_write(const type& p) { prefetch<true>(p); }
template<class type> void prefetch_read(const type& p) { prefetch<false>(p); }

void memory_barrier() {
asm volatile( "" ::: "memory" );
}

struct alignas(64) thread_counter {
uint64 counter_value=0; //updated atomically since only one thread can write to it
uint64 error_flag=0;
std::atomic<uint64> counter_value{0}; //updated atomically since only one thread can write to it
std::atomic<bool> error_flag{false};

void reset() {
memory_barrier();
counter_value=0;
error_flag=0;
memory_barrier();
error_flag = false;
}

thread_counter() {
Expand Down Expand Up @@ -597,10 +592,8 @@ struct thread_state {
//print( "raise_error", is_slave );
//}

memory_barrier();
this_counter().error_flag=1;
other_counter().error_flag=1;
memory_barrier();
this_counter().error_flag = true;
other_counter().error_flag = true;
}

uint64 v() {
Expand All @@ -614,11 +607,9 @@ struct thread_state {
return true;
}

memory_barrier();

uint64 spin_counter=0;
while (other_counter().counter_value < t_v) {
if (this_counter().error_flag || other_counter().error_flag) {
if (this_counter().error_flag.load() || other_counter().error_flag.load()) {
raise_error();
break;
}
Expand All @@ -633,11 +624,8 @@ struct thread_state {
}

++spin_counter;
memory_barrier();
}

memory_barrier();

if (!(this_counter().error_flag)) {
last_fence=t_v;
}
Expand All @@ -656,8 +644,6 @@ struct thread_state {
return true;
}

memory_barrier(); //wait for all writes to finish (on x86 this doesn't do anything but the compiler still needs it)

assert(t_v>=v());

if (this_counter().error_flag) {
Expand All @@ -666,7 +652,6 @@ struct thread_state {

this_counter().counter_value=t_v;

memory_barrier(); //want the counter writes to be low latency so prevent the compiler from caching it
return !(this_counter().error_flag);
}

Expand All @@ -681,17 +666,15 @@ struct thread_state {
/*void wait_for_error_to_be_cleared() {
assert(is_slave && enable_threads);
while (this_counter().error_flag) {
memory_barrier();
std::this_thread::yield();
}
}
void clear_error() {
assert(!is_slave);
memory_barrier();
this_counter().error_flag=0;
other_counter().error_flag=0;
memory_barrier();
this_counter().error_flag = false;
other_counter().error_flag = false;
}*/
};

Expand Down Expand Up @@ -796,7 +779,8 @@ template<class mpz_type> bool gcd_unsigned(
data.threshold=(uint64*)&threshold[0];

data.uv_counter_start=c_thread_state.counter_start+counter_start_delta+1;
data.out_uv_counter_addr=&(c_thread_state.this_counter().counter_value);
// TODO: come up with something better here
data.out_uv_counter_addr=reinterpret_cast<uint64_t*>(&(c_thread_state.this_counter().counter_value));
data.out_uv_addr=(uint64*)&(c_results.uv_entries[1]);
data.iter=-1;
data.a_end_index=(a_limbs==0)? 0 : a_limbs-1;
Expand All @@ -805,12 +789,9 @@ template<class mpz_type> bool gcd_unsigned(
assert((uint64(data.out_uv_addr)&63)==0); //should be cache line aligned
}

memory_barrier();
int error_code=hasAVX2()?
asm_code::asm_avx2_func_gcd_unsigned(&data):
asm_code::asm_cel_func_gcd_unsigned(&data);

memory_barrier();
asm_code::asm_avx2_func_gcd_unsigned(&data):
asm_code::asm_cel_func_gcd_unsigned(&data);

if (error_code!=0) {
c_thread_state.raise_error();
Expand Down
7 changes: 6 additions & 1 deletion src/vdf.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ class ProverManager {
}

void RunEventLoop() {
// this is running in a separate thread. Any member variables it
// accesses must be one of:
// * protected by a mutex
// * owned entirely by this thread
// * atomic
const bool multi_proc_machine = (std::thread::hardware_concurrency() >= 16) ? true : false;
bool warned = false;
bool increased_proving = false;
Expand Down Expand Up @@ -755,7 +760,7 @@ class ProverManager {
// Maximum iter that can be proved.
uint64_t max_proving_iteration = 0;
// Where the VDF thread is at.
uint64_t vdf_iteration = 0;
std::atomic<uint64_t> vdf_iteration{0};
bool proof_done;
uint64_t intermediates_iter;
};
Expand Down
2 changes: 0 additions & 2 deletions src/vdf_fast.h
Original file line number Diff line number Diff line change
Expand Up @@ -1061,14 +1061,12 @@ uint64 repeated_square_fast_multithread(square_state_type &square_state, form& f
slave_counter[square_state.pairindex].reset();

square_state.init(D, L, f.a, f.b);
memory_barrier();

thread slave_thread(repeated_square_fast_work, std::ref(square_state), false, base, iterations, std::ref(nuduplListener));

repeated_square_fast_work(square_state, true, base, iterations, nuduplListener);

slave_thread.join(); //slave thread can't get stuck; is supposed to error out instead
memory_barrier();

uint64 res;
square_state.assign(f.a, f.b, f.c, res);
Expand Down

0 comments on commit eb96d99

Please sign in to comment.