Batch extractors
The batch extractors handle the asynchronous batch API path. Use these when you have thousands to millions of rows and can tolerate up to 24 hours of turnaround in exchange for roughly 50% lower per-token cost. Both classes follow the same four-step lifecycle: build requests, submit, poll status, retrieve results as a DataFrame.
OpenAIBatchExtractor uses the OpenAI Batch API (JSONL file upload). AnthropicBatchExtractor
uses Anthropic Message Batches (JSON body POST, no file upload). All intermediate files
are written to disk under batch_root_dir/job_id/ so you can inspect what was sent and
what came back.
OpenAIBatchExtractor
Submit, monitor, and retrieve OpenAI Batch API jobs from a DataFrame.
Handles the full lifecycle:
1. create_batch_jsonl: builds JSONL input files from a DataFrame.
2. submit_batches: uploads and submits to OpenAI Batch API.
3. check_batch_status: polls until completion (optionally continuous).
4. retrieve_results_as_dataframe: assembles results into a DataFrame.
Directory layout created under batch_root_dir/job_id/:
batch_input/ -- JSONL files ready for submission
batch_output/ -- submission manifests, result files, error logs
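As a rough illustration of the layout above, the two subdirectories could be created as follows. The helper name make_job_dirs is hypothetical and not part of the library; only the directory names come from the description above.

```python
from pathlib import Path


def make_job_dirs(batch_root_dir: str, job_id: str) -> tuple[Path, Path]:
    """Create batch_input/ and batch_output/ under batch_root_dir/job_id/.

    Hypothetical helper for illustration only; the extractor is described
    as creating this layout itself.
    """
    job_dir = Path(batch_root_dir) / job_id
    input_dir = job_dir / "batch_input"
    output_dir = job_dir / "batch_output"
    for directory in (input_dir, output_dir):
        # parents=True also creates batch_root_dir and job_id if absent.
        directory.mkdir(parents=True, exist_ok=True)
    return input_dir, output_dir
```

Because every job gets its own subdirectory, reusing a job_id points at the same files on disk.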
Attributes:
| Name | Type | Description |
|---|---|---|
| batch_root_dir | | Root directory for all batch jobs. |
| max_requests_per_batch | | Cap on requests per individual batch file. |
| client | | An openai.OpenAI client instance. |
__init__(batch_root_dir: str = 'batch_jobs', max_requests_per_batch: int = 5000, api_key: str | None = None) -> None
Initialise the extractor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_root_dir | str | Root directory for batch jobs. Created if absent. | 'batch_jobs' |
| max_requests_per_batch | int | Max requests per JSONL file. The output file cannot exceed 5 GB and the input file cannot exceed 200 MB; reduce this value if you hit those limits. | 5000 |
| api_key | str \| None | Optional OpenAI API key override. | None |
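To make the effect of max_requests_per_batch concrete, here is a minimal sketch of splitting a request list into capped groups, one group per JSONL file. The function name split_into_batches is an assumption for illustration; the library's internal splitting logic is not shown in this reference.

```python
from typing import Iterator, Sequence


def split_into_batches(
    requests: Sequence[dict], max_requests_per_batch: int = 5000
) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most max_requests_per_batch requests.

    Hypothetical helper: each yielded list would become one JSONL file.
    """
    if max_requests_per_batch < 1:
        raise ValueError("max_requests_per_batch must be at least 1")
    for start in range(0, len(requests), max_requests_per_batch):
        yield list(requests[start:start + max_requests_per_batch])
```

Lowering the cap produces more, smaller files, which is the lever to pull when an input file approaches the size limit.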
create_batch_jsonl(dataframe: pd.DataFrame, id_col: str, text_col: str, prompt: str, job_id: str, model_name: str, temperature: float = 0.0, chunk_size: int = 5, exclude_processed: bool = True, schema_dict: dict | None = None) -> None
Build JSONL batch input files from a DataFrame.
*** P0 BUG FIX *** The original gpt_funcs.py:303 set role="assistant" for the system prompt. This method uses role="system" as required by the OpenAI Chat Completions API. Sending the system prompt as an assistant turn degrades instruction-following and prevents system-prompt caching. See code review P0-1.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dataframe | DataFrame | Input DataFrame. Must contain id_col and text_col. | required |
| id_col | str | Column name for row identifiers. | required |
| text_col | str | Column name for text content. | required |
| prompt | str | System prompt text. Passed as role="system" (P0 fix). | required |
| job_id | str | Unique job identifier. Used as the subdirectory name. | required |
| model_name | str | OpenAI model identifier (e.g., "gpt-4.1-mini"). | required |
| temperature | float | Sampling temperature. Overridden to 1 for o1/o3/gpt-5 models. | 0.0 |
| chunk_size | int | Rows per LLM request chunk. Default 5. | 5 |
| exclude_processed | bool | If True, skip rows already present in batch_output/. | True |
| schema_dict | dict \| None | Optional JSON schema dict for response_format. If None, uses {"type": "json_object"}. | None |
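The sketch below shows what one line of a batch input file might look like, under stated assumptions. The envelope keys (custom_id, method, url, body) follow the OpenAI Batch API input format. The function name build_request_line, the prefix-based model check, and the way chunk rows are serialized into the user message are hypothetical; the library's actual choices are not documented here.

```python
import json

# Assumption: models needing temperature 1 are detected by name prefix.
FIXED_TEMPERATURE_PREFIXES = ("o1", "o3", "gpt-5")


def build_request_line(custom_id, rows, prompt, model_name,
                       temperature=0.0, schema_dict=None):
    """Return one JSONL line for a chunk of rows (illustrative only)."""
    if model_name.startswith(FIXED_TEMPERATURE_PREFIXES):
        temperature = 1
    body = {
        "model": model_name,
        "temperature": temperature,
        "messages": [
            # The prompt goes in a system turn, not an assistant turn.
            {"role": "system", "content": prompt},
            # Assumption: the chunk is sent as a JSON list of row dicts.
            {"role": "user", "content": json.dumps(rows)},
        ],
        "response_format": (
            schema_dict if schema_dict is not None else {"type": "json_object"}
        ),
    }
    return json.dumps({
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": body,
    })
```

With chunk_size=5, each call would receive five row dicts, so a 1,000-row DataFrame yields 200 request lines.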
submit_batches(job_id: str) -> None
Upload JSONL files and submit each as an OpenAI Batch job.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier whose batch_input/ files to submit. | required |
check_batch_status(job_id: str, continuous: bool = False, interval: int = 300) -> None
Poll batch status and download results when complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier to check. | required |
| continuous | bool | If True, keep polling until all batches finish. | False |
| interval | int | Seconds between polls when continuous=True. Default 300. | 300 |
retrieve_results_as_dataframe(job_id: str) -> Optional[pd.DataFrame]
Parse completed batch result JSONL files into a DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier whose results to retrieve. | required |
Returns:
| Type | Description |
|---|---|
| Optional[DataFrame] | DataFrame of parsed result rows, or None if no results exist yet. |
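For readers inspecting batch_output/ by hand, the following sketch parses lines in the OpenAI Batch API output format (custom_id, response, error). The function name parse_output_lines and the returned record shape are assumptions; the library's own column layout may differ.

```python
import json


def parse_output_lines(lines):
    """Split batch output lines into parsed rows and failed custom_ids."""
    rows, failed = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the file
        record = json.loads(line)
        response = record.get("response") or {}
        if record.get("error") or response.get("status_code") != 200:
            failed.append(record.get("custom_id"))
            continue
        content = response["body"]["choices"][0]["message"]["content"]
        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            # The model returned text that is not valid JSON.
            failed.append(record.get("custom_id"))
            continue
        rows.append({"custom_id": record["custom_id"], "parsed": parsed})
    return rows, failed
```

The rows list can be handed to pd.DataFrame(rows) if a DataFrame is wanted; the failed list shows which chunks need resubmission.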
AnthropicBatchExtractor
Submit, monitor, and retrieve Anthropic Message Batches API jobs from a DataFrame.
Lifecycle (parallels GPTBatchJobClassifier but with a different wire format):
1. create_batch_requests: builds an in-memory list of requests and writes it to batch_input/requests.json for inspection / reproducibility.
2. submit_batch: posts the request list via client.messages.batches.create, saves the submission manifest.
3. check_batch_status: polls with client.messages.batches.retrieve.
4. retrieve_results_as_dataframe: streams results via client.messages.batches.results and flattens tool_use outputs into a DataFrame.
Directory layout created under batch_root_dir/job_id/:
batch_input/
    requests.json -- serialized list of request dicts
    submission.json -- returned batch manifest (id, status, ...)
batch_output/
    results.jsonl -- raw streamed results from Anthropic
    errors.txt -- per-request error payloads, if any
Attributes:
| Name | Type | Description |
|---|---|---|
| batch_root_dir | | Root directory for all batch jobs. |
| client | | An |
__init__(batch_root_dir: str = 'anthropic_batch_jobs', api_key: str | None = None) -> None
Initialise the extractor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_root_dir | str | Root directory for batch jobs. Created if absent. | 'anthropic_batch_jobs' |
| api_key | str \| None | Optional Anthropic API key override; otherwise read from the standard | None |
create_batch_requests(dataframe: pd.DataFrame, id_col: str, text_col: str, prompt: str, job_id: str, model_name: str, chunk_size: int = 5, schema_dict: dict[str, Any] | None = None, tool_name: str = 'extract_results', max_tokens: int = 32000) -> Path
Build the request list and write it to batch_input/requests.json.
The system prompt is passed as a list-block with
cache_control={"type": "ephemeral"} so the long prompt is cached
across the many chunk requests.
If schema_dict is provided, the request uses tool_use with that
schema as the tool's input_schema, forcing structured output. If
schema_dict is None, the request omits the tool definition and the
model returns free-form text.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dataframe | DataFrame | Input DataFrame. Must contain id_col and text_col. | required |
| id_col | str | Column name for row identifiers. | required |
| text_col | str | Column name for text content. | required |
| prompt | str | System prompt text (cached via cache_control). | required |
| job_id | str | Unique job identifier. Used as the subdirectory name. | required |
| model_name | str | Anthropic model identifier (e.g., "claude-haiku-4-5-20251001"). | required |
| chunk_size | int | Rows per request chunk. Default 5. | 5 |
| schema_dict | dict[str, Any] \| None | Optional JSON schema dict describing the tool's | None |
| tool_name | str | Name of the tool when schema_dict is provided. Default "extract_results". | 'extract_results' |
| max_tokens | int | Max tokens per response. Default 32000 (well within Claude 4.x model output limits of 64K). | 32000 |
Returns:
| Type | Description |
|---|---|
| Path | Path to the written |
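A single entry in the request list might look like the sketch below. The custom_id/params envelope, the list-block system prompt with cache_control, and the tools/input_schema fields follow the Anthropic Messages and Message Batches request formats. The function name build_anthropic_request, the tool description string, the use of tool_choice to force the tool, and the JSON serialization of the chunk are assumptions made for illustration.

```python
import json
from typing import Any, Optional


def build_anthropic_request(
    custom_id: str,
    rows: list,
    prompt: str,
    model_name: str,
    schema_dict: Optional[dict] = None,
    tool_name: str = "extract_results",
    max_tokens: int = 32000,
) -> dict[str, Any]:
    """Build one request dict for a chunk of rows (illustrative only)."""
    params: dict[str, Any] = {
        "model": model_name,
        "max_tokens": max_tokens,
        # List-block system prompt so the shared prefix can be cached.
        "system": [{
            "type": "text",
            "text": prompt,
            "cache_control": {"type": "ephemeral"},
        }],
        # Assumption: the chunk is sent as a JSON list of row dicts.
        "messages": [{"role": "user", "content": json.dumps(rows)}],
    }
    if schema_dict is not None:
        params["tools"] = [{
            "name": tool_name,
            "description": "Return the extracted fields.",  # placeholder text
            "input_schema": schema_dict,
        }]
        # Assumption: the tool is forced via tool_choice.
        params["tool_choice"] = {"type": "tool", "name": tool_name}
    return {"custom_id": custom_id, "params": params}
```

When schema_dict is None the tools and tool_choice keys are simply absent, which corresponds to the free-form text mode described above.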
submit_batch(job_id: str) -> str
Submit the prebuilt request list to Anthropic Message Batches.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier whose | required |
Returns:
| Type | Description |
|---|---|
| str | The Anthropic batch ID ( |
check_batch_status(job_id: str, continuous: bool = False, interval: int = 30, timeout: int | None = None) -> str
Poll batch status. Returns the terminal status string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier. | required |
| continuous | bool | If True, keep polling until the batch ends. | False |
| interval | int | Seconds between polls when continuous=True. Default 30. | 30 |
| timeout | int \| None | Optional upper bound on polling time in seconds. | None |
Returns:
| Type | Description |
|---|---|
| str | The terminal |
| str | "in_progress", "canceling", "ended". |
Raises:
| Type | Description |
|---|---|
| TimeoutError | If |
| FileNotFoundError | If no submission manifest exists for this job. |
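The interaction between continuous, interval, and timeout can be sketched with a small polling loop. This is not the library's implementation: poll_until_ended and its get_status and sleep parameters are hypothetical, introduced so the loop can run without a network call.

```python
import time
from typing import Callable, Optional


def poll_until_ended(
    get_status: Callable[[], str],
    continuous: bool = False,
    interval: float = 30,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Return the batch status, optionally waiting until it is "ended"."""
    start = time.monotonic()
    while True:
        status = get_status()
        # A single check is enough unless continuous polling was requested.
        if status == "ended" or not continuous:
            return status
        if timeout is not None and time.monotonic() - start >= timeout:
            raise TimeoutError(f"batch still {status!r} after {timeout} seconds")
        sleep(interval)
```

In real use, get_status would presumably wrap something like client.messages.batches.retrieve(batch_id).processing_status, with the batch ID read from the submission manifest.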
retrieve_results_as_dataframe(job_id: str, tool_name: str = 'extract_results') -> pd.DataFrame | None
Stream results and flatten tool_use outputs into a DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_id | str | The job identifier. | required |
| tool_name | str | Tool name used at submission time. Must match whatever was passed to | 'extract_results' |
Returns:
| Type | Description |
|---|---|
| DataFrame \| None | DataFrame of parsed result rows, or None if no results yet. |
Note
When the original request had no tool (free-form text mode), this
method falls back to writing the raw assistant text into the
returned DataFrame's text column.
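The flattening and the text fallback described in the note can be sketched as follows. The record shape (custom_id, result.type, result.message.content with tool_use and text blocks) follows the Anthropic Message Batches results format. The function name flatten_result and the one-row-per-tool-call output are assumptions; the library may expand tool inputs differently.

```python
def flatten_result(record: dict, tool_name: str = "extract_results") -> list:
    """Turn one results.jsonl record into zero or more flat row dicts."""
    result = record.get("result", {})
    if result.get("type") != "succeeded":
        return []  # errored, canceled, or expired requests yield no rows
    blocks = result["message"]["content"]
    rows = [
        {"custom_id": record["custom_id"], **block.get("input", {})}
        for block in blocks
        if block.get("type") == "tool_use" and block.get("name") == tool_name
    ]
    if rows:
        return rows
    # Fallback for free-form mode: keep the raw assistant text.
    text = "".join(b.get("text", "") for b in blocks if b.get("type") == "text")
    return [{"custom_id": record["custom_id"], "text": text}] if text else []
```

A mismatched tool_name makes the first branch find nothing, so a tool-mode batch with no text blocks would yield no rows at all, which is why the name has to match the one used at submission.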
See also
- Run a batch job: end-to-end walkthrough for both providers with on-disk artifact inspection.
- Architecture: where the batch path fits in the library.