进程同步
临界区
我们假设 balance
是个全局变量,多线程同时执行以下操作。
1
balance = balance + 1
以上这句并不是原子性的,分为三步(实际上更复杂,还涉及到多CPU情况下的问题,比如有可能写入某个核自身的缓存,未必被其他核可见)
- 读取balance的值
- 将读到的值加1
- 将计算后的值写回内存
如果两个线程同时执行上面这行代码,就会产生难以预料的结果: 2或者1
这被称作 数据竞争(data race)
, 这种错误非常难以排查和复现。
产生这种问题的根本原因是操作系统对线程的调度,使得它并不能在指令执行顺序上提供保证。这就需要程序本身进行一些防护,以控制某些特定语句的执行顺序。
临界区:todo
锁
首先值得声明的是,锁的实现可以完全借助于软件,比如皮特森算法,但是使用起来不方便,很复杂而且难以理解。
原子指令
基于此,现在的硬件已经提供了实现锁的手段: 原子指令
。原子指令有很多种: TestAndSet
, CompareAndSwap(CAS)
, FetchAndAdd
.
这里可能存在疑问:对于多处理器,这些原子指令能否做到不和其他处理器上运行的原子指令交叉吗?事实上是有保证的,因为这些原子指令都要读内存,而现代的cpu已经对此做了锁总线的处理。也就是一个原子指令会直接霸占整个内存总线,使得别的原子指令必须等它执行后才能读内存。
以下有关原语的代码其实都是处理器的指令,我们只是用c语言来表述其中的逻辑。
1
2
3
4
5
6
int TestAndSet(int *p, int new){
int old_reg_value = *p;
*p = new;
return old_reg_value;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 真实的写法应该是这样
// char CompareAndSwap(int *ptr, int old, int new) {
// // 1 char CompareAndSwap(int *ptr, int old, int new) {
// unsigned char ret;
// // Note that sete sets a 'byte' not the word
// __asm__ __volatile__ (
// " lock\n"
// " cmpxchgl %2,%1\n"
// " sete %0\n"
// : "=q" (ret), "=m" (*ptr)
// : "r" (new), "m" (*ptr), "a" (old)
// : "memory");
// return ret;
// }
int CompareAndSwap(int *p, int old_value, int new){
int old_reg_value = *p;
if old_reg_value == old_value {
*p = new_value;
}
return old_reg_value;
}
1
2
3
4
5
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
自旋锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct lock_t {
int flag;
} lock_t;
void init_lock(loct_t *mutex){
mutex->flag = 0;
}
void lock(lock_t *mutex){
while(CompareAndSwap(&mutex->flag, 0, 1) == 0){
// spin
}
}
void unlock(lock_t *mutex){
mutex->flag = 0;
}
自旋锁的优势在于,结构简单,不容易出错。
缺点也很明显:
-
首先,每个等待的线程都是忙等待,这样就浪费了大量的cpu周期。比如有n个线程争夺一把锁,有一个线程争取到了,但是在他当前的时间片中并没有执行完临界区的所有代码。那么剩余n-1个线程都要白白等待一个周期。cpu的利用率就瞬间变为了1/n。但是本身临界区就很短的话,用自旋锁其实效率也不错。 这一点也可在自旋得不到锁后使用
yield()
原语进行cpu的释放,但是时间片轮转仍然需要进行上下文切换,仍然有损耗,这一点会在后文解决。 -
其次,其他线程可能饥饿。也就是可能长期无法获得锁,这一点就需要采用更公平的锁。
两阶段锁
两阶段锁是自旋锁的一种改进形式,也是 linux 中采用的形式。
第一阶段采用自旋的形式试图获得锁,尝试只进行有限次(甚至是一次),如果获取不到再进入第二阶段。
第二阶段即睡眠,等待被唤醒。以节省cpu. 唤醒之后再次进入一阶段,这是为了避免在唤醒后又有新的线程获取了锁。
条件变量
在并发程序中,一个常见的场景是,线程在等待某个条件满足。这当然可以不停自旋等待,但是效率太低。因此发明了 条件变量(condition variable)
. 条件变量在wait时会主动放弃cpu. 但是值得注意的是,signal在调用时,如果没有正在等待的线程,就不会产生任何效果,下一个wait的线程不会直接唤醒,这一点与信号量不同。
1
2
pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
pthread_cond_signal(pthread_cond_t *c);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 这里省略掉了 pthread_* 这些方法的错误处理,只是为了展现逻辑
lock_t mutex;
int done = 0;
void child(){
lock(&mutex);
done = 1;
pthread_cond_signal(&cond);
unlock(&mutex);
}
// 等待 child() 执行完成
void parent(){
// 如果不使用锁,就可能出现进入while后,执行wait前,child()执行完成的情况,这样会导致父进程永远睡眠
lock(&mutex);
// 如果不使用done,就可能发生 signal在wait前的情况,也会导致父进程永远睡眠
while(done == 0){
pthread_cond_wait(&cond,&mutex);
}
unlock(&mutex);
}
根据 标准的要求,pthread_cond_wait 必须要持有锁的情况下才能使用,在使用后,mutex会被释放以避免死锁。另外 pthread_cond_signal 除特殊情况外,最好也持有锁,这样能避免很多问题。
还有一个最佳实践是: 在唤醒之后,对于条件要再次检查,这是因为被唤醒进程并不是直接恢复执行,而是放到就绪队列中,这样恢复执行的时候其实未必仍然满足条件。这被称作 Mesa 语义(Mesa semantic)
。虽然在上文的例子中不明显,但是如果换一个稍微复杂的例子,那么就很可能出bug。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
cond_t empty, fill;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 1)
Pthread_cond_wait(&empty, &mutex);
put(i);
Pthread_cond_signal(&fill);
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0)
Pthread_cond_wait(&fill, &mutex);
int tmp = get();
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
以上这一段代码,如果把while改成if, 那么在一个生产者,两个消费者的情况下,就可能出bug。比如第一个生产者生产完之后,唤醒了消费者1(c1), 消费者2(c2)开始执行,消费掉了T1生产的内容,然后这时候c1开始执行,那么它就必须要检查目前缓冲区的内容个数,否则就会尝试读取空的缓冲区,引发bug.
信号量
信号量是有一个整数值的对象,可以用两个函数来操作它,PV的叫法是因为发明者Dijkstra是荷兰人,这是荷兰语中的wait和post。在 POSIX 中,采用以下的函数来进行操作;
1
2
3
4
5
6
7
8
9
10
#include <semaphore.h>
// 初始化信号量。 pshared 参数指明信号量是由进程内线程共享,还是由进程之间共享。如果 pshared 的值为 0,那么信号量将被进程内的线程共享,并且应该放置在这个进程的所有线程都可见的地址上。如果 pshared 是非零值,那么信号量将在进程之间共享。
int sem_init(sem_t *sem, int pshared, unsigned int value)
// P操作,使得信号量的值减1
int sem_wait(sem_t *s);
// V操作,使得信号量的值加1
int sem_post(sem_t *s);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sem_t s;
void * child(void *arg) {
printf("child\n");
sem_post(&s); // signal here: child is done
return NULL;
}
int main(int argc, char *argv[]) {
sem_init(&s, 0, 0); // 注意这里必须是0
printf("parent: begin\n");
pthread_t c;
Pthread_create(c, NULL, child, NULL);
sem_wait(&s); // wait here for child
printf("parent: end\n");
return 0;
}
信号
信号全程为 软中断信号
.unix 系统中有多种信号,如 SIGHUP, SIGKILL等。
信号的发送往往是内核发送的,也可以是一个程序通过内核来向另一个程序发送信号。
对于信号的接受,有三种应对方式
- 忽略。但是有两种信号不能忽略,
SIGKILL
和SIGSTOP
. - 捕捉信号。即在程序中有对某些信号特殊的处理函数,注册到内核,使得内核对此进程使用这些信号时,自动调用这段函数。
- 系统默认动作。绝大多数是终止程序运行。
注意,信号的执行类似于中断,是强制切换控制流到信号处理函数,然后回到原来的位置。并且信号处理函数执行过程中也可能被其他的信号处理函数打断,因此信号处理程序中只能使用异步信号安全的函数(不包含 printf, malloc 等)。
另外,信号本身只传递信号类型,并不传递其他信息。而且,信号并不会排队,比如到来了一个K信号,正在处理的过程中又来一个k信号,那后来的会被丢弃掉。因此信号处理程序必须要认为自己接受到信号的时候,已经来了不止一个信号,不可以试图用信号来计数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <signal.h>
#include <stdio.h>
#include <windows.h>
void sig_handler(int sig) {
printf("get sig: %d\n", sig);
exit(0);
}
int main() {
// 只自定义 SIGINT 的处理
if (signal(SIGINT, sig_handler) == SIG_ERR) {
printf("set signal error\n");
return 0;
}
Sleep(10000);
return 0;
}
管道
管道在创建时获得一个固定大小的字节数。当一个进程试图往管道中写时,如果有足够的空间,则写请求立即被执行,否则该进程被阻塞。如果一个进程试图读取的字节数多于当前管道中的字节数,也将被阻塞。
匿名管道
管道采用一个 pipe(int *fd)
来生成,会返回两个文件描述符, fd[0]
用来读, fd[1]
用来写。
一个常见的范式是父进程fork出子进程,然后子进程就会复制一份父进程的文件描述符,然后父子进程各关掉一个(比如父亲关闭读,子进程关闭写,那么就可以实现从父进程向子进程发送数据,反之亦然)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 注意,此管道仅在Linux下可用
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main() {
int fd[2] = {};
if (pipe(fd) < 0) {
printf("pipe error\n");
exit(0);
}
pid_t pid;
if ((pid = fork()) < 0) {
printf("pipe error\n");
exit(0);
}
if (pid > 0) { // parent
close(fd[0]);
write(fd[1], "hello world\n", 12);
} else { // child
const int MAX_LINE = 20;
char line[MAX_LINE];
close(fd[1]);
int n = read(fd[0], line, MAX_LINE);
write(STDOUT_FILENO, line, n);
}
return 0;
}
当然也可以用高层的方法 popen()
来实现,这个是通过调用 shell 来实现的。
命名管道
命名管道本质上就是文件,适用于任何两个进程之间的交互。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// writer
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#define FIFO_NAME "/tmp/myfifo"
int main() {
if (access(FIFO_NAME, F_OK)) {
int err = mkfifo(FIFO_NAME, 0777);
if (err < 0) {
printf("mkfifo error\n");
exit(0);
}
}
int fifo_fd = open(FIFO_NAME, O_WRONLY);
if (fifo_fd < 0) {
printf("open fifo error\n");
exit(0);
}
char msg[] = "hello world";
write(fifo_fd, msg, strlen(msg));
while (1) {
} // 等待读取 fifo
close(fifo_fd);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// reader
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#define FIFO_NAME "/tmp/myfifo"
int main() {
if (access(FIFO_NAME, F_OK)) {
int err = mkfifo(FIFO_NAME, 0777);
if (err < 0) {
printf("mkfifo error\n");
exit(0);
}
}
int fifo_fd = open(FIFO_NAME, O_RDONLY);
if (fifo_fd < 0) {
printf("open fifo error\n");
exit(0);
}
char recv[100] = {0};
read(fifo_fd, recv, sizeof(recv)); // 读数据
printf("read from my_fifo buf=[%s]\n", recv);
close(fifo_fd);
return 0;
}