Browse Source

Use Socket.IO to communicate with judge client

master
Menci 6 years ago
parent
commit
d1d019383e
  1. 7
      app.js
  2. 1
      config-example.json
  3. 242
      libs/judger.js
  4. 4
      modules/socketio.js

7
app.js

@ -79,9 +79,6 @@ global.syzoj = {
})()); })());
await this.connectDatabase(); await this.connectDatabase();
if (!module.parent) {
await this.lib('judger').connect();
}
// redis and redisCache is for syzoj-renderer // redis and redisCache is for syzoj-renderer
const redis = require('redis'); const redis = require('redis');
@ -91,6 +88,10 @@ global.syzoj = {
set: util.promisify(this.redis.set).bind(this.redis) set: util.promisify(this.redis.set).bind(this.redis)
}; };
if (!module.parent) {
await this.lib('judger').connect();
}
this.loadModules(); this.loadModules();
}, },
async connectDatabase() { async connectDatabase() {

1
config-example.json

@ -110,6 +110,7 @@
} }
], ],
"session_secret": "@SESSION_SECRET@", "session_secret": "@SESSION_SECRET@",
"judge_token": "@JUDGE_TOKEN@",
"rabbitMQ": "amqp://localhost/", "rabbitMQ": "amqp://localhost/",
"redis": "redis://127.0.0.1:6379", "redis": "redis://127.0.0.1:6379",
"email_jwt_secret": "@EMAIL_JWT_SECRET@", "email_jwt_secret": "@EMAIL_JWT_SECRET@",

242
libs/judger.js

