Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e-cli/e2e-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"sdk": "php",
"test_suites": "basic",
"test_suites": "basic,retry",
"auto_settings": false,
"patch": null,
"env": {}
Expand Down
52 changes: 34 additions & 18 deletions e2e-cli/main.php
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,9 @@ function parseHost(string $apiHost): string
* Build the options array for Segment\Client.
*
* @param array<string,mixed> $input
* @param array<int,string> &$errors collected error messages
* @return array<string,mixed>
*/
function buildClientOptions(array $input, array &$errors): array
function buildClientOptions(array $input): array
{
$config = $input['config'] ?? [];
$apiHost = $input['apiHost'] ?? '';
Expand All @@ -154,10 +153,11 @@ function buildClientOptions(array $input, array &$errors): array
// mock test server (the base LibCurl hardcodes https://).
'consumer' => E2eLibCurl::class,
'protocol' => $scheme,
'error_handler' => function (int $code, string $message) use (&$errors): void {
$msg = "HTTP {$code}: {$message}";
debugLog('SDK error — ' . $msg);
$errors[] = $msg;
// Log HTTP errors to stderr only — success/failure is determined by
// track()/flush() return values, not by the error_handler callback,
// because handleError fires for transient retry errors too.
'error_handler' => function (int $code, string $message): void {
debugLog("SDK HTTP error {$code}: {$message}");
},
];

Expand All @@ -176,6 +176,11 @@ function buildClientOptions(array $input, array &$errors): array
debugLog('curl_timeout: ' . $options['curl_timeout']);
}

if (isset($config['maxRetries']) && is_numeric($config['maxRetries'])) {
$options['retry_count'] = (int)$config['maxRetries'];
debugLog('retry_count: ' . $options['retry_count']);
}

return $options;
}

Expand Down Expand Up @@ -241,9 +246,10 @@ function buildMessage(array $event): array
}

$errors = [];
$autoFlushFailed = false; // set true if an enqueue() auto-flush returns false

// Build client options (error_handler captures into $errors by reference)
$options = buildClientOptions($input, $errors);
// Build client options (error_handler just logs; we track success via return values)
$options = buildClientOptions($input);

debugLog('Creating Segment\\Client with writeKey=' . substr($writeKey, 0, 4) . '...');

Expand All @@ -268,30 +274,35 @@ function buildMessage(array $event): array

debugLog(" [{$seqIndex}/{$eventIndex}] Enqueueing {$type}");

$enqueueOk = true;
switch ($type) {
case 'track':
$client->track($message);
$enqueueOk = $client->track($message);
break;
case 'identify':
$client->identify($message);
$enqueueOk = $client->identify($message);
break;
case 'page':
$client->page($message);
$enqueueOk = $client->page($message);
break;
case 'screen':
$client->screen($message);
$enqueueOk = $client->screen($message);
break;
case 'alias':
$client->alias($message);
$enqueueOk = $client->alias($message);
break;
case 'group':
$client->group($message);
$enqueueOk = $client->group($message);
break;
default:
$errors[] = "Unknown event type: {$type}";
debugLog(" Unknown event type: {$type}");
break;
}
if (!$enqueueOk) {
$autoFlushFailed = true;
debugLog(" Enqueue/auto-flush failed for {$type}");
}
}
}

Expand All @@ -306,14 +317,19 @@ function buildMessage(array $event): array
$errors[] = 'Flush failed';
}

$hasErrors = !empty($errors);
$success = $flushOk && !$hasErrors;
// Success = all flushes succeeded and no fatal errors.
// auto-flushes (from enqueue when flush_at reached) and explicit flush are both tracked.
$overallSuccess = $flushOk && !$autoFlushFailed && empty($errors);

