转载

多进程日志实现

在绝大多数情况下,日志模块对于一个程序而言是必不可少的,对于服务端程序的重要性尤甚。

因为几乎所有服务端程序都是作为守护进程而运行的,因此指望开发者从terminal获取程序的状态并不现实,这个时候,依靠日志记录关键信息就很重要了,当然日志的作用远不止记录信息,它还可以用于恢复,相信从事数据库开发的读者对这应该十分熟悉,但这不属于本文的范畴。

对于Java开发者而言,他们是很幸福的,因为有log4j,甚至还有slf4j,而对于C语言而言,并没有发现同等地位或者功能的日志库,更别提类似slf4j之类的日志库抽象层了。于是,我拿起键盘,再次造起了轮子。

本文谈及的实现是为了多进程程序而设计的,简单而有效是我们追求的目标,通常而言简单的程序性能不会太差,当然乱用锁的除外。

0x00 多进程下的问题

多进程程序最大的问题在于race condition,在多进程日志记录这个具体的情景下,最终记录日志信息的文件(以下简称日志文件)就成了hotspot(热点)。假设不考虑操作系统的机制,多个进程同时写日志,最直观的问题就是无法保证日志信息的有序性,最终的结果可能如下:

<code>进程A的日志片段 | 进程B的日志片段 | 进程A的日志片段 。。。 </code>

假设我们能够保证write操作的原子性,那么这个问题便不存在了。幸运的是,POSIX的write在一定程度上提供了这种机制,前提是设置了O_APPEND标志,当开启这个标记后,在一定程度上可以理解为write的原子追加操作。

但问题是,并不是所有的系统都遵循POSIX标准的,并且从当前获得的信息来看,并不是任意长度的append write操作都能够保证原子性,这里应该是有个阀值的,具体跟OS和FS有关。

先把OS提供的机制搁在一边,让我们先考虑写日志的操作。

由于我们无法对日志信息的长度作任何假设,因此我们必须考虑到因为需要频繁记录短日志信息而大量调用write的情况。须知系统调用的代价是很大的,因此相对理性的做法是先将日志信息缓存,然后待缓冲区满后一次性write。

因此我个人认为相对理性的做法还是实现一个多进程日志库,而非让每个进程直接write,即使OS能够在一定程度上保证append write的原子性。

0x01 实现思想

在单进程程序下,是不存在日志文件写竞争的问题的。在实现本日志库时,采用的核心思想也是如此。只需让一个单独的进程或者线程完成写日志的任务即可,也就是说这里使用的是C/S架构,写日志的进程或线程是Server,而提出记录日志请求的进程是Client。

出于实现的复杂度考虑,这里的Server采用线程来实现。具体而言,就是初始化日志库的进程会启动一个独立的线程作为日志库的Server,而这个进程以及其所派生的所有子进程都将是Client。

当然,这并非最完美的实现,因为它存在单点故障问题,这是Master/Salver架构无法回避的,当然,如果Server端实现得足够简单,甚至简单到没bug,它应该基本不会crash的。当然即使存在潜在的故障,也可以使用一些容错的手段,比如说选举算法,相关内容超过了本文的范畴。

0x02 进程间通讯

那么剩下的问题,似乎就只是进程间通讯了,这已经是老生常谈的话题了,经典的方案如下:

  • 共享内存
  • 管道
  • 套接字
  • 消息队列

在实现的时候,我并没有考虑共享内存,为了保证共享内存的原子性,例如采用semphore,会造成额外的开销和复杂度,似乎并没有必要这么做。

管道分为匿名和具名,区别是前者只能用在具有派生关系的进程之间,而后者就解决了这个问题。

管道和套接字之间的区别是,前者是单向的,而后者则是双向的,并且如果后者使用Datagram,就可以保留消息的边界,而前者是做不到这点的。另外,UNIX Domain Socket的性能和PIPE差不多,因为这个时候并不需要解析协议。

消息队列(MQ)一般而言是相对于高层Application而言的,对于服务器而言,似乎并没有什么必要。

很多情况下,当考虑进程间通讯时,都是首推套接字的,因为进程间通讯可能涉及跨主机通讯,对于这点,只有套接字能够支持。

由于当前的需求是单机多进程,考虑到性能,这里就采用UNIX Domain Socket了,今后改成支持多机多进程时,也会比较容易。

0x03 实现一览

如下为实现示例,它并不能直接运行,因为我对涉及IO操作和time格式化操作的函数做了一定的封装,但代码中并未提供这些函数,因此如果你想copy-paste,这是不可能的,你需要需要自行理解,然后亲手动手实现相关函数。

