diff --git a/app/Helpers/SshMultiplexingHelper.php b/app/Helpers/SshMultiplexingHelper.php index 167d8d54f..6984dac4b 100644 --- a/app/Helpers/SshMultiplexingHelper.php +++ b/app/Helpers/SshMultiplexingHelper.php @@ -41,13 +41,14 @@ public static function generateScpCommand(Server $server, string $source, string { $sshConfig = self::serverSshConfiguration($server); $sshKeyLocation = $sshConfig['sshKeyLocation']; + $multiplexingEnabled = self::isMultiplexingEnabled(); $scpCommand = 'timeout '.config('constants.ssh.command_timeout').' scp '; if ($server->isIpv6()) { $scpCommand .= '-6 '; } - if (self::isMultiplexingEnabled()) { + if ($multiplexingEnabled) { $scpCommand .= self::multiplexingOptions($server); } @@ -58,10 +59,14 @@ public static function generateScpCommand(Server $server, string $source, string $scpCommand .= self::getCommonSshOptions($server, $sshKeyLocation, self::getConnectionTimeout($server), config('constants.ssh.server_interval'), isScp: true); if ($server->isIpv6()) { - return $scpCommand."{$source} ".escapeshellarg($server->user).'@['.escapeshellarg($server->ip)."]:{$dest}"; + $scpCommand .= "{$source} ".escapeshellarg($server->user).'@['.escapeshellarg($server->ip)."]:{$dest}"; + } else { + $scpCommand .= "{$source} ".self::escapedUserAtHost($server).":{$dest}"; } - return $scpCommand."{$source} ".self::escapedUserAtHost($server).":{$dest}"; + return $multiplexingEnabled + ? self::withFirstUseMuxLock($server, $scpCommand) + : $scpCommand; } public static function generateSshCommand(Server $server, string $command, bool $disableMultiplexing = false): string @@ -72,12 +77,13 @@ public static function generateSshCommand(Server $server, string $command, bool $sshConfig = self::serverSshConfiguration($server); $sshKeyLocation = $sshConfig['sshKeyLocation']; + $multiplexingEnabled = ! $disableMultiplexing && self::isMultiplexingEnabled(); self::validateSshKey($server->privateKey); $sshCommand = 'timeout '.config('constants.ssh.command_timeout').' ssh '; - if (! $disableMultiplexing && self::isMultiplexingEnabled()) { + if ($multiplexingEnabled) { $sshCommand .= self::multiplexingOptions($server); } @@ -90,9 +96,13 @@ public static function generateSshCommand(Server $server, string $command, bool $delimiter = base64_encode(Hash::make($command)); $command = str_replace($delimiter, '', $command); - return $sshCommand.self::escapedUserAtHost($server)." 'bash -se' << \\$delimiter".PHP_EOL + $sshCommand .= self::escapedUserAtHost($server)." 'bash -se' << \\$delimiter".PHP_EOL .$command.PHP_EOL .$delimiter; + + return $multiplexingEnabled + ? self::withFirstUseMuxLock($server, $sshCommand) + : $sshCommand; } private static function multiplexingOptions(Server $server): string @@ -107,6 +117,78 @@ private static function muxSocket(Server $server): string return '/var/www/html/storage/app/ssh/mux/mux_'.$server->uuid; } + private static function muxLockDirectory(Server $server): string + { + return self::muxSocket($server).'.lock'; + } + + private static function withFirstUseMuxLock(Server $server, string $command): string + { + $muxSocket = self::muxSocket($server); + $lockDirectory = self::muxLockDirectory($server); + $lockTimeout = (int) config('constants.ssh.mux_lock_timeout'); + + $script = <<<'SH' +cmd=$1 +socket=$2 +lock=$3 +timeout=$4 + +run_command() { + sh -c "$cmd" +} + +if [ -S "$socket" ]; then + run_command + exit $? +fi + +waited=0 +while ! mkdir "$lock" 2>/dev/null; do + if [ -S "$socket" ]; then + run_command + exit $? + fi + + if [ "$waited" -ge "$timeout" ]; then + run_command + exit $? + fi + + waited=$((waited + 1)) + sleep 1 +done + +cleanup() { + if [ -n "${child:-}" ] && kill -0 "$child" 2>/dev/null; then + kill "$child" 2>/dev/null + fi + rmdir "$lock" 2>/dev/null +} +trap cleanup INT TERM HUP + +sh -c "$cmd" & +child=$! + +for _ in 1 2 3 4 5 6 7 8 9 10; do + if [ -S "$socket" ] || ! kill -0 "$child" 2>/dev/null; then + break + fi + sleep 0.1 +done + +rmdir "$lock" 2>/dev/null +wait "$child" +exit $? +SH; + + return 'sh -c '.escapeshellarg($script).' -- ' + .escapeshellarg($command).' ' + .escapeshellarg($muxSocket).' ' + .escapeshellarg($lockDirectory).' ' + .escapeshellarg((string) $lockTimeout); + } + private static function escapedUserAtHost(Server $server): string { return escapeshellarg($server->user).'@'.escapeshellarg($server->ip); diff --git a/app/Jobs/CleanupStaleMultiplexedConnections.php b/app/Jobs/CleanupStaleMultiplexedConnections.php index 92e485702..0a10fa420 100644 --- a/app/Jobs/CleanupStaleMultiplexedConnections.php +++ b/app/Jobs/CleanupStaleMultiplexedConnections.php @@ -38,10 +38,6 @@ private function cleanupDuplicateSshProcesses(): void $groups = []; foreach ($this->listProcesses() as $process) { - if (! preg_match('#(^|/)ssh -fN#', $process['args'])) { - continue; - } - $controlPath = $this->extractControlPath($process['args']); if (! is_string($controlPath) || ! str_starts_with($controlPath, $muxDir.'/')) { continue; @@ -75,17 +71,13 @@ private function cleanupOrphanedSshProcesses(): void $minAge = (int) config('constants.ssh.mux_orphan_min_age'); foreach ($this->listProcesses() as $process) { - // Backgrounded ssh master: current `ssh -fN` or legacy `ssh -fNM`. - if (! preg_match('#(^|/)ssh -fN#', $process['args'])) { - continue; - } - // Only ever touch ssh processes pointing at Coolify's mux directory. - if (! preg_match('#ControlPath=('.preg_quote($muxDir, '#').'/\S+)#', $process['args'], $pathMatch)) { + $controlPath = $this->extractControlPath($process['args']); + if (! is_string($controlPath) || ! str_starts_with($controlPath, $muxDir.'/')) { continue; } - if ($process['etimes'] >= $minAge && ! file_exists($pathMatch[1])) { + if ($process['etimes'] >= $minAge && ! file_exists($controlPath)) { $this->reapOrphan('ssh', $process); } } @@ -214,6 +206,10 @@ private function resetDuplicateGroup(string $controlPath, array $processes): voi private function extractControlPath(string $args): ?string { if (! preg_match('/(?:^|\s)-o\s+ControlPath=(?:"([^"]+)"|\'([^\']+)\'|(\S+))/', $args, $matches)) { + if (preg_match('/^ssh:\s+(\S+)\s+\[mux\]$/', $args, $matches)) { + return $matches[1]; + } + return null; } diff --git a/tests/Feature/SshMultiplexingLockTest.php b/tests/Feature/SshMultiplexingLockTest.php index e34b8a735..c39d5a48f 100644 --- a/tests/Feature/SshMultiplexingLockTest.php +++ b/tests/Feature/SshMultiplexingLockTest.php @@ -66,8 +66,10 @@ function makeMuxServer(): Server $command = SshMultiplexingHelper::generateSshCommand($server, 'echo ok'); expect($command) + ->toStartWith('sh -c') ->toContain('-o ControlMaster=auto') ->toContain("-o ControlPath=/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}") + ->toContain("/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}.lock") ->toContain('-o ControlPersist=3600') ->not->toContain('ssh -fN') ->not->toContain('-O check'); @@ -83,6 +85,7 @@ function makeMuxServer(): Server $command = SshMultiplexingHelper::generateSshCommand($server, 'echo ok', disableMultiplexing: true); expect($command) + ->not->toStartWith('sh -c') ->not->toContain('-o ControlMaster=auto') ->not->toContain('-o ControlPath=') ->not->toContain('-o ControlPersist='); @@ -97,8 +100,10 @@ function makeMuxServer(): Server $command = SshMultiplexingHelper::generateScpCommand($server, '/tmp/source', '/tmp/dest'); expect($command) + ->toStartWith('sh -c') ->toContain('-o ControlMaster=auto') ->toContain("-o ControlPath=/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}") + ->toContain("/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}.lock") ->toContain('-o ControlPersist=3600') ->not->toContain('ssh -fN') ->not->toContain('-O check'); @@ -146,6 +151,32 @@ function makeMuxServer(): Server File::delete($liveSocket); }); +it('kills old orphaned native openssh mux masters whose control socket no longer exists', function () { + config(['constants.ssh.mux_orphan_reap_enabled' => true]); + $muxDir = storage_path('app/ssh/mux'); + File::ensureDirectoryExists($muxDir); + + $liveSocket = $muxDir.'/mux_native_live_'.uniqid(); + $orphanSocket = $muxDir.'/mux_native_orphan_'.uniqid(); + File::put($liveSocket, 'x'); + + Process::fake([ + 'ps*' => Process::result(output: "111 1 5000 ssh: {$liveSocket} [mux]\n". + "222 1 5000 ssh: {$orphanSocket} [mux]\n"), + 'kill*' => Process::result(exitCode: 0), + ]); + + $job = new CleanupStaleMultiplexedConnections; + $method = new ReflectionMethod($job, 'cleanupOrphanedSshProcesses'); + $method->setAccessible(true); + $method->invoke($job); + + Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '222')); + Process::assertNotRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '111')); + + File::delete($liveSocket); +}); + it('kills only old orphaned cloudflared proxies whose parent ssh is gone', function () { config(['constants.ssh.mux_orphan_reap_enabled' => true]); @@ -211,6 +242,29 @@ function makeMuxServer(): Server expect(file_exists($controlPath))->toBeFalse(); }); +it('resets duplicate native openssh mux process groups atomically when reaping is enabled', function () { + config(['constants.ssh.mux_orphan_reap_enabled' => true]); + $muxDir = storage_path('app/ssh/mux'); + File::ensureDirectoryExists($muxDir); + $controlPath = $muxDir.'/mux_native_duplicate_'.uniqid(); + File::put($controlPath, 'socket'); + + Process::fake([ + 'ps*' => Process::result(output: "111 1 5000 ssh: {$controlPath} [mux]\n". + "222 1 5000 ssh: {$controlPath} [mux]\n"), + 'kill*' => Process::result(exitCode: 0), + ]); + + $job = new CleanupStaleMultiplexedConnections; + $method = new ReflectionMethod($job, 'cleanupDuplicateSshProcesses'); + $method->setAccessible(true); + $method->invoke($job); + + Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '111')); + Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, '222')); + expect(file_exists($controlPath))->toBeFalse(); +}); + it('removes mux files for non-existent servers when reaping is enabled', function () { config(['constants.ssh.mux_orphan_reap_enabled' => true]); Storage::fake('ssh-mux');