Skip to content

Commit 7567ef7

Browse files
authored
fix: preserve NonRetryableError message when compat flag is enabled (#13560)
1 parent 5a2968a commit 7567ef7

7 files changed

Lines changed: 139 additions & 9 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@cloudflare/workflows-shared": patch
3+
"wrangler": patch
4+
"miniflare": patch
5+
---
6+
7+
Preserve NonRetryableError message and name when the `workflows_preserve_non_retryable_error_message` compatibility flag is enabled, instead of replacing it with a generic error message.

packages/miniflare/src/plugins/workflows/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export const WorkflowsOptionsSchema = z.object({
2323
.custom<RemoteProxyConnectionString>()
2424
.optional(),
2525
stepLimit: z.number().int().min(1).optional(),
26+
compatibilityFlags: z.string().array().optional(),
2627
})
2728
)
2829
.optional(),
@@ -118,6 +119,10 @@ export const WORKFLOWS_PLUGIN: Plugin<
118119
),
119120
worker: {
120121
compatibilityDate: "2024-10-22",
122+
compatibilityFlags: [
123+
"experimental",
124+
...(workflow.compatibilityFlags ?? []),
125+
],
121126
modules: [
122127
{
123128
name: "workflows.mjs",

packages/workflows-shared/src/context.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import {
66
ABORT_REASONS,
77
InvalidStepReadableStreamError,
88
OversizedStreamChunkError,
9+
PreservedNonRetryableError,
10+
shouldPreserveNonRetryableError,
911
StreamOutputStorageLimitError,
1012
UnsupportedStreamChunkError,
1113
WorkflowFatalError,
@@ -698,15 +700,19 @@ export class Context extends RpcTarget {
698700
(error.name === "NonRetryableError" ||
699701
error.message.startsWith("NonRetryableError"))
700702
) {
703+
const attemptError = shouldPreserveNonRetryableError()
704+
? new PreservedNonRetryableError(e)
705+
: new WorkflowFatalError(
706+
`Step threw a NonRetryableError with message "${e.message}"`
707+
);
708+
701709
this.#engine.writeLog(
702710
InstanceEvent.ATTEMPT_FAILURE,
703711
cacheKey,
704712
stepNameWithCounter,
705713
{
706714
attempt: stepState.attemptedCount,
707-
error: new WorkflowFatalError(
708-
`Step threw a NonRetryableError with message "${e.message}"`
709-
),
715+
error: attemptError,
710716
}
711717
);
712718
this.#engine.writeLog(

packages/workflows-shared/src/engine.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import {
1313
ABORT_REASONS,
1414
createWorkflowError,
1515
isAbortError,
16+
PreservedNonRetryableError,
17+
shouldPreserveNonRetryableError,
1618
WorkflowFatalError,
1719
} from "./lib/errors";
1820
import {
@@ -1124,10 +1126,14 @@ export class Engine extends DurableObject<Env> {
11241126
err.name === "NonRetryableError" ||
11251127
err.message.startsWith("NonRetryableError")
11261128
) {
1129+
const fatalError = shouldPreserveNonRetryableError()
1130+
? new PreservedNonRetryableError(err)
1131+
: new WorkflowFatalError(
1132+
`The execution of the Workflow instance was terminated, as a step threw an NonRetryableError and it was not handled`
1133+
);
1134+
11271135
this.writeLog(InstanceEvent.WORKFLOW_FAILURE, null, null, {
1128-
error: new WorkflowFatalError(
1129-
`The execution of the Workflow instance was terminated, as a step threw an NonRetryableError and it was not handled`
1130-
),
1136+
error: fatalError,
11311137
});
11321138

11331139
await this.setStatus(accountId, instance.id, InstanceStatus.Errored);

packages/workflows-shared/src/lib/errors.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,22 @@ export class WorkflowFatalError extends Error {
1717
}
1818
}
1919

20+
export class PreservedNonRetryableError extends WorkflowFatalError {
21+
name = "NonRetryableError";
22+
23+
constructor(err: Error) {
24+
// When the error crosses an RPC boundary, the name gets
25+
// prepended to the message (e.g. "NonRetryableError: msg",
26+
// or just "NonRetryableError" if the original message was empty).
27+
// Parse it back out so we surface the original message.
28+
const message =
29+
err.name === "NonRetryableError"
30+
? err.message
31+
: err.message.replace(/^NonRetryableError:?\s*/, "");
32+
super(message);
33+
}
34+
}
35+
2036
export class WorkflowError extends Error {
2137
name = "WorkflowError";
2238
}
@@ -89,3 +105,12 @@ export function isUserTriggeredRestart(e: unknown): boolean {
89105
export function isUserTriggeredTerminate(e: unknown): boolean {
90106
return getErrorMessage(e) === ABORT_REASONS.USER_TERMINATE;
91107
}
108+
109+
function getCompatFlag(name: string): boolean {
110+
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- safe globalThis access for environments where cloudflare global may not exist
111+
return (globalThis as any).Cloudflare?.compatibilityFlags?.[name] ?? false;
112+
}
113+
114+
export function shouldPreserveNonRetryableError(): boolean {
115+
return getCompatFlag("workflows_preserve_non_retryable_error_message");
116+
}

packages/workflows-shared/tests/engine.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,77 @@ describe("Engine", () => {
5959
).toHaveLength(1);
6060
});
6161

62+
it("should preserve NonRetryableError message when compat flag is enabled", async ({
63+
expect,
64+
}) => {
65+
const instanceId = "NON-RETRYABLE-PRESERVE-MSG";
66+
const engineId = env.ENGINE.idFromName(instanceId);
67+
const engineStub = env.ENGINE.get(engineId);
68+
69+
vi.stubGlobal("Cloudflare", {
70+
compatibilityFlags: {
71+
workflows_preserve_non_retryable_error_message: true,
72+
},
73+
});
74+
75+
try {
76+
setTestWorkflowCallback(async (_event, step) => {
77+
await step.do("failing-step", async () => {
78+
throw new NonRetryableError("my custom error message");
79+
});
80+
});
81+
82+
await engineStub
83+
.init(
84+
12346,
85+
{} as DatabaseWorkflow,
86+
{} as DatabaseVersion,
87+
{ id: instanceId } as DatabaseInstance,
88+
{ payload: {}, timestamp: new Date(), instanceId }
89+
)
90+
.catch(() => {});
91+
92+
await vi.waitUntil(
93+
async () => {
94+
try {
95+
const logs = (await env.ENGINE.get(
96+
engineId
97+
).readLogs()) as EngineLogs;
98+
return logs.logs.some(
99+
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
100+
);
101+
} catch (e) {
102+
if (isAbortError(e)) {
103+
return false;
104+
}
105+
throw e;
106+
}
107+
},
108+
{ timeout: 3000 }
109+
);
110+
111+
const logs = (await env.ENGINE.get(engineId).readLogs()) as EngineLogs;
112+
113+
const workflowFailure = logs.logs.find(
114+
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
115+
);
116+
expect(workflowFailure?.metadata.error).toEqual({
117+
name: "NonRetryableError",
118+
message: "my custom error message",
119+
});
120+
121+
const attemptFailure = logs.logs.find(
122+
(val) => val.event === InstanceEvent.ATTEMPT_FAILURE
123+
);
124+
expect(attemptFailure?.metadata.error).toEqual({
125+
name: "NonRetryableError",
126+
message: "my custom error message",
127+
});
128+
} finally {
129+
vi.unstubAllGlobals();
130+
}
131+
});
132+
62133
it("should not error out if step fails but is try-catched", async ({
63134
expect,
64135
}) => {

packages/wrangler/src/dev/miniflare/index.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ function workflowEntry(
312312
remote,
313313
limits,
314314
}: CfWorkflow,
315-
remoteProxyConnectionString?: RemoteProxyConnectionString
315+
remoteProxyConnectionString?: RemoteProxyConnectionString,
316+
compatibilityFlags?: string[]
316317
): [
317318
string,
318319
{
@@ -321,6 +322,7 @@ function workflowEntry(
321322
scriptName?: string;
322323
remoteProxyConnectionString?: RemoteProxyConnectionString;
323324
stepLimit?: number;
325+
compatibilityFlags?: string[];
324326
},
325327
] {
326328
const stepLimit = limits?.steps;
@@ -333,6 +335,7 @@ function workflowEntry(
333335
className,
334336
scriptName,
335337
...(stepLimit !== undefined && { stepLimit }),
338+
compatibilityFlags,
336339
},
337340
];
338341
}
@@ -345,6 +348,7 @@ function workflowEntry(
345348
scriptName,
346349
remoteProxyConnectionString,
347350
...(stepLimit !== undefined && { stepLimit }),
351+
compatibilityFlags,
348352
},
349353
];
350354
}
@@ -450,7 +454,9 @@ type MiniflareBindingsConfig = Pick<
450454
| "containerBuildId"
451455
| "enableContainers"
452456
> &
453-
Partial<Pick<ConfigBundle, "format" | "bundle" | "assets">>;
457+
Partial<
458+
Pick<ConfigBundle, "format" | "bundle" | "assets" | "compatibilityFlags">
459+
>;
454460

455461
// TODO(someday): would be nice to type these methods more, can we export types for
456462
// each plugin options schema and use those
@@ -787,7 +793,11 @@ export function buildMiniflareBindingOptions(
787793
`Configure limits on the worker that defines the workflow.`
788794
);
789795
}
790-
return workflowEntry(workflow, remoteProxyConnectionString);
796+
return workflowEntry(
797+
workflow,
798+
remoteProxyConnectionString,
799+
config.compatibilityFlags
800+
);
791801
})
792802
),
793803
secretsStoreSecrets: Object.fromEntries(

0 commit comments

Comments
 (0)