0%

iOS 信号量原理篇

信号量原理篇

源码版本:libdispatch-1173.40.5

信号量的实现主要是创建,销毁,wait和signal,四个方法。

信号量结构体:

1
2
3
4
5
6
7
8
9
DISPATCH_CLASS_DECL(semaphore, OBJECT);
struct dispatch_semaphore_s {
DISPATCH_OBJECT_HEADER(semaphore);
long volatile dsema_value;
long dsema_orig;
_dispatch_sema4_t dsema_sema;
};

typedef semaphore_t _dispatch_sema4_t;

信号量的创建—dispatch_semaphore_create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;

// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return DISPATCH_BAD_INPUT;
}

dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}

可以看到不能传入一个小于0的值。信号量有几个重要的属性:

dsema_value:记录当前计数值,后续的wait和signal函数会改变该值。

dsema_sema:信号量 (真正干事情的)

dsema_orig:记录创建时传入的初始计数值。在信号量销毁时有用到。

信号量wait操作—dispatch_semaphore_wait

1
2
3
4
5
6
7
8
9
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}

os_atomic_dec2o是一个宏:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define os_atomic_dec2o(p, f, m) \
os_atomic_sub2o(p, f, 1, m)

#define os_atomic_sub2o(p, f, v, m) \
os_atomic_sub(&(p)->f, (v), m)

#define os_atomic_sub(p, v, m) \
_os_atomic_c11_op((p), (v), m, sub, -)

#define _os_atomic_c11_op(p, v, m, o, op) \
({ _os_atomic_basetypeof(p) _v = (v), _r = \
atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), _v, \
memory_order_##m); (__typeof__(_r))(_r op _v); })

#define _os_atomic_basetypeof(p) \
__typeof__(atomic_load_explicit(_os_atomic_c11_atomic(p), memory_order_relaxed))

#define _os_atomic_c11_atomic(p) \
((__typeof__(*(p)) _Atomic *)(p))

展开后如下:

1
long value = ({ __typeof__(atomic_load_explicit(((__typeof__(*((&(dsema)->dsema_value))) _Atomic *)((&(dsema)->dsema_value))), memory_order_relaxed)) _v = (((1))), _r = atomic_fetch_sub_explicit(((__typeof__(*((&(dsema)->dsema_value))) _Atomic *)((&(dsema)->dsema_value))), _v, memory_order_acquire); (__typeof__(_r))(_r - _v); });

简化下就是:

1
2
3
4
5
6
7
long value = (
{
_v = 1,
_r = atomic_fetch_sub_explicit(*((&(dsema)->dsema_value)), 1, memory_order_acquire);
_r - _v;
}
);

类似于:

1
2
3
long value = ({
int a = 3; int b = 1; a - b;
}); //value的值就等于a-b = 2; 源码里好多这样的宏

而atomic_fetch_sub_explicit函数表示dsema_value -= 1,并返回dsema_value原来的值。

因此long value = os_atomic_dec2o(dsema, dsema_value, acquire);

这句代码的意思就是:

原子操作,将信号量自减1,并将结果赋值给value。

由于是原子操作,因此同一时刻只会有一个线程对dsema_value进行减1。多线程下不会出问题。

如果value>=0,则返回0,这种情况不会阻塞线程。

否则调用_dispatch_semaphore_wait_slow函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
DISPATCH_NOINLINE
static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;

_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) { //超时的系统才会自动+1
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}

该方法会根据传入的dispatch_time_t进行区分:

其他等待时间:比如传入dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)),即2s。

该case里主要就是_dispatch_sema4_timedwait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool
_dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout)
{
mach_timespec_t _timeout;
kern_return_t kr;

do {
uint64_t nsec = _dispatch_timeout(timeout);
_timeout.tv_sec = (__typeof__(_timeout.tv_sec))(nsec / NSEC_PER_SEC);
_timeout.tv_nsec = (__typeof__(_timeout.tv_nsec))(nsec % NSEC_PER_SEC);
kr = semaphore_timedwait(*sema, _timeout);
} while (unlikely(kr == KERN_ABORTED));

if (kr == KERN_OPERATION_TIMED_OUT) {
return true;
}
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
return false;
}

计算剩余时间,调用mach内核的等待函数semaphore_timedwait()进行等待,线程进入休眠状态。如果在指定时间内没有得到通知,则会一直阻塞住,监听dsema_port等待其通知。

如果kr == KERN_OPERATION_TIMED_OUT,代表等待超时,则返回true。其他情况返回false。

当_dispatch_sema4_timedwait返回true时,case会掉入到DISPATCH_TIME_NOW处理。

当_dispatch_sema4_timedwait返回false时,代表在等待期间收到了一个信号,线程被唤醒。case会break;dispatch_semaphore_wait也就返回0。

DISPATCH_TIME_NOW:也就是不等待。

