init Abgabe
This commit is contained in:
@@ -0,0 +1,83 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitter;
|
||||
|
||||
final class CompositeStream extends EventEmitter implements DuplexStreamInterface
|
||||
{
|
||||
private $readable;
|
||||
private $writable;
|
||||
private $closed = false;
|
||||
|
||||
public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
|
||||
{
|
||||
$this->readable = $readable;
|
||||
$this->writable = $writable;
|
||||
|
||||
if (!$readable->isReadable() || !$writable->isWritable()) {
|
||||
$this->close();
|
||||
return;
|
||||
}
|
||||
|
||||
Util::forwardEvents($this->readable, $this, array('data', 'end', 'error'));
|
||||
Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe'));
|
||||
|
||||
$this->readable->on('close', array($this, 'close'));
|
||||
$this->writable->on('close', array($this, 'close'));
|
||||
}
|
||||
|
||||
public function isReadable()
|
||||
{
|
||||
return $this->readable->isReadable();
|
||||
}
|
||||
|
||||
public function pause()
|
||||
{
|
||||
$this->readable->pause();
|
||||
}
|
||||
|
||||
public function resume()
|
||||
{
|
||||
if (!$this->writable->isWritable()) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->readable->resume();
|
||||
}
|
||||
|
||||
public function pipe(WritableStreamInterface $dest, array $options = array())
|
||||
{
|
||||
return Util::pipe($this, $dest, $options);
|
||||
}
|
||||
|
||||
public function isWritable()
|
||||
{
|
||||
return $this->writable->isWritable();
|
||||
}
|
||||
|
||||
public function write($data)
|
||||
{
|
||||
return $this->writable->write($data);
|
||||
}
|
||||
|
||||
public function end($data = null)
|
||||
{
|
||||
$this->readable->pause();
|
||||
$this->writable->end($data);
|
||||
}
|
||||
|
||||
public function close()
|
||||
{
|
||||
if ($this->closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->closed = true;
|
||||
$this->readable->close();
|
||||
$this->writable->close();
|
||||
|
||||
$this->emit('close');
|
||||
$this->removeAllListeners();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,240 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitter;
|
||||
use React\EventLoop\Loop;
|
||||
use React\EventLoop\LoopInterface;
|
||||
use InvalidArgumentException;
|
||||
|
||||
final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
|
||||
{
|
||||
private $stream;
|
||||
|
||||
/** @var LoopInterface */
|
||||
private $loop;
|
||||
|
||||
/**
|
||||
* Controls the maximum buffer size in bytes to read at once from the stream.
|
||||
*
|
||||
* This can be a positive number which means that up to X bytes will be read
|
||||
* at once from the underlying stream resource. Note that the actual number
|
||||
* of bytes read may be lower if the stream resource has less than X bytes
|
||||
* currently available.
|
||||
*
|
||||
* This can be `-1` which means read everything available from the
|
||||
* underlying stream resource.
|
||||
* This should read until the stream resource is not readable anymore
|
||||
* (i.e. underlying buffer drained), note that this does not neccessarily
|
||||
* mean it reached EOF.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $bufferSize;
|
||||
private $buffer;
|
||||
|
||||
private $readable = true;
|
||||
private $writable = true;
|
||||
private $closing = false;
|
||||
private $listening = false;
|
||||
|
||||
/**
|
||||
* @param resource $stream
|
||||
* @param ?LoopInterface $loop
|
||||
* @param ?int $readChunkSize
|
||||
* @param ?WritableStreamInterface $buffer
|
||||
*/
|
||||
public function __construct($stream, $loop = null, $readChunkSize = null, $buffer = null)
|
||||
{
|
||||
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
|
||||
throw new InvalidArgumentException('First parameter must be a valid stream resource');
|
||||
}
|
||||
|
||||
// ensure resource is opened for reading and wrting (fopen mode must contain "+")
|
||||
$meta = \stream_get_meta_data($stream);
|
||||
if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], '+') === false) {
|
||||
throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
|
||||
}
|
||||
|
||||
// this class relies on non-blocking I/O in order to not interrupt the event loop
|
||||
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
|
||||
if ($buffer !== null && !$buffer instanceof WritableResourceStream && \stream_set_blocking($stream, false) !== true) {
|
||||
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
|
||||
}
|
||||
|
||||
if ($loop !== null && !$loop instanceof LoopInterface) { // manual type check to support legacy PHP < 7.1
|
||||
throw new \InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
|
||||
}
|
||||
if ($buffer !== null && !$buffer instanceof WritableStreamInterface) { // manual type check to support legacy PHP < 7.1
|
||||
throw new \InvalidArgumentException('Argument #4 ($buffer) expected null|React\Stream\WritableStreamInterface');
|
||||
}
|
||||
|
||||
// Use unbuffered read operations on the underlying stream resource.
|
||||
// Reading chunks from the stream may otherwise leave unread bytes in
|
||||
// PHP's stream buffers which some event loop implementations do not
|
||||
// trigger events on (edge triggered).
|
||||
// This does not affect the default event loop implementation (level
|
||||
// triggered), so we can ignore platforms not supporting this (HHVM).
|
||||
// Pipe streams (such as STDIN) do not seem to require this and legacy
|
||||
// PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
|
||||
if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
|
||||
\stream_set_read_buffer($stream, 0);
|
||||
}
|
||||
|
||||
if ($buffer === null) {
|
||||
$buffer = new WritableResourceStream($stream, $loop);
|
||||
}
|
||||
|
||||
$this->stream = $stream;
|
||||
$this->loop = $loop ?: Loop::get();
|
||||
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
|
||||
$this->buffer = $buffer;
|
||||
|
||||
$that = $this;
|
||||
|
||||
$this->buffer->on('error', function ($error) use ($that) {
|
||||
$that->emit('error', array($error));
|
||||
});
|
||||
|
||||
$this->buffer->on('close', array($this, 'close'));
|
||||
|
||||
$this->buffer->on('drain', function () use ($that) {
|
||||
$that->emit('drain');
|
||||
});
|
||||
|
||||
$this->resume();
|
||||
}
|
||||
|
||||
public function isReadable()
|
||||
{
|
||||
return $this->readable;
|
||||
}
|
||||
|
||||
public function isWritable()
|
||||
{
|
||||
return $this->writable;
|
||||
}
|
||||
|
||||
public function pause()
|
||||
{
|
||||
if ($this->listening) {
|
||||
$this->loop->removeReadStream($this->stream);
|
||||
$this->listening = false;
|
||||
}
|
||||
}
|
||||
|
||||
public function resume()
|
||||
{
|
||||
if (!$this->listening && $this->readable) {
|
||||
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
|
||||
$this->listening = true;
|
||||
}
|
||||
}
|
||||
|
||||
public function write($data)
|
||||
{
|
||||
if (!$this->writable) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return $this->buffer->write($data);
|
||||
}
|
||||
|
||||
public function close()
|
||||
{
|
||||
if (!$this->writable && !$this->closing) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->closing = false;
|
||||
|
||||
$this->readable = false;
|
||||
$this->writable = false;
|
||||
|
||||
$this->emit('close');
|
||||
$this->pause();
|
||||
$this->buffer->close();
|
||||
$this->removeAllListeners();
|
||||
|
||||
if (\is_resource($this->stream)) {
|
||||
\fclose($this->stream);
|
||||
}
|
||||
}
|
||||
|
||||
public function end($data = null)
|
||||
{
|
||||
if (!$this->writable) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->closing = true;
|
||||
|
||||
$this->readable = false;
|
||||
$this->writable = false;
|
||||
$this->pause();
|
||||
|
||||
$this->buffer->end($data);
|
||||
}
|
||||
|
||||
public function pipe(WritableStreamInterface $dest, array $options = array())
|
||||
{
|
||||
return Util::pipe($this, $dest, $options);
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
public function handleData($stream)
|
||||
{
|
||||
$error = null;
|
||||
\set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
|
||||
$error = new \ErrorException(
|
||||
$errstr,
|
||||
0,
|
||||
$errno,
|
||||
$errfile,
|
||||
$errline
|
||||
);
|
||||
});
|
||||
|
||||
$data = \stream_get_contents($stream, $this->bufferSize);
|
||||
|
||||
\restore_error_handler();
|
||||
|
||||
if ($error !== null) {
|
||||
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
|
||||
$this->close();
|
||||
return;
|
||||
}
|
||||
|
||||
if ($data !== '') {
|
||||
$this->emit('data', array($data));
|
||||
} elseif (\feof($this->stream)) {
|
||||
// no data read => we reached the end and close the stream
|
||||
$this->emit('end');
|
||||
$this->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether this is a pipe resource in a legacy environment
|
||||
*
|
||||
* This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
|
||||
* and PHP 5.5.12+ and newer.
|
||||
*
|
||||
* @param resource $resource
|
||||
* @return bool
|
||||
* @link https://github.com/reactphp/child-process/issues/40
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
private function isLegacyPipe($resource)
|
||||
{
|
||||
if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
|
||||
$meta = \stream_get_meta_data($resource);
|
||||
|
||||
if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
/**
|
||||
* The `DuplexStreamInterface` is responsible for providing an interface for
|
||||
* duplex streams (both readable and writable).
|
||||
*
|
||||
* It builds on top of the existing interfaces for readable and writable streams
|
||||
* and follows the exact same method and event semantics.
|
||||
* If you're new to this concept, you should look into the
|
||||
* `ReadableStreamInterface` and `WritableStreamInterface` first.
|
||||
*
|
||||
* Besides defining a few methods, this interface also implements the
|
||||
* `EventEmitterInterface` which allows you to react to the same events defined
|
||||
* on the `ReadbleStreamInterface` and `WritableStreamInterface`.
|
||||
*
|
||||
* The event callback functions MUST be a valid `callable` that obeys strict
|
||||
* parameter definitions and MUST accept event parameters exactly as documented.
|
||||
* The event callback functions MUST NOT throw an `Exception`.
|
||||
* The return value of the event callback functions will be ignored and has no
|
||||
* effect, so for performance reasons you're recommended to not return any
|
||||
* excessive data structures.
|
||||
*
|
||||
* Every implementation of this interface MUST follow these event semantics in
|
||||
* order to be considered a well-behaving stream.
|
||||
*
|
||||
* > Note that higher-level implementations of this interface may choose to
|
||||
* define additional events with dedicated semantics not defined as part of
|
||||
* this low-level stream specification. Conformance with these event semantics
|
||||
* is out of scope for this interface, so you may also have to refer to the
|
||||
* documentation of such a higher-level implementation.
|
||||
*
|
||||
* @see ReadableStreamInterface
|
||||
* @see WritableStreamInterface
|
||||
*/
|
||||
interface DuplexStreamInterface extends ReadableStreamInterface, WritableStreamInterface
|
||||
{
|
||||
}
|
||||
@@ -0,0 +1,188 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitter;
|
||||
use React\EventLoop\Loop;
|
||||
use React\EventLoop\LoopInterface;
|
||||
use InvalidArgumentException;
|
||||
|
||||
final class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
|
||||
{
|
||||
/**
|
||||
* @var resource
|
||||
*/
|
||||
private $stream;
|
||||
|
||||
/** @var LoopInterface */
|
||||
private $loop;
|
||||
|
||||
/**
|
||||
* Controls the maximum buffer size in bytes to read at once from the stream.
|
||||
*
|
||||
* This value SHOULD NOT be changed unless you know what you're doing.
|
||||
*
|
||||
* This can be a positive number which means that up to X bytes will be read
|
||||
* at once from the underlying stream resource. Note that the actual number
|
||||
* of bytes read may be lower if the stream resource has less than X bytes
|
||||
* currently available.
|
||||
*
|
||||
* This can be `-1` which means read everything available from the
|
||||
* underlying stream resource.
|
||||
* This should read until the stream resource is not readable anymore
|
||||
* (i.e. underlying buffer drained), note that this does not neccessarily
|
||||
* mean it reached EOF.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $bufferSize;
|
||||
|
||||
private $closed = false;
|
||||
private $listening = false;
|
||||
|
||||
/**
|
||||
* @param resource $stream
|
||||
* @param ?LoopInterface $loop
|
||||
* @param ?int $readChunkSize
|
||||
*/
|
||||
public function __construct($stream, $loop = null, $readChunkSize = null)
|
||||
{
|
||||
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
|
||||
throw new InvalidArgumentException('First parameter must be a valid stream resource');
|
||||
}
|
||||
|
||||
// ensure resource is opened for reading (fopen mode must contain "r" or "+")
|
||||
$meta = \stream_get_meta_data($stream);
|
||||
if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], 'r') === \strpos($meta['mode'], '+')) {
|
||||
throw new InvalidArgumentException('Given stream resource is not opened in read mode');
|
||||
}
|
||||
|
||||
// this class relies on non-blocking I/O in order to not interrupt the event loop
|
||||
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
|
||||
if (\stream_set_blocking($stream, false) !== true) {
|
||||
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
|
||||
}
|
||||
|
||||
if ($loop !== null && !$loop instanceof LoopInterface) { // manual type check to support legacy PHP < 7.1
|
||||
throw new \InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
|
||||
}
|
||||
|
||||
// Use unbuffered read operations on the underlying stream resource.
|
||||
// Reading chunks from the stream may otherwise leave unread bytes in
|
||||
// PHP's stream buffers which some event loop implementations do not
|
||||
// trigger events on (edge triggered).
|
||||
// This does not affect the default event loop implementation (level
|
||||
// triggered), so we can ignore platforms not supporting this (HHVM).
|
||||
// Pipe streams (such as STDIN) do not seem to require this and legacy
|
||||
// PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
|
||||
if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
|
||||
\stream_set_read_buffer($stream, 0);
|
||||
}
|
||||
|
||||
$this->stream = $stream;
|
||||
$this->loop = $loop ?: Loop::get();
|
||||
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
|
||||
|
||||
$this->resume();
|
||||
}
|
||||
|
||||
public function isReadable()
|
||||
{
|
||||
return !$this->closed;
|
||||
}
|
||||
|
||||
public function pause()
|
||||
{
|
||||
if ($this->listening) {
|
||||
$this->loop->removeReadStream($this->stream);
|
||||
$this->listening = false;
|
||||
}
|
||||
}
|
||||
|
||||
public function resume()
|
||||
{
|
||||
if (!$this->listening && !$this->closed) {
|
||||
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
|
||||
$this->listening = true;
|
||||
}
|
||||
}
|
||||
|
||||
public function pipe(WritableStreamInterface $dest, array $options = array())
|
||||
{
|
||||
return Util::pipe($this, $dest, $options);
|
||||
}
|
||||
|
||||
public function close()
|
||||
{
|
||||
if ($this->closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->closed = true;
|
||||
|
||||
$this->emit('close');
|
||||
$this->pause();
|
||||
$this->removeAllListeners();
|
||||
|
||||
if (\is_resource($this->stream)) {
|
||||
\fclose($this->stream);
|
||||
}
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
public function handleData()
|
||||
{
|
||||
$error = null;
|
||||
\set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
|
||||
$error = new \ErrorException(
|
||||
$errstr,
|
||||
0,
|
||||
$errno,
|
||||
$errfile,
|
||||
$errline
|
||||
);
|
||||
});
|
||||
|
||||
$data = \stream_get_contents($this->stream, $this->bufferSize);
|
||||
|
||||
\restore_error_handler();
|
||||
|
||||
if ($error !== null) {
|
||||
$this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
|
||||
$this->close();
|
||||
return;
|
||||
}
|
||||
|
||||
if ($data !== '') {
|
||||
$this->emit('data', array($data));
|
||||
} elseif (\feof($this->stream)) {
|
||||
// no data read => we reached the end and close the stream
|
||||
$this->emit('end');
|
||||
$this->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether this is a pipe resource in a legacy environment
|
||||
*
|
||||
* This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
|
||||
* and PHP 5.5.12+ and newer.
|
||||
*
|
||||
* @param resource $resource
|
||||
* @return bool
|
||||
* @link https://github.com/reactphp/child-process/issues/40
|
||||
*
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
private function isLegacyPipe($resource)
|
||||
{
|
||||
if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
|
||||
$meta = \stream_get_meta_data($resource);
|
||||
|
||||
if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,362 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitterInterface;
|
||||
|
||||
/**
|
||||
* The `ReadableStreamInterface` is responsible for providing an interface for
|
||||
* read-only streams and the readable side of duplex streams.
|
||||
*
|
||||
* Besides defining a few methods, this interface also implements the
|
||||
* `EventEmitterInterface` which allows you to react to certain events:
|
||||
*
|
||||
* data event:
|
||||
* The `data` event will be emitted whenever some data was read/received
|
||||
* from this source stream.
|
||||
* The event receives a single mixed argument for incoming data.
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('data', function ($data) {
|
||||
* echo $data;
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event MAY be emitted any number of times, which may be zero times if
|
||||
* this stream does not send any data at all.
|
||||
* It SHOULD not be emitted after an `end` or `close` event.
|
||||
*
|
||||
* The given `$data` argument may be of mixed type, but it's usually
|
||||
* recommended it SHOULD be a `string` value or MAY use a type that allows
|
||||
* representation as a `string` for maximum compatibility.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* will emit the raw (binary) payload data that is received over the wire as
|
||||
* chunks of `string` values.
|
||||
*
|
||||
* Due to the stream-based nature of this, the sender may send any number
|
||||
* of chunks with varying sizes. There are no guarantees that these chunks
|
||||
* will be received with the exact same framing the sender intended to send.
|
||||
* In other words, many lower-level protocols (such as TCP/IP) transfer the
|
||||
* data in chunks that may be anywhere between single-byte values to several
|
||||
* dozens of kilobytes. You may want to apply a higher-level protocol to
|
||||
* these low-level data chunks in order to achieve proper message framing.
|
||||
*
|
||||
* end event:
|
||||
* The `end` event will be emitted once the source stream has successfully
|
||||
* reached the end of the stream (EOF).
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('end', function () {
|
||||
* echo 'END';
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event SHOULD be emitted once or never at all, depending on whether
|
||||
* a successful end was detected.
|
||||
* It SHOULD NOT be emitted after a previous `end` or `close` event.
|
||||
* It MUST NOT be emitted if the stream closes due to a non-successful
|
||||
* end, such as after a previous `error` event.
|
||||
*
|
||||
* After the stream is ended, it MUST switch to non-readable mode,
|
||||
* see also `isReadable()`.
|
||||
*
|
||||
* This event will only be emitted if the *end* was reached successfully,
|
||||
* not if the stream was interrupted by an unrecoverable error or explicitly
|
||||
* closed. Not all streams know this concept of a "successful end".
|
||||
* Many use-cases involve detecting when the stream closes (terminates)
|
||||
* instead, in this case you should use the `close` event.
|
||||
* After the stream emits an `end` event, it SHOULD usually be followed by a
|
||||
* `close` event.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* will emit this event if either the remote side closes the connection or
|
||||
* a file handle was successfully read until reaching its end (EOF).
|
||||
*
|
||||
* Note that this event should not be confused with the `end()` method.
|
||||
* This event defines a successful end *reading* from a source stream, while
|
||||
* the `end()` method defines *writing* a successful end to a destination
|
||||
* stream.
|
||||
*
|
||||
* error event:
|
||||
* The `error` event will be emitted once a fatal error occurs, usually while
|
||||
* trying to read from this stream.
|
||||
* The event receives a single `Exception` argument for the error instance.
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('error', function (Exception $e) {
|
||||
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event SHOULD be emitted once the stream detects a fatal error, such
|
||||
* as a fatal transmission error or after an unexpected `data` or premature
|
||||
* `end` event.
|
||||
* It SHOULD NOT be emitted after a previous `error`, `end` or `close` event.
|
||||
* It MUST NOT be emitted if this is not a fatal error condition, such as
|
||||
* a temporary network issue that did not cause any data to be lost.
|
||||
*
|
||||
* After the stream errors, it MUST close the stream and SHOULD thus be
|
||||
* followed by a `close` event and then switch to non-readable mode, see
|
||||
* also `close()` and `isReadable()`.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* only deal with data transmission and do not make assumption about data
|
||||
* boundaries (such as unexpected `data` or premature `end` events).
|
||||
* In other words, many lower-level protocols (such as TCP/IP) may choose
|
||||
* to only emit this for a fatal transmission error once and will then
|
||||
* close (terminate) the stream in response.
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the writable side of the stream also implements an `error` event.
|
||||
* In other words, an error may occur while either reading or writing the
|
||||
* stream which should result in the same error processing.
|
||||
*
|
||||
* close event:
|
||||
* The `close` event will be emitted once the stream closes (terminates).
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('close', function () {
|
||||
* echo 'CLOSED';
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event SHOULD be emitted once or never at all, depending on whether
|
||||
* the stream ever terminates.
|
||||
* It SHOULD NOT be emitted after a previous `close` event.
|
||||
*
|
||||
* After the stream is closed, it MUST switch to non-readable mode,
|
||||
* see also `isReadable()`.
|
||||
*
|
||||
* Unlike the `end` event, this event SHOULD be emitted whenever the stream
|
||||
* closes, irrespective of whether this happens implicitly due to an
|
||||
* unrecoverable error or explicitly when either side closes the stream.
|
||||
* If you only want to detect a *successful* end, you should use the `end`
|
||||
* event instead.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* will likely choose to emit this event after reading a *successful* `end`
|
||||
* event or after a fatal transmission `error` event.
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the writable side of the stream also implements a `close` event.
|
||||
* In other words, after receiving this event, the stream MUST switch into
|
||||
* non-writable AND non-readable mode, see also `isWritable()`.
|
||||
* Note that this event should not be confused with the `end` event.
|
||||
*
|
||||
* The event callback functions MUST be a valid `callable` that obeys strict
|
||||
* parameter definitions and MUST accept event parameters exactly as documented.
|
||||
* The event callback functions MUST NOT throw an `Exception`.
|
||||
* The return value of the event callback functions will be ignored and has no
|
||||
* effect, so for performance reasons you're recommended to not return any
|
||||
* excessive data structures.
|
||||
*
|
||||
* Every implementation of this interface MUST follow these event semantics in
|
||||
* order to be considered a well-behaving stream.
|
||||
*
|
||||
* > Note that higher-level implementations of this interface may choose to
|
||||
* define additional events with dedicated semantics not defined as part of
|
||||
* this low-level stream specification. Conformance with these event semantics
|
||||
* is out of scope for this interface, so you may also have to refer to the
|
||||
* documentation of such a higher-level implementation.
|
||||
*
|
||||
* @see EventEmitterInterface
|
||||
*/
|
||||
interface ReadableStreamInterface extends EventEmitterInterface
|
||||
{
|
||||
/**
|
||||
* Checks whether this stream is in a readable state (not closed already).
|
||||
*
|
||||
* This method can be used to check if the stream still accepts incoming
|
||||
* data events or if it is ended or closed already.
|
||||
* Once the stream is non-readable, no further `data` or `end` events SHOULD
|
||||
* be emitted.
|
||||
*
|
||||
* ```php
|
||||
* assert($stream->isReadable() === false);
|
||||
*
|
||||
* $stream->on('data', assertNeverCalled());
|
||||
* $stream->on('end', assertNeverCalled());
|
||||
* ```
|
||||
*
|
||||
* A successfully opened stream always MUST start in readable mode.
|
||||
*
|
||||
* Once the stream ends or closes, it MUST switch to non-readable mode.
|
||||
* This can happen any time, explicitly through `close()` or
|
||||
* implicitly due to a remote close or an unrecoverable transmission error.
|
||||
* Once a stream has switched to non-readable mode, it MUST NOT transition
|
||||
* back to readable mode.
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the writable side of the stream also implements an `isWritable()`
|
||||
* method. Unless this is a half-open duplex stream, they SHOULD usually
|
||||
* have the same return value.
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isReadable();
|
||||
|
||||
/**
|
||||
* Pauses reading incoming data events.
|
||||
*
|
||||
* Removes the data source file descriptor from the event loop. This
|
||||
* allows you to throttle incoming data.
|
||||
*
|
||||
* Unless otherwise noted, a successfully opened stream SHOULD NOT start
|
||||
* in paused state.
|
||||
*
|
||||
* Once the stream is paused, no futher `data` or `end` events SHOULD
|
||||
* be emitted.
|
||||
*
|
||||
* ```php
|
||||
* $stream->pause();
|
||||
*
|
||||
* $stream->on('data', assertShouldNeverCalled());
|
||||
* $stream->on('end', assertShouldNeverCalled());
|
||||
* ```
|
||||
*
|
||||
* This method is advisory-only, though generally not recommended, the
|
||||
* stream MAY continue emitting `data` events.
|
||||
*
|
||||
* You can continue processing events by calling `resume()` again.
|
||||
*
|
||||
* Note that both methods can be called any number of times, in particular
|
||||
* calling `pause()` more than once SHOULD NOT have any effect.
|
||||
*
|
||||
* @see self::resume()
|
||||
* @return void
|
||||
*/
|
||||
public function pause();
|
||||
|
||||
/**
|
||||
* Resumes reading incoming data events.
|
||||
*
|
||||
* Re-attach the data source after a previous `pause()`.
|
||||
*
|
||||
* ```php
|
||||
* $stream->pause();
|
||||
*
|
||||
* Loop::addTimer(1.0, function () use ($stream) {
|
||||
* $stream->resume();
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* Note that both methods can be called any number of times, in particular
|
||||
* calling `resume()` without a prior `pause()` SHOULD NOT have any effect.
|
||||
*
|
||||
* @see self::pause()
|
||||
* @return void
|
||||
*/
|
||||
public function resume();
|
||||
|
||||
/**
|
||||
* Pipes all the data from this readable source into the given writable destination.
|
||||
*
|
||||
* Automatically sends all incoming data to the destination.
|
||||
* Automatically throttles the source based on what the destination can handle.
|
||||
*
|
||||
* ```php
|
||||
* $source->pipe($dest);
|
||||
* ```
|
||||
*
|
||||
* Similarly, you can also pipe an instance implementing `DuplexStreamInterface`
|
||||
* into itself in order to write back all the data that is received.
|
||||
* This may be a useful feature for a TCP/IP echo service:
|
||||
*
|
||||
* ```php
|
||||
* $connection->pipe($connection);
|
||||
* ```
|
||||
*
|
||||
* This method returns the destination stream as-is, which can be used to
|
||||
* set up chains of piped streams:
|
||||
*
|
||||
* ```php
|
||||
* $source->pipe($decodeGzip)->pipe($filterBadWords)->pipe($dest);
|
||||
* ```
|
||||
*
|
||||
* By default, this will call `end()` on the destination stream once the
|
||||
* source stream emits an `end` event. This can be disabled like this:
|
||||
*
|
||||
* ```php
|
||||
* $source->pipe($dest, array('end' => false));
|
||||
* ```
|
||||
*
|
||||
* Note that this only applies to the `end` event.
|
||||
* If an `error` or explicit `close` event happens on the source stream,
|
||||
* you'll have to manually close the destination stream:
|
||||
*
|
||||
* ```php
|
||||
* $source->pipe($dest);
|
||||
* $source->on('close', function () use ($dest) {
|
||||
* $dest->end('BYE!');
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* If the source stream is not readable (closed state), then this is a NO-OP.
|
||||
*
|
||||
* ```php
|
||||
* $source->close();
|
||||
* $source->pipe($dest); // NO-OP
|
||||
* ```
|
||||
*
|
||||
* If the destinantion stream is not writable (closed state), then this will simply
|
||||
* throttle (pause) the source stream:
|
||||
*
|
||||
* ```php
|
||||
* $dest->close();
|
||||
* $source->pipe($dest); // calls $source->pause()
|
||||
* ```
|
||||
*
|
||||
* Similarly, if the destination stream is closed while the pipe is still
|
||||
* active, it will also throttle (pause) the source stream:
|
||||
*
|
||||
* ```php
|
||||
* $source->pipe($dest);
|
||||
* $dest->close(); // calls $source->pause()
|
||||
* ```
|
||||
*
|
||||
* Once the pipe is set up successfully, the destination stream MUST emit
|
||||
* a `pipe` event with this source stream an event argument.
|
||||
*
|
||||
* @param WritableStreamInterface $dest
|
||||
* @param array $options
|
||||
* @return WritableStreamInterface $dest stream as-is
|
||||
*/
|
||||
public function pipe(WritableStreamInterface $dest, array $options = array());
|
||||
|
||||
/**
|
||||
* Closes the stream (forcefully).
|
||||
*
|
||||
* This method can be used to (forcefully) close the stream.
|
||||
*
|
||||
* ```php
|
||||
* $stream->close();
|
||||
* ```
|
||||
*
|
||||
* Once the stream is closed, it SHOULD emit a `close` event.
|
||||
* Note that this event SHOULD NOT be emitted more than once, in particular
|
||||
* if this method is called multiple times.
|
||||
*
|
||||
* After calling this method, the stream MUST switch into a non-readable
|
||||
* mode, see also `isReadable()`.
|
||||
* This means that no further `data` or `end` events SHOULD be emitted.
|
||||
*
|
||||
* ```php
|
||||
* $stream->close();
|
||||
* assert($stream->isReadable() === false);
|
||||
*
|
||||
* $stream->on('data', assertNeverCalled());
|
||||
* $stream->on('end', assertNeverCalled());
|
||||
* ```
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the writable side of the stream also implements a `close()` method.
|
||||
* In other words, after calling this method, the stream MUST switch into
|
||||
* non-writable AND non-readable mode, see also `isWritable()`.
|
||||
* Note that this method should not be confused with the `end()` method.
|
||||
*
|
||||
* @return void
|
||||
* @see WritableStreamInterface::close()
|
||||
*/
|
||||
public function close();
|
||||
}
|
||||
@@ -0,0 +1,195 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitter;
|
||||
use InvalidArgumentException;
|
||||
|
||||
/**
|
||||
* The `ThroughStream` implements the
|
||||
* [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
|
||||
* you write to it through to its readable end.
|
||||
*
|
||||
* ```php
|
||||
* $through = new ThroughStream();
|
||||
* $through->on('data', $this->expectCallableOnceWith('hello'));
|
||||
*
|
||||
* $through->write('hello');
|
||||
* ```
|
||||
*
|
||||
* Similarly, the [`end()` method](#end) will end the stream and emit an
|
||||
* [`end` event](#end-event) and then [`close()`](#close-1) the stream.
|
||||
* The [`close()` method](#close-1) will close the stream and emit a
|
||||
* [`close` event](#close-event).
|
||||
* Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:
|
||||
*
|
||||
* ```php
|
||||
* $through = new ThroughStream();
|
||||
* $source->pipe($through)->pipe($dest);
|
||||
* ```
|
||||
*
|
||||
* Optionally, its constructor accepts any callable function which will then be
|
||||
* used to *filter* any data written to it. This function receives a single data
|
||||
* argument as passed to the writable side and must return the data as it will be
|
||||
* passed to its readable end:
|
||||
*
|
||||
* ```php
|
||||
* $through = new ThroughStream('strtoupper');
|
||||
* $source->pipe($through)->pipe($dest);
|
||||
* ```
|
||||
*
|
||||
* Note that this class makes no assumptions about any data types. This can be
|
||||
* used to convert data, for example for transforming any structured data into
|
||||
* a newline-delimited JSON (NDJSON) stream like this:
|
||||
*
|
||||
* ```php
|
||||
* $through = new ThroughStream(function ($data) {
|
||||
* return json_encode($data) . PHP_EOL;
|
||||
* });
|
||||
* $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
|
||||
*
|
||||
* $through->write(array(2, true));
|
||||
* ```
|
||||
*
|
||||
* The callback function is allowed to throw an `Exception`. In this case,
|
||||
* the stream will emit an `error` event and then [`close()`](#close-1) the stream.
|
||||
*
|
||||
* ```php
|
||||
* $through = new ThroughStream(function ($data) {
|
||||
* if (!is_string($data)) {
|
||||
* throw new \UnexpectedValueException('Only strings allowed');
|
||||
* }
|
||||
* return $data;
|
||||
* });
|
||||
* $through->on('error', $this->expectCallableOnce()));
|
||||
* $through->on('close', $this->expectCallableOnce()));
|
||||
* $through->on('data', $this->expectCallableNever()));
|
||||
*
|
||||
* $through->write(2);
|
||||
* ```
|
||||
*
|
||||
* @see WritableStreamInterface::write()
|
||||
* @see WritableStreamInterface::end()
|
||||
* @see DuplexStreamInterface::close()
|
||||
* @see WritableStreamInterface::pipe()
|
||||
*/
|
||||
final class ThroughStream extends EventEmitter implements DuplexStreamInterface
|
||||
{
|
||||
private $readable = true;
|
||||
private $writable = true;
|
||||
private $closed = false;
|
||||
private $paused = false;
|
||||
private $drain = false;
|
||||
private $callback;
|
||||
|
||||
public function __construct($callback = null)
|
||||
{
|
||||
if ($callback !== null && !\is_callable($callback)) {
|
||||
throw new InvalidArgumentException('Invalid transformation callback given');
|
||||
}
|
||||
|
||||
$this->callback = $callback;
|
||||
}
|
||||
|
||||
public function pause()
|
||||
{
|
||||
// only allow pause if still readable, false otherwise
|
||||
$this->paused = $this->readable;
|
||||
}
|
||||
|
||||
public function resume()
|
||||
{
|
||||
$this->paused = false;
|
||||
|
||||
// emit drain event if previous write was paused (throttled)
|
||||
if ($this->drain) {
|
||||
$this->drain = false;
|
||||
$this->emit('drain');
|
||||
}
|
||||
}
|
||||
|
||||
public function pipe(WritableStreamInterface $dest, array $options = array())
|
||||
{
|
||||
return Util::pipe($this, $dest, $options);
|
||||
}
|
||||
|
||||
public function isReadable()
|
||||
{
|
||||
return $this->readable;
|
||||
}
|
||||
|
||||
public function isWritable()
|
||||
{
|
||||
return $this->writable;
|
||||
}
|
||||
|
||||
public function write($data)
|
||||
{
|
||||
if (!$this->writable) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if ($this->callback !== null) {
|
||||
try {
|
||||
$data = \call_user_func($this->callback, $data);
|
||||
} catch (\Exception $e) {
|
||||
$this->emit('error', array($e));
|
||||
$this->close();
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
$this->emit('data', array($data));
|
||||
|
||||
// emit drain event on next resume if currently paused (throttled)
|
||||
if ($this->paused) {
|
||||
$this->drain = true;
|
||||
}
|
||||
|
||||
// continue writing if still writable and not paused (throttled), false otherwise
|
||||
return $this->writable && !$this->paused;
|
||||
}
|
||||
|
||||
public function end($data = null)
|
||||
{
|
||||
if (!$this->writable) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (null !== $data) {
|
||||
$this->write($data);
|
||||
|
||||
// return if write() already caused the stream to close
|
||||
if (!$this->writable) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
$this->readable = false;
|
||||
$this->writable = false;
|
||||
$this->paused = false;
|
||||
$this->drain = false;
|
||||
|
||||
$this->emit('end');
|
||||
$this->close();
|
||||
}
|
||||
|
||||
public function close()
|
||||
{
|
||||
if ($this->closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->readable = false;
|
||||
$this->writable = false;
|
||||
$this->paused = false;
|
||||
$this->drain = false;
|
||||
|
||||
$this->closed = true;
|
||||
$this->callback = null;
|
||||
|
||||
$this->emit('close');
|
||||
$this->removeAllListeners();
|
||||
}
|
||||
}
|
||||
+75
@@ -0,0 +1,75 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
final class Util
|
||||
{
|
||||
/**
|
||||
* Pipes all the data from the given $source into the $dest
|
||||
*
|
||||
* @param ReadableStreamInterface $source
|
||||
* @param WritableStreamInterface $dest
|
||||
* @param array $options
|
||||
* @return WritableStreamInterface $dest stream as-is
|
||||
* @see ReadableStreamInterface::pipe() for more details
|
||||
*/
|
||||
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
|
||||
{
|
||||
// source not readable => NO-OP
|
||||
if (!$source->isReadable()) {
|
||||
return $dest;
|
||||
}
|
||||
|
||||
// destination not writable => just pause() source
|
||||
if (!$dest->isWritable()) {
|
||||
$source->pause();
|
||||
|
||||
return $dest;
|
||||
}
|
||||
|
||||
$dest->emit('pipe', array($source));
|
||||
|
||||
// forward all source data events as $dest->write()
|
||||
$source->on('data', $dataer = function ($data) use ($source, $dest) {
|
||||
$feedMore = $dest->write($data);
|
||||
|
||||
if (false === $feedMore) {
|
||||
$source->pause();
|
||||
}
|
||||
});
|
||||
$dest->on('close', function () use ($source, $dataer) {
|
||||
$source->removeListener('data', $dataer);
|
||||
$source->pause();
|
||||
});
|
||||
|
||||
// forward destination drain as $source->resume()
|
||||
$dest->on('drain', $drainer = function () use ($source) {
|
||||
$source->resume();
|
||||
});
|
||||
$source->on('close', function () use ($dest, $drainer) {
|
||||
$dest->removeListener('drain', $drainer);
|
||||
});
|
||||
|
||||
// forward end event from source as $dest->end()
|
||||
$end = isset($options['end']) ? $options['end'] : true;
|
||||
if ($end) {
|
||||
$source->on('end', $ender = function () use ($dest) {
|
||||
$dest->end();
|
||||
});
|
||||
$dest->on('close', function () use ($source, $ender) {
|
||||
$source->removeListener('end', $ender);
|
||||
});
|
||||
}
|
||||
|
||||
return $dest;
|
||||
}
|
||||
|
||||
public static function forwardEvents($source, $target, array $events)
|
||||
{
|
||||
foreach ($events as $event) {
|
||||
$source->on($event, function () use ($event, $target) {
|
||||
$target->emit($event, \func_get_args());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,178 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitter;
|
||||
use React\EventLoop\Loop;
|
||||
use React\EventLoop\LoopInterface;
|
||||
|
||||
final class WritableResourceStream extends EventEmitter implements WritableStreamInterface
|
||||
{
|
||||
private $stream;
|
||||
|
||||
/** @var LoopInterface */
|
||||
private $loop;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $softLimit;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $writeChunkSize;
|
||||
|
||||
private $listening = false;
|
||||
private $writable = true;
|
||||
private $closed = false;
|
||||
private $data = '';
|
||||
|
||||
/**
|
||||
* @param resource $stream
|
||||
* @param ?LoopInterface $loop
|
||||
* @param ?int $writeBufferSoftLimit
|
||||
* @param ?int $writeChunkSize
|
||||
*/
|
||||
public function __construct($stream, $loop = null, $writeBufferSoftLimit = null, $writeChunkSize = null)
|
||||
{
|
||||
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
|
||||
throw new \InvalidArgumentException('First parameter must be a valid stream resource');
|
||||
}
|
||||
|
||||
// ensure resource is opened for writing (fopen mode must contain either of "waxc+")
|
||||
$meta = \stream_get_meta_data($stream);
|
||||
if (isset($meta['mode']) && $meta['mode'] !== '' && \strtr($meta['mode'], 'waxc+', '.....') === $meta['mode']) {
|
||||
throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
|
||||
}
|
||||
|
||||
// this class relies on non-blocking I/O in order to not interrupt the event loop
|
||||
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
|
||||
if (\stream_set_blocking($stream, false) !== true) {
|
||||
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
|
||||
}
|
||||
|
||||
if ($loop !== null && !$loop instanceof LoopInterface) { // manual type check to support legacy PHP < 7.1
|
||||
throw new \InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
|
||||
}
|
||||
|
||||
$this->stream = $stream;
|
||||
$this->loop = $loop ?: Loop::get();
|
||||
$this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit;
|
||||
$this->writeChunkSize = ($writeChunkSize === null) ? -1 : (int)$writeChunkSize;
|
||||
}
|
||||
|
||||
public function isWritable()
|
||||
{
|
||||
return $this->writable;
|
||||
}
|
||||
|
||||
public function write($data)
|
||||
{
|
||||
if (!$this->writable) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->data .= $data;
|
||||
|
||||
if (!$this->listening && $this->data !== '') {
|
||||
$this->listening = true;
|
||||
|
||||
$this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
|
||||
}
|
||||
|
||||
return !isset($this->data[$this->softLimit - 1]);
|
||||
}
|
||||
|
||||
public function end($data = null)
|
||||
{
|
||||
if (null !== $data) {
|
||||
$this->write($data);
|
||||
}
|
||||
|
||||
$this->writable = false;
|
||||
|
||||
// close immediately if buffer is already empty
|
||||
// otherwise wait for buffer to flush first
|
||||
if ($this->data === '') {
|
||||
$this->close();
|
||||
}
|
||||
}
|
||||
|
||||
public function close()
|
||||
{
|
||||
if ($this->closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->listening) {
|
||||
$this->listening = false;
|
||||
$this->loop->removeWriteStream($this->stream);
|
||||
}
|
||||
|
||||
$this->closed = true;
|
||||
$this->writable = false;
|
||||
$this->data = '';
|
||||
|
||||
$this->emit('close');
|
||||
$this->removeAllListeners();
|
||||
|
||||
if (\is_resource($this->stream)) {
|
||||
\fclose($this->stream);
|
||||
}
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
public function handleWrite()
|
||||
{
|
||||
$error = null;
|
||||
\set_error_handler(function ($_, $errstr) use (&$error) {
|
||||
$error = $errstr;
|
||||
});
|
||||
|
||||
if ($this->writeChunkSize === -1) {
|
||||
$sent = \fwrite($this->stream, $this->data);
|
||||
} else {
|
||||
$sent = \fwrite($this->stream, $this->data, $this->writeChunkSize);
|
||||
}
|
||||
|
||||
\restore_error_handler();
|
||||
|
||||
// Only report errors if *nothing* could be sent and an error has been raised.
|
||||
// Ignore non-fatal warnings if *some* data could be sent.
|
||||
// Any hard (permanent) error will fail to send any data at all.
|
||||
// Sending excessive amounts of data will only flush *some* data and then
|
||||
// report a temporary error (EAGAIN) which we do not raise here in order
|
||||
// to keep the stream open for further tries to write.
|
||||
// Should this turn out to be a permanent error later, it will eventually
|
||||
// send *nothing* and we can detect this.
|
||||
if (($sent === 0 || $sent === false) && $error !== null) {
|
||||
$this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error)));
|
||||
$this->close();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$exceeded = isset($this->data[$this->softLimit - 1]);
|
||||
$this->data = (string) \substr($this->data, $sent);
|
||||
|
||||
// buffer has been above limit and is now below limit
|
||||
if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
|
||||
$this->emit('drain');
|
||||
}
|
||||
|
||||
// buffer is now completely empty => stop trying to write
|
||||
if ($this->data === '') {
|
||||
// stop waiting for resource to be writable
|
||||
if ($this->listening) {
|
||||
$this->loop->removeWriteStream($this->stream);
|
||||
$this->listening = false;
|
||||
}
|
||||
|
||||
// buffer is end()ing and now completely empty => close buffer
|
||||
if (!$this->writable) {
|
||||
$this->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,347 @@
|
||||
<?php
|
||||
|
||||
namespace React\Stream;
|
||||
|
||||
use Evenement\EventEmitterInterface;
|
||||
|
||||
/**
|
||||
* The `WritableStreamInterface` is responsible for providing an interface for
|
||||
* write-only streams and the writable side of duplex streams.
|
||||
*
|
||||
* Besides defining a few methods, this interface also implements the
|
||||
* `EventEmitterInterface` which allows you to react to certain events:
|
||||
*
|
||||
* drain event:
|
||||
* The `drain` event will be emitted whenever the write buffer became full
|
||||
* previously and is now ready to accept more data.
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('drain', function () use ($stream) {
|
||||
* echo 'Stream is now ready to accept more data';
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event SHOULD be emitted once every time the buffer became full
|
||||
* previously and is now ready to accept more data.
|
||||
* In other words, this event MAY be emitted any number of times, which may
|
||||
* be zero times if the buffer never became full in the first place.
|
||||
* This event SHOULD NOT be emitted if the buffer has not become full
|
||||
* previously.
|
||||
*
|
||||
* This event is mostly used internally, see also `write()` for more details.
|
||||
*
|
||||
* pipe event:
|
||||
* The `pipe` event will be emitted whenever a readable stream is `pipe()`d
|
||||
* into this stream.
|
||||
* The event receives a single `ReadableStreamInterface` argument for the
|
||||
* source stream.
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) {
|
||||
* echo 'Now receiving piped data';
|
||||
*
|
||||
* // explicitly close target if source emits an error
|
||||
* $source->on('error', function () use ($stream) {
|
||||
* $stream->close();
|
||||
* });
|
||||
* });
|
||||
*
|
||||
* $source->pipe($stream);
|
||||
* ```
|
||||
*
|
||||
* This event MUST be emitted once for each readable stream that is
|
||||
* successfully piped into this destination stream.
|
||||
* In other words, this event MAY be emitted any number of times, which may
|
||||
* be zero times if no stream is ever piped into this stream.
|
||||
* This event MUST NOT be emitted if either the source is not readable
|
||||
* (closed already) or this destination is not writable (closed already).
|
||||
*
|
||||
* This event is mostly used internally, see also `pipe()` for more details.
|
||||
*
|
||||
* error event:
|
||||
* The `error` event will be emitted once a fatal error occurs, usually while
|
||||
* trying to write to this stream.
|
||||
* The event receives a single `Exception` argument for the error instance.
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('error', function (Exception $e) {
|
||||
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event SHOULD be emitted once the stream detects a fatal error, such
|
||||
* as a fatal transmission error.
|
||||
* It SHOULD NOT be emitted after a previous `error` or `close` event.
|
||||
* It MUST NOT be emitted if this is not a fatal error condition, such as
|
||||
* a temporary network issue that did not cause any data to be lost.
|
||||
*
|
||||
* After the stream errors, it MUST close the stream and SHOULD thus be
|
||||
* followed by a `close` event and then switch to non-writable mode, see
|
||||
* also `close()` and `isWritable()`.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* only deal with data transmission and may choose
|
||||
* to only emit this for a fatal transmission error once and will then
|
||||
* close (terminate) the stream in response.
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the readable side of the stream also implements an `error` event.
|
||||
* In other words, an error may occur while either reading or writing the
|
||||
* stream which should result in the same error processing.
|
||||
*
|
||||
* close event:
|
||||
* The `close` event will be emitted once the stream closes (terminates).
|
||||
*
|
||||
* ```php
|
||||
* $stream->on('close', function () {
|
||||
* echo 'CLOSED';
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* This event SHOULD be emitted once or never at all, depending on whether
|
||||
* the stream ever terminates.
|
||||
* It SHOULD NOT be emitted after a previous `close` event.
|
||||
*
|
||||
* After the stream is closed, it MUST switch to non-writable mode,
|
||||
* see also `isWritable()`.
|
||||
*
|
||||
* This event SHOULD be emitted whenever the stream closes, irrespective of
|
||||
* whether this happens implicitly due to an unrecoverable error or
|
||||
* explicitly when either side closes the stream.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* will likely choose to emit this event after flushing the buffer from
|
||||
* the `end()` method, after receiving a *successful* `end` event or after
|
||||
* a fatal transmission `error` event.
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the readable side of the stream also implements a `close` event.
|
||||
* In other words, after receiving this event, the stream MUST switch into
|
||||
* non-writable AND non-readable mode, see also `isReadable()`.
|
||||
* Note that this event should not be confused with the `end` event.
|
||||
*
|
||||
* The event callback functions MUST be a valid `callable` that obeys strict
|
||||
* parameter definitions and MUST accept event parameters exactly as documented.
|
||||
* The event callback functions MUST NOT throw an `Exception`.
|
||||
* The return value of the event callback functions will be ignored and has no
|
||||
* effect, so for performance reasons you're recommended to not return any
|
||||
* excessive data structures.
|
||||
*
|
||||
* Every implementation of this interface MUST follow these event semantics in
|
||||
* order to be considered a well-behaving stream.
|
||||
*
|
||||
* > Note that higher-level implementations of this interface may choose to
|
||||
* define additional events with dedicated semantics not defined as part of
|
||||
* this low-level stream specification. Conformance with these event semantics
|
||||
* is out of scope for this interface, so you may also have to refer to the
|
||||
* documentation of such a higher-level implementation.
|
||||
*
|
||||
* @see EventEmitterInterface
|
||||
* @see DuplexStreamInterface
|
||||
*/
|
||||
interface WritableStreamInterface extends EventEmitterInterface
|
||||
{
|
||||
/**
|
||||
* Checks whether this stream is in a writable state (not closed already).
|
||||
*
|
||||
* This method can be used to check if the stream still accepts writing
|
||||
* any data or if it is ended or closed already.
|
||||
* Writing any data to a non-writable stream is a NO-OP:
|
||||
*
|
||||
* ```php
|
||||
* assert($stream->isWritable() === false);
|
||||
*
|
||||
* $stream->write('end'); // NO-OP
|
||||
* $stream->end('end'); // NO-OP
|
||||
* ```
|
||||
*
|
||||
* A successfully opened stream always MUST start in writable mode.
|
||||
*
|
||||
* Once the stream ends or closes, it MUST switch to non-writable mode.
|
||||
* This can happen any time, explicitly through `end()` or `close()` or
|
||||
* implicitly due to a remote close or an unrecoverable transmission error.
|
||||
* Once a stream has switched to non-writable mode, it MUST NOT transition
|
||||
* back to writable mode.
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the readable side of the stream also implements an `isReadable()`
|
||||
* method. Unless this is a half-open duplex stream, they SHOULD usually
|
||||
* have the same return value.
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isWritable();
|
||||
|
||||
/**
|
||||
* Write some data into the stream.
|
||||
*
|
||||
* A successful write MUST be confirmed with a boolean `true`, which means
|
||||
* that either the data was written (flushed) immediately or is buffered and
|
||||
* scheduled for a future write. Note that this interface gives you no
|
||||
* control over explicitly flushing the buffered data, as finding the
|
||||
* appropriate time for this is beyond the scope of this interface and left
|
||||
* up to the implementation of this interface.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or file-based stream)
|
||||
* may choose to buffer all given data and schedule a future flush by using
|
||||
* an underlying EventLoop to check when the resource is actually writable.
|
||||
*
|
||||
* If a stream cannot handle writing (or flushing) the data, it SHOULD emit
|
||||
* an `error` event and MAY `close()` the stream if it can not recover from
|
||||
* this error.
|
||||
*
|
||||
* If the internal buffer is full after adding `$data`, then `write()`
|
||||
* SHOULD return `false`, indicating that the caller should stop sending
|
||||
* data until the buffer drains.
|
||||
* The stream SHOULD send a `drain` event once the buffer is ready to accept
|
||||
* more data.
|
||||
*
|
||||
* Similarly, if the stream is not writable (already in a closed state)
|
||||
* it MUST NOT process the given `$data` and SHOULD return `false`,
|
||||
* indicating that the caller should stop sending data.
|
||||
*
|
||||
* The given `$data` argument MAY be of mixed type, but it's usually
|
||||
* recommended it SHOULD be a `string` value or MAY use a type that allows
|
||||
* representation as a `string` for maximum compatibility.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or a file-based stream)
|
||||
* will only accept the raw (binary) payload data that is transferred over
|
||||
* the wire as chunks of `string` values.
|
||||
*
|
||||
* Due to the stream-based nature of this, the sender may send any number
|
||||
* of chunks with varying sizes. There are no guarantees that these chunks
|
||||
* will be received with the exact same framing the sender intended to send.
|
||||
* In other words, many lower-level protocols (such as TCP/IP) transfer the
|
||||
* data in chunks that may be anywhere between single-byte values to several
|
||||
* dozens of kilobytes. You may want to apply a higher-level protocol to
|
||||
* these low-level data chunks in order to achieve proper message framing.
|
||||
*
|
||||
* @param mixed|string $data
|
||||
* @return bool
|
||||
*/
|
||||
public function write($data);
|
||||
|
||||
/**
|
||||
* Successfully ends the stream (after optionally sending some final data).
|
||||
*
|
||||
* This method can be used to successfully end the stream, i.e. close
|
||||
* the stream after sending out all data that is currently buffered.
|
||||
*
|
||||
* ```php
|
||||
* $stream->write('hello');
|
||||
* $stream->write('world');
|
||||
* $stream->end();
|
||||
* ```
|
||||
*
|
||||
* If there's no data currently buffered and nothing to be flushed, then
|
||||
* this method MAY `close()` the stream immediately.
|
||||
*
|
||||
* If there's still data in the buffer that needs to be flushed first, then
|
||||
* this method SHOULD try to write out this data and only then `close()`
|
||||
* the stream.
|
||||
* Once the stream is closed, it SHOULD emit a `close` event.
|
||||
*
|
||||
* Note that this interface gives you no control over explicitly flushing
|
||||
* the buffered data, as finding the appropriate time for this is beyond the
|
||||
* scope of this interface and left up to the implementation of this
|
||||
* interface.
|
||||
*
|
||||
* Many common streams (such as a TCP/IP connection or file-based stream)
|
||||
* may choose to buffer all given data and schedule a future flush by using
|
||||
* an underlying EventLoop to check when the resource is actually writable.
|
||||
*
|
||||
* You can optionally pass some final data that is written to the stream
|
||||
* before ending the stream. If a non-`null` value is given as `$data`, then
|
||||
* this method will behave just like calling `write($data)` before ending
|
||||
* with no data.
|
||||
*
|
||||
* ```php
|
||||
* // shorter version
|
||||
* $stream->end('bye');
|
||||
*
|
||||
* // same as longer version
|
||||
* $stream->write('bye');
|
||||
* $stream->end();
|
||||
* ```
|
||||
*
|
||||
* After calling this method, the stream MUST switch into a non-writable
|
||||
* mode, see also `isWritable()`.
|
||||
* This means that no further writes are possible, so any additional
|
||||
* `write()` or `end()` calls have no effect.
|
||||
*
|
||||
* ```php
|
||||
* $stream->end();
|
||||
* assert($stream->isWritable() === false);
|
||||
*
|
||||
* $stream->write('nope'); // NO-OP
|
||||
* $stream->end(); // NO-OP
|
||||
* ```
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, calling this method SHOULD
|
||||
* also end its readable side, unless the stream supports half-open mode.
|
||||
* In other words, after calling this method, these streams SHOULD switch
|
||||
* into non-writable AND non-readable mode, see also `isReadable()`.
|
||||
* This implies that in this case, the stream SHOULD NOT emit any `data`
|
||||
* or `end` events anymore.
|
||||
* Streams MAY choose to use the `pause()` method logic for this, but
|
||||
* special care may have to be taken to ensure a following call to the
|
||||
* `resume()` method SHOULD NOT continue emitting readable events.
|
||||
*
|
||||
* Note that this method should not be confused with the `close()` method.
|
||||
*
|
||||
* @param mixed|string|null $data
|
||||
* @return void
|
||||
*/
|
||||
public function end($data = null);
|
||||
|
||||
/**
|
||||
* Closes the stream (forcefully).
|
||||
*
|
||||
* This method can be used to forcefully close the stream, i.e. close
|
||||
* the stream without waiting for any buffered data to be flushed.
|
||||
* If there's still data in the buffer, this data SHOULD be discarded.
|
||||
*
|
||||
* ```php
|
||||
* $stream->close();
|
||||
* ```
|
||||
*
|
||||
* Once the stream is closed, it SHOULD emit a `close` event.
|
||||
* Note that this event SHOULD NOT be emitted more than once, in particular
|
||||
* if this method is called multiple times.
|
||||
*
|
||||
* After calling this method, the stream MUST switch into a non-writable
|
||||
* mode, see also `isWritable()`.
|
||||
* This means that no further writes are possible, so any additional
|
||||
* `write()` or `end()` calls have no effect.
|
||||
*
|
||||
* ```php
|
||||
* $stream->close();
|
||||
* assert($stream->isWritable() === false);
|
||||
*
|
||||
* $stream->write('nope'); // NO-OP
|
||||
* $stream->end(); // NO-OP
|
||||
* ```
|
||||
*
|
||||
* Note that this method should not be confused with the `end()` method.
|
||||
* Unlike the `end()` method, this method does not take care of any existing
|
||||
* buffers and simply discards any buffer contents.
|
||||
* Likewise, this method may also be called after calling `end()` on a
|
||||
* stream in order to stop waiting for the stream to flush its final data.
|
||||
*
|
||||
* ```php
|
||||
* $stream->end();
|
||||
* Loop::addTimer(1.0, function () use ($stream) {
|
||||
* $stream->close();
|
||||
* });
|
||||
* ```
|
||||
*
|
||||
* If this stream is a `DuplexStreamInterface`, you should also notice
|
||||
* how the readable side of the stream also implements a `close()` method.
|
||||
* In other words, after calling this method, the stream MUST switch into
|
||||
* non-writable AND non-readable mode, see also `isReadable()`.
|
||||
*
|
||||
* @return void
|
||||
* @see ReadableStreamInterface::close()
|
||||
*/
|
||||
public function close();
|
||||
}
|
||||
Reference in New Issue
Block a user