Page Menu | Home | Phorge

PhutilChannel.php
No One | Temporary

PhutilChannel.php

<?php
/**
* Wrapper around streams, pipes, and other things that have basic read/write
* I/O characteristics.
*
* Channels include buffering, so you can do fire-and-forget writes and reads
* without worrying about network/pipe buffers. Use @{method:read} and
* @{method:write} to read and write.
*
* Channels are nonblocking and provide a select()-oriented interface so you
* can reasonably write server-like and daemon-like things with them. Use
* @{method:waitForAny} to select channels.
*
* Channel operations other than @{method:update} generally operate on buffers.
* Writes and reads affect buffers, while @{method:update} flushes output
* buffers and fills input buffers.
*
* Channels are either "open" or "closed". You can detect that a channel has
* closed by calling @{method:isOpen} or examining the return value of
* @{method:update}.
*
* NOTE: Channels are new (as of June 2012) and subject to interface changes.
*
* @task io Reading and Writing
* @task wait Waiting for Activity
* @task update Responding to Activity
* @task impl Channel Implementation
*/
abstract class PhutilChannel extends Phobject {

  /** Bytes pulled from the underlying I/O that have not been read() yet. */
  private $ibuf = '';

  /** PhutilRope of bytes queued by write() and not yet flushed by update(). */
  private $obuf;

  /** Optional debugging name; see setName() and getName(). */
  private $name;

  /** Maximum size of $ibuf in bytes, or null for no limit. */
  private $readBufferSize;

  /**
   * Create a channel with an empty read buffer and an empty write buffer.
   */
  public function __construct() {
    $this->obuf = new PhutilRope();
  }


/* -(  Reading and Writing  )------------------------------------------------ */


  /**
   * Read from the channel. A channel defines the format of data that is read
   * from it, so this method may return strings, objects, or anything else.
   *
   * The default implementation returns every byte currently held in the read
   * buffer and empties the buffer. It does not touch the underlying I/O; call
   * @{method:update} to fill the buffer.
   *
   * @return wild Data from the channel, normally bytes.
   *
   * @task io
   */
  public function read() {
    $result = $this->ibuf;
    $this->ibuf = '';
    return $result;
  }


  /**
   * Write to the channel. A channel defines what data format it accepts,
   * so this method may take strings, objects, or anything else.
   *
   * The default implementation accepts bytes and appends them to the write
   * buffer. Nothing is sent until @{method:update} is called.
   *
   * NOTE: The guard below is `is_scalar()`, so integers, floats and booleans
   * are accepted as well as strings even though the error message only
   * mentions strings.
   *
   * @param wild $bytes Data to write to the channel, normally bytes.
   * @return $this
   * @throws Exception If `$bytes` is not a scalar value.
   *
   * @task io
   */
  public function write($bytes) {
    if (!is_scalar($bytes)) {
      throw new Exception(
        pht(
          '%s may only write strings!',
          __METHOD__.'()'));
    }

    $this->obuf->append($bytes);
    return $this;
  }


/* -(  Waiting for Activity  )----------------------------------------------- */


  /**
   * Wait for any activity on a list of channels. Convenience wrapper around
   * @{method:waitForActivity} which waits for both reads and writes on every
   * channel in the list.
   *
   * @param list<PhutilChannel> $channels A list of channels to wait for.
   * @param dict $options (optional) Options, see @{method:waitForActivity}.
   * @return false|null Whatever @{method:waitForActivity} returns.
   *
   * @task wait
   */
  public static function waitForAny(array $channels, array $options = array()) {
    return self::waitForActivity($channels, $channels, $options);
  }


