Move expensive runtime checks (service/application status) after cron validation to avoid running them for tasks that aren't due. Critical checks (orphans, infrastructure) remain in first phase. Also fix database heading parameters to be built from the model.
477 lines
17 KiB
PHP
477 lines
17 KiB
PHP
<?php
|
|
|
|
namespace App\Jobs;
|
|
|
|
use App\Models\ScheduledDatabaseBackup;
|
|
use App\Models\ScheduledTask;
|
|
use App\Models\Server;
|
|
use App\Models\Team;
|
|
use Cron\CronExpression;
|
|
use Illuminate\Bus\Queueable;
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Foundation\Bus\Dispatchable;
|
|
use Illuminate\Queue\InteractsWithQueue;
|
|
use Illuminate\Queue\Middleware\WithoutOverlapping;
|
|
use Illuminate\Queue\SerializesModels;
|
|
use Illuminate\Support\Carbon;
|
|
use Illuminate\Support\Collection;
|
|
use Illuminate\Support\Facades\Cache;
|
|
use Illuminate\Support\Facades\Log;
|
|
use Illuminate\Support\Facades\Redis;
|
|
|
|
class ScheduledJobManager implements ShouldQueue
|
|
{
|
|
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
|
|
|
/**
|
|
* The time when this job execution started.
|
|
* Used to ensure all scheduled items are evaluated against the same point in time.
|
|
*/
|
|
private ?Carbon $executionTime = null;
|
|
|
|
private int $dispatchedCount = 0;
|
|
|
|
private int $skippedCount = 0;
|
|
|
|
/**
|
|
* Create a new job instance.
|
|
*/
|
|
public function __construct()
|
|
{
|
|
$this->onQueue($this->determineQueue());
|
|
}
|
|
|
|
private function determineQueue(): string
|
|
{
|
|
$preferredQueue = 'crons';
|
|
$fallbackQueue = 'high';
|
|
|
|
$configuredQueues = explode(',', env('HORIZON_QUEUES', 'high,default'));
|
|
|
|
return in_array($preferredQueue, $configuredQueues) ? $preferredQueue : $fallbackQueue;
|
|
}
|
|
|
|
/**
|
|
* Get the middleware the job should pass through.
|
|
*/
|
|
public function middleware(): array
|
|
{
|
|
// Self-healing: clear any stale lock before WithoutOverlapping tries to acquire it.
|
|
// Stale locks (TTL = -1) can occur during upgrades, Redis restarts, or edge cases.
|
|
// @see https://github.com/coollabsio/coolify/issues/8327
|
|
self::clearStaleLockIfPresent();
|
|
|
|
return [
|
|
(new WithoutOverlapping('scheduled-job-manager'))
|
|
->expireAfter(90) // Lock expires after 90s to handle high-load environments with many tasks
|
|
->dontRelease(), // Don't re-queue on lock conflict
|
|
];
|
|
}
|
|
|
|
/**
|
|
* Clear a stale WithoutOverlapping lock if it has no TTL (TTL = -1).
|
|
*
|
|
* This provides continuous self-healing since it runs every time the job is dispatched.
|
|
* Stale locks permanently block all scheduled job executions with no user-visible error.
|
|
*/
|
|
private static function clearStaleLockIfPresent(): void
|
|
{
|
|
try {
|
|
$cachePrefix = config('cache.prefix', '');
|
|
$lockKey = $cachePrefix.'laravel-queue-overlap:'.self::class.':scheduled-job-manager';
|
|
|
|
$ttl = Redis::connection('default')->ttl($lockKey);
|
|
|
|
if ($ttl === -1) {
|
|
Redis::connection('default')->del($lockKey);
|
|
Log::channel('scheduled')->warning('Cleared stale ScheduledJobManager lock', [
|
|
'lock_key' => $lockKey,
|
|
]);
|
|
}
|
|
} catch (\Throwable $e) {
|
|
// Never let lock cleanup failure prevent the job from running
|
|
Log::channel('scheduled-errors')->error('Failed to check/clear stale lock', [
|
|
'error' => $e->getMessage(),
|
|
]);
|
|
}
|
|
}
|
|
|
|
public function handle(): void
|
|
{
|
|
// Freeze the execution time at the start of the job
|
|
$this->executionTime = Carbon::now();
|
|
$this->dispatchedCount = 0;
|
|
$this->skippedCount = 0;
|
|
|
|
Log::channel('scheduled')->info('ScheduledJobManager started', [
|
|
'execution_time' => $this->executionTime->toIso8601String(),
|
|
]);
|
|
|
|
// Process backups - don't let failures stop task processing
|
|
try {
|
|
$this->processScheduledBackups();
|
|
} catch (\Exception $e) {
|
|
Log::channel('scheduled-errors')->error('Failed to process scheduled backups', [
|
|
'error' => $e->getMessage(),
|
|
'trace' => $e->getTraceAsString(),
|
|
]);
|
|
}
|
|
|
|
// Process tasks - don't let failures stop the job manager
|
|
try {
|
|
$this->processScheduledTasks();
|
|
} catch (\Exception $e) {
|
|
Log::channel('scheduled-errors')->error('Failed to process scheduled tasks', [
|
|
'error' => $e->getMessage(),
|
|
'trace' => $e->getTraceAsString(),
|
|
]);
|
|
}
|
|
|
|
// Process Docker cleanups - don't let failures stop the job manager
|
|
try {
|
|
$this->processDockerCleanups();
|
|
} catch (\Exception $e) {
|
|
Log::channel('scheduled-errors')->error('Failed to process docker cleanups', [
|
|
'error' => $e->getMessage(),
|
|
'trace' => $e->getTraceAsString(),
|
|
]);
|
|
}
|
|
|
|
Log::channel('scheduled')->info('ScheduledJobManager completed', [
|
|
'execution_time' => $this->executionTime->toIso8601String(),
|
|
'duration_ms' => $this->executionTime->diffInMilliseconds(Carbon::now()),
|
|
'dispatched' => $this->dispatchedCount,
|
|
'skipped' => $this->skippedCount,
|
|
]);
|
|
|
|
// Write heartbeat so the UI can detect when the scheduler has stopped
|
|
try {
|
|
Cache::put('scheduled-job-manager:heartbeat', now()->toIso8601String(), 300);
|
|
} catch (\Throwable) {
|
|
// Non-critical; don't let heartbeat failure affect the job
|
|
}
|
|
}
|
|
|
|
private function processScheduledBackups(): void
|
|
{
|
|
$backups = ScheduledDatabaseBackup::with(['database'])
|
|
->where('enabled', true)
|
|
->get();
|
|
|
|
foreach ($backups as $backup) {
|
|
try {
|
|
$server = $backup->server();
|
|
$skipReason = $this->getBackupSkipReason($backup, $server);
|
|
if ($skipReason !== null) {
|
|
$this->skippedCount++;
|
|
$this->logSkip('backup', $skipReason, [
|
|
'backup_id' => $backup->id,
|
|
'database_id' => $backup->database_id,
|
|
'database_type' => $backup->database_type,
|
|
'team_id' => $backup->team_id ?? null,
|
|
]);
|
|
|
|
continue;
|
|
}
|
|
|
|
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
|
|
|
if (validate_timezone($serverTimezone) === false) {
|
|
$serverTimezone = config('app.timezone');
|
|
}
|
|
|
|
$frequency = $backup->frequency;
|
|
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
|
$frequency = VALID_CRON_STRINGS[$frequency];
|
|
}
|
|
|
|
if ($this->shouldRunNow($frequency, $serverTimezone, "scheduled-backup:{$backup->id}")) {
|
|
DatabaseBackupJob::dispatch($backup);
|
|
$this->dispatchedCount++;
|
|
Log::channel('scheduled')->info('Backup dispatched', [
|
|
'backup_id' => $backup->id,
|
|
'database_id' => $backup->database_id,
|
|
'database_type' => $backup->database_type,
|
|
'team_id' => $backup->team_id ?? null,
|
|
'server_id' => $server->id,
|
|
]);
|
|
}
|
|
} catch (\Exception $e) {
|
|
Log::channel('scheduled-errors')->error('Error processing backup', [
|
|
'backup_id' => $backup->id,
|
|
'error' => $e->getMessage(),
|
|
]);
|
|
}
|
|
}
|
|
}
|
|
|
|
private function processScheduledTasks(): void
|
|
{
|
|
$tasks = ScheduledTask::with(['service', 'application'])
|
|
->where('enabled', true)
|
|
->get();
|
|
|
|
foreach ($tasks as $task) {
|
|
try {
|
|
$server = $task->server();
|
|
|
|
// Phase 1: Critical checks (always — cheap, handles orphans and infra issues)
|
|
$criticalSkip = $this->getTaskCriticalSkipReason($task, $server);
|
|
if ($criticalSkip !== null) {
|
|
$this->skippedCount++;
|
|
$this->logSkip('task', $criticalSkip, [
|
|
'task_id' => $task->id,
|
|
'task_name' => $task->name,
|
|
'team_id' => $server?->team_id,
|
|
]);
|
|
|
|
continue;
|
|
}
|
|
|
|
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
|
|
|
if (validate_timezone($serverTimezone) === false) {
|
|
$serverTimezone = config('app.timezone');
|
|
}
|
|
|
|
$frequency = $task->frequency;
|
|
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
|
$frequency = VALID_CRON_STRINGS[$frequency];
|
|
}
|
|
|
|
if (! $this->shouldRunNow($frequency, $serverTimezone, "scheduled-task:{$task->id}")) {
|
|
continue;
|
|
}
|
|
|
|
// Phase 2: Runtime checks (only when cron is due — avoids noise for stopped resources)
|
|
$runtimeSkip = $this->getTaskRuntimeSkipReason($task);
|
|
if ($runtimeSkip !== null) {
|
|
$this->skippedCount++;
|
|
$this->logSkip('task', $runtimeSkip, [
|
|
'task_id' => $task->id,
|
|
'task_name' => $task->name,
|
|
'team_id' => $server->team_id,
|
|
]);
|
|
|
|
continue;
|
|
}
|
|
|
|
ScheduledTaskJob::dispatch($task);
|
|
$this->dispatchedCount++;
|
|
Log::channel('scheduled')->info('Task dispatched', [
|
|
'task_id' => $task->id,
|
|
'task_name' => $task->name,
|
|
'team_id' => $server->team_id,
|
|
'server_id' => $server->id,
|
|
]);
|
|
} catch (\Exception $e) {
|
|
Log::channel('scheduled-errors')->error('Error processing task', [
|
|
'task_id' => $task->id,
|
|
'error' => $e->getMessage(),
|
|
]);
|
|
}
|
|
}
|
|
}
|
|
|
|
private function getBackupSkipReason(ScheduledDatabaseBackup $backup, ?Server $server): ?string
|
|
{
|
|
if (blank(data_get($backup, 'database'))) {
|
|
$backup->delete();
|
|
|
|
return 'database_deleted';
|
|
}
|
|
|
|
if (blank($server)) {
|
|
$backup->delete();
|
|
|
|
return 'server_deleted';
|
|
}
|
|
|
|
if ($server->isFunctional() === false) {
|
|
return 'server_not_functional';
|
|
}
|
|
|
|
if (isCloud() && data_get($server->team->subscription, 'stripe_invoice_paid', false) === false && $server->team->id !== 0) {
|
|
return 'subscription_unpaid';
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
private function getTaskCriticalSkipReason(ScheduledTask $task, ?Server $server): ?string
|
|
{
|
|
if (blank($server)) {
|
|
$task->delete();
|
|
|
|
return 'server_deleted';
|
|
}
|
|
|
|
if ($server->isFunctional() === false) {
|
|
return 'server_not_functional';
|
|
}
|
|
|
|
if (isCloud() && data_get($server->team->subscription, 'stripe_invoice_paid', false) === false && $server->team->id !== 0) {
|
|
return 'subscription_unpaid';
|
|
}
|
|
|
|
if (! $task->service && ! $task->application) {
|
|
$task->delete();
|
|
|
|
return 'resource_deleted';
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
private function getTaskRuntimeSkipReason(ScheduledTask $task): ?string
|
|
{
|
|
if ($task->application && str($task->application->status)->contains('running') === false) {
|
|
return 'application_not_running';
|
|
}
|
|
|
|
if ($task->service && str($task->service->status)->contains('running') === false) {
|
|
return 'service_not_running';
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Determine if a cron schedule should run now.
|
|
*
|
|
* When a dedupKey is provided, uses getPreviousRunDate() + last-dispatch tracking
|
|
* instead of isDue(). This is resilient to queue delays — even if the job is delayed
|
|
* by minutes, it still catches the missed cron window. Without dedupKey, falls back
|
|
* to simple isDue() check.
|
|
*/
|
|
private function shouldRunNow(string $frequency, string $timezone, ?string $dedupKey = null): bool
|
|
{
|
|
$cron = new CronExpression($frequency);
|
|
$baseTime = $this->executionTime ?? Carbon::now();
|
|
$executionTime = $baseTime->copy()->setTimezone($timezone);
|
|
|
|
// No dedup key → simple isDue check (used by docker cleanups)
|
|
if ($dedupKey === null) {
|
|
return $cron->isDue($executionTime);
|
|
}
|
|
|
|
// Get the most recent time this cron was due (including current minute)
|
|
$previousDue = Carbon::instance($cron->getPreviousRunDate($executionTime, allowCurrentDate: true));
|
|
|
|
$lastDispatched = Cache::get($dedupKey);
|
|
|
|
if ($lastDispatched === null) {
|
|
// First run after restart or cache loss: only fire if actually due right now.
|
|
// Seed the cache so subsequent runs can use tolerance/catch-up logic.
|
|
$isDue = $cron->isDue($executionTime);
|
|
if ($isDue) {
|
|
Cache::put($dedupKey, $executionTime->toIso8601String(), 86400);
|
|
}
|
|
|
|
return $isDue;
|
|
}
|
|
|
|
// Subsequent runs: fire if there's been a due time since last dispatch
|
|
if ($previousDue->gt(Carbon::parse($lastDispatched))) {
|
|
Cache::put($dedupKey, $executionTime->toIso8601String(), 86400);
|
|
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
private function processDockerCleanups(): void
|
|
{
|
|
// Get all servers that need cleanup checks
|
|
$servers = $this->getServersForCleanup();
|
|
|
|
foreach ($servers as $server) {
|
|
try {
|
|
$skipReason = $this->getDockerCleanupSkipReason($server);
|
|
if ($skipReason !== null) {
|
|
$this->skippedCount++;
|
|
$this->logSkip('docker_cleanup', $skipReason, [
|
|
'server_id' => $server->id,
|
|
'server_name' => $server->name,
|
|
'team_id' => $server->team_id,
|
|
]);
|
|
|
|
continue;
|
|
}
|
|
|
|
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
|
if (validate_timezone($serverTimezone) === false) {
|
|
$serverTimezone = config('app.timezone');
|
|
}
|
|
|
|
$frequency = data_get($server->settings, 'docker_cleanup_frequency', '0 * * * *');
|
|
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
|
$frequency = VALID_CRON_STRINGS[$frequency];
|
|
}
|
|
|
|
// Use the frozen execution time for consistent evaluation
|
|
if ($this->shouldRunNow($frequency, $serverTimezone)) {
|
|
DockerCleanupJob::dispatch(
|
|
$server,
|
|
false,
|
|
$server->settings->delete_unused_volumes,
|
|
$server->settings->delete_unused_networks
|
|
);
|
|
$this->dispatchedCount++;
|
|
Log::channel('scheduled')->info('Docker cleanup dispatched', [
|
|
'server_id' => $server->id,
|
|
'server_name' => $server->name,
|
|
'team_id' => $server->team_id,
|
|
]);
|
|
}
|
|
} catch (\Exception $e) {
|
|
Log::channel('scheduled-errors')->error('Error processing docker cleanup', [
|
|
'server_id' => $server->id,
|
|
'server_name' => $server->name,
|
|
'error' => $e->getMessage(),
|
|
]);
|
|
}
|
|
}
|
|
}
|
|
|
|
private function getServersForCleanup(): Collection
|
|
{
|
|
$query = Server::with('settings')
|
|
->where('ip', '!=', '1.2.3.4');
|
|
|
|
if (isCloud()) {
|
|
$servers = $query->whereRelation('team.subscription', 'stripe_invoice_paid', true)->get();
|
|
$own = Team::find(0)->servers()->with('settings')->get();
|
|
|
|
return $servers->merge($own);
|
|
}
|
|
|
|
return $query->get();
|
|
}
|
|
|
|
private function getDockerCleanupSkipReason(Server $server): ?string
|
|
{
|
|
if (! $server->isFunctional()) {
|
|
return 'server_not_functional';
|
|
}
|
|
|
|
// In cloud, check subscription status (except team 0)
|
|
if (isCloud() && $server->team_id !== 0) {
|
|
if (data_get($server->team->subscription, 'stripe_invoice_paid', false) === false) {
|
|
return 'subscription_unpaid';
|
|
}
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
private function logSkip(string $type, string $reason, array $context = []): void
|
|
{
|
|
Log::channel('scheduled')->info(ucfirst(str_replace('_', ' ', $type)).' skipped', array_merge([
|
|
'type' => $type,
|
|
'skip_reason' => $reason,
|
|
'execution_time' => $this->executionTime?->toIso8601String(),
|
|
], $context));
|
|
}
|
|
}
|