0%

超时实现-sysrepo笔记(8)

前言

最近跟同事联调的时候,遇到了一个问题。我这边使用sysrepo实现了一个更改系统时间的功能的rpc,然后调用方每次调用的时候,如果设置的是将来的某个时刻,总是返回超时,设置当前时间以前的某个时刻,又是正常的。跟踪查看回调的代码,没有任何异常,而且时间的确设置下去了,也正常返回了。这是一个必现的问题,大概想了一下,应该是代码中使用系统时间判断超时了,需要深入看一下sysrepo的代码。

定位

继续深入到netopeer2跟踪到了具体返回超时的代码行:

1
2
3
4
5
rc = sr_rpc_subscribe_tree(np2srv.sr_sess, xpath, cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &np2srv.sr_rpc_sub); \
if (rc != SR_ERR_OK) { \
ERR("Subscribing for \"%s\" RPC failed (%s).", xpath, sr_strerror(rc)); \
goto error; \
}

继续sr_rpc_subscribe_tree看到超时相关代码,一层层深入调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

sr_rpc_subscribe_tree
-->sr_shmsub_rpc_notify
-->sr_shmsub_notify_finish_wrunlock


static sr_error_info_t *
sr_shmsub_notify_finish_wrunlock(sr_sub_shm_t *sub_shm, size_t shm_struct_size, sr_sub_event_t expected_ev,
uint32_t timeout_ms, sr_error_info_t **cb_err_info)
{
...
sr_time_get(&timeout_ts, timeout_ms);

/* wait until this event was processed */
ret = 0;
while (!ret && (sub_shm->lock.readers || (!SR_IS_NOTIFY_EVENT(sub_shm->event) && (sub_shm->event != SR_SUB_EV_NONE)))) {
/* COND WAIT */
ret = pthread_cond_timedwait(&sub_shm->lock.cond, &sub_shm->lock.mutex, &timeout_ts);
}

if (ret) {
if (ret == ETIMEDOUT) {
/* handle corner-case when the subscriber has just woken up and is processing this event,
* lock should never be held for long */
sr_time_get(&timeout_ts, SR_RWLOCK_READ_TIMEOUT);
while (sub_shm->lock.readers) {
/* COND WAIT */
pthread_cond_timedwait(&sub_shm->lock.cond, &sub_shm->lock.mutex, &timeout_ts);
}

/* event timeout */
sr_errinfo_new(cb_err_info, SR_ERR_TIME_OUT, NULL, "Callback event \"%s\" with ID %u processing timed out.",
sr_ev2str(event), request_id);
if ((event == sub_shm->event) && (request_id == sub_shm->request_id)
&& ((expected_ev == SR_SUB_EV_SUCCESS) || (expected_ev == SR_SUB_EV_ERROR))) {
sub_shm->event = SR_SUB_EV_ERROR;
}
} else {
/* other error */
SR_ERRINFO_COND(&err_info, __func__, ret);
}
}
...
}

最后确认了,是这边pthread_cond_timedwait调用返回了超时。可以看到,外面只是传入了超时间隔timeout_ms,这里调用sr_time_get获取当前的时间,加上timeout_ms计算出超时的时刻。继续看sr_time_get的实现,里面获取时间,传递的clockid是CLOCK_REALTIME,获取系统实时的时间。这边就可以肯定,后面超时就是以系统时间为准了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void
sr_time_get(struct timespec *ts, uint32_t add_ms)
{
sr_error_info_t *err_info = NULL;

if (clock_gettime(CLOCK_REALTIME, ts) == -1) {
SR_ERRINFO_SYSERRNO(&err_info, "clock_gettime");
/* will not happen anyway */
sr_errinfo_free(&err_info);
return;
}

add_ms += ts->tv_nsec / 1000000;
ts->tv_nsec %= 1000000;
ts->tv_nsec += (add_ms % 1000) * 1000000;
ts->tv_sec += add_ms / 1000;
}

在调用rpc回调的时候,系统时间被修改为未来的时间,超过超时的时刻,那么pthread_cond_timedwait就会返回超时了。

更深入一步

我们继续看一下pthread_cond_timedwait的实现,这个是在libc里面的代码,需要下载libc的源码。

跟随超时时间,可以获取一层层的调用关系(看64位的实现):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
pthread_cond_timedwait
--> __pthread_cond_timedwait64
--> __pthread_cond_wait_common // 根据条件量的属性获取clockid
--> __futex_abstimed_wait_cancelable64
--> __futex_abstimed_wait_common
--> __futex_abstimed_wait_common64
--> futex_time64 // 最后是这个系统调用

