|
|
|
const enums = require('./enums'),
|
|
|
|
rp = require('request-promise'),
|
|
|
|
url = require('url');
|
|
|
|
|
|
|
|
const amqp = require('amqplib');
|
|
|
|
const util = require('util');
|
|
|
|
const winston = require('winston');
|
|
|
|
const msgPack = require('msgpack-lite');
|
|
|
|
const interface = require('./judger_interfaces');
|
|
|
|
const judgeResult = require('./judgeResult');
|
|
|
|
|
|
|
|
let amqpConnection;
|
|
|
|
let amqpSendChannel;
|
|
|
|
let amqpConsumeChannel;
|
|
|
|
|
|
|
|
const judgeStateCache = new Map();
|
|
|
|
|
|
|
|
function getRunningTaskStatusString(result) {
|
|
|
|
let isPending = status => [0, 1].includes(status);
|
|
|
|
let allFinished = 0, allTotal = 0;
|
|
|
|
for (let subtask of result.judge.subtasks) {
|
|
|
|
for (let curr of subtask.cases) {
|
|
|
|
allTotal++;
|
|
|
|
if (!isPending(curr.status)) allFinished++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return `Running ${allFinished}/${allTotal}`;
|
|
|
|
}
|
|
|
|
|
|
|
|
async function connect () {
|
|
|
|
amqpConnection = await amqp.connect(syzoj.config.rabbitMQ);
|
|
|
|
amqpSendChannel = await amqpConnection.createChannel();
|
|
|
|
await amqpSendChannel.assertQueue('judge', {
|
|
|
|
maxPriority: 5,
|
|
|
|
durable: true
|
|
|
|
});
|
|
|
|
await amqpSendChannel.assertQueue('result', {
|
|
|
|
durable: true
|
|
|
|
});
|
|
|
|
await amqpSendChannel.assertExchange('progress', 'fanout', {
|
|
|
|
durable: false
|
|
|
|
});
|
|
|
|
amqpConsumeChannel = await amqpConnection.createChannel();
|
|
|
|
amqpConsumeChannel.prefetch(1);
|
|
|
|
amqpConsumeChannel.consume('result', async (msg) => {
|
|
|
|
(async(msg) => {
|
|
|
|
const data = msgPack.decode(msg.content);
|
|
|
|
winston.verbose('Received report for task ' + data.taskId);
|
|
|
|
let JudgeState = syzoj.model('judge_state');
|
|
|
|
let judge_state = await JudgeState.findOne({ where: { task_id: data.taskId } });
|
|
|
|
if(data.type === interface.ProgressReportType.Finished) {
|
|
|
|
const convertedResult = judgeResult.convertResult(data.taskId, data.progress);
|
|
|
|
winston.verbose('Reporting report finished: ' + data.taskId);
|
|
|
|
const payload = msgPack.encode({ type: interface.ProgressReportType.Reported, taskId: data.taskId });
|
|
|
|
amqpSendChannel.publish('progress', '', payload);
|
|
|
|
if(!judge_state) return;
|
|
|
|
judge_state.score = convertedResult.score;
|
|
|
|
judge_state.pending = false;
|
|
|
|
judge_state.status = convertedResult.statusString;
|
|
|
|
judge_state.total_time = convertedResult.time;
|
|
|
|
judge_state.max_memory = convertedResult.memory;
|
|
|
|
judge_state.result = convertedResult.result;
|
|
|
|
await judge_state.save();
|
|
|
|
await judge_state.updateRelatedInfo();
|
|
|
|
} else if(data.type == interface.ProgressReportType.Compiled) {
|
|
|
|
if(!judge_state) return;
|
|
|
|
judge_state.compilation = data.progress;
|
|
|
|
await judge_state.save();
|
|
|
|
} else {
|
|
|
|
winston.error("Unsupported result type: " + data.type);
|
|
|
|
}
|
|
|
|
|
|
|
|
})(msg).then(async() => {
|
|
|
|
amqpConsumeChannel.ack(msg)
|
|
|
|
}, async(err) => {
|
|
|
|
winston.error('Error handling report', err);
|
|
|
|
amqpConsumeChannel.nack(msg, false, false);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
socketio = require('../modules/socketio');
|
|
|
|
const progressChannel = await amqpConnection.createChannel();
|
|
|
|
const queueName = (await progressChannel.assertQueue('', { exclusive: true })).queue;
|
|
|
|
await progressChannel.bindQueue(queueName, 'progress', '');
|
|
|
|
await progressChannel.consume(queueName, (msg) => {
|
|
|
|
const data = msgPack.decode(msg.content);
|
|
|
|
winston.verbose(`Got result from progress exchange, id: ${data.taskId}`);
|
|
|
|
|
|
|
|
(async (result) => {
|
|
|
|
if (result.type === interface.ProgressReportType.Started) {
|
|
|
|
socketio.createTask(result.taskId);
|
|
|
|
judgeStateCache.set(data.taskId, {
|
|
|
|
result: 'Compiling',
|
|
|
|
score: 0,
|
|
|
|
time: 0,
|
|
|
|
memory: 0
|
|
|
|
})
|
|
|
|
} else if (result.type === interface.ProgressReportType.Compiled) {
|
|
|
|
socketio.updateCompileStatus(result.taskId, result.progress);
|
|
|
|
} else if (result.type === interface.ProgressReportType.Progress) {
|
|
|
|
const convertedResult = judgeResult.convertResult(data.taskId, data.progress);
|
|
|
|
judgeStateCache.set(data.taskId, {
|
|
|
|
result: getRunningTaskStatusString(data.progress),
|
|
|
|
score: convertedResult.score,
|
|
|
|
time: convertedResult.time,
|
|
|
|
memory: convertedResult.memory
|
|
|
|
});
|
|
|
|
socketio.updateProgress(result.taskId, result.progress);
|
|
|
|
} else if (result.type === interface.ProgressReportType.Finished) {
|
|
|
|
socketio.updateResult(result.taskId, result.progress);
|
|
|
|
setTimeout(() => {
|
|
|
|
judgeStateCache.delete(result.taskId);
|
|
|
|
}, 5000);
|
|
|
|
} else if (result.type === interface.ProgressReportType.Reported) {
|
|
|
|
socketio.cleanupProgress(result.taskId);
|
|
|
|
}
|
|
|
|
})(data).then(async() => {
|
|
|
|
progressChannel.ack(msg)
|
|
|
|
}, async(err) => {
|
|
|
|
console.log(err);
|
|
|
|
winston.error('Error handling progress', err);
|
|
|
|
progressChannel.nack(msg, false, false);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
winston.debug('Created progress exchange queue', queueName);
|
|
|
|
amqpConnection.on('error', (err) => {
|
|
|
|
winston.error('RabbitMQ connection failure: ${err.toString()}');
|
|
|
|
amqpConnection.close();
|
|
|
|
process.exit(1);
|
|
|
|
});
|
|
|
|
}
|
|
|
|
module.exports.connect = connect;
|
|
|
|
|
|
|
|
module.exports.judge = async function (judge_state, problem, priority) {
|
|
|
|
let type, param, extraData = null;
|
|
|
|
switch (problem.type) {
|
|
|
|
case 'submit-answer':
|
|
|
|
type = enums.ProblemType.AnswerSubmission;
|
|
|
|
param = null;
|
|
|
|
let fs = Promise.promisifyAll(require('fs-extra'));
|
|
|
|
extraData = await fs.readFileAsync(syzoj.model('file').resolvePath('answer', judge_state.code));
|
|
|
|
break;
|
|
|
|
case 'interaction':
|
|
|
|
type = enums.ProblemType.Interaction;
|
|
|
|
param = {
|
|
|
|
language: judge_state.language,
|
|
|
|
code: judge_state.code,
|
|
|
|
timeLimit: problem.time_limit,
|
|
|
|
memoryLimit: problem.memory_limit,
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
type = enums.ProblemType.Standard;
|
|
|
|
param = {
|
|
|
|
language: judge_state.language,
|
|
|
|
code: judge_state.code,
|
|
|
|
timeLimit: problem.time_limit,
|
|
|
|
memoryLimit: problem.memory_limit,
|
|
|
|
fileIOInput: problem.file_io ? problem.file_io_input_name : null,
|
|
|
|
fileIOOutput: problem.file_io ? problem.file_io_output_name : null
|
|
|
|
};
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
const content = {
|
|
|
|
taskId: judge_state.task_id,
|
|
|
|
testData: problem.id.toString(),
|
|
|
|
type: type,
|
|
|
|
priority: priority,
|
|
|
|
param: param
|
|
|
|
};
|
|
|
|
|
|
|
|
amqpSendChannel.sendToQueue('judge', msgPack.encode({ content: content, extraData: extraData }), { priority: priority });
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports.getCachedJudgeState = taskId => judgeStateCache.get(taskId);
|