PHP 高级编程之多线程

PHP 高级编程之多线程

http://netkiller.github.io/journal/php.thread.html


目录

  • 1. 多线程环境安装
    • 1.1. PHP 5.5.9
    • 1.2. 安装 pthreads 扩展
  • 2. Thread
  • 3. Worker 与 Stackable
  • 4. 互斥锁
    • 4.1. 多线程与共享内存
  • 5. 线程同步
  • 6. 线程池
    • 6.1. 线程池
    • 6.2. 动态队列线程池
    • 6.3. pthreads Pool类
  • 7. 多线程文件安全读写(文件锁)
  • 8. 多线程与数据连接
    • 8.1. Worker 与 PDO
    • 8.2. Pool 与 PDO
    • 8.3. 多线程中操作数据库总结
  • 9. Thread And ZeroMQ
    • 9.1. 数据库端
    • 9.2. 数据处理端

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

./configure --prefix=/srv/php-5.5.9 \
--with-config-file-path=/srv/php-5.5.9/etc \
--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \
--enable-fpm \
--with-fpm-user=www \
--with-fpm-group=www \
--with-pear \
--with-curl \
--with-gd \
--with-jpeg-dir \
--with-png-dir \
--with-freetype-dir \
--with-zlib-dir \
--with-iconv \
--with-mcrypt \
--with-mhash \
--with-pdo-mysql \
--with-mysql-sock=/var/lib/mysql/mysql.sock \
--with-openssl \
--with-xsl \
--with-recode \
--enable-sockets \
--enable-soap \
--enable-mbstring \
--enable-gd-native-ttf \
--enable-zip \
--enable-xml \
--enable-bcmath \
--enable-calendar \
--enable-shmop \
--enable-dba \
--enable-wddx \
--enable-sysvsem \
--enable-sysvshm \
--enable-sysvmsg \
--enable-opcache \
--enable-pcntl \
--enable-maintainer-zts \
--disable-debug			

编译必须启用zts支持否则无法安装 pthreads(--enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash			

查看pthreads是否已经安装

# php -m | grep pthreads			

2. Thread

		<?php
class HelloWorld extends Thread {
    public function __construct($world) {
       $this->world = $world;
    }

    public function run() {
        print_r(sprintf("Hello %s\n", $this->world));
    }
}

$thread = new HelloWorld("World");

if ($thread->start()) {
    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());
}
?>		

3. Worker 与 Stackable

		<?php
class SQLQuery extends Stackable {

        public function __construct($sql) {
                $this->sql = $sql;
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $row = $dbh->query($this->sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new SQLQuery('select * from members order by id desc limit 5');
$worker->stack($work);

$table1 = new SQLQuery('select * from demousers limit 2');
$worker->stack($table1);

$worker->start();
$worker->shutdown();
?>		

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

		<?php
$counter = 0;
//$handle=fopen("php://memory", "rw");
//$handle=fopen("php://temp", "rw");
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
        $this->handle = fopen("/tmp/counter.txt", "w+");
    }
	public function __destruct(){
		fclose($this->handle);
	}
    public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
    }
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>		

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

			<?php
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
        $this->shmid = $shmid;
    }
    public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>			

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

		<?php
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
        $this->shmid = $shmid;
    }
    public function run() {

        $this->synchronized(function($thread){
            $thread->wait();
        }, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
		$thread->notify();
	}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

6. 线程池

6.1. 线程池

自行实现一个Pool类

			<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //压入任务到池中
				break;
			}else{ //如果池已经满,就开始启动线程
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";
}
?>			

6.2. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

			<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
	//print_r($this->row);
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}



try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		$id 	= $member['id'];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}
		}

	}

	$dbh = null;

} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";
}
?>			

6.3. pthreads Pool类

pthreads 提供的 Pool class 例子

			<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
				  __CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
				"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
	return $work->isComplete();
});

var_dump($pool);			

7. 多线程文件安全读写(文件锁)

文件所种类。

LOCK_SH 取得共享锁定(读取的程序)。
LOCK_EX 取得独占锁定(写入的程序。
LOCK_UN 释放锁定(无论共享或独占)。
LOCK_NB 如果不希望 flock() 在锁定时堵塞		

共享锁例子

		<?php

$fp = fopen("/tmp/lock.txt", "r+");

if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
    ftruncate($fp, 0);      // truncate file
    fwrite($fp, "Write something here\n");
    fflush($fp);            // flush output before releasing the lock
    flock($fp, LOCK_UN);    // 释放锁定
} else {
    echo "Couldn't get the lock!";
}

fclose($fp);

?>		

共享锁例子2

		<?php
$fp = fopen('/tmp/lock.txt', 'r+');

/* Activate the LOCK_NB option on an LOCK_EX operation */
if(!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    exit(-1);
}

/* ... */

fclose($fp);
?>		

8. 多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

			<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>			

原文发布于微信公众号 - Netkiller(netkiller-ebook)

原文发表时间:2015-09-23

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏阿杜的世界

Spring实战5-基于Spring构建Web应用主要内容

写在前面:关于Java Web,首先推荐一篇文章——写给java web一年左右工作经验的人,这篇文章的作者用精练的话语勾勒除了各种Java框架的缘由和最基本的...

1082
来自专栏一个会写诗的程序员的博客

Spring FrameWork 5.0 新功能 概览Spring FrameWork 5.0 新功能 概览

整个框架的代码基于java8 通过使用泛型等特性提高可读性 对java8提高直接的代码支撑

821
来自专栏编程之旅

Linux的用户管理(二)

上次的博客我们讲了关于Linux的用户管理的内容,现在我们来讲第二部分——系统用户组的管理。

1261
来自专栏流媒体

cmake用法

示例源码 在 linux 平台下使用 CMake 生成 Makefile 并编译的流程如下:

1223
来自专栏程序猿DD

Spring Cloud Config对特殊字符加密的处理

之前写过一篇关于配置中心对配置内容加密解密的介绍:《Spring Cloud构建微服务架构:分布式配置中心(加密解密)》。在这篇文章中,存在一个问题:当被加密内...

964
来自专栏偏前端工程师的驿站

Eclipse魔法堂:任务管理器

一、前言                                  Eclipse的任务管理器为我们提供一个方便的入口查看工程代办事宜,并定位到对应的代...

2138
来自专栏日常分享

Java 端口扫描器 TCP的实现方法

想必很多朋友都实现过一个简易的聊天室这个功能,其中涉及到Socket套接字这个类,我们通过一个特定的IP以及特定的端口创建一个服务端的套接字(ServerSoc...

1611
来自专栏美团技术团队

Spring MVC注解故障追踪记

Spring MVC是美团点评很多团队使用的Web框架。在基于Spring MVC的项目里,注解的使用几乎遍布在项目中的各个模块,有Java提供的注解,如:@O...

4167
来自专栏java闲聊

Netty(二) 创建简单的服务器

本篇文章是延续上一篇Netty文章,因此推荐先去看上一篇文章Netty(一),当然对Netty有一定认识略过。开始利用Netty创建一个简单的服务器

1232
来自专栏互联网技术杂谈

flume 1.8.0 开发之RPC

flume开发基础可见:https://cloud.tencent.com/developer/article/1195082

3975

扫码关注云+社区

领取腾讯云代金券