0%

读写锁的源码解读-sysrepo笔记(7)

前言

sysrepo对资源的保护自己实现了一个读写锁, 支持多个线程同时读,只有一个线程在写的保护。使用的是互斥量加条件量来实现的。读写互斥使用一个变量控制。结构如下(基于1.4.87版本,新版本变了,原理差不多):

结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

/**
* @brief Lock mode.
*/
typedef enum sr_lock_mode_e {
SR_LOCK_NONE = 0, /**< Not locked. */
SR_LOCK_READ, /**< Read lock. */
SR_LOCK_WRITE, /**< Write lock. */
} sr_lock_mode_t;

/**
* @brief Sysrepo read-write lock.
*/
typedef struct sr_rwlock_s {
pthread_mutex_t mutex; /**< Lock mutex. */
pthread_cond_t cond; /**< Lock condition variable. */
uint16_t readers; /**< Current read-locked users. */
} sr_rwlock_t;

接口

对这个锁的操作主要有如下接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

/**
* @brief Initialize a sysrepo RW lock.
*
* @param[in,out] rwlock RW lock to initialize.
* @param[in] shared Whether the RW lock will be shared between processes or not.
* @return err_info, NULL on success.
*/
sr_error_info_t *sr_rwlock_init(sr_rwlock_t *rwlock, int shared);

/**
* @brief Destroy a sysrepo RW lock.
*
* @param[in] rwlock RW lock to destroy.
*/
void sr_rwlock_destroy(sr_rwlock_t *rwlock);

/**
* @brief Lock a sysrepo RW lock.
*
* @param[in] rwlock RW lock to lock.
* @param[in] timeout_ms Timeout in ms for locking.
* @param[in] mode Whether to write-lock or read-lock.
* @param[in] func Name of the calling function for logging.
* @return err_info, NULL on success.
*/
sr_error_info_t *sr_rwlock(sr_rwlock_t *rwlock, int timeout_ms, sr_lock_mode_t mode, const char *func);

/**
* @brief Unlock a sysrepo RW lock.
*
* @param[in] rwlock RW lock to unlock.
* @param[in] mode Whether to write-unlock or read-unlock.
* @param[in] func Name of the calling function for logging.
*/
void sr_rwunlock(sr_rwlock_t *rwlock, sr_lock_mode_t mode, const char *func);


实现原理

通过一个互斥量和一个条件量,还有锁结构的读者数目实现控制读写交互。

加锁

如果是写加锁,首先通过获取互斥量,然后等待读者数为0或者条件量(等待过程中,超时会释放掉互斥量),写加锁成功。 此时互斥量被写者占用。
在写加锁后,互斥量被占用。此时,再有其他写加锁或者读加锁,都获取不到互斥量,保证了写安全。

如果是读加锁,首先通过获取互斥量,然后增加读者数,释放互斥量,读加锁成功。此时互斥量空闲。
在读加锁后,互斥量空闲。如果有其他写加锁来,那么互斥量被占用,但读者数目非0,读加锁操作会阻塞等待读者数为0。
如果有其他读加锁来,那么互斥量先被占用,增加读者数,然后释放互斥量。读加锁成功,此时同时有多个读者。

解锁

如果是写解锁,如果读者数为0,广播条件量,然后释放互斥量。之后,再有其他写加锁或者读加锁,可以获取到互斥量,进行操作。

如果是读解锁,先获取互斥量,然后读者数减去1,如读者数为0,广播条件量,释放互斥量。之后,再有其他写加锁或者读加锁,可以获取到互斥量,进行操作。

代码实现

sr_rwlock_init

初始化接口, 主要对锁的结构体进行初始化。 这里有个shared表示这个锁支持进程间的互斥, 实际上就是这个锁结构在共享内存空间里面,可以支持多个进程对其进行操作。如果不在共享内存中, 就是进程的私有变量,shared就不能为1.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
sr_error_info_t *
sr_rwlock_init(sr_rwlock_t *rwlock, int shared)
{
sr_error_info_t *err_info = NULL;

if ((err_info = sr_mutex_init(&rwlock->mutex, shared))) {
return err_info;
}
rwlock->readers = 0;
if ((err_info = sr_cond_init(&rwlock->cond, shared))) {
pthread_mutex_destroy(&rwlock->mutex);
return err_info;
}

return NULL;
}

sr_error_info_t *
sr_mutex_init(pthread_mutex_t *lock, int shared)
{
sr_error_info_t *err_info = NULL;
pthread_mutexattr_t attr;
int ret;

/* check address alignment */
if (SR_MUTEX_ALIGN_CHECK(lock)) {
sr_errinfo_new(&err_info, SR_ERR_INTERNAL, NULL, "Mutex address not aligned.");
return err_info;
}

if (shared) {
/* init attr */
if ((ret = pthread_mutexattr_init(&attr))) {
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Initializing pthread attr failed (%s).", strerror(ret));
return err_info;
}
if ((ret = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED))) {
pthread_mutexattr_destroy(&attr);
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Changing pthread attr failed (%s).", strerror(ret));
return err_info;
}

if ((ret = pthread_mutex_init(lock, &attr))) {
pthread_mutexattr_destroy(&attr);
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Initializing pthread mutex failed (%s).", strerror(ret));
return err_info;
}
pthread_mutexattr_destroy(&attr);
} else {
if ((ret = pthread_mutex_init(lock, NULL))) {
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Initializing pthread mutex failed (%s).", strerror(ret));
return err_info;
}
}

