feat(jobs): optimize async job dispatches and enhance Stripe subscription sync

Reduce unnecessary job queue pressure and improve subscription sync reliability:

- Cache ServerStorageCheckJob dispatch to only trigger on disk percentage changes
- Rate-limit ConnectProxyToNetworksJob to maximum once per 10 minutes
- Add progress callback support to SyncStripeSubscriptionsJob for UI feedback
- Implement bulk fetching of valid Stripe subscription IDs for efficiency
- Detect and report resubscribed users (same email, different customer ID)
- Fix CleanupUnreachableServers query operator (>= 3 instead of = 3)
- Improve empty subId validation in PushServerUpdateJob
- Optimize relationship access by using properties instead of query methods
- Add comprehensive test coverage for all optimizations
This commit is contained in:
Andras Bacsai 2026-02-28 13:18:44 +01:00
parent 6b2a669cb9
commit f68793ed69
6 changed files with 402 additions and 37 deletions

View file

@ -14,7 +14,7 @@ class CleanupUnreachableServers extends Command
public function handle()
{
echo "Running unreachable server cleanup...\n";
$servers = Server::where('unreachable_count', 3)->where('unreachable_notification_sent', true)->where('updated_at', '<', now()->subDays(7))->get();
$servers = Server::where('unreachable_count', '>=', 3)->where('unreachable_notification_sent', true)->where('updated_at', '<', now()->subDays(7))->get();
if ($servers->count() > 0) {
foreach ($servers as $server) {
echo "Cleanup unreachable server ($server->id) with name $server->name";

View file

@ -36,7 +36,14 @@ public function handle(): int
$this->newLine();
$job = new SyncStripeSubscriptionsJob($fix);
$result = $job->handle();
$fetched = 0;
$result = $job->handle(function (int $count) use (&$fetched): void {
$fetched = $count;
$this->output->write("\r Fetching subscriptions from Stripe... {$fetched}");
});
if ($fetched > 0) {
$this->output->write("\r".str_repeat(' ', 60)."\r");
}
if (isset($result['error'])) {
$this->error($result['error']);
@ -68,6 +75,19 @@ public function handle(): int
$this->info('No discrepancies found. All subscriptions are in sync.');
}
if (count($result['resubscribed']) > 0) {
$this->newLine();
$this->warn('Resubscribed users (same email, different customer): '.count($result['resubscribed']));
$this->newLine();
foreach ($result['resubscribed'] as $resub) {
$this->line(" - Team ID: {$resub['team_id']} | Email: {$resub['email']}");
$this->line(" Old: {$resub['old_stripe_subscription_id']} (cus: {$resub['old_stripe_customer_id']})");
$this->line(" New: {$resub['new_stripe_subscription_id']} (cus: {$resub['new_stripe_customer_id']}) [{$resub['new_status']}]");
$this->newLine();
}
}
if (count($result['errors']) > 0) {
$this->newLine();
$this->error('Errors encountered: '.count($result['errors']));

View file

@ -24,6 +24,7 @@
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\Cache;
use Laravel\Horizon\Contracts\Silenced;
class PushServerUpdateJob implements ShouldBeEncrypted, ShouldQueue, Silenced
@ -130,7 +131,14 @@ public function handle()
$this->containers = collect(data_get($data, 'containers'));
$filesystemUsageRoot = data_get($data, 'filesystem_usage_root.used_percentage');
ServerStorageCheckJob::dispatch($this->server, $filesystemUsageRoot);
// Only dispatch storage check when disk percentage actually changes
$storageCacheKey = 'storage-check:'.$this->server->id;
$lastPercentage = Cache::get($storageCacheKey);
if ($lastPercentage === null || (string) $lastPercentage !== (string) $filesystemUsageRoot) {
Cache::put($storageCacheKey, $filesystemUsageRoot, 600);
ServerStorageCheckJob::dispatch($this->server, $filesystemUsageRoot);
}
if ($this->containers->isEmpty()) {
return;
@ -207,7 +215,7 @@ public function handle()
$serviceId = $labels->get('coolify.serviceId');
$subType = $labels->get('coolify.service.subType');
$subId = $labels->get('coolify.service.subId');
if (empty($subId)) {
if (empty(trim((string) $subId))) {
continue;
}
if ($subType === 'application') {
@ -327,6 +335,10 @@ private function aggregateServiceContainerStatuses()
// Parse key: serviceId:subType:subId
[$serviceId, $subType, $subId] = explode(':', $key);
if (empty($subId)) {
continue;
}
$service = $this->services->where('id', $serviceId)->first();
if (! $service) {
continue;
@ -335,9 +347,9 @@ private function aggregateServiceContainerStatuses()
// Get the service sub-resource (ServiceApplication or ServiceDatabase)
$subResource = null;
if ($subType === 'application') {
$subResource = $service->applications()->where('id', $subId)->first();
$subResource = $service->applications->where('id', $subId)->first();
} elseif ($subType === 'database') {
$subResource = $service->databases()->where('id', $subId)->first();
$subResource = $service->databases->where('id', $subId)->first();
}
if (! $subResource) {
@ -476,8 +488,13 @@ private function updateProxyStatus()
} catch (\Throwable $e) {
}
} else {
// Connect proxy to networks asynchronously to avoid blocking the status update
ConnectProxyToNetworksJob::dispatch($this->server);
// Connect proxy to networks periodically (every 10 min) to avoid excessive job dispatches.
// On-demand triggers (new network, service deploy) use dispatchSync() and bypass this.
$proxyCacheKey = 'connect-proxy:'.$this->server->id;
if (! Cache::has($proxyCacheKey)) {
Cache::put($proxyCacheKey, true, 600);
ConnectProxyToNetworksJob::dispatch($this->server);
}
}
}
}
@ -545,7 +562,7 @@ private function updateServiceSubStatus(string $serviceId, string $subType, stri
return;
}
if ($subType === 'application') {
$application = $service->applications()->where('id', $subId)->first();
$application = $service->applications->where('id', $subId)->first();
if ($application) {
if ($application->status !== $containerStatus) {
$application->status = $containerStatus;
@ -553,7 +570,7 @@ private function updateServiceSubStatus(string $serviceId, string $subType, stri
}
}
} elseif ($subType === 'database') {
$database = $service->databases()->where('id', $subId)->first();
$database = $service->databases->where('id', $subId)->first();
if ($database) {
if ($database->status !== $containerStatus) {
$database->status = $containerStatus;

View file

@ -22,7 +22,7 @@ public function __construct(public bool $fix = false)
$this->onQueue('high');
}
public function handle(): array
public function handle(?\Closure $onProgress = null): array
{
if (! isCloud() || ! isStripe()) {
return ['error' => 'Not running on Cloud or Stripe not configured'];
@ -33,48 +33,73 @@ public function handle(): array
->get();
$stripe = new \Stripe\StripeClient(config('subscription.stripe_api_key'));
// Bulk fetch all valid subscription IDs from Stripe (active + past_due)
$validStripeIds = $this->fetchValidStripeSubscriptionIds($stripe, $onProgress);
// Find DB subscriptions not in the valid set
$staleSubscriptions = $subscriptions->filter(
fn (Subscription $sub) => ! in_array($sub->stripe_subscription_id, $validStripeIds)
);
// For each stale subscription, get the exact Stripe status and check for resubscriptions
$discrepancies = [];
$resubscribed = [];
$errors = [];
foreach ($subscriptions as $subscription) {
foreach ($staleSubscriptions as $subscription) {
try {
$stripeSubscription = $stripe->subscriptions->retrieve(
$subscription->stripe_subscription_id
);
$stripeStatus = $stripeSubscription->status;
// Check if Stripe says cancelled but we think it's active
if (in_array($stripeSubscription->status, ['canceled', 'incomplete_expired', 'unpaid'])) {
$discrepancies[] = [
'subscription_id' => $subscription->id,
'team_id' => $subscription->team_id,
'stripe_subscription_id' => $subscription->stripe_subscription_id,
'stripe_status' => $stripeSubscription->status,
];
// Only fix if --fix flag is passed
if ($this->fix) {
$subscription->update([
'stripe_invoice_paid' => false,
'stripe_past_due' => false,
]);
if ($stripeSubscription->status === 'canceled') {
$subscription->team?->subscriptionEnded();
}
}
}
// Small delay to avoid Stripe rate limits
usleep(100000); // 100ms
usleep(100000); // 100ms rate limit delay
} catch (\Exception $e) {
$errors[] = [
'subscription_id' => $subscription->id,
'error' => $e->getMessage(),
];
continue;
}
// Check if this user resubscribed under a different customer/subscription
$activeSub = $this->findActiveSubscriptionByEmail($stripe, $stripeSubscription->customer);
if ($activeSub) {
$resubscribed[] = [
'subscription_id' => $subscription->id,
'team_id' => $subscription->team_id,
'email' => $activeSub['email'],
'old_stripe_subscription_id' => $subscription->stripe_subscription_id,
'old_stripe_customer_id' => $stripeSubscription->customer,
'new_stripe_subscription_id' => $activeSub['subscription_id'],
'new_stripe_customer_id' => $activeSub['customer_id'],
'new_status' => $activeSub['status'],
];
continue;
}
$discrepancies[] = [
'subscription_id' => $subscription->id,
'team_id' => $subscription->team_id,
'stripe_subscription_id' => $subscription->stripe_subscription_id,
'stripe_status' => $stripeStatus,
];
if ($this->fix) {
$subscription->update([
'stripe_invoice_paid' => false,
'stripe_past_due' => false,
]);
if ($stripeStatus === 'canceled') {
$subscription->team?->subscriptionEnded();
}
}
}
// Only notify if discrepancies found and fixed
if ($this->fix && count($discrepancies) > 0) {
send_internal_notification(
'SyncStripeSubscriptionsJob: Fixed '.count($discrepancies)." discrepancies:\n".
@ -85,8 +110,88 @@ public function handle(): array
return [
'total_checked' => $subscriptions->count(),
'discrepancies' => $discrepancies,
'resubscribed' => $resubscribed,
'errors' => $errors,
'fixed' => $this->fix,
];
}
/**
* Given a Stripe customer ID, get their email and search for other customers
* with the same email that have an active subscription.
*
* @return array{email: string, customer_id: string, subscription_id: string, status: string}|null
*/
private function findActiveSubscriptionByEmail(\Stripe\StripeClient $stripe, string $customerId): ?array
{
try {
$customer = $stripe->customers->retrieve($customerId);
$email = $customer->email;
if (! $email) {
return null;
}
usleep(100000);
$customers = $stripe->customers->all([
'email' => $email,
'limit' => 10,
]);
usleep(100000);
foreach ($customers->data as $matchingCustomer) {
if ($matchingCustomer->id === $customerId) {
continue;
}
$subs = $stripe->subscriptions->all([
'customer' => $matchingCustomer->id,
'limit' => 10,
]);
usleep(100000);
foreach ($subs->data as $sub) {
if (in_array($sub->status, ['active', 'past_due'])) {
return [
'email' => $email,
'customer_id' => $matchingCustomer->id,
'subscription_id' => $sub->id,
'status' => $sub->status,
];
}
}
}
} catch (\Exception $e) {
// Silently skip — will fall through to normal discrepancy
}
return null;
}
/**
* Bulk fetch all active and past_due subscription IDs from Stripe.
*
* @return array<string>
*/
private function fetchValidStripeSubscriptionIds(\Stripe\StripeClient $stripe, ?\Closure $onProgress = null): array
{
$validIds = [];
$fetched = 0;
foreach (['active', 'past_due'] as $status) {
foreach ($stripe->subscriptions->all(['status' => $status, 'limit' => 100])->autoPagingIterator() as $sub) {
$validIds[] = $sub->id;
$fetched++;
if ($onProgress) {
$onProgress($fetched);
}
}
}
return $validIds;
}
}

View file

@ -0,0 +1,73 @@
<?php
use App\Models\Server;
use App\Models\Team;
use Illuminate\Foundation\Testing\RefreshDatabase;
uses(RefreshDatabase::class);
it('cleans up servers with unreachable_count >= 3 after 7 days', function () {
$team = Team::factory()->create();
$server = Server::factory()->create([
'team_id' => $team->id,
'unreachable_count' => 50,
'unreachable_notification_sent' => true,
'updated_at' => now()->subDays(8),
]);
$this->artisan('cleanup:unreachable-servers')->assertSuccessful();
$server->refresh();
expect($server->ip)->toBe('1.2.3.4');
});
it('does not clean up servers with unreachable_count less than 3', function () {
$team = Team::factory()->create();
$server = Server::factory()->create([
'team_id' => $team->id,
'unreachable_count' => 2,
'unreachable_notification_sent' => true,
'updated_at' => now()->subDays(8),
]);
$originalIp = $server->ip;
$this->artisan('cleanup:unreachable-servers')->assertSuccessful();
$server->refresh();
expect($server->ip)->toBe($originalIp);
});
it('does not clean up servers updated within 7 days', function () {
$team = Team::factory()->create();
$server = Server::factory()->create([
'team_id' => $team->id,
'unreachable_count' => 10,
'unreachable_notification_sent' => true,
'updated_at' => now()->subDays(3),
]);
$originalIp = $server->ip;
$this->artisan('cleanup:unreachable-servers')->assertSuccessful();
$server->refresh();
expect($server->ip)->toBe($originalIp);
});
it('does not clean up servers without notification sent', function () {
$team = Team::factory()->create();
$server = Server::factory()->create([
'team_id' => $team->id,
'unreachable_count' => 10,
'unreachable_notification_sent' => false,
'updated_at' => now()->subDays(8),
]);
$originalIp = $server->ip;
$this->artisan('cleanup:unreachable-servers')->assertSuccessful();
$server->refresh();
expect($server->ip)->toBe($originalIp);
});

View file

@ -0,0 +1,150 @@
<?php
use App\Jobs\ConnectProxyToNetworksJob;
use App\Jobs\PushServerUpdateJob;
use App\Jobs\ServerStorageCheckJob;
use App\Models\Server;
use App\Models\Team;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Queue;
uses(RefreshDatabase::class);
beforeEach(function () {
Queue::fake();
Cache::flush();
});
it('dispatches storage check when disk percentage changes', function () {
$team = Team::factory()->create();
$server = Server::factory()->create(['team_id' => $team->id]);
$data = [
'containers' => [],
'filesystem_usage_root' => ['used_percentage' => 45],
];
$job = new PushServerUpdateJob($server, $data);
$job->handle();
Queue::assertPushed(ServerStorageCheckJob::class, function ($job) use ($server) {
return $job->server->id === $server->id && $job->percentage === 45;
});
});
it('does not dispatch storage check when disk percentage is unchanged', function () {
$team = Team::factory()->create();
$server = Server::factory()->create(['team_id' => $team->id]);
// Simulate a previous push that cached the percentage
Cache::put('storage-check:'.$server->id, 45, 600);
$data = [
'containers' => [],
'filesystem_usage_root' => ['used_percentage' => 45],
];
$job = new PushServerUpdateJob($server, $data);
$job->handle();
Queue::assertNotPushed(ServerStorageCheckJob::class);
});
it('dispatches storage check when disk percentage changes from cached value', function () {
$team = Team::factory()->create();
$server = Server::factory()->create(['team_id' => $team->id]);
// Simulate a previous push that cached 45%
Cache::put('storage-check:'.$server->id, 45, 600);
$data = [
'containers' => [],
'filesystem_usage_root' => ['used_percentage' => 50],
];
$job = new PushServerUpdateJob($server, $data);
$job->handle();
Queue::assertPushed(ServerStorageCheckJob::class, function ($job) use ($server) {
return $job->server->id === $server->id && $job->percentage === 50;
});
});
it('rate-limits ConnectProxyToNetworksJob dispatch to every 10 minutes', function () {
$team = Team::factory()->create();
$server = Server::factory()->create(['team_id' => $team->id]);
$server->settings->update(['is_reachable' => true, 'is_usable' => true]);
// First push: should dispatch ConnectProxyToNetworksJob
$containersWithProxy = [
[
'name' => 'coolify-proxy',
'state' => 'running',
'health_status' => 'healthy',
'labels' => ['coolify.managed' => true],
],
];
$data = [
'containers' => $containersWithProxy,
'filesystem_usage_root' => ['used_percentage' => 10],
];
$job = new PushServerUpdateJob($server, $data);
$job->handle();
Queue::assertPushed(ConnectProxyToNetworksJob::class, 1);
// Second push: should NOT dispatch ConnectProxyToNetworksJob (rate-limited)
Queue::fake();
$job2 = new PushServerUpdateJob($server, $data);
$job2->handle();
Queue::assertNotPushed(ConnectProxyToNetworksJob::class);
});
it('dispatches ConnectProxyToNetworksJob again after cache expires', function () {
$team = Team::factory()->create();
$server = Server::factory()->create(['team_id' => $team->id]);
$server->settings->update(['is_reachable' => true, 'is_usable' => true]);
$containersWithProxy = [
[
'name' => 'coolify-proxy',
'state' => 'running',
'health_status' => 'healthy',
'labels' => ['coolify.managed' => true],
],
];
$data = [
'containers' => $containersWithProxy,
'filesystem_usage_root' => ['used_percentage' => 10],
];
// First push
$job = new PushServerUpdateJob($server, $data);
$job->handle();
Queue::assertPushed(ConnectProxyToNetworksJob::class, 1);
// Clear cache to simulate expiration
Cache::forget('connect-proxy:'.$server->id);
// Next push: should dispatch again
Queue::fake();
$job2 = new PushServerUpdateJob($server, $data);
$job2->handle();
Queue::assertPushed(ConnectProxyToNetworksJob::class, 1);
});
it('uses default queue for PushServerUpdateJob', function () {
$team = Team::factory()->create();
$server = Server::factory()->create(['team_id' => $team->id]);
$job = new PushServerUpdateJob($server, ['containers' => []]);
expect($job->queue)->toBeNull();
});