Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2896851
PhutilDaemon.php
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Advanced/Developer...
View Handle
View Hovercard
Size
10 KB
Referenced Files
None
Subscribers
None
PhutilDaemon.php
View Options
<?php
/**
* Scaffolding for implementing robust background processing scripts.
*
*
* Autoscaling
* ===========
*
* Autoscaling automatically launches copies of a daemon when it is busy
* (scaling the pool up) and stops them when they're idle (scaling the pool
* down). This is appropriate for daemons which perform highly parallelizable
* work.
*
* To make a daemon support autoscaling, the implementation should look
* something like this:
*
* while (!$this->shouldExit()) {
* if (work_available()) {
* $this->willBeginWork();
* do_work();
* $this->sleep(0);
* } else {
* $this->willBeginIdle();
* $this->sleep(1);
* }
* }
*
* In particular, call @{method:willBeginWork} before becoming busy, and
* @{method:willBeginIdle} when no work is available. If the daemon is launched
* into an autoscale pool, this will cause the pool to automatically scale up
* when busy and down when idle.
*
* See @{class:PhutilHighIntensityIntervalDaemon} for an example of a simple
* autoscaling daemon.
*
* Launching a daemon which does not make these callbacks into an autoscale
* pool will have no effect.
*
* @task overseer Communicating With the Overseer
* @task autoscale Autoscaling Daemon Pools
*/
abstract
class
PhutilDaemon
extends
Phobject
{
const
MESSAGETYPE_STDOUT
=
'stdout'
;
const
MESSAGETYPE_HEARTBEAT
=
'heartbeat'
;
const
MESSAGETYPE_BUSY
=
'busy'
;
const
MESSAGETYPE_IDLE
=
'idle'
;
const
MESSAGETYPE_DOWN
=
'down'
;
const
MESSAGETYPE_HIBERNATE
=
'hibernate'
;
const
WORKSTATE_BUSY
=
'busy'
;
const
WORKSTATE_IDLE
=
'idle'
;
private
$argv
;
private
$traceMode
;
private
$traceMemory
;
private
$verbose
;
private
$notifyReceived
;
private
$inGracefulShutdown
;
private
$workState
=
null
;
private
$idleSince
=
null
;
private
$scaledownDuration
;
final
public
function
setVerbose
(
$verbose
)
{
$this
->
verbose
=
$verbose
;
return
$this
;
}
final
public
function
getVerbose
(
)
{
return
$this
->
verbose
;
}
final
public
function
setScaledownDuration
(
$scaledown_duration
)
{
$this
->
scaledownDuration
=
$scaledown_duration
;
return
$this
;
}
final
public
function
getScaledownDuration
(
)
{
return
$this
->
scaledownDuration
;
}
final
public
function
__construct
(
array
$argv
)
{
$this
->
argv
=
$argv
;
$router
=
PhutilSignalRouter
::
getRouter
(
)
;
$handler_key
=
'daemon.term'
;
if
(
!
$router
->
getHandler
(
$handler_key
)
)
{
$handler
=
new
PhutilCallbackSignalHandler
(
SIGTERM
,
__CLASS__
.
'::onTermSignal'
)
;
$router
->
installHandler
(
$handler_key
,
$handler
)
;
}
pcntl_signal
(
SIGINT
,
array
(
$this
,
'onGracefulSignal'
)
)
;
pcntl_signal
(
SIGUSR2
,
array
(
$this
,
'onNotifySignal'
)
)
;
// Without discard mode, this consumes unbounded amounts of memory. Keep
// memory bounded.
PhutilServiceProfiler
::
getInstance
(
)
->
enableDiscardMode
(
)
;
$this
->
beginStdoutCapture
(
)
;
}
final
public
function
__destruct
(
)
{
$this
->
endStdoutCapture
(
)
;
}
final
public
function
stillWorking
(
)
{
$this
->
emitOverseerMessage
(
self
::
MESSAGETYPE_HEARTBEAT
,
null
)
;
if
(
$this
->
traceMemory
)
{
$daemon
=
get_class
(
$this
)
;
fprintf
(
STDERR
,
"%s %s %s\n"
,
'<RAMS>'
,
$daemon
,
pht
(
'Memory Usage: %s KB'
,
new
PhutilNumber
(
memory_get_usage
(
)
/
1024
,
1
)
)
)
;
}
}
final
public
function
shouldExit
(
)
{
return
$this
->
inGracefulShutdown
;
}
final
protected
function
shouldHibernate
(
$duration
)
{
// Don't hibernate if we don't have very long to sleep.
if
(
$duration
<
30
)
{
return
false
;
}
// Never hibernate if we're part of a pool and could scale down instead.
// We only hibernate the last process to drop the pool size to zero.
if
(
$this
->
getScaledownDuration
(
)
)
{
return
false
;
}
// Don't hibernate for too long.
$duration
=
min
(
$duration
,
phutil_units
(
'3 minutes in seconds'
)
)
;
$this
->
emitOverseerMessage
(
self
::
MESSAGETYPE_HIBERNATE
,
array
(
'duration'
=>
$duration
,
)
)
;
$this
->
log
(
pht
(
'Preparing to hibernate for %s second(s).'
,
new
PhutilNumber
(
$duration
)
)
)
;
return
true
;
}
final
protected
function
sleep
(
$duration
)
{
$this
->
notifyReceived
=
false
;
$this
->
willSleep
(
$duration
)
;
$this
->
stillWorking
(
)
;
$scale_down
=
$this
->
getScaledownDuration
(
)
;
$max_sleep
=
60
;
if
(
$scale_down
)
{
$max_sleep
=
min
(
$max_sleep
,
$scale_down
)
;
}
if
(
$scale_down
)
{
if
(
$this
->
workState
==
self
::
WORKSTATE_IDLE
)
{
$dur
=
$this
->
getIdleDuration
(
)
;
$this
->
log
(
pht
(
'Idle for %s seconds.'
,
$dur
)
)
;
}
}
while
(
$duration
>
0
&&
!
$this
->
notifyReceived
&&
!
$this
->
shouldExit
(
)
)
{
// If this is an autoscaling clone and we've been idle for too long,
// we're going to scale the pool down by exiting and not restarting. The
// DOWN message tells the overseer that we don't want to be restarted.
if
(
$scale_down
)
{
if
(
$this
->
workState
==
self
::
WORKSTATE_IDLE
)
{
if
(
$this
->
idleSince
&&
(
$this
->
idleSince
+
$scale_down
<
time
(
)
)
)
{
$this
->
inGracefulShutdown
=
true
;
$this
->
emitOverseerMessage
(
self
::
MESSAGETYPE_DOWN
,
null
)
;
$this
->
log
(
pht
(
'Daemon was idle for more than %s second(s), '
.
'scaling pool down.'
,
new
PhutilNumber
(
$scale_down
)
)
)
;
break
;
}
}
}
sleep
(
min
(
$duration
,
$max_sleep
)
)
;
$duration
-=
$max_sleep
;
$this
->
stillWorking
(
)
;
}
}
protected
function
willSleep
(
$duration
)
{
return
;
}
public
static
function
onTermSignal
(
$signo
)
{
self
::
didCatchSignal
(
$signo
)
;
}
final
protected
function
getArgv
(
)
{
return
$this
->
argv
;
}
final
public
function
execute
(
)
{
$this
->
willRun
(
)
;
$this
->
run
(
)
;
}
abstract
protected
function
run
(
)
;
final
public
function
setTraceMemory
(
)
{
$this
->
traceMemory
=
true
;
return
$this
;
}
final
public
function
getTraceMemory
(
)
{
return
$this
->
traceMemory
;
}
final
public
function
setTraceMode
(
)
{
$this
->
traceMode
=
true
;
PhutilServiceProfiler
::
installEchoListener
(
)
;
PhutilConsole
::
getConsole
(
)
->
getServer
(
)
->
setEnableLog
(
true
)
;
$this
->
didSetTraceMode
(
)
;
return
$this
;
}
final
public
function
getTraceMode
(
)
{
return
$this
->
traceMode
;
}
final
public
function
onGracefulSignal
(
$signo
)
{
self
::
didCatchSignal
(
$signo
)
;
$this
->
inGracefulShutdown
=
true
;
}
final
public
function
onNotifySignal
(
$signo
)
{
self
::
didCatchSignal
(
$signo
)
;
$this
->
notifyReceived
=
true
;
$this
->
onNotify
(
$signo
)
;
}
protected
function
onNotify
(
$signo
)
{
// This is a hook for subclasses.
}
protected
function
willRun
(
)
{
// This is a hook for subclasses.
}
protected
function
didSetTraceMode
(
)
{
// This is a hook for subclasses.
}
final
protected
function
log
(
$message
)
{
if
(
$this
->
verbose
)
{
$daemon
=
get_class
(
$this
)
;
fprintf
(
STDERR
,
"%s %s %s\n"
,
'<VERB>'
,
$daemon
,
$message
)
;
}
}
private
static
function
didCatchSignal
(
$signo
)
{
$signame
=
phutil_get_signal_name
(
$signo
)
;
fprintf
(
STDERR
,
"%s Caught signal %s (%s).\n"
,
'<SGNL>'
,
$signo
,
$signame
)
;
}
/* -( Communicating With the Overseer )------------------------------------ */
private
function
beginStdoutCapture
(
)
{
ob_start
(
array
(
$this
,
'didReceiveStdout'
)
,
2
)
;
}
private
function
endStdoutCapture
(
)
{
ob_end_flush
(
)
;
}
public
function
didReceiveStdout
(
$data
)
{
if
(
!
strlen
(
$data
)
)
{
return
''
;
}
return
$this
->
encodeOverseerMessage
(
self
::
MESSAGETYPE_STDOUT
,
$data
)
;
}
private
function
encodeOverseerMessage
(
$type
,
$data
)
{
$structure
=
array
(
$type
)
;
if
(
$data
!==
null
)
{
$structure
[
]
=
$data
;
}
return
json_encode
(
$structure
)
.
"\n"
;
}
private
function
emitOverseerMessage
(
$type
,
$data
)
{
$this
->
endStdoutCapture
(
)
;
echo
$this
->
encodeOverseerMessage
(
$type
,
$data
)
;
$this
->
beginStdoutCapture
(
)
;
}
public
static
function
errorListener
(
$event
,
$value
,
array
$metadata
)
{
// If the caller has redirected the error log to a file, PHP won't output
// messages to stderr, so the overseer can't capture them. Install a
// listener which just echoes errors to stderr, so the overseer is always
// aware of errors.
$console
=
PhutilConsole
::
getConsole
(
)
;
$message
=
idx
(
$metadata
,
'default_message'
)
;
if
(
$message
)
{
$console
->
writeErr
(
"%s\n"
,
$message
)
;
}
if
(
idx
(
$metadata
,
'trace'
)
)
{
$trace
=
PhutilErrorHandler
::
formatStacktrace
(
$metadata
[
'trace'
]
)
;
$console
->
writeErr
(
"%s\n"
,
$trace
)
;
}
}
/* -( Autoscaling )-------------------------------------------------------- */
/**
* Prepare to become busy. This may autoscale the pool up.
*
* This notifies the overseer that the daemon has become busy. If daemons
* that are part of an autoscale pool are continuously busy for a prolonged
* period of time, the overseer may scale up the pool.
*
* @return this
* @task autoscale
*/
protected
function
willBeginWork
(
)
{
if
(
$this
->
workState
!=
self
::
WORKSTATE_BUSY
)
{
$this
->
workState
=
self
::
WORKSTATE_BUSY
;
$this
->
idleSince
=
null
;
$this
->
emitOverseerMessage
(
self
::
MESSAGETYPE_BUSY
,
null
)
;
}
return
$this
;
}
/**
* Prepare to idle. This may autoscale the pool down.
*
* This notifies the overseer that the daemon is no longer busy. If daemons
* that are part of an autoscale pool are idle for a prolonged period of
* time, they may exit to scale the pool down.
*
* @return this
* @task autoscale
*/
protected
function
willBeginIdle
(
)
{
if
(
$this
->
workState
!=
self
::
WORKSTATE_IDLE
)
{
$this
->
workState
=
self
::
WORKSTATE_IDLE
;
$this
->
idleSince
=
time
(
)
;
$this
->
emitOverseerMessage
(
self
::
MESSAGETYPE_IDLE
,
null
)
;
}
return
$this
;
}
protected
function
getIdleDuration
(
)
{
if
(
!
$this
->
idleSince
)
{
return
null
;
}
$now
=
time
(
)
;
return
(
$now
-
$this
->
idleSince
)
;
}
}
File Metadata
Details
Attached
Mime Type
text/x-php
Expires
Jan 19 2025, 23:39 (6 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1110878
Default Alt Text
PhutilDaemon.php (10 KB)
Attached To
Mode
rP Phorge
Attached
Detach File
Event Timeline
Log In to Comment