前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PHP 高级编程之多线程

PHP 高级编程之多线程

作者头像
netkiller old
发布2018-03-05 15:19:09
2.7K0
发布2018-03-05 15:19:09
举报
文章被收录于专栏:NetkillerNetkiller

PHP 高级编程之多线程

http://netkiller.github.io/journal/php.thread.html


目录

  • 1. 多线程环境安装
    • 1.1. PHP 5.5.9
    • 1.2. 安装 pthreads 扩展
  • 2. Thread
  • 3. Worker 与 Stackable
  • 4. 互斥锁
    • 4.1. 多线程与共享内存
  • 5. 线程同步
  • 6. 线程池
    • 6.1. 线程池
    • 6.2. 动态队列线程池
    • 6.3. pthreads Pool类
  • 7. 多线程文件安全读写(文件锁)
  • 8. 多线程与数据连接
    • 8.1. Worker 与 PDO
    • 8.2. Pool 与 PDO
    • 8.3. 多线程中操作数据库总结
  • 9. Thread And ZeroMQ
    • 9.1. 数据库端
    • 9.2. 数据处理端

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

代码语言:javascript
复制
./configure --prefix=/srv/php-5.5.9 \
--with-config-file-path=/srv/php-5.5.9/etc \
--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \
--enable-fpm \
--with-fpm-user=www \
--with-fpm-group=www \
--with-pear \
--with-curl \
--with-gd \
--with-jpeg-dir \
--with-png-dir \
--with-freetype-dir \
--with-zlib-dir \
--with-iconv \
--with-mcrypt \
--with-mhash \
--with-pdo-mysql \
--with-mysql-sock=/var/lib/mysql/mysql.sock \
--with-openssl \
--with-xsl \
--with-recode \
--enable-sockets \
--enable-soap \
--enable-mbstring \
--enable-gd-native-ttf \
--enable-zip \
--enable-xml \
--enable-bcmath \
--enable-calendar \
--enable-shmop \
--enable-dba \
--enable-wddx \
--enable-sysvsem \
--enable-sysvshm \
--enable-sysvmsg \
--enable-opcache \
--enable-pcntl \
--enable-maintainer-zts \
--disable-debug			

编译必须启用zts支持否则无法安装 pthreads(--enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

代码语言:javascript
复制
# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash			

查看pthreads是否已经安装

代码语言:javascript
复制
# php -m | grep pthreads			

2. Thread

代码语言:javascript
复制
		<?php
class HelloWorld extends Thread {
    public function __construct($world) {
       $this->world = $world;
    }

    public function run() {
        print_r(sprintf("Hello %s\n", $this->world));
    }
}

$thread = new HelloWorld("World");

if ($thread->start()) {
    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());
}
?>		

3. Worker 与 Stackable

代码语言:javascript
复制
		<?php
class SQLQuery extends Stackable {

