GenAI Information Extraction for DataFrames¶
A general-purpose library for prompt-based information extraction over DataFrames, with concurrent and batch execution against OpenAI, Anthropic, and models on OpenRouter.
Cite as: Li, Mai, Shen, Yang, Zhang (2026), RFS. Full citation and BibTeX at the bottom of this page.
Install¶
pip install -U lmsyz_genai_ie_rfs
Set at least one provider key (or drop them in a .env file next to your notebook):
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
# os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
# os.environ["OPENROUTER_API_KEY"] = "sk-or-..."
Quickstart: entity + relation extraction¶
Three short news sentences in:
| id | text |
|---|---|
| 1 | Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025. |
| 2 | Tesla's $2.6 billion acquisition of SolarCity in 2016 enabled its entry into the solar market. |
| 3 | Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners. |
Structured DataFrame out:
| input_id | entities | causal_triples | sentiment |
|---|---|---|---|
| 1 | [{Tim Cook, PERSON}, {Apple, ORG}, {iPhone 17, PRODUCT}, {WWDC, EVENT}, {June 2025, DATE}] | [] | neutral |
| 2 | [{Tesla, ORG}, {SolarCity, ORG}, {2016, DATE}, {$2.6 billion, MONEY}] | [[acquisition, enabled, market entry]] | positive |
| 3 | [{Pfizer, ORG}, {Trian Partners, ORG}] | [[activist pressure, drove, spin-off]] | neutral |
One function call:
import os
import pandas as pd
from lmsyz_genai_ie_rfs import extract_df
df = pd.DataFrame({
"id": [1, 2, 3],
"text": [
"Apple CEO Tim Cook announced the iPhone 17 at WWDC in June 2025.",
"Tesla's $2.6 billion acquisition of SolarCity in 2016 enabled its entry into the solar market.",
"Pfizer's decision to spin off its consumer health unit was driven by activist pressure from Trian Partners.",
],
})
prompt = """
You are an information-extraction assistant. For each input row, analyze the text and extract structured information.
Step-by-step instructions:
1. input_id: Copy the input_id from the row verbatim.
2. entities: List every named entity mentioned in the text. For each entity give:
- name: the surface form as it appears in the text.
- type: one of "PERSON", "ORG", "PRODUCT", "EVENT", "DATE", "MONEY".
3. causal_triples: If the text explicitly states a cause and effect, list each as a
three-element array ["cause", "relation", "effect"]. If there is no explicit
causation, return an empty list []. All elements should be concisely summarized, in three words or less.
4. sentiment: One of "positive", "neutral", or "negative".
Return a JSON object with this EXACT structure:
{
"all_results": [
{
"input_id": "1",
"entities": [
{"name": "Apple", "type": "ORG"},
{"name": "Tim Cook", "type": "PERSON"}
],
"causal_triples": [[cause_1, relation_1, effect_1], [cause_2, relation_2, effect_2], ...],
"sentiment": "positive/neutral/negative"
}
]
}
Do not include any fields besides input_id, entities, causal_triples, and sentiment.
"""
out = extract_df(
df, prompt=prompt,
chunk_size=5,
max_workers=20,
backend="openai",
model="gpt-4.1-mini",
cache_path="demo.sqlite",
id_col="id", text_col="text",
api_key=os.environ["OPENAI_API_KEY"],
)
print(out)
entities = out[['input_id', 'entities']].explode("entities")
print(entities)
Save:
out.to_csv("extraction.csv", index=False)
Tip: No prompt yet? draft_prompt(goal="...") returns a starter you can edit. See draft_prompt below.
Speed up: chunk_size and max_workers¶
max_workers sets the size of the thread pool that issues API calls in parallel. chunk_size sets how many DataFrame rows are packed into each call (one shared system prompt, chunk_size user inputs). Combined, they can cut wall-clock time by roughly 100× compared to a naive for loop with one row per call.
out = extract_df(
df, prompt=...,
chunk_size=5, # rows packed into one API call (amortizes the system prompt)
max_workers=20, # API calls in flight at once (a thread pool)
backend="openai", model="gpt-4.1-mini",
cache_path="big.sqlite",
)
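To make the two knobs concrete, here is a minimal standard-library sketch of the chunk-then-parallelize pattern described above. It is not the library's implementation: `make_chunks`, `fake_api_call`, and `run_chunks` are hypothetical names, and the "API call" is a local stand-in that never touches the network.

```python
from concurrent.futures import ThreadPoolExecutor


def make_chunks(rows, chunk_size):
    """Split a list of rows into consecutive lists of at most chunk_size rows."""
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def fake_api_call(chunk):
    """Hypothetical stand-in for one model call that handles a whole chunk."""
    return [{"input_id": row_id, "n_chars": len(text)} for row_id, text in chunk]


def run_chunks(rows, chunk_size=5, max_workers=20):
    """Issue one call per chunk, with up to max_workers calls in flight."""
    chunks = make_chunks(rows, chunk_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves chunk order even though calls overlap in time.
        per_chunk = list(pool.map(fake_api_call, chunks))
    # Flatten the per-chunk lists back into one list of per-row results.
    return [result for chunk_result in per_chunk for result in chunk_result]


rows = [(i, f"text {i}") for i in range(12)]
results = run_chunks(rows, chunk_size=5, max_workers=4)
print(len(make_chunks(rows, 5)), len(results))  # 3 chunks, 12 results
```

Threads suit this workload because each call spends nearly all of its time waiting on the network rather than computing.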
At ~1 second per call, 100,000 rows:
| Approach | API calls | Wall-clock |
|---|---|---|
| One row per call, serial (a plain for loop) | 100,000 | ~28 hours |
| One row per call, max_workers=20 | 100,000 | ~83 minutes |
| chunk_size=5, max_workers=20 | 20,000 | ~17 minutes |
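The figures in the table follow from simple arithmetic. The sketch below reproduces them under the same idealized assumptions: every call takes exactly one second regardless of chunk size, and the thread pool stays fully busy. `estimate_wall_clock` is a hypothetical helper, not part of the package.

```python
import math


def estimate_wall_clock(n_rows, chunk_size=1, max_workers=1, seconds_per_call=1.0):
    """Return (api_calls, wall_clock_seconds) under idealized assumptions."""
    api_calls = math.ceil(n_rows / chunk_size)
    # Calls run in waves of max_workers; each wave takes one call's latency.
    waves = math.ceil(api_calls / max_workers)
    return api_calls, waves * seconds_per_call


for chunk_size, max_workers in [(1, 1), (1, 20), (5, 20)]:
    calls, seconds = estimate_wall_clock(100_000, chunk_size, max_workers)
    print(f"chunk_size={chunk_size}, max_workers={max_workers}: "
          f"{calls:,} calls, ~{seconds / 60:.0f} minutes")
```

In practice, larger chunks make each call somewhat slower and rate limits cap the useful number of workers, so treat these numbers as upper bounds on the speed-up.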
Tuning: start with chunk_size=5, max_workers=20. Raise max_workers until the log starts showing rate-limit retries, then back off (OpenAI tier-3 handles 60–100; Anthropic tier-2 handles 20–50).
For ~50% lower per-token cost in exchange for asynchronous execution, see the batch API below.
The results cache DB (cache_path)¶
API calls cost money and time. The cache exists so that an interrupted, edited, or replayed run does not re-send rows that already returned.
cache_path is required. It is the path to a SQLite file. Each completed row is written to it as soon as it returns from the model. The file is not auto-deleted.
On rerun with the same cache_path, rows already in the cache are skipped; only missing rows go to the model.
The schema is three columns:
results(row_id TEXT PRIMARY KEY, json_result TEXT, prompt_hash TEXT)
Open in any SQLite browser or sqlite3 my_experiment.sqlite to inspect. Force a full re-run: fresh=True.
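The cache can also be inspected from Python with the standard-library sqlite3 module. The sketch below builds an in-memory table with the three-column schema shown above and queries it. The sample rows and the "abc123" stamp are made up, and it assumes json_result holds one JSON object per row.

```python
import json
import sqlite3

# In-memory stand-in for a cache file; for a real run, connect to your cache_path.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE results (row_id TEXT PRIMARY KEY, json_result TEXT, prompt_hash TEXT)"
)
conn.executemany(
    "INSERT OR REPLACE INTO results VALUES (?, ?, ?)",
    [
        ("1", json.dumps({"input_id": "1", "sentiment": "neutral"}), "abc123"),
        ("2", json.dumps({"input_id": "2", "sentiment": "positive"}), "abc123"),
    ],
)

# Which rows are already done, and under which prompt stamp?
done = conn.execute(
    "SELECT row_id, prompt_hash FROM results ORDER BY row_id"
).fetchall()
print(done)

# Decode the stored JSON text back into Python dicts.
decoded = [
    json.loads(payload)
    for (payload,) in conn.execute("SELECT json_result FROM results ORDER BY row_id")
]
print(decoded[1]["sentiment"])
conn.close()
```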
What if I change the prompt?¶
Each cached row is stamped with sha256(prompt)[:16]. On rerun, rows whose stamp matches the current prompt are skipped; rows stamped with a different prompt are re-executed and the old row is overwritten (INSERT OR REPLACE on row_id).
Three escape hatches:
- Compare prompts side by side: use a different cache_path for each prompt version.
- Fixed a typo and want to keep the cached rows: pass ignore_prompt_hash=True.
- Wipe and redo: fresh=True, or delete the .sqlite file.
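The stamp and the skip rule described in this section can be sketched in a few lines. This illustrates the logic only and is not the package's code: it assumes the prompt is UTF-8 encoded and the digest is taken in hex, and `prompt_stamp` and `rows_to_send` are hypothetical names.

```python
import hashlib


def prompt_stamp(prompt: str) -> str:
    """First 16 hex characters of the SHA-256 digest of the prompt (UTF-8 assumed)."""
    return hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16]


def rows_to_send(all_ids, cached, prompt, ignore_prompt_hash=False):
    """Return the ids that still need a model call.

    cached maps row_id -> stored prompt stamp (a hypothetical in-memory view
    of the cache table).
    """
    current = prompt_stamp(prompt)
    pending = []
    for row_id in all_ids:
        stamp = cached.get(row_id)
        if stamp is None:
            pending.append(row_id)  # never ran
        elif stamp != current and not ignore_prompt_hash:
            pending.append(row_id)  # ran under a different prompt
    return pending


old, new = "Extract entities.", "Extract entities and sentiment."
cached = {"1": prompt_stamp(old), "2": prompt_stamp(old)}
print(rows_to_send(["1", "2", "3"], cached, old))   # only the uncached row
print(rows_to_send(["1", "2", "3"], cached, new))   # everything, prompt changed
print(rows_to_send(["1", "2", "3"], cached, new, ignore_prompt_hash=True))
```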
Batch API: ~50% cheaper¶
OpenAI and Anthropic both expose a Batch API at approximately 50% of the per-token cost. Execution is asynchronous with a stated 24-hour SLA (typical completion is faster). Lifecycle: build request files, submit, poll status, retrieve.
OpenAI batch¶
from lmsyz_genai_ie_rfs import OpenAIBatchExtractor
ext = OpenAIBatchExtractor(batch_root_dir="my_job/")
ext.create_batch_jsonl( # 1. write JSONL input files
dataframe=df, id_col="id", text_col="text",
prompt=prompt, job_id="my_job",
model_name="gpt-4.1-mini",
schema_dict=None, # or a strict json_schema dict
)
ext.submit_batches("my_job") # 2. upload + submit
ext.check_batch_status("my_job", # 3. poll
continuous=True, interval=60)
out = ext.retrieve_results_as_dataframe("my_job") # 4. results as DataFrame
Anthropic batch¶
Same lifecycle, different wire format (a JSON request body, not a JSONL file upload):
from lmsyz_genai_ie_rfs import AnthropicBatchExtractor
ext = AnthropicBatchExtractor(batch_root_dir="my_job/")
ext.create_batch_requests(..., schema_dict=my_schema)
ext.submit_batch("my_job")
ext.check_batch_status("my_job", continuous=True)
out = ext.retrieve_results_as_dataframe("my_job")
if out is None:
    print("Batch not finished yet; check status and retry.")
else:
    print(out.head())
All intermediate files (JSONL input, submission manifest, raw results) are written under batch_root_dir/<job_id>/ and can be inspected directly.
Model providers¶
Two native backends (OpenAI, Anthropic), plus any OpenAI-compatible endpoint reachable by setting base_url=. The same extract_df call shape works for all of them; only backend, model, base_url, and api_key change.
To switch to Anthropic, change backend and model:
out = extract_df(df, prompt=prompt, cache_path="run.sqlite",
backend="anthropic", model="claude-3-5-sonnet-20241022")
| Capability | OpenAI | Anthropic | OpenAI-compatible (OpenRouter, Gemini, Together, vLLM, ...) |
|---|---|---|---|
| Concurrent, no schema | yes: json_object | yes: plain text (parsed tolerantly) | model-dependent |
| Concurrent, with schema | yes: structured outputs | yes: tool_use + prompt caching | model-dependent (gateway forwards response_format) |
| Batch API (~50% cheaper) | yes: OpenAIBatchExtractor | yes: AnthropicBatchExtractor | use the gateway's native batch API |
| Long-prompt caching | automatic (1024+ tokens) | explicit cache_control on system | upstream-model-dependent |
OpenRouter: one key, hundreds of models¶
OpenRouter is a gateway: a single key + base_url puts you in front of Llama, Mistral, DeepSeek, Qwen, Gemini, Claude, and many more. Swap model= for whichever you want.
out = extract_df(
df, prompt=my_prompt,
backend="openai",
base_url="https://openrouter.ai/api/v1",
api_key=os.environ["OPENROUTER_API_KEY"],
model="qwen/qwen-2.5-72b-instruct", # Qwen
# model="deepseek/deepseek-chat", # DeepSeek
# model="meta-llama/llama-3.3-70b-instruct", # Llama
# model="mistralai/mistral-large", # Mistral
# model="anthropic/claude-haiku-4.5", # Claude (proxied)
cache_path="openrouter.sqlite",
)
JSON-mode support varies by model on OpenRouter. If a model returns malformed JSON, switch to one that supports response_format, or pass a strict schema= so the gateway routes via JSON-schema mode.
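When experimenting with a model that lacks JSON mode, it can help to see what a raw reply looks like once the surrounding chatter is stripped. The helper below is a hypothetical, deliberately simple sketch (it is not the library's parser): it takes the text between the first "{" and the last "}" and tries to parse it.

```python
import json


def extract_json_object(reply: str):
    """Return the outermost JSON object found in reply, or None if none parses."""
    start = reply.find("{")
    end = reply.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return None


reply = (
    "Here is the result:\n"
    '{"all_results": [{"input_id": "1", "sentiment": "neutral"}]}\n'
    "Let me know if you need anything else."
)
parsed = extract_json_object(reply)
print(parsed["all_results"][0]["sentiment"])
print(extract_json_object("Sorry, I cannot help with that."))
```

A model that needs this kind of rescue often is a sign that switching models or passing a schema will be more reliable.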
Gemini direct¶
out = extract_df(
df, prompt=my_prompt,
backend="openai", model="gemini-2.5-flash",
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
api_key=os.environ["GEMINI_API_KEY"],
cache_path="gemini.sqlite",
)
Drafting a prompt with draft_prompt¶
Pass a sentence describing what you want to extract; draft_prompt returns a candidate prompt in the package's standard structure (a numbered field list followed by a {"all_results": [...]} JSON envelope) for you to edit:
import os
from lmsyz_genai_ie_rfs import draft_prompt
prompt = draft_prompt(
goal="Tag short product reviews with sentiment (positive/neutral/negative) and a confidence score 0-1.",
api_key=os.environ["OPENAI_API_KEY"],
)
Prints and returns:
You are an information-extraction assistant. For each input row, analyze the product review and extract structured information.
Step-by-step instructions:
1. input_id: Copy the input_id from the row verbatim.
2. sentiment: Tag the review's overall sentiment as one of "positive", "neutral", or "negative".
3. confidence: Provide a confidence score for the sentiment tag as a float between 0 and 1 inclusive.
Return a JSON object with this EXACT structure:
{
"all_results": [
{
"input_id": "42",
"sentiment": "positive",
"confidence": 0.87
}
]
}
Do not include any fields besides input_id, sentiment, and confidence.
Read the result, tighten enums, add any domain-specific instructions, then pass it to extract_df(prompt=prompt, ...). Defaults are backend="openai", model="gpt-4.1-mini"; pass backend="anthropic" to use Claude instead. The temperature is fixed at 0, so the same goal usually reproduces the same, or a nearly identical, starting prompt (provider outputs are not strictly deterministic even at temperature 0).
Optional: a JSON schema file¶
By default, output structure is enforced only by the prompt; the model returns a JSON object whose shape it infers from the instructions.
Pass schema= to enforce structure at the API layer. OpenAI consumes it as response_format={"type": "json_schema", ...}; Anthropic consumes it as the input_schema of a forced tool_use.
out = extract_df(
df, prompt=my_prompt, schema="my_schema.json",
backend="openai", model="gpt-4.1-mini",
cache_path="strict.sqlite",
)
The file contains a standard OpenAI response_format object. Here is one matching the Quickstart task above:
{
  "type": "json_schema",
  "json_schema": {
    "name": "news_extraction",
    "strict": true,
    "schema": {
      "type": "object",
      "additionalProperties": false,
      "required": ["all_results"],
      "properties": {
        "all_results": {
          "type": "array",
          "items": {
            "type": "object",
            "additionalProperties": false,
            "required": ["input_id", "entities", "causal_triples", "sentiment"],
            "properties": {
              "input_id": {"type": "string"},
              "entities": {
                "type": "array",
                "items": {
                  "type": "object",
                  "additionalProperties": false,
                  "required": ["name", "type"],
                  "properties": {
                    "name": {"type": "string"},
                    "type": {"type": "string", "enum": ["PERSON", "ORG", "PRODUCT", "DATE", "MONEY", "EVENT"]}
                  }
                }
              },
              "causal_triples": {
                "type": "array",
                "items": {
                  "type": "array",
                  "minItems": 3, "maxItems": 3,
                  "items": {"type": "string"}
                }
              },
              "sentiment": {"type": "string", "enum": ["positive", "neutral", "negative"]}
            }
          }
        }
      }
    }
  }
}
The same file works on Anthropic: the library extracts the inner schema object and passes it as the input_schema of a tool_use.
schema= accepts a file path, a dict with the contents above, or None (the default).
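The sketch below illustrates the two behaviors just described: accepting a path, a dict, or None, and reusing the inner schema object as the input_schema of an Anthropic tool. It is a guess at the shape of the logic, not the library's code; `load_schema` and `to_anthropic_tool` are hypothetical names, and the tool description string is invented.

```python
import json
from pathlib import Path


def load_schema(schema):
    """Accept a file path, a dict, or None, and return a dict or None."""
    if schema is None or isinstance(schema, dict):
        return schema
    return json.loads(Path(schema).read_text(encoding="utf-8"))


def to_anthropic_tool(response_format):
    """Reuse the inner JSON schema of an OpenAI response_format as a tool definition."""
    inner = response_format["json_schema"]
    return {
        "name": inner["name"],
        "description": "Return the extraction results.",  # hypothetical wording
        "input_schema": inner["schema"],
    }


response_format = {
    "type": "json_schema",
    "json_schema": {
        "name": "news_extraction",
        "strict": True,
        "schema": {
            "type": "object",
            "required": ["all_results"],
            "properties": {"all_results": {"type": "array"}},
        },
    },
}
tool = to_anthropic_tool(load_schema(response_format))
# Forcing this tool is what makes the model answer through the schema.
tool_choice = {"type": "tool", "name": tool["name"]}
print(tool["name"], sorted(tool["input_schema"]["properties"]))
```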
All knobs¶
extract_df(
df,
prompt=..., # required
cache_path="path.sqlite", # required: results DB on disk
backend="openai", # or "anthropic"
model="gpt-4.1-mini", # required
schema=None, # optional: path to JSON schema file, dict, or None
id_col="id",
text_col="text",
chunk_size=5, # rows per API call; 5-20 is typical
max_workers=20, # parallel threads; tune to your rate limit
fresh=False, # True to ignore prior DB contents
ignore_prompt_hash=False, # True to reuse rows produced by a different prompt
api_key=None, # overrides .env / environment
base_url=None, # for OpenRouter / Gemini compat
client=None, # BYO preconfigured SDK client
)
Failed API calls are retried automatically (tenacity, exponential backoff, up to 5 attempts) on RateLimitError and APIError. If a chunk still fails after retries, the failure is logged and the chunk's rows are omitted from the returned DataFrame.
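For readers unfamiliar with the pattern, here is a dependency-free sketch of retry with exponential backoff. The library itself uses tenacity; this version only mirrors the idea. The `RateLimitError` class is a local stand-in for the SDK exception, and the exact wait schedule (2 s doubling up to a 30 s cap) is an assumption based on the bounds quoted in the FAQ.

```python
import time


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider SDK's rate-limit exception."""


def call_with_retries(fn, max_attempts=5, min_wait=2.0, max_wait=30.0, sleep=time.sleep):
    """Call fn(); on RateLimitError wait 2s, 4s, 8s, ... (capped) and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller log and skip the chunk
            sleep(min(max_wait, min_wait * 2 ** (attempt - 1)))


attempts = {"n": 0}
waits = []


def flaky():
    """Fail twice with a rate-limit error, then succeed."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimitError("slow down")
    return "ok"


# Record the waits instead of actually sleeping, so the demo runs instantly.
result = call_with_retries(flaky, sleep=waits.append)
print(result, waits)
```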
draft_prompt knobs¶
draft_prompt(
goal, # required: plain-English description of what to extract
backend="openai", # or "anthropic"
model="gpt-4.1-mini", # model for the meta-call; any chat model works
api_key=None, # overrides .env / environment
base_url=None, # for OpenRouter / Gemini compat
print_prompt=True, # print result to stdout (useful in notebooks)
)
draft_prompt returns the generated prompt string. Set print_prompt=False to suppress the automatic print and capture only the return value.
FAQ¶
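Why does my DataFrame have fewer rows than I expect? Most likely a chunk failed after all retries, so its rows were omitted. Check the log for the stack trace, then rerun with the same cache_path: rows already in the cache are skipped and only the missing rows are sent again.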
How do I restart a batch job when I lost the job_id? Look under batch_root_dir/. The directories ARE the job_ids.
Is there retry logic? Yes: tenacity, 5 attempts with exponential backoff 2-30s for RateLimitError and APIError.
What about nested lists and dicts in the output? Flatten with df.explode("entities") (each list item becomes its own row), followed by pd.json_normalize(df["entities"]) (each dict's keys become columns). CSV stringifies nested fields; save as JSONL (df.to_json(..., orient="records", lines=True)) for clean round-tripping.
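The flattening steps from the last answer look like this on a small, made-up result frame. The column names follow the Quickstart; the extra dropna step is an addition of this sketch, since a row with an empty list explodes to a missing value that json_normalize cannot expand.

```python
import pandas as pd

out = pd.DataFrame({
    "input_id": ["1", "2", "3"],
    "entities": [
        [{"name": "Apple", "type": "ORG"}, {"name": "Tim Cook", "type": "PERSON"}],
        [{"name": "Tesla", "type": "ORG"}],
        [],  # a row with no entities
    ],
})

# One row per entity; rows whose list was empty become NaN and are dropped.
exploded = (
    out.explode("entities")
    .dropna(subset=["entities"])
    .reset_index(drop=True)
)

# Each dict's keys become columns; join them back to the ids by position.
fields = pd.json_normalize(exploded["entities"].tolist())
flat = pd.concat([exploded[["input_id"]], fields], axis=1)
print(flat)
```

Resetting the index before the concat matters: explode repeats the original index for every list item, and concat aligns on index values.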
Citation¶
If you find this library useful in your research, please cite:
Li, Kai, Feng Mai, Rui Shen, Chelsea Yang, and Tengfei Zhang (2026). "Dissecting Corporate Culture Using Generative AI." Review of Financial Studies 39(1):253–296. doi.org/10.1093/rfs/hhaf081.
@article{li2026dissecting,
title = {Dissecting Corporate Culture Using Generative {AI}},
author = {Li, Kai and Mai, Feng and Shen, Rui and Yang, Chelsea and Zhang, Tengfei},
journal = {Review of Financial Studies},
volume = {39},
number = {1},
pages = {253--296},
year = {2026},
doi = {10.1093/rfs/hhaf081},
}
GitHub¶
Source code: https://github.com/maifeng/lmsyz_genai_ie_rfs
License¶
MIT.
Next in the documentation¶
- Concepts goes deeper on architecture, prompts and schemas, and the results database.
- How-to has task-oriented recipes: resume, change prompt safely, switch providers, batch jobs, inspect results, troubleshooting.
- Reference is the full API: extract_df, draft_prompt, batch extractors, cache and chunking internals, settings.
- Explanation covers the "why": design decisions.