diff --git a/src/runner/rmq.ts b/src/runner/rmq.ts index a04028a..9052353 100644 --- a/src/runner/rmq.ts +++ b/src/runner/rmq.ts @@ -40,17 +40,29 @@ export async function waitForTask(handle: (task: RPCRequest) => Promise) { } response({ type: RPCReplyType.Started }); - try { - const request = msgpack.decode(msg.content) as RPCRequest; - const result = await handle(request); - response({ type: RPCReplyType.Finished, result: result }); - } catch (err) { - winston.warn(`Failed to run task ${msg.properties.correlationId}: ${err.toString()}, ${err.stack}`); - response({ type: RPCReplyType.Error, error: err.toString() }); + while (true) { + try { + const request = msgpack.decode(msg.content) as RPCRequest; + const result = await handle(request); + response({ type: RPCReplyType.Finished, result: result }); + break; + } catch (err) { + let errorMessage = `Failed to run task ${msg.properties.correlationId}: ${err.toString()}, ${err.stack}`; + winston.warn(errorMessage); + + // Only retry on 'Error: The child process has exited unexpectedly.' + if (errorMessage.indexOf('Error: The child process has exited unexpectedly.') !== -1) { + winston.warn('Retrying.'); + continue; + } + + response({ type: RPCReplyType.Error, error: err.toString() }); + break; + } } channel.ack(msg); }, { priority: Cfg.priority }); -} \ No newline at end of file +}