Skip to content

Commit 8e222fa

Browse files
authored
improvement(polling): fix correctness and efficiency across all polling handlers (#4067)
* improvement(polling): fix correctness and efficiency across all polling handlers
  - Gmail: paginate history API, add historyTypes filter, differentiate 403/429, fetch fresh historyId on fallback to break 404 retry loop
  - Outlook: follow @odata.nextLink pagination, use fetchWithRetry for all Graph calls, fix $top alignment, skip folder filter on partial resolution failure, remove Content-Type from GET requests
  - RSS: add conditional GET (ETag/If-None-Match), raise GUID cap to 500, fix 304 ETag capture per RFC 9111, align GUID tracking with idempotency fallback key
  - IMAP: single connection reuse, UIDVALIDITY tracking per mailbox, advance UID only on successful fetch, fix messageFlagsAdd range type, remove cross-mailbox legacy UID fallback
  - Dispatch polling via trigger.dev task with per-provider concurrency key; fall back to synchronous Redis-locked polling for self-hosted
* fix(rss): align idempotency key GUID fallback with tracking/filter guard
* removed comments
* fix(imap): clear stale UID when UIDVALIDITY changes during state merge
* fix(rss): skip items with no identifiable GUID to avoid idempotency key collisions
* fix(schedules): convert dynamic import of getWorkflowById to static import
* fix(imap): preserve fresh UID after UIDVALIDITY reset in state merge
* improvement(polling): remove trigger.dev dispatch, use synchronous Redis-locked polling
* fix(polling): decouple outlook page size from total email cap so pagination works
1 parent b67c068 commit 8e222fa

File tree

6 files changed

+441
-277
lines changed

6 files changed

+441
-277
lines changed

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
88
import { generateRequestId } from '@/lib/core/utils/request'
99
import { generateId } from '@/lib/core/utils/uuid'
1010
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
11+
import { getWorkflowById } from '@/lib/workflows/utils'
1112
import {
1213
executeJobInline,
1314
executeScheduleJob,
@@ -115,7 +116,6 @@ export async function GET(request: NextRequest) {
115116
}
116117

117118
try {
118-
const { getWorkflowById } = await import('@/lib/workflows/utils')
119119
const resolvedWorkflow = schedule.workflowId
120120
? await getWorkflowById(schedule.workflowId)
121121
: null

apps/sim/app/api/webhooks/poll/[provider]/route.ts

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ export async function GET(
2020
const { provider } = await params
2121
const requestId = generateShortId()
2222

23-
const LOCK_KEY = `${provider}-polling-lock`
24-
let lockValue: string | undefined
25-
2623
try {
2724
const authError = verifyCronAuth(request, `${provider} webhook polling`)
2825
if (authError) return authError
@@ -31,29 +28,38 @@ export async function GET(
3128
return NextResponse.json({ error: `Unknown polling provider: ${provider}` }, { status: 404 })
3229
}
3330

34-
lockValue = requestId
35-
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
36-
if (!locked) {
37-
return NextResponse.json(
38-
{
39-
success: true,
40-
message: 'Polling already in progress – skipped',
41-
requestId,
42-
status: 'skip',
43-
},
44-
{ status: 202 }
45-
)
46-
}
31+
const LOCK_KEY = `${provider}-polling-lock`
32+
let lockValue: string | undefined
33+
34+
try {
35+
lockValue = requestId
36+
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
37+
if (!locked) {
38+
return NextResponse.json(
39+
{
40+
success: true,
41+
message: 'Polling already in progress – skipped',
42+
requestId,
43+
status: 'skip',
44+
},
45+
{ status: 202 }
46+
)
47+
}
4748

48-
const results = await pollProvider(provider)
49+
const results = await pollProvider(provider)
4950

50-
return NextResponse.json({
51-
success: true,
52-
message: `${provider} polling completed`,
53-
requestId,
54-
status: 'completed',
55-
...results,
56-
})
51+
return NextResponse.json({
52+
success: true,
53+
message: `${provider} polling completed`,
54+
requestId,
55+
status: 'completed',
56+
...results,
57+
})
58+
} finally {
59+
if (lockValue) {
60+
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
61+
}
62+
}
5763
} catch (error) {
5864
logger.error(`Error during ${provider} polling (${requestId}):`, error)
5965
return NextResponse.json(
@@ -65,9 +71,5 @@ export async function GET(
6571
},
6672
{ status: 500 }
6773
)
68-
} finally {
69-
if (lockValue) {
70-
await releaseLock(LOCK_KEY, lockValue).catch(() => {})
71-
}
7274
}
7375
}

apps/sim/lib/webhooks/polling/gmail.ts

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -151,44 +151,68 @@ async function fetchNewEmails(
151151
let latestHistoryId = config.historyId
152152

153153
if (useHistoryApi) {
154-
const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
154+
const messageIds = new Set<string>()
155+
let pageToken: string | undefined
155156

156-
const historyResponse = await fetch(historyUrl, {
157-
headers: { Authorization: `Bearer ${accessToken}` },
158-
})
157+
do {
158+
let historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}&historyTypes=messageAdded`
159+
if (pageToken) {
160+
historyUrl += `&pageToken=${pageToken}`
161+
}
159162

160-
if (!historyResponse.ok) {
161-
const errorData = await historyResponse.json()
162-
logger.error(`[${requestId}] Gmail history API error:`, {
163-
status: historyResponse.status,
164-
statusText: historyResponse.statusText,
165-
error: errorData,
163+
const historyResponse = await fetch(historyUrl, {
164+
headers: { Authorization: `Bearer ${accessToken}` },
166165
})
167166

168-
logger.info(`[${requestId}] Falling back to search API after history API failure`)
169-
return searchEmails(accessToken, config, requestId, logger)
170-
}
167+
if (!historyResponse.ok) {
168+
const status = historyResponse.status
169+
const errorData = await historyResponse.json().catch(() => ({}))
170+
logger.error(`[${requestId}] Gmail history API error:`, {
171+
status,
172+
statusText: historyResponse.statusText,
173+
error: errorData,
174+
})
175+
176+
if (status === 403 || status === 429) {
177+
throw new Error(
178+
`Gmail API error ${status} — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
179+
)
180+
}
171181

172-
const historyData = await historyResponse.json()
182+
logger.info(`[${requestId}] Falling back to search API after history API error ${status}`)
183+
const searchResult = await searchEmails(accessToken, config, requestId, logger)
184+
if (searchResult.emails.length === 0) {
185+
const freshHistoryId = await getGmailProfileHistoryId(accessToken, requestId, logger)
186+
if (freshHistoryId) {
187+
logger.info(
188+
`[${requestId}] Fetched fresh historyId ${freshHistoryId} after invalid historyId (was: ${config.historyId})`
189+
)
190+
return { emails: [], latestHistoryId: freshHistoryId }
191+
}
192+
}
193+
return searchResult
194+
}
173195

174-
if (!historyData.history || !historyData.history.length) {
175-
return { emails: [], latestHistoryId }
176-
}
196+
const historyData = await historyResponse.json()
177197

178-
if (historyData.historyId) {
179-
latestHistoryId = historyData.historyId
180-
}
198+
if (historyData.historyId) {
199+
latestHistoryId = historyData.historyId
200+
}
181201

182-
const messageIds = new Set<string>()
183-
for (const history of historyData.history) {
184-
if (history.messagesAdded) {
185-
for (const messageAdded of history.messagesAdded) {
186-
messageIds.add(messageAdded.message.id)
202+
if (historyData.history) {
203+
for (const history of historyData.history) {
204+
if (history.messagesAdded) {
205+
for (const messageAdded of history.messagesAdded) {
206+
messageIds.add(messageAdded.message.id)
207+
}
208+
}
187209
}
188210
}
189-
}
190211

191-
if (messageIds.size === 0) {
212+
pageToken = historyData.nextPageToken
213+
} while (pageToken)
214+
215+
if (!messageIds.size) {
192216
return { emails: [], latestHistoryId }
193217
}
194218

@@ -352,6 +376,29 @@ async function searchEmails(
352376
}
353377
}
354378

379+
async function getGmailProfileHistoryId(
380+
accessToken: string,
381+
requestId: string,
382+
logger: ReturnType<typeof import('@sim/logger').createLogger>
383+
): Promise<string | null> {
384+
try {
385+
const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/profile', {
386+
headers: { Authorization: `Bearer ${accessToken}` },
387+
})
388+
if (!response.ok) {
389+
logger.warn(
390+
`[${requestId}] Failed to fetch Gmail profile for fresh historyId: ${response.status}`
391+
)
392+
return null
393+
}
394+
const profile = await response.json()
395+
return (profile.historyId as string | undefined) ?? null
396+
} catch (error) {
397+
logger.warn(`[${requestId}] Error fetching Gmail profile:`, error)
398+
return null
399+
}
400+
}
401+
355402
async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
356403
const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`
357404

@@ -442,9 +489,7 @@ async function processEmails(
442489
if (headers.date) {
443490
try {
444491
date = new Date(headers.date).toISOString()
445-
} catch (_e) {
446-
// Keep date as null if parsing fails
447-
}
492+
} catch (_e) {}
448493
} else if (email.internalDate) {
449494
date = new Date(Number.parseInt(email.internalDate)).toISOString()
450495
}

0 commit comments

Comments
 (0)