Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F3281448
FutureIterator.php
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Advanced/Developer...
View Handle
View Hovercard
Size
11 KB
Referenced Files
None
Subscribers
None
FutureIterator.php
View Options
<?php
/**
* FutureIterator aggregates @{class:Future}s and allows you to respond to them
* in the order they resolve. This is useful because it minimizes the amount of
* time your program spends waiting on parallel processes.
*
* $futures = array(
* 'a.txt' => new ExecFuture('wc -c a.txt'),
* 'b.txt' => new ExecFuture('wc -c b.txt'),
* 'c.txt' => new ExecFuture('wc -c c.txt'),
* );
*
* foreach (new FutureIterator($futures) as $key => $future) {
* // IMPORTANT: keys are preserved but the order of elements is not. This
* // construct iterates over the futures in the order they resolve, so the
* // fastest future is the one you'll get first. This allows you to start
* // doing followup processing as soon as possible.
*
* list($err, $stdout) = $future->resolve();
* do_some_processing($stdout);
* }
*
* For a general overview of futures, see @{article:Using Futures}.
*
* @task basics Basics
* @task config Configuring Iteration
* @task iterator Iterator Interface
* @task internal Internals
*/
final
class
FutureIterator
extends
Phobject
implements
Iterator
{
private
$hold
=
array
(
)
;
private
$wait
=
array
(
)
;
private
$work
=
array
(
)
;
private
$futures
=
array
(
)
;
private
$key
;
private
$limit
;
private
$timeout
;
private
$isTimeout
=
false
;
private
$hasRewound
=
false
;
/* -( Basics )------------------------------------------------------------- */
/**
* Create a new iterator over a list of futures.
*
* @param list List of @{class:Future}s to resolve.
* @task basics
*/
public
function
__construct
(
array
$futures
)
{
assert_instances_of
(
$futures
,
'Future'
)
;
foreach
(
$futures
as
$map_key
=>
$future
)
{
$future
->
setFutureKey
(
$map_key
)
;
$this
->
addFuture
(
$future
)
;
}
}
/**
* Block until all futures resolve.
*
* @return void
* @task basics
*/
public
function
resolveAll
(
)
{
// If a caller breaks out of a "foreach" and then calls "resolveAll()",
// interpret it to mean that we should iterate over whatever futures
// remain.
if
(
$this
->
hasRewound
)
{
while
(
$this
->
valid
(
)
)
{
$this
->
next
(
)
;
}
}
else
{
iterator_to_array
(
$this
)
;
}
}
/**
* Add another future to the set of futures. This is useful if you have a
* set of futures to run mostly in parallel, but some futures depend on
* others.
*
* @param Future @{class:Future} to add to iterator
* @task basics
*/
public
function
addFuture
(
Future
$future
)
{
$key
=
$future
->
getFutureKey
(
)
;
if
(
isset
(
$this
->
futures
[
$key
]
)
)
{
throw
new
Exception
(
pht
(
'This future graph already has a future with key "%s". Each '
.
'future must have a unique key.'
,
$key
)
)
;
}
$this
->
futures
[
$key
]
=
$future
;
$this
->
hold
[
$key
]
=
$key
;
return
$this
;
}
/* -( Configuring Iteration )---------------------------------------------- */
/**
* Set a maximum amount of time you want to wait before the iterator will
* yield a result. If no future has resolved yet, the iterator will yield
* null for key and value. Among other potential uses, you can use this to
* show some busy indicator:
*
* $futures = id(new FutureIterator($futures))
* ->setUpdateInterval(1);
* foreach ($futures as $future) {
* if ($future === null) {
* echo "Still working...\n";
* } else {
* // ...
* }
* }
*
* This will echo "Still working..." once per second as long as futures are
* resolving. By default, FutureIterator never yields null.
*
* @param float Maximum number of seconds to block waiting on futures before
* yielding null.
* @return this
*
* @task config
*/
public
function
setUpdateInterval
(
$interval
)
{
$this
->
timeout
=
$interval
;
return
$this
;
}
/**
* Limit the number of simultaneously executing futures.
*
* $futures = id(new FutureIterator($futures))
* ->limit(4);
* foreach ($futures as $future) {
* // Run no more than 4 futures simultaneously.
* }
*
* @param int Maximum number of simultaneous jobs allowed.
* @return this
*
* @task config
*/
public
function
limit
(
$max
)
{
$this
->
limit
=
$max
;
return
$this
;
}
public
function
setMaximumWorkingSetSize
(
$limit
)
{
$this
->
limit
=
$limit
;
return
$this
;
}
public
function
getMaximumWorkingSetSize
(
)
{
return
$this
->
limit
;
}
/* -( Iterator Interface )------------------------------------------------- */
/**
* @task iterator
*/
public
function
rewind
(
)
{
if
(
$this
->
hasRewound
)
{
throw
new
Exception
(
pht
(
'Future graphs can not be rewound.'
)
)
;
}
$this
->
hasRewound
=
true
;
$this
->
next
(
)
;
}
/**
* @task iterator
*/
public
function
next
(
)
{
$this
->
key
=
null
;
$this
->
updateWorkingSet
(
)
;
if
(
!
$this
->
work
)
{
return
;
}
$start
=
microtime
(
true
)
;
$timeout
=
$this
->
timeout
;
$this
->
isTimeout
=
false
;
$working_set
=
array_select_keys
(
$this
->
futures
,
$this
->
work
)
;
while
(
true
)
{
// Update every future first. This is a no-op on futures which have
// already resolved or failed, but we want to give futures an
// opportunity to make progress even if we can resolve something.
foreach
(
$working_set
as
$future_key
=>
$future
)
{
$future
->
updateFuture
(
)
;
}
// Check if any future has resolved or failed. If we have any such
// futures, we'll return the first one from the iterator.
$resolve_key
=
null
;
foreach
(
$working_set
as
$future_key
=>
$future
)
{
if
(
$future
->
hasException
(
)
)
{
$resolve_key
=
$future_key
;
break
;
}
if
(
$future
->
hasResult
(
)
)
{
$resolve_key
=
$future_key
;
break
;
}
}
// We've found a future to resolve, so we're done here for now.
if
(
$resolve_key
!==
null
)
{
$this
->
moveFutureToDone
(
$resolve_key
)
;
return
;
}
// We don't have any futures to resolve yet. Check if we're reached
// an update interval.
$wait_time
=
1
;
if
(
$timeout
!==
null
)
{
$elapsed
=
microtime
(
true
)
-
$start
;
if
(
$elapsed
>
$timeout
)
{
$this
->
isTimeout
=
true
;
return
;
}
$wait_time
=
min
(
$wait_time
,
$timeout
-
$elapsed
)
;
}
// We're going to wait. If possible, we'd like to wait with sockets.
// If we can't, we'll just sleep.
$read_sockets
=
array
(
)
;
$write_sockets
=
array
(
)
;
foreach
(
$working_set
as
$future_key
=>
$future
)
{
$sockets
=
$future
->
getReadSockets
(
)
;
foreach
(
$sockets
as
$socket
)
{
$read_sockets
[
]
=
$socket
;
}
$sockets
=
$future
->
getWriteSockets
(
)
;
foreach
(
$sockets
as
$socket
)
{
$write_sockets
[
]
=
$socket
;
}
}
$use_sockets
=
(
$read_sockets
||
$write_sockets
)
;
if
(
$use_sockets
)
{
foreach
(
$working_set
as
$future
)
{
$wait_time
=
min
(
$wait_time
,
$future
->
getDefaultWait
(
)
)
;
}
$this
->
waitForSockets
(
$read_sockets
,
$write_sockets
,
$wait_time
)
;
}
else
{
usleep
(
1000
)
;
}
}
}
/**
* @task iterator
*/
public
function
current
(
)
{
if
(
$this
->
isTimeout
)
{
return
null
;
}
return
$this
->
futures
[
$this
->
key
]
;
}
/**
* @task iterator
*/
public
function
key
(
)
{
if
(
$this
->
isTimeout
)
{
return
null
;
}
return
$this
->
key
;
}
/**
* @task iterator
*/
public
function
valid
(
)
{
if
(
$this
->
isTimeout
)
{
return
true
;
}
return
(
$this
->
key
!==
null
)
;
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
protected
function
updateWorkingSet
(
)
{
$limit
=
$this
->
getMaximumWorkingSetSize
(
)
;
$work_count
=
count
(
$this
->
work
)
;
// If we're already working on the maximum number of futures, we just have
// to wait for something to resolve. There's no benefit to updating the
// queue since we can never make any meaningful progress.
if
(
$limit
)
{
if
(
$work_count
>=
$limit
)
{
return
;
}
}
// If any futures that are currently held are no longer blocked by
// dependencies, move them from "hold" to "wait".
foreach
(
$this
->
hold
as
$future_key
)
{
if
(
!
$this
->
canMoveFutureToWait
(
$future_key
)
)
{
continue
;
}
$this
->
moveFutureToWait
(
$future_key
)
;
}
$wait_count
=
count
(
$this
->
wait
)
;
$hold_count
=
count
(
$this
->
hold
)
;
if
(
!
$work_count
&&
!
$wait_count
&&
$hold_count
)
{
throw
new
Exception
(
pht
(
'Future graph is stalled: some futures are held, but no futures '
.
'are waiting or working. The graph can never resolve.'
)
)
;
}
// Figure out how many futures we can start. If we don't have a limit,
// we can start every waiting future. If we do have a limit, we can only
// start as many futures as we have slots for.
if
(
$limit
)
{
$work_limit
=
min
(
$limit
,
$wait_count
)
;
}
else
{
$work_limit
=
$wait_count
;
}
// If we're ready to start futures, start them now.
if
(
$work_limit
)
{
foreach
(
$this
->
wait
as
$future_key
)
{
$this
->
moveFutureToWork
(
$future_key
)
;
$work_limit
--
;
if
(
!
$work_limit
)
{
return
;
}
}
}
}
private
function
canMoveFutureToWait
(
$future_key
)
{
return
true
;
}
private
function
moveFutureToWait
(
$future_key
)
{
unset
(
$this
->
hold
[
$future_key
]
)
;
$this
->
wait
[
$future_key
]
=
$future_key
;
}
private
function
moveFutureToWork
(
$future_key
)
{
unset
(
$this
->
wait
[
$future_key
]
)
;
$this
->
work
[
$future_key
]
=
$future_key
;
$this
->
futures
[
$future_key
]
->
startFuture
(
)
;
}
private
function
moveFutureToDone
(
$future_key
)
{
$this
->
key
=
$future_key
;
unset
(
$this
->
work
[
$future_key
]
)
;
// Before we return, do another working set update so we start any
// futures that are ready to go as soon as we can.
$this
->
updateWorkingSet
(
)
;
$this
->
futures
[
$future_key
]
->
endFuture
(
)
;
}
/**
* Wait for activity on one of several sockets.
*
* @param list List of sockets expected to become readable.
* @param list List of sockets expected to become writable.
* @param float Timeout, in seconds.
* @return void
*/
private
function
waitForSockets
(
array
$read_list
,
array
$write_list
,
$timeout
=
1.0
)
{
static
$handler_installed
=
false
;
if
(
!
$handler_installed
)
{
// If we're spawning child processes, we need to install a signal handler
// here to catch cases like execing '(sleep 60 &) &' where the child
// exits but a socket is kept open. But we don't actually need to do
// anything because the SIGCHLD will interrupt the stream_select(), as
// long as we have a handler registered.
if
(
function_exists
(
'pcntl_signal'
)
)
{
if
(
!
pcntl_signal
(
SIGCHLD
,
array
(
__CLASS__
,
'handleSIGCHLD'
)
)
)
{
throw
new
Exception
(
pht
(
'Failed to install signal handler!'
)
)
;
}
}
$handler_installed
=
true
;
}
$timeout_sec
=
(int)
$timeout
;
$timeout_usec
=
(int)
(
1000000
*
(
$timeout
-
$timeout_sec
)
)
;
$exceptfds
=
array
(
)
;
$ok
=
@
stream_select
(
$read_list
,
$write_list
,
$exceptfds
,
$timeout_sec
,
$timeout_usec
)
;
if
(
$ok
===
false
)
{
// Hopefully, means we received a SIGCHLD. In the worst case, we degrade
// to a busy wait.
}
}
public
static
function
handleSIGCHLD
(
$signo
)
{
// This function is a dummy, we just need to have some handler registered
// so that PHP will get interrupted during "stream_select()". If we don't
// register a handler, "stream_select()" won't fail.
}
}
File Metadata
Details
Attached
Mime Type
text/x-php
Expires
Sun, Mar 23, 22:26 (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1136328
Default Alt Text
FutureIterator.php (11 KB)
Attached To
Mode
rARC Arcanist
Attached
Detach File
Event Timeline
Log In to Comment