Process OpenAI batch requests with a Supabase/Postgres FIFO queue
1. Workflow Overview
Purpose:
Process multiple OpenAI requests cheaply via the OpenAI Batch API, while tracking jobs in a persistent Supabase/Postgres FIFO queue. The workflow has two phases:
1.1 Phase 1 — Submit Batch (manual / upstream-triggered)
Takes a `systemPrompt` plus an array of `inputs`, converts them into the OpenAI Batch `.jsonl` format, uploads the file to the OpenAI Files API, creates a batch job via the Batches API, then stores the batch id and initial status in Supabase (`openai_batches`).
1.2 Phase 2 — Poll & Retrieve (scheduled)
Every 5 minutes, pulls the earliest batch whose status is not terminal, retrieves the batch status from OpenAI, and:
- If not completed: updates status in Supabase.
- If completed (and has `output_file_id`): downloads the output `.jsonl`, parses it, extracts responses, and updates Supabase with status + results.
2. Block-by-Block Analysis
Block 1 — Submit Batch Input Preparation & Upload
Overview: Converts an array of prompts into the OpenAI Batch .jsonl format, then uploads that .jsonl to OpenAI Files API for batch processing.
Nodes involved:
`Start (mock data)` → `Convert to batch requests in .jsonl` → `Call Files API`
Node: Start (mock data)
- Type / role: Manual Trigger (`manualTrigger`) to simulate upstream input.
- Configuration choices: uses pinned data with:
  - `systemPrompt` (string)
  - `inputs` (array of strings)
- Inputs/Outputs: entry point → outputs one item containing the mock JSON.
- Edge cases:
  - If `inputs` is missing or not an array, the downstream Code node fails.
  - If prompts contain unexpected types (objects), `.jsonl` line creation may still serialize, but the OpenAI API may reject the content.
Node: Convert to batch requests in .jsonl
- Type / role: Code node (`code`) that generates OpenAI Batch `.jsonl` and returns it as binary (base64).
- Key logic/config (a sketch follows this list):
  - Reads `const inputs = $input.first().json.inputs` and `const systemPrompt = $input.first().json.systemPrompt`.
  - Builds one JSON line per input with:
    - `custom_id`: `` `${index}_${Date.now()}` ``
    - `method: "POST"`
    - `url: "/v1/responses"`
    - `body: { model: "gpt-5-mini", instructions: systemPrompt, input }`
  - Joins lines with `\n` to create NDJSON.
  - Base64-encodes the content and returns it as:
    - `binary.data.data` (base64)
    - `mimeType: application/x-ndjson`
    - `fileName: batch_input.jsonl`
- Connections: input from `Start (mock data)` → output to `Call Files API`.
- Version notes: Code node v2; uses Node.js `Buffer` (available in the n8n Code node).
- Edge cases / failure modes:
  - Large `inputs` can create an oversized `.jsonl` → OpenAI file upload limits or timeouts.
  - `Date.now()` inside the map can produce identical timestamps within the same millisecond; uniqueness also relies on `index`.
  - Uses the `/v1/responses` format; if your OpenAI account/model doesn't support the Responses API in batch, batch creation will fail.
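A minimal sketch of this Code node's logic, reconstructed from the description above (the workflow's exact code may differ slightly):

```javascript
// n8n Code node v2, "Run Once for All Items".
// Each output line is one batch request, e.g.:
// {"custom_id":"0_1718000000000","method":"POST","url":"/v1/responses","body":{...}}
const { inputs, systemPrompt } = $input.first().json;

const lines = inputs.map((input, index) =>
  JSON.stringify({
    custom_id: `${index}_${Date.now()}`,
    method: 'POST',
    url: '/v1/responses',
    body: { model: 'gpt-5-mini', instructions: systemPrompt, input },
  })
);

// NDJSON joined with \n, then base64-encoded into the binary property
// the Files API upload node expects.
return [{
  json: {},
  binary: {
    data: {
      data: Buffer.from(lines.join('\n'), 'utf-8').toString('base64'),
      mimeType: 'application/x-ndjson',
      fileName: 'batch_input.jsonl',
    },
  },
}];
```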
Node: Call Files API
- Type / role: HTTP Request node uploading the `.jsonl` to OpenAI Files.
- Configuration choices (the raw API equivalent is sketched after this list):
  - `POST https://api.openai.com/v1/files`
  - `multipart/form-data` body
  - Form fields: `purpose = batch`; `file` = binary field `data`
  - Auth: `predefinedCredentialType` → `openAiApi` credential
  - `alwaysOutputData: true` (so it still outputs something on some error conditions)
- Inputs/Outputs:
  - Expects binary `data` from the previous node.
  - Outputs the OpenAI file object (notably `id`) used by the next step.
- Edge cases / failure modes:
  - Missing/invalid OpenAI credential → 401/403.
  - Incorrect binary field name (must be `data`) → empty upload or 400.
  - Network timeout / file too large → request fails.
Block 2 — Create Batch Job & Persist in Supabase
Overview: Creates the OpenAI batch using the uploaded file id, then inserts a tracking row in Supabase.
Nodes involved:
`Call Batch API` → `Create a row in batch table` → `Submission done`
Node: Call Batch API
- Type / role: HTTP Request node to create a batch job.
- Configuration choices (the raw API equivalent is sketched after this list):
  - `POST https://api.openai.com/v1/batches`
  - JSON body:
    - `input_file_id: {{ $json.id }}` (from the Files API response)
    - `endpoint: "/v1/responses"`
    - `completion_window: "24h"`
  - Auth: OpenAI credential
  - `alwaysOutputData: true`
- Connections: from `Call Files API` → to `Create a row in batch table`.
- Edge cases / failure modes:
  - If `$json.id` is missing (failed upload), the body becomes invalid.
  - Endpoint mismatch: the file lines target `/v1/responses` and the batch endpoint is also `/v1/responses`; if you change one, you must change the other.
  - OpenAI may reject unsupported models or an invalid request schema in the `.jsonl`.
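The equivalent raw call, continuing the upload sketch above (`file.id` comes from the Files API response):

```javascript
// Create the batch job; `endpoint` must match the `url` in every .jsonl line.
const res = await fetch('https://api.openai.com/v1/batches', {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    input_file_id: file.id,
    endpoint: '/v1/responses',
    completion_window: '24h',
  }),
});
const batch = await res.json(); // batch.id and batch.status are persisted in Supabase
```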
Node: Create a row in batch table
- Type / role: Supabase node (`supabase`) to insert (create) the tracking record.
- Configuration choices:
  - Table: `openai_batches`
  - Fields set:
    - `id = {{ $json.id }}`
    - `batch_status = {{ $json.status }}`
- Connections: from `Call Batch API` → to `Submission done`.
- Credentials: `supabaseApi` (named `b2b`)
- Edge cases / failure modes:
  - If `id` is the primary key and already exists → insert conflict (depends on Supabase node behavior; likely fails unless configured for upsert, sketched below).
  - Schema mismatch: if the `openai_batches` table/columns differ, the insert fails.
  - `batch_status` must not be null per the sticky note; if the OpenAI response lacks `status`, the insert fails.
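If duplicate ids are a concern, an upsert avoids the PK-conflict failure noted above. A sketch using supabase-js v2 directly, outside the n8n Supabase node (`SUPABASE_URL`/`SUPABASE_KEY` are assumed environment variables; `batch` is the Batches API response from the earlier sketch):

```javascript
import { createClient } from '@supabase/supabase-js';

const supabase = createClient(process.env.SUPABASE_URL, process.env.SUPABASE_KEY);

// Insert-or-update keyed on the primary key, so re-submitting the same
// batch id updates batch_status instead of failing.
const { error } = await supabase
  .from('openai_batches')
  .upsert({ id: batch.id, batch_status: batch.status }, { onConflict: 'id' });
if (error) throw error;
```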
Node: Submission done
- Type / role: NoOp node marking end of submit phase.
- Connections: from `Create a row in batch table`.
- Edge cases: none (pure terminator/visual marker).
Block 3 — FIFO Polling: Select Next Batch to Check
Overview: On a schedule, selects batches that are not terminal (completed/failed/expired/cancelled), ordered by creation time (FIFO intent), then requests status from OpenAI.
Nodes involved:
`Cron Job (5 mins)` → `Get the earliest uncompleted batch` → `Call Batch API to retrieve batch object`
Node: Cron Job (5 mins)
- Type / role: Schedule Trigger (`scheduleTrigger`) entry point for polling.
- Configuration choices: interval on the minutes field. The node name says "every 5 mins", but the workflow JSON shows an interval with `{field:"minutes"}` and no explicit "5" value, so verify the interval in the UI.
- Connections: → `Get the earliest uncompleted batch`
- Edge cases:
  - If the interval is misconfigured (e.g., every 1 minute), it may spam OpenAI status checks.
  - If too infrequent, completed batches sit longer before retrieval.
Node: Get the earliest uncompleted batch
- Type / role: Postgres node (`postgres`) selecting candidate batch rows.
- Configuration choices:
  - Schema: `public`
  - Table: `openai_batches`
  - Sort: by `created_at` ascending (FIFO ordering)
  - Where filters (all applied): `batch_status != failed`, `batch_status != expired`, `batch_status != cancelled`, `batch_status != completed`
  - `returnAll: true` (returns all matching rows, not just the earliest)
- Connections: → `Call Batch API to retrieve batch object`
- FIFO note: despite the node name, it does not `LIMIT 1`, so multiple batches may be processed in a single run (n8n executes downstream nodes once per item). A sketch of the strict-FIFO query follows this list.
- Edge cases / failure modes:
  - If the table is empty → outputs no items; downstream doesn't run (expected).
  - If statuses include other terminal values not excluded, they keep being polled.
  - If many rows are returned, you may hit OpenAI rate limits with many status calls per trigger.
  - Credential/network errors to Postgres.
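A strict-FIFO version of this select, expressed with node-postgres (the n8n Postgres node builds an equivalent query from its filter/sort options; `DATABASE_URL` is an assumed connection string):

```javascript
import pg from 'pg';

const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });

// LIMIT 1 makes the polling strictly FIFO; the workflow as shipped omits it
// and therefore polls every non-terminal batch on each run.
const { rows } = await pool.query(
  `SELECT *
     FROM public.openai_batches
    WHERE batch_status NOT IN ('completed', 'failed', 'expired', 'cancelled')
    ORDER BY created_at ASC
    LIMIT 1`
);
```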
Node: Call Batch API to retrieve batch object
- Type / role: HTTP Request to fetch batch status/details from OpenAI.
- Configuration choices:
  - `GET https://api.openai.com/v1/batches/{{ $json.id }}`
  - Auth: OpenAI credential
- Connections: → `If status = completed`
- Edge cases / failure modes:
  - If `$json.id` is missing/blank → invalid URL.
  - 404 if the batch id doesn't exist (row corruption/manual edits).
  - 401/403 auth issues.
Block 4 — Branch: Completed vs Not Completed
Overview: If batch is completed and has an output file, download and parse results; otherwise update only the batch status.
Nodes involved:
`If status = completed`; `Update status` (not-completed path); `Download .jsonl result` (completed path)
Node: If status = completed
- Type / role: IF node branching on OpenAI batch object fields.
- Conditions (AND):
  - `{{ $json.status }}` equals `completed`
  - `{{ $json.output_file_id }}` exists
- Outputs:
  - True → `Download .jsonl result`
  - False → `Update status`
- Edge cases:
  - Some batches could be `completed` but have no `output_file_id` due to API changes/errors → they go to the "false" branch and only get a status update.
  - If OpenAI returns status `failed`/`expired`/`cancelled`, the batch goes "false" and is updated, but is then excluded from future polling by the SQL filter.
Node: Update status
- Type / role: Supabase update persisting the latest status for non-completed (or completed-without-output) batches.
- Configuration choices:
  - Table: `openai_batches`
  - Filter: `id eq {{ $json.id }}`
  - Updates:
    - `updated_at = {{ $now }}`
    - `batch_status = {{ $json.status }}`
- Connections: none further (ends that branch).
- Edge cases / failure modes:
  - If the row does not exist → the update affects 0 rows (silent or error, depending on node behavior).
  - `$now` formatting: Supabase expects a timestamp; n8n typically provides an ISO string, so ensure the column type is compatible (`timestamptz` is fine).
Node: Download .jsonl result
- Type / role: HTTP Request to download the output file content from the OpenAI Files API (raw equivalent sketched after this list).
- Configuration choices:
  - `GET https://api.openai.com/v1/files/{{ $json.output_file_id }}/content`
  - Response format: `file` (binary)
  - Auth: OpenAI credential
- Connections: → `.jsonl to base64`
- Edge cases / failure modes:
  - Output file not ready immediately (rare if status says completed, but timing issues are possible) → 404/409.
  - Large output files → memory pressure in n8n when handled as binary.
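The raw equivalent of this download, as a sketch (the node itself returns the body as an n8n binary property; `batch` is assumed to be the batch object retrieved in the previous step):

```javascript
// Fetch the completed batch's output file; the body is NDJSON text,
// one response object per line.
const res = await fetch(
  `https://api.openai.com/v1/files/${batch.output_file_id}/content`,
  { headers: { Authorization: `Bearer ${process.env.OPENAI_API_KEY}` } }
);
const ndjson = await res.text();
```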
Block 5 — Decode Output, Extract Results, Persist
Overview: Converts the downloaded NDJSON file into text, parses each line, extracts the assistant message text from each response, then writes results back into Supabase.
Nodes involved:
`.jsonl to base64` → `Decode base64` → `Update status and result` → `Retrieval done`
Node: .jsonl to base64
- Type / role: Extract From File node (`extractFromFile`) converting binary content into a JSON property.
- Configuration choices:
  - Operation: `binaryToPropery` (the node's actual operation name; intended meaning is "binary to property")
- Inputs/Outputs:
  - Input: binary file from `Download .jsonl result`
  - Output: JSON field `data` holding base64 (as used by the next node)
- Edge cases / failure modes:
  - If the binary property name differs from the default (usually `data`), conversion may fail or produce empty output.
  - Very large files increase memory usage.
Node: Decode base64
- Type / role: Code node parsing the NDJSON batch output and extracting texts.
- Key logic/config (a sketch follows this list):
  - Reads base64 from `const base64Content = $input.first().json.data;`
  - Decodes to UTF-8, splits by newline, `JSON.parse` per line (skipping invalid lines)
  - For each parsed line:
    - Skips if `line.error` exists or `line.response.status_code !== 200`
    - Extracts `line.response.body.output`
    - Finds the element where `type === 'message'`
    - Extracts `messageOutput.content[0].text`
  - Counts `parsed` and `errors`
  - Pulls batch metadata from an earlier node: `const batch = $('If status = completed').first().json;`
  - Outputs: `id`, `status`, `output_file_id`, `result` (array of texts), `parsed`, `errors`
- Connections: → `Update status and result`
- Version notes: Code node v2; uses `Buffer` and the `$('Node Name')` selector.
- Edge cases / failure modes:
  - If the OpenAI output schema differs (e.g., content is not `[ {text: ...} ]`) → extraction yields empty results and increments `errors`.
  - If multiple message parts exist, only the first `content[0].text` is taken.
  - If `$('If status = completed')` has no item (unexpected execution path), `.first()` will throw.
  - NDJSON lines that fail JSON parsing are dropped.
Node: Update status and result
- Type / role: Supabase update storing the final status and extracted results.
- Configuration choices:
  - Filter: `id eq {{ $json.id }}`
  - Updates:
    - `updated_at = {{ $now }}`
    - `batch_status = {{ $json.status }}`
    - `output_file_id = {{ $json.output_file_id }}`
    - `result = {{ $json.result }}`
- Connections: → `Retrieval done`
- Edge cases / failure modes:
  - The `result` column type must accept an array (per the sticky note: `text[]`). If set as `text`/JSON incorrectly, the update fails.
  - Large arrays may hit row size or request limits.
Node: Retrieval done
- Type / role: NoOp node marking end of retrieval phase.
- Connections: from `Update status and result`.
3. Summary Table
| Node Name | Node Type | Functional Role | Input Node(s) | Output Node(s) | Sticky Note |
|---|---|---|---|---|---|
| Start (mock data) | Manual Trigger | Entry point for submit phase with sample inputs | — | Convert to batch requests in .jsonl | ## 📤 Phase 1: Submit Batch |
| Convert to batch requests in .jsonl | Code | Build NDJSON batch file and output as binary | Start (mock data) | Call Files API | ## 📤 Phase 1: Submit Batch |
| Call Files API | HTTP Request | Upload .jsonl file to OpenAI Files API | Convert to batch requests in .jsonl | Call Batch API | ## 📤 Phase 1: Submit Batch |
| Call Batch API | HTTP Request | Create OpenAI batch job | Call Files API | Create a row in batch table | ## 📤 Phase 1: Submit Batch |
| Create a row in batch table | Supabase | Insert batch record into openai_batches | Call Batch API | Submission done | ## 📤 Phase 1: Submit Batch |
| Submission done | NoOp | End marker for submission | Create a row in batch table | — | ## 📤 Phase 1: Submit Batch |
| Cron Job (5 mins) | Schedule Trigger | Entry point for polling phase | — | Get the earliest uncompleted batch | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Get the earliest uncompleted batch | Postgres | Select not-terminal batches ordered by created time | Cron Job (5 mins) | Call Batch API to retrieve batch object | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Call Batch API to retrieve batch object | HTTP Request | Fetch batch status/details from OpenAI | Get the earliest uncompleted batch | If status = completed | ## 📥 Phase 2: Poll & Retrieve (cron) |
| If status = completed | IF | Branch completed vs not completed | Call Batch API to retrieve batch object | Download .jsonl result (true), Update status (false) | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Update status | Supabase | Update status for non-completed batch | If status = completed (false) | — | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Download .jsonl result | HTTP Request | Download output file content (binary) | If status = completed (true) | .jsonl to base64 | ## 📥 Phase 2: Poll & Retrieve (cron) |
| .jsonl to base64 | Extract From File | Convert downloaded binary to base64 in JSON | Download .jsonl result | Decode base64 | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Decode base64 | Code | Parse NDJSON and extract response texts | .jsonl to base64 | Update status and result | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Update status and result | Supabase | Persist final status + output file id + results array | Decode base64 | Retrieval done | ## 📥 Phase 2: Poll & Retrieve (cron) |
| Retrieval done | NoOp | End marker for retrieval | Update status and result | — | ## 📥 Phase 2: Poll & Retrieve (cron) |
Global sticky note content applicable to the whole workflow (context):
“OpenAI Batch API (FIFO with Supabase / Postgres)” note describing table schema, credentials, customization, and FIFO LIMIT 1 recommendation.
4. Reproducing the Workflow from Scratch
1. Create the database table in Supabase (and ensure Postgres access):
   - Table name: `openai_batches`
   - Columns:
     - `id` (text, primary key)
     - `batch_status` (text, not null)
     - `output_file_id` (text, nullable)
     - `result` (text[], nullable)
     - `created_at` (timestamptz, default `now()`)
     - `updated_at` (timestamptz, nullable)
2. Create/open credentials in n8n:
   - OpenAI API credential (used by all HTTP Request nodes calling `api.openai.com`)
   - Supabase API credential (used by the Supabase nodes)
   - Postgres credential pointing to the Supabase Postgres database (used by the Postgres node)
3. Phase 1 nodes (Submit):
   1. Add a Manual Trigger named `Start (mock data)`.
      - Provide test pinned data (optional) with:
        - `systemPrompt` (string)
        - `inputs` (array of strings)
   2. Add a Code node named `Convert to batch requests in .jsonl`.
      - Paste logic to:
        - Convert `inputs` + `systemPrompt` into NDJSON lines targeting `/v1/responses`
        - Output as `binary.data` with base64 content and filename `batch_input.jsonl`
      - Set the model (e.g., `gpt-5-mini`) in the code (see the sketch in section 2).
   3. Add an HTTP Request node named `Call Files API`.
      - Method: `POST`
      - URL: `https://api.openai.com/v1/files`
      - Authentication: OpenAI predefined credential
      - Body: `multipart/form-data`
        - Field `purpose` = `batch`
        - Field `file` = binary from input, property name `data`
   4. Add an HTTP Request node named `Call Batch API`.
      - Method: `POST`
      - URL: `https://api.openai.com/v1/batches`
      - Authentication: OpenAI credential
      - Body (JSON):
        - `input_file_id` = expression `{{ $json.id }}`
        - `endpoint` = `"/v1/responses"`
        - `completion_window` = `"24h"`
   5. Add a Supabase node named `Create a row in batch table`.
      - Operation: Insert/Create row
      - Table: `openai_batches`
      - Fields:
        - `id` = `{{ $json.id }}`
        - `batch_status` = `{{ $json.status }}`
   6. Add a NoOp node `Submission done`.
   7. Connect Phase 1: `Start (mock data)` → `Convert to batch requests in .jsonl` → `Call Files API` → `Call Batch API` → `Create a row in batch table` → `Submission done`
4. Phase 2 nodes (Poll & Retrieve):
   1. Add a Schedule Trigger named `Cron Job (5 mins)`.
      - Configure the interval to every 5 minutes (ensure the value is actually 5 in the UI).
   2. Add a Postgres node named `Get the earliest uncompleted batch`.
      - Operation: Select from `public.openai_batches`
      - Filter: exclude terminal statuses:
        - `batch_status != completed`
        - `batch_status != failed`
        - `batch_status != expired`
        - `batch_status != cancelled`
      - Sort by `created_at` ascending
      - For strict FIFO, set Limit = 1 (recommended; otherwise it processes all matching rows).
   3. Add an HTTP Request node named `Call Batch API to retrieve batch object`.
      - Method: `GET`
      - URL: `https://api.openai.com/v1/batches/{{ $json.id }}`
      - Authentication: OpenAI credential
   4. Add an IF node named `If status = completed`.
      - Condition 1: `{{ $json.status }}` equals `completed`
      - Condition 2: `{{ $json.output_file_id }}` exists
      - Combine with AND
   5. False branch (not completed): add a Supabase node `Update status`.
      - Operation: Update
      - Table: `openai_batches`
      - Filter: `id eq {{ $json.id }}`
      - Fields:
        - `updated_at = {{ $now }}`
        - `batch_status = {{ $json.status }}`
   6. True branch (completed): add an HTTP Request node `Download .jsonl result`.
      - Method: `GET`
      - URL: `https://api.openai.com/v1/files/{{ $json.output_file_id }}/content`
      - Authentication: OpenAI credential
      - Response: File (binary)
   7. Add an Extract From File node named `.jsonl to base64`.
      - Operation: "binary to property"
      - Keep the default binary property name `data` unless you changed it.
   8. Add a Code node `Decode base64`.
      - Decode base64 to a string
      - Parse the NDJSON lines
      - Extract the `message` output text from each line
      - Also reference the completed batch object via `$('If status = completed').first().json`
   9. Add a Supabase node `Update status and result`.
      - Operation: Update
      - Table: `openai_batches`
      - Filter: `id eq {{ $json.id }}`
      - Fields:
        - `updated_at = {{ $now }}`
        - `batch_status = {{ $json.status }}`
        - `output_file_id = {{ $json.output_file_id }}`
        - `result = {{ $json.result }}`
   10. Add a NoOp node `Retrieval done`.
   11. Connect Phase 2: `Cron Job (5 mins)` → `Get the earliest uncompleted batch` → `Call Batch API to retrieve batch object` → `If status = completed`
       - True: `Download .jsonl result` → `.jsonl to base64` → `Decode base64` → `Update status and result` → `Retrieval done`
       - False: `Update status`
5. General Notes & Resources
| Note Content | Context or Link |
|---|---|
| OpenAI Batch API (FIFO with Supabase / Postgres): submit .jsonl file, create batch, store status in openai_batches, cron polls earliest uncompleted, downloads output and stores results. | Workflow design note (global) |
| Supabase table schema required: id (text PK), batch_status (text), output_file_id (text nullable), result (text[] nullable), created_at default now(), updated_at nullable. | Database prerequisite |
| Customization: change the model in "Convert to batch requests…" (currently gpt-5-mini), adjust the cron interval, and for strict FIFO set the Postgres select to LIMIT 1. | Operational tuning |
Disclaimer: the text above comes exclusively from an automated workflow built with n8n, an integration and automation tool. This processing strictly complies with applicable content policies and contains no illegal, offensive, or protected material. All data handled is legal and public.