diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index 071d5fc..cf3ee4d 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -1,6 +1,6 @@ { "sdk": "php", - "test_suites": "basic", + "test_suites": "basic,retry", "auto_settings": false, "patch": null, "env": {} diff --git a/e2e-cli/main.php b/e2e-cli/main.php index 4695981..3ffcf2e 100644 --- a/e2e-cli/main.php +++ b/e2e-cli/main.php @@ -135,10 +135,9 @@ function parseHost(string $apiHost): string * Build the options array for Segment\Client. * * @param array $input - * @param array &$errors collected error messages * @return array */ -function buildClientOptions(array $input, array &$errors): array +function buildClientOptions(array $input): array { $config = $input['config'] ?? []; $apiHost = $input['apiHost'] ?? ''; @@ -154,10 +153,11 @@ function buildClientOptions(array $input, array &$errors): array // mock test server (the base LibCurl hardcodes https://). 'consumer' => E2eLibCurl::class, 'protocol' => $scheme, - 'error_handler' => function (int $code, string $message) use (&$errors): void { - $msg = "HTTP {$code}: {$message}"; - debugLog('SDK error — ' . $msg); - $errors[] = $msg; + // Log HTTP errors to stderr only — success/failure is determined by + // track()/flush() return values, not by the error_handler callback, + // because handleError fires for transient retry errors too. + 'error_handler' => function (int $code, string $message): void { + debugLog("SDK HTTP error {$code}: {$message}"); }, ]; @@ -176,6 +176,11 @@ function buildClientOptions(array $input, array &$errors): array debugLog('curl_timeout: ' . $options['curl_timeout']); } + if (isset($config['maxRetries']) && is_numeric($config['maxRetries'])) { + $options['retry_count'] = (int)$config['maxRetries']; + debugLog('retry_count: ' . 
$options['retry_count']); + } + return $options; } @@ -241,9 +246,10 @@ function buildMessage(array $event): array } $errors = []; +$autoFlushFailed = false; // set true if an enqueue() auto-flush returns false -// Build client options (error_handler captures into $errors by reference) -$options = buildClientOptions($input, $errors); +// Build client options (error_handler just logs; we track success via return values) +$options = buildClientOptions($input); debugLog('Creating Segment\\Client with writeKey=' . substr($writeKey, 0, 4) . '...'); @@ -268,30 +274,35 @@ function buildMessage(array $event): array debugLog(" [{$seqIndex}/{$eventIndex}] Enqueueing {$type}"); + $enqueueOk = true; switch ($type) { case 'track': - $client->track($message); + $enqueueOk = $client->track($message); break; case 'identify': - $client->identify($message); + $enqueueOk = $client->identify($message); break; case 'page': - $client->page($message); + $enqueueOk = $client->page($message); break; case 'screen': - $client->screen($message); + $enqueueOk = $client->screen($message); break; case 'alias': - $client->alias($message); + $enqueueOk = $client->alias($message); break; case 'group': - $client->group($message); + $enqueueOk = $client->group($message); break; default: $errors[] = "Unknown event type: {$type}"; debugLog(" Unknown event type: {$type}"); break; } + if (!$enqueueOk) { + $autoFlushFailed = true; + debugLog(" Enqueue/auto-flush failed for {$type}"); + } } } @@ -306,14 +317,19 @@ function buildMessage(array $event): array $errors[] = 'Flush failed'; } -$hasErrors = !empty($errors); -$success = $flushOk && !$hasErrors; +// Success = all flushes succeeded and no fatal errors. +// auto-flushes (from enqueue when flush_at reached) and explicit flush are both tracked. 
/**
 * Send one batch of messages to the API, retrying transient failures.
 *
 * Response handling:
 *  - 2xx/3xx: success.
 *  - 429 with a usable Retry-After header: wait for the advertised delay
 *    (capped by rate_limit_retry_after_cap_s) and try again without spending
 *    retry budget, until max_rate_limit_duration_ms has elapsed since the
 *    first such response.
 *  - 429 without a usable Retry-After and every other status accepted by
 *    isRetryable(): exponential backoff starting at 500ms and doubling up to
 *    60s. Each retry spends one unit of retry_count, and the loop also stops
 *    once max_total_backoff_duration_ms has elapsed since the first backoff.
 *  - Any other status, or a curl transport error: fail immediately.
 *
 * Every retried request carries an X-Retry-Count header holding the number
 * of attempts already made.
 *
 * @param array $messages array of all the messages to send
 * @return bool whether the request succeeded
 */
