PHP管道通信

PHP Jenner 8882℃ 0评论

项目所有源码可在https://github.com/huyanping/simple-fork-php找到。simple-fork-php是php多进程编程框架,以下实现的管道队列为框架的一部分。

管道是IPC(进程通信)最古老的的形式;像在shell中我们使用的ls | wc -l统计文件数量,中间的|实际上就是一个无名管道,它能够将ls的标准输出重定向为wc -l的标准输入。

管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道。管道单独构成一种独立的文件系统,管道对于管道两端的进程而言,就是一个文件,但它不是普通的文件,它不属于某种文件系统,而是自立门户,单独构成一种文件系统,并且只存在与内存中。一个进程向管道中写的内容被管道另一端的进程读出。写入的内容每次都添加在管道缓冲区的末尾,并且每次都是从缓冲区的头部读出数据。(有点像队列)

无名管道只能由于有亲缘关系的父子进程之间通信,如果要在不同归属的进程之间通信,则只能使用命名管道。由于命名管道能够实现无名管道的功能,这里我们主要介绍命名管道。

命名管道的作用更像是队列。php提供了posix_mkfifo函数创建命名管道文件,然后可以使用流操作管道文件。

一段简单的代码如下:

$pipe = posix_mkfifo('/tmp/test.pipe');

$pid = pcntl_fork();

if($pid==0){
    fwrite($pipe, 'test');
    sleep(1);
}else{
    echo fread($pipe, 4);
}

上面代码需要注意两个个问题,fread读取时必须指定读取的长度,如果读取到的长度比指定长度要小,则fread会阻塞(下面会介绍无阻塞的方式);另一个问题管道不会持久化数据,任何一方关闭管道都会造成管道内数据的丢失。

我们可以对管道做进一步封装,实现一个先进先出的流式通信组件(无协议):

<?php
/**
 * @author Jenner <hypxm@qq.com>
 * @blog http://www.huyanping.cn
 * @license https://opensource.org/licenses/MIT MIT
 * @datetime: 2015/11/24 16:29
 */

namespace Jenner\SimpleFork\Queue;


class Pipe
{
    /**
     * @var resource
     */
    protected $read;

    /**
     * @var resource
     */
    protected $write;

    /**
     * @var string
     */
    protected $filename;

    /**
     * @var bool
     */
    protected $block;

    /**
     * @param string $filename fifo filename
     * @param int $mode
     * @param bool $block if blocking
     */
    public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666, $block = false)
    {
        if (!file_exists($filename) && !posix_mkfifo($filename, $mode)) {
            throw new \RuntimeException("create pipe failed");
        }
        if (filetype($filename) != "fifo") {
            throw new \RuntimeException("file exists and it is not a fifo file");
        }

        $this->filename = $filename;
        $this->block = $block;
    }

    public function setBlock($block = true)
    {
        if (is_resource($this->read)) {
            $set = stream_set_blocking($this->read, $block);
            if (!$set) {
                throw new \RuntimeException("stream_set_blocking failed");
            }
        }

        if (is_resource($this->write)) {
            $set = stream_set_blocking($this->write, $block);
            if (!$set) {
                throw new \RuntimeException("stream_set_blocking failed");
            }
        }

        $this->block = $block;
    }

    /**
     * if the stream is blocking, you would better set the value of size,
     * it will not return until the data size is equal to the value of param size
     *
     * @param int $size
     * @return string
     */
    public function read($size = 1024)
    {
        if (!is_resource($this->read)) {
            $this->read = fopen($this->filename, 'r+');
            if (!is_resource($this->read)) {
                throw new \RuntimeException("open file failed");
            }
            if (!$this->block) {
                $set = stream_set_blocking($this->read, false);
                if (!$set) {
                    throw new \RuntimeException("stream_set_blocking failed");
                }
            }
        }

        return fread($this->read, $size);
    }

    /**
     * @param $message
     * @return int
     */
    public function write($message)
    {
        if (!is_resource($this->write)) {
            $this->write = fopen($this->filename, 'w+');
            if (!is_resource($this->write)) {
                throw new \RuntimeException("open file failed");
            }
            if (!$this->block) {
                $set = stream_set_blocking($this->write, false);
                if (!$set) {
                    throw new \RuntimeException("stream_set_blocking failed");
                }
            }
        }

        return fwrite($this->write, $message);
    }

    /**
     *
     */
    public function close()
    {
        if (is_resource($this->read)) {
            fclose($this->read);
        }
        if (is_resource($this->write)) {
            fclose($this->write);
        }
    }

    /**
     *
     */
    public function __destruct()
    {
        $this->close();
    }

    public function remove()
    {
        return unlink($this->filename);
    }
}

