用维基百科来解释
如果系统的其余部分看起来是即时发生的,则操作(或一组操作)是原子的或不可中断的。没有锁,只有简单的 CPU 指令(“从内存中读取这个字节”)是原子的(不可分割的)。在单 CPU 系统上,可以暂时禁用中断(因此不会中断一系列操作)但实际上通过使用同步原语(通常是互斥锁)来实现原子性。
增加变量(i++
)是 _ 而不是 _ 原子,因为它需要三个不同的步骤:将位模式从存储器复制到 CPU 中;使用 CPU 的寄存器执行计算;将位模式复制回内存。在此递增序列期间,另一个线程或进程仍然可以读取旧值,并且当递增序列完成时,对同一存储器的其他写入也将被覆盖。
注意,这只是一个介绍 - 编写高性能的线程安全数据结构需要它自己的书!这是一个非线程安全的简单数据结构(栈):
// A simple fixed-sized stack (version 1)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];
void push(double v) {
values[count++] = v;
}
double pop() {
return values[--count];
}
int is_empty() {
return count == 0;
}
栈的版本 1 不是线程安全的,因为如果两个线程同时调用 push 或 pop,则结果或栈可能不一致。例如,假设两个线程同时调用 pop,则两个线程可以读取相同的值,两者都可以读取原始计数值。
为了将其转换为线程安全的数据结构,我们需要识别代码的 _ 关键部分 _,即代码的哪个部分一次只能有一个线程。在上面的例子中,push
,pop
和is_empty
函数访问相同的变量(即存储器)和栈的所有关键部分。
当push
(和pop
)正在执行时,数据结构是不一致的状态(例如,计数可能尚未写入,因此可能仍包含原始值)。通过使用互斥锁包装这些方法,我们可以确保一次只有一个线程可以更新(或读取)栈。
候选“解决方案”如下所示。这是对的吗?如果没有,它将如何失败?
// An attempt at a thread-safe stack (version 2)
#define STACK_SIZE 20
int count;
double values[STACK_SIZE];
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
void push(double v) {
pthread_mutex_lock(&m1);
values[count++] = v;
pthread_mutex_unlock(&m1);
}
double pop() {
pthread_mutex_lock(&m2);
double v = values[--count];
pthread_mutex_unlock(&m2);
return v;
}
int is_empty() {
pthread_mutex_lock(&m1);
return count == 0;
pthread_mutex_unlock(&m1);
}
上面的代码('版本 2')包含至少一个错误。花点时间看看你是否可以得到错误并找出后果。
如果三个同时调用push()
,则m1
确保只有一个线程处理栈(两个线程需要等到第一个线程完成(调用解锁)),然后第二个线程将被允许继续进入临界区,最后第二个线程完成后,第三个线程将被允许继续。
类似的参数适用于pop
的并发调用(同时调用)。但是,版本 2 不会阻止 push 和 pop 同时运行,因为push
和pop
使用两个不同的互斥锁。
在这种情况下,修复很简单 - 对推送和弹出功能使用相同的互斥锁。
代码有第二个错误;比较后is_empty
返回,不会解锁互斥锁。但是,错误不会立即被发现。例如,假设一个线程调用is_empty
,第二个线程稍后调用push
。这个线程会神秘地停止。使用调试器可以发现线程卡在push
方法内的 lock()方法中,因为锁定从未被先前的is_empty
调用解锁。因此,在一个线程中的疏忽导致在任意其他线程中很晚的问题。
更好的版本如下所示 -
// An attempt at a thread-safe stack (version 3)
int count;
double values[count];
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
void push(double v) {
pthread_mutex_lock(&m);
values[count++] = v;
pthread_mutex_unlock(&m);
}
double pop() {
pthread_mutex_lock(&m);
double v = values[--count];
pthread_mutex_unlock(&m);
return v;
}
int is_empty() {
pthread_mutex_lock(&m);
int result= count == 0;
pthread_mutex_unlock(&m);
return result;
}
版本 3 是线程安全的(我们已确保所有关键部分的互斥)但是有两点需要注意:
is_empty
是线程安全的,但其结果可能已经过时,即在线程获得结果时栈可能不再为空!- 没有防止下溢(在空栈上弹出)或溢出(推入已经满的栈)的保护
后一点可以使用计数信号量来修复。
该实现假设一个栈。更通用的版本可能包含互斥锁作为内存结构的一部分,并使用 pthread_mutex_init 初始化互斥锁。例如,
// Support for multiple stacks (each one has a mutex)
typedef struct stack {
int count;
pthread_mutex_t m;
double *values;
} stack_t;
stack_t* stack_create(int capacity) {
stack_t *result = malloc(sizeof(stack_t));
result->count = 0;
result->values = malloc(sizeof(double) * capacity);
pthread_mutex_init(&result->m, NULL);
return result;
}
void stack_destroy(stack_t *s) {
free(s->values);
pthread_mutex_destroy(&s->m);
free(s);
}
// Warning no underflow or overflow checks!
void push(stack_t *s, double v) {
pthread_mutex_lock(&s->m);
s->values[(s->count)++] = v;
pthread_mutex_unlock(&s->m); }
double pop(stack_t *s) {
pthread_mutex_lock(&s->m);
double v = s->values[--(s->count)];
pthread_mutex_unlock(&s->m);
return v;
}
int is_empty(stack_t *s) {
pthread_mutex_lock(&s->m);
int result = s->count == 0;
pthread_mutex_unlock(&s->m);
return result;
}
使用示例:
int main() {
stack_t *s1 = stack_create(10 /* Max capacity*/);
stack_t *s2 = stack_create(10);
push(s1, 3.141);
push(s2, pop(s1));
stack_destroy(s2);
stack_destroy(s1);
}
使用计数信号量!使用计数信号量来跟踪剩余的空格数和另一个信号量来跟踪栈中的项目数。我们将这两个信号量称为“sremain”和“sitems”。记住,如果信号量的计数已减少到零(由另一个调用 sem_post 的线程),sem_wait
将等待。
// Sketch #1
sem_t sitems;
sem_t sremain;
void stack_init(){
sem_init(&sitems, 0, 0);
sem_init(&sremain, 0, 10);
}
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
...
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
...
草图#2 过早地实现了post
。另一个在推送中等待的线程可能会错误地尝试写入完整栈(类似地,在 pop()中等待的线程被允许过早地继续)。
// Sketch #2 (Error!)
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
sem_post(&sremain); // error! wakes up pushing() thread too early
return values[--count];
}
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
sem_post(&sitems); // error! wakes up a popping() thread too early
values[count++] = v;
}
Sketch 3 实现了正确的信号量逻辑,但是你能发现错误吗?
// Sketch #3 (Error!)
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
double v= values[--count];
sem_post(&sremain);
return v;
}
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
values[count++] = v;
sem_post(&sitems);
}
Sketch 3 使用信号量正确执行缓冲区已满和缓冲区空条件。然而,没有 _ 互斥 _:两个线程可能同时位于 _ 临界区 _ 中,这会破坏数据结构(或最不会导致数据丢失)。解决方法是在临界区周围包含一个互斥锁:
// Simple single stack - see above example on how to convert this into a multiple stacks.
// Also a robust POSIX implementation would check for EINTR and error codes of sem_wait.
// PTHREAD_MUTEX_INITIALIZER for statics (use pthread_mutex_init() for stack/heap memory)
pthread_mutex_t m= PTHREAD_MUTEX_INITIALIZER;
int count = 0;
double values[10];
sem_t sitems, sremain;
void init() {
sem_init(&sitems, 0, 0);
sem_init(&sremains, 0, 10); // 10 spaces
}
double pop() {
// Wait until there's at least one item
sem_wait(&sitems);
pthread_mutex_lock(&m); // CRITICAL SECTION
double v= values[--count];
pthread_mutex_unlock(&m);
sem_post(&sremain); // Hey world, there's at least one space
return v;
}
void push(double v) {
// Wait until there's at least one space
sem_wait(&sremain);
pthread_mutex_lock(&m); // CRITICAL SECTION
values[count++] = v;
pthread_mutex_unlock(&m);
sem_post(&sitems); // Hey world, there's at least one item
}
// Note a robust solution will need to check sem_wait's result for EINTR (more about this later)
- 锁定/解锁错误的互斥锁(由于愚蠢的错字)
- 没有解锁互斥锁(因为在错误情况下提前返回)
- 资源泄漏(不调用
pthread_mutex_destroy
) - 使用未初始化的互斥锁(或使用已经被破坏的互斥锁)
- 在线程上锁定互斥锁两次(不先解锁)
- 死锁和优先级倒置(稍后我们将讨论这些)