Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2894120
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Advanced/Developer...
View Handle
View Hovercard
Size
18 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/future/Future.php b/src/future/Future.php
index 8ce48eb6..7ac13258 100644
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -1,254 +1,253 @@
<?php
/**
* A 'future' or 'promise' is an object which represents the result of some
* pending computation. For a more complete overview of futures, see
* @{article:Using Futures}.
*/
abstract class Future extends Phobject {
private $hasResult = false;
private $hasStarted = false;
private $hasEnded = false;
private $result;
private $exception;
private $futureKey;
private $serviceProfilerCallID;
+ private static $nextKey = 1;
/**
* Is this future's process complete? Specifically, can this future be
* resolved without blocking?
*
* @return bool If true, the external process is complete and resolving this
* future will not block.
*/
abstract public function isReady();
/**
* Resolve a future and return its result, blocking until the result is ready
* if necessary.
*
* @return wild Future result.
*/
public function resolve() {
  // The "timeout" parameter was removed from this API. Fail loudly if a
  // caller still passes any argument instead of silently ignoring it.
  $args = func_get_args();
  if (count($args)) {
    throw new Exception(
      pht(
        'Parameter "timeout" to "Future->resolve()" is no longer '.
        'supported. Update the caller so it no longer passes a '.
        'timeout.'));
  }

  // Only drive the future if it has not settled yet. A future which has
  // already failed or resolved needs no further work.
  if (!$this->hasException() && !$this->hasResult()) {
    $graph = new FutureIterator(array($this));
    $graph->resolveAll();
  }

  // Check for failure AFTER resolving, not just before: "updateFuture()"
  // captures exceptions raised while the future is polled. Without this
  // check the caller would receive the generic "has not yet resolved"
  // error from "getResult()" instead of the real failure.
  if ($this->hasException()) {
    throw $this->getException();
  }

  return $this->getResult();
}
/**
 * Mark this future as started, begin service profiling, and poll it once.
 *
 * A future may only be started a single time; a second call throws.
 *
 * @return void
 */
final public function startFuture() {
  $already_started = $this->hasStarted;

  if ($already_started) {
    throw new Exception(
      pht(
        'Future has already started; futures can not start more '.
        'than once.'));
  }

  $this->hasStarted = true;

  // Open the profiler call before the first poll.
  $this->startServiceProfiler();
  $this->isReady();
}
/**
 * Poll this future once, capturing any failure instead of raising it.
 *
 * Does nothing if the future already has an exception or a result.
 *
 * @return void
 */
final public function updateFuture() {
  $is_settled = ($this->hasException() || $this->hasResult());
  if ($is_settled) {
    return;
  }

  try {
    $this->isReady();
  } catch (Exception $caught) {
    $this->setException($caught);
  } catch (Throwable $caught) {
    // Separate clause so non-Exception Throwables are captured too.
    $this->setException($caught);
  }
}
/**
 * Mark this future as ended and close its service profiler call.
 *
 * The future must already have either a result or an exception, and may
 * only be ended once; otherwise an Exception is thrown.
 *
 * @return void
 */
final public function endFuture() {
  $is_settled = ($this->hasException() || $this->hasResult());

  if (!$is_settled) {
    throw new Exception(
      pht(
        'Trying to end a future which has no exception and no result. '.
        'Futures must resolve before they can be ended.'));
  }

  $already_ended = $this->hasEnded;
  if ($already_ended) {
    throw new Exception(
      pht(
        'Future has already ended; futures can not end more '.
        'than once.'));
  }

  $this->hasEnded = true;
  $this->endServiceProfiler();
}
/**
 * Begin a service profiler call for this future, if the profiler class is
 * available, and remember the call ID so the call can be ended later.
 *
 * @return void
 */
private function startServiceProfiler() {
  // NOTE: The profiler is a soft dependency so that the ServiceProfiler
  // does not have to be built into the Phage agent. The class is normally
  // available.
  $has_profiler = class_exists('PhutilServiceProfiler');

  if ($has_profiler) {
    $start_params = $this->getServiceProfilerStartParameters();

    $this->serviceProfilerCallID = PhutilServiceProfiler::getInstance()
      ->beginServiceCall($start_params);
  }
}
/**
 * End the service profiler call opened by startServiceProfiler().
 *
 * Does nothing when no call ID was recorded.
 *
 * @return void
 */
private function endServiceProfiler() {
  $profiler_call = $this->serviceProfilerCallID;

  if ($profiler_call !== null) {
    $result_params = $this->getServiceProfilerResultParameters();

    PhutilServiceProfiler::getInstance()
      ->endServiceCall($profiler_call, $result_params);
  }
}
// Parameters handed to the service profiler when this future's call begins
// (see startServiceProfiler()). The base implementation supplies none.
protected function getServiceProfilerStartParameters() {
return array();
}
// Parameters handed to the service profiler when this future's call ends
// (see endServiceProfiler()). The base implementation supplies none.
protected function getServiceProfilerResultParameters() {
return array();
}
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
* the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
*/
public function getReadSockets() {
// The base implementation reports no readable sockets.
return array();
}
/**
* Retrieve a list of sockets which we can wait to become writable while a
* future is resolving. See @{method:getReadSockets}.
*
* @return list A list of sockets which we expect to become writable.
*/
public function getWriteSockets() {
// The base implementation reports no writable sockets.
return array();
}
/**
* Default amount of time to wait on stream select for this future. Normally
* 1 second is fine, but if the future has a timeout sooner than that it
* should return the amount of time left before the timeout.
*/
public function getDefaultWait() {
// Default wait, in seconds.
return 1;
}
// Poll the future once via isReady() and return $this so calls can be
// chained.
public function start() {
$this->isReady();
return $this;
}
/**
* Retrieve the final result of the future.
*
* @return wild Final resolution of this future.
*/
final protected function getResult() {
  // Fast path: the stored result is only meaningful once one has been set.
  if ($this->hasResult()) {
    return $this->result;
  }

  throw new Exception(
    pht(
      'Future has not yet resolved. Resolve futures before retrieving '.
      'results.'));
}
/**
 * Record the final result of this future.
 *
 * @param wild Result to store.
 * @return this
 */
final protected function setResult($result) {
  $already_resolved = $this->hasResult();

  if ($already_resolved) {
    throw new Exception(
      pht(
        'Future has already resolved. Futures may not resolve more than '.
        'once.'));
  }

  // Keep a separate flag so a null result still counts as a result.
  $this->result = $result;
  $this->hasResult = true;

  return $this;
}
// True once setResult() has stored a result for this future.
final public function hasResult() {
return $this->hasResult;
}
/**
 * Record the failure which settled this future.
 *
 * Declared without "final": a private method can never be overridden, and
 * PHP 8.0+ emits a warning for "final private" on non-constructor methods.
 *
 * @param Exception|Throwable Failure to store.
 * @return this
 */
private function setException($exception) {
  // NOTE: The parameter may be an Exception or a Throwable.
  $this->exception = $exception;
  return $this;
}
/**
 * Retrieve the stored failure, or null if none has been recorded.
 *
 * Declared without "final": a private method can never be overridden, and
 * PHP 8.0+ emits a warning for "final private" on non-constructor methods.
 *
 * @return Exception|Throwable|null Stored failure, if any.
 */
private function getException() {
  return $this->exception;
}
// True once a failure has been recorded for this future.
final public function hasException() {
return ($this->exception !== null);
}
/**
 * Assign the key which identifies this future. A key may only be assigned
 * once.
 *
 * @param wild Key to assign.
 * @return this
 */
final public function setFutureKey($key) {
  if ($this->futureKey !== null) {
    // Report the key which is ALREADY assigned, as the message says,
    // rather than the rejected new key.
    throw new Exception(
      pht(
        'Future already has a key ("%s") assigned.',
        $this->futureKey));
  }

  $this->futureKey = $key;

  return $this;
}
final public function getFutureKey() {
- static $next_key = 1;
-
if ($this->futureKey === null) {
- $this->futureKey = sprintf('Future/%d', $next_key++);
+ $this->futureKey = sprintf('Future/%d', self::$nextKey++);
}
return $this->futureKey;
}
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
index 772fdc77..e33282c8 100644
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -1,468 +1,464 @@
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final class FutureIterator
extends Phobject
implements Iterator {
private $hold = array();
private $wait = array();
private $work = array();
private $futures = array();
private $key;
private $limit;
private $timeout;
private $isTimeout = false;
private $hasRewound = false;
/* -( Basics )------------------------------------------------------------- */
/**
* Create a new iterator over a list of futures.
*
* @param list List of @{class:Future}s to resolve.
* @task basics
*/
public function __construct(array $futures) {
assert_instances_of($futures, 'Future');
- $respect_keys = !phutil_is_natural_list($futures);
-
foreach ($futures as $map_key => $future) {
- if ($respect_keys) {
- $future->setFutureKey($map_key);
- }
+ $future->setFutureKey($map_key);
$this->addFuture($future);
}
}
/**
* Block until all futures resolve.
*
* @return void
* @task basics
*/
public function resolveAll() {
  // Fresh iterator: a full pass rewinds it and walks every future.
  if (!$this->hasRewound) {
    iterator_to_array($this);
    return;
  }

  // The caller already began iterating (for example, broke out of a
  // "foreach"), so continue from the current position and drain whatever
  // futures remain.
  while ($this->valid()) {
    $this->next();
  }
}
/**
* Add another future to the set of futures. This is useful if you have a
* set of futures to run mostly in parallel, but some futures depend on
* others.
*
* @param Future @{class:Future} to add to iterator
* @task basics
*/
public function addFuture(Future $future) {
  $future_key = $future->getFutureKey();

  // Keys index every internal map, so they must be unique per iterator.
  $is_duplicate = isset($this->futures[$future_key]);
  if ($is_duplicate) {
    throw new Exception(
      pht(
        'This future graph already has a future with key "%s". Each '.
        'future must have a unique key.',
        $future_key));
  }

  // New futures start out in the "hold" set.
  $this->hold[$future_key] = $future_key;
  $this->futures[$future_key] = $future;

  return $this;
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
* Set a maximum amount of time you want to wait before the iterator will
* yield a result. If no future has resolved yet, the iterator will yield
* null for key and value. Among other potential uses, you can use this to
* show some busy indicator:
*
* $futures = id(new FutureIterator($futures))
* ->setUpdateInterval(1);
* foreach ($futures as $future) {
* if ($future === null) {
* echo "Still working...\n";
* } else {
* // ...
* }
* }
*
* This will echo "Still working..." once per second as long as futures are
* resolving. By default, FutureIterator never yields null.
*
* @param float Maximum number of seconds to block waiting on futures before
* yielding null.
* @return this
*
* @task config
*/
public function setUpdateInterval($interval) {
// Stored as the timeout which next() compares against elapsed time.
$this->timeout = $interval;
return $this;
}
/**
* Limit the number of simultaneously executing futures.
*
* $futures = id(new FutureIterator($futures))
* ->limit(4);
* foreach ($futures as $future) {
* // Run no more than 4 futures simultaneously.
* }
*
* @param int Maximum number of simultaneous jobs allowed.
* @return this
*
* @task config
*/
public function limit($max) {
  // Same setting as setMaximumWorkingSetSize(); share one code path.
  return $this->setMaximumWorkingSetSize($max);
}
// Set the maximum number of futures allowed in the working set at once.
// Writes the same property as limit(). Returns $this for chaining.
public function setMaximumWorkingSetSize($limit) {
$this->limit = $limit;
return $this;
}
// Return the configured working set limit. updateWorkingSet() treats a
// falsy value as "no limit".
public function getMaximumWorkingSetSize() {
return $this->limit;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
* @task iterator
*/
public function rewind() {
  // The first rewind starts iteration by advancing to the first future
  // which settles.
  if (!$this->hasRewound) {
    $this->hasRewound = true;
    $this->next();
    return;
  }

  // Later rewinds are rejected.
  throw new Exception(
    pht('Future graphs can not be rewound.'));
}
/**
* @task iterator
*/
public function next() {
// Clear the current position; it is set again only if a future settles.
$this->key = null;
$this->updateWorkingSet();
// Nothing is working, so there is nothing to wait for. With a null key,
// valid() reports false unless a timeout is flagged.
if (!$this->work) {
return;
}
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
// NOTE(review): presumably selects the entries of $this->futures whose
// keys are in $this->work; confirm against array_select_keys().
$working_set = array_select_keys($this->futures, $this->work);
while (true) {
// Update every future first. This is a no-op on futures which have
// already resolved or failed, but we want to give futures an
// opportunity to make progress even if we can resolve something.
foreach ($working_set as $future_key => $future) {
$future->updateFuture();
}
// Check if any future has resolved or failed. If we have any such
// futures, we'll return the first one from the iterator.
$resolve_key = null;
foreach ($working_set as $future_key => $future) {
if ($future->hasException()) {
$resolve_key = $future_key;
break;
}
if ($future->hasResult()) {
$resolve_key = $future_key;
break;
}
}
// We've found a future to resolve, so we're done here for now.
if ($resolve_key !== null) {
$this->moveFutureToDone($resolve_key);
return;
}
// We don't have any futures to resolve yet. Check if we've reached
// an update interval.
$wait_time = 1;
if ($timeout !== null) {
$elapsed = microtime(true) - $start;
if ($elapsed > $timeout) {
// Flag the timeout so current() and key() yield null while valid()
// still reports true.
$this->isTimeout = true;
return;
}
// Never wait past the end of the update interval.
$wait_time = min($wait_time, $timeout - $elapsed);
}
// We're going to wait. If possible, we'd like to wait with sockets.
// If we can't, we'll just sleep.
$read_sockets = array();
$write_sockets = array();
foreach ($working_set as $future_key => $future) {
$sockets = $future->getReadSockets();
foreach ($sockets as $socket) {
$read_sockets[] = $socket;
}
$sockets = $future->getWriteSockets();
foreach ($sockets as $socket) {
$write_sockets[] = $socket;
}
}
$use_sockets = ($read_sockets || $write_sockets);
if ($use_sockets) {
// Shorten the wait to the smallest default wait any future asks for.
foreach ($working_set as $future) {
$wait_time = min($wait_time, $future->getDefaultWait());
}
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
} else {
// No sockets to select on: sleep for 1000 microseconds, then poll again.
usleep(1000);
}
}
}
/**
* @task iterator
*/
public function current() {
  // A timeout tick yields null instead of a future.
  return $this->isTimeout
    ? null
    : $this->futures[$this->key];
}
/**
* @task iterator
*/
public function key() {
  // A timeout tick yields a null key.
  return $this->isTimeout
    ? null
    : $this->key;
}
/**
* @task iterator
*/
public function valid() {
  // The position is valid on a timeout tick, or whenever a settled future
  // is selected.
  $has_current = ($this->key !== null);

  return ($this->isTimeout || $has_current);
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
protected function updateWorkingSet() {
  $limit = $this->getMaximumWorkingSetSize();
  $work_count = count($this->work);

  // If we're already working on the maximum number of futures, we just have
  // to wait for something to resolve. There's no benefit to updating the
  // queue since we can never make any meaningful progress.
  if ($limit) {
    if ($work_count >= $limit) {
      return;
    }
  }

  // If any futures that are currently held are no longer blocked by
  // dependencies, move them from "hold" to "wait".
  foreach ($this->hold as $future_key) {
    if (!$this->canMoveFutureToWait($future_key)) {
      continue;
    }

    $this->moveFutureToWait($future_key);
  }

  $wait_count = count($this->wait);
  $hold_count = count($this->hold);

  if (!$work_count && !$wait_count && $hold_count) {
    throw new Exception(
      pht(
        'Future graph is stalled: some futures are held, but no futures '.
        'are waiting or working. The graph can never resolve.'));
  }

  // Figure out how many futures we can start. If we don't have a limit,
  // we can start every waiting future. If we do have a limit, we can only
  // start as many futures as we have free slots for: futures which are
  // already working occupy slots, so subtract them from the limit. The
  // early return above guarantees at least one slot is free here.
  if ($limit) {
    $free_slots = $limit - $work_count;
    $work_limit = min($free_slots, $wait_count);
  } else {
    $work_limit = $wait_count;
  }

  // If we're ready to start futures, start them now.
  if ($work_limit) {
    foreach ($this->wait as $future_key) {
      $this->moveFutureToWork($future_key);

      $work_limit--;
      if (!$work_limit) {
        return;
      }
    }
  }
}
// Decide whether a held future may move to the "wait" set. Currently every
// future is allowed to move; the key is not inspected.
private function canMoveFutureToWait($future_key) {
return true;
}
/**
 * Move a future from the "hold" set to the "wait" set.
 *
 * @param wild Key of the future to move.
 * @return void
 */
private function moveFutureToWait($future_key) {
  $this->wait[$future_key] = $future_key;
  unset($this->hold[$future_key]);
}
// Move a future from the "wait" set to the "work" set and start it.
private function moveFutureToWork($future_key) {
unset($this->wait[$future_key]);
$this->work[$future_key] = $future_key;
// Start only after the bookkeeping above is updated.
$this->futures[$future_key]->startFuture();
}
// Select a settled future as the iterator's current element, remove it
// from the "work" set, and end it.
private function moveFutureToDone($future_key) {
$this->key = $future_key;
unset($this->work[$future_key]);
// Before we return, do another working set update so we start any
// futures that are ready to go as soon as we can.
$this->updateWorkingSet();
// End the settled future last, after replacement work has been started.
$this->futures[$future_key]->endFuture();
}
/**
* Wait for activity on one of several sockets.
*
* @param list List of sockets expected to become readable.
* @param list List of sockets expected to become writable.
* @param float Timeout, in seconds.
* @return void
*/
private function waitForSockets(
array $read_list,
array $write_list,
$timeout = 1.0) {
// Remember whether the handler setup below has already been attempted, so
// it runs only once.
static $handler_installed = false;
if (!$handler_installed) {
// If we're spawning child processes, we need to install a signal handler
// here to catch cases like execing '(sleep 60 &) &' where the child
// exits but a socket is kept open. But we don't actually need to do
// anything because the SIGCHLD will interrupt the stream_select(), as
// long as we have a handler registered.
if (function_exists('pcntl_signal')) {
if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
throw new Exception(pht('Failed to install signal handler!'));
}
}
$handler_installed = true;
}
// Split the fractional timeout into the whole-second and microsecond
// parts passed to stream_select().
$timeout_sec = (int)$timeout;
$timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));
$exceptfds = array();
// Errors are suppressed with "@"; failure is handled through the return
// value below.
$ok = @stream_select(
$read_list,
$write_list,
$exceptfds,
$timeout_sec,
$timeout_usec);
if ($ok === false) {
// Hopefully, this means we received a SIGCHLD. In the worst case, we
// degrade to a busy wait.
}
}
// Intentionally empty SIGCHLD handler, registered by waitForSockets().
// The signal number argument is unused.
public static function handleSIGCHLD($signo) {
// This function is a dummy, we just need to have some handler registered
// so that PHP will get interrupted during "stream_select()". If we don't
// register a handler, "stream_select()" won't fail.
}
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 19, 19:25 (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1127949
Default Alt Text
(18 KB)
Attached To
Mode
rARC Arcanist
Attached
Detach File
Event Timeline
Log In to Comment