代码详解:信号量与自旋锁

信号量

        信号量(semaphore)是建立在原子操作的基础上的,它是由荷兰学者E.W.Dijkstra 提出来的。信号量实际上是一个整数型变量,有两个最基本的原子操作,就是操作系统理论教材中常说的P(Prolagen)操作和V(Verhogen) 操作,又被称为PV 原语。这两个单词是荷兰语,在Linux 内核中,人们使用down 和up 来表示。我们还是以上一节的打印机的例子来说明PV 操作。假设只有一个打印机,因此信号量printer 被初始设置为1。当进程需要使
用打印机时,进行P(down)操作,该操作定义如下:

代码片段2.17 示例代码
-------------------------------------------------------------------------------------------

1 printer--;
2 if (printer >= 0) {
3 /* 使用打印机。*/
4 } else {
5 /*
6 * 把当前进程设置为阻塞状态,并且加入到信号量printer 的等待队列,
7 * 然后调度其他进程运行。
8 */
9 }


        当一个进程在使用打印机结束时,必须进行V(up)操作,定义如下:
代码片段2.18 示例代码

1 printer++;
2 if (printer <= 0) {


2.3 同步与互斥29
3 /* 从信号量printer 的等待队列中唤醒一个进程。*/
4 }

        从这里可以看出,信号量的初值表示资源的可用数量,当它为0 时,说明供求刚好满足,当它小于0 时,说明供小于求,当它大于0 时反映了供大于求。例如当信号量为-2 时,说明等待队列中有两个进程在等待使用打印机。

在Linux 内核中,信号量定义如下:

代码片段2.19 节自include/asm/semaphore_32.h
-------------------------------------------------------------------------------------------

1 struct semaphore {
2 /* 信号量。*/
3 atomic_t count;
4 /* 等待该信号量的进程个数。*/
5 int sleepers;
6 /* 该信号量的等待队列。*/
7 wait_queue_head_t wait;
8 };


         Linux 内核中的P 操作对应于函数down(),定义如下:

           代码片段2.20 节自include/asm/semaphore_32.h
-------------------------------------------------------------------------------------------

1 static inline void down(struct semaphore * sem)
2 {
3 might_sleep();
4 __asm__ __volatile__(
5 # 在多CPU 平台下使用lock 前缀,锁住内存总线。
6 "# atomic down operation\n\t"
7 LOCK_PREFIX "decl %0\n\t"
8
9 # 如果sem->count >=0 就跳转到标号2 处。
10 "jns 2f\n"

11 "\tlea %0,%%eax\n\t"
12 "call __down_failed\n"
13 "2:"
14 :"+m" (sem->count)
15 :
16 :"memory","ax");
17 }

         在这个函数中,%0 对应于sem->count,如果sem->count 减1 后,结果不小于0,则说明down 操作成功完成。否则调用__down_failed,它最终将调用__down()把当前进程设置为等待状态,并把当前进程加入到该信号量的等待队列,调度其他进程来运行。当某个进程释放相应的资源时调用up(),定义如下:

代码片段2.21 节自include/asm/semaphore_32.h
-------------------------------------------------------------------------------------------

1 static inline void up(struct semaphore * sem)
2 {
3 __asm__ __volatile__(
4 # 在多CPU 平台下使用lock 前缀,锁住内存总线。
5 "# atomic up operation\n\t"
6 LOCK_PREFIX "incl %0\n\t"
7
8 # 如果sem->count 加1 大于0,就跳转到标号1。
9 "jg 1f\n\t"
10 "lea %0,%%eax\n\t"

11
12 "call __up_wakeup\n"
13 "1:"
14 :"+m" (sem->count)
15 :
16 :"memory","ax");
17 } 
        在这段代码中,如果sem->count 加1 的结果小于等于0,就说明该信号量的等待队列中不为空,因此调用__up_wakeup()从等待队列中唤醒相应的进程。

 自旋锁
         在前面的原子操作中,使用lock 前缀能够避免CPU 在执行单条指令时被打扰,然而当面对成块指令时就无能为力了。我们考虑下面这样一段代码:

代码片段2.22 示例代码
-------------------------------------------------------------------------------------------