typedef struct log {     handler_t  server;     handler_t  client;     handler_t  logfile;     pthread_t  tid;     char*      data;     unsigned   len; } log_t, *Log; // 初始化Logger Log Log_init(Log logger) {     int fd[2];     // 使用socketpair创建客户端和服务端fd     if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) {         perror("Log_init");         exit(-1);     }     logger->server.fileno = fd[0];     logger->client.fileno = fd[1];     logger->server.type   = H_LOG;     logger->client.type   = H_LOG;     return logger; } // 获取logger client Log Log_getClient(Log logger) {     Handler_close(&logger->server);     // 此时将服务端的fd置为-1     logger->server.fileno = -1;     return logger; } // 获取logger server Log Log_getServer(Log logger, const char* filename) {     // server端也持有一个client     logger->data = (char*)calloc(1, LOG_BUF_LEN);     int fd = open(filename, O_CREAT | O_WRONLY | O_APPEND, 0644);     if(fd == -1) {         perror("Log_getServer");         exit(-1);     }     logger->logfile.fileno = fd;     logger->logfile.type   = H_FILE;     return logger; } // 关闭logger void Log_close(Log logger) {     if(logger->server.fileno != -1) {         Handler_close(&logger->client);         // 如果是master进程,则需要join服务线程         pthread_join(logger->tid, NULL);         Handler_close(&logger->server);         Handler_close(&logger->logfile);         free(logger->data);     } else {         Handler_close(&logger->client);     } } // 日志记录 void Log_record(Log logger, log_level_t level, const char* fmt, ...) {     char buf[2048];     char* p = buf;     uint8_t ls = LOG_STR_LEN[level];     strncpy(p, LOG_STR[level], ls);     p += ls;     *p++ = ' ';     time_t tnow = time(NULL);     // 将time转换成字符串     DateTime_formatTimeStamp(&tnow, p);     p += TIMESTAMP_LEN - 1;     /* separator "%s - %s" */     *p++ = ' '; *p++ = '-'; *p++ = ' ';     va_list varg;     va_start(varg, fmt);     vsnprintf(p, 2000, fmt, varg);     va_end(varg);     size_t len = strlen(buf);     buf[len++] = '/n';     int8_t ret = IO_writeSpec(&logger->client, buf, &len);     assert(ret == 0); } // 服务线程主函数 void* Log_main(void* arg) {     Log logger   = (Log)arg;     char* buf    = logger->data;     size_t len   = LOG_BUF_LEN;     size_t nread = LOG_BUF_LEN;     size_t off   = 0;     int8_t ret;     while(ret = IO_read(&logger->server, buf + off, &nread) == 0) {         off += nread;         len -= nread;         nread = len;         if(len == 0) {             // 对write函数的封装,写入指定长度的数据             ret = IO_writeSpec(&logger->logfile, buf, &off);             if(ret == -1) {                 perror("Log_main::write");                 exit(-1);             }             off   = 0;             len   = LOG_BUF_LEN;             nread = LOG_BUF_LEN;         }     }     if(ret == -1) {         perror("Log_main::write");         exit(-1);     }     /* peer shutdown */     /* @ASSERTION: ret == 1 */     // 如果client都close,那么ret == 1,结束循环,服务端将退出     if(off) {         ret = IO_writeSpec(&logger->logfile, buf, &off);         assert(ret == 0);     }     pthread_exit(0); } // 启动服务线程 void Log_runServer(Log logger) {     pthread_create(&logger->tid, NULL, Log_main, logger); }

上述代码比较简单,通过socketpair这一函数,可以直接创建服务端和客户端的fd。注意在fork的时候,fd也会被复制,因此在获取客户端的时候需要将服务端的fd关闭。服务线程的结束依靠read函数返回0来触发,但是fd有一个引用计数,只有将所有进程的client fd都close,才能使得该fd的引用计数为0,此时read函数才回返回0,表示对端关闭。

事实上,由于我们并不需要双向通讯,如果不考虑扩展性,这里使用PIPE也是可以的,方法差不多,替换socketpair即可,两者的性能也基本一致。

0xFF 总结

在0x01一节中,我们提到了目前这种实现并不完美,虽然并不存在真正完美的方案,但我们可以尽量向这个目标靠近。

对于本文而言,我们需要尽量保证Server线程不挂,这里主要面对的问题即为signal,更加健壮的做法是在Server线程中将一些signal给屏蔽,而上文的代码中并没有进行相应的处理,而这并不难,剩下的任务就留给大家了。

如果关于多进程日志的实现,各位如果有更加简单且有效的idea,还望能够分享。

原文  http://blog.lucode.net/linux/multiprocess-logger.html
正文到此结束
Loading...