diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index 3b396d33e..45806ef15 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -43,7 +43,7 @@ type ethRow struct { surplus string gasCost sql.NullString inputToken string - blockTS time.Time + blockTS sql.NullTime } type erc20Row struct { @@ -53,7 +53,7 @@ type erc20Row struct { surplus string gasCost sql.NullString inputToken string - blockTS time.Time + blockTS sql.NullTime } type tokenBatch struct { @@ -252,26 +252,30 @@ WHERE processed = false erc20FastRPCSet := batchCheckFastRPC(cfg.Logger, cfg.DB, allErc20Hashes) for token, batch := range batches { - gasCosts := make([]*big.Int, len(batch.Txs)) - bidCosts := make([]*big.Int, len(batch.Txs)) totalOriginalGasCost := big.NewInt(0) totalOriginalBidCost := big.NewInt(0) - var skippedRows []erc20Row + // First pass: separate rows into ready, pending-bid, and not-in-fastrpc. + // Pending-bid rows are excluded from the batch so they retry next cycle. + var readyTxs []erc20Row + var readyGasCosts []*big.Int + var readyBidCosts []*big.Int - for i, r := range batch.Txs { + for _, r := range batch.Txs { bidCostWei := getBidCost(erc20BidMap, r.txHash) if bidCostWei.Sign() == 0 { if erc20FastRPCSet[strings.ToLower(r.txHash)] { cfg.Logger.Info("erc20 tx in FastRPC but bid not indexed yet, will retry", slog.String("tx", r.txHash), slog.String("user", r.user)) - } else { - cfg.Logger.Info("erc20 tx not in FastRPC, skipping with 0 miles", - slog.String("tx", r.txHash), slog.String("user", r.user)) - skippedRows = append(skippedRows, r) + continue // genuinely skip — will retry next cycle + } + cfg.Logger.Info("erc20 tx not in FastRPC, skipping with 0 miles", + slog.String("tx", r.txHash), slog.String("user", r.user)) + surplusWei, _ := new(big.Int).SetString(r.surplus, 10) + if !cfg.DryRun { + markProcessed(cfg.DB, r.txHash, weiToEth(surplusWei), 0, 0, "0") } - gasCosts[i] = big.NewInt(0) - bidCosts[i] = big.NewInt(0) + processed++ continue } @@ -284,25 +288,29 @@ WHERE processed = false } } - gasCosts[i] = gasCostWei - bidCosts[i] = bidCostWei + readyTxs = append(readyTxs, r) + readyGasCosts = append(readyGasCosts, gasCostWei) + readyBidCosts = append(readyBidCosts, bidCostWei) totalOriginalGasCost.Add(totalOriginalGasCost, gasCostWei) totalOriginalBidCost.Add(totalOriginalBidCost, bidCostWei) } - for _, r := range skippedRows { + if len(readyTxs) == 0 { + continue + } + + // Recalculate TotalSum for only the ready rows + readyTotalSum := big.NewInt(0) + for _, r := range readyTxs { surplusWei, _ := new(big.Int).SetString(r.surplus, 10) - if !cfg.DryRun { - markProcessed(cfg.DB, r.txHash, weiToEth(surplusWei), 0, 0, "0") - } - processed++ + readyTotalSum.Add(readyTotalSum, surplusWei) } reqBody := barterRequest{ Source: token, Target: cfg.WETH.Hex(), - SellAmount: batch.TotalSum.String(), + SellAmount: readyTotalSum.String(), Recipient: cfg.ExecutorAddr.Hex(), Origin: cfg.ExecutorAddr.Hex(), MinReturnFraction: 0.98, @@ -351,14 +359,14 @@ WHERE processed = false if cfg.DryRun { cfg.Logger.Info("simulated sweep", - slog.String("amount", batch.TotalSum.String()), + slog.String("amount", readyTotalSum.String()), slog.String("token", token), slog.Float64("return_eth", weiToEth(expectedEthReturn)), slog.Float64("gas_eth", weiToEth(expectedGasCost))) actualEthReturn = expectedEthReturn actualSwapGasCost = expectedGasCost } else { - actualEthReturn, actualSwapGasCost, err = submitFastSwapSweep(ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, cfg.ExecutorAddr, common.HexToAddress(token), batch.TotalSum, cfg.FastSwapURL, cfg.FundsRecipient, cfg.SettlementAddr, barterResp, cfg.MaxGasGwei) + actualEthReturn, actualSwapGasCost, err = submitFastSwapSweep(ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, cfg.ExecutorAddr, common.HexToAddress(token), readyTotalSum, cfg.FastSwapURL, cfg.FundsRecipient, cfg.SettlementAddr, barterResp, cfg.MaxGasGwei) if err != nil { cfg.Logger.Error("failed to sweep token", slog.String("token", token), slog.Any("error", err)) continue @@ -369,17 +377,17 @@ WHERE processed = false slog.Float64("gas_eth", weiToEth(actualSwapGasCost))) } - for i, r := range batch.Txs { + for i, r := range readyTxs { surplusWei, _ := new(big.Int).SetString(r.surplus, 10) txGrossEth := new(big.Int).Mul(actualEthReturn, surplusWei) - txGrossEth.Div(txGrossEth, batch.TotalSum) + txGrossEth.Div(txGrossEth, readyTotalSum) txOverheadGas := new(big.Int).Mul(actualSwapGasCost, surplusWei) - txOverheadGas.Div(txOverheadGas, batch.TotalSum) + txOverheadGas.Div(txOverheadGas, readyTotalSum) - txNetProfit := new(big.Int).Sub(txGrossEth, gasCosts[i]) - txNetProfit.Sub(txNetProfit, bidCosts[i]) + txNetProfit := new(big.Int).Sub(txGrossEth, readyGasCosts[i]) + txNetProfit.Sub(txNetProfit, readyBidCosts[i]) txNetProfit.Sub(txNetProfit, txOverheadGas) surplusEth := weiToEth(txGrossEth) @@ -390,7 +398,7 @@ WHERE processed = false slog.String("tx", r.txHash), slog.String("user", r.user), slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth)) if !cfg.DryRun { - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, bidCosts[i].String()) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, readyBidCosts[i].String()) } processed++ continue @@ -405,7 +413,7 @@ WHERE processed = false slog.String("tx", r.txHash), slog.String("user", r.user), slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth)) if !cfg.DryRun { - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, bidCosts[i].String()) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, readyBidCosts[i].String()) } processed++ continue @@ -427,10 +435,11 @@ WHERE processed = false miles, ) if err != nil { - cfg.Logger.Error("fuel submit failed (will not retry to avoid duplicate swap)", + cfg.Logger.Error("fuel submit failed, will retry next cycle", slog.String("tx", r.txHash), slog.Any("error", err)) + continue // don't mark processed — retry next cycle } - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCosts[i].String()) + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) processed++ } } diff --git a/tools/fastswap-miles/sweep.go b/tools/fastswap-miles/sweep.go index d665d5bbc..be14a48a6 100644 --- a/tools/fastswap-miles/sweep.go +++ b/tools/fastswap-miles/sweep.go @@ -60,7 +60,18 @@ func indexBatch( txHash := ev.Raw.TxHash.Hex() blockNum := ev.Raw.BlockNumber - receipt, err := client.TransactionReceipt(ctx, ev.Raw.TxHash) + var receipt *types.Receipt + for attempt := 0; attempt < 3; attempt++ { + receipt, err = client.TransactionReceipt(ctx, ev.Raw.TxHash) + if err == nil { + break + } + if strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") { + time.Sleep(time.Duration(attempt+1) * 2 * time.Second) + continue + } + break + } if err != nil { logger.Warn("receipt fetch failed, skipping gas cost", slog.String("tx", txHash), slog.Any("error", err)) } @@ -72,7 +83,18 @@ func indexBatch( ) } - header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) + var header *types.Header + for attempt := 0; attempt < 3; attempt++ { + header, err = client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) + if err == nil { + break + } + if strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests") { + time.Sleep(time.Duration(attempt+1) * 2 * time.Second) + continue + } + break + } if err != nil { logger.Warn("header fetch failed", slog.Uint64("block", blockNum), slog.Any("error", err)) }