1 struct _foo {
2 int tag;
3 int a;
4 int b;
5 int c;
6 };
7
8 #define SIZE 10
9 struct _foo foo[SIZE];
10

11 int clean_by_tag(int tag)
12 {
13 for (int i = 0; i < SIZE; i++) {
14 if (foo[i].tag == tag) {
15 foo[i].a = 0;
16 foo[i].b = 0;
17 foo[i].c = 0;
18 }
19 }
20 }

         由于CPU 在每条指令结束时都会进行中断检查,而中断产生具有不确定性。现在假设在单处理器系统上,一个进程在执行到第15行时,发生了一个中断,此时进入中断处理函数,而在中断处理函数中,中断处理结束时,内核可能调度另外一个进程运行(见第6章), 如果恰好这个进程需要访问上面要修改的数组成员,那么它将得到错误的数据,这些数据的一部分是旧的,另外一部分是新的。在某些场合,这将会导致严重的问题。为此要保证一个代码块的原子性,最好的办法是在第15行之前关闭中断,在第17行之后再开启中断。

        在x86 中,可以通过cli 指令清除标志寄存器中的中断允许位,在使用sti 指令开启中断前,CPU 执行完一条指令后,就不会进行中断检查,因此可以免受中断打扰。由于关闭中断后,系统的外部设备可能得不到响应,因此这就要求进程关中断的时间必须是短暂的,并且开启中断前,进程不得进入睡眠状态。

        通过关中断的方式可以使CPU 在执行某些“临界”代码块中免受打扰,但是cli 只能关闭当前CPU 的中断,因此在多CPU 系统中,还是无法避免这个问题,因为当其中一颗CPU 关闭中断后,执行上面的代码的同时,另外一颗CPU 可能也会访问同一个数据,在这种情况下同样会得到错误的结果。虽然可以通过信号量机制来防止其他CPU 在同时访问同一个数据,但是由于下面的理由,信号量显得不太合适。

        (1) 信号量引起的进程切换消耗相对较大,显得不划算。由于这类“临界”代码的特征是执行时间非常短暂,也就是说CPU 执行这段代码的时间,远远小于进程切换的消耗,因此在这种情况下,还不如让另外一个CPU 进入“忙等待”状态,直到临界代码操作完成。

        (2) 由于信号量可能引起的进程切换,但是在某些环境下,是不允许进程切换的,例如在中断环境中(见第6章)。因此在这种情况下,当一个进程不能进入临界区时,最好的办法是让CPU 进入忙等待状态。其基本原理就是设置一个锁变量,用来保护临界区代码,当CPU 进入临界区之前,检查锁的状态,如果已经上锁,则当前CPU 执行一个空循环反复检测锁的状态,直到其他的CPU 解锁。由于在测试期间,CPU 处于忙等待的“自旋”状态,因此把这种机制称为自旋锁。在Linux 内核中,自旋锁的类型为spinlock_t,其定义如下:

代码片段2.23 节自include/linux/spinlock_types.h

-------------------------------------------------------------------------------------------

1 typedef struct {
2 raw_spinlock_t raw_lock;
3 #if defined(CONFIG_PREEMPT) && defined(CONFIG_SMP)
4 unsigned int break_lock;
5 #endif
6 #ifdef CONFIG_DEBUG_SPINLOCK
7 unsigned int magic, owner_cpu;
8 void *owner;
9 #endif
10 #ifdef CONFIG_DEBUG_LOCK_ALLOC


11 struct lockdep_map dep_map;
12 #endif
13 } spinlock_t;


        为了方便讨论,我们假设没有配置CONFIG_PREEMPT 及CONFIG_DEBUG_XXX 标志,所以spinlock_t 就有一个raw_lock 成员,其类型为raw_spinlock_t,定义如下:

             代码片段2.24 raw_spinlock_t 定义
