From 1ac76881aab2396a85750f07c653471428de553c Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Thu, 14 May 2026 22:36:18 +0200 Subject: [PATCH 1/7] Introduce Process interface and ProcessBase for parallel workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Splits the parallel worker into a process-creation-agnostic half (ProcessBase: the TCP/NDJSON connection and timeout timer) and the process-creation half. The Process interface is what ParallelAnalyser and ProcessPool talk to; the existing react/child-process implementation is renamed to SpawnedProcess. Sole implementation for now — a pcntl_fork() based one follows. Co-Authored-By: Claude Opus 4.7 (1M context) --- build/baseline-7.4.neon | 4 +- src/Parallel/ParallelAnalyser.php | 26 +++++- src/Parallel/Process.php | 143 +++--------------------------- src/Parallel/ProcessBase.php | 107 ++++++++++++++++++++++ src/Parallel/SpawnedProcess.php | 93 +++++++++++++++++++ 5 files changed, 239 insertions(+), 134 deletions(-) create mode 100644 src/Parallel/ProcessBase.php create mode 100644 src/Parallel/SpawnedProcess.php diff --git a/build/baseline-7.4.neon b/build/baseline-7.4.neon index b28b9f9f5de..7dee4eae344 100644 --- a/build/baseline-7.4.neon +++ b/build/baseline-7.4.neon @@ -11,9 +11,9 @@ parameters: path: ../src/Parallel/ParallelAnalyser.php - - message: "#^Class PHPStan\\\\Parallel\\\\Process has an uninitialized property \\$process\\. Give it default value or assign it in the constructor\\.$#" + message: "#^Class PHPStan\\\\Parallel\\\\SpawnedProcess has an uninitialized property \\$process\\. Give it default value or assign it in the constructor\\.$#" count: 1 - path: ../src/Parallel/Process.php + path: ../src/Parallel/SpawnedProcess.php - message: "#^Class PHPStan\\\\PhpDoc\\\\ResolvedPhpDocBlock has an uninitialized property \\$phpDocNodes\\. Give it default value or assign it in the constructor\\.$#" count: 1 diff --git a/src/Parallel/ParallelAnalyser.php b/src/Parallel/ParallelAnalyser.php index 248b52a4a34..55e209c9f0e 100644 --- a/src/Parallel/ParallelAnalyser.php +++ b/src/Parallel/ParallelAnalyser.php @@ -190,13 +190,13 @@ public function analyse( $commandOptions[] = escapeshellarg($insteadOfFile); } - $process = new Process(ProcessHelper::getWorkerCommand( + $process = $this->createProcess( + $loop, $mainScript, - 'worker', $projectConfigFile, $commandOptions, $input, - ), $loop, $this->processTimeout); + ); $process->start(function (array $json) use ($process, &$internalErrors, &$errors, &$filteredPhpErrors, &$allPhpErrors, &$locallyIgnoredErrors, &$linesToIgnore, &$unmatchedLineIgnores, &$collectedData, &$dependencies, &$usedTraitDependencies, &$exportedNodes, &$peakMemoryUsages, &$jobs, $postFileCallback, &$internalErrorsCount, &$reachedInternalErrorsCountLimit, $processIdentifier, $onFileAnalysisHandler, &$allProcessedFiles): void { $fileErrors = []; foreach ($json['errors'] as $jsonError) { @@ -355,4 +355,24 @@ public function analyse( return $deferred->promise(); } + /** + * @param string[] $commandOptions + */ + private function createProcess( + LoopInterface $loop, + string $mainScript, + ?string $projectConfigFile, + array $commandOptions, + InputInterface $input, + ): Process + { + return new SpawnedProcess(ProcessHelper::getWorkerCommand( + $mainScript, + 'worker', + $projectConfigFile, + $commandOptions, + $input, + ), $loop, $this->processTimeout); + } + } diff --git a/src/Parallel/Process.php b/src/Parallel/Process.php index 94d1dacc678..3033c13c059 100644 --- a/src/Parallel/Process.php +++ b/src/Parallel/Process.php @@ -2,151 +2,36 @@ namespace PHPStan\Parallel; -use PHPStan\ShouldNotHappenException; -use React\EventLoop\LoopInterface; -use React\EventLoop\TimerInterface; use React\Stream\ReadableStreamInterface; use React\Stream\WritableStreamInterface; use Throwable; -use function fclose; -use function rewind; -use function sprintf; -use function stream_get_contents; -use function tmpfile; -final class Process +/** + * A parallel analysis worker as seen by ParallelAnalyser / ProcessPool. + * + * Implementations differ only in how the worker process comes to life: + * SpawnedProcess spawns a fresh PHP process via react/child-process, + * ForkedProcess forks the already-booted main process via pcntl_fork(). Both + * then speak the same TCP + NDJSON protocol, so request()/quit()/ + * bindConnection() behave identically and live in ProcessBase. + */ +interface Process { - private \React\ChildProcess\Process $process; - - private ?WritableStreamInterface $in = null; - - /** @var resource */ - private $stdOut; - - /** @var resource */ - private $stdErr; - - /** @var callable(mixed[] $json) : void */ - private $onData; - - /** @var callable(Throwable $exception): void */ - private $onError; - - private ?TimerInterface $timer = null; - - public function __construct( - private string $command, - private LoopInterface $loop, - private float $timeoutSeconds, - ) - { - } - /** * @param callable(mixed[] $json) : void $onData * @param callable(Throwable $exception): void $onError * @param callable(?int $exitCode, string $output) : void $onExit */ - public function start(callable $onData, callable $onError, callable $onExit): void - { - $tmpStdOut = tmpfile(); - if ($tmpStdOut === false) { - throw new ShouldNotHappenException('Failed creating temp file for stdout.'); - } - $tmpStdErr = tmpfile(); - if ($tmpStdErr === false) { - throw new ShouldNotHappenException('Failed creating temp file for stderr.'); - } - $this->stdOut = $tmpStdOut; - $this->stdErr = $tmpStdErr; - $this->process = new \React\ChildProcess\Process($this->command, fds: [ - 1 => $this->stdOut, - 2 => $this->stdErr, - ]); - $this->process->start($this->loop); - $this->onData = $onData; - $this->onError = $onError; - $this->process->on('exit', function ($exitCode) use ($onExit): void { - $this->cancelTimer(); - - $output = ''; - rewind($this->stdOut); - $output .= stream_get_contents($this->stdOut); - - rewind($this->stdErr); - $output .= stream_get_contents($this->stdErr); - - $onExit($exitCode, $output); - fclose($this->stdOut); - fclose($this->stdErr); - }); - } - - private function cancelTimer(): void - { - if ($this->timer === null) { - return; - } - - $this->loop->cancelTimer($this->timer); - $this->timer = null; - } + public function start(callable $onData, callable $onError, callable $onExit): void; /** * @param mixed[] $data */ - public function request(array $data): void - { - $this->cancelTimer(); - if ($this->in === null) { - throw new ShouldNotHappenException(); - } - $this->in->write($data); - $this->timer = $this->loop->addTimer($this->timeoutSeconds, function (): void { - $onError = $this->onError; - $onError(new ProcessTimedOutException(sprintf('Child process timed out after %.1f seconds. Try making it longer with parallel.processTimeout setting.', $this->timeoutSeconds))); - }); - } - - public function quit(): void - { - $this->cancelTimer(); - if (!$this->process->isRunning()) { - return; - } - - foreach ($this->process->pipes as $pipe) { - $pipe->close(); - } - - if ($this->in === null) { - return; - } - - $this->in->end(); - } + public function request(array $data): void; - public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void - { - $out->on('data', function (array $json): void { - $this->cancelTimer(); - if ($json['action'] !== 'result') { - return; - } + public function quit(): void; - $onData = $this->onData; - $onData($json['result']); - }); - $this->in = $in; - $out->on('error', function (Throwable $error): void { - $onError = $this->onError; - $onError($error); - }); - $in->on('error', function (Throwable $error): void { - $onError = $this->onError; - $onError($error); - }); - } + public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void; } diff --git a/src/Parallel/ProcessBase.php b/src/Parallel/ProcessBase.php new file mode 100644 index 00000000000..d623f26a25c --- /dev/null +++ b/src/Parallel/ProcessBase.php @@ -0,0 +1,107 @@ +onData = $onData; + $this->onError = $onError; + } + + protected function cancelTimer(): void + { + if ($this->timer === null) { + return; + } + + $this->loop->cancelTimer($this->timer); + $this->timer = null; + } + + /** Cancels the timeout timer and ends the writable side of the connection. */ + protected function endConnection(): void + { + $this->cancelTimer(); + if ($this->in === null) { + return; + } + + $this->in->end(); + } + + /** + * @param mixed[] $data + */ + public function request(array $data): void + { + $this->cancelTimer(); + if ($this->in === null) { + throw new ShouldNotHappenException(); + } + $this->in->write($data); + $this->timer = $this->loop->addTimer($this->timeoutSeconds, function (): void { + $onError = $this->onError; + $onError(new ProcessTimedOutException(sprintf('Child process timed out after %.1f seconds. Try making it longer with parallel.processTimeout setting.', $this->timeoutSeconds))); + }); + } + + public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void + { + $out->on('data', function (array $json): void { + $this->cancelTimer(); + if ($json['action'] !== 'result') { + return; + } + + $onData = $this->onData; + $onData($json['result']); + }); + $this->in = $in; + $out->on('error', function (Throwable $error): void { + $onError = $this->onError; + $onError($error); + }); + $in->on('error', function (Throwable $error): void { + $onError = $this->onError; + $onError($error); + }); + } + +} diff --git a/src/Parallel/SpawnedProcess.php b/src/Parallel/SpawnedProcess.php new file mode 100644 index 00000000000..a7a1954f52e --- /dev/null +++ b/src/Parallel/SpawnedProcess.php @@ -0,0 +1,93 @@ +stdOut = $tmpStdOut; + $this->stdErr = $tmpStdErr; + $this->process = new Process($this->command, fds: [ + 1 => $this->stdOut, + 2 => $this->stdErr, + ]); + $this->process->start($this->loop); + $this->setCallbacks($onData, $onError); + $this->process->on('exit', function ($exitCode) use ($onExit): void { + $this->cancelTimer(); + + $output = ''; + rewind($this->stdOut); + $output .= stream_get_contents($this->stdOut); + + rewind($this->stdErr); + $output .= stream_get_contents($this->stdErr); + + $onExit($exitCode, $output); + fclose($this->stdOut); + fclose($this->stdErr); + }); + } + + public function quit(): void + { + $this->cancelTimer(); + if (!$this->process->isRunning()) { + return; + } + + foreach ($this->process->pipes as $pipe) { + $pipe->close(); + } + + $this->endConnection(); + } + +} From 606bf021d74493b026c7c94d3bf704944bea1152 Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Thu, 14 May 2026 22:36:36 +0200 Subject: [PATCH 2/7] Extract WorkerRunner from WorkerCommand MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves everything WorkerCommand does after CommandHelper::begin() — the TCP connect, NDJSON handshake and per-file analysis loop — into a reusable WorkerRunner service. WorkerCommand now only boots and delegates. This makes the post-boot worker logic callable without a re-boot, which the pcntl_fork() path will rely on. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Command/WorkerCommand.php | 214 ++--------------------------- src/Parallel/WorkerRunner.php | 247 ++++++++++++++++++++++++++++++++++ 2 files changed, 260 insertions(+), 201 deletions(-) create mode 100644 src/Parallel/WorkerRunner.php diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index d5f42b87956..a1dbab37f50 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -2,38 +2,18 @@ namespace PHPStan\Command; -use Clue\React\NDJson\Decoder; -use Clue\React\NDJson\Encoder; use Override; -use PHPStan\Analyser\FileAnalyser; -use PHPStan\Analyser\InternalError; -use PHPStan\Analyser\NodeScopeResolver; -use PHPStan\Collectors\Registry as CollectorRegistry; -use PHPStan\DependencyInjection\Container; use PHPStan\File\PathNotFoundException; -use PHPStan\Rules\Registry as RuleRegistry; +use PHPStan\Parallel\WorkerRunner; use PHPStan\ShouldNotHappenException; -use React\EventLoop\StreamSelectLoop; -use React\Socket\ConnectionInterface; -use React\Socket\TcpConnector; -use React\Stream\ReadableStreamInterface; -use React\Stream\WritableStreamInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use Throwable; -use function array_fill_keys; -use function array_filter; -use function array_merge; -use function array_unshift; -use function array_values; -use function defined; use function is_array; use function is_bool; use function is_string; -use function memory_get_peak_usage; use function sprintf; final class WorkerCommand extends Command @@ -41,8 +21,6 @@ final class WorkerCommand extends Command private const NAME = 'worker'; - private int $errorCount = 0; - /** * @param string[] $composerAutoloaderProjectPaths */ @@ -122,13 +100,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int } catch (InceptionNotSuccessfulException $e) { return 1; } - $loop = new StreamSelectLoop(); $container = $inceptionResult->getContainer(); try { [$analysedFiles] = $inceptionResult->getFiles(); - $analysedFiles = $this->switchTmpFile($analysedFiles, $insteadOfFile, $tmpFile); } catch (PathNotFoundException $e) { $inceptionResult->getErrorOutput()->writeLineFormatted(sprintf('%s', $e->getMessage())); return 1; @@ -136,182 +112,18 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 1; } - $nodeScopeResolver = $container->getByType(NodeScopeResolver::class); - $nodeScopeResolver->setAnalysedFiles($analysedFiles); - - $analysedFiles = array_fill_keys($analysedFiles, true); - - $tcpConnector = new TcpConnector($loop); - $tcpConnector->connect(sprintf('127.0.0.1:%d', (int) $port))->then(function (ConnectionInterface $connection) use ($container, $identifier, $output, $analysedFiles, $tmpFile, $insteadOfFile): void { - // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly - $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; - // phpcs:enable - $out = new Encoder($connection, $jsonInvalidUtf8Ignore); - $in = new Decoder($connection, true, options: $jsonInvalidUtf8Ignore, maxlength: $container->getParameter('parallel')['buffer']); - $out->write(['action' => 'hello', 'identifier' => $identifier]); - $this->runWorker($container, $out, $in, $output, $analysedFiles, $tmpFile, $insteadOfFile); - }); - - $loop->run(); - - if ($this->errorCount > 0) { - return 1; - } - - return 0; - } - - /** - * @param array $analysedFiles - */ - private function runWorker( - Container $container, - WritableStreamInterface $out, - ReadableStreamInterface $in, - OutputInterface $output, - array $analysedFiles, - ?string $tmpFile, - ?string $insteadOfFile, - ): void - { - $handleError = function (Throwable $error) use ($out, $output): void { - $this->errorCount++; - $output->writeln(sprintf('Error: %s', $error->getMessage())); - $out->write([ - 'action' => 'result', - 'result' => [ - 'errors' => [], - 'internalErrors' => [ - new InternalError( - $error->getMessage(), - 'communicating with main process in parallel worker', - InternalError::prepareTrace($error), - $error->getTraceAsString(), - shouldReportBug: true, - ), - ], - 'filteredPhpErrors' => [], - 'allPhpErrors' => [], - 'locallyIgnoredErrors' => [], - 'linesToIgnore' => [], - 'unmatchedLineIgnores' => [], - 'collectedData' => [], - 'memoryUsage' => memory_get_peak_usage(true), - 'dependencies' => [], - 'exportedNodes' => [], - 'files' => [], - 'internalErrorsCount' => 1, - ], - ]); - $out->end(); - }; - $out->on('error', $handleError); - $fileAnalyser = $container->getByType(FileAnalyser::class); - $ruleRegistry = $container->getByType(RuleRegistry::class); - $collectorRegistry = $container->getByType(CollectorRegistry::class); - $in->on('data', static function (array $json) use ($fileAnalyser, $ruleRegistry, $collectorRegistry, $out, $analysedFiles, $tmpFile, $insteadOfFile): void { - $action = $json['action']; - if ($action !== 'analyse') { - return; - } - - $internalErrorsCount = 0; - $files = $json['files']; - $errors = []; - $internalErrors = []; - $filteredPhpErrors = []; - $allPhpErrors = []; - $locallyIgnoredErrors = []; - $linesToIgnore = []; - $unmatchedLineIgnores = []; - $collectedData = []; - $dependencies = []; - $usedTraitDependencies = []; - $exportedNodes = []; - $processedFiles = []; - foreach ($files as $file) { - try { - if ($file === $insteadOfFile) { - $file = $tmpFile; - } - $fileAnalyserResult = $fileAnalyser->analyseFile($file, $analysedFiles, $ruleRegistry, $collectorRegistry, null); - $fileErrors = $fileAnalyserResult->getErrors(); - $filteredPhpErrors = array_merge($filteredPhpErrors, $fileAnalyserResult->getFilteredPhpErrors()); - $allPhpErrors = array_merge($allPhpErrors, $fileAnalyserResult->getAllPhpErrors()); - $linesToIgnore[$file] = $fileAnalyserResult->getLinesToIgnore(); - $unmatchedLineIgnores[$file] = $fileAnalyserResult->getUnmatchedLineIgnores(); - $dependencies[$file] = $fileAnalyserResult->getDependencies(); - $usedTraitDependencies[$file] = $fileAnalyserResult->getUsedTraitDependencies(); - $exportedNodes[$file] = $fileAnalyserResult->getExportedNodes(); - $processedFiles = array_merge($processedFiles, $fileAnalyserResult->getProcessedFiles()); - foreach ($fileErrors as $fileError) { - $errors[] = $fileError; - } - foreach ($fileAnalyserResult->getLocallyIgnoredErrors() as $locallyIgnoredError) { - $locallyIgnoredErrors[] = $locallyIgnoredError; - } - foreach ($fileAnalyserResult->getCollectedData() as $collectedFile => $dataPerCollector) { - foreach ($dataPerCollector as $collectorType => $collectorData) { - foreach ($collectorData as $data) { - $collectedData[$collectedFile][$collectorType][] = $data; - } - } - } - } catch (Throwable $t) { - $internalErrorsCount++; - $internalErrors[] = new InternalError( - $t->getMessage(), - sprintf('analysing file %s', $file), - InternalError::prepareTrace($t), - $t->getTraceAsString(), - shouldReportBug: true, - ); - } - } - - $out->write([ - 'action' => 'result', - 'result' => [ - 'errors' => $errors, - 'internalErrors' => $internalErrors, - 'filteredPhpErrors' => $filteredPhpErrors, - 'allPhpErrors' => $allPhpErrors, - 'locallyIgnoredErrors' => $locallyIgnoredErrors, - 'linesToIgnore' => $linesToIgnore, - 'unmatchedLineIgnores' => $unmatchedLineIgnores, - 'collectedData' => $collectedData, - 'memoryUsage' => memory_get_peak_usage(true), - 'dependencies' => $dependencies, - 'usedTraitDependencies' => $usedTraitDependencies, - 'exportedNodes' => $exportedNodes, - 'files' => $files, - 'processedFiles' => $processedFiles, - 'internalErrorsCount' => $internalErrorsCount, - ]]); - }); - $in->on('error', $handleError); - } - - /** - * @param string[] $analysedFiles - * @return string[] - */ - private function switchTmpFile( - array $analysedFiles, - ?string $insteadOfFile, - ?string $tmpFile, - ): array - { - if ($insteadOfFile === null) { - return $analysedFiles; - } - $analysedFiles = array_values(array_filter($analysedFiles, static fn (string $file): bool => $file !== $insteadOfFile)); - - if ($tmpFile !== null) { - array_unshift($analysedFiles, $tmpFile); - } - - return $analysedFiles; + // Everything after the boot lives in WorkerRunner so a pcntl_fork()-ed + // child can reuse it without re-booting (see ParallelAnalyser). + $workerRunner = $container->getByType(WorkerRunner::class); + + return $workerRunner->run( + $output, + $analysedFiles, + (int) $port, + $identifier, + $tmpFile, + $insteadOfFile, + ); } } diff --git a/src/Parallel/WorkerRunner.php b/src/Parallel/WorkerRunner.php new file mode 100644 index 00000000000..6cb597dc2d8 --- /dev/null +++ b/src/Parallel/WorkerRunner.php @@ -0,0 +1,247 @@ +switchTmpFile($analysedFiles, $insteadOfFile, $tmpFile); + $this->nodeScopeResolver->setAnalysedFiles($analysedFiles); + $analysedFiles = array_fill_keys($analysedFiles, true); + + // Always a fresh event loop: in a forked child the parent's inherited + // loop must never be touched. + $loop = new StreamSelectLoop(); + $errorCount = 0; + + $tcpConnector = new TcpConnector($loop); + $tcpConnector->connect(sprintf('127.0.0.1:%d', $port))->then(function (ConnectionInterface $connection) use ($identifier, $output, $analysedFiles, $tmpFile, $insteadOfFile, &$errorCount): void { + // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly + $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; + // phpcs:enable + $out = new Encoder($connection, $jsonInvalidUtf8Ignore); + $in = new Decoder($connection, true, options: $jsonInvalidUtf8Ignore, maxlength: $this->decoderBufferSize); + $out->write(['action' => 'hello', 'identifier' => $identifier]); + $this->runWorker($out, $in, $output, $analysedFiles, $tmpFile, $insteadOfFile, $errorCount); + }); + + $loop->run(); + + return $errorCount > 0 ? 1 : 0; + } + + /** + * @param array $analysedFiles + */ + private function runWorker( + WritableStreamInterface $out, + ReadableStreamInterface $in, + OutputInterface $output, + array $analysedFiles, + ?string $tmpFile, + ?string $insteadOfFile, + int &$errorCount, + ): void + { + $handleError = static function (Throwable $error) use ($out, $output, &$errorCount): void { + $errorCount++; + $output->writeln(sprintf('Error: %s', $error->getMessage())); + $out->write([ + 'action' => 'result', + 'result' => [ + 'errors' => [], + 'internalErrors' => [ + new InternalError( + $error->getMessage(), + 'communicating with main process in parallel worker', + InternalError::prepareTrace($error), + $error->getTraceAsString(), + shouldReportBug: true, + ), + ], + 'filteredPhpErrors' => [], + 'allPhpErrors' => [], + 'locallyIgnoredErrors' => [], + 'linesToIgnore' => [], + 'unmatchedLineIgnores' => [], + 'collectedData' => [], + 'memoryUsage' => memory_get_peak_usage(true), + 'dependencies' => [], + 'exportedNodes' => [], + 'files' => [], + 'internalErrorsCount' => 1, + ], + ]); + $out->end(); + }; + $out->on('error', $handleError); + $fileAnalyser = $this->fileAnalyser; + $ruleRegistry = $this->ruleRegistry; + $collectorRegistry = $this->collectorRegistry; + $in->on('data', static function (array $json) use ($fileAnalyser, $ruleRegistry, $collectorRegistry, $out, $analysedFiles, $tmpFile, $insteadOfFile): void { + $action = $json['action']; + if ($action !== 'analyse') { + return; + } + + $internalErrorsCount = 0; + $files = $json['files']; + $errors = []; + $internalErrors = []; + $filteredPhpErrors = []; + $allPhpErrors = []; + $locallyIgnoredErrors = []; + $linesToIgnore = []; + $unmatchedLineIgnores = []; + $collectedData = []; + $dependencies = []; + $usedTraitDependencies = []; + $exportedNodes = []; + $processedFiles = []; + foreach ($files as $file) { + try { + if ($file === $insteadOfFile) { + $file = $tmpFile; + } + $fileAnalyserResult = $fileAnalyser->analyseFile($file, $analysedFiles, $ruleRegistry, $collectorRegistry, null); + $fileErrors = $fileAnalyserResult->getErrors(); + $filteredPhpErrors = array_merge($filteredPhpErrors, $fileAnalyserResult->getFilteredPhpErrors()); + $allPhpErrors = array_merge($allPhpErrors, $fileAnalyserResult->getAllPhpErrors()); + $linesToIgnore[$file] = $fileAnalyserResult->getLinesToIgnore(); + $unmatchedLineIgnores[$file] = $fileAnalyserResult->getUnmatchedLineIgnores(); + $dependencies[$file] = $fileAnalyserResult->getDependencies(); + $usedTraitDependencies[$file] = $fileAnalyserResult->getUsedTraitDependencies(); + $exportedNodes[$file] = $fileAnalyserResult->getExportedNodes(); + $processedFiles = array_merge($processedFiles, $fileAnalyserResult->getProcessedFiles()); + foreach ($fileErrors as $fileError) { + $errors[] = $fileError; + } + foreach ($fileAnalyserResult->getLocallyIgnoredErrors() as $locallyIgnoredError) { + $locallyIgnoredErrors[] = $locallyIgnoredError; + } + foreach ($fileAnalyserResult->getCollectedData() as $collectedFile => $dataPerCollector) { + foreach ($dataPerCollector as $collectorType => $collectorData) { + foreach ($collectorData as $data) { + $collectedData[$collectedFile][$collectorType][] = $data; + } + } + } + } catch (Throwable $t) { + $internalErrorsCount++; + $internalErrors[] = new InternalError( + $t->getMessage(), + sprintf('analysing file %s', $file), + InternalError::prepareTrace($t), + $t->getTraceAsString(), + shouldReportBug: true, + ); + } + } + + $out->write([ + 'action' => 'result', + 'result' => [ + 'errors' => $errors, + 'internalErrors' => $internalErrors, + 'filteredPhpErrors' => $filteredPhpErrors, + 'allPhpErrors' => $allPhpErrors, + 'locallyIgnoredErrors' => $locallyIgnoredErrors, + 'linesToIgnore' => $linesToIgnore, + 'unmatchedLineIgnores' => $unmatchedLineIgnores, + 'collectedData' => $collectedData, + 'memoryUsage' => memory_get_peak_usage(true), + 'dependencies' => $dependencies, + 'usedTraitDependencies' => $usedTraitDependencies, + 'exportedNodes' => $exportedNodes, + 'files' => $files, + 'processedFiles' => $processedFiles, + 'internalErrorsCount' => $internalErrorsCount, + ]]); + }); + $in->on('error', $handleError); + } + + /** + * @param string[] $analysedFiles + * @return string[] + */ + private function switchTmpFile( + array $analysedFiles, + ?string $insteadOfFile, + ?string $tmpFile, + ): array + { + if ($insteadOfFile === null) { + return $analysedFiles; + } + $analysedFiles = array_values(array_filter($analysedFiles, static fn (string $file): bool => $file !== $insteadOfFile)); + + if ($tmpFile !== null) { + array_unshift($analysedFiles, $tmpFile); + } + + return $analysedFiles; + } + +} From 639b4d4758660d150b734eac870ca36e5bef0180 Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Thu, 14 May 2026 22:47:15 +0200 Subject: [PATCH 3/7] Add pcntl_fork() parallel worker path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When PHPSTAN_PARALLEL_FORK=1 is set and OPcache + JIT are off, ParallelAnalyser forks workers from the already-booted process (ForkedProcess) instead of spawning fresh ones — the fork inherits the DI container, skipping the per-worker re-boot. Forked workers still speak the same TCP + NDJSON protocol. ForkParallelChecker gates the path; the full analysed-files list is now threaded through ParallelAnalyser::analyse() so the forked child can set it on NodeScopeResolver — its two callers (AnalyserRunner, FixerWorkerCommand) are updated accordingly. ext-pcntl/ext-posix added to composer.json suggest. Co-Authored-By: Claude Opus 4.7 (1M context) --- composer.json | 4 + src/Command/AnalyserRunner.php | 2 +- src/Command/FixerWorkerCommand.php | 5 +- src/Parallel/ForkParallelChecker.php | 67 ++++++++++++ src/Parallel/ForkedProcess.php | 151 +++++++++++++++++++++++++++ src/Parallel/ParallelAnalyser.php | 40 +++++++ 6 files changed, 267 insertions(+), 2 deletions(-) create mode 100644 src/Parallel/ForkParallelChecker.php create mode 100644 src/Parallel/ForkedProcess.php diff --git a/composer.json b/composer.json index af412db81f8..ce9335dc5aa 100644 --- a/composer.json +++ b/composer.json @@ -64,6 +64,10 @@ "phpstan/phpstan": "2.1.x", "symfony/polyfill-php73": "*" }, + "suggest": { + "ext-pcntl": "Enables forking parallel analysis workers from the already-booted process (experimental, skips the per-worker re-boot)", + "ext-posix": "Used together with ext-pcntl for forked parallel analysis workers" + }, "require-dev": { "cweagans/composer-patches": "^1.7.3", "php-parallel-lint/php-parallel-lint": "^1.2.0", diff --git a/src/Command/AnalyserRunner.php b/src/Command/AnalyserRunner.php index e0642493890..51149328b96 100644 --- a/src/Command/AnalyserRunner.php +++ b/src/Command/AnalyserRunner.php @@ -83,7 +83,7 @@ public function runAnalyser( if ($mainScript !== null && $schedule->getNumberOfProcesses() > 0) { $loop = new StreamSelectLoop(); $result = null; - $promise = $this->parallelAnalyser->analyse($loop, $schedule, $mainScript, $postFileCallback, $projectConfigFile, $tmpFile, $insteadOfFile, $input, null); + $promise = $this->parallelAnalyser->analyse($loop, $schedule, $allAnalysedFiles, $mainScript, $postFileCallback, $projectConfigFile, $tmpFile, $insteadOfFile, $input, null); $promise->then(static function (AnalyserResult $tmp) use (&$result): void { $result = $tmp; }); diff --git a/src/Command/FixerWorkerCommand.php b/src/Command/FixerWorkerCommand.php index cfe2af34c9e..6b42dcba925 100644 --- a/src/Command/FixerWorkerCommand.php +++ b/src/Command/FixerWorkerCommand.php @@ -212,6 +212,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $loop, $container, $filesToAnalyse, + $inceptionFiles, $configuration, $input, function (array $errors, array $locallyIgnoredErrors, array $analysedFiles) use ($out, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles): void { @@ -381,10 +382,11 @@ private function filterErrors(array $errors, IgnoredErrorHelperResult $ignoredEr /** * @param string[] $files + * @param string[] $allAnalysedFiles * @param callable(list, list, string[]): void $onFileAnalysisHandler * @return PromiseInterface */ - private function runAnalyser(LoopInterface $loop, Container $container, array $files, ?string $configuration, InputInterface $input, callable $onFileAnalysisHandler): PromiseInterface + private function runAnalyser(LoopInterface $loop, Container $container, array $files, array $allAnalysedFiles, ?string $configuration, InputInterface $input, callable $onFileAnalysisHandler): PromiseInterface { /** @var ParallelAnalyser $parallelAnalyser */ $parallelAnalyser = $container->getByType(ParallelAnalyser::class); @@ -423,6 +425,7 @@ private function runAnalyser(LoopInterface $loop, Container $container, array $f return $parallelAnalyser->analyse( $loop, $schedule, + $allAnalysedFiles, $mainScript, null, $configuration, diff --git a/src/Parallel/ForkParallelChecker.php b/src/Parallel/ForkParallelChecker.php new file mode 100644 index 00000000000..53e709ea90e --- /dev/null +++ b/src/Parallel/ForkParallelChecker.php @@ -0,0 +1,67 @@ +isOpcacheOrJitEnabled()) { + return false; + } + + return true; + } + + private function isOpcacheOrJitEnabled(): bool + { + if (!function_exists('opcache_get_status')) { + return false; + } + + $status = opcache_get_status(false); + if ($status === false) { + return false; + } + + if (($status['opcache_enabled'] ?? false) === true) { + return true; + } + + return ($status['jit']['enabled'] ?? false) === true; + } + +} diff --git a/src/Parallel/ForkedProcess.php b/src/Parallel/ForkedProcess.php new file mode 100644 index 00000000000..0d26422d29f --- /dev/null +++ b/src/Parallel/ForkedProcess.php @@ -0,0 +1,151 @@ +setCallbacks($onData, $onError); + + // Created before the fork so the parent can read what the child wrote. + $tmpStdOut = tmpfile(); + if ($tmpStdOut === false) { + throw new ShouldNotHappenException('Failed creating temp file for stdout.'); + } + $this->stdOut = $tmpStdOut; + + $pid = pcntl_fork(); + + if ($pid === -1) { + fclose($this->stdOut); + $this->stdOut = null; + // Deferred so it runs after ParallelAnalyser has attached this + // process to the pool — otherwise tryQuitProcess() would no-op. + $this->loop->futureTick(static function () use ($onExit): void { + $onExit(null, 'pcntl_fork() failed.'); + }); + return; + } + + if ($pid === 0) { + // Child: drop the inherited listening socket immediately, then run + // the worker on its own fresh event loop and never return. + $this->server->close(); + $output = new StreamOutput($this->stdOut); + $exitCode = $this->workerRunner->run( + $output, + $this->analysedFiles, + $this->serverPort, + $this->identifier, + $this->tmpFile, + $this->insteadOfFile, + ); + exit($exitCode); + } + + // Parent: poll for the child to exit and report it through $onExit. + $this->waitTimer = $this->loop->addPeriodicTimer(self::WAITPID_POLL_INTERVAL, function () use ($pid, $onExit): void { + $status = 0; + $result = pcntl_waitpid($pid, $status, WNOHANG); + if ($result === 0) { + return; + } + + $this->cancelWaitTimer(); + $this->cancelTimer(); + + $exitCode = null; + if ($result > 0 && pcntl_wifexited($status)) { + $exitStatus = pcntl_wexitstatus($status); + if ($exitStatus !== false) { + $exitCode = $exitStatus; + } + } + + $output = ''; + if ($this->stdOut !== null) { + rewind($this->stdOut); + $output = (string) stream_get_contents($this->stdOut); + fclose($this->stdOut); + $this->stdOut = null; + } + + $onExit($exitCode, $output); + }); + } + + public function quit(): void + { + // Ending the connection makes the child's event loop drain and the + // child exit; the waitpid poll timer must keep running until then so + // the child is actually reaped (otherwise: zombie + hang). + $this->endConnection(); + } + + private function cancelWaitTimer(): void + { + if ($this->waitTimer === null) { + return; + } + + $this->loop->cancelTimer($this->waitTimer); + $this->waitTimer = null; + } + +} diff --git a/src/Parallel/ParallelAnalyser.php b/src/Parallel/ParallelAnalyser.php index 55e209c9f0e..b42ccff7045 100644 --- a/src/Parallel/ParallelAnalyser.php +++ b/src/Parallel/ParallelAnalyser.php @@ -27,6 +27,7 @@ use function count; use function defined; use function escapeshellarg; +use function fwrite; use function ini_get; use function max; use function memory_get_usage; @@ -34,6 +35,7 @@ use function sprintf; use function str_contains; use const PHP_URL_PORT; +use const STDERR; #[AutowiredService] final class ParallelAnalyser @@ -52,12 +54,15 @@ public function __construct( float $processTimeout, #[AutowiredParameter(ref: '%parallel.buffer%')] private int $decoderBufferSize, + private ForkParallelChecker $forkParallelChecker, + private WorkerRunner $workerRunner, ) { $this->processTimeout = max($processTimeout, self::DEFAULT_TIMEOUT); } /** + * @param string[] $allAnalysedFiles * @param Closure(int, list=): void|null $postFileCallback * @param (callable(list, list, string[]): void)|null $onFileAnalysisHandler * @return PromiseInterface @@ -65,6 +70,7 @@ public function __construct( public function analyse( LoopInterface $loop, Schedule $schedule, + array $allAnalysedFiles, string $mainScript, ?Closure $postFileCallback, ?string $projectConfigFile, @@ -170,6 +176,11 @@ public function analyse( $this->processPool->quitAll(); }; + $useFork = $this->forkParallelChecker->isSupported(); + if ($useFork && $input->hasParameterOption(['-v', '-vv', '-vvv', '--verbose'], true)) { + fwrite(STDERR, "Note: using pcntl_fork() for parallel workers (experimental).\n"); + } + for ($i = 0; $i < $numberOfProcesses; $i++) { if (count($jobs) === 0) { break; @@ -191,10 +202,17 @@ public function analyse( } $process = $this->createProcess( + $useFork, $loop, + $server, + $serverPort, + $processIdentifier, + $allAnalysedFiles, $mainScript, $projectConfigFile, $commandOptions, + $tmpFile, + $insteadOfFile, $input, ); $process->start(function (array $json) use ($process, &$internalErrors, &$errors, &$filteredPhpErrors, &$allPhpErrors, &$locallyIgnoredErrors, &$linesToIgnore, &$unmatchedLineIgnores, &$collectedData, &$dependencies, &$usedTraitDependencies, &$exportedNodes, &$peakMemoryUsages, &$jobs, $postFileCallback, &$internalErrorsCount, &$reachedInternalErrorsCountLimit, $processIdentifier, $onFileAnalysisHandler, &$allProcessedFiles): void { @@ -356,16 +374,38 @@ public function analyse( } /** + * @param string[] $allAnalysedFiles * @param string[] $commandOptions */ private function createProcess( + bool $useFork, LoopInterface $loop, + TcpServer $server, + int $serverPort, + string $processIdentifier, + array $allAnalysedFiles, string $mainScript, ?string $projectConfigFile, array $commandOptions, + ?string $tmpFile, + ?string $insteadOfFile, InputInterface $input, ): Process { + if ($useFork) { + return new ForkedProcess( + $loop, + $this->processTimeout, + $this->workerRunner, + $server, + $serverPort, + $processIdentifier, + $allAnalysedFiles, + $tmpFile, + $insteadOfFile, + ); + } + return new SpawnedProcess(ProcessHelper::getWorkerCommand( $mainScript, 'worker', From 4ac1d65f705d310294882d47515b9fb9bb9c70f2 Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Thu, 14 May 2026 22:47:26 +0200 Subject: [PATCH 4/7] Introduce ProcessPromise interface for the PHPStan Pro worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ProcessPromise becomes an interface describing the fixer:worker process as seen by FixerApplication; the existing react/child-process implementation is renamed to SpawnedProcessPromise. FixerApplication creates it through a factory method. Sole implementation for now — a pcntl_fork() based one follows. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Command/FixerApplication.php | 35 +++++++--- src/Process/ProcessPromise.php | 90 +++--------------------- src/Process/SpawnedProcessPromise.php | 98 +++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 89 deletions(-) create mode 100644 src/Process/SpawnedProcessPromise.php diff --git a/src/Command/FixerApplication.php b/src/Command/FixerApplication.php index 61f89f155be..98315b07a52 100644 --- a/src/Command/FixerApplication.php +++ b/src/Command/FixerApplication.php @@ -28,6 +28,7 @@ use PHPStan\Process\ProcessCrashedException; use PHPStan\Process\ProcessHelper; use PHPStan\Process\ProcessPromise; +use PHPStan\Process\SpawnedProcessPromise; use PHPStan\ShouldNotHappenException; use React\ChildProcess\Process; use React\EventLoop\LoopInterface; @@ -446,16 +447,7 @@ private function analyse( }); }); - $process = new ProcessPromise($loop, ProcessHelper::getWorkerCommand( - $mainScript, - 'fixer:worker', - $projectConfigFile, - [ - '--server-port', - (string) $serverPort, - ], - $input, - )); + $process = $this->createProcessPromise($loop, $mainScript, $projectConfigFile, $input, $serverPort); $this->processInProgress = $process->run(); $this->processInProgress->then(function () use ($server): void { @@ -566,4 +558,27 @@ private function getStubFiles(): array return $stubFiles; } + /** + * @param int<0, 65535> $serverPort + */ + private function createProcessPromise( + LoopInterface $loop, + string $mainScript, + ?string $projectConfigFile, + InputInterface $input, + int $serverPort, + ): ProcessPromise + { + return new SpawnedProcessPromise($loop, ProcessHelper::getWorkerCommand( + $mainScript, + 'fixer:worker', + $projectConfigFile, + [ + '--server-port', + (string) $serverPort, + ], + $input, + )); + } + } diff --git a/src/Process/ProcessPromise.php b/src/Process/ProcessPromise.php index 31c9f0b7615..1d588c42073 100644 --- a/src/Process/ProcessPromise.php +++ b/src/Process/ProcessPromise.php @@ -2,91 +2,23 @@ namespace PHPStan\Process; -use PHPStan\ShouldNotHappenException; -use React\ChildProcess\Process; -use React\EventLoop\LoopInterface; -use React\Promise\Deferred; use React\Promise\PromiseInterface; -use function fclose; -use function rewind; -use function stream_get_contents; -use function tmpfile; -final class ProcessPromise +/** + * A PHPStan Pro analysis worker as seen by FixerApplication. + * + * Implementations differ only in how the worker process comes to life: + * SpawnedProcessPromise spawns a fresh PHP process via react/child-process, + * ForkedProcessPromise forks the already-booted main process via pcntl_fork(). + * Both yield a promise that resolves on success and rejects with + * ProcessCrashedException / ProcessCanceledException otherwise. + */ +interface ProcessPromise { - /** @var Deferred */ - private Deferred $deferred; - - private ?Process $process = null; - - private bool $canceled = false; - - public function __construct(private LoopInterface $loop, private string $command) - { - $this->deferred = new Deferred(function (): void { - $this->cancel(); - }); - } - /** * @return PromiseInterface */ - public function run(): PromiseInterface - { - $tmpStdOutResource = tmpfile(); - if ($tmpStdOutResource === false) { - throw new ShouldNotHappenException('Failed creating temp file for stdout.'); - } - $tmpStdErrResource = tmpfile(); - if ($tmpStdErrResource === false) { - throw new ShouldNotHappenException('Failed creating temp file for stderr.'); - } - - $this->process = new Process($this->command, fds: [ - 1 => $tmpStdOutResource, - 2 => $tmpStdErrResource, - ]); - $this->process->start($this->loop); - - $this->process->on('exit', function ($exitCode) use ($tmpStdOutResource, $tmpStdErrResource): void { - if ($this->canceled) { - fclose($tmpStdOutResource); - fclose($tmpStdErrResource); - return; - } - rewind($tmpStdOutResource); - $stdOut = stream_get_contents($tmpStdOutResource); - fclose($tmpStdOutResource); - - rewind($tmpStdErrResource); - $stdErr = stream_get_contents($tmpStdErrResource); - fclose($tmpStdErrResource); - - if ($exitCode === null) { - $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); - return; - } - - if ($exitCode === 0) { - $this->deferred->resolve($stdOut); - return; - } - - $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); - }); - - return $this->deferred->promise(); - } - - private function cancel(): void - { - if ($this->process === null) { - throw new ShouldNotHappenException('Cancelling process before running'); - } - $this->canceled = true; - $this->process->terminate(); - $this->deferred->reject(new ProcessCanceledException()); - } + public function run(): PromiseInterface; } diff --git a/src/Process/SpawnedProcessPromise.php b/src/Process/SpawnedProcessPromise.php new file mode 100644 index 00000000000..4504e4161d0 --- /dev/null +++ b/src/Process/SpawnedProcessPromise.php @@ -0,0 +1,98 @@ + */ + private Deferred $deferred; + + private ?Process $process = null; + + private bool $canceled = false; + + public function __construct(private LoopInterface $loop, private string $command) + { + $this->deferred = new Deferred(function (): void { + $this->cancel(); + }); + } + + /** + * @return PromiseInterface + */ + public function run(): PromiseInterface + { + $tmpStdOutResource = tmpfile(); + if ($tmpStdOutResource === false) { + throw new ShouldNotHappenException('Failed creating temp file for stdout.'); + } + $tmpStdErrResource = tmpfile(); + if ($tmpStdErrResource === false) { + throw new ShouldNotHappenException('Failed creating temp file for stderr.'); + } + + $this->process = new Process($this->command, fds: [ + 1 => $tmpStdOutResource, + 2 => $tmpStdErrResource, + ]); + $this->process->start($this->loop); + + $this->process->on('exit', function ($exitCode) use ($tmpStdOutResource, $tmpStdErrResource): void { + if ($this->canceled) { + fclose($tmpStdOutResource); + fclose($tmpStdErrResource); + return; + } + rewind($tmpStdOutResource); + $stdOut = stream_get_contents($tmpStdOutResource); + fclose($tmpStdOutResource); + + rewind($tmpStdErrResource); + $stdErr = stream_get_contents($tmpStdErrResource); + fclose($tmpStdErrResource); + + if ($exitCode === null) { + $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); + return; + } + + if ($exitCode === 0) { + $this->deferred->resolve($stdOut); + return; + } + + $this->deferred->reject(new ProcessCrashedException($stdOut . $stdErr)); + }); + + return $this->deferred->promise(); + } + + private function cancel(): void + { + if ($this->process === null) { + throw new ShouldNotHappenException('Cancelling process before running'); + } + $this->canceled = true; + $this->process->terminate(); + $this->deferred->reject(new ProcessCanceledException()); + } + +} From 009038d4c52211713ea915ef6aa4ec8654a3bbee Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Thu, 14 May 2026 22:47:35 +0200 Subject: [PATCH 5/7] Extract FixerWorkerRunner from FixerWorkerCommand MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves everything FixerWorkerCommand does after CommandHelper::begin() — connecting back to FixerApplication, restoring the result cache, running the analyser and streaming results — into a reusable FixerWorkerRunner service. FixerWorkerCommand now only boots and delegates, making the post-boot logic callable without a re-boot. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Command/FixerWorkerCommand.php | 348 +-------------------------- src/Command/FixerWorkerRunner.php | 374 +++++++++++++++++++++++++++++ 2 files changed, 387 insertions(+), 335 deletions(-) create mode 100644 src/Command/FixerWorkerRunner.php diff --git a/src/Command/FixerWorkerCommand.php b/src/Command/FixerWorkerCommand.php index 6b42dcba925..54c0528f0be 100644 --- a/src/Command/FixerWorkerCommand.php +++ b/src/Command/FixerWorkerCommand.php @@ -2,46 +2,17 @@ namespace PHPStan\Command; -use Clue\React\NDJson\Encoder; use Override; -use PHPStan\Analyser\AnalyserResult; -use PHPStan\Analyser\AnalyserResultFinalizer; -use PHPStan\Analyser\Error; -use PHPStan\Analyser\Ignore\IgnoredErrorHelper; -use PHPStan\Analyser\Ignore\IgnoredErrorHelperResult; -use PHPStan\Analyser\InternalError; -use PHPStan\Analyser\ResultCache\ResultCacheManager; -use PHPStan\Analyser\ResultCache\ResultCacheManagerFactory; -use PHPStan\DependencyInjection\Container; use PHPStan\File\PathNotFoundException; -use PHPStan\Parallel\ParallelAnalyser; -use PHPStan\Parallel\Scheduler; -use PHPStan\Process\CpuCoreCounter; use PHPStan\ShouldNotHappenException; -use React\EventLoop\LoopInterface; -use React\EventLoop\StreamSelectLoop; -use React\Promise\PromiseInterface; -use React\Socket\ConnectionInterface; -use React\Socket\TcpConnector; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use function array_diff; -use function array_key_exists; -use function count; -use function filemtime; -use function in_array; use function is_array; use function is_bool; -use function is_file; use function is_string; -use function memory_get_peak_usage; -use function React\Promise\resolve; -use function sprintf; -use function usort; -use const JSON_INVALID_UTF8_IGNORE; final class FixerWorkerCommand extends Command { @@ -121,318 +92,25 @@ protected function execute(InputInterface $input, OutputInterface $output): int $container = $inceptionResult->getContainer(); - /** @var IgnoredErrorHelper $ignoredErrorHelper */ - $ignoredErrorHelper = $container->getByType(IgnoredErrorHelper::class); - $ignoredErrorHelperResult = $ignoredErrorHelper->initialize(); - if (count($ignoredErrorHelperResult->getErrors()) > 0) { + try { + [$inceptionFiles, $isOnlyFiles] = $inceptionResult->getFiles(); + } catch (PathNotFoundException | InceptionNotSuccessfulException) { throw new ShouldNotHappenException(); } - $loop = new StreamSelectLoop(); - $tcpConnector = new TcpConnector($loop); - $tcpConnector->connect(sprintf('127.0.0.1:%d', (int) $serverPort))->then(function (ConnectionInterface $connection) use ($container, $inceptionResult, $configuration, $input, $ignoredErrorHelperResult, $loop): void { - // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly - $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; - // phpcs:enable - $out = new Encoder($connection, $jsonInvalidUtf8Ignore); - //$in = new Decoder($connection, true, 512, $jsonInvalidUtf8Ignore, 128 * 1024 * 1024); - - /** @var ResultCacheManager $resultCacheManager */ - $resultCacheManager = $container->getByType(ResultCacheManagerFactory::class)->create([]); - $projectConfigArray = $inceptionResult->getProjectConfigArray(); - - /** @var AnalyserResultFinalizer $analyserResultFinalizer */ - $analyserResultFinalizer = $container->getByType(AnalyserResultFinalizer::class); - - try { - [$inceptionFiles, $isOnlyFiles] = $inceptionResult->getFiles(); - } catch (PathNotFoundException | InceptionNotSuccessfulException) { - throw new ShouldNotHappenException(); - } - - $out->write([ - 'action' => 'analysisStart', - 'result' => [ - 'analysedFiles' => $inceptionFiles, - ], - ]); - - $resultCache = $resultCacheManager->restore($inceptionFiles, false, false, $projectConfigArray, $inceptionResult->getErrorOutput()); - - $errorsFromResultCacheTmp = $resultCache->getErrors(); - $locallyIgnoredErrorsFromResultCacheTmp = $resultCache->getLocallyIgnoredErrors(); - foreach ($resultCache->getFilesToAnalyse() as $fileToAnalyse) { - unset($errorsFromResultCacheTmp[$fileToAnalyse]); - unset($locallyIgnoredErrorsFromResultCacheTmp[$fileToAnalyse]); - } - - $errorsFromResultCache = []; - foreach ($errorsFromResultCacheTmp as $errorsByFile) { - foreach ($errorsByFile as $error) { - $errorsFromResultCache[] = $error; - } - } - - [$errorsFromResultCache, $ignoredErrorsFromResultCache] = $this->filterErrors($errorsFromResultCache, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); - - foreach ($locallyIgnoredErrorsFromResultCacheTmp as $locallyIgnoredErrors) { - foreach ($locallyIgnoredErrors as $locallyIgnoredError) { - $ignoredErrorsFromResultCache[] = [$locallyIgnoredError, null]; - } - } - - $out->write([ - 'action' => 'analysisStream', - 'result' => [ - 'errors' => $errorsFromResultCache, - 'ignoredErrors' => $ignoredErrorsFromResultCache, - 'analysedFiles' => array_diff($inceptionFiles, $resultCache->getFilesToAnalyse()), - ], - ]); - - $filesToAnalyse = $resultCache->getFilesToAnalyse(); - usort($filesToAnalyse, static function (string $a, string $b): int { - $aTime = @filemtime($a); - if ($aTime === false) { - return 1; - } - - $bTime = @filemtime($b); - if ($bTime === false) { - return -1; - } - - // files are sorted from the oldest - // because ParallelAnalyser reverses the scheduler jobs to do the smallest - // jobs first - return $aTime <=> $bTime; - }); - - $this->runAnalyser( - $loop, - $container, - $filesToAnalyse, - $inceptionFiles, - $configuration, - $input, - function (array $errors, array $locallyIgnoredErrors, array $analysedFiles) use ($out, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles): void { - $internalErrors = []; - foreach ($errors as $fileSpecificError) { - if (!$fileSpecificError->hasNonIgnorableException()) { - continue; - } - - $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); - } - - if (count($internalErrors) > 0) { - $out->write(['action' => 'analysisCrash', 'data' => [ - 'internalErrors' => $internalErrors, - ]]); - return; - } - - [$errors, $ignoredErrors] = $this->filterErrors($errors, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); - foreach ($locallyIgnoredErrors as $locallyIgnoredError) { - $ignoredErrors[] = [$locallyIgnoredError, null]; - } - $out->write([ - 'action' => 'analysisStream', - 'result' => [ - 'errors' => $errors, - 'ignoredErrors' => $ignoredErrors, - 'analysedFiles' => $analysedFiles, - ], - ]); - }, - )->then(function (AnalyserResult $intermediateAnalyserResult) use ($analyserResultFinalizer, $resultCacheManager, $resultCache, $inceptionResult, $isOnlyFiles, $ignoredErrorHelperResult, $inceptionFiles, $out): void { - $analyserResult = $resultCacheManager->process( - $intermediateAnalyserResult, - $resultCache, - $inceptionResult->getErrorOutput(), - false, - true, - )->getAnalyserResult(); - $finalizerResult = $analyserResultFinalizer->finalize($analyserResult, $isOnlyFiles, false); - - $internalErrors = []; - foreach ($finalizerResult->getAnalyserResult()->getInternalErrors() as $internalError) { - $internalErrors[] = new InternalError( - $internalError->getTraceAsString() !== null ? sprintf('Internal error: %s', $internalError->getMessage()) : $internalError->getMessage(), - $internalError->getContextDescription(), - $internalError->getTrace(), - $internalError->getTraceAsString(), - $internalError->shouldReportBug(), - ); - } - - foreach ($finalizerResult->getAnalyserResult()->getUnorderedErrors() as $fileSpecificError) { - if (!$fileSpecificError->hasNonIgnorableException()) { - continue; - } - - $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); - } - - $hasInternalErrors = count($internalErrors) > 0 || $finalizerResult->getAnalyserResult()->hasReachedInternalErrorsCountLimit(); - - if ($hasInternalErrors) { - $out->write(['action' => 'analysisCrash', 'data' => [ - 'internalErrors' => count($internalErrors) > 0 ? $internalErrors : [ - new InternalError( - 'Internal error occurred', - 'running analyser in PHPStan Pro worker', - trace: [], - traceAsString: null, - shouldReportBug: false, - ), - ], - ]]); - } - - [$collectorErrors, $ignoredCollectorErrors] = $this->filterErrors($finalizerResult->getCollectorErrors(), $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, $hasInternalErrors); - foreach ($finalizerResult->getLocallyIgnoredCollectorErrors() as $locallyIgnoredCollectorError) { - $ignoredCollectorErrors[] = [$locallyIgnoredCollectorError, null]; - } - $out->write([ - 'action' => 'analysisStream', - 'result' => [ - 'errors' => $collectorErrors, - 'ignoredErrors' => $ignoredCollectorErrors, - 'analysedFiles' => [], - ], - ]); - - $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process( - $finalizerResult->getErrors(), - $isOnlyFiles, - $inceptionFiles, - $hasInternalErrors, - ); - $ignoreFileErrors = []; - foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { - if ($error->getIdentifier() === null) { - continue; - } - if (!in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched', 'ignore.unmatchedLine', 'ignore.unmatchedIdentifier', 'ignore.noComment'], true)) { - continue; - } - $ignoreFileErrors[] = $error; - } - - $out->end([ - 'action' => 'analysisEnd', - 'result' => [ - 'ignoreFileErrors' => $ignoreFileErrors, - 'ignoreNotFileErrors' => $ignoredErrorHelperProcessedResult->getOtherIgnoreMessages(), - ], - ]); - }); - }); - $loop->run(); - - return 0; - } - - private function transformErrorIntoInternalError(Error $error): InternalError - { - $message = $error->getMessage(); - $metadata = $error->getMetadata(); - if ( - $error->getIdentifier() === 'phpstan.internal' - && array_key_exists(InternalError::STACK_TRACE_AS_STRING_METADATA_KEY, $metadata) - ) { - $message = sprintf('Internal error: %s', $message); - } - - return new InternalError( - $message, - sprintf('analysing file %s', $error->getTraitFilePath() ?? $error->getFilePath()), - $metadata[InternalError::STACK_TRACE_METADATA_KEY] ?? [], - $metadata[InternalError::STACK_TRACE_AS_STRING_METADATA_KEY] ?? null, - shouldReportBug: true, - ); - } - - /** - * @param string[] $inceptionFiles - * @param list $errors - * @return array{list, list} - */ - private function filterErrors(array $errors, IgnoredErrorHelperResult $ignoredErrorHelperResult, bool $onlyFiles, array $inceptionFiles, bool $hasInternalErrors): array - { - $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process($errors, $onlyFiles, $inceptionFiles, $hasInternalErrors); - $finalErrors = []; - foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { - if ($error->getIdentifier() === null) { - $finalErrors[] = $error; - continue; - } - if (in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched'], true)) { - continue; - } - $finalErrors[] = $error; - } - - return [ - $finalErrors, - $ignoredErrorHelperProcessedResult->getIgnoredErrors(), - ]; - } - - /** - * @param string[] $files - * @param string[] $allAnalysedFiles - * @param callable(list, list, string[]): void $onFileAnalysisHandler - * @return PromiseInterface - */ - private function runAnalyser(LoopInterface $loop, Container $container, array $files, array $allAnalysedFiles, ?string $configuration, InputInterface $input, callable $onFileAnalysisHandler): PromiseInterface - { - /** @var ParallelAnalyser $parallelAnalyser */ - $parallelAnalyser = $container->getByType(ParallelAnalyser::class); - $filesCount = count($files); - if ($filesCount === 0) { - return resolve(new AnalyserResult( - unorderedErrors: [], - filteredPhpErrors: [], - allPhpErrors: [], - locallyIgnoredErrors: [], - linesToIgnore: [], - unmatchedLineIgnores: [], - internalErrors: [], - collectedData: [], - dependencies: [], - usedTraitDependencies: [], - exportedNodes: [], - reachedInternalErrorsCountLimit: false, - peakMemoryUsageBytes: memory_get_peak_usage(true), - processedFiles: [], - )); - } - - /** @var Scheduler $scheduler */ - $scheduler = $container->getByType(Scheduler::class); - - /** @var CpuCoreCounter $cpuCoreCounter */ - $cpuCoreCounter = $container->getByType(CpuCoreCounter::class); - - $schedule = $scheduler->scheduleWork($cpuCoreCounter->getNumberOfCpuCores(), $files); - $mainScript = null; - if (isset($_SERVER['argv'][0]) && is_file($_SERVER['argv'][0])) { - $mainScript = $_SERVER['argv'][0]; - } + // Everything after the boot lives in FixerWorkerRunner so a + // pcntl_fork()-ed child can reuse it without re-booting (see + // FixerApplication). + $fixerWorkerRunner = $container->getByType(FixerWorkerRunner::class); - return $parallelAnalyser->analyse( - $loop, - $schedule, - $allAnalysedFiles, - $mainScript, - null, + return $fixerWorkerRunner->run( + $inceptionResult->getErrorOutput(), + $inceptionFiles, + $isOnlyFiles, + $inceptionResult->getProjectConfigArray(), $configuration, - null, - null, + (int) $serverPort, $input, - $onFileAnalysisHandler, ); } diff --git a/src/Command/FixerWorkerRunner.php b/src/Command/FixerWorkerRunner.php new file mode 100644 index 00000000000..9f70596ae82 --- /dev/null +++ b/src/Command/FixerWorkerRunner.php @@ -0,0 +1,374 @@ +ignoredErrorHelper->initialize(); + if (count($ignoredErrorHelperResult->getErrors()) > 0) { + throw new ShouldNotHappenException(); + } + + // Always a fresh event loop: in a forked child the parent's inherited + // loop must never be touched. + $loop = new StreamSelectLoop(); + $tcpConnector = new TcpConnector($loop); + $tcpConnector->connect(sprintf('127.0.0.1:%d', $serverPort))->then(function (ConnectionInterface $connection) use ($errorOutput, $inceptionFiles, $isOnlyFiles, $projectConfigArray, $configuration, $input, $ignoredErrorHelperResult, $loop): void { + // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly + $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; + // phpcs:enable + $out = new Encoder($connection, $jsonInvalidUtf8Ignore); + //$in = new Decoder($connection, true, 512, $jsonInvalidUtf8Ignore, 128 * 1024 * 1024); + + /** @var ResultCacheManager $resultCacheManager */ + $resultCacheManager = $this->resultCacheManagerFactory->create([]); + + $out->write([ + 'action' => 'analysisStart', + 'result' => [ + 'analysedFiles' => $inceptionFiles, + ], + ]); + + $resultCache = $resultCacheManager->restore($inceptionFiles, false, false, $projectConfigArray, $errorOutput); + + $errorsFromResultCacheTmp = $resultCache->getErrors(); + $locallyIgnoredErrorsFromResultCacheTmp = $resultCache->getLocallyIgnoredErrors(); + foreach ($resultCache->getFilesToAnalyse() as $fileToAnalyse) { + unset($errorsFromResultCacheTmp[$fileToAnalyse]); + unset($locallyIgnoredErrorsFromResultCacheTmp[$fileToAnalyse]); + } + + $errorsFromResultCache = []; + foreach ($errorsFromResultCacheTmp as $errorsByFile) { + foreach ($errorsByFile as $error) { + $errorsFromResultCache[] = $error; + } + } + + [$errorsFromResultCache, $ignoredErrorsFromResultCache] = $this->filterErrors($errorsFromResultCache, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); + + foreach ($locallyIgnoredErrorsFromResultCacheTmp as $locallyIgnoredErrors) { + foreach ($locallyIgnoredErrors as $locallyIgnoredError) { + $ignoredErrorsFromResultCache[] = [$locallyIgnoredError, null]; + } + } + + $out->write([ + 'action' => 'analysisStream', + 'result' => [ + 'errors' => $errorsFromResultCache, + 'ignoredErrors' => $ignoredErrorsFromResultCache, + 'analysedFiles' => array_diff($inceptionFiles, $resultCache->getFilesToAnalyse()), + ], + ]); + + $filesToAnalyse = $resultCache->getFilesToAnalyse(); + usort($filesToAnalyse, static function (string $a, string $b): int { + $aTime = @filemtime($a); + if ($aTime === false) { + return 1; + } + + $bTime = @filemtime($b); + if ($bTime === false) { + return -1; + } + + // files are sorted from the oldest + // because ParallelAnalyser reverses the scheduler jobs to do the smallest + // jobs first + return $aTime <=> $bTime; + }); + + $this->runAnalyser( + $loop, + $filesToAnalyse, + $inceptionFiles, + $configuration, + $input, + function (array $errors, array $locallyIgnoredErrors, array $analysedFiles) use ($out, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles): void { + $internalErrors = []; + foreach ($errors as $fileSpecificError) { + if (!$fileSpecificError->hasNonIgnorableException()) { + continue; + } + + $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); + } + + if (count($internalErrors) > 0) { + $out->write(['action' => 'analysisCrash', 'data' => [ + 'internalErrors' => $internalErrors, + ]]); + return; + } + + [$errors, $ignoredErrors] = $this->filterErrors($errors, $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, false); + foreach ($locallyIgnoredErrors as $locallyIgnoredError) { + $ignoredErrors[] = [$locallyIgnoredError, null]; + } + $out->write([ + 'action' => 'analysisStream', + 'result' => [ + 'errors' => $errors, + 'ignoredErrors' => $ignoredErrors, + 'analysedFiles' => $analysedFiles, + ], + ]); + }, + )->then(function (AnalyserResult $intermediateAnalyserResult) use ($resultCacheManager, $resultCache, $errorOutput, $isOnlyFiles, $ignoredErrorHelperResult, $inceptionFiles, $out): void { + $analyserResult = $resultCacheManager->process( + $intermediateAnalyserResult, + $resultCache, + $errorOutput, + false, + true, + )->getAnalyserResult(); + $finalizerResult = $this->analyserResultFinalizer->finalize($analyserResult, $isOnlyFiles, false); + + $internalErrors = []; + foreach ($finalizerResult->getAnalyserResult()->getInternalErrors() as $internalError) { + $internalErrors[] = new InternalError( + $internalError->getTraceAsString() !== null ? sprintf('Internal error: %s', $internalError->getMessage()) : $internalError->getMessage(), + $internalError->getContextDescription(), + $internalError->getTrace(), + $internalError->getTraceAsString(), + $internalError->shouldReportBug(), + ); + } + + foreach ($finalizerResult->getAnalyserResult()->getUnorderedErrors() as $fileSpecificError) { + if (!$fileSpecificError->hasNonIgnorableException()) { + continue; + } + + $internalErrors[] = $this->transformErrorIntoInternalError($fileSpecificError); + } + + $hasInternalErrors = count($internalErrors) > 0 || $finalizerResult->getAnalyserResult()->hasReachedInternalErrorsCountLimit(); + + if ($hasInternalErrors) { + $out->write(['action' => 'analysisCrash', 'data' => [ + 'internalErrors' => count($internalErrors) > 0 ? $internalErrors : [ + new InternalError( + 'Internal error occurred', + 'running analyser in PHPStan Pro worker', + trace: [], + traceAsString: null, + shouldReportBug: false, + ), + ], + ]]); + } + + [$collectorErrors, $ignoredCollectorErrors] = $this->filterErrors($finalizerResult->getCollectorErrors(), $ignoredErrorHelperResult, $isOnlyFiles, $inceptionFiles, $hasInternalErrors); + foreach ($finalizerResult->getLocallyIgnoredCollectorErrors() as $locallyIgnoredCollectorError) { + $ignoredCollectorErrors[] = [$locallyIgnoredCollectorError, null]; + } + $out->write([ + 'action' => 'analysisStream', + 'result' => [ + 'errors' => $collectorErrors, + 'ignoredErrors' => $ignoredCollectorErrors, + 'analysedFiles' => [], + ], + ]); + + $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process( + $finalizerResult->getErrors(), + $isOnlyFiles, + $inceptionFiles, + $hasInternalErrors, + ); + $ignoreFileErrors = []; + foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { + if ($error->getIdentifier() === null) { + continue; + } + if (!in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched', 'ignore.unmatchedLine', 'ignore.unmatchedIdentifier', 'ignore.noComment'], true)) { + continue; + } + $ignoreFileErrors[] = $error; + } + + $out->end([ + 'action' => 'analysisEnd', + 'result' => [ + 'ignoreFileErrors' => $ignoreFileErrors, + 'ignoreNotFileErrors' => $ignoredErrorHelperProcessedResult->getOtherIgnoreMessages(), + ], + ]); + }); + }); + $loop->run(); + + return 0; + } + + private function transformErrorIntoInternalError(Error $error): InternalError + { + $message = $error->getMessage(); + $metadata = $error->getMetadata(); + if ( + $error->getIdentifier() === 'phpstan.internal' + && array_key_exists(InternalError::STACK_TRACE_AS_STRING_METADATA_KEY, $metadata) + ) { + $message = sprintf('Internal error: %s', $message); + } + + return new InternalError( + $message, + sprintf('analysing file %s', $error->getTraitFilePath() ?? $error->getFilePath()), + $metadata[InternalError::STACK_TRACE_METADATA_KEY] ?? [], + $metadata[InternalError::STACK_TRACE_AS_STRING_METADATA_KEY] ?? null, + shouldReportBug: true, + ); + } + + /** + * @param string[] $inceptionFiles + * @param list $errors + * @return array{list, list} + */ + private function filterErrors(array $errors, IgnoredErrorHelperResult $ignoredErrorHelperResult, bool $onlyFiles, array $inceptionFiles, bool $hasInternalErrors): array + { + $ignoredErrorHelperProcessedResult = $ignoredErrorHelperResult->process($errors, $onlyFiles, $inceptionFiles, $hasInternalErrors); + $finalErrors = []; + foreach ($ignoredErrorHelperProcessedResult->getNotIgnoredErrors() as $error) { + if ($error->getIdentifier() === null) { + $finalErrors[] = $error; + continue; + } + if (in_array($error->getIdentifier(), ['ignore.count', 'ignore.unmatched'], true)) { + continue; + } + $finalErrors[] = $error; + } + + return [ + $finalErrors, + $ignoredErrorHelperProcessedResult->getIgnoredErrors(), + ]; + } + + /** + * @param string[] $files + * @param string[] $allAnalysedFiles + * @param callable(list, list, string[]): void $onFileAnalysisHandler + * @return PromiseInterface + */ + private function runAnalyser(LoopInterface $loop, array $files, array $allAnalysedFiles, ?string $configuration, InputInterface $input, callable $onFileAnalysisHandler): PromiseInterface + { + $filesCount = count($files); + if ($filesCount === 0) { + return resolve(new AnalyserResult( + unorderedErrors: [], + filteredPhpErrors: [], + allPhpErrors: [], + locallyIgnoredErrors: [], + linesToIgnore: [], + unmatchedLineIgnores: [], + internalErrors: [], + collectedData: [], + dependencies: [], + usedTraitDependencies: [], + exportedNodes: [], + reachedInternalErrorsCountLimit: false, + peakMemoryUsageBytes: memory_get_peak_usage(true), + processedFiles: [], + )); + } + + $schedule = $this->scheduler->scheduleWork($this->cpuCoreCounter->getNumberOfCpuCores(), $files); + $mainScript = null; + if (isset($_SERVER['argv'][0]) && is_file($_SERVER['argv'][0])) { + $mainScript = $_SERVER['argv'][0]; + } + + return $this->parallelAnalyser->analyse( + $loop, + $schedule, + $allAnalysedFiles, + $mainScript, + null, + $configuration, + null, + null, + $input, + $onFileAnalysisHandler, + ); + } + +} From 0aaaa47055207b0d4d59d7aadc34c0e79d1750f8 Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Thu, 14 May 2026 22:47:46 +0200 Subject: [PATCH 6/7] Add pcntl_fork() PHPStan Pro worker path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When PHPSTAN_PARALLEL_FORK=1 is set and OPcache + JIT are off, FixerApplication forks the fixer:worker from the already-booted process (ForkedProcessPromise) instead of spawning a fresh one — the fork inherits the DI container, skipping the re-boot. The forked child still talks to FixerApplication over the same TCP + NDJSON protocol. FixerApplication::run() now receives the InceptionResult so the forked child has the analysed files, project config and error output without re-deriving them. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Command/AnalyseCommand.php | 2 +- src/Command/FixerApplication.php | 54 +++++++- src/Process/ForkedProcessPromise.php | 179 +++++++++++++++++++++++++++ 3 files changed, 230 insertions(+), 5 deletions(-) create mode 100644 src/Process/ForkedProcessPromise.php diff --git a/src/Command/AnalyseCommand.php b/src/Command/AnalyseCommand.php index f7865ad7508..d2e39d33003 100644 --- a/src/Command/AnalyseCommand.php +++ b/src/Command/AnalyseCommand.php @@ -840,7 +840,7 @@ private function runFixer(InceptionResult $inceptionResult, Container $container $fixerApplication = $container->getByType(FixerApplication::class); return $fixerApplication->run( - $inceptionResult->getProjectConfigFile(), + $inceptionResult, $input, $output, count($files), diff --git a/src/Command/FixerApplication.php b/src/Command/FixerApplication.php index 98315b07a52..8bd4700bb8e 100644 --- a/src/Command/FixerApplication.php +++ b/src/Command/FixerApplication.php @@ -19,11 +19,14 @@ use PHPStan\File\FileMonitorResult; use PHPStan\File\FileReader; use PHPStan\File\FileWriter; +use PHPStan\File\PathNotFoundException; use PHPStan\Internal\ComposerHelper; use PHPStan\Internal\DirectoryCreator; use PHPStan\Internal\DirectoryCreatorException; use PHPStan\Internal\HttpClientFactory; +use PHPStan\Parallel\ForkParallelChecker; use PHPStan\PhpDoc\StubFilesProvider; +use PHPStan\Process\ForkedProcessPromise; use PHPStan\Process\ProcessCanceledException; use PHPStan\Process\ProcessCrashedException; use PHPStan\Process\ProcessHelper; @@ -95,18 +98,25 @@ public function __construct( #[AutowiredParameter] private string $usedLevel, private HttpClientFactory $httpClientFactory, + private ForkParallelChecker $forkParallelChecker, + private FixerWorkerRunner $fixerWorkerRunner, ) { } public function run( - ?string $projectConfigFile, + InceptionResult $inceptionResult, InputInterface $input, OutputInterface $output, int $filesCount, string $mainScript, ): int { + $projectConfigFile = $inceptionResult->getProjectConfigFile(); + if ($this->forkParallelChecker->isSupported() && $output->isVerbose()) { + $output->writeln('Note: using pcntl_fork() for the PHPStan Pro worker (experimental).'); + } + $loop = new StreamSelectLoop(); $server = new TcpServer('127.0.0.1:0', $loop); /** @var string $serverAddress */ @@ -115,7 +125,7 @@ public function run( /** @var int<0, 65535> $serverPort */ $serverPort = parse_url($serverAddress, PHP_URL_PORT); - $server->on('connection', function (ConnectionInterface $connection) use ($loop, $projectConfigFile, $input, $output, $mainScript, $filesCount): void { + $server->on('connection', function (ConnectionInterface $connection) use ($loop, $inceptionResult, $projectConfigFile, $input, $output, $mainScript, $filesCount): void { // phpcs:disable SlevomatCodingStandard.Namespaces.ReferenceUsedNamesOnly $jsonInvalidUtf8Ignore = defined('JSON_INVALID_UTF8_IGNORE') ? JSON_INVALID_UTF8_IGNORE : 0; // phpcs:enable @@ -158,6 +168,7 @@ public function run( $this->analyse( $loop, + $inceptionResult, $mainScript, $projectConfigFile, $input, @@ -165,7 +176,7 @@ public function run( $encoder, ); - $this->monitorFileChanges($loop, function (FileMonitorResult $changes) use ($loop, $mainScript, $projectConfigFile, $input, $encoder, $output): void { + $this->monitorFileChanges($loop, function (FileMonitorResult $changes) use ($loop, $inceptionResult, $mainScript, $projectConfigFile, $input, $encoder, $output): void { if ($this->processInProgress !== null) { $this->processInProgress->cancel(); $this->processInProgress = null; @@ -179,6 +190,7 @@ public function run( $this->analyse( $loop, + $inceptionResult, $mainScript, $projectConfigFile, $input, @@ -418,6 +430,7 @@ private function monitorFileChanges(LoopInterface $loop, callable $hasChangesCal private function analyse( LoopInterface $loop, + InceptionResult $inceptionResult, string $mainScript, ?string $projectConfigFile, InputInterface $input, @@ -447,7 +460,16 @@ private function analyse( }); }); - $process = $this->createProcessPromise($loop, $mainScript, $projectConfigFile, $input, $serverPort); + $process = $this->createProcessPromise( + $this->forkParallelChecker->isSupported(), + $loop, + $server, + $mainScript, + $projectConfigFile, + $input, + $serverPort, + $inceptionResult, + ); $this->processInProgress = $process->run(); $this->processInProgress->then(function () use ($server): void { @@ -562,13 +584,37 @@ private function getStubFiles(): array * @param int<0, 65535> $serverPort */ private function createProcessPromise( + bool $useFork, LoopInterface $loop, + TcpServer $server, string $mainScript, ?string $projectConfigFile, InputInterface $input, int $serverPort, + InceptionResult $inceptionResult, ): ProcessPromise { + if ($useFork) { + try { + [$inceptionFiles, $isOnlyFiles] = $inceptionResult->getFiles(); + } catch (PathNotFoundException | InceptionNotSuccessfulException) { + throw new ShouldNotHappenException(); + } + + return new ForkedProcessPromise( + $loop, + $this->fixerWorkerRunner, + $server, + $inceptionResult->getErrorOutput(), + $inceptionFiles, + $isOnlyFiles, + $inceptionResult->getProjectConfigArray(), + $projectConfigFile, + $serverPort, + $input, + ); + } + return new SpawnedProcessPromise($loop, ProcessHelper::getWorkerCommand( $mainScript, 'fixer:worker', diff --git a/src/Process/ForkedProcessPromise.php b/src/Process/ForkedProcessPromise.php new file mode 100644 index 00000000000..8381b8acbeb --- /dev/null +++ b/src/Process/ForkedProcessPromise.php @@ -0,0 +1,179 @@ + */ + private Deferred $deferred; + + private ?int $childPid = null; + + /** @var resource|null */ + private $stdOut = null; + + private ?TimerInterface $waitTimer = null; + + private bool $canceled = false; + + /** + * @param string[] $inceptionFiles + * @param mixed[]|null $projectConfigArray + */ + public function __construct( + private LoopInterface $loop, + private FixerWorkerRunner $fixerWorkerRunner, + private TcpServer $server, + private Output $errorOutput, + private array $inceptionFiles, + private bool $isOnlyFiles, + private ?array $projectConfigArray, + private ?string $configuration, + private int $serverPort, + private InputInterface $input, + ) + { + $this->deferred = new Deferred(function (): void { + $this->cancel(); + }); + } + + /** + * @return PromiseInterface + */ + public function run(): PromiseInterface + { + // Created before the fork so the parent can read what the child wrote. + $tmpStdOut = tmpfile(); + if ($tmpStdOut === false) { + throw new ShouldNotHappenException('Failed creating temp file for stdout.'); + } + $this->stdOut = $tmpStdOut; + + $pid = pcntl_fork(); + + if ($pid === -1) { + fclose($this->stdOut); + $this->stdOut = null; + // Deferred so it runs after FixerApplication has stored the promise. + $this->loop->futureTick(function (): void { + $this->deferred->reject(new ProcessCrashedException('pcntl_fork() failed.')); + }); + + return $this->deferred->promise(); + } + + if ($pid === 0) { + // Child: drop the inherited listening socket immediately, then run + // the worker on its own fresh event loop and never return. + $this->server->close(); + $exitCode = $this->fixerWorkerRunner->run( + $this->errorOutput, + $this->inceptionFiles, + $this->isOnlyFiles, + $this->projectConfigArray, + $this->configuration, + $this->serverPort, + $this->input, + ); + exit($exitCode); + } + + // Parent: poll for the child to exit and resolve/reject accordingly. + $this->childPid = $pid; + $this->waitTimer = $this->loop->addPeriodicTimer(self::WAITPID_POLL_INTERVAL, function () use ($pid): void { + $status = 0; + $result = pcntl_waitpid($pid, $status, WNOHANG); + if ($result === 0) { + return; + } + + $this->cancelWaitTimer(); + + $output = ''; + if ($this->stdOut !== null) { + rewind($this->stdOut); + $output = (string) stream_get_contents($this->stdOut); + fclose($this->stdOut); + $this->stdOut = null; + } + + if ($this->canceled) { + // cancel() already rejected the promise; just reap the child. + return; + } + + $exitCode = null; + if ($result > 0 && pcntl_wifexited($status)) { + $exitStatus = pcntl_wexitstatus($status); + if ($exitStatus !== false) { + $exitCode = $exitStatus; + } + } + + if ($exitCode === 0) { + $this->deferred->resolve($output); + return; + } + + $this->deferred->reject(new ProcessCrashedException($output)); + }); + + return $this->deferred->promise(); + } + + private function cancel(): void + { + if ($this->childPid === null) { + throw new ShouldNotHappenException('Cancelling process before running'); + } + $this->canceled = true; + // SIGTERM the child; the waitpid poll timer keeps running so it still + // gets reaped (otherwise: zombie). + posix_kill($this->childPid, SIGTERM); + $this->deferred->reject(new ProcessCanceledException()); + } + + private function cancelWaitTimer(): void + { + if ($this->waitTimer === null) { + return; + } + + $this->loop->cancelTimer($this->waitTimer); + $this->waitTimer = null; + } + +} From 9e6a5cd0a5351468436da5a6e2114db79b2028aa Mon Sep 17 00:00:00 2001 From: Ondrej Mirtes Date: Fri, 15 May 2026 09:24:31 +0200 Subject: [PATCH 7/7] Print parallel worker mechanism via DiagnoseExtension MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ForkParallelChecker now implements DiagnoseExtension (like Scheduler): under -vvv it prints whether parallel workers are spawned or forked, and — if PHPSTAN_PARALLEL_FORK=1 is set but the fork path still wasn't taken — it explains which precondition (pcntl/posix availability or OPcache/ JIT being off) is missing. The ad-hoc "Note: using pcntl_fork()…" lines that ParallelAnalyser and FixerApplication used to write to stderr in verbose mode are removed in favour of this. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Command/FixerApplication.php | 3 --- src/Parallel/ForkParallelChecker.php | 39 ++++++++++++++++++++++------ src/Parallel/ParallelAnalyser.php | 5 ---- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/Command/FixerApplication.php b/src/Command/FixerApplication.php index 8bd4700bb8e..d0deb1607ac 100644 --- a/src/Command/FixerApplication.php +++ b/src/Command/FixerApplication.php @@ -113,9 +113,6 @@ public function run( ): int { $projectConfigFile = $inceptionResult->getProjectConfigFile(); - if ($this->forkParallelChecker->isSupported() && $output->isVerbose()) { - $output->writeln('Note: using pcntl_fork() for the PHPStan Pro worker (experimental).'); - } $loop = new StreamSelectLoop(); $server = new TcpServer('127.0.0.1:0', $loop); diff --git a/src/Parallel/ForkParallelChecker.php b/src/Parallel/ForkParallelChecker.php index 53e709ea90e..e75a543c2f6 100644 --- a/src/Parallel/ForkParallelChecker.php +++ b/src/Parallel/ForkParallelChecker.php @@ -2,10 +2,13 @@ namespace PHPStan\Parallel; +use PHPStan\Command\Output; use PHPStan\DependencyInjection\AutowiredService; +use PHPStan\Diagnose\DiagnoseExtension; use function function_exists; use function getenv; use function opcache_get_status; +use function sprintf; /** * Decides whether parallel analysis should fork workers via pcntl_fork() @@ -17,10 +20,33 @@ * doing so corrupts analysis results. */ #[AutowiredService] -final class ForkParallelChecker +final class ForkParallelChecker implements DiagnoseExtension { public function isSupported(): bool + { + return $this->getDisabledReason() === null; + } + + public function print(Output $output): void + { + $output->writeLineFormatted('Parallel worker creation:'); + + $reason = $this->getDisabledReason(); + if ($reason === null) { + $output->writeLineFormatted('Mechanism: fork (pcntl_fork — experimental)'); + $output->writeLineFormatted(''); + return; + } + + $output->writeLineFormatted('Mechanism: spawn (react/child-process)'); + if (getenv('PHPSTAN_PARALLEL_FORK') === '1') { + $output->writeLineFormatted(sprintf('Reason fork not used: %s', $reason)); + } + $output->writeLineFormatted(''); + } + + private function getDisabledReason(): ?string { if ( !function_exists('pcntl_fork') @@ -29,21 +55,18 @@ public function isSupported(): bool || !function_exists('pcntl_wexitstatus') || !function_exists('posix_kill') ) { - return false; + return 'pcntl/posix functions are not available'; } if (getenv('PHPSTAN_PARALLEL_FORK') !== '1') { - return false; + return 'PHPSTAN_PARALLEL_FORK environment variable is not set to "1"'; } - // OPcache's shared memory and the JIT buffer are not safe to populate - // concurrently from multiple forked children — doing so corrupts - // analysis results. Forked workers require OPcache and JIT to be off. if ($this->isOpcacheOrJitEnabled()) { - return false; + return 'OPcache or JIT is enabled (forked workers require both to be off — their shared memory corrupts under concurrent population)'; } - return true; + return null; } private function isOpcacheOrJitEnabled(): bool diff --git a/src/Parallel/ParallelAnalyser.php b/src/Parallel/ParallelAnalyser.php index b42ccff7045..c5be7a4e546 100644 --- a/src/Parallel/ParallelAnalyser.php +++ b/src/Parallel/ParallelAnalyser.php @@ -27,7 +27,6 @@ use function count; use function defined; use function escapeshellarg; -use function fwrite; use function ini_get; use function max; use function memory_get_usage; @@ -35,7 +34,6 @@ use function sprintf; use function str_contains; use const PHP_URL_PORT; -use const STDERR; #[AutowiredService] final class ParallelAnalyser @@ -177,9 +175,6 @@ public function analyse( }; $useFork = $this->forkParallelChecker->isSupported(); - if ($useFork && $input->hasParameterOption(['-v', '-vv', '-vvv', '--verbose'], true)) { - fwrite(STDERR, "Note: using pcntl_fork() for parallel workers (experimental).\n"); - } for ($i = 0; $i < $numberOfProcesses; $i++) { if (count($jobs) === 0) {