Key Features

This example demonstrates how to implement custom retry logic when using third-party services in your Upstash Workflow.

We’ll use OpenAI as an example of such a third-party service. Our retry logic uses the response status code and headers to decide whether to store the API response, sleep until a rate limit resets, or pause briefly and retry.

Code Example

The following code:

  1. Attempts to make an API call up to 10 times.
  2. Dynamically adjusts request delays based on response headers or status.
  3. Stores successful responses asynchronously.
api/workflow/route.ts
import { serve } from "@upstash/workflow/nextjs"
import { storeResponse } from "@/lib/utils"

const BASE_DELAY = 10;

const createSystemMessage = () => ({
  role: "system",
  content: "You are an AI assistant providing a brief summary and key insights for any given data.",
})

const createUserMessage = (data: string) => ({
  role: "user",
  content: `Analyze this data chunk: ${data}`,
})

export const { POST } = serve<{ userData: string }>(async (context) => {
  // 👇 initial data sent along when triggering the workflow
  const { userData } = context.requestPayload

  for (let attempt = 0; attempt < 10; attempt++) {
    const response = await context.api.openai.call(`call-openai`, {
      token: process.env.OPENAI_API_KEY!,
      operation: "chat.completions.create",
      body: {
        model: "gpt-3.5-turbo",
        messages: [createSystemMessage(), createUserMessage(userData)],
        max_completion_tokens: 150,
      },
    })

    // Success case
    if (response.status < 300) {
      await context.run("store-response-in-db", () => storeResponse(response.body))
      return
    }

    // Rate limit case - wait and retry
    if (response.status === 429) {
      const resetTime =
        response.header["x-ratelimit-reset-tokens"]?.[0] ||
        response.header["x-ratelimit-reset-requests"]?.[0] ||
        BASE_DELAY

      // assuming `resetTime` is in seconds
      await context.sleep("sleep-until-retry", Number(resetTime))

      continue
    }

    // Any other scenario - pause for 5 seconds to avoid overloading OpenAI API
    await context.sleep("pause-to-avoid-spam", 5)
  }
})
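The three branches of the loop above can be sketched as a pure function, which makes the decision easy to unit test outside a workflow. `RetryAction` and `classifyStatus` are hypothetical names introduced for illustration; they are not part of `@upstash/workflow`.

```typescript
// Hypothetical helper (not part of @upstash/workflow): mirrors the three
// branches of the retry loop as a pure function of the status code.
type RetryAction = "store" | "wait-for-rate-limit" | "pause-and-retry";

function classifyStatus(status: number): RetryAction {
  // Same check as the workflow: anything below 300 counts as success.
  if (status < 300) return "store";
  // 429 means the rate limit was hit; the workflow sleeps until it resets.
  if (status === 429) return "wait-for-rate-limit";
  // Every other status falls through to the fixed pause before retrying.
  return "pause-and-retry";
}
```

Inside the workflow, each returned value would map to one of the existing `context.run`, `context.sleep`, or fixed-pause branches.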

Code Breakdown

1. Setting up our Workflow

This POST endpoint serves our workflow. We wrap the API call, which we add in the next step, in a loop that makes up to 10 attempts.

export const { POST } = serve<{ userData: string }>(async (context) => {
  for (let attempt = 0; attempt < 10; attempt++) {
    // TODO: call API in here
  }
})

2. Making a Third-Party API Call

We use context.api.openai.call to send a request to OpenAI.

context.api.openai.call uses context.call in the background. Requesting data from an API with context.call is one of the most powerful Upstash Workflow features: your request can take much longer than any function timeout would normally allow, completely bypassing platform-specific timeout limits.

Our request to OpenAI includes an auth header, model parameters, and the data to be processed by the AI. The response from this function call (response) is used to determine our retry logic based on its status code and headers.

const response = await context.api.openai.call(`call-openai`, {
  token: process.env.OPENAI_API_KEY!,
  operation: "chat.completions.create",
  body: {
    model: "gpt-3.5-turbo",
    messages: [createSystemMessage(), createUserMessage(userData)],
    max_completion_tokens: 150,
  },
})

3. Processing a Successful Response (Status Code < 300)

If the OpenAI response is successful (status code under 300), we store the response in our database. We do this in a separate workflow step (context.run) for maximum reliability, then return to end the workflow.

if (response.status < 300) {
  await context.run("store-response-in-db", () => storeResponse(response.body))
  return
}

4. Handling Rate Limits (Status Code 429)

If the API response indicates a rate limit error (status code 429), we read the rate limit reset value from the response headers, falling back to BASE_DELAY (10 seconds) when neither header is present. We then pause execution (context.sleep) for that duration and move on to the next attempt.

if (response.status === 429) {
  const resetTime =
    response.header["x-ratelimit-reset-tokens"]?.[0] ||
    response.header["x-ratelimit-reset-requests"]?.[0] ||
    BASE_DELAY

  // assuming `resetTime` is in seconds
  await context.sleep("sleep-until-retry", Number(resetTime))

  continue
}
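The snippet above assumes the reset header holds a plain number of seconds. OpenAI's rate limit documentation shows sample values such as `1s` and `6m0s`, for which `Number(...)` yields `NaN`. The following is a minimal sketch of a more tolerant parser, assuming the value is either a plain number of seconds or a sequence of number and unit pairs (`h`, `m`, `s`, `ms`). `parseResetSeconds` is a hypothetical helper, not part of `@upstash/workflow` or the OpenAI SDK.

```typescript
// Hypothetical helper: converts a reset header value to whole seconds.
// Assumes either a plain number of seconds ("12") or number+unit pairs
// such as "1s", "6m0s", or "250ms". Anything else yields the fallback.
function parseResetSeconds(value: string | undefined, fallback: number): number {
  if (!value) return fallback;
  const trimmed = value.trim();

  // Plain numbers are treated as seconds; round up and sleep at least 1s.
  if (/^\d+(\.\d+)?$/.test(trimmed)) {
    return Math.max(1, Math.ceil(Number(trimmed)));
  }

  const unitSeconds: Record<string, number> = { h: 3600, m: 60, s: 1, ms: 0.001 };
  // "ms" is listed before "m" so that "250ms" is not read as minutes.
  const pattern = /(\d+(?:\.\d+)?)(ms|h|m|s)/g;
  let total = 0;
  let consumed = 0;
  let match: RegExpExecArray | null;
  while ((match = pattern.exec(trimmed)) !== null) {
    total += Number(match[1]) * unitSeconds[match[2]];
    consumed += match[0].length;
  }

  // Reject strings that contain anything besides number+unit pairs.
  if (consumed === 0 || consumed !== trimmed.length) return fallback;

  // Round up so the workflow never wakes before the limit has reset.
  return Math.max(1, Math.ceil(total));
}
```

With this helper, the sleep duration could be computed as `parseResetSeconds(response.header["x-ratelimit-reset-tokens"]?.[0], BASE_DELAY)` in place of `Number(resetTime)`.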

5. Waiting Before the Next Retry Attempt

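For any other unsuccessful response (for example, a server error), we pause the workflow for 5 seconds before the next attempt. This avoids sending many requests in a short period and possibly overloading the OpenAI API. Rate-limited responses never reach this line, because the 429 branch already sleeps and continues to the next attempt.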

await context.sleep("pause-to-avoid-spam", 5)
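A fixed 5-second pause is the simplest choice. As an alternative that the original example does not use, the pause could grow with each attempt. The sketch below computes a capped exponential backoff from the attempt index alone. It deliberately avoids randomness, on the assumption that the route function may be re-executed between steps and should compute the same delay each time. `backoffSeconds` and its default values are hypothetical.

```typescript
// Hypothetical helper: capped exponential backoff in seconds.
// Depends only on the attempt index, so repeated evaluation for the same
// attempt always yields the same delay.
function backoffSeconds(attempt: number, baseSeconds = 5, capSeconds = 60): number {
  // attempt 0 -> base, attempt 1 -> 2x base, attempt 2 -> 4x base, ...
  const delay = baseSeconds * Math.pow(2, attempt);
  // Never wait longer than the cap, however many attempts have failed.
  return Math.min(capSeconds, delay);
}
```

In the loop, this would replace the literal `5`, for example `await context.sleep("pause-to-avoid-spam", backoffSeconds(attempt))`.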