Page MenuHomePhorge

No OneTemporary

diff --git a/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php b/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php
index 5bcb66c135..3d1a26c91d 100644
--- a/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php
+++ b/src/applications/aphlict/management/PhabricatorAphlictManagementWorkflow.php
@@ -1,572 +1,579 @@
<?php
abstract class PhabricatorAphlictManagementWorkflow
extends PhabricatorManagementWorkflow {
private $debug = false;
private $configData;
private $configPath;
/**
 * Set the debug flag for this workflow.
 *
 * When the flag is set, launch() runs the server in the foreground instead
 * of supervising it in a restart loop.
 *
 * @param bool $debug Debug flag.
 * @return this
 */
final protected function setDebug($debug) {
  $this->debug = $debug;

  return $this;
}
/**
 * Describe the command-line arguments used when launching the server.
 *
 * @return list<map<string, wild>> Argument specifications; currently only
 *   the "config" argument, which takes a file path.
 */
protected function getLaunchArguments() {
  $config_argument = array(
    'name' => 'config',
    'param' => 'file',
    'help' => pht(
      'Use a specific configuration file instead of the default '.
      'configuration.'),
  );

  return array($config_argument);
}
/**
 * Load and validate the Aphlict configuration file, then remember the decoded
 * configuration and the path it was read from on this workflow.
 *
 * The file comes from the "config" argument when given; otherwise the
 * default locations under "phabricator/conf/aphlict/" are tried in order.
 * As a side effect, missing parent directories for each configured log file
 * and for the PID file are created.
 *
 * Most validation failures are reported as PhutilArgumentUsageException. The
 * per-entry PhutilTypeSpec::checkMap() calls for servers, logs and cluster
 * peers are not wrapped, so whatever they throw propagates unchanged.
 *
 * @param PhutilArgumentParser $args Parsed command-line arguments.
 * @return void
 */
protected function parseLaunchArguments(PhutilArgumentParser $args) {
$config_file = $args->getArg('config');
if ($config_file) {
$full_path = Filesystem::resolvePath($config_file);
$show_path = $full_path;
} else {
$root = dirname(dirname(phutil_get_library_root('phabricator')));
$try = array(
'phabricator/conf/aphlict/aphlict.custom.json',
'phabricator/conf/aphlict/aphlict.default.json',
);
// Use the first candidate which exists. If none exists, the last candidate
// is kept and the read below is left to report the problem.
foreach ($try as $config) {
$full_path = $root.'/'.$config;
$show_path = $config;
if (Filesystem::pathExists($full_path)) {
break;
}
}
}
echo tsprintf(
"%s\n",
pht(
'Reading configuration from: %s',
$show_path));
try {
$data = Filesystem::readFile($full_path);
} catch (Exception $ex) {
throw new PhutilArgumentUsageException(
pht(
'Failed to read configuration file. %s',
$ex->getMessage()));
}
try {
$data = phutil_json_decode($data);
} catch (Exception $ex) {
throw new PhutilArgumentUsageException(
pht(
'Configuration file is not properly formatted JSON. %s',
$ex->getMessage()));
}
// Validate the top-level keys of the configuration.
try {
PhutilTypeSpec::checkMap(
$data,
array(
'servers' => 'list<wild>',
'logs' => 'optional list<wild>',
'cluster' => 'optional list<wild>',
'pidfile' => 'string',
'memory.hint' => 'optional int',
));
} catch (Exception $ex) {
throw new PhutilArgumentUsageException(
pht(
'Configuration file has improper configuration keys at top '.
'level. %s',
$ex->getMessage()));
}
// Validate each server: unique port, known type, consistent SSL settings.
$servers = $data['servers'];
$has_client = false;
$has_admin = false;
// Maps each port to the index of the server which first claimed it.
$port_map = array();
foreach ($servers as $index => $server) {
PhutilTypeSpec::checkMap(
$server,
array(
'type' => 'string',
'port' => 'int',
'listen' => 'optional string|null',
'ssl.key' => 'optional string|null',
'ssl.cert' => 'optional string|null',
'ssl.chain' => 'optional string|null',
));
$port = $server['port'];
if (!isset($port_map[$port])) {
$port_map[$port] = $index;
} else {
throw new PhutilArgumentUsageException(
pht(
'Two servers (at indexes "%s" and "%s") both bind to the same '.
'port ("%s"). Each server must bind to a unique port.',
$port_map[$port],
$index,
$port));
}
$type = $server['type'];
switch ($type) {
case 'admin':
$has_admin = true;
break;
case 'client':
$has_client = true;
break;
default:
throw new PhutilArgumentUsageException(
pht(
'A specified server (at index "%s", on port "%s") has an '.
'invalid type ("%s"). Valid types are: admin, client.',
$index,
$port,
$type));
}
// The SSL key and certificate must be given together or not at all.
$ssl_key = idx($server, 'ssl.key');
$ssl_cert = idx($server, 'ssl.cert');
if (($ssl_key && !$ssl_cert) || ($ssl_cert && !$ssl_key)) {
throw new PhutilArgumentUsageException(
pht(
'A specified server (at index "%s", on port "%s") specifies '.
'only one of "%s" and "%s". Each server must specify neither '.
'(to disable SSL) or specify both (to enable it).',
$index,
$port,
'ssl.key',
'ssl.cert'));
}
// A chain is only meaningful alongside a key and certificate.
$ssl_chain = idx($server, 'ssl.chain');
if ($ssl_chain && (!$ssl_key && !$ssl_cert)) {
throw new PhutilArgumentUsageException(
pht(
'A specified server (at index "%s", on port "%s") specifies '.
'a value for "%s", but no value for "%s" or "%s". Servers '.
'should only provide an SSL chain if they also provide an SSL '.
'key and SSL certificate.',
$index,
$port,
'ssl.chain',
'ssl.key',
'ssl.cert'));
}
}
// Require at least one server overall, and at least one of each type.
if (!$servers) {
throw new PhutilArgumentUsageException(
pht(
'Configuration file does not specify any servers. This service '.
'will not be able to interact with the outside world if it does '.
'not listen on any ports. You must specify at least one "%s" '.
'server and at least one "%s" server.',
'admin',
'client'));
}
if (!$has_client) {
throw new PhutilArgumentUsageException(
pht(
'Configuration file does not specify any client servers. This '.
'service will be unable to transmit any notifications without a '.
'client server. You must specify at least one server with '.
'type "%s".',
'client'));
}
if (!$has_admin) {
throw new PhutilArgumentUsageException(
pht(
'Configuration file does not specify any administrative '.
'servers. This service will be unable to receive messages. '.
'You must specify at least one server with type "%s".',
'admin'));
}
// Validate log entries and make sure each log's directory exists.
$logs = idx($data, 'logs', array());
foreach ($logs as $index => $log) {
PhutilTypeSpec::checkMap(
$log,
array(
'path' => 'string',
));
$path = $log['path'];
try {
$dir = dirname($path);
if (!Filesystem::pathExists($dir)) {
Filesystem::createDirectory($dir, 0755, true);
}
} catch (FilesystemException $ex) {
throw new PhutilArgumentUsageException(
pht(
'Failed to create directory "%s" for specified log file (with '.
'index "%s"). You should manually create this directory or '.
'choose a different logfile location. %s',
$dir,
$index,
$ex->getMessage()));
}
}
// Validate cluster peers: known protocol and unique "host:port" pairs.
// Maps each "host:port" key to the index of the peer which first used it.
$peer_map = array();
$cluster = idx($data, 'cluster', array());
foreach ($cluster as $index => $peer) {
PhutilTypeSpec::checkMap(
$peer,
array(
'host' => 'string',
'port' => 'int',
'protocol' => 'string',
));
$host = $peer['host'];
$port = $peer['port'];
$protocol = $peer['protocol'];
switch ($protocol) {
case 'http':
case 'https':
break;
default:
throw new PhutilArgumentUsageException(
pht(
'Configuration file specifies cluster peer ("%s", at index '.
'"%s") with an invalid protocol, "%s". Valid protocols are '.
'"%s" or "%s".',
$host,
$index,
$protocol,
'http',
'https'));
}
$peer_key = "{$host}:{$port}";
if (!isset($peer_map[$peer_key])) {
$peer_map[$peer_key] = $index;
} else {
throw new PhutilArgumentUsageException(
pht(
'Configuration file specifies cluster peer "%s" more than '.
'once (at indexes "%s" and "%s"). Each peer must have a '.
'unique host and port combination.',
$peer_key,
$peer_map[$peer_key],
$index));
}
}
// Store the configuration before calling getPIDPath(), which reads the
// "pidfile" key from the stored data.
$this->configData = $data;
$this->configPath = $full_path;
$pid_path = $this->getPIDPath();
try {
$dir = dirname($pid_path);
if (!Filesystem::pathExists($dir)) {
Filesystem::createDirectory($dir, 0755, true);
}
} catch (FilesystemException $ex) {
throw new PhutilArgumentUsageException(
pht(
'Failed to create directory "%s" for specified PID file. You '.
'should manually create this directory or choose a different '.
'PID file location. %s',
$dir,
$ex->getMessage()));
}
}
/**
 * Get the PID file path from the loaded configuration.
 *
 * @return string Value of the "pidfile" configuration key.
 */
final public function getPIDPath() {
  $config = $this->configData;

  return $config['pidfile'];
}
/**
 * Read the process ID recorded in the PID file.
 *
 * @return int|null The file contents cast to an integer, or null if the PID
 *   file does not exist.
 */
final public function getPID() {
  $pid_path = $this->getPIDPath();

  if (!Filesystem::pathExists($pid_path)) {
    return null;
  }

  return (int)Filesystem::readFile($pid_path);
}
/**
 * Tear down the running server and terminate this process.
 *
 * launch() registers this method as the SIGINT/SIGTERM handler and as a
 * shutdown function. It kills the subprocess tracked in the global
 * $g_future (if any), removes the PID file, logs the signal name when called
 * with a signal number, and then exits with status 1.
 *
 * @param int|null $signo Signal number when invoked as a signal handler.
 * @return void Does not return; always calls exit(1).
 */
final public function cleanup($signo = null) {
global $g_future;
if ($g_future) {
$g_future->resolveKill();
// Clear the global so a later invocation does not try to kill it again.
$g_future = null;
}
Filesystem::remove($this->getPIDPath());
if ($signo !== null) {
$signame = phutil_get_signal_name($signo);
error_log("Caught signal {$signame}, exiting.");
}
exit(1);
}
/**
 * Verify that the PHP extensions required by Aphlict ("pcntl" and "posix")
 * are usable. Each check prints an error and exits on failure.
 *
 * @return void
 */
public static function requireExtensions() {
  $required = array('pcntl', 'posix');

  foreach ($required as $extension) {
    self::mustHaveExtension($extension);
  }
}
/**
 * Ensure a PHP extension is loaded and that every function it declares is
 * callable. On the first problem found, print an error and exit with
 * status 1.
 *
 * @param string $ext Extension name.
 * @return void
 */
private static function mustHaveExtension($ext) {
  if (!extension_loaded($ext)) {
    $message = pht(
      "ERROR: The PHP extension '%s' is not installed. You must ".
      "install it to run Aphlict on this machine.",
      $ext);
    echo $message."\n";
    exit(1);
  }

  // An extension can be loaded while some of its functions are unavailable,
  // so check each declared function individually.
  $reflection = new ReflectionExtension($ext);
  foreach ($reflection->getFunctions() as $reflected) {
    $name = $reflected->name;
    if (function_exists($name)) {
      continue;
    }

    $message = pht(
      'ERROR: The PHP function %s is disabled. You must '.
      'enable it to run Aphlict on this machine.',
      $name.'()');
    echo $message."\n";
    exit(1);
  }
}
/**
 * Run preflight checks before the server is launched.
 *
 * Refuses to start if the PID file records a nonzero PID or if running as
 * root, checks that the PID file is writable (outside debug mode), and then
 * runs the server once with "--test=true" so configuration problems are
 * reported while the console is still available.
 *
 * @return void
 */
final protected function willLaunch() {
$console = PhutilConsole::getConsole();
$pid = $this->getPID();
if ($pid) {
throw new PhutilArgumentUsageException(
pht(
'Unable to start notifications server because it is already '.
'running. Use `%s` to restart it.',
'aphlict restart'));
}
if (posix_getuid() == 0) {
throw new PhutilArgumentUsageException(
pht('The notification server should not be run as root.'));
}
// Make sure we can write to the PID file.
if (!$this->debug) {
Filesystem::writeFile($this->getPIDPath(), '');
}
// First, start the server in configuration test mode with --test. This
// will let us error explicitly if there are missing modules, before we
// fork and lose access to the console.
$test_argv = $this->getServerArgv();
$test_argv[] = '--test=true';
execx('%C', $this->getStartCommand($test_argv));
}
/**
 * Build the arguments passed to the Aphlict server script.
 *
 * @return list<string> A single "--config=<path>" argument naming the
 *   configuration file that was loaded.
 */
private function getServerArgv() {
  return array(
    '--config='.$this->configPath,
  );
}
/**
 * Run the Aphlict server.
 *
 * In debug mode the server runs once in the foreground and this process
 * exits with the server's exit code. Otherwise this process records its own
 * PID in the PID file, installs cleanup() as the SIGINT/SIGTERM handler, and
 * supervises the server forever, restarting it two seconds after each exit.
 * In both modes cleanup() is registered as a shutdown function.
 *
 * @return void Does not return normally.
 */
final protected function launch() {
  $console = PhutilConsole::getConsole();

  if ($this->debug) {
    $console->writeOut(
      "%s\n",
      pht('Starting Aphlict server in foreground...'));
  } else {
    Filesystem::writeFile($this->getPIDPath(), getmypid());
  }

  $command = $this->getStartCommand($this->getServerArgv());

  if (!$this->debug) {
    declare(ticks = 1);
    pcntl_signal(SIGINT, array($this, 'cleanup'));
    pcntl_signal(SIGTERM, array($this, 'cleanup'));
  }

  register_shutdown_function(array($this, 'cleanup'));

  if ($this->debug) {
    $console->writeOut(
      "%s\n\n    $ %s\n\n",
      pht('Launching server:'),
      $command);

    $err = phutil_passthru('%C', $command);
    $console->writeOut(">>> %s\n", pht('Server exited!'));

    exit($err);
  } else {
    // cleanup() reads this global to find the subprocess it must kill.
    global $g_future;

    while (true) {
      $g_future = new ExecFuture('exec %C', $command);

      // Discard all output the subprocess produces: it writes to the log on
      // disk, so we don't need to send the output anywhere and can just
      // throw it away.
      $g_future
        ->setStdoutSizeLimit(0)
        ->setStderrSizeLimit(0);

      $g_future->resolve();

      // If the server exited, wait a couple of seconds and restart it.
      //
      // Assign null rather than calling unset(): unset() on a variable
      // imported with "global" only destroys the local alias and would leave
      // the finished future in the global for cleanup() to find.
      $g_future = null;
      sleep(2);
    }
  }
}
/* -( Commands )----------------------------------------------------------- */
/**
 * Start the server as a background process.
 *
 * Runs the preflight checks, forks, and lets the parent print a confirmation
 * and exit. The child redirects PHP errors to the first configured log (if
 * there is one), closes STDOUT and STDERR, and calls launch().
 *
 * @return int 0. NOTE(review): launch() appears never to return normally, so
 *   this return looks unreachable in practice - confirm.
 */
final protected function executeStartCommand() {
$console = PhutilConsole::getConsole();
$this->willLaunch();
$log = $this->getOverseerLogPath();
if ($log !== null) {
echo tsprintf(
"%s\n",
pht(
'Writing logs to: %s',
$log));
}
// A negative PID means the fork failed; a positive PID means we are the
// parent; zero means we are the child and continue below.
$pid = pcntl_fork();
if ($pid < 0) {
throw new Exception(
pht(
'Failed to %s!',
'fork()'));
} else if ($pid) {
$console->writeErr("%s\n", pht('Aphlict Server started.'));
exit(0);
}
// Redirect process errors to the error log. If we do not do this, any
// error the `aphlict` process itself encounters vanishes into thin air.
if ($log !== null) {
ini_set('error_log', $log);
}
// When we fork, the child process will inherit its parent's set of open
// file descriptors. If the parent process of bin/aphlict is waiting for
// bin/aphlict's file descriptors to close, it will be stuck waiting on
// the daemonized process. (This happens if e.g. bin/aphlict is started
// in another script using passthru().)
fclose(STDOUT);
fclose(STDERR);
$this->launch();
return 0;
}
/**
 * Stop a running server.
 *
 * Sends SIGINT to the PID recorded in the PID file and polls for up to about
 * five seconds for the process to exit. If it is still running after that,
 * sends SIGKILL. The PID file is removed in either case.
 *
 * @return int Always 0, including when no server is running.
 */
final protected function executeStopCommand() {
  $console = PhutilConsole::getConsole();

  $pid = $this->getPID();
  if (!$pid) {
    $console->writeErr("%s\n", pht('Aphlict is not running.'));
    return 0;
  }

  $console->writeErr("%s\n", pht('Stopping Aphlict Server (%s)...', $pid));
  posix_kill($pid, SIGINT);

  // Poll every 100ms until the process is gone or the deadline passes.
  $deadline = time() + 5;
  $exited = false;
  do {
    if (!PhabricatorDaemonReference::isProcessRunning($pid)) {
      $console->writeOut(
        "%s\n",
        pht('Aphlict Server (%s) exited normally.', $pid));
      $exited = true;
      break;
    }
    usleep(100000);
  } while (time() < $deadline);

  if (!$exited) {
    $console->writeErr("%s\n", pht('Sending %s a SIGKILL.', $pid));
    posix_kill($pid, SIGKILL);
  }

  Filesystem::remove($this->getPIDPath());

  return 0;
}
/**
 * Find the Node.js binary name, preferring "nodejs" over "node".
 *
 * @return string Name of the first binary found.
 * @throws PhutilArgumentUsageException If neither binary is available.
 */
private function getNodeBinary() {
  $candidates = array('nodejs', 'node');

  foreach ($candidates as $binary) {
    if (Filesystem::binaryExists($binary)) {
      return $binary;
    }
  }

  throw new PhutilArgumentUsageException(
    pht(
      'No `%s` or `%s` binary was found in %s. You must install '.
      'Node.js to start the Aphlict server.',
      'nodejs',
      'node',
      '$PATH'));
}
/**
 * Get the path to the Aphlict server script, located relative to the parent
 * directory of the "phabricator" library root.
 *
 * @return string Path to "aphlict_server.js".
 */
private function getAphlictScriptPath() {
  $library_root = phutil_get_library_root('phabricator');

  return dirname($library_root).'/support/aphlict/server/aphlict_server.js';
}
/**
 * Build the arguments passed to the Node.js binary itself.
 *
 * @return list<string> A single "--max-old-space-size" argument taken from
 *   the "memory.hint" configuration key, falling back to 256 when the key
 *   is empty.
 */
private function getNodeArgv() {
  $memory_hint = nonempty(idx($this->configData, 'memory.hint'), 256);

  return array(
    sprintf('--max-old-space-size=%d', $memory_hint),
  );
}
/**
 * Build the shell command which starts the Node.js server.
 *
 * The command has the form:
 *   <node binary> <node argv> -- <server script> <launch argv> <server argv>
 * The launch argv (added by this patch) carries "--debug=1" when the workflow
 * is in debug mode.
 *
 * @param list<string> $server_argv Arguments for the server script.
 * @return wild Command built by csprintf().
 */
private function getStartCommand(array $server_argv) {
+ $launch_argv = array();
+
+ if ($this->debug) {
+ $launch_argv[] = '--debug=1';
+ }
+
return csprintf(
- '%R %Ls -- %s %Ls',
+ '%R %Ls -- %s %Ls %Ls',
$this->getNodeBinary(),
$this->getNodeArgv(),
$this->getAphlictScriptPath(),
+ $launch_argv,
$server_argv);
}
/**
 * Get the log path which the overseer process should write errors to.
 *
 * @return string|null Path of the first configured log, or null when no logs
 *   are configured.
 */
private function getOverseerLogPath() {
  // For now, just use the first log. We could refine this eventually.
  $log_specs = idx($this->configData, 'logs', array());

  $first_path = null;
  foreach ($log_specs as $spec) {
    $first_path = $spec['path'];
    break;
  }

  return $first_path;
}
}
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
index 34bddf4fa9..3ccfefb6f5 100644
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -1,199 +1,204 @@
'use strict';
var JX = require('./lib/javelin').JX;
var http = require('http');
var https = require('https');
var util = require('util');
var fs = require('fs');
// Parse "--name=value" arguments from argv (starting at index 2) into a map.
//
// Only the names already present in the defaults below are accepted; any
// other name, or any argument not of the form "--name=value", throws an
// Error. Parsed values are stored as the raw strings from the command line,
// so any non-empty value (including "0" or "false") is truthy.
function parse_command_line_arguments(argv) {
var args = {
test: false,
+ debug: false,
config: null
};
for (var ii = 2; ii < argv.length; ii++) {
var arg = argv[ii];
var matches = arg.match(/^--([^=]+)=(.*)$/);
if (!matches) {
throw new Error('Unknown argument "' + arg + '"!');
}
if (!(matches[1] in args)) {
throw new Error('Unknown argument "' + matches[1] + '"!');
}
args[matches[1]] = matches[2];
}
return args;
}
// Read the file named by args.config synchronously and return its contents
// decoded as JSON. Read and parse errors propagate to the caller.
function parse_config(args) {
  var raw = fs.readFileSync(args.config);
  var parsed = JSON.parse(raw);

  return parsed;
}
// Load the logger class, create the shared logger, and read the command line
// and configuration file.
require('./lib/AphlictLog');
-var debug = new JX.AphlictLog()
- .addConsole(console);
+var debug = new JX.AphlictLog();
var args = parse_command_line_arguments(process.argv);
var config = parse_config(args);
// With this patch, output is only mirrored to the console (and trace output
// only enabled) for configuration tests and debug runs.
+if (args.test || args.debug) {
+ debug.addConsole(console);
+ debug.setTrace(true);
+}
+
// Register an 'exit' listener which calls process.exit() with the given
// code, so the process reports that code when it exits.
function set_exit_code(code) {
  var exit_with_code = function() {
    process.exit(code);
  };

  process.on('exit', exit_with_code);
}
// Log any uncaught exception (with a permissions hint for EACCES errors) and
// arrange for the process to exit with code 1.
process.on('uncaughtException', function(err) {
  var message = ['\n<<< UNCAUGHT EXCEPTION! >>>'];

  if (err.code == 'EACCES') {
    message.push(
      util.format(
        'Unable to open file ("%s"). Check that permissions are set ' +
        'correctly.',
        err.path));
  }

  message.push(err.stack);

  debug.log(message.join('\n\n'));
  set_exit_code(1);
});
// Fail early with installation instructions if the "ws" module cannot be
// loaded.
try {
  require('ws');
} catch (load_error) {
  var ws_help =
    'You need to install the Node.js "ws" module for websocket support. ' +
    'See "Notifications User Guide: Setup and Configuration" in the ' +
    'documentation for instructions. ';

  throw new Error(ws_help + load_error.toString());
}
// NOTE: Require these only after checking for the "ws" module, since they
// depend on it.
require('./lib/AphlictAdminServer');
require('./lib/AphlictClientServer');
require('./lib/AphlictPeerList');
require('./lib/AphlictPeer');
// Main startup sequence: attach log files, load SSL material, optionally stop
// after a configuration test, start every configured server, then wire the
// admin servers, client servers and cluster peers together.
var ii;
// Attach every configured log file to the shared logger.
var logs = config.logs || [];
for (ii = 0; ii < logs.length; ii++) {
debug.addLog(logs[ii].path);
}
// Normalize each server spec: default the listen address and replace SSL
// file paths with the contents of those files.
var servers = [];
for (ii = 0; ii < config.servers.length; ii++) {
var spec = config.servers[ii];
spec.listen = spec.listen || '0.0.0.0';
if (spec['ssl.key']) {
spec['ssl.key'] = fs.readFileSync(spec['ssl.key']);
}
if (spec['ssl.cert']){
spec['ssl.cert'] = fs.readFileSync(spec['ssl.cert']);
// The chain, when present, is appended to the certificate data.
if (spec['ssl.chain']){
spec['ssl.cert'] += "\n" + fs.readFileSync(spec['ssl.chain']);
}
}
servers.push(spec);
}
// If we're just doing a configuration test, exit here before starting any
// servers.
if (args.test) {
debug.log('Configuration test OK.');
set_exit_code(0);
return;
}
debug.log('Starting servers (service PID %d).', process.pid);
for (ii = 0; ii < logs.length; ii++) {
debug.log('Logging to "%s".', logs[ii].path);
}
var aphlict_servers = [];
var aphlict_clients = [];
var aphlict_admins = [];
// Create and start one HTTP or HTTPS server per spec, wrapped in a client or
// admin server object according to its type.
for (ii = 0; ii < servers.length; ii++) {
var server = servers[ii];
var is_client = (server.type == 'client');
var http_server;
if (server['ssl.key']) {
var https_config = {
key: server['ssl.key'],
cert: server['ssl.cert'],
};
http_server = https.createServer(https_config);
} else {
http_server = http.createServer();
}
var aphlict_server;
if (is_client) {
aphlict_server = new JX.AphlictClientServer(http_server);
} else {
aphlict_server = new JX.AphlictAdminServer(http_server);
}
aphlict_server.setLogger(debug);
aphlict_server.listen(server.port, server.listen);
debug.log(
'Started %s server (Port %d, %s).',
server.type,
server.port,
server['ssl.key'] ? 'With SSL' : 'No SSL');
aphlict_servers.push(aphlict_server);
if (is_client) {
aphlict_clients.push(aphlict_server);
} else {
aphlict_admins.push(aphlict_server);
}
}
var peer_list = new JX.AphlictPeerList();
debug.log(
'This server has fingerprint "%s".',
peer_list.getFingerprint());
// Register every configured cluster peer.
var cluster = config.cluster || [];
for (ii = 0; ii < cluster.length; ii++) {
var peer = cluster[ii];
var peer_client = new JX.AphlictPeer()
.setHost(peer.host)
.setPort(peer.port)
.setProtocol(peer.protocol);
peer_list.addPeer(peer_client);
}
// Give each admin server the client servers and the peer list.
for (ii = 0; ii < aphlict_admins.length; ii++) {
var admin_server = aphlict_admins[ii];
admin_server.setClientServers(aphlict_clients);
admin_server.setPeerList(peer_list);
}
// Give each client server the admin servers, which it queries for history.
for (ii = 0; ii < aphlict_clients.length; ii++) {
var client_server = aphlict_clients[ii];
client_server.setAdminServers(aphlict_admins);
}
diff --git a/support/aphlict/server/lib/AphlictAdminServer.js b/support/aphlict/server/lib/AphlictAdminServer.js
index dd428063c2..e2192e8afd 100644
--- a/support/aphlict/server/lib/AphlictAdminServer.js
+++ b/support/aphlict/server/lib/AphlictAdminServer.js
@@ -1,262 +1,272 @@
'use strict';
var JX = require('./javelin').JX;
require('./AphlictListenerList');
-var http = require('http');
var url = require('url');
// Admin-side HTTP server: accepts published notifications via POST to "/",
// serves statistics at "/status/", keeps a bounded message history, and
// forwards messages to subscribed listeners and to cluster peers.
JX.install('AphlictAdminServer', {
// Wrap an existing HTTP(S) server: reset the counters and history, and
// handle its 'request' events.
construct: function(server) {
this._startTime = new Date().getTime();
this._messagesIn = 0;
this._messagesOut = 0;
server.on('request', JX.bind(this, this._onrequest));
this._server = server;
this._clientServers = [];
this._messageHistory = [];
},
properties: {
clientServers: null,
logger: null,
peerList: null
},
members: {
_messagesIn: null,
_messagesOut: null,
_server: null,
_startTime: null,
_messageHistory: null,
// Collect the listener list for an instance from every client server.
getListenerLists: function(instance) {
var clients = this.getClientServers();
var lists = [];
for (var ii = 0; ii < clients.length; ii++) {
lists.push(clients[ii].getListenerList(instance));
}
return lists;
},
// Forward arguments to the logger's log(). Returns this, or undefined when
// no logger is set.
log: function() {
var logger = this.getLogger();
if (!logger) {
return;
}
logger.log.apply(logger, arguments);
return this;
},
// Forward arguments to the logger's trace(). Returns this, or undefined when
// no logger is set.
+ trace: function() {
+ var logger = this.getLogger();
+ if (!logger) {
+ return;
+ }
+
+ logger.trace.apply(logger, arguments);
+
+ return this;
+ },
+
// Start listening; arguments are passed through to the wrapped server.
listen: function() {
return this._server.listen.apply(this._server, arguments);
},
// Route a request: POST "/" publishes a notification (other methods get a
// 405), "/status/" reports statistics, anything else is a 404. The instance
// comes from the "instance" query parameter, defaulting to "default".
_onrequest: function(request, response) {
var self = this;
var u = url.parse(request.url, true);
var instance = u.query.instance || 'default';
// Publishing a notification.
if (u.pathname == '/') {
if (request.method == 'POST') {
var body = '';
request.on('data', function(data) {
body += data;
});
request.on('end', function() {
// The outer try reports a body which is not valid JSON as a 400; the inner
// try reports a failure while transmitting as a 500.
try {
var msg = JSON.parse(body);
- self.log(
+ self.trace(
'Received notification (' + instance + '): ' +
JSON.stringify(msg));
++self._messagesIn;
try {
self._transmit(instance, msg, response);
} catch (err) {
self.log(
'<%s> Internal Server Error! %s',
request.socket.remoteAddress,
err);
response.writeHead(500, 'Internal Server Error');
}
} catch (err) {
self.log(
'<%s> Bad Request! %s',
request.socket.remoteAddress,
err);
response.writeHead(400, 'Bad Request');
} finally {
response.end();
}
});
} else {
response.writeHead(405, 'Method Not Allowed');
response.end();
}
} else if (u.pathname == '/status/') {
this._handleStatusRequest(request, response, instance);
} else {
response.writeHead(404, 'Not Found');
response.end();
}
},
// Respond with a JSON document of listener counts for the instance, message
// counters, uptime and history statistics.
_handleStatusRequest: function(request, response, instance) {
var active_count = 0;
var total_count = 0;
var lists = this.getListenerLists(instance);
for (var ii = 0; ii < lists.length; ii++) {
var list = lists[ii];
active_count += list.getActiveListenerCount();
total_count += list.getTotalListenerCount();
}
var now = new Date().getTime();
var history_size = this._messageHistory.length;
// Age in milliseconds of the first retained message, or null when the
// history is empty.
var history_age = null;
if (history_size) {
history_age = (now - this._messageHistory[0].timestamp);
}
var server_status = {
'instance': instance,
'uptime': (now - this._startTime),
'clients.active': active_count,
'clients.total': total_count,
'messages.in': this._messagesIn,
'messages.out': this._messagesOut,
'version': 7,
'history.size': history_size,
'history.age': history_age
};
response.writeHead(200, {'Content-Type': 'application/json'});
response.write(JSON.stringify(server_status));
response.end();
},
/**
* Transmits a message to all subscribed listeners.
*
* The message is first recorded in the history. It is only delivered and
* broadcast to peers when the peer list's addFingerprint() returns a truthy
* message. The response body is written here but the response is not ended;
* the caller ends it.
*/
_transmit: function(instance, message, response) {
var now = new Date().getTime();
this._messageHistory.push(
{
timestamp: now,
message: message
});
this._purgeHistory();
var peer_list = this.getPeerList();
message = peer_list.addFingerprint(message);
if (message) {
var lists = this.getListenerLists(instance);
for (var ii = 0; ii < lists.length; ii++) {
var list = lists[ii];
var listeners = list.getListeners();
this._transmitToListeners(list, listeners, message);
}
peer_list.broadcastMessage(instance, message);
}
// Respond to the caller with our fingerprint so it can stop sending
// us traffic we don't need to know about if it's a peer. In particular,
// this stops us from broadcasting messages to ourselves if we appear
// in the cluster list.
var receipt = {
fingerprint: this.getPeerList().getFingerprint()
};
response.writeHead(200, {'Content-Type': 'application/json'});
response.write(JSON.stringify(receipt));
},
// Write the message to each listener subscribed to any of its subscribers.
// A listener whose write throws is removed from the list.
_transmitToListeners: function(list, listeners, message) {
for (var ii = 0; ii < listeners.length; ii++) {
var listener = listeners[ii];
if (!listener.isSubscribedToAny(message.subscribers)) {
continue;
}
try {
listener.writeMessage(message);
++this._messagesOut;
- this.log(
+ this.trace(
'<%s> Wrote Message',
listener.getDescription());
} catch (error) {
list.removeListener(listener);
- this.log(
+ this.trace(
'<%s> Write Error: %s',
listener.getDescription(),
error);
}
}
},
// Return the retained messages whose timestamp is at or after min_age (a
// millisecond timestamp), in the order they were recorded.
getHistory: function(min_age) {
var history = this._messageHistory;
var results = [];
for (var ii = 0; ii < history.length; ii++) {
if (history[ii].timestamp >= min_age) {
results.push(history[ii].message);
}
}
return results;
},
// Drop history entries beyond the size limit or older than the age limit.
_purgeHistory: function() {
var messages = this._messageHistory;
// Maximum number of messages to retain.
var size_limit = 4096;
// Find the index of the first item we're going to keep. If we have too
// many items, this will be somewhere past the beginning of the list.
var keep = Math.max(0, messages.length - size_limit);
// Maximum number of milliseconds of history to retain.
var age_limit = 60000;
// Move the index forward until we find an item that is recent enough
// to retain.
var now = new Date().getTime();
var min_age = (now - age_limit);
for (keep; keep < messages.length; keep++) {
if (messages[keep].timestamp >= min_age) {
break;
}
}
// Throw away extra messages.
if (keep) {
this._messageHistory.splice(0, keep);
}
}
}
});
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
index 989b3f1db1..4157906bb6 100644
--- a/support/aphlict/server/lib/AphlictClientServer.js
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -1,190 +1,207 @@
'use strict';
var JX = require('./javelin').JX;
require('./AphlictListenerList');
-require('./AphlictLog');
var url = require('url');
var util = require('util');
var WebSocket = require('ws');
// Client-side server: accepts websocket connections, tracks one listener
// list per instance, and handles the subscribe, unsubscribe, replay and ping
// commands sent by clients.
JX.install('AphlictClientServer', {
// Wrap an existing HTTP(S) server and handle its plain 'request' events.
construct: function(server) {
server.on('request', JX.bind(this, this._onrequest));
this._server = server;
this._lists = {};
this._adminServers = [];
},
properties: {
logger: null,
adminServers: null
},
members: {
_server: null,
_lists: null,
// Return the listener list for an instance, creating it on first use.
getListenerList: function(instance) {
if (!this._lists[instance]) {
this._lists[instance] = new JX.AphlictListenerList(instance);
}
return this._lists[instance];
},
// Concatenate the histories returned by every admin server for the given
// minimum timestamp.
getHistory: function(age) {
var results = [];
var servers = this.getAdminServers();
for (var ii = 0; ii < servers.length; ii++) {
var messages = servers[ii].getHistory(age);
for (var jj = 0; jj < messages.length; jj++) {
results.push(messages[jj]);
}
}
return results;
},
// Forward arguments to the logger's log(). Returns this, or undefined when
// no logger is set.
log: function() {
var logger = this.getLogger();
if (!logger) {
return;
}
logger.log.apply(logger, arguments);
return this;
},
// Forward arguments to the logger's trace(). Returns this, or undefined when
// no logger is set.
+ trace: function() {
+ var logger = this.getLogger();
+ if (!logger) {
+ return;
+ }
+
+ logger.trace.apply(logger, arguments);
+
+ return this;
+ },
+
_onrequest: function(request, response) {
// The websocket code upgrades connections before they get here, so
// this only handles normal HTTP connections. We just fail them with
// a 501 response.
response.writeHead(501);
response.end('HTTP/501 Use Websockets\n');
},
// Extract the instance name which follows the first "~" in a request path,
// with any "/" characters removed. Returns "default" when there is no "~" or
// nothing usable follows it.
_parseInstanceFromPath: function(path) {
// If there's no "~" marker in the path, it's not an instance name.
// Users sometimes configure nginx or Apache to proxy based on the
// path.
if (path.indexOf('~') === -1) {
return 'default';
}
var instance = path.split('~')[1];
// Remove any "/" characters.
instance = instance.replace(/\//g, '');
if (!instance.length) {
return 'default';
}
return instance;
},
// Start listening (arguments are passed through to the wrapped server) and
// attach a websocket server which registers each connection as a listener.
listen: function() {
var self = this;
var server = this._server.listen.apply(this._server, arguments);
var wss = new WebSocket.Server({server: server});
// This function checks for upgradeReq which is only available in
// ws2 by default, not ws3. See T12755 for more information.
wss.on('connection', function(ws, request) {
if ('upgradeReq' in ws) {
request = ws.upgradeReq;
}
var path = url.parse(request.url).pathname;
var instance = self._parseInstanceFromPath(path);
var listener = self.getListenerList(instance).addListener(ws);
// Per-connection helpers: msg() formats a message prefixed with the
// listener's description; log() and trace() send that message to the
// corresponding server method.
- function log() {
- self.log(
- util.format('<%s>', listener.getDescription()) +
+ function msg(argv) {
+ return util.format('<%s>', listener.getDescription()) +
' ' +
- util.format.apply(null, arguments));
+ util.format.apply(null, argv);
+ }
+
+ function log() {
+ self.log(msg(arguments));
}
- log('Connected from %s.', ws._socket.remoteAddress);
+ function trace() {
+ self.trace(msg(arguments));
+ }
+
+ trace('Connected from %s.', ws._socket.remoteAddress);
ws.on('message', function(data) {
- log('Received message: %s', data);
+ trace('Received message: %s', data);
var message;
try {
message = JSON.parse(data);
} catch (err) {
log('Message is invalid: %s', err.message);
return;
}
switch (message.command) {
case 'subscribe':
- log(
+ trace(
'Subscribed to: %s',
JSON.stringify(message.data));
listener.subscribe(message.data);
break;
case 'unsubscribe':
- log(
+ trace(
'Unsubscribed from: %s',
JSON.stringify(message.data));
listener.unsubscribe(message.data);
break;
case 'replay':
// Resend retained messages newer than the requested age (in milliseconds,
// defaulting to 60000) which this listener is subscribed to. Stop at the
// first write error.
var age = message.data.age || 60000;
var min_age = (new Date().getTime() - age);
var old_messages = self.getHistory(min_age);
for (var ii = 0; ii < old_messages.length; ii++) {
var old_message = old_messages[ii];
if (!listener.isSubscribedToAny(old_message.subscribers)) {
continue;
}
try {
listener.writeMessage(old_message);
} catch (error) {
break;
}
}
break;
case 'ping':
var pong = {
type: 'pong'
};
try {
listener.writeMessage(pong);
} catch (error) {
// Ignore any issues here, we'll clean up elsewhere.
}
break;
default:
log(
'Unrecognized command "%s".',
message.command || '<undefined>');
}
});
// Forget the listener once its socket closes.
ws.on('close', function() {
self.getListenerList(instance).removeListener(listener);
- log('Disconnected.');
+ trace('Disconnected.');
});
});
}
}
});
diff --git a/support/aphlict/server/lib/AphlictLog.js b/support/aphlict/server/lib/AphlictLog.js
index 77d40793cb..62513b0f01 100644
--- a/support/aphlict/server/lib/AphlictLog.js
+++ b/support/aphlict/server/lib/AphlictLog.js
@@ -1,47 +1,61 @@
'use strict';
var JX = require('./javelin').JX;
var fs = require('fs');
var util = require('util');
// Logger which writes timestamped messages to any number of console objects
// and append-mode log files, with an optional trace channel.
JX.install('AphlictLog', {
construct: function() {
this._consoles = [];
this._logs = [];
},
members: {
_consoles: null,
_logs: null,
+ _trace: null,
+
// Enable or disable trace output; see trace().
+ setTrace: function(trace) {
+ this._trace = trace;
+ return this;
+ },
// Add an object whose log() method receives every message.
addConsole: function(console) {
this._consoles.push(console);
return this;
},
// Open the file at path as a UTF-8 write stream in append mode and send
// every message to it.
addLog: function(path) {
this._logs.push(fs.createWriteStream(path, {
flags: 'a',
encoding: 'utf8',
mode: '0664',
}));
return this;
},
// Like log(), but the message is dropped unless tracing has been enabled
// with setTrace().
+ trace: function() {
+ if (!this._trace) {
+ return;
+ }
+
+ return this.log.apply(this, arguments);
+ },
+
// Format the arguments with util.format(), prefix the current local date
// and time in brackets, and write the result to every console and log file.
log: function() {
var str = util.format.apply(null, arguments);
var date = new Date().toLocaleString();
str = '[' + date + '] ' + str;
var ii;
for (ii = 0; ii < this._consoles.length; ii++) {
this._consoles[ii].log(str);
}
for (ii = 0; ii < this._logs.length; ii++) {
this._logs[ii].write(str + '\n');
}
},
},
});

File Metadata

Mime Type
text/x-diff
Expires
Jan 19 2025, 23:13 (6 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1129746
Default Alt Text
(36 KB)

Event Timeline