Page MenuHomePhorge

No OneTemporary

diff --git a/src/future/Future.php b/src/future/Future.php
index 08772279..f58a5287 100644
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -1,132 +1,157 @@
<?php
/**
* A 'future' or 'promise' is an object which represents the result of some
* pending computation. For a more complete overview of futures, see
* @{article:Using Futures}.
*/
abstract class Future extends Phobject {
private $hasResult = false;
private $result;
-
- protected $exception;
+ private $exception;
/**
* Is this future's process complete? Specifically, can this future be
* resolved without blocking?
*
* @return bool If true, the external process is complete and resolving this
* future will not block.
*/
abstract public function isReady();
/**
* Resolve a future and return its result, blocking until the result is ready
* if necessary.
*
* @return wild Future result.
*/
public function resolve() {
$args = func_get_args();
if (count($args)) {
throw new Exception(
pht(
'Parameter "timeout" to "Future->resolve()" is no longer '.
'supported. Update the caller so it no longer passes a '.
'timeout.'));
}
- $graph = new FutureIterator(array($this));
- $graph->resolveAll();
+ if (!$this->hasResult()) {
+ $graph = new FutureIterator(array($this));
+ $graph->resolveAll();
+ }
- if ($this->exception) {
- throw $this->exception;
+ if ($this->hasException()) {
+ throw $this->getException();
}
return $this->getResult();
}
- public function setException(Exception $ex) {
- $this->exception = $ex;
- return $this;
- }
+ final public function updateFuture() {
+ if ($this->hasException()) {
+ return;
+ }
- public function getException() {
- return $this->exception;
+ if ($this->hasResult()) {
+ return;
+ }
+
+ try {
+ $this->isReady();
+ } catch (Exception $ex) {
+ $this->setException($ex);
+ } catch (Throwable $ex) {
+ $this->setException($ex);
+ }
}
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
* the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
*/
public function getReadSockets() {
// Base implementation: this future exposes no readable sockets.
return array();
}
/**
* Retrieve a list of sockets which we can wait to become writable while a
* future is resolving. See @{method:getReadSockets}.
*
* @return list A list of sockets which we expect to become writable.
*/
public function getWriteSockets() {
// Base implementation: this future exposes no writable sockets.
return array();
}
/**
* Default amount of time to wait on stream select for this future. Normally
* 1 second is fine, but if the future has a timeout sooner than that it
* should return the amount of time left before the timeout.
*/
public function getDefaultWait() {
// Default stream-select wait, in seconds.
return 1;
}
/**
* Poll this future once and return it, so the call can be chained.
*
* @return this
*/
public function start() {
// The readiness value returned by isReady() is intentionally discarded;
// only the poll itself matters here. Anything isReady() throws propagates
// to the caller.
$this->isReady();
return $this;
}
/**
* Retrieve the final result of the future.
*
* @return wild Final resolution of this future.
*/
final protected function getResult() {
  // Happy path first: once a result has been recorded, hand it back.
  if ($this->hasResult()) {
    return $this->result;
  }

  // Asking for the result of an unresolved future is a caller error.
  $message = pht(
    'Future has not yet resolved. Resolve futures before retrieving '.
    'results.');

  throw new Exception($message);
}
final protected function setResult($result) {
  // A future may resolve only once; a second assignment is a caller error.
  $already_resolved = $this->hasResult();
  if ($already_resolved) {
    $message = pht(
      'Future has already resolved. Futures may not resolve more than '.
      'once.');

    throw new Exception($message);
  }

  // Store the value and flip the flag that hasResult() reports.
  $this->result = $result;
  $this->hasResult = true;

  return $this;
}
- final protected function hasResult() {
+ final public function hasResult() {
return $this->hasResult;
}
+ /**
+ * Record the failure captured while updating this future.
+ *
+ * Declared without "final": a private method can never be overridden, and
+ * PHP 8.0+ emits a compile-time warning for "final private" methods.
+ *
+ * @param Exception|Throwable Failure to store.
+ * @return this
+ */
+ private function setException($exception) {
+ // NOTE: The parameter may be an Exception or a Throwable.
+ $this->exception = $exception;
+ return $this;
+ }
+
+ /**
+ * Retrieve the stored failure, if any.
+ *
+ * Declared without "final": a private method can never be overridden, and
+ * PHP 8.0+ emits a compile-time warning for "final private" methods.
+ *
+ * @return Exception|Throwable|null Stored failure, or null if none is set.
+ */
+ private function getException() {
+ return $this->exception;
+ }
+
+ final public function hasException() {
+ return ($this->exception !== null);
+ }
+
+
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
index 933fe9d8..c8b01294 100644
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -1,379 +1,382 @@
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final class FutureIterator extends Phobject implements Iterator {
protected $wait = array();
protected $work = array();
protected $futures = array();
protected $key;
protected $limit;
protected $timeout;
protected $isTimeout = false;
/* -( Basics )------------------------------------------------------------- */
/**
* Create a new iterator over a list of futures.
*
* @param list List of @{class:Future}s to resolve.
* @task basics
*/
public function __construct(array $futures) {
// Check that every element is a Future; what happens on a mismatch is
// defined by assert_instances_of() (presumably it throws -- confirm).
assert_instances_of($futures, 'Future');
// The array is stored as given, so the caller's keys are retained.
$this->futures = $futures;
}
/**
* Block until all futures resolve.
*
* @return void
* @task basics
*/
public function resolveAll() {
// Converting the iterator to an array drives rewind()/next() until the
// iterator reports it is no longer valid; the array itself is discarded.
iterator_to_array($this);
}
/**
* Add another future to the set of futures. This is useful if you have a
* set of futures to run mostly in parallel, but some futures depend on
* others.
*
* @param Future @{class:Future} to add to iterator
* @task basics
*/
public function addFuture(Future $future, $key = null) {
  // An explicit key must not collide with a future we already hold.
  if ($key !== null && isset($this->futures[$key])) {
    throw new Exception(pht('Invalid key %s', $key));
  }

  if ($key === null) {
    // No key supplied: append, then use whichever key the array assigned.
    $this->futures[] = $future;
    $wait_key = last_key($this->futures);
  } else {
    $this->futures[$key] = $future;
    $wait_key = $key;
  }

  // Queue the new future for resolution.
  $this->wait[] = $wait_key;

  // When a limit is configured, updateWorkingSet() polls any future newly
  // admitted to the working set. Without a limit it returns immediately, so
  // the future has to be poked by hand to get it running.
  $this->updateWorkingSet();
  if (!$this->limit) {
    $future->isReady();
  }

  return $this;
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
* Set a maximum amount of time you want to wait before the iterator will
* yield a result. If no future has resolved yet, the iterator will yield
* null for key and value. Among other potential uses, you can use this to
* show some busy indicator:
*
* $futures = id(new FutureIterator($futures))
* ->setUpdateInterval(1);
* foreach ($futures as $future) {
* if ($future === null) {
* echo "Still working...\n";
* } else {
* // ...
* }
* }
*
* This will echo "Still working..." once per second as long as futures are
* resolving. By default, FutureIterator never yields null.
*
* @param float Maximum number of seconds to block waiting on futures before
* yielding null.
* @return this
*
* @task config
*/
public function setUpdateInterval($interval) {
// Stored as the timeout that next() compares elapsed time against before
// marking the iteration step as a timeout.
$this->timeout = $interval;
return $this;
}
/**
* Limit the number of simultaneously executing futures.
*
* $futures = id(new FutureIterator($futures))
* ->limit(4);
* foreach ($futures as $future) {
* // Run no more than 4 futures simultaneously.
* }
*
* @param int Maximum number of simultaneous jobs allowed.
* @return this
*
* @task config
*/
public function limit($max) {
// Read by updateWorkingSet() to size the working set; a falsy value means
// no limit is applied.
$this->limit = $max;
return $this;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
* @task iterator
*/
public function rewind() {
// Start over: every known future is waiting to be yielded again.
$this->wait = array_keys($this->futures);
// A null working set makes getWorkingSet() fall back to the full wait list
// unless updateWorkingSet() builds a limited one below.
$this->work = null;
$this->updateWorkingSet();
// Advance to the first item so key()/current() are populated immediately
// after rewinding.
$this->next();
}
/**
* @task iterator
*/
public function next() {
$this->key = null;
if (!count($this->wait)) {
return;
}
$read_sockets = array();
$write_sockets = array();
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
$check = $this->getWorkingSet();
+
$resolve = null;
do {
$read_sockets = array();
$write_sockets = array();
$can_use_sockets = true;
$wait_time = 1;
foreach ($check as $wait => $key) {
$future = $this->futures[$key];
- try {
- if ($future->getException()) {
+
+ $future->updateFuture();
+
+ if ($future->hasException()) {
+ if ($resolve === null) {
$resolve = $wait;
- continue;
- }
- if ($future->isReady()) {
- if ($resolve === null) {
- $resolve = $wait;
- }
- continue;
}
+ continue;
+ }
- $got_sockets = false;
- $socks = $future->getReadSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $read_sockets[] = $socket;
- }
+ if ($future->hasResult()) {
+ if ($resolve === null) {
+ $resolve = $wait;
}
+ continue;
+ }
- $socks = $future->getWriteSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $write_sockets[] = $socket;
- }
+ $got_sockets = false;
+ $socks = $future->getReadSockets();
+ if ($socks) {
+ $got_sockets = true;
+ foreach ($socks as $socket) {
+ $read_sockets[] = $socket;
}
+ }
- // If any currently active future had neither read nor write sockets,
- // we can't wait for the current batch of items using sockets.
- if (!$got_sockets) {
- $can_use_sockets = false;
- } else {
- $wait_time = min($wait_time, $future->getDefaultWait());
+ $socks = $future->getWriteSockets();
+ if ($socks) {
+ $got_sockets = true;
+ foreach ($socks as $socket) {
+ $write_sockets[] = $socket;
}
- } catch (Exception $ex) {
- $this->futures[$key]->setException($ex);
- $resolve = $wait;
- break;
+ }
+
+ // If any currently active future had neither read nor write sockets,
+ // we can't wait for the current batch of items using sockets.
+ if (!$got_sockets) {
+ $can_use_sockets = false;
+ } else {
+ $wait_time = min($wait_time, $future->getDefaultWait());
}
}
+
if ($resolve === null) {
// Check for a setUpdateInterval() timeout.
if ($timeout !== null) {
$elapsed = microtime(true) - $start;
if ($elapsed > $timeout) {
$this->isTimeout = true;
return;
} else {
$wait_time = $timeout - $elapsed;
}
}
if ($can_use_sockets) {
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
} else {
usleep(1000);
}
}
} while ($resolve === null);
$this->key = $this->wait[$resolve];
unset($this->wait[$resolve]);
+
$this->updateWorkingSet();
}
/**
* @task iterator
*/
public function current() {
  // A timed-out step yields null instead of a future.
  return $this->isTimeout
    ? null
    : $this->futures[$this->key];
}
/**
* @task iterator
*/
public function key() {
  // A timed-out step has no key.
  return $this->isTimeout
    ? null
    : $this->key;
}
/**
* @task iterator
*/
public function valid() {
  // Iteration continues on a timeout step (which yields null) or whenever
  // next() selected a key.
  return ($this->isTimeout || $this->key !== null);
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
protected function getWorkingSet() {
  // With no explicit working set, every waiting future is in play.
  $working = $this->work;

  return ($working === null)
    ? $this->wait
    : $working;
}
/**
* @task internal
*/
protected function updateWorkingSet() {
  // Without a limit there is no working set to maintain.
  if (!$this->limit) {
    return;
  }

  $previous = $this->work;

  // The working set is the first $limit entries of the wait list, with the
  // wait-list positions preserved as keys.
  $current = array_slice($this->wait, 0, $this->limit, true);
  $this->work = $current;

  // Futures under a limit are sleeping until polled, so poll each one that
  // was not already part of the previous working set.
  foreach ($current as $slot => $future_key) {
    if (isset($previous[$slot])) {
      continue;
    }
    $this->futures[$future_key]->isReady();
  }
}
/**
* Wait for activity on one of several sockets.
*
* @param list List of sockets expected to become readable.
* @param list List of sockets expected to become writable.
* @param float Timeout, in seconds.
* @return void
*/
private function waitForSockets(
  array $read_list,
  array $write_list,
  $timeout = 1.0) {

  static $handler_installed = false;

  if (!$handler_installed) {
    // If we're spawning child processes, we need to install a signal handler
    // here to catch cases like execing '(sleep 60 &) &' where the child
    // exits but a socket is kept open. But we don't actually need to do
    // anything because the SIGCHLD will interrupt the stream_select(), as
    // long as we have a handler registered.
    if (function_exists('pcntl_signal')) {
      if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
        throw new Exception(pht('Failed to install signal handler!'));
      }
    }
    $handler_installed = true;
  }

  // With no streams at all, stream_select() cannot do anything useful: PHP 7
  // warns and returns false, while PHP 8 throws a ValueError which the "@"
  // operator does not suppress. Take a short nap instead, mirroring the
  // caller's fallback for futures without sockets.
  if (!$read_list && !$write_list) {
    usleep(1000);
    return;
  }

  // PHP 8 also rejects negative timeouts with a ValueError, so clamp to
  // "poll and return immediately".
  $timeout = max(0, $timeout);

  // Split the (possibly fractional) timeout into whole seconds plus the
  // remaining microseconds, as stream_select() expects.
  $timeout_sec = (int)$timeout;
  $timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));

  $exceptfds = array();
  $ok = @stream_select(
    $read_list,
    $write_list,
    $exceptfds,
    $timeout_sec,
    $timeout_usec);

  if ($ok === false) {
    // Hopefully, means we received a SIGCHLD. In the worst case, we degrade
    // to a busy wait.
  }
}
public static function handleSIGCHLD($signo) {
// This function is a dummy, we just need to have some handler registered
// so that PHP will get interrupted during "stream_select()". If we don't
// register a handler, "stream_select()" won't fail.
//
// The signal number passed in $signo is not used.
}
}
diff --git a/src/future/FutureProxy.php b/src/future/FutureProxy.php
index 4b90df40..65cbffa2 100644
--- a/src/future/FutureProxy.php
+++ b/src/future/FutureProxy.php
@@ -1,64 +1,68 @@
<?php
/**
* Wraps another @{class:Future} and allows you to post-process its result once
* it resolves.
*/
abstract class FutureProxy extends Future {
private $proxied;
public function __construct(Future $proxied = null) {
// The proxied future is optional at construction time; it can be supplied
// later with setProxiedFuture().
if ($proxied) {
$this->setProxiedFuture($proxied);
}
}
public function setProxiedFuture(Future $proxied) {
// Replaces whatever proxied future was previously set.
$this->proxied = $proxied;
return $this;
}
protected function getProxiedFuture() {
  $proxied = $this->proxied;

  // Normal case: a proxied future has been provided.
  if ($proxied) {
    return $proxied;
  }

  // Using the proxy before giving it a future is a caller error.
  throw new Exception(pht('The proxied future has not been provided yet.'));
}
public function isReady() {
- return $this->getProxiedFuture()->isReady();
- }
+ if ($this->hasResult()) {
+ return true;
+ }
- public function resolve() {
- $result = $this->getProxiedFuture()->resolve();
- $result = $this->didReceiveResult($result);
- $this->setResult($result);
- return $this->getResult();
- }
+ $proxied = $this->getProxiedFuture();
- public function setException(Exception $ex) {
- $this->getProxiedFuture()->setException($ex);
- return $this;
+ $is_ready = $proxied->isReady();
+
+ if ($proxied->hasResult()) {
+ $result = $proxied->getResult();
+ $result = $this->didReceiveResult($result);
+ $this->setResult($result);
+ }
+
+ return $is_ready;
}
- public function getException() {
- return $this->getProxiedFuture()->getException();
+ public function resolve() {
+ $this->getProxiedFuture()->resolve();
+ $this->isReady();
+ return $this->getResult();
}
public function getReadSockets() {
  // Readable sockets are whatever the proxied future reports.
  $proxied = $this->getProxiedFuture();
  return $proxied->getReadSockets();
}
public function getWriteSockets() {
  // Writable sockets are whatever the proxied future reports.
  $proxied = $this->getProxiedFuture();
  return $proxied->getWriteSockets();
}
public function start() {
  // Starting the proxy means starting the future it wraps; the proxy itself
  // is returned so calls can be chained.
  $proxied = $this->getProxiedFuture();
  $proxied->start();

  return $this;
}
abstract protected function didReceiveResult($result);
}

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 19, 20:03 (2 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1128263
Default Alt Text
(17 KB)

Event Timeline