From d1d019383e5cb0c96ed2191f900970654e4055c0 Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 30 Mar 2019 14:24:03 +0800 Subject: [PATCH] Use Socket.IO to communicate with judge client --- app.js | 7 +- config-example.json | 1 + libs/judger.js | 340 ++++++++++++++++++++++++++------------------ modules/socketio.js | 4 +- 4 files changed, 208 insertions(+), 144 deletions(-) diff --git a/app.js b/app.js index 7ac8114..18197c9 100644 --- a/app.js +++ b/app.js @@ -79,9 +79,6 @@ global.syzoj = { })()); await this.connectDatabase(); - if (!module.parent) { - await this.lib('judger').connect(); - } // redis and redisCache is for syzoj-renderer const redis = require('redis'); @@ -91,6 +88,10 @@ global.syzoj = { set: util.promisify(this.redis.set).bind(this.redis) }; + if (!module.parent) { + await this.lib('judger').connect(); + } + this.loadModules(); }, async connectDatabase() { diff --git a/config-example.json b/config-example.json index ba7bcc8..152a4bd 100644 --- a/config-example.json +++ b/config-example.json @@ -110,6 +110,7 @@ } ], "session_secret": "@SESSION_SECRET@", + "judge_token": "@JUDGE_TOKEN@", "rabbitMQ": "amqp://localhost/", "redis": "redis://127.0.0.1:6379", "email_jwt_secret": "@EMAIL_JWT_SECRET@", diff --git a/libs/judger.js b/libs/judger.js index 7a8bbc1..d97b112 100644 --- a/libs/judger.js +++ b/libs/judger.js @@ -1,19 +1,15 @@ -const enums = require('./enums'), - rp = require('request-promise'), - url = require('url'); - -const amqp = require('amqplib'); +const enums = require('./enums'); const util = require('util'); const winston = require('winston'); const msgPack = require('msgpack-lite'); +const fs = Promise.promisifyAll(require('fs-extra')); const interface = require('./judger_interfaces'); const judgeResult = require('./judgeResult'); -let amqpConnection; -let amqpSendChannel; -let amqpConsumeChannel; +const JudgeState = syzoj.model('judge_state'); const judgeStateCache = new Map(); +const progressPusher = require('../modules/socketio'); function getRunningTaskStatusString(result) { let isPending = status => [0, 1].includes(status); @@ -28,149 +24,213 @@ function getRunningTaskStatusString(result) { return `Running ${allFinished}/${allTotal}`; } -async function connect () { - amqpConnection = await amqp.connect(syzoj.config.rabbitMQ); - amqpSendChannel = await amqpConnection.createChannel(); - await amqpSendChannel.assertQueue('judge', { - maxPriority: 5, - durable: true - }); - await amqpSendChannel.assertQueue('result', { - durable: true - }); - await amqpSendChannel.assertExchange('progress', 'fanout', { - durable: false +let judgeQueue; + +async function connect() { + judgeQueue = { + redisZADD: util.promisify(syzoj.redis.zadd).bind(syzoj.redis), + redisBZPOPMAX: util.promisify(syzoj.redis.bzpopmax).bind(syzoj.redis), + async push(data, priority) { + return await this.redisZADD('judge', priority, JSON.stringify(data)); + }, + async poll(timeout) { + const result = await this.redisBZPOPMAX('judge', timeout); + if (!result) return null; + + return { + data: JSON.parse(result[1]), + priority: result[2] + }; + } + }; + + const judgeNamespace = syzoj.socketIO.of('judge'); + judgeNamespace.on('connect', socket => { + winston.info(`Judge client ${socket.id} connected.`); + + let pendingAckTaskObj = null, waitingForTask = false; + socket.on('waitForTask', async (token, ack) => { + // Ignore requests with invalid token. + if (token != syzoj.config.judge_token) { + winston.warn(`Judge client ${socket.id} emitted waitForTask with invalid token.`); + return; + } + + ack(); + + if (waitingForTask) { + winston.verbose(`Judge client ${socket.id} emitted waitForTask, but already waiting, ignoring.`); + return; + } + + waitingForTask = true; + + winston.verbose(`Judge client ${socket.id} emitted waitForTask.`); + + // Poll the judge queue, timeout = 10s. + let obj; + while (socket.connected && !obj) { + obj = await judgeQueue.poll(10); + } + + if (!obj) { + winston.verbose(`Judge client ${socket.id} disconnected, stop poll the queue.`); + // Socket disconnected and no task got. + return; + } + + // Re-push to queue if got task but judge client already disconnected. + if (socket.disconnected) { + winston.verbose(`Judge client ${socket.id} got task but disconnected re-pushing task to queue.`); + judgeQueue.push(obj.data, obj.priority); + return; + } + + // Send task to judge client, and wait for ack. + const task = obj.data; + pendingAckTaskObj = obj; + winston.verbose(`Sending task ${task.content.taskId} to judge client ${socket.id}.`); + socket.emit('onTask', msgPack.encode(task), () => { + // Acked. + winston.verbose(`Judge client ${socket.id} acked task ${task.content.taskId}.`); + pendingAckTaskObj = null; + waitingForTask = false; + }); }); - amqpConsumeChannel = await amqpConnection.createChannel(); - amqpConsumeChannel.prefetch(1); - amqpConsumeChannel.consume('result', async (msg) => { - (async(msg) => { - const data = msgPack.decode(msg.content); - winston.verbose('Received report for task ' + data.taskId); - let JudgeState = syzoj.model('judge_state'); - let judge_state = await JudgeState.findOne({ where: { task_id: data.taskId } }); - if(data.type === interface.ProgressReportType.Finished) { - const convertedResult = judgeResult.convertResult(data.taskId, data.progress); - winston.verbose('Reporting report finished: ' + data.taskId); - const payload = msgPack.encode({ type: interface.ProgressReportType.Reported, taskId: data.taskId }); - amqpSendChannel.publish('progress', '', payload); - if(!judge_state) return; - judge_state.score = convertedResult.score; - judge_state.pending = false; - judge_state.status = convertedResult.statusString; - judge_state.total_time = convertedResult.time; - judge_state.max_memory = convertedResult.memory; - judge_state.result = convertedResult.result; - await judge_state.save(); - await judge_state.updateRelatedInfo(); - } else if(data.type == interface.ProgressReportType.Compiled) { - if(!judge_state) return; - judge_state.compilation = data.progress; - await judge_state.save(); - } else { - winston.error("Unsupported result type: " + data.type); - } - - })(msg).then(async() => { - amqpConsumeChannel.ack(msg) - }, async(err) => { - winston.error('Error handling report', err); - amqpConsumeChannel.nack(msg, false, false); - }); + + socket.on('disconnect', reason => { + winston.info(`Judge client ${socket.id} disconnected, reason = ${util.inspect(reason)}.`); + if (pendingAckTaskObj) { + // A task sent but not acked, push to queue again. + winston.warn(`Re-pushing task ${pendingAckTaskObj.data.content.taskId} to judge queue.`); + judgeQueue.push(pendingAckTaskObj.data, pendingAckTaskObj.priority); + pendingAckTaskObj = null; + } }); - socketio = require('../modules/socketio'); - const progressChannel = await amqpConnection.createChannel(); - const queueName = (await progressChannel.assertQueue('', { exclusive: true })).queue; - await progressChannel.bindQueue(queueName, 'progress', ''); - await progressChannel.consume(queueName, (msg) => { - const data = msgPack.decode(msg.content); - winston.verbose(`Got result from progress exchange, id: ${data.taskId}`); - - (async (result) => { - if (result.type === interface.ProgressReportType.Started) { - socketio.createTask(result.taskId); - judgeStateCache.set(data.taskId, { - result: 'Compiling', - score: 0, - time: 0, - memory: 0 - }) - } else if (result.type === interface.ProgressReportType.Compiled) { - socketio.updateCompileStatus(result.taskId, result.progress); - } else if (result.type === interface.ProgressReportType.Progress) { - const convertedResult = judgeResult.convertResult(data.taskId, data.progress); - judgeStateCache.set(data.taskId, { - result: getRunningTaskStatusString(data.progress), - score: convertedResult.score, - time: convertedResult.time, - memory: convertedResult.memory - }); - socketio.updateProgress(result.taskId, result.progress); - } else if (result.type === interface.ProgressReportType.Finished) { - socketio.updateResult(result.taskId, result.progress); - setTimeout(() => { - judgeStateCache.delete(result.taskId); - }, 5000); - } else if (result.type === interface.ProgressReportType.Reported) { - socketio.cleanupProgress(result.taskId); - } - })(data).then(async() => { - progressChannel.ack(msg) - }, async(err) => { - console.log(err); - winston.error('Error handling progress', err); - progressChannel.nack(msg, false, false); + + socket.on('reportProgress', async (token, payload) => { + // Ignore requests with invalid token. + if (token !== syzoj.config.judge_token) { + winston.warn(`Judge client ${socket.id} emitted reportProgress with invalid token.`); + return; + } + + const progress = msgPack.decode(payload); + winston.verbose(`Got progress from progress exchange, id: ${progress.taskId}`); + + if (progress.type === interface.ProgressReportType.Started) { + progressPusher.createTask(progress.taskId); + judgeStateCache.set(progress.taskId, { + result: 'Compiling', + score: 0, + time: 0, + memory: 0 + }); + } else if (progress.type === interface.ProgressReportType.Compiled) { + progressPusher.updateCompileStatus(progress.taskId, progress.progress); + } else if (progress.type === interface.ProgressReportType.Progress) { + const convertedResult = judgeResult.convertResult(progress.taskId, progress.progress); + judgeStateCache.set(progress.taskId, { + result: getRunningTaskStatusString(progress.progress), + score: convertedResult.score, + time: convertedResult.time, + memory: convertedResult.memory }); + progressPusher.updateProgress(progress.taskId, progress.progress); + } else if (progress.type === interface.ProgressReportType.Finished) { + progressPusher.updateResult(progress.taskId, progress.progress); + setTimeout(() => { + judgeStateCache.delete(progress.taskId); + }, 5000); + } else if (progress.type === interface.ProgressReportType.Reported) { + progressPusher.cleanupProgress(progress.taskId); + } }); - winston.debug('Created progress exchange queue', queueName); - amqpConnection.on('error', (err) => { - winston.error('RabbitMQ connection failure: ${err.toString()}'); - amqpConnection.close(); - process.exit(1); + + socket.on('reportResult', async (token, payload) => { + // Ignore requests with invalid token. + if (token !== syzoj.config.judge_token) { + winston.warn(`Judge client ${socket.id} emitted reportResult with invalid token.`); + return; + } + + const result = msgPack.decode(payload); + winston.verbose('Received report for task ' + result.taskId); + + const judge_state = await JudgeState.findOne({ + where: { + task_id: result.taskId + } + }); + + if (result.type === interface.ProgressReportType.Finished) { + const convertedResult = judgeResult.convertResult(result.taskId, result.progress); + winston.verbose('Reporting report finished: ' + result.taskId); + progressPusher.cleanupProgress(result.taskId); + + if (!judge_state) return; + judge_state.score = convertedResult.score; + judge_state.pending = false; + judge_state.status = convertedResult.statusString; + judge_state.total_time = convertedResult.time; + judge_state.max_memory = convertedResult.memory; + judge_state.result = convertedResult.result; + await judge_state.save(); + await judge_state.updateRelatedInfo(); + } else if (result.type == interface.ProgressReportType.Compiled) { + if (!judge_state) return; + judge_state.compilation = result.progress; + await judge_state.save(); + } else { + winston.error('Unsupported result type: ' + result.type); + } }); + }); } module.exports.connect = connect; module.exports.judge = async function (judge_state, problem, priority) { - let type, param, extraData = null; - switch (problem.type) { - case 'submit-answer': - type = enums.ProblemType.AnswerSubmission; - param = null; - let fs = Promise.promisifyAll(require('fs-extra')); - extraData = await fs.readFileAsync(syzoj.model('file').resolvePath('answer', judge_state.code)); - break; - case 'interaction': - type = enums.ProblemType.Interaction; - param = { - language: judge_state.language, - code: judge_state.code, - timeLimit: problem.time_limit, - memoryLimit: problem.memory_limit, - } - break; - default: - type = enums.ProblemType.Standard; - param = { - language: judge_state.language, - code: judge_state.code, - timeLimit: problem.time_limit, - memoryLimit: problem.memory_limit, - fileIOInput: problem.file_io ? problem.file_io_input_name : null, - fileIOOutput: problem.file_io ? problem.file_io_output_name : null - }; - break; - } + let type, param, extraData = null; + switch (problem.type) { + case 'submit-answer': + type = enums.ProblemType.AnswerSubmission; + param = null; + extraData = await fs.readFileAsync(syzoj.model('file').resolvePath('answer', judge_state.code)); + break; + case 'interaction': + type = enums.ProblemType.Interaction; + param = { + language: judge_state.language, + code: judge_state.code, + timeLimit: problem.time_limit, + memoryLimit: problem.memory_limit, + } + break; + default: + type = enums.ProblemType.Standard; + param = { + language: judge_state.language, + code: judge_state.code, + timeLimit: problem.time_limit, + memoryLimit: problem.memory_limit, + fileIOInput: problem.file_io ? problem.file_io_input_name : null, + fileIOOutput: problem.file_io ? problem.file_io_output_name : null + }; + break; + } - const content = { - taskId: judge_state.task_id, - testData: problem.id.toString(), - type: type, - priority: priority, - param: param - }; + const content = { + taskId: judge_state.task_id, + testData: problem.id.toString(), + type: type, + priority: priority, + param: param + }; - amqpSendChannel.sendToQueue('judge', msgPack.encode({ content: content, extraData: extraData }), { priority: priority }); + judgeQueue.push({ + content: content, + extraData: extraData + }, priority); } module.exports.getCachedJudgeState = taskId => judgeStateCache.get(taskId); diff --git a/modules/socketio.js b/modules/socketio.js index 9ce70bd..7acc982 100644 --- a/modules/socketio.js +++ b/modules/socketio.js @@ -203,6 +203,8 @@ function initializeSocketIO(s) { }; } }); + + return ioInstance; } exports.initializeSocketIO = initializeSocketIO; function createTask(taskId) { @@ -306,4 +308,4 @@ function cleanupProgress(taskId) { exports.cleanupProgress = cleanupProgress; //# sourceMappingURL=socketio.js.map -initializeSocketIO(app.server); +syzoj.socketIO = initializeSocketIO(app.server);