-------------------------------------------------------------------------------------------
1 #ifdef CONFIG_SMP
2 typedef struct {
3 unsigned int slock;
4 } raw_spinlock_t;
5 #else
6 typedef struct { } raw_spinlock_t;
7 #endif

         在单CPU 的情况下,raw_spinlock_t 为空,这个结构定义在include/linux/spinlock_types_up.h 文件中。在多CPU 的情况下,raw_spinlock_t 结构包含了一个slock 成员,slock 将记录锁的状态。

         当使用自旋锁时,首先需要用spin_lock_init 来初始化,spin_lock_init 是一个宏,其定义如下:

代码片段2.25 节自include/linux/spinlock.h

1
2
# define spin_lock_init(lock) \
do { *(lock) = SPIN_LOCK_UNLOCKED; } while (0)
        由于涉及自旋锁的调试,SPIN_LOCK_UNLOCKED 是一个套了好几层的宏,这里我们略去中间的宏,最终SPIN_LOCK_UNLOCKED 展开如下:

1 #ifdef CONFIG_SMP
代码片段2.26 SPIN_LOCK_UNLOCKED 宏
2
3
4
5
lock.raw_lock = 1;
#else
lock.raw_lock = {};
#endif

        在配置了CONFIG_SMP 的情况下,就把raw_lock 成员初始化为1,否则由于raw_lock 为空,所以在这里的初始化过程中,什么也不用做。

        申请和释放自旋锁分别由spin_lock_irq()和spin_unlock_irq()完成,spin_lock_irq()首先关闭中断,然后测试锁的状态,如果不可用就进入忙等待状态。这也是一个套了好多层的宏,我们略去中间步骤,结果如下:

代码片段2.27 节自kernel/spinlock.c

-------------------------------------------------------------------------------------------

1 #define spin_lock_irq(lock) _spin_lock_irq(lock)
2
3 void __lockfunc _spin_lock_irq(spinlock_t *lock)
4 {
5 local_irq_disable();
6 preempt_disable();
7 spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
8 LOCK_CONTENDED(lock, _raw_spin_trylock, _raw_spin_lock);
9 }
10

11 /* LOCK_CONTENDED 其实就是调用_raw_spin_lock 函数。*/
12 #define LOCK_CONTENDED(_lock, try, lock)
13 lock(_lock)

        preempt_disable()的作用是关闭进程抢占,由于中断或系统调用之后,可能会调度其他的进程运行(例如当前进程时间片用完,或者有一个拥有更高优先级的进程已经进入了就绪状态),preempt_disable()关闭调度器的这个功能,从而保证当前进程在执行临界区代码的过程中不会被其他进程干扰。

         local_irq_disable()的作用是关闭中断,而LOCK_CONTENDED 最终将调用_raw_spin_lock()函数。在x86 平台上,local_irq_disable()最终调用native_irq_disable(),定义如下:

代码片段2.28 节自include/asm-x86/irqflags_32.h

-------------------------------------------------------------------------------------------

1 static inline void native_irq_disable(void)
2 {
3 asm volatile("cli": : :"memory");
4 }

 

        这其实就是一条关中断的汇编指令,前面我们说过,在单CPU 的系统中,进入临界区代码只需要关闭中断就可以了,而多CPU 的系统中,则需要测试自旋锁的状态。所以_raw_spin_lock()也有两个版本:

代码片段2.29 节自include/asm/spinlock_32.h
-------------------------------------------------------------------------------------------

1 # define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)
2
3 #ifdef CONFIG_SMP
4 /* 多CPU,测试锁的状态,有可能进入自旋状态。*/
5 static inline void __raw_spin_lock(raw_spinlock_t *lock)
6 {
7 asm volatile("\n1:\t"
8 /* 锁住内存总线,lock->slock--。*/
9 LOCK_PREFIX " ; decb %0\n\t"


10 /*

11 * 如果lock->lock-->= 0,说明成功获取到了这个自旋锁,

12 * 跳转到标号3(第28行),成功退出。

13 */

14 "jns 3f\n"

15

16 /* 否则进入自旋状态,执行nop 指令,并测试lock->slock 是否为0。*/

17 "2:\t"

18 "rep;nop\n\t"

19 "cmpb $0,%0\n\t"

20

21 /* 如果lock->slock <=0,就跳转到标号2(第17行),继续“自旋”。*/
22 "jle 2b\n\t"
23 /*
24 * 否则,lock->slock > 0,就说明别的CPU 释放了该自旋锁,
25 * 就跳转到标号1(第7行),尝试开锁。
26 */
27 "jmp 1b\n"
28 "3:\n\t"

29 : "+m" (lock->slock) : : "memory");
30 }
31
32 #else
33 /* 单CPU 展开为一个空操作。*/
34 # define __raw_spin_lock(lock) do { (void)(lock); } while (0)
35 #endif

         从上面可以看出,当一个CPU 获取自旋锁失败时,这个CPU 就在循环中做无用功,等待其他的CPU 释放自旋锁。正是这个原因,所以要求持有自旋锁的时间必须尽可能短暂,并且持有自旋锁时不能进入睡眠状态。

