diff --git a/daemon-config-example.json b/daemon-config-example.json index 3ba5ca6..7d6b9e5 100644 --- a/daemon-config-example.json +++ b/daemon-config-example.json @@ -1,4 +1,6 @@ { + "ServerUrl": "http://127.0.0.1:5283/", + "ServerToken": "", "RabbitMQUrl": "amqp://localhost/", "RedisUrl": "redis://127.0.0.1:6379", "TestData": "/opt/syzoj/data/testdata", diff --git a/package.json b/package.json index 107f0e5..0e09dd6 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "request-promise": "^4.2.1", "simple-sandbox": "^0.3.5", "socket.io": "^2.0.3", + "socket.io-client": "^2.2.0", "source-map-support": "^0.4.16", "syspipe": "^0.1.5", "tar": "^3.2.1", @@ -56,6 +57,7 @@ "@types/request": "^2.0.3", "@types/request-promise": "^4.1.37", "@types/socket.io": "^1.4.30", + "@types/socket.io-client": "^1.4.32", "@types/uuid": "^3.4.1", "@types/winston": "^2.3.5", "typescript": "^2.5.1" diff --git a/src/daemon/cleanup.ts b/src/daemon/cleanup.ts index 7e98c62..4d7f471 100644 --- a/src/daemon/cleanup.ts +++ b/src/daemon/cleanup.ts @@ -1,8 +1,10 @@ -import {disconnect as disconnectRMQ } from './rmq'; +import { disconnect as disconnectRMQ } from './rmq'; +import { disconnect as disconnectSIO } from './remote'; import winston = require('winston'); export function cleanUp(retCode: number) { winston.info('Cleaning up...'); disconnectRMQ(); - process.exit(1); -} \ No newline at end of file + disconnectSIO(); + process.exit(retCode); +} diff --git a/src/daemon/config.ts b/src/daemon/config.ts index f2369eb..1315e66 100644 --- a/src/daemon/config.ts +++ b/src/daemon/config.ts @@ -4,6 +4,8 @@ import winston = require('winston'); import { configureWinston } from '../winston-common'; export interface ConfigStructure { + serverUrl: string; + serverToken: string; rabbitMQ: string; testDataDirectory: string; priority: number; @@ -25,6 +27,8 @@ function readJSON(path: string): any { const configJSON = readJSON(options["config"]); export const globalConfig: ConfigStructure = { + serverUrl: configJSON.ServerUrl, + serverToken: configJSON.ServerToken, rabbitMQ: configJSON.RabbitMQUrl, testDataDirectory: configJSON.TestData, priority: configJSON.Priority, @@ -33,4 +37,4 @@ export const globalConfig: ConfigStructure = { tempDirectory: configJSON.TempDirectory } -configureWinston(options.verbose); \ No newline at end of file +configureWinston(options.verbose); diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 2cb1a0f..643fd62 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -4,30 +4,32 @@ import winston = require('winston'); import { globalConfig as Cfg } from './config'; import util = require('util'); import rmq = require('./rmq'); +import remote = require('./remote'); import { judge } from './judge'; import { JudgeResult, ErrorType, ProgressReportType, OverallResult } from '../interfaces'; (async function () { winston.info("Daemon starts."); + await remote.connect(); await rmq.connect(); winston.info("Start consuming the queue."); - await rmq.waitForTask(async (task) => { + await remote.waitForTask(async (task) => { let result: OverallResult; try { - await rmq.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Started, progress: null }); + await remote.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Started, progress: null }); result = await judge(task.content, task.extraData, async (progress) => { - await rmq.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Progress, progress: progress }); + await remote.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Progress, progress: progress }); }, async (progress) => { const data = { taskId: task.content.taskId, type: ProgressReportType.Compiled, progress: progress }; - await rmq.reportProgress(data); - await rmq.reportResult(data); + await remote.reportProgress(data); + await remote.reportResult(data); }); } catch (err) { winston.warn(`Judge error!!! TaskId: ${task.content.taskId}`, err); result = { error: ErrorType.SystemError, systemMessage: `An error occurred.\n${err.toString()}` }; } const resultReport = { taskId: task.content.taskId, type: ProgressReportType.Finished, progress: result }; - await rmq.reportProgress(resultReport); - await rmq.reportResult(resultReport); + await remote.reportProgress(resultReport); + await remote.reportResult(resultReport); }); -})().then(() => { winston.info("Initialization logic completed."); }, (err) => { winston.error(util.inspect(err)); process.exit(1); }); \ No newline at end of file +})().then(() => { winston.info("Initialization logic completed."); }, (err) => { winston.error(util.inspect(err)); process.exit(1); }); diff --git a/src/daemon/remote.ts b/src/daemon/remote.ts new file mode 100644 index 0000000..27b6d79 --- /dev/null +++ b/src/daemon/remote.ts @@ -0,0 +1,75 @@ +import * as url from 'url'; +import * as util from 'util'; +import { globalConfig as Cfg } from './config'; +import msgpack = require('msgpack-lite'); +import winston = require('winston'); +import { ProgressReportData } from '../interfaces'; +import { JudgeTask } from './interfaces'; +import * as SocketIOClient from 'socket.io-client'; + +let socketIOConnection: SocketIOClient.Socket; +let cancelCurrentPull: Function; + +export async function connect() { + const socketIOUrl = url.resolve(Cfg.serverUrl, 'judge'); + winston.verbose(`Connect to Socket.IO "${socketIOUrl}"...`); + socketIOConnection = SocketIOClient(socketIOUrl); + + socketIOConnection.on('disconnect', () => { + winston.verbose(`Disconnected from Socket.IO "${socketIOUrl}"...`); + if (cancelCurrentPull) cancelCurrentPull(); + }); +} + +export async function disconnect() { + socketIOConnection.close(); +} + +export async function waitForTask(handle: (task: JudgeTask) => Promise) { + while (true) { + winston.verbose('Waiting for new task...'); + await new Promise((resolve, reject) => { + // This should be cancelled if socket disconnects. + let cancelled = false; + cancelCurrentPull = () => { + cancelled = true; + winston.verbose('Cancelled task polling since disconnected.'); + resolve(); + } + + socketIOConnection.once('onTask', async (payload: Buffer, ack: Function) => { + // After cancelled, a new pull is emitted while socket's still disconnected. + if (cancelled) return; + + try { + winston.verbose('onTask.'); + await handle(msgpack.decode(payload)); + ack(); + resolve(); + } catch (e) { + reject(e); + } + }); + + socketIOConnection.emit('waitForTask', Cfg.serverToken, () => { + winston.verbose('waitForTask acked.'); + }); + }); + } +} + +// Difference between result and progress: +// The `progress' is to be handled by *all* frontend proxies and pushed to all clients. +// The `result' is to be handled only *once*, and is to be written to the database. + +export async function reportProgress(data: ProgressReportData) { + winston.verbose('Reporting progress', data); + const payload = msgpack.encode(data); + socketIOConnection.emit('reportProgress', Cfg.serverToken, payload); +} + +export async function reportResult(data: ProgressReportData) { + winston.verbose('Reporting result', data); + const payload = msgpack.encode(data); + socketIOConnection.emit('reportResult', Cfg.serverToken, payload); +} diff --git a/src/daemon/rmq.ts b/src/daemon/rmq.ts index ac7913a..afb9850 100644 --- a/src/daemon/rmq.ts +++ b/src/daemon/rmq.ts @@ -4,23 +4,15 @@ import msgpack = require('msgpack-lite'); import winston = require('winston'); import util = require('util'); import uuid = require('uuid'); -import { RPCRequest, RPCReplyType, JudgeResult, ProgressReportData, RPCReply } from '../interfaces'; +import { RPCRequest, RPCReplyType, RPCReply } from '../interfaces'; import { cleanUp } from './cleanup'; -import { JudgeTask } from './interfaces'; import * as rmqCommon from '../rmq-common'; let amqpConnection: amqp.Connection; -let publicChannel: amqp.Channel; export async function connect() { winston.verbose(`Connecting to RabbitMQ "${Cfg.rabbitMQ}"...`); amqpConnection = await amqp.connect(Cfg.rabbitMQ); - winston.debug(`Connected to RabbitMQ, asserting queues`); - publicChannel = await newChannel(); - await rmqCommon.assertTaskQueue(publicChannel); - await rmqCommon.assertProgressReportExchange(publicChannel); - await rmqCommon.assertJudgeQueue(publicChannel); - await rmqCommon.assertResultReportQueue(publicChannel); amqpConnection.on('error', (err) => { winston.error(`RabbitMQ connection failure: ${err.toString()}`); cleanUp(2); @@ -35,22 +27,6 @@ async function newChannel(): Promise { return await amqpConnection.createChannel(); } -export async function waitForTask(handle: (task: JudgeTask) => Promise) { - await rmqCommon.waitForTask(amqpConnection, rmqCommon.judgeQueueName, Cfg.priority, () => false, handle); -} - -export async function reportProgress(data: ProgressReportData) { - winston.verbose('Reporting progress', data); - const payload = msgpack.encode(data); - publicChannel.publish(rmqCommon.progressExchangeName, '', payload); -} - -export async function reportResult(data: ProgressReportData) { - winston.verbose('Reporting result', data); - const payload = msgpack.encode(data); - publicChannel.sendToQueue(rmqCommon.resultReportQueueName, payload); -} - // started: Callback when this task is started. export async function runTask(task: RPCRequest, priority: number, started?: () => void): Promise { const correlationId = uuid(); @@ -87,4 +63,4 @@ export async function runTask(task: RPCRequest, priority: number, started?: () = winston.debug(`Task ${correlationId} sent.`); return resultPromise; -} \ No newline at end of file +} diff --git a/src/rmq-common.ts b/src/rmq-common.ts index 0a030b1..a5f9647 100644 --- a/src/rmq-common.ts +++ b/src/rmq-common.ts @@ -1,12 +1,9 @@ import amqp = require('amqplib'); -import msgpack = require('msgpack-lite'); import winston = require('winston'); +import msgpack = require('msgpack-lite'); export const maxPriority = 5; export const taskQueueName = 'task'; -export const progressExchangeName = 'progress'; -export const resultReportQueueName = 'result'; -export const judgeQueueName = 'judge'; export async function assertTaskQueue(channel: amqp.Channel) { await channel.assertQueue(taskQueueName, { @@ -14,26 +11,6 @@ export async function assertTaskQueue(channel: amqp.Channel) { }); } -// Difference between result and progress: -// The `progress' is to be handled by *all* frontend proxies and pushed to all clients. -// The `result' is to be handled only *once*, and is to be written to the database. - -export async function assertProgressReportExchange(channel: amqp.Channel) { - await channel.assertExchange(progressExchangeName, 'fanout', { durable: false }); -} - -export async function assertResultReportQueue(channel: amqp.Channel) { - await channel.assertQueue(resultReportQueueName, { durable: true }); -} - -export async function assertJudgeQueue(channel: amqp.Channel) { - await channel.assertQueue(judgeQueueName, { - maxPriority: maxPriority, - durable: true - }); -} - - export async function waitForTask(conn: amqp.Connection, queueName: string, priority: number, retry: (err: Error) => boolean, handle: (task: T) => Promise) { const channel = await conn.createChannel(); channel.prefetch(1); @@ -50,6 +27,6 @@ export async function waitForTask(conn: amqp.Connection, queueName: string, p channel.nack(msg, false, retry(err)); }); }, { - priority: priority - }); + priority: priority + }); } diff --git a/src/runner/index.ts b/src/runner/index.ts index d91c3bb..dcabec1 100644 --- a/src/runner/index.ts +++ b/src/runner/index.ts @@ -28,4 +28,4 @@ import { judgeStandard, judgeAnswerSubmission, judgeInteraction } from './judge' throw new Error(`Task type ${task.type} not supported!`); } }); -})().then(() => { winston.info("Initialization logic completed."); }, (err) => { winston.error(util.inspect(err)); process.exit(1); }); \ No newline at end of file +})().then(() => { winston.info("Initialization logic completed."); }, (err) => { winston.error(util.inspect(err)); process.exit(1); });