  /**
   * Wait (using select()) for channels to become ready for reads or writes.
   * This method blocks until some channel is ready to be updated.
   *
   * It does not provide a way to determine which channels are ready to be
   * updated. The expectation is that you'll just update every channel. This
   * might change eventually.
   *
   * Available options are:
   *
   *   - 'read' (list<stream>) Additional streams to select for read.
   *   - 'write' (list<stream>) Additional streams to select for write.
   *   - 'except' (list<stream>) Additional streams to select for except.
   *   - 'timeout' (float) Select timeout in seconds, defaults to 1.
   *
   * NOTE: Extra streams must be //streams//, not //sockets//, because this
   * method uses `stream_select()`, not `socket_select()`.
   *
   * @param list<PhutilChannel> $reads List of channels to wait for reads on.
   * @param list<PhutilChannel> $writes List of channels to wait for writes on.
   * @param dict $options (optional) Options, see above.
   * @return false|null `false` if the method returned without selecting
   *   because a channel exposes no sockets or because there was nothing to
   *   select on; `null` if a read buffer already held data or after
   *   `stream_select()` has been called.
   *
   * @task wait
   */
  public static function waitForActivity(
    array $reads,
    array $writes,
    array $options = array()) {

    assert_instances_of($reads, __CLASS__);
    assert_instances_of($writes, __CLASS__);

    $read = idx($options, 'read', array());
    $write = idx($options, 'write', array());
    $except = idx($options, 'except', array());
    $wait = idx($options, 'timeout', 1);

    // TODO: It would be nice to just be able to categorically reject these as
    // unselectable.
    foreach (array($reads, $writes) as $channels) {
      foreach ($channels as $channel) {
        $r_sockets = $channel->getReadSockets();
        $w_sockets = $channel->getWriteSockets();

        // If any channel has no read sockets and no write sockets, assume it
        // isn't selectable and return immediately (effectively degrading to a
        // busy wait).
        if (!$r_sockets && !$w_sockets) {
          return false;
        }
      }
    }

    foreach ($reads as $channel) {
      // If any of the read channels have data in read buffers, return
      // immediately. If we don't, we risk running select() on a bunch of
      // sockets which won't become readable because the data the application
      // expects is already in a read buffer.
      if (!$channel->isReadBufferEmpty()) {
        return;
      }

      foreach ($channel->getReadSockets() as $socket) {
        $read[] = $socket;
        $except[] = $socket;
      }
    }

    foreach ($writes as $channel) {
      // If the channel's write buffer is empty, don't select the write
      // sockets, since they're writable immediately.
      if ($channel->isWriteBufferEmpty()) {
        continue;
      }

      foreach ($channel->getWriteSockets() as $socket) {
        $write[] = $socket;
        $except[] = $socket;
      }
    }

    if (!$read && !$write && !$except) {
      return false;
    }

    // stream_select() takes the timeout as integer seconds plus integer
    // microseconds. Truncate the fractional part explicitly instead of
    // handing over a float: implicit float-to-int conversion that loses
    // precision is deprecated as of PHP 8.1.
    $wait_sec = (int)$wait;
    $wait_usec = (int)(1000000 * ($wait - $wait_sec));

    // Warnings from stream_select() are deliberately suppressed and its
    // result is ignored; callers are expected to update() every channel.
    @stream_select($read, $write, $except, $wait_sec, $wait_usec);
  }


/* -(  Responding to Activity  )--------------------------------------------- */


  /**
   * Updates the channel, filling input buffers and flushing output buffers.
   * Returns false if the channel has closed.
   *
   * Reading stops when @{method:readBytes} yields nothing or when the read
   * buffer limit set with @{method:setReadBufferSize} has been reached.
   * Writing stops when the write buffer is empty or @{method:writeBytes}
   * reports that no bytes were written.
   *
   * @return bool True if the channel is still open.
   *
   * @task update
   */
  public function update() {
    $maximum_read = PHP_INT_MAX;
    if ($this->readBufferSize !== null) {
      $maximum_read = ($this->readBufferSize - strlen($this->ibuf));
    }

    while ($maximum_read > 0) {
      // Normalize to a string once so a `null` or `false` result from an
      // implementation is treated as "no data" without passing a non-string
      // to strlen().
      $in = (string)$this->readBytes($maximum_read);
      $in_length = strlen($in);
      if (!$in_length) {
        // Reading is blocked for now.
        break;
      }
      $this->ibuf .= $in;
      $maximum_read -= $in_length;
    }

    while ($this->obuf->getByteLength()) {
      $len = $this->writeBytes($this->obuf->getAnyPrefix());
      if (!$len) {
        // Writing is blocked for now.
        break;
      }
      $this->obuf->removeBytesFromHead($len);
    }

    return $this->isOpen();
  }


/* -(  Channel Implementation  )--------------------------------------------- */