        public function __construct($sql) {
                $this->sql = $sql;
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $row = $dbh->query($this->sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new SQLQuery('select * from members order by id desc limit 5');
$worker->stack($work);

$table1 = new SQLQuery('select * from demousers limit 2');
$worker->stack($table1);

$worker->start();
$worker->shutdown();
?>		

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

代码语言:javascript
复制
		<?php
$counter = 0;
//$handle=fopen("php://memory", "rw");
//$handle=fopen("php://temp", "rw");
$handle=fopen("/tmp/counter.txt", "w");
fwrite($handle, $counter );
fclose($handle);

class CounterThread extends Thread {
	public function __construct($mutex = null){
		$this->mutex = $mutex;
        $this->handle = fopen("/tmp/counter.txt", "w+");
    }
	public function __destruct(){
		fclose($this->handle);
	}
    public function run() {
		if($this->mutex)
			$locked=Mutex::lock($this->mutex);

		$counter = intval(fgets($this->handle));
		$counter++;
		rewind($this->handle);
		fputs($this->handle, $counter );
		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

		if($this->mutex)
			Mutex::unlock($this->mutex);
    }
}

//没有互斥锁
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread();
	$threads[$i]->start();

}

//加入互斥锁
$mutex = Mutex::create(true);
for ($i=0;$i<50;$i++){
	$threads[$i] = new CounterThread($mutex);
	$threads[$i]->start();

}

Mutex::unlock($mutex);
for ($i=0;$i<50;$i++){
	$threads[$i]->join();
}
Mutex::destroy($mutex);

?>		

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

代码语言:javascript
复制
			<?php
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
        $this->shmid = $shmid;
    }
    public function run() {

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>			

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

代码语言:javascript
复制
		<?php
$tmp = tempnam(__FILE__, 'PHP');
$key = ftok($tmp, 'a');

$shmid = shm_attach($key);
$counter = 0;
shm_put_var( $shmid, 1, $counter );

class CounterThread extends Thread {
	public function __construct($shmid){
        $this->shmid = $shmid;
    }
    public function run() {

        $this->synchronized(function($thread){
            $thread->wait();
        }, $this);

		$counter = shm_get_var( $this->shmid, 1 );
		$counter++;
		shm_put_var( $this->shmid, 1, $counter );

		printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);
    }
}

for ($i=0;$i<100;$i++){
	$threads[] = new CounterThread($shmid);
}
for ($i=0;$i<100;$i++){
	$threads[$i]->start();

}

for ($i=0;$i<100;$i++){
	$threads[$i]->synchronized(function($thread){
		$thread->notify();
	}, $threads[$i]);
}

for ($i=0;$i<100;$i++){
	$threads[$i]->join();
}
shm_remove( $shmid );
shm_detach( $shmid );
?>		

6. 线程池

6.1. 线程池

自行实现一个Pool类

代码语言:javascript
复制
			<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}

class Pool {
	public $pool = array();
	public function __construct($count) {
		$this->count = $count;
	}
	public function push($row){
		if(count($this->pool) < $this->count){
			$this->pool[] = new Update($row);
			return true;
		}else{
			return false;
		}
	}
	public function start(){
		foreach ( $this->pool as $id => $worker){
			$this->pool[$id]->start();
		}
	}
	public function join(){
		foreach ( $this->pool as $id => $worker){
               $this->pool[$id]->join();
		}
	}
	public function clean(){
		foreach ( $this->pool as $id => $worker){
			if(! $worker->isRunning()){
            	unset($this->pool[$id]);
            }
		}
	}
}

try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql  = "select id,bankno from members order by id desc";
	$row = $dbh->query($sql);
	$pool = new Pool(5);
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{

		while(true){
			if($pool->push($member)){ //压入任务到池中
				break;
			}else{ //如果池已经满,就开始启动线程
				$pool->start();
				$pool->join();
				$pool->clean();
			}
		}
	}
	$pool->start();
    $pool->join();

	$dbh = null;

} catch (Exception $e) {
    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";
}
?>			

6.2. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

代码语言:javascript
复制
			<?php
class Update extends Thread {

    public $running = false;
    public $row = array();
    public function __construct($row) {

	$this->row = $row;
        $this->sql = null;
	//print_r($this->row);
    }

    public function run() {

	if(strlen($this->row['bankno']) > 100 ){
		$bankno = safenet_decrypt($this->row['bankno']);
	}else{
		$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);
		file_put_contents("bankno_error.log", $error, FILE_APPEND);
	}

	if( strlen($bankno) > 7 ){
		$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

		$this->sql = $sql;
	}

	printf("%s\n",$this->sql);
    }

}



try {
	$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(
		PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',
		PDO::MYSQL_ATTR_COMPRESS => true
		)
	);

	$sql     = "select id,bankno from members order by id desc limit 50";

	$row = $dbh->query($sql);
	$pool = array();
	while($member = $row->fetch(PDO::FETCH_ASSOC))
	{
		$id 	= $member['id'];
		while (true){
			if(count($pool) < 5){
				$pool[$id] = new Update($member);
				$pool[$id]->start();
				break;
			}else{
				foreach ( $pool as $name => $worker){
					if(! $worker->isRunning()){
						unset($pool[$name]);
					}
				}
			}
		}

	}

	$dbh = null;

} catch (Exception $e) {
    echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";
}
?>			