if ($success) {
if ($overallSuccess) {
outputResult(true, $sentBatches);
exit(0);
} else {
$errorMsg = implode('; ', $errors);
$allErrors = array_merge(
$errors,
$autoFlushFailed ? ['Auto-flush failed'] : []
);
$errorMsg = implode('; ', $allErrors ?: ['Unknown flush failure']);
outputResult(false, $sentBatches, $errorMsg);
exit(1);
}
148 changes: 94 additions & 54 deletions lib/Consumer/LibCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,92 +9,132 @@ class LibCurl extends QueueConsumer
protected string $type = 'LibCurl';

/**
* Make a sync request to our API. If debug is
* enabled, we wait for the response
* and retry once to diminish impact on performance.
* Send a batch of messages to the API with spec-compliant retry logic:
* - 2xx/3xx: success
* - 429 + Retry-After: sleep without consuming retry budget
* - 429 without Retry-After / other retryable (5xx except 501/505/511,
* 408/410/460): exponential backoff, counts against retry budget
* - Non-retryable 4xx / 501/505/511: drop immediately
*
* @param array $messages array of all the messages to send
* @return bool whether the request succeeded
*/
public function flushBatch(array $messages): bool
{
$body = $this->payload($messages);
$body = $this->payload($messages);
$payload = json_encode($body);
$secret = $this->secret;
$secret = $this->secret;

if ($this->compress_request) {
$payload = gzencode($payload);
}

if ($this->host) {
$host = $this->host;
} else {
$host = 'api.segment.io';
}
$path = '/v1/batch';
$url = $this->protocol . $host . $path;
$host = $this->host ?: 'api.segment.io';
$url = $this->protocol . $host . '/v1/batch';

$backoff = 100; // Set initial waiting time to 100ms
$library = $messages[0]['context']['library'];
$userAgent = $library['name'] . '/' . $library['version'];

while ($backoff < $this->maximum_backoff_duration) {
// open connection
$ch = curl_init();
$backoffMs = 500; // base 500ms per e2e spec
$backoffCapMs = 60000; // cap 60s
$retriesRemaining = $this->retry_count;
$attempt = 0;
$backoffStartTime = null;
$rateLimitStartTime = null;

// set the url, number of POST vars, POST data
curl_setopt($ch, CURLOPT_USERPWD, $secret . ':');
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->curl_timeout);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->curl_connecttimeout);
while (true) {
$attempt++;
$responseHeaders = [];

$ch = curl_init();

// set variables for headers
$header = [];
$header[] = 'Content-Type: application/json';
$headers = [
'Content-Type: application/json',
'User-Agent: ' . $userAgent,
];

if ($this->compress_request) {
$header[] = 'Content-Encoding: gzip';
$headers[] = 'Content-Encoding: gzip';
}

// Send user agent in the form of {library_name}/{library_version} as per RFC 7231.
$library = $messages[0]['context']['library'];
$libName = $library['name'];
$libVersion = $library['version'];
$header[] = "User-Agent: $libName/$libVersion";
if ($attempt > 1) {
$headers[] = 'X-Retry-Count: ' . ($attempt - 1);
}

curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_USERPWD, $secret . ':');
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->curl_timeout);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->curl_connecttimeout);
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_HEADERFUNCTION, function ($curl, $header) use (&$responseHeaders) {
$parts = explode(':', $header, 2);
if (count($parts) === 2) {
$responseHeaders[strtolower(trim($parts[0]))] = trim($parts[1]);
}

return strlen($header);
});

// retry failed requests just once to diminish impact on performance
$responseContent = curl_exec($ch);
$err = curl_error($ch);
$responseCode = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);

$err = curl_error($ch);
if ($err) {
$this->handleError(curl_errno($ch), $err);
$this->handleError(0, $err);

return false;
}

$responseCode = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
// 2xx and 3xx are success
if ($responseCode >= 200 && $responseCode < 400) {
return true;
}

//close connection
curl_close($ch);
$this->handleError($responseCode, $responseContent);

// 429: check for Retry-After header first
if ($responseCode === 429) {
$retryAfterS = $this->parseRetryAfter($responseHeaders['retry-after'] ?? null);

if ($retryAfterS !== null) {
if ($rateLimitStartTime === null) {
$rateLimitStartTime = microtime(true);
}

if ($responseCode !== 200) {
// log error
$this->handleError($responseCode, $responseContent);

if (($responseCode >= 500 && $responseCode <= 600) || $responseCode === 429) {
// If status code is greater than 500 and less than 600, it indicates server error
// Error code 429 indicates rate limited.
// Retry uploading in these cases.
usleep($backoff * 1000);
$backoff *= 2;
} elseif ($responseCode >= 400) {
break;
if ((microtime(true) - $rateLimitStartTime) * 1000 >= $this->max_rate_limit_duration_ms) {
return false;
}

$sleepMs = min($retryAfterS * 1000, $this->rate_limit_retry_after_cap_s * 1000);
usleep($sleepMs * 1000);
continue; // Do NOT decrement retriesRemaining
}
} else {
break; // no error
// No Retry-After: fall through to counted backoff
}
}

return true;
if (!$this->isRetryable($responseCode)) {
return false;
}

$retriesRemaining--;

if ($retriesRemaining <= 0) {
return false;
}

if ($backoffStartTime === null) {
$backoffStartTime = microtime(true);
}

if ((microtime(true) - $backoffStartTime) * 1000 >= $this->max_total_backoff_duration_ms) {
return false;
}

usleep($backoffMs * 1000);
$backoffMs = min($backoffMs * 2, $backoffCapMs);
}
}
}
Loading