Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2892248
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Advanced/Developer...
View Handle
View Hovercard
Size
12 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
index 028c02da..96a41e3d 100644
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -1,463 +1,470 @@
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final class FutureIterator
extends Phobject
implements Iterator {
// Keys of futures which have been added but not yet moved to "wait".
private $hold = array();
// Keys of futures which are ready to be moved into the working set.
private $wait = array();
// Keys of futures which are currently in the working set.
private $work = array();
// Map from future key to Future object.
private $futures = array();
// Key of the future most recently selected by next(), or null if none.
private $key;
// Maximum working set size. A falsy value means "no limit".
private $limit;
// Update interval in seconds, or null to block until a future resolves.
private $timeout;
// True if the last call to next() returned because the interval elapsed.
private $isTimeout = false;
// True once rewind() has been called; rewinding a second time throws.
private $hasRewound = false;
/* -( Basics )------------------------------------------------------------- */
/**
 * Build an iterator over a map of futures.
 *
 * Each future is tagged with its array key via `setFutureKey()` and is then
 * registered with `addFuture()`, so the keys of the input array are the keys
 * this iterator tracks the futures under.
 *
 * @param list List of @{class:Future}s to resolve.
 * @task basics
 */
public function __construct(array $futures) {
  assert_instances_of($futures, 'Future');

  foreach ($futures as $future_key => $future_object) {
    $future_object->setFutureKey($future_key);
    $this->addFuture($future_object);
  }
}
/**
 * Drive the iterator to completion, blocking until every future resolves.
 *
 * @return void
 * @task basics
 */
public function resolveAll() {
  // A fresh iterator can simply be consumed from the start.
  if (!$this->hasRewound) {
    iterator_to_array($this);
    return;
  }

  // Otherwise, the caller already began iterating (for example, they broke
  // out of a "foreach" early). We can not rewind, so continue from the
  // current position and consume whatever futures remain.
  while ($this->valid()) {
    $this->next();
  }
}
/**
 * Register an additional future with this iterator. New futures begin in
 * the "hold" queue. This is useful when most futures can run in parallel
 * but some of them depend on others.
 *
 * Throws an Exception if a future with the same key was already added.
 *
 * @param Future @{class:Future} to add to iterator
 * @return this
 * @task basics
 */
public function addFuture(Future $future) {
  $future_key = $future->getFutureKey();

  $is_duplicate = isset($this->futures[$future_key]);
  if ($is_duplicate) {
    $message = pht(
      'This future graph already has a future with key "%s". Each '.
      'future must have a unique key.',
      $future_key);
    throw new Exception($message);
  }

  $this->hold[$future_key] = $future_key;
  $this->futures[$future_key] = $future;

  return $this;
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
 * Set a maximum amount of time you want to wait before the iterator will
 * yield a result. If no future has resolved within the interval, the
 * iterator will yield null for both key and value. Among other potential
 * uses, you can use this to show some busy indicator:
 *
 *   $futures = id(new FutureIterator($futures))
 *     ->setUpdateInterval(1);
 *   foreach ($futures as $future) {
 *     if ($future === null) {
 *       echo "Still working...\n";
 *     } else {
 *       // ...
 *     }
 *   }
 *
 * This will echo "Still working..." once per second as long as futures are
 * resolving. By default, no interval is set and FutureIterator never yields
 * null.
 *
 * @param float Maximum number of seconds to block waiting on futures before
 *              yielding null.
 * @return this
 *
 * @task config
 */
public function setUpdateInterval($interval) {
  // Stored as the timeout which next() compares elapsed time against.
  $this->timeout = $interval;
  return $this;
}
/**
 * Cap how many futures may be in the working set at the same time. This is
 * a shorter spelling of @{method:setMaximumWorkingSetSize}.
 *
 *   $futures = id(new FutureIterator($futures))
 *     ->limit(4);
 *   foreach ($futures as $future) {
 *     // Run no more than 4 futures simultaneously.
 *   }
 *
 * @param int Maximum number of simultaneous jobs allowed.
 * @return this
 *
 * @task config
 */
public function limit($max) {
  return $this->setMaximumWorkingSetSize($max);
}
/**
 * Set the maximum number of futures allowed in the working set at once.
 * A falsy value is treated as "no limit" when the working set is updated.
 *
 * @param int Maximum working set size.
 * @return this
 *
 * @task config
 */
public function setMaximumWorkingSetSize($limit) {
  $this->limit = $limit;
  return $this;
}
/**
 * Get the configured maximum working set size.
 *
 * @return int|null Configured limit, or null if no limit has been set.
 *
 * @task config
 */
public function getMaximumWorkingSetSize() {
  return $this->limit;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
 * Begin iteration by advancing to the first available result.
 *
 * A FutureIterator can only be rewound once: calling this method a second
 * time throws an Exception.
 *
 * The "ReturnTypeWillChange" attribute suppresses the PHP 8.1+ deprecation
 * for Iterator methods which do not declare a return type. Older versions
 * of PHP parse the attribute line as a "#" comment.
 *
 * @return void
 * @task iterator
 */
#[\ReturnTypeWillChange]
public function rewind() {
  if ($this->hasRewound) {
    throw new Exception(
      pht('Future graphs can not be rewound.'));
  }

  $this->hasRewound = true;
  $this->next();
}
/**
 * Advance to the next result.
 *
 * Updates the working set, then loops until either a future in the working
 * set can resolve (its key becomes the current key) or the update interval
 * elapses (the iterator is flagged as timed out). If the working set is
 * empty after the update, returns immediately with no current key.
 *
 * NOTE(review): this is an Iterator method with no declared return type, so
 * PHP 8.1+ likely reports a deprecation unless "ReturnTypeWillChange" is
 * applied here.
 *
 * @return void
 * @task iterator
 */
public function next() {
-  $this->key = null;
+  // See T13572. If we previously resolved and returned a Future, release
+  // it now. This prevents us from holding Futures indefinitely when callers
+  // like FuturePool build long-lived iterators and keep adding new Futures
+  // to them.
+  if ($this->key !== null) {
+    unset($this->futures[$this->key]);
+    $this->key = null;
+  }

  $this->updateWorkingSet();

  // Nothing is running, so there is nothing to wait for.
  if (!$this->work) {
    return;
  }

  $start = microtime(true);
  $timeout = $this->timeout;
  $this->isTimeout = false;

  // Snapshot of the futures in the working set at the time of this call.
  $working_set = array_select_keys($this->futures, $this->work);

  while (true) {
    // Update every future first. This is a no-op on futures which have
    // already resolved or failed, but we want to give futures an
    // opportunity to make progress even if we can resolve something.
    foreach ($working_set as $future_key => $future) {
      $future->updateFuture();
    }

    // Check if any future has resolved or failed. If we have any such
    // futures, we'll return the first one from the iterator.
    $resolve_key = null;
    foreach ($working_set as $future_key => $future) {
      if ($future->canResolve()) {
        $resolve_key = $future_key;
        break;
      }
    }

    // We've found a future to resolve, so we're done here for now.
    if ($resolve_key !== null) {
      $this->moveFutureToDone($resolve_key);
      return;
    }

    // We don't have any futures to resolve yet. Check if we've reached
    // an update interval.
    // Never wait longer than one second in a single pass.
    $wait_time = 1;
    if ($timeout !== null) {
      $elapsed = microtime(true) - $start;
      if ($elapsed > $timeout) {
        $this->isTimeout = true;
        return;
      }

      // Don't wait past the end of the update interval.
      $wait_time = min($wait_time, $timeout - $elapsed);
    }

    // We're going to wait. If possible, we'd like to wait with sockets.
    // If we can't, we'll just sleep.
    $read_sockets = array();
    $write_sockets = array();
    foreach ($working_set as $future_key => $future) {
      $sockets = $future->getReadSockets();
      foreach ($sockets as $socket) {
        $read_sockets[] = $socket;
      }

      $sockets = $future->getWriteSockets();
      foreach ($sockets as $socket) {
        $write_sockets[] = $socket;
      }
    }

    $use_sockets = ($read_sockets || $write_sockets);
    if ($use_sockets) {
      // Honor the shortest default wait among the working futures.
      foreach ($working_set as $future) {
        $wait_time = min($wait_time, $future->getDefaultWait());
      }
      $this->waitForSockets($read_sockets, $write_sockets, $wait_time);
    } else {
      // No sockets to select on: sleep for one millisecond and poll again.
      usleep(1000);
    }
  }
}
/**
 * Get the future selected by the most recent call to next().
 *
 * If the most recent call to next() stopped because the update interval
 * elapsed, there is no current future and this returns null instead.
 *
 * The "ReturnTypeWillChange" attribute suppresses the PHP 8.1+ deprecation
 * for Iterator methods which do not declare a return type. Older versions
 * of PHP parse the attribute line as a "#" comment.
 *
 * @return Future|null Current future, or null on an update interval.
 * @task iterator
 */
#[\ReturnTypeWillChange]
public function current() {
  if ($this->isTimeout) {
    return null;
  }

  return $this->futures[$this->key];
}
/**
 * Get the key of the future selected by the most recent call to next().
 *
 * If the most recent call to next() stopped because the update interval
 * elapsed, this returns null.
 *
 * The "ReturnTypeWillChange" attribute suppresses the PHP 8.1+ deprecation
 * for Iterator methods which do not declare a return type. Older versions
 * of PHP parse the attribute line as a "#" comment.
 *
 * @return wild|null Current future key, or null on an update interval.
 * @task iterator
 */
#[\ReturnTypeWillChange]
public function key() {
  if ($this->isTimeout) {
    return null;
  }

  return $this->key;
}
/**
 * Test whether iteration should continue.
 *
 * The iterator is valid while it has a current key. It is also valid after
 * an update interval elapses, even though key and value are both null in
 * that case, so that a "foreach" keeps running.
 *
 * The "ReturnTypeWillChange" attribute suppresses the PHP 8.1+ deprecation
 * for Iterator methods which do not declare a return type. Older versions
 * of PHP parse the attribute line as a "#" comment.
 *
 * @return bool True if there is a current element or a pending timeout.
 * @task iterator
 */
#[\ReturnTypeWillChange]
public function valid() {
  if ($this->isTimeout) {
    return true;
  }

  return ($this->key !== null);
}
/* -( Internals )---------------------------------------------------------- */
/**
 * Move futures through the "hold", "wait" and "work" queues.
 *
 * Held futures which are no longer blocked are moved to "wait", and then as
 * many waiting futures as there are free slots are moved to "work" (which
 * starts them). Throws an Exception if futures are held but nothing is
 * waiting or working, since such a graph can never resolve.
 *
 * @return void
 * @task internal
 */
protected function updateWorkingSet() {
  $limit = $this->getMaximumWorkingSetSize();
  $work_count = count($this->work);

  // If we're already working on the maximum number of futures, we just have
  // to wait for something to resolve. There's no benefit to updating the
  // queue since we can never make any meaningful progress.
  if ($limit) {
    if ($work_count >= $limit) {
      return;
    }
  }

  // If any futures that are currently held are no longer blocked by
  // dependencies, move them from "hold" to "wait".
  foreach ($this->hold as $future_key) {
    if (!$this->canMoveFutureToWait($future_key)) {
      continue;
    }

    $this->moveFutureToWait($future_key);
  }

  $wait_count = count($this->wait);
  $hold_count = count($this->hold);

  if (!$work_count && !$wait_count && $hold_count) {
    throw new Exception(
      pht(
        'Future graph is stalled: some futures are held, but no futures '.
        'are waiting or working. The graph can never resolve.'));
  }

  // Figure out how many futures we can start. If we don't have a limit,
  // we can start every waiting future. If we do have a limit, we can only
  // start as many futures as we have free slots for: the limit minus the
  // number of futures which are already working. The early return above
  // guarantees at least one slot is free here.
  if ($limit) {
    $work_limit = min($limit - $work_count, $wait_count);
  } else {
    $work_limit = $wait_count;
  }

  // If we're ready to start futures, start them now.
  if ($work_limit) {
    foreach ($this->wait as $future_key) {
      $this->moveFutureToWork($future_key);

      $work_limit--;
      if (!$work_limit) {
        return;
      }
    }
  }
}
/**
 * Test whether a held future may be moved to the "wait" queue.
 *
 * This currently always returns true: no per-future check is performed.
 *
 * @param wild Key of the held future.
 * @return bool Always true.
 * @task internal
 */
private function canMoveFutureToWait($future_key) {
  return true;
}
/**
 * Transfer a future key from the "hold" queue to the "wait" queue.
 *
 * @param wild Key of the future to move.
 * @return void
 * @task internal
 */
private function moveFutureToWait($future_key) {
  $this->wait[$future_key] = $future_key;
  unset($this->hold[$future_key]);
}
/**
 * Transfer a future key from the "wait" queue to the working set, and start
 * the future if it has not already been started.
 *
 * @param wild Key of the future to move.
 * @return void
 * @task internal
 */
private function moveFutureToWork($future_key) {
  unset($this->wait[$future_key]);
  $this->work[$future_key] = $future_key;

  $future = $this->futures[$future_key];

  // Futures which are already running only need to be tracked.
  if ($future->getHasFutureStarted()) {
    return;
  }

  $future
    ->setRaiseExceptionOnStart(false)
    ->start();
}
/**
 * Remove a future from the working set and make it the current element of
 * the iterator.
 *
 * @param wild Key of the future which can resolve.
 * @return void
 * @task internal
 */
private function moveFutureToDone($future_key) {
  unset($this->work[$future_key]);
  $this->key = $future_key;

  // A slot in the working set may have just opened up. Update the working
  // set again before returning so that any future which is ready to go is
  // started as soon as possible.
  $this->updateWorkingSet();
}
/**
 * Block until one of several sockets shows activity, or a timeout passes.
 *
 * On the first call, this also installs a SIGCHLD handler if the "pcntl"
 * functions are available. Throws an Exception if installation fails.
 *
 * @param list List of sockets expected to become readable.
 * @param list List of sockets expected to become writable.
 * @param float Timeout, in seconds.
 * @return void
 */
private function waitForSockets(
  array $read_list,
  array $write_list,
  $timeout = 1.0) {

  static $handler_installed = false;

  if (!$handler_installed) {
    // When child processes are spawned, a command like '(sleep 60 &) &' can
    // exit while leaving a socket open. Registering any SIGCHLD handler is
    // enough to handle this: the handler itself does nothing, but having
    // one registered means the signal interrupts "stream_select()".
    $has_pcntl = function_exists('pcntl_signal');
    if ($has_pcntl) {
      $callback = array(__CLASS__, 'handleSIGCHLD');
      $did_install = pcntl_signal(SIGCHLD, $callback);
      if (!$did_install) {
        throw new Exception(pht('Failed to install signal handler!'));
      }
    }

    $handler_installed = true;
  }

  // Split the timeout into whole seconds and leftover microseconds.
  $timeout_sec = (int)$timeout;
  $remainder = $timeout - $timeout_sec;
  $timeout_usec = (int)(1000000 * $remainder);

  $exceptfds = array();

  // The result is deliberately ignored. A failure here hopefully means we
  // received a SIGCHLD. In the worst case, we degrade to a busy wait.
  @stream_select(
    $read_list,
    $write_list,
    $exceptfds,
    $timeout_sec,
    $timeout_usec);
}
/**
 * No-op SIGCHLD handler, registered by @{method:waitForSockets}.
 *
 * @param int Signal number (unused).
 * @return void
 */
public static function handleSIGCHLD($signo) {
  // This function is a dummy, we just need to have some handler registered
  // so that PHP will get interrupted during "stream_select()". If we don't
  // register a handler, "stream_select()" won't fail.
}
}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 19, 16:29 (2 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1126489
Default Alt Text
(12 KB)
Attached To
Mode
rARC Arcanist
Attached
Detach File
Event Timeline
Log In to Comment