int
___pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime)
{
struct __timespec64 ts64 = valid_timespec_to_timespec64 (*abstime);

return __pthread_cond_timedwait64 (cond, mutex, &ts64);
}


int
___pthread_cond_timedwait64 (pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct __timespec64 *abstime)
{
/* Check parameter validity. This should also tell the compiler that
it can assume that abstime is not NULL. */
if (! valid_nanoseconds (abstime->tv_nsec))
return EINVAL;

/* Relaxed MO is suffice because clock ID bit is only modified
in condition creation. */
unsigned int flags = atomic_load_relaxed (&cond->__data.__wrefs);
clockid_t clockid = (flags & __PTHREAD_COND_CLOCK_MONOTONIC_MASK)
? CLOCK_MONOTONIC : CLOCK_REALTIME;
return __pthread_cond_wait_common (cond, mutex, clockid, abstime);
}

逻辑分析

整个逻辑看下来比较清晰了。首先条件量可以设置属性,设置超时时刻的类型。然后,计算超时的时候获取对应类型的时间加上超时间隔,计算出超时的时刻,然后就是调用系统调用判断是否超时了。

因此我们可以修改条件量的的时间属性来根本解决这个问题。我们修改的是系统时间,那么我们就不以系统实时时间来判定超时,使用CLOCK_MONOTONIC类型来解决。主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

// 初始化条件量
pthread_condattr_t cattr;
int ret = pthread_condattr_init(&cattr);
ret = pthread_condattr_setclock(&cattr, CLOCK_MONOTONIC);

ret = pthread_cond_init(&signal->cond, &signal->cattr);


// 时间获取

void
sr_time_get(struct timespec *ts, uint32_t add_ms)
{
sr_error_info_t *err_info = NULL;

// CLOCK_REALTIME 改为CLOCK_MONOTONIC
if (clock_gettime(CLOCK_MONOTONIC, ts) == -1) {
SR_ERRINFO_SYSERRNO(&err_info, "clock_gettime");
/* will not happen anyway */
sr_errinfo_free(&err_info);
return;
}

add_ms += ts->tv_nsec / 1000000;
ts->tv_nsec %= 1000000;
ts->tv_nsec += (add_ms % 1000) * 1000000;
ts->tv_sec += add_ms / 1000;
}

这样修改了一下,影响的面比较大,而且涉及修改的库比较多。 netopeer2sysrepolibnetconf2等都有相关修改,rpc这个机制是通用的,又不是仅为修改系统时间来用的。我简单验证了一下,虽然解决了修改系统rpc的问题,但是又出现了一些其他的超时,应该是没有改全。 OK,可以说这个方案可行,但是代价比较大,这样修改影响面太广了。

最终方案

我们看到这个出现问题的根因是修改了系统时间之后,pthrea_cond_timedwait返回了超时。那么根据处理逻辑,我们可以让接口先返回,然后再延迟设置系统时间,这样就可以规避问题了。我在回调函数里面,先对时间参数做合法性校验,基本就保证了修改系统时间一定成功了,然后启动一个线程,延迟1s再修改系统时间,之后线程结束。OK,验证效果达到预期,功能完美完成。改动量少,影响少,又可以完成功能。

延伸

不同的clockid对应不同的时间:

  • CLOCK_REALTIME:系统实时时间,随系统实时时间改变而改变,即从UTC1970-1-1 0:0:0开始计时,
    中间时刻如果系统时间被用户改成其他,则对应的时间相应改变
  • CLOCK_MONOTONIC:从系统启动这一刻起开始计时,不受系统时间被用户改变的影响
  • CLOCK_PROCESS_CPUTIME_ID:本进程到当前代码系统CPU花费的时间
  • CLOCK_THREAD_CPUTIME_ID:本线程到当前代码系统CPU花费的时间

有时候遇到问题,获取到了根本原因,但是根据各个方面的考量,解决问题的方案不一定是从根本上解决问题,还可以绕过根本原因,让其不能达成。就好比遇到一座山,我们不一定要爬到山顶翻过去,还从山脚下找其他路绕过去。解决问题的方法总比问题多。

行动,才不会被动!

欢迎关注个人公众号 微信 -> 搜索 -> fishmwei,沟通交流。