Payload limit configuration and validation #1288
Conversation
This reverts commit 95a66d7.
temporalio/converter.py
Outdated
payload_upload_error_limit: int | Literal["disabled"] | None = None
"""The limit at which a payloads size error is created."""
payload_upload_warning_limit: int | Literal["disabled"] | None = None
"""The limit at which a payloads size warning is created."""
Literal "disabled" works for me, though we could also use < 0 (e.g. -1); no strong preference. But what does None mean in this case? Doesn't that also mean disabled?
I'll remove "disabled" in favor of just setting it to 0 or less for disablement. None will be used for a future change where we'll get the defaults from the server and merge those to make the effective payload limits for workers.
We can't really make None mean something different later than it does now, IMO. I think we need the proper default here, and we should not accept None right now; we can add it later if we want (ideally pre-GA). Can you clarify what the default is meant to be today for payload limits (recognizing we may not get them from the server in all cases, including at the moment)? Is the default meant to be no limits?
None will mean "use default values of the context in which the data converter is used". So when within a worker, it will get default values from the server. When used in a client or replayer, it will have no default values and thus limit checks are disabled. Specifying a value other than None and > 0 will unconditionally enable the limit checks.
Maybe there's a better representation of "use default values", perhaps Literal["default"] or a sentinel class, in which case the config becomes:
payload_upload_error_limit: int | Literal["default"] = "default"
or
class DefaultPayloadLimit:
pass
...
payload_upload_error_limit: int | DefaultPayloadLimit = DefaultPayloadLimit
A sentinel class is slightly more difficult if we decide to allow specifying these via env vars or envconfig.
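To illustrate the wrinkle, here is a rough sketch of the sentinel approach combined with env-var parsing; every name below, including the env var, is hypothetical rather than actual SDK API:

import os
from typing import Union

class DefaultPayloadLimit:
    """Marker meaning 'defer to the default limit of the surrounding context'."""

def limit_from_env(var: str) -> Union[int, DefaultPayloadLimit]:
    """Parse a limit from an env var, falling back to the sentinel when unset."""
    raw = os.environ.get(var)
    if raw is None:
        return DefaultPayloadLimit()
    # Env vars are strings, so the sentinel cannot be expressed there directly;
    # a reserved string such as "default" would be needed anyway.
    return int(raw)

# Hypothetical env var name, for illustration only.
payload_upload_error_limit = limit_from_env("TEMPORAL_PAYLOAD_UPLOAD_ERROR_LIMIT")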
None will mean "use default values of the context in which the data converter is used". So when within a worker, it will get default values from the server. When used in a client or replayer, it will have no default values and thus limit checks are disabled.
This is inconsistent and difficult to comprehend from a user POV. IMO it needs to be easier to understand and therefore cohesive. I think we should consider a common set of defaults across the SDKs regardless of how payloads are used.
Maybe there's a better representation of the "use default values"
It's less about "use default values" and more about inconsistent behavior for serialization depending on where the converter is used.
At the very least, I think we should have an integer literal warning default in all SDKs that is not server-derived and therefore usable everywhere, since warnings are mostly harmless from a compatibility POV. If we have to have erroring use inconsistent defaults based on where it is used, we can (begrudgingly, with API docs clarifying why we have chosen to be inconsistent).
At the very least, I think we should have an integer literal warning default in all SDKs that is not server-derived and therefore usable everywhere, since warnings are mostly harmless from a compatibility POV.
I've updated the warning default to be 512 KiB.
The error limit default will still be context dependent, e.g. a client has no error limit while a worker has the error limit prescribed by the server. The error limit can be overridden or disabled in options.
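For concreteness, a minimal sketch (not SDK code, names illustrative) of how that context-dependent resolution could behave:

from typing import Optional

DEFAULT_WARNING_LIMIT = 512 * 1024  # 512 KiB warning default, same in every context

def effective_error_limit(
    configured: Optional[int], server_limit: Optional[int]
) -> Optional[int]:
    """Return the error limit in bytes, or None when error checks are disabled."""
    if configured is not None:
        # Explicit override in options: 0 or less disables the check.
        return configured if configured > 0 else None
    # No override: a worker uses the server-prescribed limit, a client has none.
    return server_limit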
temporalio/worker/_activity.py
Outdated
elif isinstance(
    err,
    temporalio.exceptions.PayloadsSizeError,
):
    temporalio.activity.logger.warning(
        "Activity task failed: payloads size exceeded the error limit. Size: %d bytes, Limit: %d bytes",
        err.payloads_size,
        err.payloads_limit,
        extra={"__temporal_error_identifier": "ActivityFailure"},
    )
    await data_converter.encode_failure(
        temporalio.exceptions.ApplicationError(
            type="PayloadsTooLarge",
            message="Payloads size has exceeded the error limit.",
        ),
        completion.result.failed.failure,
    )
    # TODO: Add force_cause to activity Failure bridge proto?
    # TODO: Add WORKFLOW_TASK_FAILED_CAUSE_PAYLOADS_TOO_LARGE to API
    # completion.result.failed.force_cause = WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_PAYLOADS_TOO_LARGE
I think we should let the traditional error path run here. Assuming a readable error message on the error, the only reason I can see for not doing so is to have the ApplicationError.type be PayloadsTooLarge instead of PayloadsSizeError, but we can either alter the failure converter or make just that slight specialization in the catch here.
- Are you suggesting that PayloadSizeError should extend from ApplicationError?
- We want a specialized warning message that better describes what the issue is and gives better guidance in the worker output about what went wrong and how to fix it, rather than the standard "Completing activity as failed" message. My understanding is that log messages from sdk-core won't be surfaced in the worker unless a logger is configured via the telemetry configuration. And even then, I haven't seen activity failures get routed through there (see https://github.com/temporalio/sdk-python/pull/1288/changes#diff-3162a4b842d45d546da93b825218f7f863b6b481684d2b9570a38b04facb266bR8833), but maybe I'm doing something wrong with that.
On extending from ApplicationError, note that PayloadSizeError will be thrown from Client invocations if payload limits are configured. This is in contrast to the description of the ApplicationError which states "Error raised during workflow/activity execution."
Are you suggesting that PayloadSizeError should extend from ApplicationError?
No, in these situations, all non-Temporal-failure exceptions automatically convert to application error with their unqualified class name as the error type
We want a specialized warning message that better describes what the issue is and gives better guidance in the worker output about what went wrong and how to fix it, rather than the standard "Completing activity as failed" message.
It sounds like everyone should get this message and not just this log statement. Therefore, such a message should be part of the error, not the log.
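As a sketch of that direction, the size details and guidance could live on the exception itself so the ordinary failure-conversion path carries them (stand-in class below, not the actual SDK definition):

class PayloadsSizeError(Exception):
    """Stand-in for temporalio.exceptions.PayloadsSizeError."""

    def __init__(self, payloads_size: int, payloads_limit: int) -> None:
        self.payloads_size = payloads_size
        self.payloads_limit = payloads_limit
        super().__init__(
            f"Payloads size of {payloads_size} bytes exceeded the {payloads_limit} byte limit. "
            "Consider storing large values externally or splitting them into smaller payloads."
        )

With the message on the error, the worker can let the usual exception-to-ApplicationError conversion run instead of special-casing this type in the completion handler.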
    temporalio.exceptions.CancelledError("Cancelled"),
    completion.result.cancelled.failure,
)
elif isinstance(
Note: this doesn't catch cases where encoding the failure itself produces payloads that are too large; we may want to handle this error in the outer except. This can happen when application error details are too large, or when the stack trace is too large and is moved to encoded attributes (see temporalio/features#597).
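A simplified sketch of handling that case in the outer except, assuming the failure encoding itself can raise the size error and reusing the handler's existing variables (not the real handler code):

try:
    await data_converter.encode_failure(err, completion.result.failed.failure)
except temporalio.exceptions.PayloadsSizeError as size_err:
    # Fall back to a minimal failure that fits, so the task still completes
    # as failed instead of the completion itself being rejected.
    completion.result.failed.failure.Clear()
    await data_converter.encode_failure(
        temporalio.exceptions.ApplicationError(
            "Failure details exceeded the payload size limit "
            f"({size_err.payloads_size} > {size_err.payloads_limit} bytes).",
            type="PayloadsTooLarge",
        ),
        completion.result.failed.failure,
    )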
What was changed
Update the SDK to check the size of payload collections and issue warnings and errors when the size exceeds limits.
This is done by:
- Adding a PayloadLimitsConfig class for configuring the warning and error limits. The error limit is not defined by default, whereas the warning limit is set to 512 KiB.
- Adding a payload_limits property to DataConverter, of type PayloadLimitsConfig.
- Adding methods to DataConverter for encoding and decoding payloads in all forms. The encoding methods call _validate_payload_limits before returning.
- Updating callers to use the DataConverter instead of using payload_codec directly.
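For illustration, configuring a converter with these limits might look roughly like the following; the exact constructor arguments are assumptions based on the fields shown in this PR:

import dataclasses

import temporalio.converter

converter = dataclasses.replace(
    temporalio.converter.DataConverter.default,
    payload_limits=temporalio.converter.PayloadLimitsConfig(
        payload_upload_warning_limit=512 * 1024,  # warn at 512 KiB (the default)
        payload_upload_error_limit=2 * 1024 * 1024,  # error at 2 MiB; 0 or less disables
    ),
)

# The converter is then passed to the client or worker as usual, e.g. via
# Client.connect("localhost:7233", data_converter=converter).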
Examples
Log output when an activity attempts to return a result that exceeds the error limit:
Log output when a workflow attempts to provide an activity input that exceeds the error limit:
Note that the above example is missing the extra context that the activity result failure example has. This is due to the logging infrastructure available where these errors are raised; it can be fixed separately with some log refactoring (see deferred items).
Log output when a workflow attempts to provide activity input that is above the warning threshold but below the error limit:
Same note about the missing extra context.
Deferred
Work that has been deferred to later PRs (unless requested to pull back in):
- Adding a new WorkflowTaskFailedCause to indicate the specific cause of the failure scenario. Pending "Add a new workflow failure cause for oversized payloads" api#697, its integration into sdk-core, and sdk-core into sdk-python.
- Refactoring logging around _validate_payload_limits to get rich information about the execution context when issuing a warning.
- Refactoring logging in _WorkflowWorker::_handle_activation to get rich information about the execution context when issuing a warning for exceeding the payload error limit.
Why?
Users need to know when payload sizes are approaching or have exceeded size limits. This will help prevent workflow outages and help users adjust their workflows to use alternate storage methods or to break their payloads down more granularly.
Checklist
Closes #1284: Warn if the SDK tried to send a payload above a specific size
Closes #1285: SDK should fail workflow task if payloads size is known to be too large
How was this tested: Unit tests
Any docs updates needed? Yes