fix(ssh): serialize initial mux connection creation

Wrap first-use SSH and SCP multiplexed commands with a lock to avoid racing while the control socket is created. Also detect native OpenSSH mux master process names during stale connection cleanup and cover both orphaned and duplicate mux processes with tests.
This commit is contained in:
Andras Bacsai 2026-05-22 18:17:37 +02:00
parent 54a020cf1b
commit 5c67766f41
3 changed files with 148 additions and 16 deletions

View file

@ -41,13 +41,14 @@ public static function generateScpCommand(Server $server, string $source, string
{
$sshConfig = self::serverSshConfiguration($server);
$sshKeyLocation = $sshConfig['sshKeyLocation'];
$multiplexingEnabled = self::isMultiplexingEnabled();
$scpCommand = 'timeout '.config('constants.ssh.command_timeout').' scp ';
if ($server->isIpv6()) {
$scpCommand .= '-6 ';
}
if (self::isMultiplexingEnabled()) {
if ($multiplexingEnabled) {
$scpCommand .= self::multiplexingOptions($server);
}
@ -58,10 +59,14 @@ public static function generateScpCommand(Server $server, string $source, string
$scpCommand .= self::getCommonSshOptions($server, $sshKeyLocation, self::getConnectionTimeout($server), config('constants.ssh.server_interval'), isScp: true);
if ($server->isIpv6()) {
return $scpCommand."{$source} ".escapeshellarg($server->user).'@['.escapeshellarg($server->ip)."]:{$dest}";
$scpCommand .= "{$source} ".escapeshellarg($server->user).'@['.escapeshellarg($server->ip)."]:{$dest}";
} else {
$scpCommand .= "{$source} ".self::escapedUserAtHost($server).":{$dest}";
}
return $scpCommand."{$source} ".self::escapedUserAtHost($server).":{$dest}";
return $multiplexingEnabled
? self::withFirstUseMuxLock($server, $scpCommand)
: $scpCommand;
}
public static function generateSshCommand(Server $server, string $command, bool $disableMultiplexing = false): string
@ -72,12 +77,13 @@ public static function generateSshCommand(Server $server, string $command, bool
$sshConfig = self::serverSshConfiguration($server);
$sshKeyLocation = $sshConfig['sshKeyLocation'];
$multiplexingEnabled = ! $disableMultiplexing && self::isMultiplexingEnabled();
self::validateSshKey($server->privateKey);
$sshCommand = 'timeout '.config('constants.ssh.command_timeout').' ssh ';
if (! $disableMultiplexing && self::isMultiplexingEnabled()) {
if ($multiplexingEnabled) {
$sshCommand .= self::multiplexingOptions($server);
}
@ -90,9 +96,13 @@ public static function generateSshCommand(Server $server, string $command, bool
$delimiter = base64_encode(Hash::make($command));
$command = str_replace($delimiter, '', $command);
return $sshCommand.self::escapedUserAtHost($server)." 'bash -se' << \\$delimiter".PHP_EOL
$sshCommand .= self::escapedUserAtHost($server)." 'bash -se' << \\$delimiter".PHP_EOL
.$command.PHP_EOL
.$delimiter;
return $multiplexingEnabled
? self::withFirstUseMuxLock($server, $sshCommand)
: $sshCommand;
}
private static function multiplexingOptions(Server $server): string
@ -107,6 +117,78 @@ private static function muxSocket(Server $server): string
return '/var/www/html/storage/app/ssh/mux/mux_'.$server->uuid;
}
/**
 * Build the path of the lock directory that belongs to a server's mux socket.
 *
 * The result is the server's mux socket path with a `.lock` suffix appended.
 *
 * @param  Server  $server  Server whose mux socket path is used as the base.
 * @return string Lock directory path.
 */
private static function muxLockDirectory(Server $server): string
{
    $socketPath = self::muxSocket($server);

    return $socketPath.'.lock';
}
/**
 * Wrap a multiplexed ssh/scp command in a small `sh` script that serializes
 * the first use of the server's mux control socket.
 *
 * Behaviour of the generated script:
 *  - If the control socket already exists, the command runs immediately.
 *  - Otherwise the script tries to take a lock by creating the lock directory
 *    with `mkdir`. While another caller holds it, the script re-checks the
 *    socket once per second and runs the command as soon as the socket shows
 *    up, or unconditionally once `constants.ssh.mux_lock_timeout` iterations
 *    (roughly seconds) have passed.
 *  - The lock holder starts the command in the background, waits up to ten
 *    0.1s polls for the socket to appear (or the command to exit), releases
 *    the lock, and then waits for the command, propagating its exit status.
 *
 * The lock is released at most once and only by the script that created it.
 * The signal trap stays armed while waiting for the command, so without the
 * ownership flag a late INT/TERM/HUP would remove a lock directory that a
 * different caller may have acquired in the meantime.
 *
 * @param  Server  $server  Server whose mux socket and lock paths are used.
 * @param  string  $command  Complete shell command to run (passed to `sh -c`).
 * @return string Shell command line that runs $command under the first-use lock.
 */
private static function withFirstUseMuxLock(Server $server, string $command): string
{
    $muxSocket = self::muxSocket($server);
    $lockDirectory = self::muxLockDirectory($server);
    $lockTimeout = (int) config('constants.ssh.mux_lock_timeout');

    // Nowdoc: nothing is interpolated by PHP; all values reach the script as
    // positional parameters ($1..$4) and are escaped individually below.
    $script = <<<'SH'
    cmd=$1
    socket=$2
    lock=$3
    timeout=$4

    run_command() {
        sh -c "$cmd"
    }

    # Fast path: the control socket already exists, no locking needed.
    if [ -S "$socket" ]; then
        run_command
        exit $?
    fi

    # mkdir is atomic, so only one caller can create the lock directory.
    waited=0
    while ! mkdir "$lock" 2>/dev/null; do
        if [ -S "$socket" ]; then
            run_command
            exit $?
        fi
        if [ "$waited" -ge "$timeout" ]; then
            run_command
            exit $?
        fi
        waited=$((waited + 1))
        sleep 1
    done

    # From here on this script owns the lock.
    lock_held=1

    release_lock() {
        # Remove the lock only while this script still owns it.
        if [ "$lock_held" -eq 1 ]; then
            lock_held=0
            rmdir "$lock" 2>/dev/null
        fi
    }

    cleanup() {
        if [ -n "${child:-}" ] && kill -0 "$child" 2>/dev/null; then
            kill "$child" 2>/dev/null
        fi
        release_lock
    }

    trap cleanup INT TERM HUP

    sh -c "$cmd" &
    child=$!

    # Hold the lock until the socket exists or the command has exited (max ~1s).
    for _ in 1 2 3 4 5 6 7 8 9 10; do
        if [ -S "$socket" ] || ! kill -0 "$child" 2>/dev/null; then
            break
        fi
        sleep 0.1
    done

    release_lock
    wait "$child"
    exit $?
    SH;

    return 'sh -c '.escapeshellarg($script).' -- '
        .escapeshellarg($command).' '
        .escapeshellarg($muxSocket).' '
        .escapeshellarg($lockDirectory).' '
        .escapeshellarg((string) $lockTimeout);
}
private static function escapedUserAtHost(Server $server): string
{
return escapeshellarg($server->user).'@'.escapeshellarg($server->ip);

View file

@ -38,10 +38,6 @@ private function cleanupDuplicateSshProcesses(): void
$groups = [];
foreach ($this->listProcesses() as $process) {
if (! preg_match('#(^|/)ssh -fN#', $process['args'])) {
continue;
}
$controlPath = $this->extractControlPath($process['args']);
if (! is_string($controlPath) || ! str_starts_with($controlPath, $muxDir.'/')) {
continue;
@ -75,17 +71,13 @@ private function cleanupOrphanedSshProcesses(): void
$minAge = (int) config('constants.ssh.mux_orphan_min_age');
foreach ($this->listProcesses() as $process) {
// Backgrounded ssh master: current `ssh -fN` or legacy `ssh -fNM`.
if (! preg_match('#(^|/)ssh -fN#', $process['args'])) {
continue;
}
// Only ever touch ssh processes pointing at Coolify's mux directory.
if (! preg_match('#ControlPath=('.preg_quote($muxDir, '#').'/\S+)#', $process['args'], $pathMatch)) {
$controlPath = $this->extractControlPath($process['args']);
if (! is_string($controlPath) || ! str_starts_with($controlPath, $muxDir.'/')) {
continue;
}
if ($process['etimes'] >= $minAge && ! file_exists($pathMatch[1])) {
if ($process['etimes'] >= $minAge && ! file_exists($controlPath)) {
$this->reapOrphan('ssh', $process);
}
}
@ -214,6 +206,10 @@ private function resetDuplicateGroup(string $controlPath, array $processes): voi
private function extractControlPath(string $args): ?string
{
if (! preg_match('/(?:^|\s)-o\s+ControlPath=(?:"([^"]+)"|\'([^\']+)\'|(\S+))/', $args, $matches)) {
if (preg_match('/^ssh:\s+(\S+)\s+\[mux\]$/', $args, $matches)) {
return $matches[1];
}
return null;
}

View file

@ -66,8 +66,10 @@ function makeMuxServer(): Server
$command = SshMultiplexingHelper::generateSshCommand($server, 'echo ok');
expect($command)
->toStartWith('sh -c')
->toContain('-o ControlMaster=auto')
->toContain("-o ControlPath=/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}")
->toContain("/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}.lock")
->toContain('-o ControlPersist=3600')
->not->toContain('ssh -fN')
->not->toContain('-O check');
@ -83,6 +85,7 @@ function makeMuxServer(): Server
$command = SshMultiplexingHelper::generateSshCommand($server, 'echo ok', disableMultiplexing: true);
expect($command)
->not->toStartWith('sh -c')
->not->toContain('-o ControlMaster=auto')
->not->toContain('-o ControlPath=')
->not->toContain('-o ControlPersist=');
@ -97,8 +100,10 @@ function makeMuxServer(): Server
$command = SshMultiplexingHelper::generateScpCommand($server, '/tmp/source', '/tmp/dest');
expect($command)
->toStartWith('sh -c')
->toContain('-o ControlMaster=auto')
->toContain("-o ControlPath=/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}")
->toContain("/var/www/html/storage/app/ssh/mux/mux_{$server->uuid}.lock")
->toContain('-o ControlPersist=3600')
->not->toContain('ssh -fN')
->not->toContain('-O check');
@ -146,6 +151,32 @@ function makeMuxServer(): Server
File::delete($liveSocket);
});
// Orphan reaping must also recognise masters whose process title is the
// native `ssh: <socket> [mux]` form, and must leave alone those whose
// control socket path still exists on disk.
it('kills old orphaned native openssh mux masters whose control socket no longer exists', function () {
    config(['constants.ssh.mux_orphan_reap_enabled' => true]);

    $muxDir = storage_path('app/ssh/mux');
    File::ensureDirectoryExists($muxDir);

    // Only the "live" path is created on disk; the "orphan" path never exists.
    $liveSocket = $muxDir.'/mux_native_live_'.uniqid();
    $orphanSocket = $muxDir.'/mux_native_orphan_'.uniqid();
    File::put($liveSocket, 'x');

    // PID 111 points at the existing path, PID 222 at the missing one.
    $psListing = "111 1 5000 ssh: {$liveSocket} [mux]\n".
        "222 1 5000 ssh: {$orphanSocket} [mux]\n";
    Process::fake([
        'ps*' => Process::result(output: $psListing),
        'kill*' => Process::result(exitCode: 0),
    ]);

    // The method under test is not public, so invoke it through reflection.
    $cleanupJob = new CleanupStaleMultiplexedConnections;
    $reaper = new ReflectionMethod($cleanupJob, 'cleanupOrphanedSshProcesses');
    $reaper->setAccessible(true);
    $reaper->invoke($cleanupJob);

    // Matcher factory: did a `kill` command mentioning the given PID run?
    $killsPid = fn (string $pid) => fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, $pid);

    Process::assertRan($killsPid('222'));
    Process::assertNotRan($killsPid('111'));

    File::delete($liveSocket);
});
it('kills only old orphaned cloudflared proxies whose parent ssh is gone', function () {
config(['constants.ssh.mux_orphan_reap_enabled' => true]);
@ -211,6 +242,29 @@ function makeMuxServer(): Server
expect(file_exists($controlPath))->toBeFalse();
});
// Two native-titled (`ssh: <socket> [mux]`) masters sharing one control path
// form a duplicate group: both must be killed and the control path removed.
it('resets duplicate native openssh mux process groups atomically when reaping is enabled', function () {
    config(['constants.ssh.mux_orphan_reap_enabled' => true]);

    $muxDir = storage_path('app/ssh/mux');
    File::ensureDirectoryExists($muxDir);

    // A regular file stands in for the shared control socket.
    $controlPath = $muxDir.'/mux_native_duplicate_'.uniqid();
    File::put($controlPath, 'socket');

    // PIDs 111 and 222 both report the same control path.
    $psListing = "111 1 5000 ssh: {$controlPath} [mux]\n".
        "222 1 5000 ssh: {$controlPath} [mux]\n";
    Process::fake([
        'ps*' => Process::result(output: $psListing),
        'kill*' => Process::result(exitCode: 0),
    ]);

    // The method under test is not public, so invoke it through reflection.
    $cleanupJob = new CleanupStaleMultiplexedConnections;
    $deduplicator = new ReflectionMethod($cleanupJob, 'cleanupDuplicateSshProcesses');
    $deduplicator->setAccessible(true);
    $deduplicator->invoke($cleanupJob);

    // Every member of the duplicate group must have been killed.
    foreach (['111', '222'] as $pid) {
        Process::assertRan(fn ($process) => str_contains($process->command, 'kill') && str_contains($process->command, $pid));
    }

    expect(file_exists($controlPath))->toBeFalse();
});
it('removes mux files for non-existent servers when reaping is enabled', function () {
config(['constants.ssh.mux_orphan_reap_enabled' => true]);
Storage::fake('ssh-mux');