fix(queue): route cloud jobs to dedicated queues
Use config-based queue selection for deployment and scheduled jobs so cloud dispatches deployments to `deployments` and scheduled jobs to `crons`, while self-hosted keeps using `high`. Add coverage for deployment queue helper, start action routing, and scheduled job manager routing.
This commit is contained in:
parent
11dbcfcfe8
commit
e2199f1223
8 changed files with 109 additions and 20 deletions
|
|
@ -11,12 +11,16 @@
|
|||
use App\Models\StandalonePostgresql;
|
||||
use App\Models\StandaloneRedis;
|
||||
use Lorisleiva\Actions\Concerns\AsAction;
|
||||
use Lorisleiva\Actions\Decorators\JobDecorator;
|
||||
|
||||
class StartDatabase
|
||||
{
|
||||
use AsAction;
|
||||
|
||||
public string $jobQueue = 'high';
|
||||
public function configureJob(JobDecorator $job): void
|
||||
{
|
||||
$job->onQueue(deployment_queue());
|
||||
}
|
||||
|
||||
public function handle(StandaloneRedis|StandalonePostgresql|StandaloneMongodb|StandaloneMysql|StandaloneMariadb|StandaloneKeydb|StandaloneDragonfly|StandaloneClickhouse $database)
|
||||
{
|
||||
|
|
@ -25,28 +29,28 @@ public function handle(StandaloneRedis|StandalonePostgresql|StandaloneMongodb|St
|
|||
return 'Server is not functional';
|
||||
}
|
||||
switch ($database->getMorphClass()) {
|
||||
case \App\Models\StandalonePostgresql::class:
|
||||
case StandalonePostgresql::class:
|
||||
$activity = StartPostgresql::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneRedis::class:
|
||||
case StandaloneRedis::class:
|
||||
$activity = StartRedis::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneMongodb::class:
|
||||
case StandaloneMongodb::class:
|
||||
$activity = StartMongodb::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneMysql::class:
|
||||
case StandaloneMysql::class:
|
||||
$activity = StartMysql::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneMariadb::class:
|
||||
case StandaloneMariadb::class:
|
||||
$activity = StartMariadb::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneKeydb::class:
|
||||
case StandaloneKeydb::class:
|
||||
$activity = StartKeydb::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneDragonfly::class:
|
||||
case StandaloneDragonfly::class:
|
||||
$activity = StartDragonfly::run($database);
|
||||
break;
|
||||
case \App\Models\StandaloneClickhouse::class:
|
||||
case StandaloneClickhouse::class:
|
||||
$activity = StartClickhouse::run($database);
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,14 +11,19 @@
|
|||
use App\Models\StandaloneMysql;
|
||||
use App\Models\StandalonePostgresql;
|
||||
use App\Models\StandaloneRedis;
|
||||
use App\Notifications\Container\ContainerRestarted;
|
||||
use Lorisleiva\Actions\Concerns\AsAction;
|
||||
use Lorisleiva\Actions\Decorators\JobDecorator;
|
||||
use Symfony\Component\Yaml\Yaml;
|
||||
|
||||
class StartDatabaseProxy
|
||||
{
|
||||
use AsAction;
|
||||
|
||||
public string $jobQueue = 'high';
|
||||
public function configureJob(JobDecorator $job): void
|
||||
{
|
||||
$job->onQueue(deployment_queue());
|
||||
}
|
||||
|
||||
public function handle(StandaloneRedis|StandalonePostgresql|StandaloneMongodb|StandaloneMysql|StandaloneMariadb|StandaloneKeydb|StandaloneDragonfly|StandaloneClickhouse|ServiceDatabase $database)
|
||||
{
|
||||
|
|
@ -29,7 +34,7 @@ public function handle(StandaloneRedis|StandalonePostgresql|StandaloneMongodb|St
|
|||
$proxyContainerName = "{$database->uuid}-proxy";
|
||||
$isSSLEnabled = $database->enable_ssl ?? false;
|
||||
|
||||
if ($database->getMorphClass() === \App\Models\ServiceDatabase::class) {
|
||||
if ($database->getMorphClass() === ServiceDatabase::class) {
|
||||
$databaseType = $database->databaseType();
|
||||
$network = $database->service->uuid;
|
||||
$server = data_get($database, 'service.destination.server');
|
||||
|
|
@ -132,7 +137,7 @@ public function handle(StandaloneRedis|StandalonePostgresql|StandaloneMongodb|St
|
|||
?? data_get($database, 'service.environment.project.team');
|
||||
|
||||
$team?->notify(
|
||||
new \App\Notifications\Container\ContainerRestarted(
|
||||
new ContainerRestarted(
|
||||
"TCP Proxy for {$database->name} database has been disabled due to error: {$e->getMessage()}",
|
||||
$server,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -4,13 +4,17 @@
|
|||
|
||||
use App\Models\Service;
|
||||
use Lorisleiva\Actions\Concerns\AsAction;
|
||||
use Lorisleiva\Actions\Decorators\JobDecorator;
|
||||
use Symfony\Component\Yaml\Yaml;
|
||||
|
||||
class StartService
|
||||
{
|
||||
use AsAction;
|
||||
|
||||
public string $jobQueue = 'high';
|
||||
public function configureJob(JobDecorator $job): void
|
||||
{
|
||||
$job->onQueue(deployment_queue());
|
||||
}
|
||||
|
||||
public function handle(Service $service, bool $pullLatestImages = false, bool $stopBeforeStart = false)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ public function tags()
|
|||
|
||||
public function __construct(public int $application_deployment_queue_id)
|
||||
{
|
||||
$this->onQueue('high');
|
||||
$this->onQueue(deployment_queue());
|
||||
|
||||
$this->application_deployment_queue = ApplicationDeploymentQueue::find($this->application_deployment_queue_id);
|
||||
$this->nixpacks_plan_json = collect([]);
|
||||
|
|
|
|||
|
|
@ -40,14 +40,15 @@ public function __construct()
|
|||
$this->onQueue($this->determineQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
* On cloud this job runs on a dedicated `crons` queue so it can be drained by an isolated
|
||||
* Horizon worker pool; self-hosted keeps it on the shared `high` queue. Routing is decided
|
||||
* by `isCloud()` (config-based), so the dispatching process needs no special env — only
|
||||
* the worker must be configured to drain `crons` via `HORIZON_QUEUES`.
|
||||
*/
|
||||
private function determineQueue(): string
|
||||
{
|
||||
$preferredQueue = 'crons';
|
||||
$fallbackQueue = 'high';
|
||||
|
||||
$configuredQueues = explode(',', env('HORIZON_QUEUES', 'high,default'));
|
||||
|
||||
return in_array($preferredQueue, $configuredQueues) ? $preferredQueue : $fallbackQueue;
|
||||
return isCloud() ? 'crons' : 'high';
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -592,6 +592,22 @@ function isCloud(): bool
|
|||
return ! config('constants.coolify.self_hosted');
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the queue used for application deployments, database starts and service starts.
|
||||
*
|
||||
* On cloud these jobs run on a dedicated `deployments` queue so they can be drained by an
|
||||
* isolated Horizon worker pool; self-hosted keeps them on the shared `high` queue. Routing
|
||||
* is decided by `isCloud()` (config-based) rather than `HORIZON_QUEUES`, so the dispatching
|
||||
* process needs no special env — only the worker must be configured to drain `deployments`.
|
||||
*
|
||||
* IMPORTANT: on cloud a worker MUST include `deployments` in its `HORIZON_QUEUES`, otherwise
|
||||
* these jobs are never processed.
|
||||
*/
|
||||
function deployment_queue(): string
|
||||
{
|
||||
return isCloud() ? 'deployments' : 'high';
|
||||
}
|
||||
|
||||
function translate_cron_expression($expression_to_validate): string
|
||||
{
|
||||
if (isset(VALID_CRON_STRINGS[$expression_to_validate])) {
|
||||
|
|
|
|||
56
tests/Feature/QueueRoutingTest.php
Normal file
56
tests/Feature/QueueRoutingTest.php
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
<?php
|
||||
|
||||
use App\Actions\Database\StartDatabase;
|
||||
use App\Actions\Database\StartDatabaseProxy;
|
||||
use App\Actions\Service\StartService;
|
||||
use App\Jobs\ScheduledJobManager;
|
||||
|
||||
describe('deployment_queue helper', function () {
|
||||
test('uses the high queue on self-hosted', function () {
|
||||
config(['constants.coolify.self_hosted' => true]);
|
||||
|
||||
expect(deployment_queue())->toBe('high');
|
||||
});
|
||||
|
||||
test('uses the deployments queue on cloud', function () {
|
||||
config(['constants.coolify.self_hosted' => false]);
|
||||
|
||||
expect(deployment_queue())->toBe('deployments');
|
||||
});
|
||||
});
|
||||
|
||||
describe('start action job routing', function () {
|
||||
test('routes to the deployments queue on cloud', function (string $actionClass) {
|
||||
config(['constants.coolify.self_hosted' => false]);
|
||||
|
||||
expect($actionClass::makeJob()->queue)->toBe('deployments');
|
||||
})->with([
|
||||
StartDatabase::class,
|
||||
StartDatabaseProxy::class,
|
||||
StartService::class,
|
||||
]);
|
||||
|
||||
test('routes to the high queue on self-hosted', function (string $actionClass) {
|
||||
config(['constants.coolify.self_hosted' => true]);
|
||||
|
||||
expect($actionClass::makeJob()->queue)->toBe('high');
|
||||
})->with([
|
||||
StartDatabase::class,
|
||||
StartDatabaseProxy::class,
|
||||
StartService::class,
|
||||
]);
|
||||
});
|
||||
|
||||
describe('scheduled job manager queue routing', function () {
|
||||
test('uses the crons queue on cloud', function () {
|
||||
config(['constants.coolify.self_hosted' => false]);
|
||||
|
||||
expect((new ScheduledJobManager)->queue)->toBe('crons');
|
||||
});
|
||||
|
||||
test('uses the high queue on self-hosted', function () {
|
||||
config(['constants.coolify.self_hosted' => true]);
|
||||
|
||||
expect((new ScheduledJobManager)->queue)->toBe('high');
|
||||
});
|
||||
});
|
||||
|
|
@ -2,6 +2,9 @@
|
|||
|
||||
use App\Jobs\ScheduledJobManager;
|
||||
use Illuminate\Queue\Middleware\WithoutOverlapping;
|
||||
use Tests\TestCase;
|
||||
|
||||
uses(TestCase::class);
|
||||
|
||||
it('uses WithoutOverlapping middleware with expireAfter to prevent stale locks', function () {
|
||||
$job = new ScheduledJobManager;
|
||||
|
|
|
|||
Loading…
Reference in a new issue