From 5c67766f41a1df9b05e4955428d15a7772040358 Mon Sep 17 00:00:00 2001 From: Andras Bacsai <5845193+andrasbacsai@users.noreply.github.com> Date: Fri, 22 May 2026 18:17:37 +0200 Subject: [PATCH] fix(ssh): serialize initial mux connection creation Wrap first-use SSH and SCP multiplexed commands with a lock to avoid racing while the control socket is created. Also detect native OpenSSH mux master process names during stale connection cleanup and cover both orphaned and duplicate mux processes with tests. --- app/Helpers/SshMultiplexingHelper.php | 92 ++++++++++++++++++- .../CleanupStaleMultiplexedConnections.php | 18 ++-- tests/Feature/SshMultiplexingLockTest.php | 54 +++++++++++ 3 files changed, 148 insertions(+), 16 deletions(-) diff --git a/app/Helpers/SshMultiplexingHelper.php b/app/Helpers/SshMultiplexingHelper.php index 167d8d54f..6984dac4b 100644 --- a/app/Helpers/SshMultiplexingHelper.php +++ b/app/Helpers/SshMultiplexingHelper.php @@ -41,13 +41,14 @@ public static function generateScpCommand(Server $server, string $source, string { $sshConfig = self::serverSshConfiguration($server); $sshKeyLocation = $sshConfig['sshKeyLocation']; + $multiplexingEnabled = self::isMultiplexingEnabled(); $scpCommand = 'timeout '.config('constants.ssh.command_timeout').' scp '; if ($server->isIpv6()) { $scpCommand .= '-6 '; } - if (self::isMultiplexingEnabled()) { + if ($multiplexingEnabled) { $scpCommand .= self::multiplexingOptions($server); } @@ -58,10 +59,14 @@ public static function generateScpCommand(Server $server, string $source, string $scpCommand .= self::getCommonSshOptions($server, $sshKeyLocation, self::getConnectionTimeout($server), config('constants.ssh.server_interval'), isScp: true); if ($server->isIpv6()) { - return $scpCommand."{$source} ".escapeshellarg($server->user).'@['.escapeshellarg($server->ip)."]:{$dest}"; + $scpCommand .= "{$source} ".escapeshellarg($server->user).'@['.escapeshellarg($server->ip)."]:{$dest}"; + } else { + $scpCommand .= "{$source} ".self::escapedUserAtHost($server).":{$dest}"; } - return $scpCommand."{$source} ".self::escapedUserAtHost($server).":{$dest}"; + return $multiplexingEnabled + ? self::withFirstUseMuxLock($server, $scpCommand) + : $scpCommand; } public static function generateSshCommand(Server $server, string $command, bool $disableMultiplexing = false): string @@ -72,12 +77,13 @@ public static function generateSshCommand(Server $server, string $command, bool $sshConfig = self::serverSshConfiguration($server); $sshKeyLocation = $sshConfig['sshKeyLocation']; + $multiplexingEnabled = ! $disableMultiplexing && self::isMultiplexingEnabled(); self::validateSshKey($server->privateKey); $sshCommand = 'timeout '.config('constants.ssh.command_timeout').' ssh '; - if (! $disableMultiplexing && self::isMultiplexingEnabled()) { + if ($multiplexingEnabled) { $sshCommand .= self::multiplexingOptions($server); } @@ -90,9 +96,13 @@ public static function generateSshCommand(Server $server, string $command, bool $delimiter = base64_encode(Hash::make($command)); $command = str_replace($delimiter, '', $command); - return $sshCommand.self::escapedUserAtHost($server)." 'bash -se' << \\$delimiter".PHP_EOL + $sshCommand .= self::escapedUserAtHost($server)." 'bash -se' << \\$delimiter".PHP_EOL .$command.PHP_EOL .$delimiter; + + return $multiplexingEnabled + ? self::withFirstUseMuxLock($server, $sshCommand) + : $sshCommand; } private static function multiplexingOptions(Server $server): string @@ -107,6 +117,78 @@ private static function muxSocket(Server $server): string return '/var/www/html/storage/app/ssh/mux/mux_'.$server->uuid; } + private static function muxLockDirectory(Server $server): string + { + return self::muxSocket($server).'.lock'; + } + + private static function withFirstUseMuxLock(Server $server, string $command): string + { + $muxSocket = self::muxSocket($server); + $lockDirectory = self::muxLockDirectory($server); + $lockTimeout = (int) config('constants.ssh.mux_lock_timeout'); + + $script = <<<'SH' +cmd=$1 +socket=$2 +lock=$3 +timeout=$4 + +run_command() { + sh -c "$cmd" +} + +if [ -S "$socket" ]; then + run_command + exit $? +fi + +waited=0 +while ! mkdir "$lock" 2>/dev/null; do + if [ -S "$socket" ]; then + run_command + exit $? + fi + + if [ "$waited" -ge "$timeout" ]; then + run_command + exit $? + fi + + waited=$((waited + 1)) + sleep 1 +done + +cleanup() { + if [ -n "${child:-}" ] && kill -0 "$child" 2>/dev/null; then + kill "$child" 2>/dev/null + fi + rmdir "$lock" 2>/dev/null +} +trap cleanup INT TERM HUP + +sh -c "$cmd" & +child=$! + +for _ in 1 2 3 4 5 6 7 8 9 10; do + if [ -S "$socket" ] || ! kill -0 "$child" 2>/dev/null; then + break + fi + sleep 0.1 +done + +rmdir "$lock" 2>/dev/null +wait "$child" +exit $? +SH; + + return 'sh -c '.escapeshellarg($script).' -- ' + .escapeshellarg($command).' ' + .escapeshellarg($muxSocket).' ' + .escapeshellarg($lockDirectory).' ' + .escapeshellarg((string) $lockTimeout); + } + private static function escapedUserAtHost(Server $server): string { return escapeshellarg($server->user).'@'.escapeshellarg($server->ip); diff --git a/app/Jobs/CleanupStaleMultiplexedConnections.php b/app/Jobs/CleanupStaleMultiplexedConnections.php index 92e485702..0a10fa420 100644 --- a/app/Jobs/CleanupStaleMultiplexedConnections.php +++ b/app/Jobs/CleanupStaleMultiplexedConnections.php @@ -38,10 +38,6 @@ private function cleanupDuplicateSshProcesses(): void $groups = []; foreach ($this->listProcesses() as $process) { - if (! preg_match('#(^|/)ssh -fN#', $process['args'])) { - continue; - } - $controlPath = $this->extractControlPath($process['args']); if (! is_string($controlPath) || ! str_starts_with($controlPath, $muxDir.'/')) { continue; @@ -75,17 +71,13 @@ private function cleanupOrphanedSshProcesses(): void $minAge = (int) config('constants.ssh.mux_orphan_min_age'); foreach ($this->listProcesses() as $process) { - // Backgrounded ssh master: current `ssh -fN` or legacy `ssh -fNM`. - if (! preg_match('#(^|/)ssh -fN#', $process['args'])) { - continue; - } - // Only ever touch ssh processes pointing at Coolify's mux directory. - if (! preg_match('#ControlPath=('.preg_quote($muxDir, '#').'/\S+)#', $process['args'], $pathMatch)) { + $controlPath = $this->extractControlPath($process['args']); + if (! is_string($controlPath) || ! str_starts_with($controlPath, $muxDir.'/')) { continue; } - if ($process['etimes'] >= $minAge && ! file_exists($pathMatch[1])) { + if ($process['etimes'] >= $minAge && ! file_exists($controlPath)) { $this->reapOrphan('ssh', $process); } } @@ -214,6 +206,10 @@ private function resetDuplicateGroup(string $controlPath, array $processes): voi private function extractControlPath(string $args): ?string { if (! preg_match('/(?:^|\s)-o\s+ControlPath=(?:"([^"]+)"|\'([^\']+)\'|(\S+))/', $args, $matches)) { + if (preg_match('/^ssh:\s+(\S+)\s+\[mux\]$/', $args, $matches)) { + return $matches[1]; + } + return null; } diff --git a/tests/Feature/SshMultiplexingLockTest.php b/tests/Feature/SshMultiplexingLockTest.php index e34b8a735..c39d5a48f 100644 --- a/tests/Feature/SshMultiplexingLockTest.php +++ b/tests/Feature/SshMultiplexingLockTest.php @@ -66,8 +66,10 @@ function makeMuxServer(): Server $command = SshMultiplexingHelper::generateSshCommand($server, 'echo ok'); expect($command) + ->toStartWith('sh -c') ->toContain('-o ControlMaster=auto') ->toContain("-o ControlPath=/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}") + ->toContain("/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}.lock") ->toContain('-o ControlPersist=3600') ->not->toContain('ssh -fN') ->not->toContain('-O check'); @@ -83,6 +85,7 @@ function makeMuxServer(): Server $command = SshMultiplexingHelper::generateSshCommand($server, 'echo ok', disableMultiplexing: true); expect($command) + ->not->toStartWith('sh -c') ->not->toContain('-o ControlMaster=auto') ->not->toContain('-o ControlPath=') ->not->toContain('-o ControlPersist='); @@ -97,8 +100,10 @@ function makeMuxServer(): Server $command = SshMultiplexingHelper::generateScpCommand($server, '/tmp/source', '/tmp/dest'); expect($command) + ->toStartWith('sh -c') ->toContain('-o ControlMaster=auto') ->toContain("-o ControlPath=/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}") + ->toContain("/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}.lock") ->toContain('-o ControlPersist=3600') ->not->toContain('ssh -fN') ->not->toContain('-O check'); @@ -146,6 +151,32 @@ function makeMuxServer(): Server File::delete($liveSocket); }); +it('kills old orphaned native openssh mux masters whose control socket no longer exists', function () { + config(['constants.ssh.mux_orphan_reap_enabled' => true]); + $muxDir = storage_path('app/ssh/mux'); + File::ensureDirectoryExists($muxDir); + + $liveSocket = $muxDir.'/mux_native_live_'.uniqid(); + $orphanSocket = $muxDir.'/mux_native_orphan_'.uniqid(); + File::put($liveSocket, 'x'); + + Process::fake([ + 'ps*' => Process::result(output: "111 1 5000 ssh: {$liveSocket} [mux]\n". + "222 1 5000 ssh: {$orphanSocket} [mux]\n"), + 'kill*' => Process::result(exitCode: 0), + ]); + + $job = new CleanupStaleMultiplexedConnections; + $method = new ReflectionMethod($job, 'cleanupOrphanedSshProcesses'); + $method->setAccessible(true); + $method->invoke($job); + + Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '222')); + Process::assertNotRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '111')); + + File::delete($liveSocket); +}); + it('kills only old orphaned cloudflared proxies whose parent ssh is gone', function () { config(['constants.ssh.mux_orphan_reap_enabled' => true]); @@ -211,6 +242,29 @@ function makeMuxServer(): Server expect(file_exists($controlPath))->toBeFalse(); }); +it('resets duplicate native openssh mux process groups atomically when reaping is enabled', function () { + config(['constants.ssh.mux_orphan_reap_enabled' => true]); + $muxDir = storage_path('app/ssh/mux'); + File::ensureDirectoryExists($muxDir); + $controlPath = $muxDir.'/mux_native_duplicate_'.uniqid(); + File::put($controlPath, 'socket'); + + Process::fake([ + 'ps*' => Process::result(output: "111 1 5000 ssh: {$controlPath} [mux]\n". + "222 1 5000 ssh: {$controlPath} [mux]\n"), + 'kill*' => Process::result(exitCode: 0), + ]); + + $job = new CleanupStaleMultiplexedConnections; + $method = new ReflectionMethod($job, 'cleanupDuplicateSshProcesses'); + $method->setAccessible(true); + $method->invoke($job); + + Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '111')); + Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '222')); + expect(file_exists($controlPath))->toBeFalse(); +}); + it('removes mux files for non-existent servers when reaping is enabled', function () { config(['constants.ssh.mux_orphan_reap_enabled' => true]); Storage::fake('ssh-mux');