本来要发一篇LBS(三)和《浪潮浮生记》,结果有事耽搁了整整两天,想了想拿以前的一篇自认为有价值的文章共享一下
事情是这样色(shai)儿的,前天晚些时候有一个搞灰产的羊毛狗子颠颠儿跑了过来跟我说“要整个大新闻”,简要对话如下:
毕竟拿人家钱手短,还是要替人消灾的。本着公开公正严谨负责的态度,我决定使用世界上最好的语言来帮他做这个大数据项目。
从工程哲学角度来讲,100多万数据如果只用一个进程跑,明显有点儿慢,所以多开几个进程就是了,于是就一边儿走一边儿在脑海里YY伪代码,你们感受一下:
<?php// 我不管你用啥办法把数据倒进来,读文件,读数据库,爱咋咋滴// fork出1000个进程查询,内存大,cpu屌,1000个进程怎么了?// 我买个CPU不就是为的用它么,你省着干什么?for ( i = 1 ; i <= 1000; i++ ) { pid = fork(); // 在子进程里查询 if ( 0 == pid ) { ret = get_dizhi_by_phone( '手机号' ); // 查询出来后,记录到data.log文件里 file_put_contents( 'data.log', ret, FILE_APPEND ); exit; }}
毕竟是大工程大项目,我还是需要仔细过滤一遍每个细节的。从高性能到高可用,从可扩展性到易用性,从可维护性到强鲁棒性,我们都是要考虑在内的。然后再一遍又一遍的深思熟虑中,我就意识到了一个潜在的问题:
所以,考虑到 “多进程写入同一文件可能会导致数据覆盖” 可能会导致上亿的损失,我决定认真对待一下这个问题,于是我写了下面的代码测试了一波儿:
<?phpfor ( $i = 1; $i <= 30; $i++ ) { $pid = pcntl_fork(); if ( 0 == $pid ) { $my_pid = posix_getpid(); for ( $counter = 1; $counter <= 10000; $counter++ ) { file_put_contents( "./api.log", "what\r\n", FILE_APPEND ); //file_put_contents( "./api.log", date( 'Y-m-d H:i:s' ).' : '.$str."\r\n", FILE_APPEND|LOCK_EX ); } exit; }}
顺带给看不懂代码的萌新说明一下?那坨代码是啥意思:
fork出30个进程,每个进程写入10000个what,每个what独占一行。程序执行完毕后,理论上我们打开api.log文件,应该有 30 * 10000 行数据,就说明没有出现多进程写同一文件覆盖数据这种事情。
在正式开始运行这段代码之前,请允许我先说明一下此时此刻我内心的真实想法:
首先,我认为一定凑不够 30 * 10000 行数据,一定会有被覆盖的 其次,我认为在FILE_APPEND后面或一下EX_LOCK选项,就一定不会数据被覆盖的现象
然而,我还是太年轻了:
上面代码运行完毕后,我用编辑器打开api.log,然后看了下行数你们感受一下
这剧情太狗血,安全没按照我的剧本演...
30,0000行数据就这样静静地躺在文件里...
说出来我自己可能都不信,然而事实确实是发生了...
于是我又重新试了好多遍,然而确实结果都是一致的...
众所周知,我是个不会使用PS以及Photoshop的人,所以上图毫无PS痕迹,想必一定是真的...
事已至此,如果想搞明白到底为啥为这样,就只能去表演一波儿真正的技术了!!!
下载php源码包,我手贱随便选的版本是7.0.33,解压缩后请进入到下面目录中:php-7.0.33/ext/standard,然后打开该目录下的file.c文件,然后搜一把 “ file_put_contents “ 关键字,然后… …
case IS_STRING: if (Z_STRLEN_P(data)) { numbytes = php_stream_write(stream, Z_STRVAL_P(data), Z_STRLEN_P(data)); if (numbytes != Z_STRLEN_P(data)) { php_error_docref(NULL, E_WARNING, "Only "ZEND_LONG_FMT" of %zd bytes written, possibly out of free disk space", numbytes, Z_STRLEN_P(data)); numbytes = -1; } } break;
php-7.0.33/main/streams/php_streams.h,可以看到php_stream_write函数实际上是_php_stream_write,请注意第二坨代码中的12行,即_php_stream_write_buffer函数
#define php_stream_write(stream, buf, count) _php_stream_write(stream, (buf), (count))
PHPAPI size_t _php_stream_write(php_stream *stream, const char *buf, size_t count){ size_t bytes;
if (buf == NULL || count == 0 || stream->ops->write == NULL) { return 0; }
if (stream->writefilters.head) { bytes = _php_stream_write_filtered(stream, buf, count, PSFS_FLAG_NORMAL); } else { bytes = _php_stream_write_buffer(stream, buf, count); }
if (bytes) { stream->flags |= PHP_STREAM_FLAG_WAS_WRITTEN; }
return bytes;}
php-7.0.33/main/streams/streams.c的1097行,请注意下面这坨代码的第19行,我们重点关注stream->ops->write,那么我们得看下stream指针指向的到底是什么鬼东西:
/* Writes a buffer directly to a stream, using multiple of the chunk size */static size_t _php_stream_write_buffer(php_stream *stream, const char *buf, size_t count){ size_t didwrite = 0, towrite, justwrote;
/* if we have a seekable stream we need to ensure that data is written at the * current stream->position. This means invalidating the read buffer and then * performing a low-level seek */ if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0 && stream->readpos != stream->writepos) { stream->readpos = stream->writepos = 0;
stream->ops->seek(stream, stream->position, SEEK_SET, &stream->position); } while (count > 0) { towrite = count; if (towrite > stream->chunk_size) towrite = stream->chunk_size;
justwrote = stream->ops->write(stream, buf, towrite);
/* convert justwrote to an integer, since normally it is unsigned */ if ((int)justwrote > 0) { buf += justwrote; count -= justwrote; didwrite += justwrote;
/* Only screw with the buffer if we can seek, otherwise we lose data * buffered from fifos and sockets */ if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0) { stream->position += justwrote; } } else { break; } } return didwrite;
}
php-7.0.33/main/streams/plain_wrapper.c,我们需要关注php_stdiop_write:
PHPAPI php_stream_ops php_stream_stdio_ops = { php_stdiop_write, php_stdiop_read, php_stdiop_close, php_stdiop_flush, "STDIO", php_stdiop_seek, php_stdiop_cast, php_stdiop_stat, php_stdiop_set_option};
php-7.0.33/main/streams/plain_wrapper.c,搜索定位到php_stdiop_write,啊哈,请注意下面这坨代码的216行,嗯哼,write系统调用,一切真相大白:
static size_t php_stdiop_write(php_stream *stream, const char *buf, size_t count){ php_stdio_stream_data *data = (php_stdio_stream_data*)stream->abstract;
assert(data != NULL);
if (data->fd >= 0) {#ifdef PHP_WIN32 int bytes_written; if (ZEND_SIZE_T_UINT_OVFL(count)) { count = UINT_MAX; } bytes_written = _write(data->fd, buf, (unsigned int)count);#else
// ⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ // 注意看这里!注意看这里!注意看这里! // ⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ int bytes_written = write(data->fd, buf, count);
#endif if (bytes_written < 0) return 0; return (size_t) bytes_written; } else {
#if HAVE_FLUSHIO if (!data->is_pipe && data->last_op == 'r') { zend_fseek(data->file, 0, SEEK_CUR); } data->last_op = 'w';#endif
return fwrite(buf, 1, count, data->file); }}
请重点关注上面那坨代码中的第19行,这是在linux平台下file-put-contents最终的实现,就是write系统调用。不知道这个鬼东西的萌新去搜索一下吧,感受一下。
此时此刻,我不得不在此引经据典了,让我们翻开APUE的第71页,然后我要开始复制粘贴了:
上面的描述可能比较拗口,我用人话来粗暴地翻译一下:
当多个进程打开同一个文件的时候,操作系统会记录每一个进程打开的所有文件的记录并放到一个列表中;比如路人甲进程和路人乙进程,都打开了api.log文件,然后路人甲进程和路人乙进程的打开文件列表里都会记录下api.log这个记录,然后这个记录指向两个具体的文件详情,比如说路人甲打开api.log后将光标往后移动到了第三行开始,路人乙打开的api.log光标停留在第五行末尾,每个进程和每个进程打开的文件详情注定都是不同的,不然就要出乱子了。 然而这并没有完,路人甲的打开文件详情和路人乙的打开文件详情中又分别有一个指针,这两个指针最终指向了同一个同一个同一个同一个同一个叫做 “ i节点 ” 的地方,这个节点中存储了一个很重要的信息:文件长度。我们一旦使用APPEND标记的话,那么在向文件中写入内容的时候就会 “ 首先根据i节点中文件长度将文件偏移量定位到尾部然后写 ”,这里 “ 定位到尾部然后写 ” 是一个原子性的操作,一个进程必须要完整地完成 “ 定位到尾部并写 ” 的操作,这两个步骤中不存在割裂开的情况!就是说路人甲进程刚完成 “定位到尾部” 操作,路人乙进程开始 “ 写 ” 了,然后路人甲开始 “ 写 ” 了,结果路人乙写的内容被路人甲进程覆盖掉了。
所以,问题到这里,可是暂时得出一个这样的结论了:当file-put-contents函数中一旦启用了FILE_APPEND标记,那么无论你用多少个进程向同一个文件中写内容都不会出现进程间内容覆盖这种问题,并不需要EX_LOCK标记。
那么问题来了:EX_LOCK是做什么用的?
这个问题我们回到这坨代码中,这里我没怎么细究,下面是我猜的(如果我猜错了,请打脸),注意代码中第15、16、18三行中有一个chunk-size,大概意思就是分块写,所以我琢磨EX_LOCK作用应该是当路人甲进程在FILE-APPEND一个非常大的字符串的时候,加上这个锁子就可以保证路人甲将这么一坨非常大非常大的字符串写完;如果不加的话,当每次写完一个chunk-size后,很有可能会被路人乙进程插进来写点儿东西,这样,写入的内容就乱了,就是说FILE-APPEND可以保证没有覆盖写漏写这种问题了,但是会有顺序错乱这种存在的可能性,而EX—LOCK则就是来解决这个问题的。
static size_t _php_stream_write_buffer(php_stream *stream, const char *buf, size_t count){ size_t didwrite = 0, towrite, justwrote;
/* if we have a seekable stream we need to ensure that data is written at the * current stream->position. This means invalidating the read buffer and then * performing a low-level seek */ if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0 && stream->readpos != stream->writepos) { stream->readpos = stream->writepos = 0;
stream->ops->seek(stream, stream->position, SEEK_SET, &stream->position); } while (count > 0) { towrite = count; if (towrite > stream->chunk_size) towrite = stream->chunk_size;
justwrote = stream->ops->write(stream, buf, towrite);
/* convert justwrote to an integer, since normally it is unsigned */ if ((int)justwrote > 0) { buf += justwrote; count -= justwrote; didwrite += justwrote;
/* Only screw with the buffer if we can seek, otherwise we lose data * buffered from fifos and sockets */ if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0) { stream->position += justwrote; } } else { break; } } return didwrite;
}