Browse Source

Use Socket.IO to communicate with web server

master
Menci 6 years ago
parent
commit
78680ff3c7
  1. 2
      daemon-config-example.json
  2. 2
      package.json
  3. 6
      src/daemon/cleanup.ts
  4. 4
      src/daemon/config.ts
  5. 16
      src/daemon/index.ts
  6. 75
      src/daemon/remote.ts
  7. 26
      src/daemon/rmq.ts
  8. 29
      src/rmq-common.ts

2
daemon-config-example.json

@ -1,4 +1,6 @@
{ {
"ServerUrl": "http://127.0.0.1:5283/",
"ServerToken": "",
"RabbitMQUrl": "amqp://localhost/", "RabbitMQUrl": "amqp://localhost/",
"RedisUrl": "redis://127.0.0.1:6379", "RedisUrl": "redis://127.0.0.1:6379",
"TestData": "/opt/syzoj/data/testdata", "TestData": "/opt/syzoj/data/testdata",

2
package.json

@ -32,6 +32,7 @@
"request-promise": "^4.2.1", "request-promise": "^4.2.1",
"simple-sandbox": "^0.3.5", "simple-sandbox": "^0.3.5",
"socket.io": "^2.0.3", "socket.io": "^2.0.3",
"socket.io-client": "^2.2.0",
"source-map-support": "^0.4.16", "source-map-support": "^0.4.16",
"syspipe": "^0.1.5", "syspipe": "^0.1.5",
"tar": "^3.2.1", "tar": "^3.2.1",
@ -56,6 +57,7 @@
"@types/request": "^2.0.3", "@types/request": "^2.0.3",
"@types/request-promise": "^4.1.37", "@types/request-promise": "^4.1.37",
"@types/socket.io": "^1.4.30", "@types/socket.io": "^1.4.30",
"@types/socket.io-client": "^1.4.32",
"@types/uuid": "^3.4.1", "@types/uuid": "^3.4.1",
"@types/winston": "^2.3.5", "@types/winston": "^2.3.5",
"typescript": "^2.5.1" "typescript": "^2.5.1"

6
src/daemon/cleanup.ts

@ -1,8 +1,10 @@
import {disconnect as disconnectRMQ } from './rmq'; import { disconnect as disconnectRMQ } from './rmq';
import { disconnect as disconnectSIO } from './remote';
import winston = require('winston'); import winston = require('winston');
export function cleanUp(retCode: number) { export function cleanUp(retCode: number) {
winston.info('Cleaning up...'); winston.info('Cleaning up...');
disconnectRMQ(); disconnectRMQ();
process.exit(1); disconnectSIO();
process.exit(retCode);
} }

4
src/daemon/config.ts

@ -4,6 +4,8 @@ import winston = require('winston');
import { configureWinston } from '../winston-common'; import { configureWinston } from '../winston-common';
export interface ConfigStructure { export interface ConfigStructure {
serverUrl: string;
serverToken: string;
rabbitMQ: string; rabbitMQ: string;
testDataDirectory: string; testDataDirectory: string;
priority: number; priority: number;
@ -25,6 +27,8 @@ function readJSON(path: string): any {
const configJSON = readJSON(options["config"]); const configJSON = readJSON(options["config"]);
export const globalConfig: ConfigStructure = { export const globalConfig: ConfigStructure = {
serverUrl: configJSON.ServerUrl,
serverToken: configJSON.ServerToken,
rabbitMQ: configJSON.RabbitMQUrl, rabbitMQ: configJSON.RabbitMQUrl,
testDataDirectory: configJSON.TestData, testDataDirectory: configJSON.TestData,
priority: configJSON.Priority, priority: configJSON.Priority,

16
src/daemon/index.ts

@ -4,30 +4,32 @@ import winston = require('winston');
import { globalConfig as Cfg } from './config'; import { globalConfig as Cfg } from './config';
import util = require('util'); import util = require('util');
import rmq = require('./rmq'); import rmq = require('./rmq');
import remote = require('./remote');
import { judge } from './judge'; import { judge } from './judge';
import { JudgeResult, ErrorType, ProgressReportType, OverallResult } from '../interfaces'; import { JudgeResult, ErrorType, ProgressReportType, OverallResult } from '../interfaces';
(async function () { (async function () {
winston.info("Daemon starts."); winston.info("Daemon starts.");
await remote.connect();
await rmq.connect(); await rmq.connect();
winston.info("Start consuming the queue."); winston.info("Start consuming the queue.");
await rmq.waitForTask(async (task) => { await remote.waitForTask(async (task) => {
let result: OverallResult; let result: OverallResult;
try { try {
await rmq.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Started, progress: null }); await remote.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Started, progress: null });
result = await judge(task.content, task.extraData, async (progress) => { result = await judge(task.content, task.extraData, async (progress) => {
await rmq.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Progress, progress: progress }); await remote.reportProgress({ taskId: task.content.taskId, type: ProgressReportType.Progress, progress: progress });
}, async (progress) => { }, async (progress) => {
const data = { taskId: task.content.taskId, type: ProgressReportType.Compiled, progress: progress }; const data = { taskId: task.content.taskId, type: ProgressReportType.Compiled, progress: progress };
await rmq.reportProgress(data); await remote.reportProgress(data);
await rmq.reportResult(data); await remote.reportResult(data);
}); });
} catch (err) { } catch (err) {
winston.warn(`Judge error!!! TaskId: ${task.content.taskId}`, err); winston.warn(`Judge error!!! TaskId: ${task.content.taskId}`, err);
result = { error: ErrorType.SystemError, systemMessage: `An error occurred.\n${err.toString()}` }; result = { error: ErrorType.SystemError, systemMessage: `An error occurred.\n${err.toString()}` };
} }
const resultReport = { taskId: task.content.taskId, type: ProgressReportType.Finished, progress: result }; const resultReport = { taskId: task.content.taskId, type: ProgressReportType.Finished, progress: result };
await rmq.reportProgress(resultReport); await remote.reportProgress(resultReport);
await rmq.reportResult(resultReport); await remote.reportResult(resultReport);
}); });
})().then(() => { winston.info("Initialization logic completed."); }, (err) => { winston.error(util.inspect(err)); process.exit(1); }); })().then(() => { winston.info("Initialization logic completed."); }, (err) => { winston.error(util.inspect(err)); process.exit(1); });

75
src/daemon/remote.ts

@ -0,0 +1,75 @@
import * as url from 'url';
import * as util from 'util';
import { globalConfig as Cfg } from './config';
import msgpack = require('msgpack-lite');
import winston = require('winston');
import { ProgressReportData } from '../interfaces';
import { JudgeTask } from './interfaces';
import * as SocketIOClient from 'socket.io-client';
let socketIOConnection: SocketIOClient.Socket;
let cancelCurrentPull: Function;
export async function connect() {
const socketIOUrl = url.resolve(Cfg.serverUrl, 'judge');
winston.verbose(`Connect to Socket.IO "${socketIOUrl}"...`);
socketIOConnection = SocketIOClient(socketIOUrl);
socketIOConnection.on('disconnect', () => {
winston.verbose(`Disconnected from Socket.IO "${socketIOUrl}"...`);
if (cancelCurrentPull) cancelCurrentPull();
});
}
export async function disconnect() {
socketIOConnection.close();
}
export async function waitForTask(handle: (task: JudgeTask) => Promise<void>) {
while (true) {
winston.verbose('Waiting for new task...');
await new Promise((resolve, reject) => {
// This should be cancelled if socket disconnects.
let cancelled = false;
cancelCurrentPull = () => {
cancelled = true;
winston.verbose('Cancelled task polling since disconnected.');
resolve();
}
socketIOConnection.once('onTask', async (payload: Buffer, ack: Function) => {
// After cancelled, a new pull is emitted while socket's still disconnected.
if (cancelled) return;
try {
winston.verbose('onTask.');
await handle(msgpack.decode(payload));
ack();
resolve();
} catch (e) {
reject(e);
}
});
socketIOConnection.emit('waitForTask', Cfg.serverToken, () => {
winston.verbose('waitForTask acked.');
});
});
}
}
// Difference between result and progress:
// The `progress' is to be handled by *all* frontend proxies and pushed to all clients.
// The `result' is to be handled only *once*, and is to be written to the database.
export async function reportProgress(data: ProgressReportData) {
winston.verbose('Reporting progress', data);
const payload = msgpack.encode(data);
socketIOConnection.emit('reportProgress', Cfg.serverToken, payload);
}
export async function reportResult(data: ProgressReportData) {
winston.verbose('Reporting result', data);
const payload = msgpack.encode(data);
socketIOConnection.emit('reportResult', Cfg.serverToken, payload);
}

26
src/daemon/rmq.ts

@ -4,23 +4,15 @@ import msgpack = require('msgpack-lite');
import winston = require('winston'); import winston = require('winston');
import util = require('util'); import util = require('util');
import uuid = require('uuid'); import uuid = require('uuid');
import { RPCRequest, RPCReplyType, JudgeResult, ProgressReportData, RPCReply } from '../interfaces'; import { RPCRequest, RPCReplyType, RPCReply } from '../interfaces';
import { cleanUp } from './cleanup'; import { cleanUp } from './cleanup';
import { JudgeTask } from './interfaces';
import * as rmqCommon from '../rmq-common'; import * as rmqCommon from '../rmq-common';
let amqpConnection: amqp.Connection; let amqpConnection: amqp.Connection;
let publicChannel: amqp.Channel;
export async function connect() { export async function connect() {
winston.verbose(`Connecting to RabbitMQ "${Cfg.rabbitMQ}"...`); winston.verbose(`Connecting to RabbitMQ "${Cfg.rabbitMQ}"...`);
amqpConnection = await amqp.connect(Cfg.rabbitMQ); amqpConnection = await amqp.connect(Cfg.rabbitMQ);
winston.debug(`Connected to RabbitMQ, asserting queues`);
publicChannel = await newChannel();
await rmqCommon.assertTaskQueue(publicChannel);
await rmqCommon.assertProgressReportExchange(publicChannel);
await rmqCommon.assertJudgeQueue(publicChannel);
await rmqCommon.assertResultReportQueue(publicChannel);
amqpConnection.on('error', (err) => { amqpConnection.on('error', (err) => {
winston.error(`RabbitMQ connection failure: ${err.toString()}`); winston.error(`RabbitMQ connection failure: ${err.toString()}`);
cleanUp(2); cleanUp(2);
@ -35,22 +27,6 @@ async function newChannel(): Promise<amqp.Channel> {
return await amqpConnection.createChannel(); return await amqpConnection.createChannel();
} }
export async function waitForTask(handle: (task: JudgeTask) => Promise<void>) {
await rmqCommon.waitForTask(amqpConnection, rmqCommon.judgeQueueName, Cfg.priority, () => false, handle);
}
export async function reportProgress(data: ProgressReportData) {
winston.verbose('Reporting progress', data);
const payload = msgpack.encode(data);
publicChannel.publish(rmqCommon.progressExchangeName, '', payload);
}
export async function reportResult(data: ProgressReportData) {
winston.verbose('Reporting result', data);
const payload = msgpack.encode(data);
publicChannel.sendToQueue(rmqCommon.resultReportQueueName, payload);
}
// started: Callback when this task is started. // started: Callback when this task is started.
export async function runTask(task: RPCRequest, priority: number, started?: () => void): Promise<any> { export async function runTask(task: RPCRequest, priority: number, started?: () => void): Promise<any> {
const correlationId = uuid(); const correlationId = uuid();

29
src/rmq-common.ts

@ -1,12 +1,9 @@
import amqp = require('amqplib'); import amqp = require('amqplib');
import msgpack = require('msgpack-lite');
import winston = require('winston'); import winston = require('winston');
import msgpack = require('msgpack-lite');
export const maxPriority = 5; export const maxPriority = 5;
export const taskQueueName = 'task'; export const taskQueueName = 'task';
export const progressExchangeName = 'progress';
export const resultReportQueueName = 'result';
export const judgeQueueName = 'judge';
export async function assertTaskQueue(channel: amqp.Channel) { export async function assertTaskQueue(channel: amqp.Channel) {
await channel.assertQueue(taskQueueName, { await channel.assertQueue(taskQueueName, {
@ -14,26 +11,6 @@ export async function assertTaskQueue(channel: amqp.Channel) {
}); });
} }
// Difference between result and progress:
// The `progress' is to be handled by *all* frontend proxies and pushed to all clients.
// The `result' is to be handled only *once*, and is to be written to the database.
export async function assertProgressReportExchange(channel: amqp.Channel) {
await channel.assertExchange(progressExchangeName, 'fanout', { durable: false });
}
export async function assertResultReportQueue(channel: amqp.Channel) {
await channel.assertQueue(resultReportQueueName, { durable: true });
}
export async function assertJudgeQueue(channel: amqp.Channel) {
await channel.assertQueue(judgeQueueName, {
maxPriority: maxPriority,
durable: true
});
}
export async function waitForTask<T>(conn: amqp.Connection, queueName: string, priority: number, retry: (err: Error) => boolean, handle: (task: T) => Promise<void>) { export async function waitForTask<T>(conn: amqp.Connection, queueName: string, priority: number, retry: (err: Error) => boolean, handle: (task: T) => Promise<void>) {
const channel = await conn.createChannel(); const channel = await conn.createChannel();
channel.prefetch(1); channel.prefetch(1);
@ -50,6 +27,6 @@ export async function waitForTask<T>(conn: amqp.Connection, queueName: string, p
channel.nack(msg, false, retry(err)); channel.nack(msg, false, retry(err));
}); });
}, { }, {
priority: priority priority: priority
}); });
} }

Loading…
Cancel
Save