Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2892963
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Award Token
Flag For Later
Advanced/Developer...
View Handle
View Hovercard
Size
16 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php b/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php
index cc21d68d6f..59ba40b8a4 100644
--- a/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php
+++ b/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php
@@ -1,275 +1,273 @@
<?php
abstract class PhabricatorAphlictManagementWorkflow
extends PhabricatorManagementWorkflow {
private $debug = false;
private $clientHost;
public function didConstruct() {
$this
->setArguments(
array(
array(
'name' => 'client-host',
'param' => 'hostname',
'help' => pht('Hostname to bind to for the client server.'),
),
));
}
public function execute(PhutilArgumentParser $args) {
$this->clientHost = $args->getArg('client-host');
return 0;
}
final public function getPIDPath() {
return PhabricatorEnv::getEnvConfig('notification.pidfile');
}
final public function getPID() {
$pid = null;
if (Filesystem::pathExists($this->getPIDPath())) {
$pid = (int)Filesystem::readFile($this->getPIDPath());
}
return $pid;
}
final public function cleanup($signo = '?') {
global $g_future;
if ($g_future) {
$g_future->resolveKill();
$g_future = null;
}
Filesystem::remove($this->getPIDPath());
exit(1);
}
protected final function setDebug($debug) {
$this->debug = $debug;
}
public static function requireExtensions() {
self::mustHaveExtension('pcntl');
self::mustHaveExtension('posix');
}
private static function mustHaveExtension($ext) {
if (!extension_loaded($ext)) {
echo "ERROR: The PHP extension '{$ext}' is not installed. You must ".
"install it to run aphlict on this machine.\n";
exit(1);
}
$extension = new ReflectionExtension($ext);
foreach ($extension->getFunctions() as $function) {
$function = $function->name;
if (!function_exists($function)) {
echo "ERROR: The PHP function {$function}() is disabled. You must ".
"enable it to run aphlict on this machine.\n";
exit(1);
}
}
}
final protected function willLaunch() {
$console = PhutilConsole::getConsole();
$pid = $this->getPID();
if ($pid) {
throw new PhutilArgumentUsageException(
pht(
'Unable to start notifications server because it is already '.
'running. Use `aphlict restart` to restart it.'));
}
if (posix_getuid() == 0) {
throw new PhutilArgumentUsageException(
pht(
// TODO: Update this message after a while.
'The notification server should not be run as root. It no '.
'longer requires access to privileged ports.'));
}
// Make sure we can write to the PID file.
if (!$this->debug) {
Filesystem::writeFile($this->getPIDPath(), '');
}
// First, start the server in configuration test mode with --test. This
// will let us error explicitly if there are missing modules, before we
// fork and lose access to the console.
$test_argv = $this->getServerArgv();
$test_argv[] = '--test=true';
execx(
'%s %s %Ls',
$this->getNodeBinary(),
$this->getAphlictScriptPath(),
$test_argv);
}
private function getServerArgv() {
$ssl_key = PhabricatorEnv::getEnvConfig('notification.ssl-key');
$ssl_cert = PhabricatorEnv::getEnvConfig('notification.ssl-cert');
$server_uri = PhabricatorEnv::getEnvConfig('notification.server-uri');
$server_uri = new PhutilURI($server_uri);
$client_uri = PhabricatorEnv::getEnvConfig('notification.client-uri');
$client_uri = new PhutilURI($client_uri);
$log = PhabricatorEnv::getEnvConfig('notification.log');
$server_argv = array();
$server_argv[] = '--client-port='.$client_uri->getPort();
$server_argv[] = '--admin-port='.$server_uri->getPort();
$server_argv[] = '--admin-host='.$server_uri->getDomain();
if ($ssl_key) {
$server_argv[] = '--ssl-key='.$ssl_key;
}
if ($ssl_cert) {
$server_argv[] = '--ssl-cert='.$ssl_cert;
}
- if (!$this->debug) {
- $server_argv[] = '--log='.$log;
- }
+ $server_argv[] = '--log='.$log;
if ($this->clientHost) {
$server_argv[] = '--client-host='.$this->clientHost;
}
return $server_argv;
}
private function getAphlictScriptPath() {
$root = dirname(phutil_get_library_root('phabricator'));
return $root.'/support/aphlict/server/aphlict_server.js';
}
final protected function launch() {
$console = PhutilConsole::getConsole();
if ($this->debug) {
$console->writeOut(pht("Starting Aphlict server in foreground...\n"));
} else {
Filesystem::writeFile($this->getPIDPath(), getmypid());
}
$command = csprintf(
'%s %s %Ls',
$this->getNodeBinary(),
$this->getAphlictScriptPath(),
$this->getServerArgv());
if (!$this->debug) {
declare(ticks = 1);
pcntl_signal(SIGINT, array($this, 'cleanup'));
pcntl_signal(SIGTERM, array($this, 'cleanup'));
}
register_shutdown_function(array($this, 'cleanup'));
if ($this->debug) {
$console->writeOut("Launching server:\n\n $ ".$command."\n\n");
$err = phutil_passthru('%C', $command);
$console->writeOut(">>> Server exited!\n");
exit($err);
} else {
while (true) {
global $g_future;
$g_future = new ExecFuture('exec %C', $command);
$g_future->resolve();
// If the server exited, wait a couple of seconds and restart it.
unset($g_future);
sleep(2);
}
}
}
/* -( Commands )----------------------------------------------------------- */
final protected function executeStartCommand() {
$console = PhutilConsole::getConsole();
$this->willLaunch();
$pid = pcntl_fork();
if ($pid < 0) {
throw new Exception('Failed to fork()!');
} else if ($pid) {
$console->writeErr(pht("Aphlict Server started.\n"));
exit(0);
}
// When we fork, the child process will inherit its parent's set of open
// file descriptors. If the parent process of bin/aphlict is waiting for
// bin/aphlict's file descriptors to close, it will be stuck waiting on
// the daemonized process. (This happens if e.g. bin/aphlict is started
// in another script using passthru().)
fclose(STDOUT);
fclose(STDERR);
$this->launch();
return 0;
}
final protected function executeStopCommand() {
$console = PhutilConsole::getConsole();
$pid = $this->getPID();
if (!$pid) {
$console->writeErr(pht("Aphlict is not running.\n"));
return 0;
}
$console->writeErr(pht("Stopping Aphlict Server (%s)...\n", $pid));
posix_kill($pid, SIGINT);
$start = time();
do {
if (!PhabricatorDaemonReference::isProcessRunning($pid)) {
$console->writeOut(
"%s\n",
pht('Aphlict Server (%s) exited normally.', $pid));
$pid = null;
break;
}
usleep(100000);
} while (time() < $start + 5);
if ($pid) {
$console->writeErr(pht('Sending %s a SIGKILL.', $pid)."\n");
posix_kill($pid, SIGKILL);
unset($pid);
}
Filesystem::remove($this->getPIDPath());
return 0;
}
private function getNodeBinary() {
if (Filesystem::binaryExists('nodejs')) {
return 'nodejs';
}
if (Filesystem::binaryExists('node')) {
return 'node';
}
throw new PhutilArgumentUsageException(
pht(
'No `nodejs` or `node` binary was found in $PATH. You must install '.
'Node.js to start the Aphlict server.'));
}
}
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
index 9649c50c8a..aa5784f16b 100644
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -1,245 +1,263 @@
var JX = require('./lib/javelin').JX;
var http = require('http');
var https = require('https');
var util = require('util');
var fs = require('fs');
JX.require('lib/AphlictListenerList', __dirname);
JX.require('lib/AphlictLog', __dirname);
function parse_command_line_arguments(argv) {
var config = {
'client-port': 22280,
'admin-port': 22281,
'client-host': '0.0.0.0',
'admin-host': '127.0.0.1',
log: '/var/log/aphlict.log',
'ssl-key': null,
'ssl-cert': null,
test: false
};
for (var ii = 2; ii < argv.length; ii++) {
var arg = argv[ii];
var matches = arg.match(/^--([^=]+)=(.*)$/);
if (!matches) {
throw new Error("Unknown argument '" + arg + "'!");
}
if (!(matches[1] in config)) {
throw new Error("Unknown argument '" + matches[1] + "'!");
}
config[matches[1]] = matches[2];
}
config['client-port'] = parseInt(config['client-port'], 10);
config['admin-port'] = parseInt(config['admin-port'], 10);
return config;
}
var debug = new JX.AphlictLog()
.addConsole(console);
var config = parse_command_line_arguments(process.argv);
process.on('uncaughtException', function(err) {
- debug.log('\n<<< UNCAUGHT EXCEPTION! >>>\n' + err.stack);
+ var context = null;
+ if ((err.code == 'EACCES') &&
+ (err.path == config.log)) {
+ context = util.format(
+ 'Unable to open logfile ("%s"). Check that permissions are set ' +
+ 'correctly.',
+ err.path);
+ }
+
+ var message = [
+ '\n<<< UNCAUGHT EXCEPTION! >>>',
+ ];
+ if (context) {
+ message.push(context);
+ }
+ message.push(err.stack);
+
+ debug.log(message.join('\n\n'));
+
process.exit(1);
});
var WebSocket;
try {
WebSocket = require('ws');
} catch (ex) {
throw new Error(
'You need to install the Node.js "ws" module for websocket support. ' +
'See "Notifications User Guide: Setup and Configuration" in the ' +
'documentation for instructions. ' + ex.toString());
}
var ssl_config = {
enabled: (config['ssl-key'] || config['ssl-cert'])
};
// Load the SSL certificates (if any were provided) now, so that runs with
// `--test` will see any errors.
if (ssl_config.enabled) {
ssl_config.key = fs.readFileSync(config['ssl-key']);
ssl_config.cert = fs.readFileSync(config['ssl-cert']);
}
// Add the logfile so we'll fail if we can't write to it.
-if (config.logfile) {
- debug.addLogfile(config.logfile);
+if (config.log) {
+ debug.addLogfile(config.log);
}
// If we're just doing a configuration test, exit here before starting any
// servers.
if (config.test) {
debug.log('Configuration test OK.');
process.exit(0);
}
var start_time = new Date().getTime();
var messages_out = 0;
var messages_in = 0;
var clients = new JX.AphlictListenerList();
function https_discard_handler(req, res) {
res.writeHead(501);
res.end('HTTP/501 Use Websockets\n');
}
var ws;
if (ssl_config.enabled) {
var https_server = https.createServer({
key: ssl_config.key,
cert: ssl_config.cert
}, https_discard_handler).listen(
config['client-port'],
config['client-host']);
ws = new WebSocket.Server({server: https_server});
} else {
ws = new WebSocket.Server({
port: config['client-port'],
host: config['client-host'],
});
}
ws.on('connection', function(ws) {
var listener = clients.addListener(ws);
function log() {
debug.log(
util.format('<%s>', listener.getDescription()) +
' ' +
util.format.apply(null, arguments));
}
log('Connected from %s.', ws._socket.remoteAddress);
ws.on('message', function(data) {
log('Received message: %s', data);
var message;
try {
message = JSON.parse(data);
} catch (err) {
log('Message is invalid: %s', err.message);
return;
}
switch (message.command) {
case 'subscribe':
log(
'Subscribed to: %s',
JSON.stringify(message.data));
listener.subscribe(message.data);
break;
case 'unsubscribe':
log(
'Unsubscribed from: %s',
JSON.stringify(message.data));
listener.unsubscribe(message.data);
break;
default:
log('Unrecognized command "%s".', message.command || '<undefined>');
}
});
ws.on('close', function() {
clients.removeListener(listener);
log('Disconnected.');
});
ws.on('error', function(err) {
log('Error: %s', err.message);
});
});
function transmit(msg) {
var listeners = clients.getListeners().filter(function(client) {
return client.isSubscribedToAny(msg.subscribers);
});
for (var i = 0; i < listeners.length; i++) {
var listener = listeners[i];
try {
listener.writeMessage(msg);
++messages_out;
debug.log('<%s> Wrote Message', listener.getDescription());
} catch (error) {
clients.removeListener(listener);
debug.log('<%s> Write Error: %s', listener.getDescription(), error);
}
}
}
http.createServer(function(request, response) {
// Publishing a notification.
if (request.url == '/') {
if (request.method == 'POST') {
var body = '';
request.on('data', function(data) {
body += data;
});
request.on('end', function() {
try {
var msg = JSON.parse(body);
debug.log('Received notification: ' + JSON.stringify(msg));
++messages_in;
try {
transmit(msg);
response.writeHead(200, {'Content-Type': 'text/plain'});
} catch (err) {
debug.log(
'<%s> Internal Server Error! %s',
request.socket.remoteAddress,
err);
response.writeHead(500, 'Internal Server Error');
}
} catch (err) {
debug.log(
'<%s> Bad Request! %s',
request.socket.remoteAddress,
err);
response.writeHead(400, 'Bad Request');
} finally {
response.end();
}
});
} else {
response.writeHead(405, 'Method Not Allowed');
response.end();
}
} else if (request.url == '/status/') {
var status = {
'uptime': (new Date().getTime() - start_time),
'clients.active': clients.getActiveListenerCount(),
'clients.total': clients.getTotalListenerCount(),
'messages.in': messages_in,
'messages.out': messages_out,
'log': config.log,
'version': 6
};
response.writeHead(200, {'Content-Type': 'application/json'});
response.write(JSON.stringify(status));
response.end();
} else {
response.writeHead(404, 'Not Found');
response.end();
}
}).listen(config['admin-port'], config['admin-host']);
debug.log('Started Server (PID %d)', process.pid);
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
index 73b9031194..15c7dbc719 100644
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -1,52 +1,52 @@
var JX = require('javelin').JX;
var fs = require('fs');
var util = require('util');
JX.install('AphlictLog', {
construct: function() {
this._writeToLogs = [];
this._writeToConsoles = [];
},
members: {
_writeToConsoles: null,
_writeToLogs: null,
addLogfile: function(path) {
var options = {
flags: 'a',
encoding: 'utf8',
mode: 066
};
- var logfile = fs.createWriteSteam(path, options);
+ var logfile = fs.createWriteStream(path, options);
this._writeToLogs.push(logfile);
return this;
},
addConsole: function(console) {
this._writeToConsoles.push(console);
return this;
},
log: function() {
var str = util.format.apply(null, arguments);
var date = new Date().toLocaleString();
str = '[' + date + '] ' + str;
var ii;
for (ii = 0; ii < this._writeToConsoles.length; ii++) {
this._writeToConsoles[ii].log(str);
}
for (ii = 0; ii < this._writeToLogs.length; ii++) {
this._writeToLogs[ii].write(str + '\n');
}
}
}
});
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 19, 17:38 (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1127042
Default Alt Text
(16 KB)
Attached To
Mode
rP Phorge
Attached
Detach File
Event Timeline
Log In to Comment