1
2
os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)

该宏由以下宏组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
#define os_atomic_cmpxchgvw2o(p, f, e, v, g, m) \
os_atomic_cmpxchgvw(&(p)->f, (e), (v), (g), m)

#define os_atomic_cmpxchgvw(p, e, v, g, m) \
({ _os_atomic_basetypeof(p) _r = (e); _Bool _b = \
atomic_compare_exchange_weak_explicit(_os_atomic_c11_atomic(p), \
&_r, v, memory_order_##m, memory_order_relaxed); *(g) = _r; _b; })

#define _os_atomic_basetypeof(p) \
__typeof__(atomic_load_explicit(_os_atomic_c11_atomic(p), memory_order_relaxed))

#define _os_atomic_c11_atomic(p) \
((__typeof__(*(p)) _Atomic *)(p))

将宏展开:

1
2
3
4
5
6
({ 
__typeof__(atomic_load_explicit(((__typeof__(*(&(dsema)->dsema_value)) _Atomic *)(&(dsema)->dsema_value)), memory_order_relaxed)) _r = ((orig));
_Bool _b = atomic_compare_exchange_weak_explicit(((__typeof__(*(&(dsema)->dsema_value)) _Atomic *)(&(dsema)->dsema_value)), &_r, (orig + 1), memory_order_relaxed, memory_order_relaxed);
*((&orig)) = _r;
_b;
})

主要是atomic_compare_exchange_weak_explicit函数的意思了:

_Bool atomic_compare_exchange_weak_explicit(volatile A * obj,C * expected,C desired,memory_order succ,memory_order fail);

比较并交换被封装的值(weak)与参数 expected 所指定的值是否相等,如果:
相等,则用 desired 替换原子对象的旧值。
不相等,则用原子对象的旧值替换 expected ,因此调用该函数之后,如果被该原子对象封装的值与参数 expected 所指定的值不相等,expected 中的内容就是原子对象的旧值。
该函数通常会读取原子对象封装的值,如果比较为 true(即原子对象的值等于 expected),则替换原子对象的旧值,但整个操作是原子的,在某个线程读取和修改该原子对象时,另外的线程不能读取和修改该原子对象。

比较的结果:true如果*obj等于*exp,否则false

在这里,orig肯定<0,所以会进入while循环,

如果orig和dsema_value相等(在这里orig是等于dsema_value的),则dsema_value加1,os_atomic_cmpxchgvw2o返回true,最终return _DSEMA4_TIMEOUT();

因此DISPATCH_TIME_NOW的作用:将信号量自加1(抵消掉了前面的自减1。注意即使这里自加1,dsema_value的值也不一定就是0),然后返回_DSEMA4_TIMEOUT()一个非0数。

DISPATCH_TIME_FOREVER:永久等待一个信号的发送。

这里主要就是_dispatch_sema4_wait的作用:

1
2
3
4
5
6
7
8
9
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
kern_return_t kr;
do {
kr = semaphore_wait(*sema);
} while (kr == KERN_ABORTED);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}

调用mach内核的等待函数semaphore_wait()进行等待。直到收到一个信号被唤醒。

信号量signal操作—dispatch_semaphore_signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);
}

DISPATCH_NOINLINE
long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}

void
_dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
{
do {
kern_return_t kr = semaphore_signal(*sema);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
} while (--count);
}

首先是宏os_atomic_inc2o,这个看字面意思很容易猜到是原子操作将dsema_value自加1,并将结果赋值给value。

如果value > 0,表示信号量足够,直接返回0。

如果value == LONG_MIN,则崩溃。应该是过度wait了。

否则调用_dispatch_semaphore_signal_slow函数,最终会调用内核函数semaphore_signal(*sema);发出一个信号唤醒一个wait的线程,返回1。

因此dispatch_semaphore_signal的作用就是将信号量+1,如果加1后信号量还是<=0,说明有wait的线程,那就唤醒最先等待的线程。

信号量的销毁—_dispatch_semaphore_dispose

信号量对象遵守ARC内存管理,当信号量对象引用计数为0时,比如将self.semaphore = nil,或者self.semaphore = 新信号量对象,那么之前的信号量对象将销毁。所以当还有wait中的线程时千万不要把信号量对象给整销毁了。

销毁时系统会调用_dispatch_semaphore_dispose函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void
_dispatch_semaphore_dispose(dispatch_object_t dou,
DISPATCH_UNUSED bool *allow_free)
{
dispatch_semaphore_t dsema = dou._dsema;

if (dsema->dsema_value < dsema->dsema_orig) {
DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value,
"Semaphore object deallocated while in use");
}

_dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sema4_dispose(_dispatch_sema4_t *sema, int policy)
{
if (_dispatch_sema4_is_created(sema)) {
_dispatch_sema4_dispose_slow(sema, policy);
}
}

