const enums = require('./enums'); const util = require('util'); const winston = require('winston'); const msgPack = require('msgpack-lite'); const fs = require('fs-extra'); const interface = require('./judger_interfaces'); const judgeResult = require('./judgeResult'); let Problem = syzoj.model('problem'); const judgeStateCache = new Map(); const progressPusher = require('../modules/socketio'); function getRunningTaskStatusString(result) { let isPending = status => [0, 1].includes(status); let allFinished = 0, allTotal = 0; for (let subtask of result.judge.subtasks) { for (let curr of subtask.cases) { allTotal++; if (!isPending(curr.status)) allFinished++; } } return `Running ${allFinished}/${allTotal}`; } let judgeQueue; async function connect() { const JudgeState = syzoj.model('judge_state'); const blockableRedisClient = syzoj.redis.duplicate(); judgeQueue = { redisZADD: util.promisify(syzoj.redis.zadd).bind(syzoj.redis), redisBZPOPMAX: util.promisify(blockableRedisClient.bzpopmax).bind(blockableRedisClient), async push(data, priority) { return await this.redisZADD('judge', priority, JSON.stringify(data)); }, async poll(timeout) { const result = await this.redisBZPOPMAX('judge', timeout); if (!result) return null; return { data: JSON.parse(result[1]), priority: result[2] }; } }; const judgeNamespace = syzoj.socketIO.of('judge'); judgeNamespace.on('connect', socket => { winston.info(`Judge client ${socket.id} connected.`); let pendingAckTaskObj = null, waitingForTask = false; socket.on('waitForTask', async (token, ack) => { // Ignore requests with invalid token. if (token != syzoj.config.judge_token) { winston.warn(`Judge client ${socket.id} emitted waitForTask with invalid token.`); return; } ack(); if (waitingForTask) { winston.warn(`Judge client ${socket.id} emitted waitForTask, but already waiting, ignoring.`); return; } waitingForTask = true; winston.warn(`Judge client ${socket.id} emitted waitForTask.`); // Poll the judge queue, timeout = 10s. let obj; while (socket.connected && !obj) { obj = await judgeQueue.poll(10); } if (!obj) { winston.warn(`Judge client ${socket.id} disconnected, stop poll the queue.`); // Socket disconnected and no task got. return; } winston.warn(`Judge task ${obj.data.content.taskId} poped from queue.`); // Re-push to queue if got task but judge client already disconnected. if (socket.disconnected) { winston.warn(`Judge client ${socket.id} got task but disconnected re-pushing task ${obj.data.content.taskId} to queue.`); judgeQueue.push(obj.data, obj.priority); return; } // Send task to judge client, and wait for ack. const task = obj.data; pendingAckTaskObj = obj; winston.warn(`Sending task ${task.content.taskId} to judge client ${socket.id}.`); socket.emit('onTask', msgPack.encode(task), () => { // Acked. winston.warn(`Judge client ${socket.id} acked task ${task.content.taskId}.`); pendingAckTaskObj = null; waitingForTask = false; }); }); socket.on('disconnect', reason => { winston.warn(`Judge client ${socket.id} disconnected, reason = ${util.inspect(reason)}.`); if (pendingAckTaskObj) { // A task sent but not acked, push to queue again. winston.warn(`Re-pushing task ${pendingAckTaskObj.data.content.taskId} to judge queue.`); judgeQueue.push(pendingAckTaskObj.data, pendingAckTaskObj.priority); pendingAckTaskObj = null; } }); socket.on('reportProgress', async (token, payload) => { // Ignore requests with invalid token. if (token !== syzoj.config.judge_token) { winston.warn(`Judge client ${socket.id} emitted reportProgress with invalid token.`); return; } const progress = msgPack.decode(payload); winston.verbose(`Got progress from progress exchange, id: ${progress.taskId}`); if (progress.type === interface.ProgressReportType.Started) { progressPusher.createTask(progress.taskId); judgeStateCache.set(progress.taskId, { result: 'Compiling', score: 0, time: 0, memory: 0 }); } else if (progress.type === interface.ProgressReportType.Compiled) { progressPusher.updateCompileStatus(progress.taskId, progress.progress); } else if (progress.type === interface.ProgressReportType.Progress) { const convertedResult = judgeResult.convertResult(progress.taskId, progress.progress); judgeStateCache.set(progress.taskId, { result: getRunningTaskStatusString(progress.progress), score: convertedResult.score, time: convertedResult.time, memory: convertedResult.memory }); progressPusher.updateProgress(progress.taskId, progress.progress); } else if (progress.type === interface.ProgressReportType.Finished) { progressPusher.updateResult(progress.taskId, progress.progress); setTimeout(() => { judgeStateCache.delete(progress.taskId); }, 5000); } else if (progress.type === interface.ProgressReportType.Reported) { progressPusher.cleanupProgress(progress.taskId); } }); socket.on('reportResult', async (token, payload) => { // Ignore requests with invalid token. if (token !== syzoj.config.judge_token) { winston.warn(`Judge client ${socket.id} emitted reportResult with invalid token.`); return; } const result = msgPack.decode(payload); winston.verbose('Received report for task ' + result.taskId); const judge_state = await JudgeState.findOne({ where: { task_id: result.taskId } }); if (result.type === interface.ProgressReportType.Finished) { const convertedResult = judgeResult.convertResult(result.taskId, result.progress); winston.verbose('Reporting report finished: ' + result.taskId); progressPusher.cleanupProgress(result.taskId); if (!judge_state) return; const problemId = judge_state.problem_id; let problemQuery = await Problem.createQueryBuilder(); const problemInfo = await problemQuery.where("id = :id", {id: problemId}).getOne(); const problem_time_limit = problemInfo.time_limit; const subtasksArray = convertedResult.result.judge ? convertedResult.result.judge.subtasks : []; const timeArray = subtasksArray.map((subtasksItem) => { return subtasksItem.cases.map((subtasksItemCases) => { return subtasksItemCases.result.time }) }); function flat(array, result = []) { for (let i = 0; i < array.length; i++) { if (Array.isArray(array[i])) { flat(array[i], result); } else { result.push(array[i]); } } return result; } const flatTimeArray = flat(timeArray); const maxItemTime = Math.max.apply(this, flatTimeArray); let baseScore = 0; if (judge_state.type === 0 || judge_state.type === 1) { baseScore = 100; } if (judge_state.type === 2) { baseScore = syzoj.config.practice_rating } if (convertedResult.statusString === 'Accepted' && maxItemTime < problem_time_limit * 0.9) { const beyondTime = problem_time_limit - maxItemTime; const extraScore = (beyondTime / problem_time_limit).toString().slice(0, 3) * baseScore; if (judge_state.type === 2) { judge_state.score = convertedResult.score + extraScore + syzoj.config.practice_rating; } judge_state.score = convertedResult.score + extraScore; } else { judge_state.score = convertedResult.score; } judge_state.pending = false; judge_state.status = convertedResult.statusString; judge_state.total_time = convertedResult.time; judge_state.max_memory = convertedResult.memory; judge_state.result = convertedResult.result; await judge_state.save(); await judge_state.updateRelatedInfo(false); } else if (result.type === interface.ProgressReportType.Compiled) { if (!judge_state) return; judge_state.compilation = result.progress; await judge_state.save(); } else { winston.error('Unsupported result type: ' + result.type); } }); }); } module.exports.connect = connect; module.exports.judge = async function (judge_state, problem, priority) { let type, param, extraData = null; switch (problem.type) { case 'submit-answer': type = enums.ProblemType.AnswerSubmission; param = null; extraData = await fs.readFile(syzoj.model('file').resolvePath('answer', judge_state.code)); break; case 'interaction': type = enums.ProblemType.Interaction; param = { language: judge_state.language, code: judge_state.code, timeLimit: problem.time_limit, memoryLimit: problem.memory_limit, } break; default: type = enums.ProblemType.Standard; param = { language: judge_state.language, code: judge_state.code, timeLimit: problem.time_limit, memoryLimit: problem.memory_limit, fileIOInput: problem.file_io ? problem.file_io_input_name : null, fileIOOutput: problem.file_io ? problem.file_io_output_name : null }; break; } const content = { taskId: judge_state.task_id, testData: problem.id.toString(), type: type, priority: priority, param: param }; judgeQueue.push({ content: content, extraData: extraData }, priority); winston.warn(`Judge task ${content.taskId} enqueued.`); } module.exports.getCachedJudgeState = taskId => judgeStateCache.get(taskId);