public function flushBatch(array $messages): bool
{
    // Nothing to send; this also avoids reading the library context from a
    // first message that does not exist.
    if ($messages === []) {
        return true;
    }

    $payload = json_encode($this->payload($messages));
    if ($payload === false) {
        // Report the encoding failure instead of posting an empty body.
        $this->handleError(0, 'Unable to JSON-encode batch: ' . json_last_error_msg());

        return false;
    }

    if ($this->compress_request) {
        $payload = gzencode($payload);
    }

    $host = $this->host ?: 'api.segment.io';
    $url = $this->protocol . $host . '/v1/batch';

    // User agent in the form {library_name}/{library_version}.
    $library = $messages[0]['context']['library'];
    $userAgent = $library['name'] . '/' . $library['version'];

    $backoffMs = 500;      // initial backoff delay
    $backoffCapMs = 60000; // backoff never grows beyond 60s
    $retriesRemaining = $this->retry_count;
    $attempt = 0;
    $backoffStartTime = null;
    $rateLimitStartTime = null;

    while (true) {
        $attempt++;
        $responseHeaders = [];

        $headers = [
            'Content-Type: application/json',
            'User-Agent: ' . $userAgent,
        ];

        if ($this->compress_request) {
            $headers[] = 'Content-Encoding: gzip';
        }

        // Omitted on the first attempt, then the number of previous attempts.
        if ($attempt > 1) {
            $headers[] = 'X-Retry-Count: ' . ($attempt - 1);
        }

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_USERPWD, $this->secret . ':');
        curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
        curl_setopt($ch, CURLOPT_TIMEOUT, $this->curl_timeout);
        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->curl_connecttimeout);
        curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        // Collect response headers (lower-cased names) so Retry-After can be read.
        curl_setopt($ch, CURLOPT_HEADERFUNCTION, static function ($curl, $header) use (&$responseHeaders) {
            $parts = explode(':', $header, 2);
            if (count($parts) === 2) {
                $responseHeaders[strtolower(trim($parts[0]))] = trim($parts[1]);
            }

            // curl requires the callback to return the number of bytes handled.
            return strlen($header);
        });

        $responseContent = curl_exec($ch);
        $err = curl_error($ch);
        $responseCode = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($err) {
            $this->handleError(0, $err);

            return false;
        }

        // 2xx and 3xx are success
        if ($responseCode >= 200 && $responseCode < 400) {
            return true;
        }

        $this->handleError($responseCode, $responseContent);

        if ($responseCode === 429) {
            $retryAfterS = $this->parseRetryAfter($responseHeaders['retry-after'] ?? null);

            if ($retryAfterS !== null) {
                $rateLimitStartTime ??= microtime(true);

                if ((microtime(true) - $rateLimitStartTime) * 1000 >= $this->max_rate_limit_duration_ms) {
                    return false;
                }

                // Server-directed wait: does not spend retry budget.
                $this->sleepMilliseconds(min($retryAfterS, $this->rate_limit_retry_after_cap_s) * 1000);
                continue;
            }
            // No usable Retry-After: fall through to counted backoff.
        }

        if (!$this->isRetryable($responseCode)) {
            return false;
        }

        // retry_count is the maximum number of retries, so test the budget
        // before spending it: N configured retries allow up to N + 1 requests.
        if ($retriesRemaining <= 0) {
            return false;
        }
        $retriesRemaining--;

        $backoffStartTime ??= microtime(true);

        if ((microtime(true) - $backoffStartTime) * 1000 >= $this->max_total_backoff_duration_ms) {
            return false;
        }

        $this->sleepMilliseconds($backoffMs);
        $backoffMs = min($backoffMs * 2, $backoffCapMs);
    }
}

/**
 * Pause for the given number of milliseconds.
 *
 * Whole seconds go through sleep() and only the sub-second remainder through
 * usleep(), because usleep() is not guaranteed to accept delays above one
 * second on every platform.
 *
 * @param int $milliseconds delay to wait; zero or negative returns at once
 */
