fix: better worker system
This commit is contained in:
parent
71c15e0ff5
commit
ee4360de3a
@ -22,6 +22,7 @@
|
|||||||
"@fastify/jwt": "6.3.2",
|
"@fastify/jwt": "6.3.2",
|
||||||
"@fastify/static": "6.5.0",
|
"@fastify/static": "6.5.0",
|
||||||
"@iarna/toml": "2.2.5",
|
"@iarna/toml": "2.2.5",
|
||||||
|
"@ladjs/graceful": "3.0.2",
|
||||||
"@prisma/client": "3.15.2",
|
"@prisma/client": "3.15.2",
|
||||||
"axios": "0.27.2",
|
"axios": "0.27.2",
|
||||||
"bcryptjs": "2.4.3",
|
"bcryptjs": "2.4.3",
|
||||||
|
@ -7,9 +7,8 @@ import path, { join } from 'path';
|
|||||||
import autoLoad from '@fastify/autoload';
|
import autoLoad from '@fastify/autoload';
|
||||||
import { asyncExecShell, asyncSleep, isDev, listSettings, prisma, version } from './lib/common';
|
import { asyncExecShell, asyncSleep, isDev, listSettings, prisma, version } from './lib/common';
|
||||||
import { scheduler } from './lib/scheduler';
|
import { scheduler } from './lib/scheduler';
|
||||||
import axios from 'axios';
|
|
||||||
import compareVersions from 'compare-versions';
|
import compareVersions from 'compare-versions';
|
||||||
|
import Graceful from '@ladjs/graceful'
|
||||||
declare module 'fastify' {
|
declare module 'fastify' {
|
||||||
interface FastifyInstance {
|
interface FastifyInstance {
|
||||||
config: {
|
config: {
|
||||||
@ -104,45 +103,39 @@ fastify.listen({ port, host }, async (err: any, address: any) => {
|
|||||||
}
|
}
|
||||||
console.log(`Coolify's API is listening on ${host}:${port}`);
|
console.log(`Coolify's API is listening on ${host}:${port}`);
|
||||||
await initServer();
|
await initServer();
|
||||||
await scheduler.start('cleanupPrismaEngines');
|
// await scheduler.start('cleanupPrismaEngines');
|
||||||
await scheduler.start('checkProxies');
|
// await scheduler.start('checkProxies');
|
||||||
|
const graceful = new Graceful({ brees: [scheduler] });
|
||||||
|
graceful.listen();
|
||||||
|
|
||||||
setInterval(async () => {
|
setInterval(async () => {
|
||||||
if (!scheduler.workers.has('deployApplication')) {
|
if (!scheduler.workers.has('deployApplication')) {
|
||||||
scheduler.run('deployApplication');
|
scheduler.run('deployApplication');
|
||||||
}
|
}
|
||||||
|
if (!scheduler.workers.has('infrastructure')) {
|
||||||
|
scheduler.run('infrastructure');
|
||||||
|
}
|
||||||
}, 2000)
|
}, 2000)
|
||||||
|
|
||||||
// Check for update & if no build is running
|
// autoUpdater
|
||||||
setInterval(async () => {
|
setInterval(async () => {
|
||||||
const { isAutoUpdateEnabled } = await prisma.setting.findFirst();
|
scheduler.workers.has('infrastructure') && scheduler.workers.get('infrastructure').postMessage("action:autoUpdater")
|
||||||
if (isAutoUpdateEnabled) {
|
|
||||||
const currentVersion = version;
|
|
||||||
const { data: versions } = await axios
|
|
||||||
.get(
|
|
||||||
`https://get.coollabs.io/versions.json`
|
|
||||||
, {
|
|
||||||
params: {
|
|
||||||
appId: process.env['COOLIFY_APP_ID'] || undefined,
|
|
||||||
version: currentVersion
|
|
||||||
}
|
|
||||||
})
|
|
||||||
const latestVersion = versions['coolify'].main.version;
|
|
||||||
const isUpdateAvailable = compareVersions(latestVersion, currentVersion);
|
|
||||||
if (isUpdateAvailable === 1) {
|
|
||||||
if (!scheduler.workers.has('deployApplication')) {
|
|
||||||
await scheduler.run('autoUpdater')
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, isDev ? 5000 : 60000 * 15)
|
}, isDev ? 5000 : 60000 * 15)
|
||||||
|
|
||||||
// Cleanup storage
|
// cleanupStorage
|
||||||
setInterval(async () => {
|
setInterval(async () => {
|
||||||
if (!scheduler.workers.has('deployApplication') && !scheduler.workers.has('cleanupStorage')) {
|
scheduler.workers.has('infrastructure') && scheduler.workers.get('infrastructure').postMessage("action:cleanupStorage")
|
||||||
await scheduler.run('cleanupStorage')
|
}, isDev ? 6000 : 60000 * 10)
|
||||||
}
|
|
||||||
}, isDev ? 5000 : 60000 * 10)
|
// checkProxies
|
||||||
|
setInterval(async () => {
|
||||||
|
scheduler.workers.has('infrastructure') && scheduler.workers.get('infrastructure').postMessage("action:checkProxies")
|
||||||
|
}, 10000)
|
||||||
|
|
||||||
|
// cleanupPrismaEngines
|
||||||
|
setInterval(async () => {
|
||||||
|
scheduler.workers.has('infrastructure') && scheduler.workers.get('infrastructure').postMessage("action:cleanupPrismaEngines")
|
||||||
|
}, 60000)
|
||||||
|
|
||||||
await getArch();
|
await getArch();
|
||||||
await getIPAddress();
|
await getIPAddress();
|
||||||
|
@ -14,17 +14,19 @@ import * as buildpacks from '../lib/buildPacks';
|
|||||||
if (message === 'error') throw new Error('oops');
|
if (message === 'error') throw new Error('oops');
|
||||||
if (message === 'cancel') {
|
if (message === 'cancel') {
|
||||||
parentPort.postMessage('cancelled');
|
parentPort.postMessage('cancelled');
|
||||||
|
await prisma.$disconnect()
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
try {
|
const pThrottle = await import('p-throttle')
|
||||||
const pThrottle = await import('p-throttle')
|
const throttle = pThrottle.default({
|
||||||
const throttle = pThrottle.default({
|
limit: 1,
|
||||||
limit: 1,
|
interval: 2000
|
||||||
interval: 2000
|
});
|
||||||
});
|
|
||||||
|
|
||||||
const th = throttle(async () => {
|
|
||||||
|
const th = throttle(async () => {
|
||||||
|
try {
|
||||||
const queuedBuilds = await prisma.build.findMany({ where: { status: 'queued' }, orderBy: { createdAt: 'asc' } });
|
const queuedBuilds = await prisma.build.findMany({ where: { status: 'queued' }, orderBy: { createdAt: 'asc' } });
|
||||||
const { concurrentBuilds } = await prisma.setting.findFirst({})
|
const { concurrentBuilds } = await prisma.setting.findFirst({})
|
||||||
if (queuedBuilds.length > 0) {
|
if (queuedBuilds.length > 0) {
|
||||||
@ -356,14 +358,13 @@ import * as buildpacks from '../lib/buildPacks';
|
|||||||
}
|
}
|
||||||
await pAll.default(actions, { concurrency })
|
await pAll.default(actions, { concurrency })
|
||||||
}
|
}
|
||||||
})
|
} catch (error) {
|
||||||
while (true) {
|
} finally {
|
||||||
await th()
|
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
|
||||||
} catch (error) {
|
while (true) {
|
||||||
} finally {
|
await th()
|
||||||
await prisma.$disconnect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
220
apps/api/src/jobs/infrastructure.ts
Normal file
220
apps/api/src/jobs/infrastructure.ts
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
import { parentPort } from 'node:worker_threads';
|
||||||
|
import axios from 'axios';
|
||||||
|
import compareVersions from 'compare-versions';
|
||||||
|
import { asyncExecShell, cleanupDockerStorage, executeDockerCmd, isDev, prisma, startTraefikTCPProxy, generateDatabaseConfiguration, startTraefikProxy, listSettings, version } from '../lib/common';
|
||||||
|
|
||||||
|
async function disconnect() {
|
||||||
|
await prisma.$disconnect();
|
||||||
|
}
|
||||||
|
async function autoUpdater() {
|
||||||
|
try {
|
||||||
|
const currentVersion = version;
|
||||||
|
const { data: versions } = await axios
|
||||||
|
.get(
|
||||||
|
`https://get.coollabs.io/versions.json`
|
||||||
|
, {
|
||||||
|
params: {
|
||||||
|
appId: process.env['COOLIFY_APP_ID'] || undefined,
|
||||||
|
version: currentVersion
|
||||||
|
}
|
||||||
|
})
|
||||||
|
const latestVersion = versions['coolify'].main.version;
|
||||||
|
const isUpdateAvailable = compareVersions(latestVersion, currentVersion);
|
||||||
|
if (isUpdateAvailable === 1) {
|
||||||
|
const activeCount = 0
|
||||||
|
if (activeCount === 0) {
|
||||||
|
if (!isDev) {
|
||||||
|
console.log(`Updating Coolify to ${latestVersion}.`);
|
||||||
|
await asyncExecShell(`docker pull coollabsio/coolify:${latestVersion}`);
|
||||||
|
await asyncExecShell(`env | grep COOLIFY > .env`);
|
||||||
|
await asyncExecShell(
|
||||||
|
`docker run --rm -tid --env-file .env -v /var/run/docker.sock:/var/run/docker.sock -v coolify-db coollabsio/coolify:${latestVersion} /bin/sh -c "env | grep COOLIFY > .env && echo 'TAG=${latestVersion}' >> .env && docker stop -t 0 coolify && docker rm coolify && docker compose up -d --force-recreate"`
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
console.log('Updating (not really in dev mode).');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.log(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async function checkProxies(){
|
||||||
|
try {
|
||||||
|
const { default: isReachable } = await import('is-port-reachable');
|
||||||
|
let portReachable;
|
||||||
|
|
||||||
|
const { arch, ipv4, ipv6 } = await listSettings();
|
||||||
|
// Coolify Proxy local
|
||||||
|
const engine = '/var/run/docker.sock';
|
||||||
|
const localDocker = await prisma.destinationDocker.findFirst({
|
||||||
|
where: { engine, network: 'coolify' }
|
||||||
|
});
|
||||||
|
if (localDocker && localDocker.isCoolifyProxyUsed) {
|
||||||
|
portReachable = await isReachable(80, { host: ipv4 || ipv6 })
|
||||||
|
console.log({ port: 80, portReachable });
|
||||||
|
if (!portReachable) {
|
||||||
|
await startTraefikProxy(localDocker.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TCP Proxies
|
||||||
|
const databasesWithPublicPort = await prisma.database.findMany({
|
||||||
|
where: { publicPort: { not: null } },
|
||||||
|
include: { settings: true, destinationDocker: true }
|
||||||
|
});
|
||||||
|
for (const database of databasesWithPublicPort) {
|
||||||
|
const { destinationDockerId, destinationDocker, publicPort, id } = database;
|
||||||
|
if (destinationDockerId && destinationDocker.isCoolifyProxyUsed) {
|
||||||
|
const { privatePort } = generateDatabaseConfiguration(database, arch);
|
||||||
|
portReachable = await isReachable(publicPort, { host: destinationDocker.remoteIpAddress || ipv4 || ipv6 })
|
||||||
|
console.log({ publicPort, portReachable });
|
||||||
|
if (!portReachable) {
|
||||||
|
await startTraefikTCPProxy(destinationDocker, id, publicPort, privatePort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const wordpressWithFtp = await prisma.wordpress.findMany({
|
||||||
|
where: { ftpPublicPort: { not: null } },
|
||||||
|
include: { service: { include: { destinationDocker: true } } }
|
||||||
|
});
|
||||||
|
for (const ftp of wordpressWithFtp) {
|
||||||
|
const { service, ftpPublicPort } = ftp;
|
||||||
|
const { destinationDockerId, destinationDocker, id } = service;
|
||||||
|
if (destinationDockerId && destinationDocker.isCoolifyProxyUsed) {
|
||||||
|
portReachable = await isReachable(ftpPublicPort, { host: destinationDocker.remoteIpAddress || ipv4 || ipv6 })
|
||||||
|
console.log({ ftpPublicPort, portReachable });
|
||||||
|
if (!portReachable) {
|
||||||
|
await startTraefikTCPProxy(destinationDocker, id, ftpPublicPort, 22, 'wordpressftp');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// HTTP Proxies
|
||||||
|
const minioInstances = await prisma.minio.findMany({
|
||||||
|
where: { publicPort: { not: null } },
|
||||||
|
include: { service: { include: { destinationDocker: true } } }
|
||||||
|
});
|
||||||
|
for (const minio of minioInstances) {
|
||||||
|
const { service, publicPort } = minio;
|
||||||
|
const { destinationDockerId, destinationDocker, id } = service;
|
||||||
|
if (destinationDockerId && destinationDocker.isCoolifyProxyUsed) {
|
||||||
|
portReachable = await isReachable(publicPort, { host: destinationDocker.remoteIpAddress || ipv4 || ipv6 })
|
||||||
|
console.log({ publicPort, portReachable });
|
||||||
|
if (!portReachable) {
|
||||||
|
await startTraefikTCPProxy(destinationDocker, id, publicPort, 9000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(error) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async function cleanupPrismaEngines(){
|
||||||
|
if (!isDev) {
|
||||||
|
try {
|
||||||
|
const { stdout } = await asyncExecShell(`ps -ef | grep /app/prisma-engines/query-engine | grep -v grep | wc -l | xargs`)
|
||||||
|
if (stdout.trim() != null && stdout.trim() != '' && Number(stdout.trim()) > 1) {
|
||||||
|
await asyncExecShell(`killall -q -e /app/prisma-engines/query-engine -o 10m`)
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.log(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async function cleanupStorage() {
|
||||||
|
const destinationDockers = await prisma.destinationDocker.findMany();
|
||||||
|
let enginesDone = new Set()
|
||||||
|
for (const destination of destinationDockers) {
|
||||||
|
if (enginesDone.has(destination.engine) || enginesDone.has(destination.remoteIpAddress)) return
|
||||||
|
if (destination.engine) enginesDone.add(destination.engine)
|
||||||
|
if (destination.remoteIpAddress) enginesDone.add(destination.remoteIpAddress)
|
||||||
|
|
||||||
|
let lowDiskSpace = false;
|
||||||
|
try {
|
||||||
|
let stdout = null
|
||||||
|
if (!isDev) {
|
||||||
|
const output = await executeDockerCmd({ dockerId: destination.id, command: `CONTAINER=$(docker ps -lq | head -1) && docker exec $CONTAINER sh -c 'df -kPT /'` })
|
||||||
|
stdout = output.stdout;
|
||||||
|
} else {
|
||||||
|
const output = await asyncExecShell(
|
||||||
|
`df -kPT /`
|
||||||
|
);
|
||||||
|
stdout = output.stdout;
|
||||||
|
}
|
||||||
|
let lines = stdout.trim().split('\n');
|
||||||
|
let header = lines[0];
|
||||||
|
let regex =
|
||||||
|
/^Filesystem\s+|Type\s+|1024-blocks|\s+Used|\s+Available|\s+Capacity|\s+Mounted on\s*$/g;
|
||||||
|
const boundaries = [];
|
||||||
|
let match;
|
||||||
|
|
||||||
|
while ((match = regex.exec(header))) {
|
||||||
|
boundaries.push(match[0].length);
|
||||||
|
}
|
||||||
|
|
||||||
|
boundaries[boundaries.length - 1] = -1;
|
||||||
|
const data = lines.slice(1).map((line) => {
|
||||||
|
const cl = boundaries.map((boundary) => {
|
||||||
|
const column = boundary > 0 ? line.slice(0, boundary) : line;
|
||||||
|
line = line.slice(boundary);
|
||||||
|
return column.trim();
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
capacity: Number.parseInt(cl[5], 10) / 100
|
||||||
|
};
|
||||||
|
});
|
||||||
|
if (data.length > 0) {
|
||||||
|
const { capacity } = data[0];
|
||||||
|
if (capacity > 0.8) {
|
||||||
|
lowDiskSpace = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.log(error);
|
||||||
|
}
|
||||||
|
await cleanupDockerStorage(destination.id, lowDiskSpace, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
let status ={
|
||||||
|
cleanupStorage: false,
|
||||||
|
autoUpdater: false
|
||||||
|
}
|
||||||
|
if (parentPort) {
|
||||||
|
parentPort.on('message', async (message) => {
|
||||||
|
if (parentPort) {
|
||||||
|
if (message === 'error') throw new Error('oops');
|
||||||
|
if (message === 'cancel') {
|
||||||
|
parentPort.postMessage('cancelled');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
if (message === 'action:cleanupStorage') {
|
||||||
|
if (!status.autoUpdater) {
|
||||||
|
status.cleanupStorage = true
|
||||||
|
await cleanupStorage();
|
||||||
|
status.cleanupStorage = false
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (message === 'action:cleanupPrismaEngines') {
|
||||||
|
await cleanupPrismaEngines();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (message === 'action:checkProxies') {
|
||||||
|
await checkProxies();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (message === 'action:autoUpdater') {
|
||||||
|
if (!status.cleanupStorage) {
|
||||||
|
status.autoUpdater = true
|
||||||
|
await autoUpdater();
|
||||||
|
status.autoUpdater = false
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else process.exit(0);
|
||||||
|
})();
|
@ -18,23 +18,22 @@ const options: any = {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
jobs: [
|
jobs: [
|
||||||
{
|
{ name: 'infrastructure' },
|
||||||
name: 'deployApplication',
|
{ name: 'deployApplication' },
|
||||||
},
|
// {
|
||||||
{
|
// name: 'cleanupStorage',
|
||||||
name: 'cleanupStorage',
|
// },
|
||||||
},
|
// {
|
||||||
{
|
// name: 'cleanupPrismaEngines',
|
||||||
name: 'cleanupPrismaEngines',
|
// interval: '1m'
|
||||||
interval: '1m'
|
// },
|
||||||
},
|
// {
|
||||||
{
|
// name: 'checkProxies',
|
||||||
name: 'checkProxies',
|
// interval: '10s'
|
||||||
interval: '10s'
|
// },
|
||||||
},
|
// {
|
||||||
{
|
// name: 'autoUpdater',
|
||||||
name: 'autoUpdater',
|
// }
|
||||||
}
|
|
||||||
],
|
],
|
||||||
};
|
};
|
||||||
if (isDev) options.root = path.join(__dirname, '../jobs');
|
if (isDev) options.root = path.join(__dirname, '../jobs');
|
||||||
|
647
pnpm-lock.yaml
647
pnpm-lock.yaml
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user