fix(crons): dispatch due schedules across chunks
Interleave due backups and tasks so one schedule type cannot starve the other, and defer task job context loading until execution.
This commit is contained in:
parent
9d1ede0733
commit
1c5d5676ef
3 changed files with 469 additions and 160 deletions
|
|
@ -6,14 +6,15 @@
|
|||
use App\Models\ScheduledTask;
|
||||
use App\Models\Server;
|
||||
use App\Models\Team;
|
||||
use Cron\CronExpression;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Database\Eloquent\Builder;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\Middleware\WithoutOverlapping;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\Collection;
|
||||
use Illuminate\Support\Facades\Cache;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Redis;
|
||||
|
|
@ -22,6 +23,8 @@ class ScheduledJobManager implements ShouldQueue
|
|||
{
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
private const CHUNK_SIZE = 100;
|
||||
|
||||
/**
|
||||
* The time when this job execution started.
|
||||
* Used to ensure all scheduled items are evaluated against the same point in time.
|
||||
|
|
@ -96,21 +99,11 @@ public function handle(): void
|
|||
'execution_time' => $this->executionTime->toIso8601String(),
|
||||
]);
|
||||
|
||||
// Process backups - don't let failures stop task processing
|
||||
// Process scheduled backups and tasks together so neither type starves the other.
|
||||
try {
|
||||
$this->processScheduledBackups();
|
||||
$this->processScheduledBackupsAndTasks();
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Failed to process scheduled backups', [
|
||||
'error' => $e->getMessage(),
|
||||
'trace' => $e->getTraceAsString(),
|
||||
]);
|
||||
}
|
||||
|
||||
// Process tasks - don't let failures stop the job manager
|
||||
try {
|
||||
$this->processScheduledTasks();
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Failed to process scheduled tasks', [
|
||||
Log::channel('scheduled-errors')->error('Failed to process scheduled backups and tasks', [
|
||||
'error' => $e->getMessage(),
|
||||
'trace' => $e->getTraceAsString(),
|
||||
]);
|
||||
|
|
@ -141,125 +134,211 @@ public function handle(): void
|
|||
}
|
||||
}
|
||||
|
||||
private function processScheduledBackups(): void
|
||||
private function processScheduledBackupsAndTasks(): void
|
||||
{
|
||||
$backups = ScheduledDatabaseBackup::with(['database'])
|
||||
$lastBackupId = 0;
|
||||
$lastTaskId = 0;
|
||||
|
||||
do {
|
||||
$backups = $this->scheduledBackupQuery($lastBackupId)->get();
|
||||
$tasks = $this->scheduledTaskQuery($lastTaskId)->get();
|
||||
|
||||
if ($backups->isNotEmpty()) {
|
||||
$lastBackupId = $backups->last()->id;
|
||||
}
|
||||
|
||||
if ($tasks->isNotEmpty()) {
|
||||
$lastTaskId = $tasks->last()->id;
|
||||
}
|
||||
|
||||
$this->processInterleavedDueSchedules(
|
||||
$this->dueScheduledBackups($backups),
|
||||
$this->dueScheduledTasks($tasks),
|
||||
);
|
||||
} while ($backups->isNotEmpty() || $tasks->isNotEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, array{backup: ScheduledDatabaseBackup, server: Server}> $dueBackups
|
||||
* @param array<int, array{task: ScheduledTask, server: Server}> $dueTasks
|
||||
*/
|
||||
private function processInterleavedDueSchedules(array $dueBackups, array $dueTasks): void
|
||||
{
|
||||
$maxCount = max(count($dueBackups), count($dueTasks));
|
||||
|
||||
for ($index = 0; $index < $maxCount; $index++) {
|
||||
if (isset($dueBackups[$index])) {
|
||||
$this->processScheduledBackup($dueBackups[$index]['backup'], $dueBackups[$index]['server']);
|
||||
}
|
||||
|
||||
if (isset($dueTasks[$index])) {
|
||||
$this->processScheduledTask($dueTasks[$index]['task'], $dueTasks[$index]['server']);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function scheduledBackupQuery(int $lastBackupId): Builder
|
||||
{
|
||||
return ScheduledDatabaseBackup::with(['database', 'team.subscription'])
|
||||
->where('enabled', true)
|
||||
->get();
|
||||
->where('id', '>', $lastBackupId)
|
||||
->orderBy('id')
|
||||
->limit(self::CHUNK_SIZE);
|
||||
}
|
||||
|
||||
private function scheduledTaskQuery(int $lastTaskId): Builder
|
||||
{
|
||||
return ScheduledTask::with([
|
||||
'service.destination.server.settings',
|
||||
'service.destination.server.team.subscription',
|
||||
'application.destination.server.settings',
|
||||
'application.destination.server.team.subscription',
|
||||
])
|
||||
->where('enabled', true)
|
||||
->where('id', '>', $lastTaskId)
|
||||
->orderBy('id')
|
||||
->limit(self::CHUNK_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param iterable<ScheduledDatabaseBackup> $backups
|
||||
* @return array<int, array{backup: ScheduledDatabaseBackup, server: Server}>
|
||||
*/
|
||||
private function dueScheduledBackups(iterable $backups): array
|
||||
{
|
||||
$dueBackups = [];
|
||||
|
||||
foreach ($backups as $backup) {
|
||||
try {
|
||||
$server = $backup->server();
|
||||
$skipReason = $this->getBackupSkipReason($backup, $server);
|
||||
if ($skipReason !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logSkip('backup', $skipReason, [
|
||||
'backup_id' => $backup->id,
|
||||
'database_id' => $backup->database_id,
|
||||
'database_type' => $backup->database_type,
|
||||
'team_id' => $backup->team_id ?? null,
|
||||
]);
|
||||
|
||||
if (blank(data_get($backup, 'database')) || blank($server)) {
|
||||
$this->processScheduledBackup($backup, $server);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
||||
|
||||
if (validate_timezone($serverTimezone) === false) {
|
||||
$serverTimezone = config('app.timezone');
|
||||
}
|
||||
|
||||
$frequency = $backup->frequency;
|
||||
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
||||
$frequency = VALID_CRON_STRINGS[$frequency];
|
||||
}
|
||||
|
||||
if (shouldRunCronNow($frequency, $serverTimezone, "scheduled-backup:{$backup->id}", $this->executionTime)) {
|
||||
DatabaseBackupJob::dispatch($backup);
|
||||
$this->dispatchedCount++;
|
||||
Log::channel('scheduled')->info('Backup dispatched', [
|
||||
'backup_id' => $backup->id,
|
||||
'database_id' => $backup->database_id,
|
||||
'database_type' => $backup->database_type,
|
||||
'team_id' => $backup->team_id ?? null,
|
||||
'server_id' => $server->id,
|
||||
]);
|
||||
if ($this->isDueCandidateBeforeExpensiveChecks($backup->frequency, $server, "scheduled-backup:{$backup->id}")) {
|
||||
$dueBackups[] = [
|
||||
'backup' => $backup,
|
||||
'server' => $server,
|
||||
];
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing backup', [
|
||||
Log::channel('scheduled-errors')->error('Error prechecking backup', [
|
||||
'backup_id' => $backup->id,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
return $dueBackups;
|
||||
}
|
||||
|
||||
private function processScheduledTasks(): void
|
||||
/**
|
||||
* @param iterable<ScheduledTask> $tasks
|
||||
* @return array<int, array{task: ScheduledTask, server: Server}>
|
||||
*/
|
||||
private function dueScheduledTasks(iterable $tasks): array
|
||||
{
|
||||
$tasks = ScheduledTask::with(['service', 'application'])
|
||||
->where('enabled', true)
|
||||
->get();
|
||||
$dueTasks = [];
|
||||
|
||||
foreach ($tasks as $task) {
|
||||
try {
|
||||
$server = $task->server();
|
||||
|
||||
// Phase 1: Critical checks (always — cheap, handles orphans and infra issues)
|
||||
$criticalSkip = $this->getTaskCriticalSkipReason($task, $server);
|
||||
if ($criticalSkip !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logSkip('task', $criticalSkip, [
|
||||
'task_id' => $task->id,
|
||||
'task_name' => $task->name,
|
||||
'team_id' => $server?->team_id,
|
||||
]);
|
||||
if (blank($server) || (! $task->service && ! $task->application)) {
|
||||
$this->processScheduledTask($task, $server);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
||||
|
||||
if (validate_timezone($serverTimezone) === false) {
|
||||
$serverTimezone = config('app.timezone');
|
||||
if ($this->isDueCandidateBeforeExpensiveChecks($task->frequency, $server, "scheduled-task:{$task->id}")) {
|
||||
$dueTasks[] = [
|
||||
'task' => $task,
|
||||
'server' => $server,
|
||||
];
|
||||
}
|
||||
|
||||
$frequency = $task->frequency;
|
||||
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
||||
$frequency = VALID_CRON_STRINGS[$frequency];
|
||||
}
|
||||
|
||||
if (! shouldRunCronNow($frequency, $serverTimezone, "scheduled-task:{$task->id}", $this->executionTime)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Phase 2: Runtime checks (only when cron is due — avoids noise for stopped resources)
|
||||
$runtimeSkip = $this->getTaskRuntimeSkipReason($task);
|
||||
if ($runtimeSkip !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logSkip('task', $runtimeSkip, [
|
||||
'task_id' => $task->id,
|
||||
'task_name' => $task->name,
|
||||
'team_id' => $server->team_id,
|
||||
]);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
ScheduledTaskJob::dispatch($task);
|
||||
$this->dispatchedCount++;
|
||||
Log::channel('scheduled')->info('Task dispatched', [
|
||||
'task_id' => $task->id,
|
||||
'task_name' => $task->name,
|
||||
'team_id' => $server->team_id,
|
||||
'server_id' => $server->id,
|
||||
]);
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing task', [
|
||||
Log::channel('scheduled-errors')->error('Error prechecking task', [
|
||||
'task_id' => $task->id,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
return $dueTasks;
|
||||
}
|
||||
|
||||
private function processScheduledBackup(ScheduledDatabaseBackup $backup, ?Server $precheckedServer = null): void
|
||||
{
|
||||
try {
|
||||
$server = $precheckedServer ?? $backup->server();
|
||||
$skipReason = $this->getBackupSkipReason($backup, $server);
|
||||
if ($skipReason !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logBackupSkip($backup, $skipReason);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if ($this->shouldDispatch($backup->frequency, $server, "scheduled-backup:{$backup->id}")) {
|
||||
DatabaseBackupJob::dispatch($backup);
|
||||
$this->dispatchedCount++;
|
||||
Log::channel('scheduled')->info('Backup dispatched', [
|
||||
'backup_id' => $backup->id,
|
||||
'database_id' => $backup->database_id,
|
||||
'database_type' => $backup->database_type,
|
||||
'team_id' => $backup->team_id ?? null,
|
||||
'server_id' => $server->id,
|
||||
]);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing backup', [
|
||||
'backup_id' => $backup->id,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
private function processScheduledTask(ScheduledTask $task, ?Server $precheckedServer = null): void
|
||||
{
|
||||
try {
|
||||
$server = $precheckedServer ?? $task->server();
|
||||
$criticalSkip = $this->getTaskCriticalSkipReason($task, $server);
|
||||
if ($criticalSkip !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logTaskSkip($task, $criticalSkip, $server);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (! $this->shouldDispatch($task->frequency, $server, "scheduled-task:{$task->id}")) {
|
||||
return;
|
||||
}
|
||||
|
||||
$runtimeSkip = $this->getTaskRuntimeSkipReason($task);
|
||||
if ($runtimeSkip !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logTaskSkip($task, $runtimeSkip, $server);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
ScheduledTaskJob::dispatch($task);
|
||||
$this->dispatchedCount++;
|
||||
Log::channel('scheduled')->info('Task dispatched', [
|
||||
'task_id' => $task->id,
|
||||
'task_name' => $task->name,
|
||||
'team_id' => $server->team_id,
|
||||
'server_id' => $server->id,
|
||||
]);
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing task', [
|
||||
'task_id' => $task->id,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
private function getBackupSkipReason(ScheduledDatabaseBackup $backup, ?Server $server): ?string
|
||||
|
|
@ -327,71 +406,70 @@ private function getTaskRuntimeSkipReason(ScheduledTask $task): ?string
|
|||
|
||||
private function processDockerCleanups(): void
|
||||
{
|
||||
// Get all servers that need cleanup checks
|
||||
$servers = $this->getServersForCleanup();
|
||||
|
||||
foreach ($servers as $server) {
|
||||
try {
|
||||
$skipReason = $this->getDockerCleanupSkipReason($server);
|
||||
if ($skipReason !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logSkip('docker_cleanup', $skipReason, [
|
||||
'server_id' => $server->id,
|
||||
'server_name' => $server->name,
|
||||
'team_id' => $server->team_id,
|
||||
]);
|
||||
|
||||
continue;
|
||||
$this->getServersForCleanupQuery()
|
||||
->chunkById(self::CHUNK_SIZE, function ($servers): void {
|
||||
foreach ($servers as $server) {
|
||||
$this->processDockerCleanup($server);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
$serverTimezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
||||
if (validate_timezone($serverTimezone) === false) {
|
||||
$serverTimezone = config('app.timezone');
|
||||
}
|
||||
|
||||
$frequency = data_get($server->settings, 'docker_cleanup_frequency', '0 * * * *');
|
||||
if (isset(VALID_CRON_STRINGS[$frequency])) {
|
||||
$frequency = VALID_CRON_STRINGS[$frequency];
|
||||
}
|
||||
|
||||
// Use the frozen execution time for consistent evaluation
|
||||
if (shouldRunCronNow($frequency, $serverTimezone, "docker-cleanup:{$server->id}", $this->executionTime)) {
|
||||
DockerCleanupJob::dispatch(
|
||||
$server,
|
||||
false,
|
||||
$server->settings->delete_unused_volumes,
|
||||
$server->settings->delete_unused_networks
|
||||
);
|
||||
$this->dispatchedCount++;
|
||||
Log::channel('scheduled')->info('Docker cleanup dispatched', [
|
||||
'server_id' => $server->id,
|
||||
'server_name' => $server->name,
|
||||
'team_id' => $server->team_id,
|
||||
]);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing docker cleanup', [
|
||||
private function processDockerCleanup(Server $server): void
|
||||
{
|
||||
try {
|
||||
$skipReason = $this->getDockerCleanupSkipReason($server);
|
||||
if ($skipReason !== null) {
|
||||
$this->skippedCount++;
|
||||
$this->logSkip('docker_cleanup', $skipReason, [
|
||||
'server_id' => $server->id,
|
||||
'server_name' => $server->name,
|
||||
'error' => $e->getMessage(),
|
||||
'team_id' => $server->team_id,
|
||||
]);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$frequency = data_get($server->settings, 'docker_cleanup_frequency', '0 * * * *');
|
||||
|
||||
if ($this->shouldDispatch($frequency, $server, "docker-cleanup:{$server->id}")) {
|
||||
DockerCleanupJob::dispatch(
|
||||
$server,
|
||||
false,
|
||||
$server->settings->delete_unused_volumes,
|
||||
$server->settings->delete_unused_networks
|
||||
);
|
||||
$this->dispatchedCount++;
|
||||
Log::channel('scheduled')->info('Docker cleanup dispatched', [
|
||||
'server_id' => $server->id,
|
||||
'server_name' => $server->name,
|
||||
'team_id' => $server->team_id,
|
||||
]);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
Log::channel('scheduled-errors')->error('Error processing docker cleanup', [
|
||||
'server_id' => $server->id,
|
||||
'server_name' => $server->name,
|
||||
'error' => $e->getMessage(),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
private function getServersForCleanup(): Collection
|
||||
private function getServersForCleanupQuery(): Builder
|
||||
{
|
||||
$query = Server::with('settings')
|
||||
->where('ip', '!=', '1.2.3.4');
|
||||
|
||||
if (isCloud()) {
|
||||
$servers = $query->whereRelation('team.subscription', 'stripe_invoice_paid', true)->get();
|
||||
$own = Team::find(0)->servers()->with('settings')->get();
|
||||
|
||||
return $servers->merge($own);
|
||||
$query
|
||||
->with('team.subscription')
|
||||
->where(function (Builder $query): void {
|
||||
$query
|
||||
->where('team_id', 0)
|
||||
->orWhereRelation('team.subscription', 'stripe_invoice_paid', true);
|
||||
});
|
||||
}
|
||||
|
||||
return $query->get();
|
||||
return $query;
|
||||
}
|
||||
|
||||
private function getDockerCleanupSkipReason(Server $server): ?string
|
||||
|
|
@ -418,4 +496,71 @@ private function logSkip(string $type, string $reason, array $context = []): voi
|
|||
'execution_time' => $this->executionTime?->toIso8601String(),
|
||||
], $context));
|
||||
}
|
||||
|
||||
private function shouldDispatch(string $frequency, Server $server, string $dedupKey): bool
|
||||
{
|
||||
return shouldRunCronNow(
|
||||
$this->normalizeFrequency($frequency),
|
||||
$this->serverTimezone($server),
|
||||
$dedupKey,
|
||||
$this->executionTime,
|
||||
);
|
||||
}
|
||||
|
||||
private function isDueCandidateBeforeExpensiveChecks(string $frequency, Server $server, string $dedupKey): bool
|
||||
{
|
||||
$cron = new CronExpression($this->normalizeFrequency($frequency));
|
||||
$executionTime = ($this->executionTime ?? Carbon::now())->copy()->setTimezone($this->serverTimezone($server));
|
||||
$lastDispatched = Cache::get($dedupKey);
|
||||
$previousDue = Carbon::instance($cron->getPreviousRunDate($executionTime, allowCurrentDate: true));
|
||||
|
||||
if ($lastDispatched === null) {
|
||||
$isDue = $cron->isDue($executionTime);
|
||||
|
||||
if (! $isDue) {
|
||||
Cache::put($dedupKey, $previousDue->toIso8601String(), 2592000);
|
||||
}
|
||||
|
||||
return $isDue;
|
||||
}
|
||||
|
||||
$shouldFire = $previousDue->gt(Carbon::parse($lastDispatched));
|
||||
|
||||
if (! $shouldFire) {
|
||||
Cache::put($dedupKey, $previousDue->toIso8601String(), 2592000);
|
||||
}
|
||||
|
||||
return $shouldFire;
|
||||
}
|
||||
|
||||
private function normalizeFrequency(string $frequency): string
|
||||
{
|
||||
return VALID_CRON_STRINGS[$frequency] ?? $frequency;
|
||||
}
|
||||
|
||||
private function serverTimezone(Server $server): string
|
||||
{
|
||||
$timezone = data_get($server->settings, 'server_timezone', config('app.timezone'));
|
||||
|
||||
return validate_timezone($timezone) ? $timezone : config('app.timezone');
|
||||
}
|
||||
|
||||
private function logBackupSkip(ScheduledDatabaseBackup $backup, string $reason): void
|
||||
{
|
||||
$this->logSkip('backup', $reason, [
|
||||
'backup_id' => $backup->id,
|
||||
'database_id' => $backup->database_id,
|
||||
'database_type' => $backup->database_type,
|
||||
'team_id' => $backup->team_id ?? null,
|
||||
]);
|
||||
}
|
||||
|
||||
private function logTaskSkip(ScheduledTask $task, string $reason, ?Server $server): void
|
||||
{
|
||||
$this->logSkip('task', $reason, [
|
||||
'task_id' => $task->id,
|
||||
'task_name' => $task->name,
|
||||
'team_id' => $server?->team_id,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,13 +40,13 @@ class ScheduledTaskJob implements ShouldBeEncrypted, ShouldQueue
|
|||
*/
|
||||
public $timeout = 300;
|
||||
|
||||
public Team $team;
|
||||
public ?Team $team = null;
|
||||
|
||||
public ?Server $server = null;
|
||||
|
||||
public ScheduledTask $task;
|
||||
|
||||
public Application|Service $resource;
|
||||
public Application|Service|null $resource = null;
|
||||
|
||||
public ?ScheduledTaskExecution $task_log = null;
|
||||
|
||||
|
|
@ -61,25 +61,34 @@ class ScheduledTaskJob implements ShouldBeEncrypted, ShouldQueue
|
|||
|
||||
public array $containers = [];
|
||||
|
||||
public string $server_timezone;
|
||||
public string $server_timezone = 'UTC';
|
||||
|
||||
public function __construct(ScheduledTask $task)
|
||||
{
|
||||
$this->onQueue(crons_queue());
|
||||
|
||||
$this->task = $task;
|
||||
if ($service = $task->service()->first()) {
|
||||
$this->resource = $service;
|
||||
} elseif ($application = $task->application()->first()) {
|
||||
$this->resource = $application;
|
||||
$this->timeout = $this->task->timeout ?? 300;
|
||||
}
|
||||
|
||||
private function initializeExecutionContext(): void
|
||||
{
|
||||
$this->task->loadMissing([
|
||||
'service.destination.server.settings',
|
||||
'application.destination.server.settings',
|
||||
]);
|
||||
|
||||
if ($this->task->service) {
|
||||
$this->resource = $this->task->service;
|
||||
} elseif ($this->task->application) {
|
||||
$this->resource = $this->task->application;
|
||||
} else {
|
||||
throw new \RuntimeException('ScheduledTaskJob failed: No resource found.');
|
||||
}
|
||||
$this->team = Team::findOrFail($task->team_id);
|
||||
$this->server_timezone = $this->getServerTimezone();
|
||||
|
||||
// Set timeout from task configuration
|
||||
$this->timeout = $this->task->timeout ?? 300;
|
||||
$this->team = Team::findOrFail($this->task->team_id);
|
||||
$this->server_timezone = $this->getServerTimezone();
|
||||
$this->server = $this->resource->destination->server;
|
||||
}
|
||||
|
||||
private function getServerTimezone(): string
|
||||
|
|
@ -98,6 +107,8 @@ public function handle(): void
|
|||
$startTime = Carbon::now();
|
||||
|
||||
try {
|
||||
$this->initializeExecutionContext();
|
||||
|
||||
$this->task_log = ScheduledTaskExecution::create([
|
||||
'scheduled_task_id' => $this->task->id,
|
||||
'started_at' => $startTime,
|
||||
|
|
@ -107,8 +118,6 @@ public function handle(): void
|
|||
// Store execution ID for timeout handling
|
||||
$this->executionId = $this->task_log->id;
|
||||
|
||||
$this->server = $this->resource->destination->server;
|
||||
|
||||
if ($this->resource->type() === 'application') {
|
||||
$containers = getCurrentApplicationContainerStatus($this->server, $this->resource->id, 0);
|
||||
if ($containers->count() > 0) {
|
||||
|
|
@ -179,7 +188,10 @@ public function handle(): void
|
|||
// Re-throw to trigger Laravel's retry mechanism with backoff
|
||||
throw $e;
|
||||
} finally {
|
||||
ScheduledTaskDone::dispatch($this->team->id);
|
||||
if ($this->team) {
|
||||
ScheduledTaskDone::dispatch($this->team->id);
|
||||
}
|
||||
|
||||
if ($this->task_log) {
|
||||
$finishedAt = Carbon::now();
|
||||
$duration = round($startTime->floatDiffInSeconds($finishedAt), 2);
|
||||
|
|
@ -205,6 +217,8 @@ public function backoff(): array
|
|||
*/
|
||||
public function failed(?\Throwable $exception): void
|
||||
{
|
||||
$this->team ??= Team::find($this->task->team_id);
|
||||
|
||||
Log::channel('scheduled-errors')->error('ScheduledTask permanently failed', [
|
||||
'job' => 'ScheduledTaskJob',
|
||||
'task_id' => $this->task->uuid,
|
||||
|
|
|
|||
150
tests/Feature/ScheduledJobManagerDispatchTest.php
Normal file
150
tests/Feature/ScheduledJobManagerDispatchTest.php
Normal file
|
|
@ -0,0 +1,150 @@
|
|||
<?php
|
||||
|
||||
use App\Jobs\ScheduledJobManager;
|
||||
use App\Jobs\ScheduledTaskJob;
|
||||
use App\Models\Application;
|
||||
use App\Models\Environment;
|
||||
use App\Models\PrivateKey;
|
||||
use App\Models\Project;
|
||||
use App\Models\ScheduledTask;
|
||||
use App\Models\Server;
|
||||
use App\Models\StandaloneDocker;
|
||||
use App\Models\Team;
|
||||
use Illuminate\Foundation\Testing\RefreshDatabase;
|
||||
use Illuminate\Support\Carbon;
|
||||
use Illuminate\Support\Facades\Cache;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Queue;
|
||||
|
||||
uses(RefreshDatabase::class);
|
||||
|
||||
it('dispatches scheduled tasks across chunks', function () {
|
||||
config(['constants.coolify.self_hosted' => true]);
|
||||
Carbon::setTestNow(Carbon::create(2026, 5, 27, 0, 1, 0, 'UTC'));
|
||||
Queue::fake();
|
||||
|
||||
$team = Team::factory()->create();
|
||||
$privateKey = PrivateKey::create([
|
||||
'name' => 'Test Key',
|
||||
'private_key' => '-----BEGIN OPENSSH PRIVATE KEY-----
|
||||
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
|
||||
QyNTUxOQAAACBbhpqHhqv6aI67Mj9abM3DVbmcfYhZAhC7ca4d9UCevAAAAJi/QySHv0Mk
|
||||
hwAAAAtzc2gtZWQyNTUxOQAAACBbhpqHhqv6aI67Mj9abM3DVbmcfYhZAhC7ca4d9UCevA
|
||||
AAAECBQw4jg1WRT2IGHMncCiZhURCts2s24HoDS0thHnnRKVuGmoeGq/pojrsyP1pszcNV
|
||||
uZx9iFkCELtxrh31QJ68AAAAEXNhaWxANzZmZjY2ZDJlMmRkAQIDBA==
|
||||
-----END OPENSSH PRIVATE KEY-----',
|
||||
'team_id' => $team->id,
|
||||
]);
|
||||
$server = Server::factory()->create([
|
||||
'team_id' => $team->id,
|
||||
'private_key_id' => $privateKey->id,
|
||||
]);
|
||||
$server->settings()->update([
|
||||
'is_reachable' => true,
|
||||
'is_usable' => true,
|
||||
'force_disabled' => false,
|
||||
'docker_cleanup_frequency' => '0 * * * *',
|
||||
]);
|
||||
|
||||
$destination = StandaloneDocker::where('server_id', $server->id)->first()
|
||||
?? StandaloneDocker::factory()->create(['server_id' => $server->id]);
|
||||
$project = Project::factory()->create(['team_id' => $team->id]);
|
||||
$environment = Environment::factory()->create(['project_id' => $project->id]);
|
||||
$application = Application::factory()->create([
|
||||
'environment_id' => $environment->id,
|
||||
'destination_id' => $destination->id,
|
||||
'destination_type' => StandaloneDocker::class,
|
||||
'status' => 'running',
|
||||
]);
|
||||
|
||||
ScheduledTask::factory()
|
||||
->count(101)
|
||||
->create([
|
||||
'team_id' => $team->id,
|
||||
'application_id' => $application->id,
|
||||
'frequency' => '* * * * *',
|
||||
'enabled' => true,
|
||||
]);
|
||||
|
||||
(new ScheduledJobManager)->handle();
|
||||
|
||||
Queue::assertPushed(ScheduledTaskJob::class, 101);
|
||||
});
|
||||
|
||||
it('skips expensive dispatch for non-due schedules while seeding dedup cache', function () {
|
||||
config(['constants.coolify.self_hosted' => true]);
|
||||
Carbon::setTestNow(Carbon::create(2026, 5, 27, 0, 1, 0, 'UTC'));
|
||||
Queue::fake();
|
||||
|
||||
$application = createScheduledTaskApplication();
|
||||
|
||||
$task = ScheduledTask::factory()->create([
|
||||
'team_id' => $application->environment->project->team_id,
|
||||
'application_id' => $application->id,
|
||||
'frequency' => '0 2 * * *',
|
||||
'enabled' => true,
|
||||
]);
|
||||
|
||||
(new ScheduledJobManager)->handle();
|
||||
|
||||
Queue::assertNotPushed(ScheduledTaskJob::class);
|
||||
expect(Cache::get("scheduled-task:{$task->id}"))->not->toBeNull();
|
||||
});
|
||||
|
||||
it('does not query relationships when constructing scheduled task jobs', function () {
|
||||
$application = createScheduledTaskApplication();
|
||||
|
||||
$task = ScheduledTask::factory()->create([
|
||||
'team_id' => $application->environment->project->team_id,
|
||||
'application_id' => $application->id,
|
||||
'frequency' => '* * * * *',
|
||||
'enabled' => true,
|
||||
])->fresh();
|
||||
|
||||
DB::flushQueryLog();
|
||||
DB::enableQueryLog();
|
||||
|
||||
$job = new ScheduledTaskJob($task);
|
||||
|
||||
expect(DB::getQueryLog())->toBeEmpty()
|
||||
->and($job->queue)->toBe(crons_queue())
|
||||
->and($job->timeout)->toBe(300);
|
||||
});
|
||||
|
||||
function createScheduledTaskApplication(): Application
|
||||
{
|
||||
$team = Team::factory()->create();
|
||||
$privateKey = PrivateKey::create([
|
||||
'name' => 'Test Key',
|
||||
'private_key' => '-----BEGIN OPENSSH PRIVATE KEY-----
|
||||
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
|
||||
QyNTUxOQAAACBbhpqHhqv6aI67Mj9abM3DVbmcfYhZAhC7ca4d9UCevAAAAJi/QySHv0Mk
|
||||
hwAAAAtzc2gtZWQyNTUxOQAAACBbhpqHhqv6aI67Mj9abM3DVbmcfYhZAhC7ca4d9UCevA
|
||||
AAAECBQw4jg1WRT2IGHMncCiZhURCts2s24HoDS0thHnnRKVuGmoeGq/pojrsyP1pszcNV
|
||||
uZx9iFkCELtxrh31QJ68AAAAEXNhaWxANzZmZjY2ZDJlMmRkAQIDBA==
|
||||
-----END OPENSSH PRIVATE KEY-----',
|
||||
'team_id' => $team->id,
|
||||
]);
|
||||
$server = Server::factory()->create([
|
||||
'team_id' => $team->id,
|
||||
'private_key_id' => $privateKey->id,
|
||||
]);
|
||||
$server->settings()->update([
|
||||
'is_reachable' => true,
|
||||
'is_usable' => true,
|
||||
'force_disabled' => false,
|
||||
'docker_cleanup_frequency' => '0 * * * *',
|
||||
]);
|
||||
|
||||
$destination = StandaloneDocker::where('server_id', $server->id)->first()
|
||||
?? StandaloneDocker::factory()->create(['server_id' => $server->id]);
|
||||
$project = Project::factory()->create(['team_id' => $team->id]);
|
||||
$environment = Environment::factory()->create(['project_id' => $project->id]);
|
||||
|
||||
return Application::factory()->create([
|
||||
'environment_id' => $environment->id,
|
||||
'destination_id' => $destination->id,
|
||||
'destination_type' => StandaloneDocker::class,
|
||||
'status' => 'running',
|
||||
]);
|
||||
}
|
||||
Loading…
Reference in a new issue