Page MenuHomePhorge

No OneTemporary

diff --git a/src/future/Future.php b/src/future/Future.php
index 8ce48eb6..7ac13258 100644
--- a/src/future/Future.php
+++ b/src/future/Future.php
@@ -1,254 +1,253 @@
<?php
/**
* A 'future' or 'promise' is an object which represents the result of some
* pending computation. For a more complete overview of futures, see
* @{article:Using Futures}.
*/
abstract class Future extends Phobject {
private $hasResult = false;
private $hasStarted = false;
private $hasEnded = false;
private $result;
private $exception;
private $futureKey;
private $serviceProfilerCallID;
+ private static $nextKey = 1;
/**
* Is this future's process complete? Specifically, can this future be
* resolved without blocking?
*
* @return bool If true, the external process is complete and resolving this
* future will not block.
*/
abstract public function isReady();
/**
* Resolve a future and return its result, blocking until the result is ready
* if necessary.
*
* @return wild Future result.
*/
public function resolve() {
$args = func_get_args();
if (count($args)) {
throw new Exception(
pht(
'Parameter "timeout" to "Future->resolve()" is no longer '.
'supported. Update the caller so it no longer passes a '.
'timeout.'));
}
if ($this->hasException()) {
throw $this->getException();
}
if (!$this->hasResult()) {
$graph = new FutureIterator(array($this));
$graph->resolveAll();
}
return $this->getResult();
}
final public function startFuture() {
if ($this->hasStarted) {
throw new Exception(
pht(
'Future has already started; futures can not start more '.
'than once.'));
}
$this->hasStarted = true;
$this->startServiceProfiler();
$this->isReady();
}
final public function updateFuture() {
if ($this->hasException()) {
return;
}
if ($this->hasResult()) {
return;
}
try {
$this->isReady();
} catch (Exception $ex) {
$this->setException($ex);
} catch (Throwable $ex) {
$this->setException($ex);
}
}
final public function endFuture() {
if (!$this->hasException() && !$this->hasResult()) {
throw new Exception(
pht(
'Trying to end a future which has no exception and no result. '.
'Futures must resolve before they can be ended.'));
}
if ($this->hasEnded) {
throw new Exception(
pht(
'Future has already ended; futures can not end more '.
'than once.'));
}
$this->hasEnded = true;
$this->endServiceProfiler();
}
private function startServiceProfiler() {
// NOTE: This is a soft dependency so that we don't need to build the
// ServiceProfiler into the Phage agent. Normally, this class is always
// available.
if (!class_exists('PhutilServiceProfiler')) {
return;
}
$params = $this->getServiceProfilerStartParameters();
$profiler = PhutilServiceProfiler::getInstance();
$call_id = $profiler->beginServiceCall($params);
$this->serviceProfilerCallID = $call_id;
}
private function endServiceProfiler() {
$call_id = $this->serviceProfilerCallID;
if ($call_id === null) {
return;
}
$params = $this->getServiceProfilerResultParameters();
$profiler = PhutilServiceProfiler::getInstance();
$profiler->endServiceCall($call_id, $params);
}
protected function getServiceProfilerStartParameters() {
return array();
}
protected function getServiceProfilerResultParameters() {
return array();
}
/**
* Retrieve a list of sockets which we can wait to become readable while
* a future is resolving. If your future has sockets which can be
* `select()`ed, return them here (or in @{method:getWriteSockets}) to make
* the resolve loop do a `select()`. If you do not return sockets in either
* case, you'll get a busy wait.
*
* @return list A list of sockets which we expect to become readable.
*/
public function getReadSockets() {
return array();
}
/**
* Retrieve a list of sockets which we can wait to become writable while a
* future is resolving. See @{method:getReadSockets}.
*
* @return list A list of sockets which we expect to become writable.
*/
public function getWriteSockets() {
return array();
}
/**
* Default amount of time to wait on stream select for this future. Normally
* 1 second is fine, but if the future has a timeout sooner than that it
* should return the amount of time left before the timeout.
*/
public function getDefaultWait() {
return 1;
}
public function start() {
$this->isReady();
return $this;
}
/**
* Retrieve the final result of the future.
*
* @return wild Final resolution of this future.
*/
final protected function getResult() {
if (!$this->hasResult()) {
throw new Exception(
pht(
'Future has not yet resolved. Resolve futures before retrieving '.
'results.'));
}
return $this->result;
}
final protected function setResult($result) {
if ($this->hasResult()) {
throw new Exception(
pht(
'Future has already resolved. Futures may not resolve more than '.
'once.'));
}
$this->hasResult = true;
$this->result = $result;
return $this;
}
final public function hasResult() {
return $this->hasResult;
}
final private function setException($exception) {
// NOTE: The parameter may be an Exception or a Throwable.
$this->exception = $exception;
return $this;
}
final private function getException() {
return $this->exception;
}
final public function hasException() {
return ($this->exception !== null);
}
final public function setFutureKey($key) {
if ($this->futureKey !== null) {
throw new Exception(
pht(
'Future already has a key ("%s") assigned.',
$key));
}
$this->futureKey = $key;
return $this;
}
final public function getFutureKey() {
- static $next_key = 1;
-
if ($this->futureKey === null) {
- $this->futureKey = sprintf('Future/%d', $next_key++);
+ $this->futureKey = sprintf('Future/%d', self::$nextKey++);
}
return $this->futureKey;
}
}
diff --git a/src/future/FutureIterator.php b/src/future/FutureIterator.php
index 772fdc77..e33282c8 100644
--- a/src/future/FutureIterator.php
+++ b/src/future/FutureIterator.php
@@ -1,468 +1,464 @@
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final class FutureIterator
extends Phobject
implements Iterator {
private $hold = array();
private $wait = array();
private $work = array();
private $futures = array();
private $key;
private $limit;
private $timeout;
private $isTimeout = false;
private $hasRewound = false;
/* -( Basics )------------------------------------------------------------- */
/**
* Create a new iterator over a list of futures.
*
* @param list List of @{class:Future}s to resolve.
* @task basics
*/
public function __construct(array $futures) {
assert_instances_of($futures, 'Future');
- $respect_keys = !phutil_is_natural_list($futures);
-
foreach ($futures as $map_key => $future) {
- if ($respect_keys) {
- $future->setFutureKey($map_key);
- }
+ $future->setFutureKey($map_key);
$this->addFuture($future);
}
}
/**
* Block until all futures resolve.
*
* @return void
* @task basics
*/
public function resolveAll() {
// If a caller breaks out of a "foreach" and then calls "resolveAll()",
// interpret it to mean that we should iterate over whatever futures
// remain.
if ($this->hasRewound) {
while ($this->valid()) {
$this->next();
}
} else {
iterator_to_array($this);
}
}
/**
* Add another future to the set of futures. This is useful if you have a
* set of futures to run mostly in parallel, but some futures depend on
* others.
*
* @param Future @{class:Future} to add to iterator
* @task basics
*/
public function addFuture(Future $future) {
$key = $future->getFutureKey();
if (isset($this->futures[$key])) {
throw new Exception(
pht(
'This future graph already has a future with key "%s". Each '.
'future must have a unique key.',
$key));
}
$this->futures[$key] = $future;
$this->hold[$key] = $key;
return $this;
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
* Set a maximum amount of time you want to wait before the iterator will
* yield a result. If no future has resolved yet, the iterator will yield
* null for key and value. Among other potential uses, you can use this to
* show some busy indicator:
*
* $futures = id(new FutureIterator($futures))
* ->setUpdateInterval(1);
* foreach ($futures as $future) {
* if ($future === null) {
* echo "Still working...\n";
* } else {
* // ...
* }
* }
*
* This will echo "Still working..." once per second as long as futures are
* resolving. By default, FutureIterator never yields null.
*
* @param float Maximum number of seconds to block waiting on futures before
* yielding null.
* @return this
*
* @task config
*/
public function setUpdateInterval($interval) {
$this->timeout = $interval;
return $this;
}
/**
* Limit the number of simultaneously executing futures.
*
* $futures = id(new FutureIterator($futures))
* ->limit(4);
* foreach ($futures as $future) {
* // Run no more than 4 futures simultaneously.
* }
*
* @param int Maximum number of simultaneous jobs allowed.
* @return this
*
* @task config
*/
public function limit($max) {
$this->limit = $max;
return $this;
}
public function setMaximumWorkingSetSize($limit) {
$this->limit = $limit;
return $this;
}
public function getMaximumWorkingSetSize() {
return $this->limit;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
* @task iterator
*/
public function rewind() {
if ($this->hasRewound) {
throw new Exception(
pht('Future graphs can not be rewound.'));
}
$this->hasRewound = true;
$this->next();
}
/**
* @task iterator
*/
public function next() {
$this->key = null;
$this->updateWorkingSet();
if (!$this->work) {
return;
}
$start = microtime(true);
$timeout = $this->timeout;
$this->isTimeout = false;
$working_set = array_select_keys($this->futures, $this->work);
while (true) {
// Update every future first. This is a no-op on futures which have
// already resolved or failed, but we want to give futures an
// opportunity to make progress even if we can resolve something.
foreach ($working_set as $future_key => $future) {
$future->updateFuture();
}
// Check if any future has resolved or failed. If we have any such
// futures, we'll return the first one from the iterator.
$resolve_key = null;
foreach ($working_set as $future_key => $future) {
if ($future->hasException()) {
$resolve_key = $future_key;
break;
}
if ($future->hasResult()) {
$resolve_key = $future_key;
break;
}
}
// We've found a future to resolve, so we're done here for now.
if ($resolve_key !== null) {
$this->moveFutureToDone($resolve_key);
return;
}
// We don't have any futures to resolve yet. Check if we're reached
// an update interval.
$wait_time = 1;
if ($timeout !== null) {
$elapsed = microtime(true) - $start;
if ($elapsed > $timeout) {
$this->isTimeout = true;
return;
}
$wait_time = min($wait_time, $timeout - $elapsed);
}
// We're going to wait. If possible, we'd like to wait with sockets.
// If we can't, we'll just sleep.
$read_sockets = array();
$write_sockets = array();
foreach ($working_set as $future_key => $future) {
$sockets = $future->getReadSockets();
foreach ($sockets as $socket) {
$read_sockets[] = $socket;
}
$sockets = $future->getWriteSockets();
foreach ($sockets as $socket) {
$write_sockets[] = $socket;
}
}
$use_sockets = ($read_sockets || $write_sockets);
if ($use_sockets) {
foreach ($working_set as $future) {
$wait_time = min($wait_time, $future->getDefaultWait());
}
$this->waitForSockets($read_sockets, $write_sockets, $wait_time);
} else {
usleep(1000);
}
}
}
/**
* @task iterator
*/
public function current() {
if ($this->isTimeout) {
return null;
}
return $this->futures[$this->key];
}
/**
* @task iterator
*/
public function key() {
if ($this->isTimeout) {
return null;
}
return $this->key;
}
/**
* @task iterator
*/
public function valid() {
if ($this->isTimeout) {
return true;
}
return ($this->key !== null);
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
protected function updateWorkingSet() {
$limit = $this->getMaximumWorkingSetSize();
$work_count = count($this->work);
// If we're already working on the maximum number of futures, we just have
// to wait for something to resolve. There's no benefit to updating the
// queue since we can never make any meaningful progress.
if ($limit) {
if ($work_count >= $limit) {
return;
}
}
// If any futures that are currently held are no longer blocked by
// dependencies, move them from "hold" to "wait".
foreach ($this->hold as $future_key) {
if (!$this->canMoveFutureToWait($future_key)) {
continue;
}
$this->moveFutureToWait($future_key);
}
$wait_count = count($this->wait);
$hold_count = count($this->hold);
if (!$work_count && !$wait_count && $hold_count) {
throw new Exception(
pht(
'Future graph is stalled: some futures are held, but no futures '.
'are waiting or working. The graph can never resolve.'));
}
// Figure out how many futures we can start. If we don't have a limit,
// we can start every waiting future. If we do have a limit, we can only
// start as many futures as we have slots for.
if ($limit) {
$work_limit = min($limit, $wait_count);
} else {
$work_limit = $wait_count;
}
// If we're ready to start futures, start them now.
if ($work_limit) {
foreach ($this->wait as $future_key) {
$this->moveFutureToWork($future_key);
$work_limit--;
if (!$work_limit) {
return;
}
}
}
}
private function canMoveFutureToWait($future_key) {
return true;
}
private function moveFutureToWait($future_key) {
unset($this->hold[$future_key]);
$this->wait[$future_key] = $future_key;
}
private function moveFutureToWork($future_key) {
unset($this->wait[$future_key]);
$this->work[$future_key] = $future_key;
$this->futures[$future_key]->startFuture();
}
private function moveFutureToDone($future_key) {
$this->key = $future_key;
unset($this->work[$future_key]);
// Before we return, do another working set update so we start any
// futures that are ready to go as soon as we can.
$this->updateWorkingSet();
$this->futures[$future_key]->endFuture();
}
/**
* Wait for activity on one of several sockets.
*
* @param list List of sockets expected to become readable.
* @param list List of sockets expected to become writable.
* @param float Timeout, in seconds.
* @return void
*/
private function waitForSockets(
array $read_list,
array $write_list,
$timeout = 1.0) {
static $handler_installed = false;
if (!$handler_installed) {
// If we're spawning child processes, we need to install a signal handler
// here to catch cases like execing '(sleep 60 &) &' where the child
// exits but a socket is kept open. But we don't actually need to do
// anything because the SIGCHLD will interrupt the stream_select(), as
// long as we have a handler registered.
if (function_exists('pcntl_signal')) {
if (!pcntl_signal(SIGCHLD, array(__CLASS__, 'handleSIGCHLD'))) {
throw new Exception(pht('Failed to install signal handler!'));
}
}
$handler_installed = true;
}
$timeout_sec = (int)$timeout;
$timeout_usec = (int)(1000000 * ($timeout - $timeout_sec));
$exceptfds = array();
$ok = @stream_select(
$read_list,
$write_list,
$exceptfds,
$timeout_sec,
$timeout_usec);
if ($ok === false) {
// Hopefully, means we received a SIGCHLD. In the worst case, we degrade
// to a busy wait.
}
}
public static function handleSIGCHLD($signo) {
// This function is a dummy, we just need to have some handler registered
// so that PHP will get interrupted during "stream_select()". If we don't
// register a handler, "stream_select()" won't fail.
}
}

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 19, 19:25 (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1127949
Default Alt Text
(18 KB)

Event Timeline