return NULL;
}

/**
* @brief Wrapper for pthread_cond_init().
*
* @param[out] cond Condition variable to initialize.
* @param[in] shared Whether the condition will be shared among processes.
* @return err_info, NULL on error.
*/
static sr_error_info_t *
sr_cond_init(pthread_cond_t *cond, int shared)
{
sr_error_info_t *err_info = NULL;
pthread_condattr_t attr;
int ret;

/* check address alignment */
if (SR_COND_ALIGN_CHECK(cond)) {
sr_errinfo_new(&err_info, SR_ERR_INTERNAL, NULL, "Condition variable address not aligned.");
return err_info;
}

if (shared) {
/* init attr */
if ((ret = pthread_condattr_init(&attr))) {
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Initializing pthread attr failed (%s).", strerror(ret));
return err_info;
}
if ((ret = pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED))) {
pthread_condattr_destroy(&attr);
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Changing pthread attr failed (%s).", strerror(ret));
return err_info;
}

if ((ret = pthread_cond_init(cond, &attr))) {
pthread_condattr_destroy(&attr);
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Initializing pthread rwlock failed (%s).", strerror(ret));
return err_info;
}
pthread_condattr_destroy(&attr);
} else {
if ((ret = pthread_cond_init(cond, NULL))) {
sr_errinfo_new(&err_info, SR_ERR_SYS, NULL, "Initializing pthread rwlock failed (%s).", strerror(ret));
return err_info;
}
}

return NULL;
}

可以看到如果shared为1,会为pthread_mutex_t和pthread_cond_t设置属性attr, 设置PTHREAD_PROCESS_SHARED。这就是支持进程间互斥的条件量和互斥量。

sr_rwlock_destroy

销毁比较简单,就是释放pthread_mutex_t和pthread_cond_t的变量。

1
2
3
4
5
6
void
sr_rwlock_destroy(sr_rwlock_t *rwlock)
{
pthread_mutex_destroy(&rwlock->mutex);
pthread_cond_destroy(&rwlock->cond);
}

sr_rwlock

sr_rwlock这个函数实现了互斥量的获取和锁定,可以传入超时时间,在指定时间内未获取到互斥量则失败。如果是要对锁进行写操作,需要等所有的读锁释放掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
sr_error_info_t *
sr_rwlock(sr_rwlock_t *rwlock, int timeout_ms, sr_lock_mode_t mode, const char *func)
{
sr_error_info_t *err_info = NULL;
struct timespec timeout_ts;
int ret;

assert(mode != SR_LOCK_NONE);
assert(timeout_ms > 0);
sr_time_get(&timeout_ts, timeout_ms);

/* MUTEX LOCK */
ret = pthread_mutex_timedlock(&rwlock->mutex, &timeout_ts);
if (ret) {
SR_ERRINFO_LOCK(&err_info, func, ret);
return err_info;
}

if (mode == SR_LOCK_WRITE) {
/* write lock */
ret = 0;
// 获取条件量失败或者还有读锁,继续等待
while (!ret && rwlock->readers) {
/* COND WAIT */
ret = pthread_cond_timedwait(&rwlock->cond, &rwlock->mutex, &timeout_ts); // pthread_cond_timedwait 这个函数进入等待会释放互斥量,成功后会获得互斥量
}

if (ret) {
/* MUTEX UNLOCK */
pthread_mutex_unlock(&rwlock->mutex);

SR_ERRINFO_COND(&err_info, func, ret);
return err_info;
}
} else {
/* read lock */
++rwlock->readers;

/* MUTEX UNLOCK */
pthread_mutex_unlock(&rwlock->mutex);
}

return NULL;
}

sr_rwunlock

sr_rwunlock这个函数释放锁。释放读锁就是简单的对读者数减去1,当读者数为0发送一个条件量,然后释放互斥量。释放写锁就是当读者数为0发送一个条件量,然后释放互斥量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void
sr_rwunlock(sr_rwlock_t *rwlock, sr_lock_mode_t mode, const char *func)
{
sr_error_info_t *err_info = NULL;
struct timespec timeout_ts;
int ret;

assert(mode != SR_LOCK_NONE);

if (mode == SR_LOCK_READ) {
sr_time_get(&timeout_ts, SR_RWLOCK_READ_TIMEOUT);

/* MUTEX LOCK */
ret = pthread_mutex_timedlock(&rwlock->mutex, &timeout_ts);
if (ret) {
SR_ERRINFO_LOCK(&err_info, func, ret);
sr_errinfo_free(&err_info);
}

if (!rwlock->readers) {
SR_ERRINFO_INT(&err_info);
sr_errinfo_free(&err_info);
} else {
/* remove a reader */
--rwlock->readers;
}
}

/* we are unlocking a write lock, there can be no readers */
assert((mode == SR_LOCK_READ) || !rwlock->readers);

if (!rwlock->readers) {
/* broadcast on condition */
pthread_cond_broadcast(&rwlock->cond);
}

/* MUTEX UNLOCK */
pthread_mutex_unlock(&rwlock->mutex);
}

小结

这就是sysrepo的读写锁实现。学习一个开源库,挖挖其内部的算法结构的实现,还是一个很不错的学习机会。IT行业就是工作到老,学习到老。

行动,才不会被动!

欢迎关注个人公众号 微信 -> 搜索 -> fishmwei,沟通交流。