0%

管道的神操作(事件通知)-sysrepo笔记(1)

前言

由于项目需要,安排了一个月左右的时间研究一下开源代码sysrepo这个共享内存型的数据库,后面得根据需要实现一些定制的需求,还有性能提升,数据是yang模型的数据。整体代码量大概3w行,每天刷个三四千行,每天日子过得很快很快,安排熟悉代码的时间也就七天,很是紧凑,消耗的还是有点儿慢。收获也颇多吧,从中也看到了许多linux接口的使用,后面陆续分享上来。

管道

今天讲的是管道的应用吧,在sysrepo中基本上对每个模块的路径事件的订阅都会创建一个subscription_ctx这么个结构,每个结构都会有一个管道关联起来。事件的内容写在一个内存映射的文件中,然后再通过管道写一个字节的消息。 这边,每个管道都会有一个线程进行读操作,当读到消息后,就去对应的内存映射文件中读取内容,然后再处理事件。处理完事件之后,把结果写回到内存映射文件的后面。

这里,使用管道实现了事件通知的作用。而不像平常使用的那样,用来直接传递的数据内容。在这里,每个管道都有唯一的一个管道id来标识它,管道的文件也是和这个管道id相关联的。管道文件存放在/etc/sysrepo/目录下,带有evpipe和数字的就是了。

这里,对管道的读有设置了一个超时时间,超时了就继续下一次读。然后,涉及一些管道读写方面的知识。

读管道:

    1. 管道中有数据,read返回实际读到的字节数。
    1. 管道中无数据:
      (1) 管道写端被全部关闭,read返回0 (好像读到文件结尾)
      (2) 写端没有全部被关闭,read阻塞等待(不久的将来可能有数据递达,此时会让出cpu)

写管道:

    1. 管道读端全部被关闭, 进程异常终止(也可使用捕捉SIGPIPE信号,使进程不终止)
    1. 管道读端没有全部关闭:
      (1) 管道已满,write阻塞。
      (2) 管道未满,write将数据写入,并返回实际写入的字节数。

这边对管道的写,每次都是重写打开写端,然后写一个字节, 通知完了就关闭。读端通过select监听读的fd,当有数据了就读了, 然后到对应的位置去取数据。不像正常的那样父子进程各自维护读写端。这边管道不是作为进程间通信来用的,而是线程间的事件传递。

示例代码

https://github.com/fishmwei/blog_code/tree/master/sysrepo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
// #include <asprintf.h>

int getPipePath(int number, char **path)
{
if (asprintf(path, "/sr_evpipe%d", number) == -1) {
return -1;
}

// printf("get path %s\r\n", *path);
return 0;
}


void write_notify(int number) {
char *path = NULL, buf[1] = {0};
int fd = -1, ret;

if (getPipePath(number, &path) < 0) {
printf("error get pipe path");
return;
}

if ((fd = open(path, O_WRONLY|O_NONBLOCK)) == -1)
{
free(path);
return;
}


free(path);

do {
ret = write(fd, buf, 1);
} while (!ret);
if (ret == -1) {
printf("error write");
return;
}

printf("write a notify\r\n");
if (fd > -1) {
close(fd);
}
}

void do_read(int fd)
{
int ret = 0;
char buf[1];
do {
ret = read(fd, buf, 1);
} while (ret == 1);
if ((ret == -1) && (errno != EAGAIN)) {
printf("read error\r\n");
return;
}
printf("read ok , do something!\r\n");
}

void *pipe_listen_thread(void *arg)
{
fd_set rfds;
struct timeval tv;
time_t stop_time_in = 0;
// pipe read

int read = *(int *)arg;
while (1) {
tv.tv_sec = 10;
tv.tv_usec = 0;
FD_ZERO(&rfds);
FD_SET(read, &rfds);
int ret = select(read + 1, &rfds, NULL, NULL, &tv);
if ((ret == -1) && (errno != EINTR)) {
break;
} else if ((!ret || ((ret == -1)&&(errno == EINTR)))) {
printf("time out");
continue;
}
// do read
do_read(read);
}
}

void main() {
int number = 0;
char *path = NULL;

if (getPipePath(number, &path) < 0) {
return;
}
unlink(path);
mode_t um = umask(0);
int ret = mkfifo(path, 00622);
umask(um);
if (ret == -1) {
printf("mkfifo error");
free(path);
return;
}
int readFd = open(path, O_RDWR | O_NONBLOCK);
if (readFd == -1) {
printf("open error");
free(path);
return;
}

free(path);

pthread_t tid;
if (ret = pthread_create(&tid, NULL, pipe_listen_thread, &readFd))
{
close(readFd);
return;
}

while (1) {
sleep(5);
write_notify(number);
}

pthread_join(tid, NULL);
close(readFd);
return ;
}


avatar

行动,才不会被动!

欢迎关注个人公众号 微信 -> 搜索 -> fishmwei,沟通交流。