@ -1,19 +1,15 @@
const enums = require('./enums'), const enums = require('./enums');
rp = require('request-promise'),
url = require('url');
const amqp = require('amqplib');
const util = require('util'); const util = require('util');
const winston = require('winston'); const winston = require('winston');
const msgPack = require('msgpack-lite'); const msgPack = require('msgpack-lite');
const fs = Promise.promisifyAll(require('fs-extra'));
const interface = require('./judger_interfaces'); const interface = require('./judger_interfaces');
const judgeResult = require('./judgeResult'); const judgeResult = require('./judgeResult');
let amqpConnection; const JudgeState = syzoj.model('judge_state');
let amqpSendChannel;
let amqpConsumeChannel;
const judgeStateCache = new Map(); const judgeStateCache = new Map();
const progressPusher = require('../modules/socketio');
function getRunningTaskStatusString(result) { function getRunningTaskStatusString(result) {
let isPending = status => [0, 1].includes(status); let isPending = status => [0, 1].includes(status);
@ -28,105 +24,167 @@ function getRunningTaskStatusString(result) {
return `Running ${allFinished}/${allTotal}`; return `Running ${allFinished}/${allTotal}`;
} }
let judgeQueue;
async function connect() { async function connect() {
amqpConnection = await amqp.connect(syzoj.config.rabbitMQ); judgeQueue = {
amqpSendChannel = await amqpConnection.createChannel(); redisZADD: util.promisify(syzoj.redis.zadd).bind(syzoj.redis),
await amqpSendChannel.assertQueue('judge', { redisBZPOPMAX: util.promisify(syzoj.redis.bzpopmax).bind(syzoj.redis),
maxPriority: 5, async push(data, priority) {
durable: true return await this.redisZADD('judge', priority, JSON.stringify(data));
},
async poll(timeout) {
const result = await this.redisBZPOPMAX('judge', timeout);
if (!result) return null;
return {
data: JSON.parse(result[1]),
priority: result[2]
};
}
};
const judgeNamespace = syzoj.socketIO.of('judge');
judgeNamespace.on('connect', socket => {
winston.info(`Judge client ${socket.id} connected.`);
let pendingAckTaskObj = null, waitingForTask = false;
socket.on('waitForTask', async (token, ack) => {
// Ignore requests with invalid token.
if (token != syzoj.config.judge_token) {
winston.warn(`Judge client ${socket.id} emitted waitForTask with invalid token.`);
return;
}
ack();
if (waitingForTask) {
winston.verbose(`Judge client ${socket.id} emitted waitForTask, but already waiting, ignoring.`);
return;
}
waitingForTask = true;
winston.verbose(`Judge client ${socket.id} emitted waitForTask.`);
// Poll the judge queue, timeout = 10s.
let obj;
while (socket.connected && !obj) {
obj = await judgeQueue.poll(10);
}
if (!obj) {
winston.verbose(`Judge client ${socket.id} disconnected, stop poll the queue.`);
// Socket disconnected and no task got.
return;
}
// Re-push to queue if got task but judge client already disconnected.
if (socket.disconnected) {
winston.verbose(`Judge client ${socket.id} got task but disconnected re-pushing task to queue.`);
judgeQueue.push(obj.data, obj.priority);
return;
}
// Send task to judge client, and wait for ack.
const task = obj.data;
pendingAckTaskObj = obj;
winston.verbose(`Sending task ${task.content.taskId} to judge client ${socket.id}.`);
socket.emit('onTask', msgPack.encode(task), () => {
// Acked.
winston.verbose(`Judge client ${socket.id} acked task ${task.content.taskId}.`);
pendingAckTaskObj = null;
waitingForTask = false;
}); });
await amqpSendChannel.assertQueue('result', {
durable: true
}); });
await amqpSendChannel.assertExchange('progress', 'fanout', {
durable: false socket.on('disconnect', reason => {
winston.info(`Judge client ${socket.id} disconnected, reason = ${util.inspect(reason)}.`);
if (pendingAckTaskObj) {
// A task sent but not acked, push to queue again.
winston.warn(`Re-pushing task ${pendingAckTaskObj.data.content.taskId} to judge queue.`);
judgeQueue.push(pendingAckTaskObj.data, pendingAckTaskObj.priority);
pendingAckTaskObj = null;
}
}); });
amqpConsumeChannel = await amqpConnection.createChannel();
amqpConsumeChannel.prefetch(1); socket.on('reportProgress', async (token, payload) => {
amqpConsumeChannel.consume('result', async (msg) => { // Ignore requests with invalid token.
(async(msg) => { if (token !== syzoj.config.judge_token) {
const data = msgPack.decode(msg.content); winston.warn(`Judge client ${socket.id} emitted reportProgress with invalid token.`);
winston.verbose('Received report for task ' + data.taskId); return;
let JudgeState = syzoj.model('judge_state');
let judge_state = await JudgeState.findOne({ where: { task_id: data.taskId } });
if(data.type === interface.ProgressReportType.Finished) {
const convertedResult = judgeResult.convertResult(data.taskId, data.progress);
winston.verbose('Reporting report finished: ' + data.taskId);
const payload = msgPack.encode({ type: interface.ProgressReportType.Reported, taskId: data.taskId });
amqpSendChannel.publish('progress', '', payload);
if(!judge_state) return;
judge_state.score = convertedResult.score;
judge_state.pending = false;
judge_state.status = convertedResult.statusString;
judge_state.total_time = convertedResult.time;
judge_state.max_memory = convertedResult.memory;
judge_state.result = convertedResult.result;
await judge_state.save();
await judge_state.updateRelatedInfo();
} else if(data.type == interface.ProgressReportType.Compiled) {
if(!judge_state) return;
judge_state.compilation = data.progress;
await judge_state.save();
} else {
winston.error("Unsupported result type: " + data.type);
} }
})(msg).then(async() => { const progress = msgPack.decode(payload);
amqpConsumeChannel.ack(msg) winston.verbose(`Got progress from progress exchange, id: ${progress.taskId}`);
}, async(err) => {
winston.error('Error handling report', err); if (progress.type === interface.ProgressReportType.Started) {
amqpConsumeChannel.nack(msg, false, false); progressPusher.createTask(progress.taskId);
}); judgeStateCache.set(progress.taskId, {
});
socketio = require('../modules/socketio');
const progressChannel = await amqpConnection.createChannel();
const queueName = (await progressChannel.assertQueue('', { exclusive: true })).queue;
await progressChannel.bindQueue(queueName, 'progress', '');
await progressChannel.consume(queueName, (msg) => {
const data = msgPack.decode(msg.content);
winston.verbose(`Got result from progress exchange, id: ${data.taskId}`);
(async (result) => {
if (result.type === interface.ProgressReportType.Started) {
socketio.createTask(result.taskId);
judgeStateCache.set(data.taskId, {
result: 'Compiling', result: 'Compiling',
score: 0, score: 0,
time: 0, time: 0,
memory: 0 memory: 0
}) });
} else if (result.type === interface.ProgressReportType.Compiled) { } else if (progress.type === interface.ProgressReportType.Compiled) {
socketio.updateCompileStatus(result.taskId, result.progress); progressPusher.updateCompileStatus(progress.taskId, progress.progress);
} else if (result.type === interface.ProgressReportType.Progress) { } else if (progress.type === interface.ProgressReportType.Progress) {
const convertedResult = judgeResult.convertResult(data.taskId, data.progress); const convertedResult = judgeResult.convertResult(progress.taskId, progress.progress);
judgeStateCache.set(data.taskId, { judgeStateCache.set(progress.taskId, {
result: getRunningTaskStatusString(data.progress), result: getRunningTaskStatusString(progress.progress),
score: convertedResult.score, score: convertedResult.score,
time: convertedResult.time, time: convertedResult.time,
memory: convertedResult.memory memory: convertedResult.memory
}); });
socketio.updateProgress(result.taskId, result.progress); progressPusher.updateProgress(progress.taskId, progress.progress);
} else if (result.type === interface.ProgressReportType.Finished) { } else if (progress.type === interface.ProgressReportType.Finished) {
socketio.updateResult(result.taskId, result.progress); progressPusher.updateResult(progress.taskId, progress.progress);
setTimeout(() => { setTimeout(() => {
judgeStateCache.delete(result.taskId); judgeStateCache.delete(progress.taskId);
}, 5000); }, 5000);
} else if (result.type === interface.ProgressReportType.Reported) { } else if (progress.type === interface.ProgressReportType.Reported) {
socketio.cleanupProgress(result.taskId); progressPusher.cleanupProgress(progress.taskId);
} }
})(data).then(async() => { });
progressChannel.ack(msg)
}, async(err) => { socket.on('reportResult', async (token, payload) => {
console.log(err); // Ignore requests with invalid token.
winston.error('Error handling progress', err); if (token !== syzoj.config.judge_token) {
progressChannel.nack(msg, false, false); winston.warn(`Judge client ${socket.id} emitted reportResult with invalid token.`);
return;
}
const result = msgPack.decode(payload);
winston.verbose('Received report for task ' + result.taskId);
const judge_state = await JudgeState.findOne({
where: {
task_id: result.taskId
}
}); });
if (result.type === interface.ProgressReportType.Finished) {
const convertedResult = judgeResult.convertResult(result.taskId, result.progress);
winston.verbose('Reporting report finished: ' + result.taskId);
progressPusher.cleanupProgress(result.taskId);
if (!judge_state) return;
judge_state.score = convertedResult.score;
judge_state.pending = false;
judge_state.status = convertedResult.statusString;
judge_state.total_time = convertedResult.time;
judge_state.max_memory = convertedResult.memory;
judge_state.result = convertedResult.result;
await judge_state.save();
await judge_state.updateRelatedInfo();
} else if (result.type == interface.ProgressReportType.Compiled) {
if (!judge_state) return;
judge_state.compilation = result.progress;
await judge_state.save();
} else {
winston.error('Unsupported result type: ' + result.type);
}
}); });
winston.debug('Created progress exchange queue', queueName);
amqpConnection.on('error', (err) => {
winston.error('RabbitMQ connection failure: ${err.toString()}');
amqpConnection.close();
process.exit(1);
}); });
} }
module.exports.connect = connect; module.exports.connect = connect;
@ -137,7 +195,6 @@ module.exports.judge = async function (judge_state, problem, priority) {
case 'submit-answer': case 'submit-answer':
type = enums.ProblemType.AnswerSubmission; type = enums.ProblemType.AnswerSubmission;
param = null; param = null;
let fs = Promise.promisifyAll(require('fs-extra'));
extraData = await fs.readFileAsync(syzoj.model('file').resolvePath('answer', judge_state.code)); extraData = await fs.readFileAsync(syzoj.model('file').resolvePath('answer', judge_state.code));
break; break;
case 'interaction': case 'interaction':
@ -170,7 +227,10 @@ module.exports.judge = async function (judge_state, problem, priority) {
param: param param: param
}; };
amqpSendChannel.sendToQueue('judge', msgPack.encode({ content: content, extraData: extraData }), { priority: priority }); judgeQueue.push({
content: content,
extraData: extraData
}, priority);
} }
module.exports.getCachedJudgeState = taskId => judgeStateCache.get(taskId); module.exports.getCachedJudgeState = taskId => judgeStateCache.get(taskId);

4
modules/socketio.js

@ -203,6 +203,8 @@ function initializeSocketIO(s) {
}; };
} }
}); });
return ioInstance;
} }
exports.initializeSocketIO = initializeSocketIO; exports.initializeSocketIO = initializeSocketIO;
function createTask(taskId) { function createTask(taskId) {
@ -306,4 +308,4 @@ function cleanupProgress(taskId) {
exports.cleanupProgress = cleanupProgress; exports.cleanupProgress = cleanupProgress;
//# sourceMappingURL=socketio.js.map //# sourceMappingURL=socketio.js.map
initializeSocketIO(app.server); syzoj.socketIO = initializeSocketIO(app.server);

Loading…
Cancel
Save