private function sleepMilliseconds(int $milliseconds): void
{
    if ($milliseconds <= 0) {
        return;
    }

    $wholeSeconds = intdiv($milliseconds, 1000);
    $remainderMs = $milliseconds % 1000;

    if ($wholeSeconds > 0) {
        sleep($wholeSeconds);
    }

    if ($remainderMs > 0) {
        usleep($remainderMs * 1000);
    }
}
/**
 * Tell whether a failed request with this HTTP status may be retried.
 *
 * Inside the 5xx range everything is retryable except 501, 505 and 511.
 * Outside it only 408, 410, 429 and 460 are retryable.
 *
 * @param int $statusCode HTTP status code of the failed response
 * @return bool true when the request may be sent again
 */
protected function isRetryable(int $statusCode): bool
{
    $retryableOutside5xx = [408 => true, 410 => true, 429 => true, 460 => true];
    $permanentWithin5xx = [501 => true, 505 => true, 511 => true];

    if ($statusCode < 500 || $statusCode > 599) {
        return isset($retryableOutside5xx[$statusCode]);
    }

    return !isset($permanentWithin5xx[$statusCode]);
}

/**
 * Read a Retry-After header value as a whole number of seconds.
 *
 * @param string|null $value raw header value, or null when the header is absent
 * @return int|null the delay in seconds, or null when the value is missing,
 *                  not made of digits only, or not greater than zero
 */
protected function parseRetryAfter(?string $value): ?int
{
    $candidate = trim($value ?? '');

    // ctype_digit() rejects the empty string as well as signs, decimals
    // and any other non-digit text.
    if (!ctype_digit($candidate)) {
        return null;
    }

    $delay = (int)$candidate;
    if ($delay <= 0) {
        return null;
    }

    return $delay;
}
+ * - Status code classification: Full support (retryable vs non-retryable + * per e2e spec, via parent isRetryable()). + * - X-Retry-Count: Supported. + * - Backoff: Exponential with cap (maximum_backoff_duration). + * + * For full Retry-After support, use the default LibCurl consumer. * * @param resource|false $socket the handle for the socket * @param string $req request body @@ -144,12 +158,12 @@ private function createBody(string $host, string $content) private function makeRequest($socket, string $req): bool { $bytes_written = 0; - $bytes_total = strlen($req); - $closed = false; - $success = true; + $bytes_total = strlen($req); + $closed = false; // Retries with exponential backoff until success $backoff = 100; // Set initial waiting time to 100ms + $attempt = 1; while (true) { // Send request to server @@ -167,39 +181,47 @@ private function makeRequest($socket, string $req): bool $statusCode = 0; if (!$closed) { - $res = self::parseResponse(fread($socket, 2048)); + $res = self::parseResponse(fread($socket, 2048)); $statusCode = (int)$res['status']; } fclose($socket); - // If status code is 200, return true - if ($statusCode === 200) { + // 2xx and 3xx are success + if ($statusCode >= 200 && $statusCode < 400) { return true; } - // If status code is greater than 500 and less than 600, it indicates server error - // Error code 429 indicates rate limited. - // Retry uploading in these cases. - if (($statusCode >= 500 && $statusCode <= 600) || $statusCode === 429 || $statusCode === 0) { - if ($backoff >= $this->maximum_backoff_duration) { - break; - } - - usleep($backoff * 1000); - } elseif ($statusCode >= 400) { + // Non-retryable or backoff budget exhausted + if (!$this->isRetryable($statusCode) && $statusCode !== 0) { if ($this->debug()) { $this->handleError($res['status'], $res['message']); } + return false; + } + + if ($backoff >= $this->maximum_backoff_duration) { break; } - // Retry uploading... 
+ usleep($backoff * 1000); $backoff *= 2; + $attempt++; + $socket = $this->createSocket(); + if (!$socket) { + return false; + } + + // Rebuild request with updated X-Retry-Count + $content_json = json_decode($req, true); + // Re-create body with new attempt count (reuse original payload via flushBatch flow) + $bytes_written = 0; + $bytes_total = strlen($req); + $closed = false; } - return $success; + return false; } /**