Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2894551
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Advanced/Developer...
View Handle
View Hovercard
Size
17 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/future/Future.php b/src/future/Future.php
index 08772279..f58a5287 100644
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -1,132 +1,157 @@
<?php
/**
* A 'future' or 'promise' is an object which represents the result of some
* pending computation. For a more complete overview of futures, see
* @{article:Using Futures}.
*/
abstract class Future extends Phobject {
private $hasResult = false;
private $result;
-
- protected $exception;
+ private $exception;
/**
* Is this future's process complete? Specifically, can this future be
* resolved without blocking?
*
* @return bool If true, the external process is complete and resolving this
* future will not block.
*/
abstract public function isReady();
/**
* Resolve a future and return its result, blocking until the result is ready
* if necessary.
*
* @return wild Future result.
*/
public function resolve() {
$args = func_get_args();
if (count($args)) {
throw new Exception(
pht(
'Parameter "timeout" to "Future->resolve()" is no longer '.
'supported. Update the caller so it no longer passes a '.
'timeout.'));
}
- $graph = new FutureIterator(array($this));
- $graph->resolveAll();
+ if (!$this->hasResult()) {
+ $graph = new FutureIterator(array($this));
+ $graph->resolveAll();
+ }
- if ($this->exception) {
- throw $this->exception;
+ if ($this->hasException()) {
+ throw $this->getException();
}
return $this->getResult();
}
- public function setException(Exception $ex) {
- $this->exception = $ex;
- return $this;
- }
+ final public function updateFuture() {
+ if ($this->hasException()) {
+ return;
+ }
- public function getException() {
- return $this->exception;
+ if ($this->hasResult()) {
+ return;
+ }
+
+ try {
+ $this->isReady();
+ } catch (Exception $ex) {
+ $this->setException($ex);
+ } catch (Throwable $ex) {
+ $this->setException($ex);
+ }
}
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
* the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
*/
public function getReadSockets() {
return array();
}
/**
* Retrieve a list of sockets which we can wait to become writable while a
* future is resolving. See @{method:getReadSockets}.
*
* @return list A list of sockets which we expect to become writable.
*/
public function getWriteSockets() {
return array();
}
/**
* Default amount of time to wait on stream select for this future. Normally
* 1 second is fine, but if the future has a timeout sooner than that it
* should return the amount of time left before the timeout.
*/
public function getDefaultWait() {
return 1;
}
public function start() {
$this->isReady();
return $this;
}
/**
* Retrieve the final result of the future.
*
* @return wild Final resolution of this future.
*/
final protected function getResult() {
if (!$this->hasResult()) {
throw new Exception(
pht(
'Future has not yet resolved. Resolve futures before retrieving '.
'results.'));
}
return $this->result;
}
final protected function setResult($result) {
if ($this->hasResult()) {
throw new Exception(
pht(
'Future has already resolved. Futures may not resolve more than '.
'once.'));
}
$this->hasResult = true;
$this->result = $result;
return $this;
}
- final protected function hasResult() {
+ final public function hasResult() {
return $this->hasResult;
}
+ final private function setException($exception) {
+ // NOTE: The parameter may be an Exception or a Throwable.
+ $this->exception = $exception;
+ return $this;
+ }
+
+ final private function getException() {
+ return $this->exception;
+ }
+
+ final public function hasException() {
+ return ($this->exception !== null);
+ }
+
+
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
index 933fe9d8..c8b01294 100644
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -1,379 +1,382 @@
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final class FutureIterator extends Phobject implements Iterator {
protected $wait = array();
protected $work = array();
protected $futures = array();
protected $key;
protected $limit;
protected $timeout;
protected $isTimeout = false;
/* -( Basics )------------------------------------------------------------- */
/**
* Create a new iterator over a list of futures.
*
* @param list List of @{class:Future}s to resolve.
* @task basics
*/
public function __construct(array $futures) {
assert_instances_of($futures, 'Future');
$this->futures = $futures;
}
/**
* Block until all futures resolve.
*
* @return void
* @task basics
*/
public function resolveAll() {
iterator_to_array($this);
}
/**
* Add another future to the set of futures. This is useful if you have a
* set of futures to run mostly in parallel, but some futures depend on
* others.
*
* @param Future @{class:Future} to add to iterator
* @task basics
*/
public function addFuture(Future $future, $key = null) {
if ($key === null) {
$this->futures[] = $future;
$this->wait[] = last_key($this->futures);
} else if (!isset($this->futures[$key])) {
$this->futures[$key] = $future;
$this->wait[] = $key;
} else {
throw new Exception(pht('Invalid key %s', $key));
}
// Start running the future if we don't have $this->limit futures running
// already. updateWorkingSet() won't start running the future if there's no
// limit, so we'll manually poke it here in that case.
$this->updateWorkingSet();
if (!$this->limit) {
$future->isReady();
}
return $this;
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
* Set a maximum amount of time you want to wait before the iterator will
* yield a result. If no future has resolved yet, the iterator will yield
* null for key and value. Among other potential uses, you can use this to
* show some busy indicator:
*
* $futures = id(new FutureIterator($futures))
* ->setUpdateInterval(1);
* foreach ($futures as $future) {
* if ($future === null) {
* echo "Still working...\n";
* } else {
* // ...
* }
* }
*
* This will echo "Still working..." once per second as long as futures are
* resolving. By default, FutureIterator never yields null.
*
* @param float Maximum number of seconds to block waiting on futures before
* yielding null.
* @return this
*
* @task config
*/
public function setUpdateInterval($interval) {
$this->timeout = $interval;
return $this;
}
/**
* Limit the number of simultaneously executing futures.
*
* $futures = id(new FutureIterator($futures))
* ->limit(4);
* foreach ($futures as $future) {
* // Run no more than 4 futures simultaneously.
* }
*
* @param int Maximum number of simultaneous jobs allowed.
* @return this
*
* @task config
*/
public function limit($max) {
$this->limit = $max;
return $this;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
* @task iterator
*/
public function rewind() {
$this->wait = array_keys($this->futures);
$this->work = null;
$this->updateWorkingSet();
$this->next();
}
/**
* @task iterator
*/
public function next() {
$this->key = null;
if (!count($this->wait)) {
return;
}
$read_sockets = array();
$write_sockets = array();
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
$check = $this->getWorkingSet();
+
$resolve = null;
do {
$read_sockets = array();
$write_sockets = array();
$can_use_sockets = true;
$wait_time = 1;
foreach ($check as $wait => $key) {
$future = $this->futures[$key];
- try {
- if ($future->getException()) {
+
+ $future->updateFuture();
+
+ if ($future->hasException()) {
+ if ($resolve === null) {
$resolve = $wait;
- continue;
- }
- if ($future->isReady()) {
- if ($resolve === null) {
- $resolve = $wait;
- }
- continue;
}
+ continue;
+ }
- $got_sockets = false;
- $socks = $future->getReadSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $read_sockets[] = $socket;
- }
+ if ($future->hasResult()) {
+ if ($resolve === null) {
+ $resolve = $wait;
}
+ continue;
+ }
- $socks = $future->getWriteSockets();
- if ($socks) {
- $got_sockets = true;
- foreach ($socks as $socket) {
- $write_sockets[] = $socket;
- }
+ $got_sockets = false;
+ $socks = $future->getReadSockets();
+ if ($socks) {
+ $got_sockets = true;
+ foreach ($socks as $socket) {
+ $read_sockets[] = $socket;
}
+ }
- // If any currently active future had neither read nor write sockets,
- // we can't wait for the current batch of items using sockets.
- if (!$got_sockets) {
- $can_use_sockets = false;
- } else {
- $wait_time = min($wait_time, $future->getDefaultWait());
+ $socks = $future->getWriteSockets();
+ if ($socks) {
+ $got_sockets = true;
+ foreach ($socks as $socket) {
+ $write_sockets[] = $socket;
}
- } catch (Exception $ex) {
- $this->futures[$key]->setException($ex);
- $resolve = $wait;
- break;
+ }
+
+ // If any currently active future had neither read nor write sockets,
+ // we can't wait for the current batch of items using sockets.
+ if (!$got_sockets) {
+ $can_use_sockets = false;
+ } else {
+ $wait_time = min($wait_time, $future->getDefaultWait());
}
}
+
if ($resolve === null) {
// Check for a setUpdateInterval() timeout.
if ($timeout !== null) {
$elapsed = microtime(true) - $start;
if ($elapsed > $timeout) {
$this->isTimeout = true;
return;
} else {
$wait_time = $timeout - $elapsed;
}
}
if ($can_use_sockets) {
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
} else {
usleep(1000);
}
}
} while ($resolve === null);
$this->key = $this->wait[$resolve];
unset($this->wait[$resolve]);
+
$this->updateWorkingSet();
}
/**
* @task iterator
*/
public function current() {
if ($this->isTimeout) {
return null;
}
return $this->futures[$this->key];
}
/**
* @task iterator
*/
public function key() {
if ($this->isTimeout) {
return null;
}
return $this->key;
}
/**
* @task iterator
*/
public function valid() {
if ($this->isTimeout) {
return true;
}
return ($this->key !== null);
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
protected function getWorkingSet() {
if ($this->work === null) {
return $this->wait;
}
return $this->work;
}
/**
* @task internal
*/
protected function updateWorkingSet() {
if (!$this->limit) {
return;
}
$old = $this->work;
$this->work = array_slice($this->wait, 0, $this->limit, true);
// If we're using a limit, our futures are sleeping and need to be polled
// to begin execution, so poll any futures which weren't in our working set
// before.
foreach ($this->work as $work => $key) {
if (!isset($old[$work])) {
$this->futures[$key]->isReady();
}
}
}
/**
* Wait for activity on one of several sockets.
*
* @param list List of sockets expected to become readable.
* @param list List of sockets expected to become writable.
* @param float Timeout, in seconds.
* @return void
*/
private function waitForSockets(
array $read_list,
array $write_list,
$timeout = 1.0) {
static $handler_installed = false;
if (!$handler_installed) {
// If we're spawning child processes, we need to install a signal handler
// here to catch cases like execing '(sleep 60 &) &' where the child
// exits but a socket is kept open. But we don't actually need to do
// anything because the SIGCHLD will interrupt the stream_select(), as
// long as we have a handler registered.
if (function_exists('pcntl_signal')) {
if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
throw new Exception(pht('Failed to install signal handler!'));
}
}
$handler_installed = true;
}
$timeout_sec = (int)$timeout;
$timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));
$exceptfds = array();
$ok = @stream_select(
$read_list,
$write_list,
$exceptfds,
$timeout_sec,
$timeout_usec);
if ($ok === false) {
// Hopefully, means we received a SIGCHLD. In the worst case, we degrade
// to a busy wait.
}
}
public static function handleSIGCHLD($signo) {
// This function is a dummy, we just need to have some handler registered
// so that PHP will get interrupted during "stream_select()". If we don't
// register a handler, "stream_select()" won't fail.
}
}
diff --git a/src/future/FutureProxy.php b/src/future/FutureProxy.php
index 4b90df40..65cbffa2 100644
--- a/src/future/FutureProxy.php
+++ b/src/future/FutureProxy.php
@@ -1,64 +1,68 @@
<?php
/**
* Wraps another @{class:Future} and allows you to post-process its result once
* it resolves.
*/
abstract class FutureProxy extends Future {
private $proxied;
public function __construct(Future $proxied = null) {
if ($proxied) {
$this->setProxiedFuture($proxied);
}
}
public function setProxiedFuture(Future $proxied) {
$this->proxied = $proxied;
return $this;
}
protected function getProxiedFuture() {
if (!$this->proxied) {
throw new Exception(pht('The proxied future has not been provided yet.'));
}
return $this->proxied;
}
public function isReady() {
- return $this->getProxiedFuture()->isReady();
- }
+ if ($this->hasResult()) {
+ return true;
+ }
- public function resolve() {
- $result = $this->getProxiedFuture()->resolve();
- $result = $this->didReceiveResult($result);
- $this->setResult($result);
- return $this->getResult();
- }
+ $proxied = $this->getProxiedFuture();
- public function setException(Exception $ex) {
- $this->getProxiedFuture()->setException($ex);
- return $this;
+ $is_ready = $proxied->isReady();
+
+ if ($proxied->hasResult()) {
+ $result = $proxied->getResult();
+ $result = $this->didReceiveResult($result);
+ $this->setResult($result);
+ }
+
+ return $is_ready;
}
- public function getException() {
- return $this->getProxiedFuture()->getException();
+ public function resolve() {
+ $this->getProxiedFuture()->resolve();
+ $this->isReady();
+ return $this->getResult();
}
public function getReadSockets() {
return $this->getProxiedFuture()->getReadSockets();
}
public function getWriteSockets() {
return $this->getProxiedFuture()->getWriteSockets();
}
public function start() {
$this->getProxiedFuture()->start();
return $this;
}
abstract protected function didReceiveResult($result);
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 19, 20:03 (2 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1128263
Default Alt Text
(17 KB)
Attached To
Mode
rARC Arcanist
Attached
Detach File
Event Timeline
Log In to Comment