Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/model/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const TaskSchema = new Schema({
status: {
type: String,
required: true,
enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed'],
enum: ['Queued', 'Processing', 'Paused', 'Cancelled', 'Completed', "Pending Async"],
default: 'Queued',
index: true
},
Expand Down
48 changes: 45 additions & 3 deletions src/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ export async function findAndProcessAQueuedTask() {
{status: 'Processing'},
{new: true}
)

if (task != null) {
activeTasks++
await processNextTaskRound(task)
activeTasks--
}

const asyncTasks = await TaskModel.find({status: 'Pending Async'});

asyncTasks.forEach(async task => {
await checkAsyncTaskStatus(task);
})

} catch (err) {
if (task == null) {
logger.error(`An error occurred while looking for rerun tasks: ${err}`)
Expand All @@ -45,6 +53,34 @@ export async function findAndProcessAQueuedTask() {
}
}

async function checkAsyncTaskStatus(task) {
const pendingAsyncTransactions = task.transactions.filter(transaction => transaction.rerunStatus === 'Pending Async');

let remainingAsyncTransactions = pendingAsyncTransactions.length;

pendingAsyncTransactions.forEach(async transaction => {
const currentTransactionStatus = await TransactionModel.findById(transaction.rerunID);

if (["Successful", "Completed with error(s)", "Failed"].includes(currentTransactionStatus.status)) {
transaction.tstatus = 'Completed';
transaction.rerunStatus = currentTransactionStatus.status;
await task.save();
remainingAsyncTransactions--;
}
});


if (remainingAsyncTransactions === 0){
task.status = 'Completed';
task.completedDate = new Date();
await task.save()
logger.info(`Async task ${task._id} completed`);
}

return;

}

function rerunTaskProcessor() {
if (live) {
findAndProcessAQueuedTask()
Expand Down Expand Up @@ -139,6 +175,8 @@ async function processNextTaskRound(task) {
return
}

let taskHasAsyncTransactions = false

const promises = transactions.map(transaction => {
task.remainingTransactions--

Expand All @@ -158,7 +196,11 @@ async function processNextTaskRound(task) {
logger.error(
`An error occurred while rerunning transaction ${transaction.tid} for task ${task._id}: ${err}`
)
} else {
}else if(response.status === 202){
transaction.tstatus = 'Processing'
taskHasAsyncTransactions = true
}
else {
transaction.tstatus = 'Completed'
}
return resolve()
Expand All @@ -177,8 +219,8 @@ async function processNextTaskRound(task) {
if (task.remainingTransactions) {
await processNextTaskRound(task)
} else {
task.status = 'Completed'
task.completedDate = new Date()
task.status = taskHasAsyncTransactions ? 'Pending Async' : 'Completed'
task.completedDate = taskHasAsyncTransactions ? null : new Date()
logger.info(`Round completed for rerun task #${task._id} - Task completed`)

await task.save().catch(err => {
Expand Down