onQueue($this->determineQueue());
    }

    /**
     * Choose the queue this job is dispatched on.
     *
     * Returns 'crons' when that name appears in the comma-separated HORIZON_QUEUES
     * environment value (default 'high,default'); otherwise returns 'high'.
     */
    private function determineQueue(): string
    {
        $preferredQueue = 'crons';
        $fallbackQueue = 'high';
        $configuredQueues = explode(',', env('HORIZON_QUEUES', 'high,default'));

        // Strict comparison: explode() yields strings, so this only guards against type juggling.
        return in_array($preferredQueue, $configuredQueues, true) ? $preferredQueue : $fallbackQueue;
    }

    /**
     * Get the middleware the job should pass through.
     */
    public function middleware(): array
    {
        // Self-healing: clear any stale lock before WithoutOverlapping tries to acquire it.
        // Stale locks (TTL = -1) can occur during upgrades, Redis restarts, or edge cases.
        // @see https://github.com/coollabsio/coolify/issues/8327
        self::clearStaleLockIfPresent();

        return [
            (new WithoutOverlapping('scheduled-job-manager'))
                ->expireAfter(90) // Lock expires after 90s to handle high-load environments with many tasks
                ->dontRelease(), // Don't re-queue on lock conflict
        ];
    }

    /**
     * Clear a stale WithoutOverlapping lock if it has no TTL (TTL = -1).
     *
     * This provides continuous self-healing since it runs every time the job is dispatched.
     * Stale locks permanently block all scheduled job executions with no user-visible error.
     */
    private static function clearStaleLockIfPresent(): void
    {
        try {
            $cachePrefix = config('cache.prefix', '');
            $lockKey = $cachePrefix.'laravel-queue-overlap:'.self::class.':scheduled-job-manager';

            $ttl = Redis::connection('default')->ttl($lockKey);

            // Only a TTL of exactly -1 is treated as stale; any other value is left untouched.
            if ($ttl === -1) {
                Redis::connection('default')->del($lockKey);
                Log::channel('scheduled')->warning('Cleared stale ScheduledJobManager lock', [
                    'lock_key' => $lockKey,
                ]);
            }
        } catch (\Throwable $e) {
            // Never let lock cleanup failure prevent the job from running
            Log::channel('scheduled-errors')->error('Failed to check/clear stale lock', [
                'error' => $e->getMessage(),
            ]);
        }
    }

    /**
     * Run one scheduler pass: backups, then tasks, then Docker cleanups.
     *
     * Each phase is isolated so that a failure in one does not prevent the others
     * (or the final heartbeat write) from running. The guards catch \Throwable rather
     * than \Exception because PHP \Error subclasses such as TypeError do not extend
     * \Exception and would otherwise abort the whole pass.
     */
    public function handle(): void
    {
        // Freeze the execution time at the start of the job
        $this->executionTime = Carbon::now();
        $this->dispatchedCount = 0;
        $this->skippedCount = 0;

        Log::channel('scheduled')->info('ScheduledJobManager started', [
            'execution_time' => $this->executionTime->toIso8601String(),
        ]);

        // Process backups - don't let failures stop task processing
        try {
            $this->processScheduledBackups();
        } catch (\Throwable $e) {
            Log::channel('scheduled-errors')->error('Failed to process scheduled backups', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }

        // Process tasks - don't let failures stop the job manager
        try {
            $this->processScheduledTasks();
        } catch (\Throwable $e) {
            Log::channel('scheduled-errors')->error('Failed to process scheduled tasks', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }

        // Process Docker cleanups - don't let failures stop the job manager
        try {
            $this->processDockerCleanups();
        } catch (\Throwable $e) {
            Log::channel('scheduled-errors')->error('Failed to process docker cleanups', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }

        Log::channel('scheduled')->info('ScheduledJobManager completed', [
            'execution_time' => $this->executionTime->toIso8601String(),
            'duration_ms' => $this->executionTime->diffInMilliseconds(Carbon::now()),
            'dispatched' => $this->dispatchedCount,
            'skipped' => $this->skippedCount,
        ]);

        // Write heartbeat so the UI can detect when the scheduler has stopped
        try {
            Cache::put('scheduled-job-manager:heartbeat', now()->toIso8601String(), 300);
        } catch (\Throwable) {
            // Non-critical; don't let heartbeat failure affect the job
        }
    }

    /**
     * Dispatch a DatabaseBackupJob for every enabled backup whose cron schedule is due.
     *
     * Each backup is handled inside its own guard so one bad record cannot block the rest.
     */
    private function processScheduledBackups(): void
    {
        $backups = ScheduledDatabaseBackup::with(['database'])
            ->where('enabled', true)
            ->get();

        foreach ($backups as $backup) {
            try {
                $server = $backup->server();

                $skipReason = $this->getBackupSkipReason($backup, $server);
                if ($skipReason !== null) {
                    $this->skippedCount++;
                    $this->logSkip('backup', $skipReason, [
                        'backup_id' => $backup->id,
                        'database_id' => $backup->database_id,
                        'database_type' => $backup->database_type,
                        'team_id' => $backup->team_id ?? null,
                    ]);

                    continue;
                }

                $serverTimezone = $this->resolveServerTimezone($server);
                $frequency = $this->resolveCronExpression($backup->frequency);

                if ($this->shouldRunNow($frequency, $serverTimezone, "scheduled-backup:{$backup->id}")) {
                    DatabaseBackupJob::dispatch($backup);
                    $this->dispatchedCount++;
                    Log::channel('scheduled')->info('Backup dispatched', [
                        'backup_id' => $backup->id,
                        'database_id' => $backup->database_id,
                        'database_type' => $backup->database_type,
                        'team_id' => $backup->team_id ?? null,
                        'server_id' => $server->id,
                    ]);
                }
            } catch (\Throwable $e) {
                // \Throwable (not \Exception) so that an \Error on one backup does not stop the loop.
                Log::channel('scheduled-errors')->error('Error processing backup', [
                    'backup_id' => $backup->id,
                    'error' => $e->getMessage(),
                ]);
            }
        }
    }

    /**
     * Dispatch a ScheduledTaskJob for every enabled task whose cron schedule is due.
     *
     * Checks run in two phases: critical checks always run; runtime checks only run
     * once the cron is due, so stopped resources do not produce a skip log every pass.
     */
    private function processScheduledTasks(): void
    {
        $tasks = ScheduledTask::with(['service', 'application'])
            ->where('enabled', true)
            ->get();

        foreach ($tasks as $task) {
            try {
                $server = $task->server();

                // Phase 1: Critical checks (always — cheap, handles orphans and infra issues)
                $criticalSkip = $this->getTaskCriticalSkipReason($task, $server);
                if ($criticalSkip !== null) {
                    $this->skippedCount++;
                    $this->logSkip('task', $criticalSkip, [
                        'task_id' => $task->id,
                        'task_name' => $task->name,
                        'team_id' => $server?->team_id,
                    ]);

                    continue;
                }

                $serverTimezone = $this->resolveServerTimezone($server);
                $frequency = $this->resolveCronExpression($task->frequency);

                if (! $this->shouldRunNow($frequency, $serverTimezone, "scheduled-task:{$task->id}")) {
                    continue;
                }

                // Phase 2: Runtime checks (only when cron is due — avoids noise for stopped resources)
                $runtimeSkip = $this->getTaskRuntimeSkipReason($task);
                if ($runtimeSkip !== null) {
                    $this->skippedCount++;
                    $this->logSkip('task', $runtimeSkip, [
                        'task_id' => $task->id,
                        'task_name' => $task->name,
                        'team_id' => $server->team_id,
                    ]);

                    continue;
                }

                ScheduledTaskJob::dispatch($task);
                $this->dispatchedCount++;
                Log::channel('scheduled')->info('Task dispatched', [
                    'task_id' => $task->id,
                    'task_name' => $task->name,
                    'team_id' => $server->team_id,
                    'server_id' => $server->id,
                ]);
            } catch (\Throwable $e) {
                // \Throwable (not \Exception) so that an \Error on one task does not stop the loop.
                Log::channel('scheduled-errors')->error('Error processing task', [
                    'task_id' => $task->id,
                    'error' => $e->getMessage(),
                ]);
            }
        }
    }

    /**
     * Decide whether a backup must be skipped, returning the reason or null to proceed.
     *
     * Side effect: the backup record is deleted when its database or server is blank.
     */
    private function getBackupSkipReason(ScheduledDatabaseBackup $backup, ?Server $server): ?string
    {
        if (blank(data_get($backup, 'database'))) {
            $backup->delete();

            return 'database_deleted';
        }

        if (blank($server)) {
            $backup->delete();

            return 'server_deleted';
        }

        if ($server->isFunctional() === false) {
            return 'server_not_functional';
        }

        // Team 0 is exempt from the subscription check.
        if (isCloud() && data_get($server->team->subscription, 'stripe_invoice_paid', false) === false && $server->team->id !== 0) {
            return 'subscription_unpaid';
        }

        return null;
    }

    /**
     * Run the always-on checks for a task, returning the skip reason or null to proceed.
     *
     * Side effect: the task record is deleted when its server is blank or when it has
     * neither a service nor an application.
     */
    private function getTaskCriticalSkipReason(ScheduledTask $task, ?Server $server): ?string
    {
        if (blank($server)) {
            $task->delete();

            return 'server_deleted';
        }

        if ($server->isFunctional() === false) {
            return 'server_not_functional';
        }

        // Team 0 is exempt from the subscription check.
        if (isCloud() && data_get($server->team->subscription, 'stripe_invoice_paid', false) === false && $server->team->id !== 0) {
            return 'subscription_unpaid';
        }

        if (! $task->service && ! $task->application) {
            $task->delete();

            return 'resource_deleted';
        }

        return null;
    }

    /**
     * Check that the task's application/service status contains 'running'.
     *
     * Returns the skip reason, or null when the task may run.
     */
    private function getTaskRuntimeSkipReason(ScheduledTask $task): ?string
    {
        if ($task->application && str($task->application->status)->contains('running') === false) {
            return 'application_not_running';
        }

        if ($task->service && str($task->service->status)->contains('running') === false) {
            return 'service_not_running';
        }

        return null;
    }

    /**
     * Resolve the timezone used to evaluate a server's cron schedules.
     *
     * Reads 'server_timezone' from the server settings and falls back to the
     * application timezone when it is missing or fails validate_timezone().
     */
    private function resolveServerTimezone(Server $server): string
    {
        $timezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
        if (validate_timezone($timezone) === false) {
            $timezone = config('app.timezone');
        }

        return $timezone;
    }

    /**
     * Translate a frequency that is a key of VALID_CRON_STRINGS into its mapped value.
     *
     * Any other frequency is returned unchanged.
     */
    private function resolveCronExpression(string $frequency): string
    {
        if (isset(VALID_CRON_STRINGS[$frequency])) {
            return VALID_CRON_STRINGS[$frequency];
        }

        return $frequency;
    }

    /**
     * Determine if a cron schedule should run now.
     *
     * When a dedupKey is provided, uses getPreviousRunDate() + last-dispatch tracking
     * instead of isDue(). This is resilient to queue delays — even if the job is delayed
     * by minutes, it still catches the missed cron window. Without dedupKey, falls back
     * to simple isDue() check.
     *
     * Side effect: when a dedupKey is given and this returns true, the frozen execution
     * time is stored in the cache under that key for 86400 seconds.
     */
    private function shouldRunNow(string $frequency, string $timezone, ?string $dedupKey = null): bool
    {
        $cron = new CronExpression($frequency);
        $baseTime = $this->executionTime ?? Carbon::now();
        $executionTime = $baseTime->copy()->setTimezone($timezone);

        // No dedup key → simple isDue check (used by docker cleanups)
        if ($dedupKey === null) {
            return $cron->isDue($executionTime);
        }

        // Get the most recent time this cron was due (including current minute)
        $previousDue = Carbon::instance($cron->getPreviousRunDate($executionTime, allowCurrentDate: true));

        $lastDispatched = Cache::get($dedupKey);

        if ($lastDispatched === null) {
            // First run after restart or cache loss: only fire if actually due right now.
            // Seed the cache so subsequent runs can use tolerance/catch-up logic.
            $isDue = $cron->isDue($executionTime);
            if ($isDue) {
                Cache::put($dedupKey, $executionTime->toIso8601String(), 86400);
            }

            return $isDue;
        }

        // Subsequent runs: fire if there's been a due time since last dispatch
        if ($previousDue->gt(Carbon::parse($lastDispatched))) {
            Cache::put($dedupKey, $executionTime->toIso8601String(), 86400);

            return true;
        }

        return false;
    }

    /**
     * Dispatch a DockerCleanupJob for every eligible server whose cleanup cron is due.
     *
     * Uses the plain isDue() path of shouldRunNow() (no dedup key).
     */
    private function processDockerCleanups(): void
    {
        // Get all servers that need cleanup checks
        $servers = $this->getServersForCleanup();

        foreach ($servers as $server) {
            try {
                $skipReason = $this->getDockerCleanupSkipReason($server);
                if ($skipReason !== null) {
                    $this->skippedCount++;
                    $this->logSkip('docker_cleanup', $skipReason, [
                        'server_id' => $server->id,
                        'server_name' => $server->name,
                        'team_id' => $server->team_id,
                    ]);

                    continue;
                }

                $serverTimezone = $this->resolveServerTimezone($server);
                $frequency = $this->resolveCronExpression(
                    data_get($server->settings, 'docker_cleanup_frequency', '0 * * * *')
                );

                // Use the frozen execution time for consistent evaluation
                if ($this->shouldRunNow($frequency, $serverTimezone)) {
                    DockerCleanupJob::dispatch(
                        $server,
                        false,
                        $server->settings->delete_unused_volumes,
                        $server->settings->delete_unused_networks
                    );
                    $this->dispatchedCount++;
                    Log::channel('scheduled')->info('Docker cleanup dispatched', [
                        'server_id' => $server->id,
                        'server_name' => $server->name,
                        'team_id' => $server->team_id,
                    ]);
                }
            } catch (\Throwable $e) {
                // \Throwable (not \Exception) so that an \Error on one server does not stop the loop.
                Log::channel('scheduled-errors')->error('Error processing docker cleanup', [
                    'server_id' => $server->id,
                    'server_name' => $server->name,
                    'error' => $e->getMessage(),
                ]);
            }
        }
    }

    /**
     * Load the servers that are candidates for Docker cleanup.
     *
     * Servers with ip '1.2.3.4' are excluded from the main query. In cloud mode only
     * servers whose team subscription has stripe_invoice_paid = true are selected, and
     * the servers of team 0 are merged in when that team exists.
     */
    private function getServersForCleanup(): Collection
    {
        $query = Server::with('settings')
            ->where('ip', '!=', '1.2.3.4');

        if (! isCloud()) {
            return $query->get();
        }

        $paidServers = $query->whereRelation('team.subscription', 'stripe_invoice_paid', true)->get();

        // Team::find(0) returns null when team 0 does not exist; the null-safe call avoids
        // a fatal "member function on null" error that would abort every cleanup.
        $ownServers = Team::find(0)?->servers()->with('settings')->get();

        return $ownServers === null ? $paidServers : $paidServers->merge($ownServers);
    }

    /**
     * Decide whether Docker cleanup must be skipped for a server.
     *
     * Returns the skip reason, or null when cleanup may proceed.
     */
    private function getDockerCleanupSkipReason(Server $server): ?string
    {
        if (! $server->isFunctional()) {
            return 'server_not_functional';
        }

        // In cloud, check subscription status (except team 0)
        if (isCloud() && $server->team_id !== 0) {
            if (data_get($server->team->subscription, 'stripe_invoice_paid', false) === false) {
                return 'subscription_unpaid';
            }
        }

        return null;
    }

    /**
     * Write an info entry to the 'scheduled' log channel describing a skipped item.
     *
     * The message is the type with underscores replaced by spaces and its first letter
     * upper-cased, followed by ' skipped'. Keys in $context override the default
     * 'type', 'skip_reason' and 'execution_time' entries.
     */
    private function logSkip(string $type, string $reason, array $context = []): void
    {
        Log::channel('scheduled')->info(ucfirst(str_replace('_', ' ', $type)).' skipped', array_merge([
            'type' => $type,
            'skip_reason' => $reason,
            'execution_time' => $this->executionTime?->toIso8601String(),
        ], $context));
    }
}