
Batch extractors

The batch extractors handle the asynchronous batch API path. Use these when you have thousands to millions of rows and can tolerate up to 24 hours of turnaround in exchange for roughly 50% lower per-token cost. Both classes follow the same four-step lifecycle: build requests, submit, poll status, retrieve results as a DataFrame.

OpenAIBatchExtractor uses the OpenAI Batch API (JSONL file upload). AnthropicBatchExtractor uses Anthropic Message Batches (JSON body POST, no file upload). All intermediate files are written to disk under batch_root_dir/job_id/ so you can inspect what was sent and what came back.

OpenAIBatchExtractor

Submit, monitor, and retrieve OpenAI Batch API jobs from a DataFrame.

Handles the full lifecycle:

  1. create_batch_jsonl: builds JSONL input files from a DataFrame.
  2. submit_batches: uploads the files and submits them to the OpenAI Batch API.
  3. check_batch_status: polls until completion (optionally continuously).
  4. retrieve_results_as_dataframe: assembles results into a DataFrame.

Directory layout created under batch_root_dir/job_id/:

batch_input/     -- JSONL files ready for submission
batch_output/    -- submission manifests, result files, error logs
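A minimal sketch of how that per-job layout can be created with pathlib. The helper name make_job_dirs is hypothetical and only mirrors the layout described above; it is not the library's own code.

```python
from pathlib import Path


def make_job_dirs(batch_root_dir: str, job_id: str) -> tuple[Path, Path]:
    """Create batch_root_dir/job_id/{batch_input,batch_output} (hypothetical helper)."""
    job_dir = Path(batch_root_dir) / job_id
    input_dir = job_dir / "batch_input"
    output_dir = job_dir / "batch_output"
    for d in (input_dir, output_dir):
        # parents=True also creates batch_root_dir if it is absent;
        # exist_ok=True makes re-running the same job_id harmless.
        d.mkdir(parents=True, exist_ok=True)
    return input_dir, output_dir
```

Keeping every artifact under one job_id directory is what makes the "inspect what was sent and what came back" workflow possible.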

Attributes:

Name Type Description
batch_root_dir

Root directory for all batch jobs.

max_requests_per_batch

Cap on requests per individual batch file.

client

An openai.OpenAI client instance.

__init__(batch_root_dir: str = 'batch_jobs', max_requests_per_batch: int = 5000, api_key: str | None = None) -> None

Initialise the extractor.

Parameters:

Name Type Description Default
batch_root_dir str

Root directory for batch jobs. Created if absent.

'batch_jobs'
max_requests_per_batch int

Max requests per JSONL file. The output file cannot exceed 5 GB and the input file cannot exceed 200 MB; reduce this value if you hit those limits.

5000
api_key str | None

Optional OpenAI API key override.

None
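The max_requests_per_batch cap, together with the input-file size limit mentioned above, amounts to packing request lines into files greedily. A sketch under stated assumptions: split_into_batches is a hypothetical name, and treating 200 MB as 200 * 1024 * 1024 bytes is an assumption, not a documented constant of this library.

```python
def split_into_batches(lines: list[str], max_requests: int = 5000,
                       max_bytes: int = 200 * 1024 * 1024) -> list[list[str]]:
    """Group JSONL lines so each group has <= max_requests lines and <= max_bytes bytes.

    Hypothetical helper; the byte limit value is an assumption.
    """
    if max_requests < 1:
        raise ValueError("max_requests must be >= 1")
    batches: list[list[str]] = []
    current: list[str] = []
    current_bytes = 0
    for line in lines:
        # +1 for the newline each line gets when written to the .jsonl file.
        size = len(line.encode("utf-8")) + 1
        if size > max_bytes:
            raise ValueError("a single request exceeds max_bytes")
        # Start a new file when either the count cap or the size cap would be exceeded.
        if current and (len(current) >= max_requests or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(line)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```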

create_batch_jsonl(dataframe: pd.DataFrame, id_col: str, text_col: str, prompt: str, job_id: str, model_name: str, temperature: float = 0.0, chunk_size: int = 5, exclude_processed: bool = True, schema_dict: dict | None = None) -> None

Build JSONL batch input files from a DataFrame.

Bug fix (code review P0-1): the original gpt_funcs.py:303 sent the system prompt with role="assistant". This method uses role="system", as the OpenAI Chat Completions API expects. Sending the system prompt as an assistant turn can degrade instruction-following.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame. Must contain id_col and text_col.

required
id_col str

Column name for row identifiers.

required
text_col str

Column name for text content.

required
prompt str

System prompt text. Passed as role="system" (P0 fix).

required
job_id str

Unique job identifier. Used as the subdirectory name.

required
model_name str

OpenAI model identifier (e.g., "gpt-4.1-mini").

required
temperature float

Sampling temperature. Overridden to 1 for o1/o3/gpt-5 models.

0.0
chunk_size int

Rows per LLM request chunk. Default 5.

5
exclude_processed bool

If True, skip rows already present in batch_output/.

True
schema_dict dict | None

Optional JSON schema dict for response_format. If None, uses {"type": "json_object"}.

None
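A sketch of the kind of JSONL line create_batch_jsonl produces, assuming the standard OpenAI Batch request shape (custom_id, method, url, body). The function name, the custom_id scheme, the user-message layout, the prefix test for reasoning models, and the way schema_dict is wrapped into response_format are all illustrative assumptions, not the library's implementation.

```python
from __future__ import annotations

import json

# Assumption: reasoning models are detected by model-name prefix.
REASONING_PREFIXES = ("o1", "o3", "gpt-5")


def build_openai_batch_lines(rows, prompt, model_name, job_id,
                             temperature=0.0, chunk_size=5, schema_dict=None):
    """Return JSONL strings, one request per chunk of chunk_size (row_id, text) pairs."""
    if model_name.startswith(REASONING_PREFIXES):
        temperature = 1  # these models only accept the default temperature
    if schema_dict is None:
        response_format = {"type": "json_object"}
    else:
        # Assumption: schema_dict is the json_schema object ({"name": ..., "schema": ...}).
        response_format = {"type": "json_schema", "json_schema": schema_dict}
    lines = []
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        # Hypothetical user-message layout: a JSON array of {"id", "text"} objects.
        user_content = json.dumps([{"id": rid, "text": txt} for rid, txt in chunk])
        request = {
            "custom_id": f"{job_id}-{start // chunk_size}",  # must be unique per batch
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model_name,
                "temperature": temperature,
                "response_format": response_format,
                "messages": [
                    {"role": "system", "content": prompt},  # system, not assistant
                    {"role": "user", "content": user_content},
                ],
            },
        }
        lines.append(json.dumps(request))
    return lines
```

With {"type": "json_object"}, OpenAI requires the prompt itself to ask for JSON output, so the system prompt should mention JSON explicitly.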

submit_batches(job_id: str) -> None

Upload JSONL files and submit each as an OpenAI Batch job.

Parameters:

Name Type Description Default
job_id str

The job identifier whose batch_input/ files to submit.

required
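A minimal sketch of the upload-then-submit step, written against an injected client that is expected to behave like openai.OpenAI() (client.files.create with purpose="batch", then client.batches.create). The function name and the manifest file name batch_output/submissions.json are hypothetical.

```python
import json
from pathlib import Path


def submit_jsonl_files(client, job_dir: Path) -> list[dict]:
    """Upload each batch_input/*.jsonl file and create one Batch per file (sketch)."""
    manifest = []
    for path in sorted((job_dir / "batch_input").glob("*.jsonl")):
        with open(path, "rb") as fh:
            uploaded = client.files.create(file=fh, purpose="batch")
        batch = client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/chat/completions",
            completion_window="24h",  # the only window the Batch API currently accepts
        )
        manifest.append({"input_file": path.name, "file_id": uploaded.id,
                         "batch_id": batch.id, "status": batch.status})
    out_dir = job_dir / "batch_output"
    out_dir.mkdir(parents=True, exist_ok=True)
    # Hypothetical manifest name; records which batch came from which file.
    (out_dir / "submissions.json").write_text(json.dumps(manifest, indent=2))
    return manifest
```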

check_batch_status(job_id: str, continuous: bool = False, interval: int = 300) -> None

Poll batch status and download results when complete.

Parameters:

Name Type Description Default
job_id str

The job identifier to check.

required
continuous bool

If True, keep polling until all batches finish.

False
interval int

Seconds between polls when continuous=True. Default 300.

300
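Continuous polling reduces to "check every batch, stop when all are in a terminal state, otherwise sleep interval seconds". A sketch with injectable status and sleep functions so it can run without network access; poll_until_done is a hypothetical name. In practice get_statuses could wrap client.batches.retrieve(batch_id).status for each submitted batch.

```python
from __future__ import annotations

import time
from typing import Callable

# Terminal states of an OpenAI Batch object; "validating", "in_progress",
# "finalizing" and "cancelling" are still moving.
TERMINAL = {"completed", "failed", "expired", "cancelled"}


def poll_until_done(get_statuses: Callable[[], dict[str, str]], interval: int = 300,
                    sleep: Callable[[float], None] = time.sleep) -> dict[str, str]:
    """Call get_statuses() until every batch reports a terminal status."""
    while True:
        statuses = get_statuses()  # {batch_id: status}
        if all(s in TERMINAL for s in statuses.values()):
            return statuses
        sleep(interval)
```

Once a batch is "completed", its output_file_id (and error_file_id, if any) can be downloaded with client.files.content(file_id).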

retrieve_results_as_dataframe(job_id: str) -> Optional[pd.DataFrame]

Parse completed batch result JSONL files into a DataFrame.

Parameters:

Name Type Description Default
job_id str

The job identifier whose results to retrieve.

required

Returns:

Type Description
Optional[DataFrame]

DataFrame of parsed result rows, or None if no results exist yet.
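A sketch of parsing Batch API output lines, assuming the standard output shape (custom_id, response.status_code, response.body, error) and that the model's message content is JSON. It yields one row per request; how the library expands a multi-row chunk into per-row records is not shown, and parse_openai_output_lines is a hypothetical name.

```python
import json


def parse_openai_output_lines(lines):
    """Turn Batch API output JSONL lines into flat dict rows (one per request)."""
    rows = []
    for line in lines:
        if not line.strip():
            continue
        record = json.loads(line)
        row = {"custom_id": record["custom_id"]}
        response = record.get("response") or {}
        if record.get("error") or response.get("status_code") != 200:
            # Keep failed requests visible instead of silently dropping them.
            row["error"] = record.get("error") or response.get("body")
        else:
            content = response["body"]["choices"][0]["message"]["content"]
            try:
                parsed = json.loads(content)
            except json.JSONDecodeError:
                row["error"] = "unparseable content"
            else:
                if isinstance(parsed, dict):
                    row.update(parsed)
                else:
                    row["result"] = parsed
        rows.append(row)
    return rows
```

pd.DataFrame(rows) then gives a frame with one column per JSON key.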

AnthropicBatchExtractor

Submit, monitor, and retrieve Anthropic Message Batches API jobs from a DataFrame.

Lifecycle (parallels OpenAIBatchExtractor but with a different wire format):

  1. create_batch_requests: builds an in-memory list of requests and writes it to batch_input/requests.json for inspection / reproducibility.
  2. submit_batch: posts the request list via client.messages.batches.create, saves the submission manifest.
  3. check_batch_status: polls with client.messages.batches.retrieve.
  4. retrieve_results_as_dataframe: streams results via client.messages.batches.results and flattens tool_use outputs into a DataFrame.

Directory layout created under batch_root_dir/job_id/:

batch_input/
    requests.json           -- serialized list of request dicts
    submission.json         -- returned batch manifest (id, status, ...)
batch_output/
    results.jsonl           -- raw streamed results from Anthropic
    errors.txt              -- per-request error payloads, if any

Attributes:

Name Type Description
batch_root_dir

Root directory for all batch jobs.

client

An anthropic.Anthropic client instance.

__init__(batch_root_dir: str = 'anthropic_batch_jobs', api_key: str | None = None) -> None

Initialise the extractor.

Parameters:

Name Type Description Default
batch_root_dir str

Root directory for batch jobs. Created if absent.

'anthropic_batch_jobs'
api_key str | None

Optional Anthropic API key override; otherwise read from the standard ANTHROPIC_API_KEY environment variable.

None

create_batch_requests(dataframe: pd.DataFrame, id_col: str, text_col: str, prompt: str, job_id: str, model_name: str, chunk_size: int = 5, schema_dict: dict[str, Any] | None = None, tool_name: str = 'extract_results', max_tokens: int = 32000) -> Path

Build the request list and write it to batch_input/requests.json.

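The system prompt is passed as a list of content blocks with cache_control={"type": "ephemeral"} so the long prompt can be served from cache across the many chunk requests (prompts shorter than the model's minimum cacheable length are not cached).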

If schema_dict is provided, the request uses tool_use with that schema as the tool's input_schema, forcing structured output. If schema_dict is None, the request omits the tool definition and the model returns free-form text.

Parameters:

Name Type Description Default
dataframe DataFrame

Input DataFrame. Must contain id_col and text_col.

required
id_col str

Column name for row identifiers.

required
text_col str

Column name for text content.

required
prompt str

System prompt text (cached via cache_control).

required
job_id str

Unique job identifier. Used as the subdirectory name.

required
model_name str

Anthropic model identifier (e.g., "claude-haiku-4-5-20251001").

required
chunk_size int

Rows per request chunk. Default 5.

5
schema_dict dict[str, Any] | None

Optional JSON schema dict describing the tool's input_schema. If None, no tool is used and the model returns free-form text.

None
tool_name str

Name of the tool when schema_dict is provided. Default "extract_results".

'extract_results'
max_tokens int

Max tokens per response. Default 32000, which is within the maximum output of current Claude 4.x models; some, such as Claude Sonnet 4.5 and Haiku 4.5, allow up to 64K.

32000

Returns:

Type Description
Path

Path to the written requests.json file.
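A sketch of the request shape described above, assuming Anthropic's Message Batches format ({"custom_id": ..., "params": {...}}). The function names, custom_id scheme, user-message layout and tool description are hypothetical; the tool_choice setting is one standard way to force the named tool and is an assumption about how the library forces structured output.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any


def build_anthropic_requests(rows, prompt, model_name, job_id, chunk_size=5,
                             schema_dict: dict[str, Any] | None = None,
                             tool_name="extract_results", max_tokens=32000):
    """Build one Message Batches request per chunk of (row_id, text) pairs (sketch)."""
    requests = []
    for start in range(0, len(rows), chunk_size):
        chunk = rows[start:start + chunk_size]
        params: dict[str, Any] = {
            "model": model_name,
            "max_tokens": max_tokens,
            # List-of-blocks form so cache_control can mark the prompt for caching.
            "system": [{"type": "text", "text": prompt,
                        "cache_control": {"type": "ephemeral"}}],
            # Hypothetical user-message layout: a JSON array of {"id", "text"} objects.
            "messages": [{"role": "user", "content": json.dumps(
                [{"id": rid, "text": txt} for rid, txt in chunk])}],
        }
        if schema_dict is not None:
            params["tools"] = [{"name": tool_name,
                                "description": "Record the extracted results.",
                                "input_schema": schema_dict}]
            params["tool_choice"] = {"type": "tool", "name": tool_name}  # force this tool
        # custom_id must be unique within the batch (letters, digits, "-" and "_").
        requests.append({"custom_id": f"{job_id}-{start // chunk_size}", "params": params})
    return requests


def write_requests(requests, batch_root_dir, job_id) -> Path:
    """Write the list to batch_root_dir/job_id/batch_input/requests.json."""
    path = Path(batch_root_dir) / job_id / "batch_input" / "requests.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(requests, indent=2))
    return path
```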

submit_batch(job_id: str) -> str

Submit the prebuilt request list to Anthropic Message Batches.

Parameters:

Name Type Description Default
job_id str

The job identifier whose batch_input/requests.json to submit.

required

Returns:

Type Description
str

The Anthropic batch ID (msgbatch_...).
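A minimal sketch of submission, with the client injected so it is expected to behave like anthropic.Anthropic() (client.messages.batches.create(requests=...)). submit_requests is a hypothetical name, and the manifest here keeps only id and processing_status rather than the full returned object.

```python
import json
from pathlib import Path


def submit_requests(client, job_dir: Path) -> str:
    """Post batch_input/requests.json and save a minimal submission manifest."""
    requests = json.loads((job_dir / "batch_input" / "requests.json").read_text())
    batch = client.messages.batches.create(requests=requests)
    manifest = {"id": batch.id, "processing_status": batch.processing_status}
    # Same location as in the directory layout above.
    (job_dir / "batch_input" / "submission.json").write_text(json.dumps(manifest, indent=2))
    return batch.id
```

Unlike the OpenAI path, there is no file upload: the whole request list travels in the POST body.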

check_batch_status(job_id: str, continuous: bool = False, interval: int = 30, timeout: int | None = None) -> str

Poll batch status and return the most recent processing_status.

Parameters:

Name Type Description Default
job_id str

The job identifier.

required
continuous bool

If True, keep polling until the batch ends.

False
interval int

Seconds between polls when continuous=True. Default 30.

30
timeout int | None

Optional upper bound on polling time in seconds.

None

Returns:

Type Description
str

The processing_status reported by Anthropic: one of "in_progress", "canceling", or "ended". Only "ended" is terminal; with continuous=True the method returns only once that status is reached.

Raises:

Type Description
TimeoutError

If timeout elapses before the batch ends.

FileNotFoundError

If no submission manifest exists for this job.
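The continuous, interval and timeout parameters combine as in this sketch. The status function, sleep and clock are injected so it runs offline; in practice retrieve_status could wrap client.messages.batches.retrieve(batch_id).processing_status. wait_for_batch is a hypothetical name, and capping the last sleep at the remaining timeout is a design assumption.

```python
from __future__ import annotations

import time
from typing import Callable


def wait_for_batch(retrieve_status: Callable[[], str], continuous: bool = False,
                   interval: float = 30, timeout: float | None = None,
                   sleep: Callable[[float], None] = time.sleep,
                   clock: Callable[[], float] = time.monotonic) -> str:
    """Return processing_status; if continuous, poll until it is "ended"."""
    start = clock()
    while True:
        status = retrieve_status()
        if status == "ended" or not continuous:
            return status
        elapsed = clock() - start
        if timeout is not None and elapsed >= timeout:
            raise TimeoutError(f"batch still {status!r} after {timeout} s")
        # Never sleep past the timeout, so the final check happens on time.
        sleep(interval if timeout is None else min(interval, timeout - elapsed))
```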

retrieve_results_as_dataframe(job_id: str, tool_name: str = 'extract_results') -> pd.DataFrame | None

Stream results and flatten tool_use outputs into a DataFrame.

Parameters:

Name Type Description Default
job_id str

The job identifier.

required
tool_name str

Tool name used at submission time. Must match whatever was passed to create_batch_requests.

'extract_results'

Returns:

Type Description
DataFrame | None

DataFrame of parsed result rows, or None if no results yet.

Note

When the original request had no tool (free-form text mode), this method falls back to writing the raw assistant text into the returned DataFrame's text column.
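A sketch of the flattening step, assuming each line of results.jsonl holds one result serialized as JSON with custom_id and result keys, where result.type is "succeeded", "errored", "canceled" or "expired" (Anthropic's batch result types). It yields one row per request; flatten_anthropic_results is a hypothetical name and the result_type column is an illustrative choice.

```python
import json


def flatten_anthropic_results(lines, tool_name="extract_results"):
    """Flatten Message Batches results into dict rows (one per request)."""
    rows = []
    for line in lines:
        if not line.strip():
            continue
        record = json.loads(line)
        result = record["result"]
        row = {"custom_id": record["custom_id"], "result_type": result["type"]}
        if result["type"] == "succeeded":
            blocks = result["message"]["content"]
            tool_inputs = [b["input"] for b in blocks
                           if b.get("type") == "tool_use" and b.get("name") == tool_name]
            if tool_inputs:
                row.update(tool_inputs[0])  # structured output from the forced tool
            else:
                # Free-form mode: fall back to the concatenated text blocks.
                row["text"] = "".join(b.get("text", "") for b in blocks
                                      if b.get("type") == "text")
        rows.append(row)
    return rows
```

Matching on tool_name is why the same name must be passed here as at submission time.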


See also

  • Run a batch job: end-to-end walkthrough for both providers with on-disk artifact inspection.
  • Architecture: where the batch path fits in the library.