类似地,spin_unlock_irq()展开的结果如下:

代码片段2.30 节自include/linux/spinlock.h
-------------------------------------------------------------------------------------------

1 # define spin_unlock_irq(lock) \
2 do { \
3 __raw_spin_unlock(&(lock)->raw_lock); \
4 __release(lock); \
5 local_irq_enable(); \
6 } while (0) 

         在单CPU 系统中,spin_unlock_irq()只需要打开中断就可以了,因此

         __raw_spin_unlock()是一个空操作,开中断由native_irq_enable()函数完成,定义如下:

        代码片段2.31 节自include/asm-x86/irqflags_32.h
1 static inline void native_irq_enable(void)
2 {
3 asm volatile("sti": : :"memory");
4 } 

         在多CPU 系统中,spin_unlock_irq()还需要把spinlock 设置为开锁状态。
代码片段2.32 节自include/asm/spinlock_32.h

1 static inline void __raw_spin_unlock(raw_spinlock_t *lock)
2 {
3 /* lock->slock = 1 */
4 asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
5 }

        从上面的分析中,我们看到spin_lock_irq()和spin_unlock_irq()保护临界区代码不受打扰。然而申请自旋锁时需要关闭中断,释放自旋锁时又要开启中断,这里带来了一个潜在的问题。现在假设某段代码在调用spin_lock_irq()之前需要关闭中断,之后获取了某个自旋锁,在释放自旋锁时,spin_unlock_irq()开启了中断,然而该段代码此时可能根本不允许开启中断。因此我们需要在调用申请自旋锁时保存当时的中断许可情况,在释放自旋锁时恢复它,而不是盲目地开启中断。spin_lock_irqsave 和spin_unlock_irqrestore 就用来完成这个工作,定义如下:

          代码片段2.33 节自include/linux/spinlock.h
-------------------------------------------------------------------------------------------

1 #define spin_lock_irqsave(lock, flags) \
2
3
4
5
flags = _spin_lock_irqsave(lock)
#define spin_unlock_irqrestore(lock, flags) \
_spin_unlock_irqrestore(lock, flags)

        由于中断许可位位于CPU 的标志寄存器中,因此spin_lock_irqsave()在获取自旋锁之前,把标志寄存器的值保存到flags 中,而spin_unlock_irqrestore()在释放自旋锁之后,根据flags 恢复标志寄存器。这是通过下面的两个函数完成的。

                        代码片段2.34 节自include/asm-x86/irqflags_32.h

-------------------------------------------------------------------------------------------

1 static inline unsigned long native_save_fl(void)
2 {
3 unsigned long f;
4 /* 把标志寄存器的值保存到f 中。*/
5 asm volatile("pushfl ; popl %0":"=g" (f): /* no input */);
6 return f;
7 }
8
9 static inline void native_restore_fl(unsigned long f)
10 {
11 /* 根据f 恢复标志寄存器。*/
12 asm volatile("pushl %0 ; popfl": /* no output */
13 :"g" (f)
14 :"memory", "cc");
15 }

        除此之外,内核中还有许多自旋锁的操作函数,例如read_lock_irqsave()/read_unlock_irq() 等,在理解了前面讨论的知识的基础上,其它的也是很容易理解的,读者可自行分析。

本文摘自《独辟蹊径品内核:Linux内核源代码导读》

投 票

觉得本文不错,投一票   

评 论


验证码: 看不清?换一张