2022-05-06 11:41:39 +02:00

184 lines
4.7 KiB
TypeScript

import * as Bullmq from 'bullmq';
import { default as ProdBullmq, QueueScheduler } from 'bullmq';
import { dev } from '$app/env';
import { prisma } from '$lib/database';
import builder from './builder';
import logger from './logger';
import cleanup from './cleanup';
import proxy from './proxy';
import proxyTcpHttp from './proxyTcpHttp';
import ssl from './ssl';
import sslrenewal from './sslrenewal';
import autoUpdater from './autoUpdater';
import { asyncExecShell, saveBuildLog } from '$lib/common';
// Start from the namespace import's constructors; outside of dev mode they are
// replaced by the ones exposed on the package's default export.
let Queue = Bullmq.Queue;
let Worker = Bullmq.Worker;

// Redis host name: 'localhost' in dev, 'coolify-redis' otherwise.
let redisHost = 'localhost';

if (!dev) {
	redisHost = 'coolify-redis';
	Queue = ProdBullmq.Queue;
	Worker = ProdBullmq.Worker;
}

// Options object handed to every queue, scheduler and worker in this module.
const connectionOptions = { connection: { host: redisHost } };
/**
 * Sets up the recurring background jobs.
 *
 * In order: creates one QueueScheduler per job name, opens the matching
 * queues, drains every queue, attaches one worker per queue, and finally
 * registers the repeatable jobs. `proxy`, `proxyTcpHttp` and `ssl` are always
 * registered; `cleanup`, `sslRenew` and `autoUpdater` only outside dev mode.
 *
 * @returns A promise that settles once all repeatable jobs have been added.
 */
const cron = async (): Promise<void> => {
	for (const name of ['proxy', 'proxyTcpHttp', 'cleanup', 'ssl', 'sslRenew', 'autoUpdater']) {
		new QueueScheduler(name, connectionOptions);
	}

	const openQueue = (name: string) => new Queue(name, { ...connectionOptions });
	const queue = {
		proxy: openQueue('proxy'),
		proxyTcpHttp: openQueue('proxyTcpHttp'),
		cleanup: openQueue('cleanup'),
		ssl: openQueue('ssl'),
		sslRenew: openQueue('sslRenew'),
		autoUpdater: openQueue('autoUpdater')
	};

	// Drain one queue at a time, in declaration order.
	for (const opened of Object.values(queue)) {
		await opened.drain();
	}

	// Queue name -> task executed by that queue's worker.
	const tasks: Array<[string, () => unknown]> = [
		['proxy', proxy],
		['proxyTcpHttp', proxyTcpHttp],
		['ssl', ssl],
		['cleanup', cleanup],
		['sslRenew', sslrenewal],
		['autoUpdater', autoUpdater]
	];
	for (const [name, run] of tasks) {
		new Worker(
			name,
			async () => {
				// The task's result is deliberately not returned to the worker.
				await run();
			},
			{ ...connectionOptions }
		);
	}

	// Builds the job options for a job repeating every `every` milliseconds.
	const repeatEvery = (every: number) => ({ repeat: { every } });

	await queue.proxy.add('proxy', {}, repeatEvery(10000));
	await queue.proxyTcpHttp.add('proxyTcpHttp', {}, repeatEvery(10000));
	await queue.ssl.add('ssl', {}, repeatEvery(dev ? 10000 : 60000));
	if (!dev) {
		await queue.cleanup.add('cleanup', {}, repeatEvery(300000));
		await queue.sslRenew.add('sslRenew', {}, repeatEvery(1800000));
		await queue.autoUpdater.add('autoUpdater', {}, repeatEvery(60000));
	}
};
// Start the recurring jobs as soon as this module is loaded. A start-up
// failure is only written to the console; it is not rethrown.
cron().catch((reason) => {
	console.log('cron failed to start');
	console.log(reason);
});
// Queue that receives build jobs, plus the single worker that processes them.
const buildQueueName = 'build_queue';
const buildQueue = new Queue(buildQueueName, connectionOptions);
const buildWorker = new Worker(
	buildQueueName,
	async (job) => {
		return await builder(job);
	},
	{
		// One build at a time.
		concurrency: 1,
		...connectionOptions
	}
);