6.3. pthreads Pool类

pthreads 提供的 Pool class 例子

代码语言:javascript
复制
			<?php

class WebWorker extends Worker {

	public function __construct(SafeLog $logger) {
		$this->logger = $logger;
	}

	protected $loger;
}

class WebWork extends Stackable {

	public function isComplete() {
		return $this->complete;
	}

	public function run() {
		$this->worker
			->logger
			->log("%s executing in Thread #%lu",
				  __CLASS__, $this->worker->getThreadId());
		$this->complete = true;
	}

	protected $complete;
}

class SafeLog extends Stackable {

	protected function log($message, $args = []) {
		$args = func_get_args();

		if (($message = array_shift($args))) {
			echo vsprintf(
				"{$message}\n", $args);
		}
	}
}


$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

$pool->submit($w=new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->shutdown();

$pool->collect(function($work){
	return $work->isComplete();
});

var_dump($pool);			

7. 多线程文件安全读写(文件锁)

文件所种类。

代码语言:javascript
复制
LOCK_SH 取得共享锁定(读取的程序)。
LOCK_EX 取得独占锁定(写入的程序。
LOCK_UN 释放锁定(无论共享或独占)。
LOCK_NB 如果不希望 flock() 在锁定时堵塞		

共享锁例子

代码语言:javascript
复制
		<?php

$fp = fopen("/tmp/lock.txt", "r+");

if (flock($fp, LOCK_EX)) {  // 进行排它型锁定
    ftruncate($fp, 0);      // truncate file
    fwrite($fp, "Write something here\n");
    fflush($fp);            // flush output before releasing the lock
    flock($fp, LOCK_UN);    // 释放锁定
} else {
    echo "Couldn't get the lock!";
}

fclose($fp);

?>		

共享锁例子2

代码语言:javascript
复制
		<?php
$fp = fopen('/tmp/lock.txt', 'r+');

/* Activate the LOCK_NB option on an LOCK_EX operation */
if(!flock($fp, LOCK_EX | LOCK_NB)) {
    echo 'Unable to obtain lock';
    exit(-1);
}

/* ... */

fclose($fp);
?>		

8. 多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

代码语言:javascript
复制
			<?php
class Work extends Stackable {

        public function __construct() {
        }

        public function run() {
                $dbh  = $this->worker->getConnection();
                $sql     = "select id,name from members order by id desc limit 50";
                $row = $dbh->query($sql);
                while($member = $row->fetch(PDO::FETCH_ASSOC)){
                        print_r($member);
                }
        }

}

class ExampleWorker extends Worker {
        public static $dbh;
        public function __construct($name) {
        }

        /*
        * The run method should just prepare the environment for the work that is coming ...
        */
        public function run(){
                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');
        }
        public function getConnection(){
                return self::$dbh;
        }
}

$worker = new ExampleWorker("My Worker Thread");

$work=new Work();
$worker->stack($work);

$worker->start();
$worker->shutdown();
?>			
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2015-09-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Netkiller 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • PHP 高级编程之多线程
    • http://netkiller.github.io/journal/php.thread.html
    • 1. 多线程环境安装
      • 1.1. PHP 5.5.9
        • 1.2. 安装 pthreads 扩展
        • 2. Thread
        • 3. Worker 与 Stackable
        • 4. 互斥锁
          • 4.1. 多线程与共享内存
          • 5. 线程同步
          • 6. 线程池
            • 6.1. 线程池
              • 6.2. 动态队列线程池
                • 6.3. pthreads Pool类
                • 7. 多线程文件安全读写(文件锁)
                • 8. 多线程与数据连接
                  • 8.1. Worker 与 PDO
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档