锁机制与生产者消费者问题

线程的并发带来对共享数据访问的问题在前一篇文章已经做了简单描述,不过上一篇文章仅仅讨论是多个线程共享某一个基本数据类型的数据,并对其操作的情况,在线程的运用中,除了对某个数据的互斥操作之外,通常还有多个进程相互合作完成某项工作的问题。

关于此类问题最简单的模型就是播放器解码和播放两个线程的协同工作,解码线程完成对源数据的解码工作并将解码完成的数据写入缓冲区,播放线程取得缓冲区内已经被解码完成的数据进行播放。为了保证数据的顺序性要保证解码线程向缓冲区写入数据和播放线程从缓冲区得到数据的操作是互斥的,这点通过上节提到的互斥锁就可以完成。我们只需要在实际操作的上下部分加上互斥锁即可完成。

但是缓冲区相比与基本数据元素一个很大区别就是:线程并不是在得到临界区的访问权限后就能立即进行操作,即使是得到缓冲区操作权限的线程依旧要受制于某些条件。比如上面提到的解码线程在往缓冲区写入数据前,如果缓冲已满,显然不能再往缓冲区写入数据,否则会覆盖尚未被取走的数据。同样地,播放线程在从缓冲区取得数据时,如果缓冲区为空,也不能进行取得数据的操作。为了实现对类似上述需求模型的正确操作,我们将上述问题统一抽象成生产者消费者问题。

解决这类线程同步问题依旧需要使用锁机制,只不过这里除了互斥锁之外还需要加两个变量控制缓冲区的访问,分别是used和unuse。当生产者向缓冲区输入数据前,需要先检查缓冲区是否还有空的槽位(unuse),如果没有的话,将线程本身加入阻塞队列。同样,消费者取出缓冲区某个元素之前,都需要先判断缓冲区中是否有可以被使用的数据元素,如果没有则需要阻塞自己。

与互斥锁mutex必须成对出现不同的是,无论是生产者还是消费者中,P操作和V操作的对象都不同。对于生产者而言,P操作是判断是否有空位,所以是对ununse进行P操作,如果能够得到缓冲区的操作权,当完成对缓冲区的写操作后,需要对变量used进行V操作来唤醒一个等待的消费者。在消费者中的操作正好相反。

#include <stdio.h>

#include <stdlib.h>

#include <pthread.h>

 

typedef struct{

int *buf;

int size;

int rear;

int front;

sem_t mutex;

sem_t unuse;

sem_t used;

}sbuf;

 

void sbuf_init(sbuf *sp, int n)

{

sp->buf = (int *)malloc(n * sizeof(int));

sp->size = n;

sp->rear = sp->front = 0;

sem_init(&mutex, 0, 1);

sem_init(&unuse, 0, n);

sem_init(&used, 0, 0);

}

 

void insert(sbuf *sp, int value)

{

sem_wait(&unuse);

sem_wait(&mutex);

sp->buf[(++sp->rear) % (sp->size)];

sem_post(&mutex);

sem_post(&used);

}

 

void remove(sbuf *sp)

{

sem_wait(&used);

sem_wait(&mutex);

int value = sp->buf[(++sp->front) % (sp->size)];

sem_post(&mutex);

sem_post(&unuse);

return value;

}

我们通过对上述问题的描述,写成了一个buf包(insert涵盖了生产者操作的核心代码,remove操作涵盖了消费者操作的核心代码),主要的变量均定义在struct结构中,size表示缓冲区大小,rear和front分别指向缓冲区队列的首尾(此处使用循环队列),mutex表示互斥锁,used表示缓冲区中目前可用的变量数,unuse表示缓冲区中尚未使用的空位数。在对结构体赋初值时,其他几个变量太多可说的,主要就需要明确used和unuse的初值是多少,初始情况下我们默认缓冲区为空,所以used = 0, unused = n。另外尤其需要注意的是互斥锁和同步锁的顺序,如果将互斥锁包在同步锁外侧的话,很有可能出现一种情况就是某个进程得到了缓冲区唯一的操作权,但是再去判断同步锁时被阻塞,这样就可能导致所有进程都在等待缓冲区导致死锁。

Advertisements