void
_dispatch_sema4_dispose_slow(_dispatch_sema4_t *sema, int policy)
{
semaphore_t sema_port = *sema;
*sema = MACH_PORT_DEAD;
#if DISPATCH_USE_OS_SEMAPHORE_CACHE
if (policy == _DSEMA4_POLICY_FIFO) {
return os_put_cached_semaphore((os_semaphore_t)sema_port);
}
#endif
kern_return_t kr = semaphore_destroy(mach_task_self(), sema_port);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}

如果dsema_value < dsema_orig,程序会崩溃并提示Semaphore object deallocated while in use。提示这个并不一定说明还有线程阻塞和等待signal,这是系统的一种过于保守的保护措施,目的就是要让业务层平衡使用signal和wait。所以函数说明才提醒我们wait和signal要配对使用。

换句话,只有当signal次数>=wait的次数,dsema_value才会>=dsema_orig。从实现来看,我们要保证signal次数>=wait的次数。

然后就是调用_dispatch_sema4_dispose进行销毁操作。

eg:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
- (void)test_semaphore_do_task_after_all_work1 {
/**
错误实现,没有理解dispatch_semaphore_signal的作用。
signal函数的作用:1.将信号量加1。2.如果之前有线程被阻塞,则按优先级唤醒一个线程。
下面的实现不仅达不到要求,并且还会崩溃.
*/
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
NSLog(@"current1:%@",[NSThread currentThread]);

dispatch_semaphore_t semaphore = dispatch_semaphore_create(2); //初始化时信号量是2.

dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
__block NSInteger rs1 = 0;
[self requestSomethingWithN:40 completion:^(NSInteger result) {
rs1 = result;
NSLog(@"40 rs = %ld", (long)result);
dispatch_semaphore_signal(semaphore);
}];

dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
__block NSInteger rs2 = 0;
[self requestSomethingWithN:41 completion:^(NSInteger result) {
rs2 = result;
NSLog(@"41 rs = %ld", (long)result);
/**
这里semaphore是等到block执行完成后才释放的。还以为block没强引用semaphore呢。
崩溃:
"BUG IN CLIENT OF LIBDISPATCH: Semaphore object deallocated while in use (current value < original value)"
*/
dispatch_semaphore_signal(semaphore); //销毁时,信号量是1,小于初始值2,因此崩溃。
}];

dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
NSInteger rs = rs1 + rs2;
NSLog(@"==rs=%ld", rs);
});
}

思考

Q1:使用互斥锁和条件变量实现一个信号量?

在实现时特别要注意条件的判断。不能使用信号量计数值作为条件的判断条件,否则很容易导致循环等待。必须要额外定义一个变量专门用于条件的判断。

1
2
3
4
5
6
7
8
@interface XQCustomSemaphore ()

@property (nonatomic, assign) pthread_mutex_t mutex;
@property (nonatomic, assign) pthread_cond_t cond;
@property (nonatomic, assign) NSInteger value; //信号量的值
@property (nonatomic, assign) NSInteger wakeups; //记录唤醒次数,这个变量非常重要,条件判断时应该使用该变量

@end

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@implementation XQCustomSemaphore

- (void)dealloc {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}

- (instancetype)initWithSemaphore:(NSInteger)semaphore {
self = [super init];
if (self) {
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond, NULL);
_value = semaphore;
_wakeups = 0;
}
return self;
}

- (void)signal {
pthread_mutex_lock(&_mutex);
_value++;
if (_value <= 0) {
_wakeups++;
pthread_cond_signal(&_cond);
}
pthread_mutex_unlock(&_mutex);
}

- (void)wait {
pthread_mutex_lock(&_mutex);
_value--;
if (_value < 0) {
do {
pthread_cond_wait(&_cond, &_mutex);
} while (_wakeups < 1); //如果因为其他原因被唤醒,_wakeups的值必定小于 1,于是又等待。
_wakeups--;
}
pthread_mutex_unlock(&_mutex);
}

//错误实现
- (void)signal {
pthread_mutex_lock(&_mutex);
_value++;
if (_value <= 0) {
pthread_cond_signal(&_cond);
}
pthread_mutex_unlock(&_mutex);
}

- (void)wait {
pthread_mutex_lock(&_mutex);
_value--;
while (_value < 0) { //wait--wait--signal.signal之后value=-1,还是小于0.所以必须定义一个wakeups变量用于条件判断。
pthread_cond_wait(&_cond, &_mutex);
}
pthread_mutex_unlock(&_mutex);
}

参考:操作系统思考 第十一章 C语言中的信号量

参考

深入浅出 GCD 之 dispatch_semaphore 介绍信号量源码实现的,不过版本较低

写给本身看的源码系列: GCD的信号量semaphore 新版本的

觉得文章有帮助可以打赏一下哦!