Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/model/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const TaskSchema = new Schema({
status: {
type: String,
required: true,
enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed'],
enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed', "Pending Async"],
default: 'Queued',
index: true
},
Expand Down
65 changes: 62 additions & 3 deletions src/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ export async function findAndProcessAQueuedTask() {
{status: 'Processing'},
{new: true}
)

if (task != null) {
activeTasks++
await processNextTaskRound(task)
activeTasks--
}

const asyncTasks = await TaskModel.find({status: 'Pending Async'});

asyncTasks.forEach(async task => {
await checkAsyncTaskStatus(task);
})

} catch (err) {
if (task == null) {
logger.error(`An error occurred while looking for rerun tasks: ${err}`)
Expand All @@ -45,6 +53,51 @@ export async function findAndProcessAQueuedTask() {
}
}

async function checkAsyncTaskStatus(task) {
const pendingAsyncTransactions = task.transactions.reduce((acc, transaction) => {
if (transaction.rerunStatus === 'Pending Async') {
return [...acc, transaction];
}
return acc;
},[]);

let remainingAsyncTransactions = pendingAsyncTransactions.length;

pendingAsyncTransactions.forEach(async transaction => {
const currentTransactionStatus = await TransactionModel.findById(transaction.rerunID);

if (currentTransactionStatus.status === 'Successful') {
transaction.tstatus = 'Completed';
transaction.rerunStatus = 'Successful';
await task.save();
remainingAsyncTransactions--;
}
else if (currentTransactionStatus.status === 'Completed with error(s)') {
transaction.tstatus = 'Completed';
transaction.rerunStatus = 'Completed with error(s)';
await task.save();
remainingAsyncTransactions--;
}
else if (currentTransactionStatus.status === 'Failed') {
transaction.tstatus = 'Completed';
transaction.rerunStatus = 'Failed';
await task.save();
remainingAsyncTransactions--;
}
});


if (pendingAsyncTransactions.length === 0){
task.status = 'Completed';
task.completedDate = new Date();
await task.save()
logger.info(`Async task ${task._id} completed`);
}

return;

}

function rerunTaskProcessor() {
if (live) {
findAndProcessAQueuedTask()
Expand Down Expand Up @@ -139,6 +192,8 @@ async function processNextTaskRound(task) {
return
}

let taskHasAsyncTransactions = false

const promises = transactions.map(transaction => {
task.remainingTransactions--

Expand All @@ -158,7 +213,11 @@ async function processNextTaskRound(task) {
logger.error(
`An error occurred while rerunning transaction ${transaction.tid} for task ${task._id}: ${err}`
)
} else {
}else if(response.status === 202){
transaction.tstatus = 'Processing'
taskHasAsyncTransactions = true
}
else {
transaction.tstatus = 'Completed'
}
return resolve()
Expand All @@ -177,8 +236,8 @@ async function processNextTaskRound(task) {
if (task.remainingTransactions) {
await processNextTaskRound(task)
} else {
task.status = 'Completed'
task.completedDate = new Date()
task.status = taskHasAsyncTransactions ? 'Pending Async' : 'Completed'
task.completedDate = taskHasAsyncTransactions ? null : new Date()
logger.info(`Round completed for rerun task #${task._id} - Task completed`)

await task.save().catch(err => {
Expand Down