From f68793ed69de6167926fa0f34e3574034b71e46c Mon Sep 17 00:00:00 2001 From: Andras Bacsai <5845193+andrasbacsai@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:18:44 +0100 Subject: [PATCH] feat(jobs): optimize async job dispatches and enhance Stripe subscription sync Reduce unnecessary job queue pressure and improve subscription sync reliability: - Cache ServerStorageCheckJob dispatch to only trigger on disk percentage changes - Rate-limit ConnectProxyToNetworksJob to maximum once per 10 minutes - Add progress callback support to SyncStripeSubscriptionsJob for UI feedback - Implement bulk fetching of valid Stripe subscription IDs for efficiency - Detect and report resubscribed users (same email, different customer ID) - Fix CleanupUnreachableServers query operator (>= 3 instead of = 3) - Improve empty subId validation in PushServerUpdateJob - Optimize relationship access by using properties instead of query methods - Add comprehensive test coverage for all optimizations --- .../Commands/CleanupUnreachableServers.php | 2 +- .../Cloud/SyncStripeSubscriptions.php | 22 ++- app/Jobs/PushServerUpdateJob.php | 33 +++- app/Jobs/SyncStripeSubscriptionsJob.php | 159 +++++++++++++++--- .../Feature/CleanupUnreachableServersTest.php | 73 ++++++++ .../PushServerUpdateJobOptimizationTest.php | 150 +++++++++++++++++ 6 files changed, 402 insertions(+), 37 deletions(-) create mode 100644 tests/Feature/CleanupUnreachableServersTest.php create mode 100644 tests/Feature/PushServerUpdateJobOptimizationTest.php diff --git a/app/Console/Commands/CleanupUnreachableServers.php b/app/Console/Commands/CleanupUnreachableServers.php index def01b265..09563a2c3 100644 --- a/app/Console/Commands/CleanupUnreachableServers.php +++ b/app/Console/Commands/CleanupUnreachableServers.php @@ -14,7 +14,7 @@ class CleanupUnreachableServers extends Command public function handle() { echo "Running unreachable server cleanup...\n"; - $servers = Server::where('unreachable_count', 3)->where('unreachable_notification_sent', true)->where('updated_at', '<', now()->subDays(7))->get(); + $servers = Server::where('unreachable_count', '>=', 3)->where('unreachable_notification_sent', true)->where('updated_at', '<', now()->subDays(7))->get(); if ($servers->count() > 0) { foreach ($servers as $server) { echo "Cleanup unreachable server ($server->id) with name $server->name"; diff --git a/app/Console/Commands/Cloud/SyncStripeSubscriptions.php b/app/Console/Commands/Cloud/SyncStripeSubscriptions.php index e64f86926..46f6b4edd 100644 --- a/app/Console/Commands/Cloud/SyncStripeSubscriptions.php +++ b/app/Console/Commands/Cloud/SyncStripeSubscriptions.php @@ -36,7 +36,14 @@ public function handle(): int $this->newLine(); $job = new SyncStripeSubscriptionsJob($fix); - $result = $job->handle(); + $fetched = 0; + $result = $job->handle(function (int $count) use (&$fetched): void { + $fetched = $count; + $this->output->write("\r Fetching subscriptions from Stripe... {$fetched}"); + }); + if ($fetched > 0) { + $this->output->write("\r".str_repeat(' ', 60)."\r"); + } if (isset($result['error'])) { $this->error($result['error']); @@ -68,6 +75,19 @@ public function handle(): int $this->info('No discrepancies found. All subscriptions are in sync.'); } + if (count($result['resubscribed']) > 0) { + $this->newLine(); + $this->warn('Resubscribed users (same email, different customer): '.count($result['resubscribed'])); + $this->newLine(); + + foreach ($result['resubscribed'] as $resub) { + $this->line(" - Team ID: {$resub['team_id']} | Email: {$resub['email']}"); + $this->line(" Old: {$resub['old_stripe_subscription_id']} (cus: {$resub['old_stripe_customer_id']})"); + $this->line(" New: {$resub['new_stripe_subscription_id']} (cus: {$resub['new_stripe_customer_id']}) [{$resub['new_status']}]"); + $this->newLine(); + } + } + if (count($result['errors']) > 0) { $this->newLine(); $this->error('Errors encountered: '.count($result['errors'])); diff --git a/app/Jobs/PushServerUpdateJob.php b/app/Jobs/PushServerUpdateJob.php index 5d018cf19..85684ff19 100644 --- a/app/Jobs/PushServerUpdateJob.php +++ b/app/Jobs/PushServerUpdateJob.php @@ -24,6 +24,7 @@ use Illuminate\Queue\Middleware\WithoutOverlapping; use Illuminate\Queue\SerializesModels; use Illuminate\Support\Collection; +use Illuminate\Support\Facades\Cache; use Laravel\Horizon\Contracts\Silenced; class PushServerUpdateJob implements ShouldBeEncrypted, ShouldQueue, Silenced @@ -130,7 +131,14 @@ public function handle() $this->containers = collect(data_get($data, 'containers')); $filesystemUsageRoot = data_get($data, 'filesystem_usage_root.used_percentage'); - ServerStorageCheckJob::dispatch($this->server, $filesystemUsageRoot); + + // Only dispatch storage check when disk percentage actually changes + $storageCacheKey = 'storage-check:'.$this->server->id; + $lastPercentage = Cache::get($storageCacheKey); + if ($lastPercentage === null || (string) $lastPercentage !== (string) $filesystemUsageRoot) { + Cache::put($storageCacheKey, $filesystemUsageRoot, 600); + ServerStorageCheckJob::dispatch($this->server, $filesystemUsageRoot); + } if ($this->containers->isEmpty()) { return; @@ -207,7 +215,7 @@ public function handle() $serviceId = $labels->get('coolify.serviceId'); $subType = $labels->get('coolify.service.subType'); $subId = $labels->get('coolify.service.subId'); - if (empty($subId)) { + if (empty(trim((string) $subId))) { continue; } if ($subType === 'application') { @@ -327,6 +335,10 @@ private function aggregateServiceContainerStatuses() // Parse key: serviceId:subType:subId [$serviceId, $subType, $subId] = explode(':', $key); + if (empty($subId)) { + continue; + } + $service = $this->services->where('id', $serviceId)->first(); if (! $service) { continue; @@ -335,9 +347,9 @@ private function aggregateServiceContainerStatuses() // Get the service sub-resource (ServiceApplication or ServiceDatabase) $subResource = null; if ($subType === 'application') { - $subResource = $service->applications()->where('id', $subId)->first(); + $subResource = $service->applications->where('id', $subId)->first(); } elseif ($subType === 'database') { - $subResource = $service->databases()->where('id', $subId)->first(); + $subResource = $service->databases->where('id', $subId)->first(); } if (! $subResource) { @@ -476,8 +488,13 @@ private function updateProxyStatus() } catch (\Throwable $e) { } } else { - // Connect proxy to networks asynchronously to avoid blocking the status update - ConnectProxyToNetworksJob::dispatch($this->server); + // Connect proxy to networks periodically (every 10 min) to avoid excessive job dispatches. + // On-demand triggers (new network, service deploy) use dispatchSync() and bypass this. + $proxyCacheKey = 'connect-proxy:'.$this->server->id; + if (! Cache::has($proxyCacheKey)) { + Cache::put($proxyCacheKey, true, 600); + ConnectProxyToNetworksJob::dispatch($this->server); + } } } } @@ -545,7 +562,7 @@ private function updateServiceSubStatus(string $serviceId, string $subType, stri return; } if ($subType === 'application') { - $application = $service->applications()->where('id', $subId)->first(); + $application = $service->applications->where('id', $subId)->first(); if ($application) { if ($application->status !== $containerStatus) { $application->status = $containerStatus; @@ -553,7 +570,7 @@ private function updateServiceSubStatus(string $serviceId, string $subType, stri } } } elseif ($subType === 'database') { - $database = $service->databases()->where('id', $subId)->first(); + $database = $service->databases->where('id', $subId)->first(); if ($database) { if ($database->status !== $containerStatus) { $database->status = $containerStatus; diff --git a/app/Jobs/SyncStripeSubscriptionsJob.php b/app/Jobs/SyncStripeSubscriptionsJob.php index 9eb946e4d..4301a80d1 100644 --- a/app/Jobs/SyncStripeSubscriptionsJob.php +++ b/app/Jobs/SyncStripeSubscriptionsJob.php @@ -22,7 +22,7 @@ public function __construct(public bool $fix = false) $this->onQueue('high'); } - public function handle(): array + public function handle(?\Closure $onProgress = null): array { if (! isCloud() || ! isStripe()) { return ['error' => 'Not running on Cloud or Stripe not configured']; @@ -33,48 +33,73 @@ public function handle(): array ->get(); $stripe = new \Stripe\StripeClient(config('subscription.stripe_api_key')); + + // Bulk fetch all valid subscription IDs from Stripe (active + past_due) + $validStripeIds = $this->fetchValidStripeSubscriptionIds($stripe, $onProgress); + + // Find DB subscriptions not in the valid set + $staleSubscriptions = $subscriptions->filter( + fn (Subscription $sub) => ! in_array($sub->stripe_subscription_id, $validStripeIds) + ); + + // For each stale subscription, get the exact Stripe status and check for resubscriptions $discrepancies = []; + $resubscribed = []; $errors = []; - foreach ($subscriptions as $subscription) { + foreach ($staleSubscriptions as $subscription) { try { $stripeSubscription = $stripe->subscriptions->retrieve( $subscription->stripe_subscription_id ); + $stripeStatus = $stripeSubscription->status; - // Check if Stripe says cancelled but we think it's active - if (in_array($stripeSubscription->status, ['canceled', 'incomplete_expired', 'unpaid'])) { - $discrepancies[] = [ - 'subscription_id' => $subscription->id, - 'team_id' => $subscription->team_id, - 'stripe_subscription_id' => $subscription->stripe_subscription_id, - 'stripe_status' => $stripeSubscription->status, - ]; - - // Only fix if --fix flag is passed - if ($this->fix) { - $subscription->update([ - 'stripe_invoice_paid' => false, - 'stripe_past_due' => false, - ]); - - if ($stripeSubscription->status === 'canceled') { - $subscription->team?->subscriptionEnded(); - } - } - } - - // Small delay to avoid Stripe rate limits - usleep(100000); // 100ms + usleep(100000); // 100ms rate limit delay } catch (\Exception $e) { $errors[] = [ 'subscription_id' => $subscription->id, 'error' => $e->getMessage(), ]; + + continue; + } + + // Check if this user resubscribed under a different customer/subscription + $activeSub = $this->findActiveSubscriptionByEmail($stripe, $stripeSubscription->customer); + if ($activeSub) { + $resubscribed[] = [ + 'subscription_id' => $subscription->id, + 'team_id' => $subscription->team_id, + 'email' => $activeSub['email'], + 'old_stripe_subscription_id' => $subscription->stripe_subscription_id, + 'old_stripe_customer_id' => $stripeSubscription->customer, + 'new_stripe_subscription_id' => $activeSub['subscription_id'], + 'new_stripe_customer_id' => $activeSub['customer_id'], + 'new_status' => $activeSub['status'], + ]; + + continue; + } + + $discrepancies[] = [ + 'subscription_id' => $subscription->id, + 'team_id' => $subscription->team_id, + 'stripe_subscription_id' => $subscription->stripe_subscription_id, + 'stripe_status' => $stripeStatus, + ]; + + if ($this->fix) { + $subscription->update([ + 'stripe_invoice_paid' => false, + 'stripe_past_due' => false, + ]); + + if ($stripeStatus === 'canceled') { + $subscription->team?->subscriptionEnded(); + } } } - // Only notify if discrepancies found and fixed if ($this->fix && count($discrepancies) > 0) { send_internal_notification( 'SyncStripeSubscriptionsJob: Fixed '.count($discrepancies)." discrepancies:\n". @@ -85,8 +110,88 @@ public function handle(): array return [ 'total_checked' => $subscriptions->count(), 'discrepancies' => $discrepancies, + 'resubscribed' => $resubscribed, 'errors' => $errors, 'fixed' => $this->fix, ]; } + + /** + * Given a Stripe customer ID, get their email and search for other customers + * with the same email that have an active subscription. + * + * @return array{email: string, customer_id: string, subscription_id: string, status: string}|null + */ + private function findActiveSubscriptionByEmail(\Stripe\StripeClient $stripe, string $customerId): ?array + { + try { + $customer = $stripe->customers->retrieve($customerId); + $email = $customer->email; + + if (! $email) { + return null; + } + + usleep(100000); + + $customers = $stripe->customers->all([ + 'email' => $email, + 'limit' => 10, + ]); + + usleep(100000); + + foreach ($customers->data as $matchingCustomer) { + if ($matchingCustomer->id === $customerId) { + continue; + } + + $subs = $stripe->subscriptions->all([ + 'customer' => $matchingCustomer->id, + 'limit' => 10, + ]); + + usleep(100000); + + foreach ($subs->data as $sub) { + if (in_array($sub->status, ['active', 'past_due'])) { + return [ + 'email' => $email, + 'customer_id' => $matchingCustomer->id, + 'subscription_id' => $sub->id, + 'status' => $sub->status, + ]; + } + } + } + } catch (\Exception $e) { + // Silently skip — will fall through to normal discrepancy + } + + return null; + } + + /** + * Bulk fetch all active and past_due subscription IDs from Stripe. + * + * @return array + */ + private function fetchValidStripeSubscriptionIds(\Stripe\StripeClient $stripe, ?\Closure $onProgress = null): array + { + $validIds = []; + $fetched = 0; + + foreach (['active', 'past_due'] as $status) { + foreach ($stripe->subscriptions->all(['status' => $status, 'limit' => 100])->autoPagingIterator() as $sub) { + $validIds[] = $sub->id; + $fetched++; + + if ($onProgress) { + $onProgress($fetched); + } + } + } + + return $validIds; + } } diff --git a/tests/Feature/CleanupUnreachableServersTest.php b/tests/Feature/CleanupUnreachableServersTest.php new file mode 100644 index 000000000..edfd0511c --- /dev/null +++ b/tests/Feature/CleanupUnreachableServersTest.php @@ -0,0 +1,73 @@ += 3 after 7 days', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create([ + 'team_id' => $team->id, + 'unreachable_count' => 50, + 'unreachable_notification_sent' => true, + 'updated_at' => now()->subDays(8), + ]); + + $this->artisan('cleanup:unreachable-servers')->assertSuccessful(); + + $server->refresh(); + expect($server->ip)->toBe('1.2.3.4'); +}); + +it('does not clean up servers with unreachable_count less than 3', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create([ + 'team_id' => $team->id, + 'unreachable_count' => 2, + 'unreachable_notification_sent' => true, + 'updated_at' => now()->subDays(8), + ]); + + $originalIp = $server->ip; + + $this->artisan('cleanup:unreachable-servers')->assertSuccessful(); + + $server->refresh(); + expect($server->ip)->toBe($originalIp); +}); + +it('does not clean up servers updated within 7 days', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create([ + 'team_id' => $team->id, + 'unreachable_count' => 10, + 'unreachable_notification_sent' => true, + 'updated_at' => now()->subDays(3), + ]); + + $originalIp = $server->ip; + + $this->artisan('cleanup:unreachable-servers')->assertSuccessful(); + + $server->refresh(); + expect($server->ip)->toBe($originalIp); +}); + +it('does not clean up servers without notification sent', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create([ + 'team_id' => $team->id, + 'unreachable_count' => 10, + 'unreachable_notification_sent' => false, + 'updated_at' => now()->subDays(8), + ]); + + $originalIp = $server->ip; + + $this->artisan('cleanup:unreachable-servers')->assertSuccessful(); + + $server->refresh(); + expect($server->ip)->toBe($originalIp); +}); diff --git a/tests/Feature/PushServerUpdateJobOptimizationTest.php b/tests/Feature/PushServerUpdateJobOptimizationTest.php new file mode 100644 index 000000000..eb51059db --- /dev/null +++ b/tests/Feature/PushServerUpdateJobOptimizationTest.php @@ -0,0 +1,150 @@ +create(); + $server = Server::factory()->create(['team_id' => $team->id]); + + $data = [ + 'containers' => [], + 'filesystem_usage_root' => ['used_percentage' => 45], + ]; + + $job = new PushServerUpdateJob($server, $data); + $job->handle(); + + Queue::assertPushed(ServerStorageCheckJob::class, function ($job) use ($server) { + return $job->server->id === $server->id && $job->percentage === 45; + }); +}); + +it('does not dispatch storage check when disk percentage is unchanged', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create(['team_id' => $team->id]); + + // Simulate a previous push that cached the percentage + Cache::put('storage-check:'.$server->id, 45, 600); + + $data = [ + 'containers' => [], + 'filesystem_usage_root' => ['used_percentage' => 45], + ]; + + $job = new PushServerUpdateJob($server, $data); + $job->handle(); + + Queue::assertNotPushed(ServerStorageCheckJob::class); +}); + +it('dispatches storage check when disk percentage changes from cached value', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create(['team_id' => $team->id]); + + // Simulate a previous push that cached 45% + Cache::put('storage-check:'.$server->id, 45, 600); + + $data = [ + 'containers' => [], + 'filesystem_usage_root' => ['used_percentage' => 50], + ]; + + $job = new PushServerUpdateJob($server, $data); + $job->handle(); + + Queue::assertPushed(ServerStorageCheckJob::class, function ($job) use ($server) { + return $job->server->id === $server->id && $job->percentage === 50; + }); +}); + +it('rate-limits ConnectProxyToNetworksJob dispatch to every 10 minutes', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create(['team_id' => $team->id]); + $server->settings->update(['is_reachable' => true, 'is_usable' => true]); + + // First push: should dispatch ConnectProxyToNetworksJob + $containersWithProxy = [ + [ + 'name' => 'coolify-proxy', + 'state' => 'running', + 'health_status' => 'healthy', + 'labels' => ['coolify.managed' => true], + ], + ]; + + $data = [ + 'containers' => $containersWithProxy, + 'filesystem_usage_root' => ['used_percentage' => 10], + ]; + + $job = new PushServerUpdateJob($server, $data); + $job->handle(); + + Queue::assertPushed(ConnectProxyToNetworksJob::class, 1); + + // Second push: should NOT dispatch ConnectProxyToNetworksJob (rate-limited) + Queue::fake(); + $job2 = new PushServerUpdateJob($server, $data); + $job2->handle(); + + Queue::assertNotPushed(ConnectProxyToNetworksJob::class); +}); + +it('dispatches ConnectProxyToNetworksJob again after cache expires', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create(['team_id' => $team->id]); + $server->settings->update(['is_reachable' => true, 'is_usable' => true]); + + $containersWithProxy = [ + [ + 'name' => 'coolify-proxy', + 'state' => 'running', + 'health_status' => 'healthy', + 'labels' => ['coolify.managed' => true], + ], + ]; + + $data = [ + 'containers' => $containersWithProxy, + 'filesystem_usage_root' => ['used_percentage' => 10], + ]; + + // First push + $job = new PushServerUpdateJob($server, $data); + $job->handle(); + + Queue::assertPushed(ConnectProxyToNetworksJob::class, 1); + + // Clear cache to simulate expiration + Cache::forget('connect-proxy:'.$server->id); + + // Next push: should dispatch again + Queue::fake(); + $job2 = new PushServerUpdateJob($server, $data); + $job2->handle(); + + Queue::assertPushed(ConnectProxyToNetworksJob::class, 1); +}); + +it('uses default queue for PushServerUpdateJob', function () { + $team = Team::factory()->create(); + $server = Server::factory()->create(['team_id' => $team->id]); + + $job = new PushServerUpdateJob($server, ['containers' => []]); + + expect($job->queue)->toBeNull(); +});