以上代码能够实现一个简单的流式通信组件,使用方法如下:

$pid = pcntl_fork();

if($pid == 0){
    $pipe = new \Jenner\SimpleFork\Queue\Pipe();
    sleep(1);
    $pipe->write("test");
    sleep(1);
}else{
    $pipe = new \Jenner\SimpleFork\Queue\Pipe();
    $pipe->setBlock(true);
    $result = $pipe->read(4);
    echo $result . PHP_EOL;
}

我们知道PHP的流分为阻塞和无阻塞流,有时我们希望在管道中无数据的时候立即返回,而不是阻塞整个进程。我们可以通过stream_set_blocking函数修改流的属性达到无阻塞的目的,修改之后,如果管道中没有数据,read操作会立即返回。

下面我们用管道实现一个消息队列,我们知道管道内的流是无格式的,而消息是有格式的,如果要使用管道实现消息队列,我们需要在管道上层实现通信协议。协议的设计非常简单,我们使用一个字节表示消息的长度,最大不能超过int32,后面的字节表示数据。读取时我们先读取消息长度,再通过读取到的长度读取后面的数据。

一个简单的设计如下:

<?php
/**
 * @author Jenner <hypxm@qq.com>
 * @blog http://www.huyanping.cn
 * @license https://opensource.org/licenses/MIT MIT
 * @datetime: 2015/11/24 18:38
 */

namespace Jenner\SimpleFork\Queue;


class PipeQueue implements QueueInterface
{
    /**
     * @var Pipe
     */
    protected $pipe;

    /**
     * @var bool
     */
    protected $block;

    /**
     * @param string $filename fifo filename
     * @param int $mode
     * @param bool $block if blocking
     */
    public function __construct($filename = '/tmp/simple-fork.pipe', $mode = 0666)
    {
        $this->pipe = new Pipe($filename, $mode);
        $this->block = false;
        $this->pipe->setBlock($this->block);
    }

    /**
     * put value into the queue of channel
     *
     * @param $value
     * @return bool
     */
    public function put($value)
    {
        $len = strlen($value);
        if ($len > 2147483647) {
            throw new \RuntimeException("value is too long");
        }
        $raw = pack('N', $len) . $value;
        $write_len = $this->pipe->write($raw);

        return $write_len == strlen($raw);
    }

    /**
     * get value from the queue of channel
     *
     * @param bool $block if block when the queue is empty
     * @return bool|string
     */
    public function get($block = false)
    {
        if ($this->block != $block) {
            $this->pipe->setBlock($block);
            $this->block = $block;
        }
        $len = $this->pipe->read(4);
        if ($len === false) {
            throw new \RuntimeException("read pipe failed");
        }

        if (strlen($len) === 0) {
            return null;
        }
        $len = unpack('N', $len);
        if(empty($len) || !array_key_exists(1, $len) || empty($len[1])){
            throw new \RuntimeException("data protocol error");
        }
        $len = intval($len[1]);

        $value = '';
        while (true) {
            $temp = $this->pipe->read($len);
            if (strlen($temp) == $len) {
                return $temp;
            }
            $value .= $temp;
            $len -= strlen($temp);
            if ($len == 0) {
                return $value;
            }
        }
    }

    /**
     * remove the queue resource
     *
     * @return bool
     */
    public function remove()
    {
        $this->pipe->close();
        $this->pipe->remove();
    }
}

使用方法如下:

$queue = new \Jenner\SimpleFork\Queue\PipeQueue();
var_dump($queue->put('test'));
var_dump($queue->get());

 

原创文章,转载请注明: 转载自始终不够

本文链接地址: PHP管道通信

转载请注明:始终不够 » PHP管道通信

喜欢 (14)