  /**
   * Set a channel name. This is primarily intended to allow you to debug
   * channel code more easily, by naming channels something meaningful.
   *
   * @param string $name Channel name.
   * @return $this
   *
   * @task impl
   */
  public function setName($name) {
    $this->name = $name;
    return $this;
  }


  /**
   * Get the channel name, as set by @{method:setName}. If no name has been
   * set, the name of the concrete channel class is returned instead.
   *
   * @return string Name of the channel.
   *
   * @task impl
   */
  public function getName() {
    return coalesce($this->name, get_class($this));
  }


  /**
   * Test if the channel is open: active, can be read from and written to, etc.
   *
   * @return bool True if the channel is open.
   *
   * @task impl
   */
  abstract public function isOpen();


  /**
   * Close the channel for writing.
   *
   * @return void
   * @task impl
   */
  abstract public function closeWriteChannel();


  /**
   * Test if the channel is open for reading. The default implementation
   * defers to @{method:isOpen}.
   *
   * @return bool True if the channel is open for reading.
   *
   * @task impl
   */
  public function isOpenForReading() {
    return $this->isOpen();
  }


  /**
   * Test if the channel is open for writing. The default implementation
   * defers to @{method:isOpen}.
   *
   * @return bool True if the channel is open for writing.
   *
   * @task impl
   */
  public function isOpenForWriting() {
    return $this->isOpen();
  }


  /**
   * Read from the channel's underlying I/O.
   *
   * @{method:update} treats an empty result as "reading is blocked for now".
   *
   * @param int $length Maximum number of bytes to read.
   * @return string Bytes, if available.
   *
   * @task impl
   */
  abstract protected function readBytes($length);


  /**
   * Write to the channel's underlying I/O.
   *
   * @{method:update} treats a falsy result as "writing is blocked for now"
   * and otherwise removes that many bytes from the head of the write buffer.
   *
   * @param string $bytes Bytes to write.
   * @return int Number of bytes written.
   *
   * @task impl
   */
  abstract protected function writeBytes($bytes);


  /**
   * Get sockets to select for reading. The default implementation exposes
   * none.
   *
   * @return list<stream> Read sockets.
   *
   * @task impl
   */
  protected function getReadSockets() {
    return array();
  }


  /**
   * Get sockets to select for writing. The default implementation exposes
   * none.
   *
   * @return list<stream> Write sockets.
   *
   * @task impl
   */
  protected function getWriteSockets() {
    return array();
  }


  /**
   * Set the maximum size of the channel's read buffer. Reads will artificially
   * block once the buffer reaches this size until the in-process buffer is
   * consumed.
   *
   * @param int|null $size Maximum read buffer size, or `null` for a limitless
   *                       buffer.
   * @return $this
   * @task impl
   */
  public function setReadBufferSize($size) {
    $this->readBufferSize = $size;
    return $this;
  }


  /**
   * Test state of the read buffer.
   *
   * @return bool True if the read buffer is empty.
   *
   * @task impl
   */
  public function isReadBufferEmpty() {
    return (strlen($this->ibuf) === 0);
  }


  /**
   * Test state of the write buffer.
   *
   * @return bool True if the write buffer is empty.
   *
   * @task impl
   */
  public function isWriteBufferEmpty() {
    return !$this->getWriteBufferSize();
  }


  /**
   * Get the number of bytes we're currently waiting to write.
   *
   * @return int Number of waiting bytes.
   *
   * @task impl
   */
  public function getWriteBufferSize() {
    return $this->obuf->getByteLength();
  }


  /**
   * Wait for any buffered writes to complete. This is a blocking call. When
   * the call returns, the write buffer will be empty.
   *
   * @return $this
   * @throws Exception If the channel closes before the buffer is drained.
   *
   * @task impl
   */
  public function flush() {
    while (!$this->isWriteBufferEmpty()) {
      self::waitForAny(array($this));
      if (!$this->update()) {
        throw new Exception(pht('Channel closed while flushing output!'));
      }
    }
    return $this;
  }

}

File Metadata

Mime Type
text/x-php
Expires
Thu, Dec 19, 17:00 (18 h, 36 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1014856
Default Alt Text
PhutilChannel.php (10 KB)

Event Timeline