信号量原理篇 源码版本:libdispatch-1173.40.5
信号量的实现主要是创建,销毁,wait和signal,四个方法。
信号量结构体:
1 2 3 4 5 6 7 8 9 DISPATCH_CLASS_DECL(semaphore, OBJECT); struct dispatch_semaphore_s { DISPATCH_OBJECT_HEADER(semaphore); long volatile dsema_value; long dsema_orig; _dispatch_sema4_t dsema_sema; }; typedef semaphore_t _dispatch_sema4_t ;
信号量的创建—dispatch_semaphore_create 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 dispatch_semaphore_t dispatch_semaphore_create (long value) { dispatch_semaphore_t dsema; if (value < 0 ) { return DISPATCH_BAD_INPUT; } dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof (struct dispatch_semaphore_s)); dsema->do_next = DISPATCH_OBJECT_LISTLESS; dsema->do_targetq = _dispatch_get_default_queue(false ); dsema->dsema_value = value; _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); dsema->dsema_orig = value; return dsema; }
可以看到不能传入一个小于0的值。信号量有几个重要的属性:
dsema_value:记录当前计数值,后续的wait和signal函数会改变该值。
dsema_sema:信号量 (真正干事情的)
dsema_orig:记录创建时传入的初始计数值。在信号量销毁时有用到。
信号量wait操作—dispatch_semaphore_wait 1 2 3 4 5 6 7 8 9 long dispatch_semaphore_wait (dispatch_semaphore_t dsema, dispatch_time_t timeout) { long value = os_atomic_dec2o(dsema, dsema_value, acquire); if (likely(value >= 0 )) { return 0 ; } return _dispatch_semaphore_wait_slow(dsema, timeout); }
os_atomic_dec2o是一个宏:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #define os_atomic_dec2o(p, f, m) \ os_atomic_sub2o(p, f, 1, m) #define os_atomic_sub2o(p, f, v, m) \ os_atomic_sub(&(p)->f, (v), m) #define os_atomic_sub(p, v, m) \ _os_atomic_c11_op((p), (v), m, sub, -) #define _os_atomic_c11_op(p, v, m, o, op) \ ({ _os_atomic_basetypeof(p) _v = (v), _r = \ atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \ memory_order_##m); (__typeof__(_r))(_r op _v); }) #define _os_atomic_basetypeof(p) \ __typeof__(atomic_load_explicit(_os_atomic_c11_atomic(p), memory_order_relaxed)) #define _os_atomic_c11_atomic(p) \ ((__typeof__(*(p)) _Atomic *)(p))
展开后如下:
1 long value = ({ __typeof__(atomic_load_explicit(((__typeof__(*((&(dsema)->dsema_value))) _Atomic *)((&(dsema)->dsema_value))), memory_order_relaxed)) _v = (((1 ))), _r = atomic_fetch_sub_explicit(((__typeof__(*((&(dsema)->dsema_value))) _Atomic *)((&(dsema)->dsema_value))), _v, memory_order_acquire); (__typeof__(_r))(_r - _v); });
简化下就是:
1 2 3 4 5 6 7 long value = ( { _v = 1 , _r = atomic_fetch_sub_explicit(*((&(dsema)->dsema_value)), 1 , memory_order_acquire); _r - _v; } );
类似于:
1 2 3 long value = ({ int a = 3 ; int b = 1 ; a - b; });
而atomic_fetch_sub_explicit函数表示dsema_value -= 1,并返回dsema_value原来的值。
因此long value = os_atomic_dec2o(dsema, dsema_value, acquire);
这句代码的意思就是:
原子操作,将信号量自减1,并将结果赋值给value。
由于是原子操作,因此同一时刻只会有一个线程对dsema_value进行减1。多线程下不会出问题。
如果value>=0,则返回0,这种情况不会阻塞线程。
否则调用_dispatch_semaphore_wait_slow函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 DISPATCH_NOINLINE static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long orig; _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); switch (timeout) { default : if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { break ; } case DISPATCH_TIME_NOW: orig = dsema->dsema_value; while (orig < 0 ) { if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1 , &orig, relaxed)) { return _DSEMA4_TIMEOUT(); } } case DISPATCH_TIME_FOREVER: _dispatch_sema4_wait(&dsema->dsema_sema); break ; } return 0 ; }
该方法会根据传入的dispatch_time_t进行区分:
其他等待时间:比如传入dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)),即2s。
该case里主要就是_dispatch_sema4_timedwait:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 bool _dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout) { mach_timespec_t _timeout; kern_return_t kr; do { uint64_t nsec = _dispatch_timeout(timeout); _timeout.tv_sec = (__typeof__(_timeout.tv_sec))(nsec / NSEC_PER_SEC); _timeout.tv_nsec = (__typeof__(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); kr = semaphore_timedwait(*sema, _timeout); } while (unlikely(kr == KERN_ABORTED)); if (kr == KERN_OPERATION_TIMED_OUT) { return true ; } DISPATCH_SEMAPHORE_VERIFY_KR(kr); return false ; }
计算剩余时间,调用mach内核的等待函数semaphore_timedwait()进行等待,线程进入休眠状态。如果在指定时间内没有得到通知,则会一直阻塞住,监听dsema_port等待其通知。
如果kr == KERN_OPERATION_TIMED_OUT,代表等待超时,则返回true。其他情况返回false。
当_dispatch_sema4_timedwait返回true时,case会掉入到DISPATCH_TIME_NOW处理。
当_dispatch_sema4_timedwait返回false时,代表在等待期间收到了一个信号,线程被唤醒。case会break;dispatch_semaphore_wait也就返回0。
DISPATCH_TIME_NOW:也就是不等待。
1 2 os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1 , &orig, relaxed)
该宏由以下宏组成:
1 2 3 4 5 6 7 8 9 10 11 12 13 #define os_atomic_cmpxchgvw2o(p, f, e, v, g, m) \ os_atomic_cmpxchgvw(&(p)->f, (e), (v), (g), m) #define os_atomic_cmpxchgvw(p, e, v, g, m) \ ({ _os_atomic_basetypeof(p) _r = (e); _Bool _b = \ atomic_compare_exchange_weak_explicit(_os_atomic_c11_atomic(p), \ &_r, v, memory_order_##m, memory_order_relaxed); *(g) = _r; _b; }) #define _os_atomic_basetypeof(p) \ __typeof__(atomic_load_explicit(_os_atomic_c11_atomic(p), memory_order_relaxed)) #define _os_atomic_c11_atomic(p) \ ((__typeof__(*(p)) _Atomic *)(p))
将宏展开:
1 2 3 4 5 6 ({ __typeof__(atomic_load_explicit(((__typeof__(*(&(dsema)->dsema_value)) _Atomic *)(&(dsema)->dsema_value)), memory_order_relaxed)) _r = ((orig)); _Bool _b = atomic_compare_exchange_weak_explicit(((__typeof__(*(&(dsema)->dsema_value)) _Atomic *)(&(dsema)->dsema_value)), &_r, (orig + 1 ), memory_order_relaxed, memory_order_relaxed); *((&orig)) = _r; _b; })
主要是atomic_compare_exchange_weak_explicit函数的意思了:
_Bool atomic_compare_exchange_weak_explicit(volatile A * obj,C * expected,C desired,memory_order succ,memory_order fail);
比较并交换被封装的值(weak)与参数 expected 所指定的值是否相等,如果: 相等,则用 desired 替换原子对象的旧值。 不相等,则用原子对象的旧值替换 expected ,因此调用该函数之后,如果被该原子对象封装的值与参数 expected 所指定的值不相等,expected 中的内容就是原子对象的旧值。 该函数通常会读取原子对象封装的值,如果比较为 true(即原子对象的值等于 expected),则替换原子对象的旧值,但整个操作是原子的,在某个线程读取和修改该原子对象时,另外的线程不能读取和修改该原子对象。
比较的结果:true如果*obj等于*exp,否则false。
在这里,orig肯定<0,所以会进入while循环,
如果orig和dsema_value相等(在这里orig是等于dsema_value的),则dsema_value加1,os_atomic_cmpxchgvw2o返回true,最终return _DSEMA4_TIMEOUT();
因此DISPATCH_TIME_NOW的作用:将信号量自加1(抵消掉了前面的自减1。注意即使这里自加1,dsema_value的值也不一定就是0),然后返回_DSEMA4_TIMEOUT()一个非0数。
DISPATCH_TIME_FOREVER:永久等待一个信号的发送。
这里主要就是_dispatch_sema4_wait的作用:
1 2 3 4 5 6 7 8 9 void _dispatch_sema4_wait(_dispatch_sema4_t *sema) { kern_return_t kr; do { kr = semaphore_wait(*sema); } while (kr == KERN_ABORTED); DISPATCH_SEMAPHORE_VERIFY_KR(kr); }
调用mach内核的等待函数semaphore_wait()进行等待。直到收到一个信号被唤醒。
信号量signal操作—dispatch_semaphore_signal 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 long dispatch_semaphore_signal (dispatch_semaphore_t dsema) { long value = os_atomic_inc2o(dsema, dsema_value, release); if (likely(value > 0 )) { return 0 ; } if (unlikely(value == LONG_MIN)) { DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()" ); } return _dispatch_semaphore_signal_slow(dsema); } DISPATCH_NOINLINE long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) { _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); _dispatch_sema4_signal(&dsema->dsema_sema, 1 ); return 1 ; } void _dispatch_sema4_signal(_dispatch_sema4_t *sema, long count) { do { kern_return_t kr = semaphore_signal(*sema); DISPATCH_SEMAPHORE_VERIFY_KR(kr); } while (--count); }
首先是宏os_atomic_inc2o,这个看字面意思很容易猜到是原子操作将dsema_value自加1,并将结果赋值给value。
如果value > 0,表示信号量足够,直接返回0。
如果value == LONG_MIN,则崩溃。应该是过度wait了。
否则调用_dispatch_semaphore_signal_slow函数,最终会调用内核函数semaphore_signal(*sema);发出一个信号唤醒一个wait的线程,返回1。
因此dispatch_semaphore_signal的作用就是将信号量+1,如果加1后信号量还是<=0,说明有wait的线程,那就唤醒最先等待的线程。
信号量的销毁—_dispatch_semaphore_dispose 信号量对象遵守ARC内存管理,当信号量对象引用计数为0时,比如将self.semaphore = nil,或者self.semaphore = 新信号量对象,那么之前的信号量对象将销毁。所以当还有wait中的线程时千万不要把信号量对象给整销毁了。
销毁时系统会调用_dispatch_semaphore_dispose函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 void _dispatch_semaphore_dispose(dispatch_object_t dou, DISPATCH_UNUSED bool *allow_free) { dispatch_semaphore_t dsema = dou._dsema; if (dsema->dsema_value < dsema->dsema_orig) { DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value, "Semaphore object deallocated while in use" ); } _dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); } DISPATCH_ALWAYS_INLINE static inline void _dispatch_sema4_dispose(_dispatch_sema4_t *sema, int policy) { if (_dispatch_sema4_is_created(sema)) { _dispatch_sema4_dispose_slow(sema, policy); } } void _dispatch_sema4_dispose_slow(_dispatch_sema4_t *sema, int policy) { semaphore_t sema_port = *sema; *sema = MACH_PORT_DEAD; #if DISPATCH_USE_OS_SEMAPHORE_CACHE if (policy == _DSEMA4_POLICY_FIFO) { return os_put_cached_semaphore((os_semaphore_t )sema_port); } #endif kern_return_t kr = semaphore_destroy(mach_task_self(), sema_port); DISPATCH_SEMAPHORE_VERIFY_KR(kr); }
如果dsema_value < dsema_orig,程序会崩溃并提示Semaphore object deallocated while in use。提示这个并不一定说明还有线程阻塞和等待signal,这是系统的一种过于保守的保护措施,目的就是要让业务层平衡使用signal和wait。所以函数说明才提醒我们wait和signal要配对使用。
换句话,只有当signal次数>=wait的次数,dsema_value才会>=dsema_orig。从实现来看,我们要保证signal次数>=wait的次数。
然后就是调用_dispatch_sema4_dispose进行销毁操作。
eg:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 - (void )test_semaphore_do_task_after_all_work1 { dispatch_async (dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0 ), ^{ NSLog (@"current1:%@" ,[NSThread currentThread]); dispatch_semaphore_t semaphore = dispatch_semaphore_create(2 ); dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); __block NSInteger rs1 = 0 ; [self requestSomethingWithN:40 completion:^(NSInteger result) { rs1 = result; NSLog (@"40 rs = %ld" , (long )result); dispatch_semaphore_signal(semaphore); }]; dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); __block NSInteger rs2 = 0 ; [self requestSomethingWithN:41 completion:^(NSInteger result) { rs2 = result; NSLog (@"41 rs = %ld" , (long )result); dispatch_semaphore_signal(semaphore); }]; dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); NSInteger rs = rs1 + rs2; NSLog (@"==rs=%ld" , rs); }); }
思考
Q1:使用互斥锁和条件变量实现一个信号量?
在实现时特别要注意条件的判断。不能使用信号量计数值作为条件的判断条件,否则很容易导致循环等待。必须要额外定义一个变量专门用于条件的判断。
1 2 3 4 5 6 7 8 @interface XQCustomSemaphore ()@property (nonatomic , assign ) pthread_mutex_t mutex;@property (nonatomic , assign ) pthread_cond_t cond;@property (nonatomic , assign ) NSInteger value; @property (nonatomic , assign ) NSInteger wakeups; @end
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @implementation XQCustomSemaphore - (void )dealloc { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } - (instancetype )initWithSemaphore:(NSInteger )semaphore { self = [super init]; if (self ) { pthread_mutex_init(&_mutex, NULL ); pthread_cond_init(&_cond, NULL ); _value = semaphore; _wakeups = 0 ; } return self ; } - (void )signal { pthread_mutex_lock(&_mutex); _value++; if (_value <= 0 ) { _wakeups++; pthread_cond_signal(&_cond); } pthread_mutex_unlock(&_mutex); } - (void )wait { pthread_mutex_lock(&_mutex); _value--; if (_value < 0 ) { do { pthread_cond_wait(&_cond, &_mutex); } while (_wakeups < 1 ); _wakeups--; } pthread_mutex_unlock(&_mutex); } - (void )signal { pthread_mutex_lock(&_mutex); _value++; if (_value <= 0 ) { pthread_cond_signal(&_cond); } pthread_mutex_unlock(&_mutex); } - (void )wait { pthread_mutex_lock(&_mutex); _value--; while (_value < 0 ) { pthread_cond_wait(&_cond, &_mutex); } pthread_mutex_unlock(&_mutex); }
参考:操作系统思考 第十一章 C语言中的信号量
参考 深入浅出 GCD 之 dispatch_semaphore 介绍信号量源码实现的,不过版本较低
写给本身看的源码系列: GCD的信号量semaphore 新版本的