Menci
6 years ago
7 changed files with 0 additions and 572 deletions
@ -1,8 +0,0 @@
|
||||
{ |
||||
"RabbitMQUrl": "amqp://localhost/", |
||||
"Listen": { |
||||
"host": "127.0.0.1", "port": 5284 |
||||
}, |
||||
"RemoteUrl": "http://127.0.0.1:5283", |
||||
"Token": "233" |
||||
} |
@ -1,8 +0,0 @@
|
||||
import {disconnect as disconnectRMQ } from './rmq'; |
||||
import winston = require('winston'); |
||||
|
||||
export function cleanUp(retCode: number) { |
||||
winston.info('Cleaning up...'); |
||||
disconnectRMQ(); |
||||
process.exit(1); |
||||
} |
@ -1,32 +0,0 @@
|
||||
import commandLineArgs = require('command-line-args'); |
||||
import fs = require('fs'); |
||||
import winston = require('winston'); |
||||
import { configureWinston } from '../winston-common'; |
||||
|
||||
export interface ConfigStructure { |
||||
rabbitMQ: string; |
||||
listen: { host: string, port: number }; |
||||
remoteUrl: string; |
||||
token: string; |
||||
} |
||||
|
||||
const optionDefinitions = [ |
||||
{ name: 'verbose', alias: 'v', type: Boolean }, |
||||
{ name: 'config', alias: 'c', type: String }, |
||||
]; |
||||
|
||||
const options = commandLineArgs(optionDefinitions); |
||||
|
||||
function readJSON(path: string): any { |
||||
return JSON.parse(fs.readFileSync(path, 'utf8')); |
||||
} |
||||
|
||||
const configJSON = readJSON(options["config"]); |
||||
export const globalConfig: ConfigStructure = { |
||||
rabbitMQ: configJSON.RabbitMQUrl, |
||||
listen: configJSON.Listen, |
||||
remoteUrl: configJSON.RemoteUrl, |
||||
token: configJSON.Token |
||||
} |
||||
|
||||
configureWinston(options.verbose); |
@ -1,47 +0,0 @@
|
||||
import express = require('express'); |
||||
import winston = require('winston'); |
||||
import urlLib = require('url'); |
||||
import rp = require('request-promise'); |
||||
|
||||
import { globalConfig as Cfg } from './config'; |
||||
import { pushTask } from './rmq'; |
||||
|
||||
const taskRouter: express.Router = express.Router(); |
||||
|
||||
interface JudgeTask { |
||||
content: any; |
||||
extraFileLocation?: string; |
||||
} |
||||
|
||||
taskRouter.use((req, res, next) => { |
||||
if (req.get('Token') !== Cfg.token) { |
||||
return res.status(403).send('Incorrect token'); |
||||
} else { |
||||
next(); |
||||
} |
||||
}); |
||||
|
||||
taskRouter.put('/task', async (req, res) => { |
||||
if (!req.body) { |
||||
return res.sendStatus(400); |
||||
} |
||||
try { |
||||
winston.info("Got task: " + JSON.stringify(req.body.content.taskId)); |
||||
const task = req.body as JudgeTask; |
||||
let extraData: Buffer = null; |
||||
if (task.extraFileLocation != null) { |
||||
winston.verbose(`Have extra data, downloading from '${task.extraFileLocation}'...`); |
||||
extraData = await rp(urlLib.resolve(Cfg.remoteUrl, task.extraFileLocation), { |
||||
encoding: null, |
||||
simple: true |
||||
}); |
||||
winston.verbose("Extra data downloaded."); |
||||
} |
||||
pushTask({ content: task.content, extraData: extraData }); |
||||
return res.status(200).send('OK'); |
||||
} catch (err) { |
||||
return res.status(500).send(err.toString()); |
||||
} |
||||
}); |
||||
|
||||
export default taskRouter; |
@ -1,72 +0,0 @@
|
||||
require('source-map-support').install(); |
||||
|
||||
import express = require('express'); |
||||
import bodyParser = require('body-parser'); |
||||
import Bluebird = require('bluebird'); |
||||
import urlLib = require('url'); |
||||
import rp = require('request-promise'); |
||||
import winston = require('winston'); |
||||
import http = require('http'); |
||||
import cors = require('cors'); |
||||
|
||||
import { globalConfig as Cfg } from './config'; |
||||
import { connect, waitForResult, waitForProgress, reportReported } from './rmq'; |
||||
import { convertResult } from '../judgeResult'; |
||||
import { ProgressReportType, OverallResult, TaskStatus, CompilationResult } from '../interfaces'; |
||||
import taskRouter from './daemonRouter'; |
||||
import { cleanupProgress, initializeSocketIO, createTask, updateCompileStatus, updateProgress, updateResult } from './socketio'; |
||||
|
||||
const app = express(); |
||||
app.use(bodyParser.json()); |
||||
// app.use(cors({ origin: true, credentials: true }));
|
||||
app.use('/daemon', taskRouter); |
||||
|
||||
(async () => { |
||||
await connect(); |
||||
await waitForResult(async (result) => { |
||||
winston.info("Reporting...", result); |
||||
|
||||
const submit = async function (url, obj) { |
||||
winston.debug(`POST ${Cfg.remoteUrl}, data = ${JSON.stringify(obj)}`); |
||||
await rp(urlLib.resolve(Cfg.remoteUrl, url), { |
||||
method: 'POST', |
||||
body: obj, |
||||
headers: { |
||||
Token: Cfg.token |
||||
}, |
||||
json: true, |
||||
simple: true |
||||
}); |
||||
} |
||||
|
||||
if (result.type === ProgressReportType.Finished) { |
||||
await submit("api/v2/judge/finished", convertResult(result.taskId, result.progress as OverallResult)); |
||||
reportReported(result.taskId); |
||||
} else if (result.type === ProgressReportType.Compiled) { |
||||
await submit("api/v2/judge/compiled", { |
||||
taskId: result.taskId, |
||||
result: result.progress |
||||
}); |
||||
} else { |
||||
winston.error("Unsupported result type: " + result.type); |
||||
} |
||||
winston.verbose("Reported."); |
||||
}); |
||||
await waitForProgress(async (result) => { |
||||
if (result.type === ProgressReportType.Started) { |
||||
createTask(result.taskId); |
||||
} else if (result.type === ProgressReportType.Compiled) { |
||||
updateCompileStatus(result.taskId, result.progress as CompilationResult); |
||||
} else if (result.type === ProgressReportType.Progress) { |
||||
updateProgress(result.taskId, result.progress as OverallResult); |
||||
} else if (result.type === ProgressReportType.Finished) { |
||||
updateResult(result.taskId, result.progress as OverallResult); |
||||
} else if (result.type === ProgressReportType.Reported) { |
||||
cleanupProgress(result.taskId); |
||||
} |
||||
}); |
||||
})().then(() => { |
||||
const server = http.createServer(app); |
||||
initializeSocketIO(server); |
||||
server.listen(Cfg.listen.port, Cfg.listen.host); |
||||
}); |
@ -1,71 +0,0 @@
|
||||
import amqp = require('amqplib'); |
||||
import { globalConfig as Cfg } from './config'; |
||||
import msgpack = require('msgpack-lite'); |
||||
import winston = require('winston'); |
||||
import util = require('util'); |
||||
import { cleanUp } from './cleanup'; |
||||
import * as rmqCommon from '../rmq-common'; |
||||
import requestErrors = require('request-promise/errors'); |
||||
import { JudgeResult, ProgressReportData, ProgressReportType } from '../interfaces'; |
||||
|
||||
let amqpConnection: amqp.Connection; |
||||
let publicChannel: amqp.Channel; |
||||
|
||||
export async function connect() { |
||||
winston.verbose(`Connecting to RabbitMQ "${Cfg.rabbitMQ}"...`); |
||||
amqpConnection = await amqp.connect(Cfg.rabbitMQ); |
||||
winston.debug(`Connected to RabbitMQ, asserting queues`); |
||||
publicChannel = await newChannel(); |
||||
await rmqCommon.assertJudgeQueue(publicChannel); |
||||
await rmqCommon.assertResultReportQueue(publicChannel); |
||||
await rmqCommon.assertProgressReportExchange(publicChannel); |
||||
amqpConnection.on('error', (err) => { |
||||
winston.error(`RabbitMQ connection failure: ${err.toString()}`); |
||||
cleanUp(2); |
||||
}); |
||||
} |
||||
|
||||
export async function disconnect() { |
||||
await amqpConnection.close(); |
||||
} |
||||
|
||||
async function newChannel(): Promise<amqp.Channel> { |
||||
return await amqpConnection.createChannel(); |
||||
} |
||||
|
||||
export function pushTask(task: any) { |
||||
winston.info("Got task, pushing to queue"); |
||||
publicChannel.sendToQueue(rmqCommon.judgeQueueName, msgpack.encode(task), { |
||||
priority: task.content.priority |
||||
}); |
||||
} |
||||
|
||||
export async function waitForResult(handle: (result: ProgressReportData) => Promise<void>) { |
||||
await rmqCommon.waitForTask(amqpConnection, rmqCommon.resultReportQueueName, null, (err) => { |
||||
if (err instanceof requestErrors.RequestError || err instanceof requestErrors.StatusCodeError || err instanceof requestErrors.TransformError) { |
||||
return true; |
||||
} else return false; |
||||
}, handle); |
||||
} |
||||
|
||||
export async function waitForProgress(handle: (result: ProgressReportData) => Promise<void>) { |
||||
const channel = await newChannel(); |
||||
const queueName = (await channel.assertQueue('', { exclusive: true })).queue; |
||||
await channel.bindQueue(queueName, rmqCommon.progressExchangeName, ''); |
||||
await channel.consume(queueName, (msg: amqp.Message) => { |
||||
const data = msgpack.decode(msg.content) as ProgressReportData; |
||||
winston.verbose(`Got result from progress exchange, id: ${data.taskId}`); |
||||
|
||||
handle(data).then(async () => { |
||||
channel.ack(msg) |
||||
}, async (err) => { |
||||
channel.nack(msg, false, false); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
export async function reportReported(taskId: string) { |
||||
winston.verbose('Reporting report finished: ' + taskId); |
||||
const payload = msgpack.encode({ type: ProgressReportType.Reported, taskId: taskId }); |
||||
publicChannel.publish(rmqCommon.progressExchangeName, '', payload); |
||||
} |
@ -1,334 +0,0 @@
|
||||
import http = require('http'); |
||||
import socketio = require('socket.io'); |
||||
import diff = require('jsondiffpatch'); |
||||
import jwt = require('jsonwebtoken'); |
||||
import winston = require('winston'); |
||||
|
||||
import { globalConfig as Cfg } from './config'; |
||||
import { convertResult } from '../judgeResult'; |
||||
import { JudgeResult, TaskStatus, CompilationResult, OverallResult } from '../interfaces'; |
||||
|
||||
interface RoughResult { |
||||
result: string; |
||||
score: number; |
||||
time: number; |
||||
memory: number; |
||||
} |
||||
|
||||
let ioInstance: SocketIO.Server; |
||||
let detailProgressNamespace: SocketIO.Namespace; |
||||
// To do: find a better name
|
||||
let roughProgressNamespace: SocketIO.Namespace; |
||||
// Provide support for NOI contests in which participants
|
||||
// can only see whether his / her submission is successfully compiled.
|
||||
let compileProgressNamespace: SocketIO.Namespace; |
||||
|
||||
const currentJudgeList: { [taskId: string]: OverallResult } = {}; |
||||
const finishedJudgeList: { [taskId: string]: RoughResult } = {}; |
||||
const compiledList = []; |
||||
|
||||
// The detail progress is pushed to client in the delta form.
|
||||
// However, the messages may arrive in an unorder form.
|
||||
// In that case, the client will re-connect the server.
|
||||
const clientDetailProgressList: { [clientId: string]: { version: number, content: OverallResult } } = {}; |
||||
const clientDisplayConfigList: { [clientId: string]: DisplayConfig } = {}; |
||||
|
||||
interface DisplayConfig { |
||||
showScore: boolean; |
||||
showUsage: boolean; |
||||
showCode: boolean; |
||||
showResult: boolean; |
||||
showDetailResult: boolean; |
||||
showTestdata: boolean; |
||||
inContest: boolean; |
||||
// hideTestcaseDetails?: boolean;
|
||||
}; |
||||
|
||||
function processOverallResult(source: OverallResult, config: DisplayConfig): OverallResult { |
||||
if (source == null) |
||||
return null; |
||||
if (source.error != null) { |
||||
return { |
||||
error: source.error, |
||||
systemMessage: source.systemMessage |
||||
}; |
||||
} |
||||
return { |
||||
compile: source.compile, |
||||
judge: config.showDetailResult ? (source.judge && { |
||||
subtasks: source.judge.subtasks && source.judge.subtasks.map(st => ({ |
||||
score: st.score, |
||||
cases: st.cases.map(cs => ({ |
||||
status: cs.status, |
||||
result: cs.result && { |
||||
type: cs.result.type, |
||||
time: config.showUsage ? cs.result.time : undefined, |
||||
memory: config.showUsage ? cs.result.memory : undefined, |
||||
scoringRate: cs.result.scoringRate, |
||||
systemMessage: cs.result.systemMessage, |
||||
input: config.showTestdata ? cs.result.input : undefined, |
||||
output: config.showTestdata ? cs.result.output : undefined, |
||||
userOutput: config.showTestdata ? cs.result.userOutput : undefined, |
||||
userError: config.showTestdata ? cs.result.userError : undefined, |
||||
spjMessage: config.showTestdata ? cs.result.spjMessage : undefined, |
||||
} |
||||
})) |
||||
})) |
||||
}) : null |
||||
} |
||||
} |
||||
|
||||
function getCompileStatus(status: string): string { |
||||
if (["System Error", "Compile Error", "No Testdata"].includes(status)) { |
||||
return status; |
||||
} else { |
||||
return "Submitted"; |
||||
} |
||||
} |
||||
|
||||
function processRoughResult(source: RoughResult, config: DisplayConfig): RoughResult { |
||||
const result = config.showResult ? |
||||
source.result : |
||||
getCompileStatus(source.result); |
||||
return { |
||||
result: result, |
||||
time: config.showUsage ? source.time : null, |
||||
memory: config.showUsage ? source.memory : null, |
||||
score: config.showScore ? source.score : null |
||||
}; |
||||
} |
||||
|
||||
function forAllClients(ns: SocketIO.Namespace, taskId: string, exec: (socketId: string) => void): void { |
||||
ns.in(taskId.toString()).clients((err, clients) => { |
||||
if (!err) { |
||||
clients.forEach(client => { |
||||
exec(client); |
||||
}); |
||||
} else { |
||||
winston.warn(`Error while listing socketio clients in ${taskId}`, err); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
|
||||
export function initializeSocketIO(s: http.Server) { |
||||
ioInstance = socketio(s); |
||||
|
||||
const initializeNamespace = (name, exec: (token: any, socket: SocketIO.Socket) => Promise<any>) => { |
||||
const newNamespace = ioInstance.of('/' + name); |
||||
newNamespace.on('connection', (socket) => { |
||||
socket.on('disconnect', () => { |
||||
winston.info(`Client ${socket.id} disconnected.`); |
||||
delete clientDisplayConfigList[socket.id]; |
||||
if (clientDetailProgressList[socket.id]) { |
||||
delete clientDetailProgressList[socket.id]; |
||||
} |
||||
}); |
||||
socket.on('join', (reqJwt, cb) => { |
||||
winston.info(`Client ${socket.id} connected.`); |
||||
let req; |
||||
try { |
||||
req = jwt.verify(reqJwt, Cfg.token); |
||||
if (req.type !== name) { |
||||
throw new Error("Request type in token mismatch."); |
||||
} |
||||
clientDisplayConfigList[socket.id] = req.displayConfig; |
||||
const taskId = req.taskId; |
||||
winston.verbose(`A client trying to join ${name} namespace for ${taskId}.`); |
||||
socket.join(taskId.toString()); |
||||
exec(req, socket).then(x => cb(x), err => cb({ ok: false, message: err.toString() })); |
||||
} catch (err) { |
||||
winston.info('Error while joining.'); |
||||
cb({ |
||||
ok: false, |
||||
message: err.toString() |
||||
}); |
||||
return; |
||||
} |
||||
}); |
||||
}); |
||||
return newNamespace; |
||||
}; |
||||
|
||||
detailProgressNamespace = initializeNamespace('detail', async (req, socket) => { |
||||
const taskId = req.taskId; |
||||
if (finishedJudgeList[taskId]) { |
||||
winston.debug(`Judge task #${taskId} has been finished, ${JSON.stringify(currentJudgeList[taskId])}`); |
||||
return { |
||||
ok: true, |
||||
running: false, |
||||
finished: true, |
||||
result: processOverallResult(currentJudgeList[taskId], clientDisplayConfigList[socket.id]), |
||||
roughResult: processRoughResult(finishedJudgeList[taskId], clientDisplayConfigList[socket.id]) |
||||
}; |
||||
} else { |
||||
winston.debug(`Judge task #${taskId} has not been finished`); |
||||
// If running
|
||||
if (currentJudgeList[taskId]) { |
||||
clientDetailProgressList[socket.id] = { |
||||
version: 0, |
||||
content: processOverallResult(currentJudgeList[taskId], clientDisplayConfigList[socket.id]) |
||||
}; |
||||
return { |
||||
ok: true, |
||||
finished: false, |
||||
running: true, |
||||
current: clientDetailProgressList[socket.id] |
||||
}; |
||||
} else { |
||||
// If not running yet, the creation of clientDetailProgressList
|
||||
// will be done in the starting procedure (createTask function).
|
||||
return { |
||||
ok: true, |
||||
finished: false, |
||||
running: false |
||||
}; |
||||
} |
||||
} |
||||
}); |
||||
|
||||
roughProgressNamespace = initializeNamespace('rough', async (req, socket) => { |
||||
const taskId = req.taskId; |
||||
if (finishedJudgeList[taskId]) { |
||||
return { |
||||
ok: true, |
||||
running: false, |
||||
finished: true, |
||||
result: processRoughResult(finishedJudgeList[taskId], clientDisplayConfigList[socket.id]) |
||||
}; |
||||
} |
||||
else if (currentJudgeList[taskId]) { |
||||
return { |
||||
ok: true, |
||||
running: true, |
||||
finished: false |
||||
}; |
||||
} else { |
||||
return { |
||||
ok: true, |
||||
running: false, |
||||
finished: false |
||||
}; |
||||
} |
||||
}); |
||||
|
||||
compileProgressNamespace = initializeNamespace('compile', async (req, socket) => { |
||||
const taskId = req.taskId; |
||||
if (compiledList[taskId]) { |
||||
return { |
||||
ok: true, |
||||
running: false, |
||||
finished: true, |
||||
result: compiledList[taskId] |
||||
}; |
||||
} else if (currentJudgeList[taskId]) { |
||||
return { |
||||
ok: true, |
||||
running: true, |
||||
finished: false |
||||
}; |
||||
} else { |
||||
return { |
||||
ok: true, |
||||
running: false, |
||||
finished: false |
||||
}; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
export function createTask(taskId: string) { |
||||
winston.debug(`Judge task #${taskId} has started`); |
||||
|
||||
currentJudgeList[taskId] = {}; |
||||
finishedJudgeList[taskId] = null; |
||||
forAllClients(detailProgressNamespace, taskId, (clientId) => { |
||||
clientDetailProgressList[clientId] = { |
||||
version: 0, |
||||
content: {} |
||||
}; |
||||
}); |
||||
|
||||
roughProgressNamespace.to(taskId.toString()).emit("start", { taskId: taskId }); |
||||
detailProgressNamespace.to(taskId.toString()).emit("start", { taskId: taskId }); |
||||
compileProgressNamespace.to(taskId.toString()).emit("start", { taskId: taskId }); |
||||
} |
||||
|
||||
export function updateCompileStatus(taskId: string, result: CompilationResult) { |
||||
winston.debug(`Updating compilation status for #${taskId}`); |
||||
|
||||
compiledList[taskId] = { result: result.status === TaskStatus.Done ? 'Submitted' : 'Compile Error' }; |
||||
compileProgressNamespace.to(taskId.toString()).emit('finish', { |
||||
taskId: taskId, |
||||
result: compiledList[taskId] |
||||
}); |
||||
} |
||||
|
||||
export function updateProgress(taskId: string, data: OverallResult) { |
||||
winston.verbose(`Updating progress for #${taskId}`); |
||||
|
||||
currentJudgeList[taskId] = data; |
||||
forAllClients(detailProgressNamespace, taskId, (client) => { |
||||
winston.debug(`Pushing progress update to ${client}`) |
||||
if (clientDetailProgressList[client] && clientDisplayConfigList[client]) { // avoid race condition
|
||||
const original = clientDetailProgressList[client].content; |
||||
const updated = processOverallResult(currentJudgeList[taskId], clientDisplayConfigList[client]); |
||||
const version = clientDetailProgressList[client].version; |
||||
detailProgressNamespace.sockets[client].emit('update', { |
||||
taskId: taskId, |
||||
from: version, |
||||
to: version + 1, |
||||
delta: diff.diff(original, updated) |
||||
}) |
||||
clientDetailProgressList[client].version++; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
export function updateResult(taskId: string, data: OverallResult) { |
||||
currentJudgeList[taskId] = data; |
||||
|
||||
if (compiledList[taskId] == null) { |
||||
if (data.error != null) { |
||||
compiledList[taskId] = { result: "System Error" }; |
||||
compileProgressNamespace.to(taskId.toString()).emit('finish', { |
||||
taskId: taskId, |
||||
result: compiledList[taskId] |
||||
}); |
||||
} |
||||
} |
||||
|
||||
const finalResult = convertResult(taskId, data); |
||||
const roughResult = { |
||||
result: finalResult.statusString, |
||||
time: finalResult.time, |
||||
memory: finalResult.memory, |
||||
score: finalResult.score |
||||
}; |
||||
finishedJudgeList[taskId] = roughResult; |
||||
|
||||
forAllClients(roughProgressNamespace, taskId, (client) => { |
||||
winston.debug(`Pushing rough result to ${client}`) |
||||
roughProgressNamespace.sockets[client].emit('finish', { |
||||
taskId: taskId, |
||||
result: processRoughResult(finishedJudgeList[taskId], clientDisplayConfigList[client]) |
||||
}); |
||||
}); |
||||
|
||||
forAllClients(detailProgressNamespace, taskId, (client) => { |
||||
if (clientDisplayConfigList[client]) { // avoid race condition
|
||||
winston.debug(`Pushing detail result to ${client}`) |
||||
detailProgressNamespace.sockets[client].emit('finish', { |
||||
taskId: taskId, |
||||
result: processOverallResult(currentJudgeList[taskId], clientDisplayConfigList[client]), |
||||
roughResult: processRoughResult(finishedJudgeList[taskId], clientDisplayConfigList[client]) |
||||
}); |
||||
delete clientDetailProgressList[client]; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
export function cleanupProgress(taskId: string) { |
||||
// Prevent race condition
|
||||
setTimeout(() => { delete currentJudgeList[taskId]; }, 10000); |
||||
} |
Loading…
Reference in new issue