// Resume the queue at load time; a failure here is logged and otherwise ignored.
buildQueue.resume().catch((err) => {
	console.log('Build queue failed to resume!', err);
});
/** Delay, in milliseconds, before the background retry of a failed status write. */
const BUILD_STATUS_RETRY_DELAY_MS = 1234;

/**
 * Writes `status` to the build row referenced by `job.data.build_id`.
 *
 * Declared `async` so that any failure, including a synchronous throw,
 * surfaces as a rejection the caller can catch.
 */
const writeBuildStatus = async (job: Bullmq.Job, status: 'success' | 'failed'): Promise<void> => {
	await prisma.build.update({ where: { id: job.data.build_id }, data: { status } });
};

/**
 * Records the final status of a build and removes its working directory.
 *
 * Steps, in order:
 *  1. write the status; if that fails, schedule one retry after
 *     BUILD_STATUS_RETRY_DELAY_MS;
 *  2. outside dev mode, remove `workdir` with `rm -fr`;
 *  3. write the status once more.
 *
 * Each step is guarded separately, so a failing step is logged and the
 * remaining steps still run. The function itself does not reject because of
 * a failed write or a failed cleanup.
 *
 * @param job     The finished job; `job.data.build_id` identifies the build row.
 * @param status  Status to store on the build row.
 * @param workdir Directory to delete.
 */
const settleBuild = async (
	job: Bullmq.Job,
	status: 'success' | 'failed',
	workdir: string
): Promise<void> => {
	try {
		await writeBuildStatus(job, status);
	} catch (error) {
		console.log(`Could not mark build ${job.data.build_id} as ${status}, retrying`, error);
		setTimeout(() => {
			// The timer ignores the callback's result, so the rejection is handled here.
			writeBuildStatus(job, status).catch((retryError) => {
				console.log(`Retry to mark build ${job.data.build_id} as ${status} failed`, retryError);
			});
		}, BUILD_STATUS_RETRY_DELAY_MS);
	}

	try {
		// NOTE(review): `workdir` is interpolated unquoted into a shell command;
		// confirm that job.data.repository cannot contain shell metacharacters.
		if (!dev) await asyncExecShell(`rm -fr ${workdir}`);
	} catch (error) {
		console.log(`Could not remove build directory ${workdir}`, error);
	}

	try {
		await writeBuildStatus(job, status);
	} catch (error) {
		console.log(`Could not mark build ${job.data.build_id} as ${status} after cleanup`, error);
	}
};

// The emitter does not observe the promise returned by an async listener, so
// each listener catches its own errors instead of leaving a rejection unhandled.
buildWorker.on('completed', async (job: Bullmq.Job) => {
	try {
		const workdir = `/tmp/build-sources/${job.data.repository}/${job.data.build_id}`;
		await settleBuild(job, 'success', workdir);
	} catch (error) {
		console.log('Build completion handler failed', error);
	}
});

buildWorker.on('failed', async (job: Bullmq.Job, failedReason) => {
	try {
		// NOTE(review): unlike the 'completed' handler, this removes the whole
		// repository directory rather than the per-build one. Kept as-is;
		// confirm whether that is intended.
		const workdir = `/tmp/build-sources/${job.data.repository}`;
		await settleBuild(job, 'failed', workdir);

		// Reached even when the status write or the cleanup above failed.
		await saveBuildLog({
			line: 'Failed to deploy!',
			buildId: job.data.build_id,
			applicationId: job.data.id
		});
		await saveBuildLog({
			line: `Reason: ${failedReason.toString()}`,
			buildId: job.data.build_id,
			applicationId: job.data.id
		});
	} catch (error) {
		console.log('Build failure handler failed', error);
	}
});
// Queue that receives build-log jobs, plus the single worker that hands each
// job to `logger`.
const buildLogQueueName = 'log_queue';
const buildLogQueue = new Queue(buildLogQueueName, connectionOptions);
const buildLogWorker = new Worker(
	buildLogQueueName,
	async (job) => {
		return await logger(job);
	},
	{
		// One log job at a time.
		concurrency: 1,
		...connectionOptions
	}
);

export { buildQueue, buildLogQueue };