Pipeline
Pipeline is the end-to-end orchestrator for the Word2Vec dictionary expansion
and scoring workflow. It wraps six stages behind a single class and writes every
intermediate artifact under work_dir, so reruns skip any stage whose outputs
already exist unless force=True is passed.
The six stages:

- parse: Phase 1 preprocessing (MWE joining, NER masking) via the configured backend.
- clean: lowercase, strip punctuation, drop stopwords, keep [NER:*] placeholders.
- phrase: gensim Phrases statistical bigram / trigram learning (Phase 2).
- train: Word2Vec training on the phrase-expanded corpus.
- expand_dictionary: grow each seed list into a per-dimension dictionary via nearest-neighbor search on the trained vectors.
- score: compute document-level TF, TFIDF, WFIDF, and optional SIMWEIGHT variants.
The two construction phases are covered in Two-phase preprocessing. The individual stage knobs live on Config.
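The rerun rule (skip a stage whose outputs already exist unless force=True) boils down to an output-existence check. A minimal sketch of that idea, assuming a stage counts as done when all of its expected files exist; run_stage is a hypothetical helper, not part of the library's API:

```python
from __future__ import annotations

from pathlib import Path
from typing import Callable


def run_stage(outputs: list[Path], build: Callable[[], None], *, force: bool = False) -> bool:
    """Hypothetical helper: run `build` unless every expected output already exists.

    Returns True if the stage ran, False if it was skipped.
    """
    # Skip only when ALL artifacts are present and the caller did not force a rerun.
    if not force and all(path.exists() for path in outputs):
        return False
    # Make sure the stage can write its artifacts under the work directory.
    for path in outputs:
        path.parent.mkdir(parents=True, exist_ok=True)
    build()
    return True
```

Checking every output rather than just one means a stage that wrote only one of a parallel file pair (such as sentences and sentence IDs) is rerun instead of silently skipped.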
End-to-end seed-expansion measurement pipeline.
Typical usage:

```python
from lmsy_w2v_rfs import Pipeline, Config

# my_texts: list of raw document strings; my_ids: the matching document IDs.
seeds = {
    "risk": ["risk", "uncertainty", "volatility"],
    "growth": ["growth", "expansion", "scale"],
}
p = Pipeline(
    texts=my_texts,
    doc_ids=my_ids,
    work_dir="runs/demo",
    config=Config(seeds=seeds, preprocessor="none"),
)
p.run()
p.show_dictionary(top_k=10)
scores = p.score_df("TFIDF")
```
Attributes:

| Name | Description |
|---|---|
| texts | Raw document strings. |
| doc_ids | Matching document IDs, one per text. |
| work_dir | Directory for all intermediate and output files. |
| config | Hyperparameters. Required. |
parsed_sents_path: Path (property)
Sentence output from Phase 1a, one preprocessed sentence per line.

parsed_ids_path: Path (property)
Parallel sentence-ID file; one docID_sentN ID per line.
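Because the ID file runs parallel to the sentence file, sentence-level output can be mapped back to documents by stripping the sentence suffix. A minimal sketch, assuming each ID is the document ID, an underscore, then a sentence marker (the exact suffix format is an assumption). Splitting on the last underscore keeps document IDs that themselves contain underscores intact. split_sentence_id and sentences_per_doc are hypothetical names:

```python
from __future__ import annotations

from collections import Counter
from collections.abc import Iterable


def split_sentence_id(sentence_id: str) -> tuple[str, str]:
    """Split '<docID>_<sentN>' into (docID, sentN) at the LAST underscore."""
    doc_id, sep, sent = sentence_id.rpartition("_")
    if not sep:
        raise ValueError(f"no '_' in sentence ID: {sentence_id!r}")
    return doc_id, sent


def sentences_per_doc(id_lines: Iterable[str]) -> Counter[str]:
    """Count sentences per document from the lines of the parallel ID file."""
    return Counter(
        split_sentence_id(line.strip())[0] for line in id_lines if line.strip()
    )
```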
cleaned_path: Path (property)
Cleaned sentences after stopword and punctuation removal.

phrase_corpus_path: Path (property)
Sentences after the last gensim Phrases pass.

training_corpus_path: Path (property)
Final sentence file fed into Word2Vec.

w2v_path: Path (property)
Saved gensim Word2Vec model.

dict_path: Path (property)
Expanded per-dimension word list as CSV.
culture_dict: dict[str, list[str]] (property)
The expanded dictionary, rank-sorted.
Alias for expanded_dict, kept for compatibility with the terminology of the 2021 paper. The returned dict is the SAME object held on the pipeline, so mutating it mutates pipeline state.
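Since culture_dict and expanded_dict hand back the pipeline's own dict, code that wants to experiment without touching pipeline state should copy first. The sketch below uses a hypothetical stand-in class (not the real Pipeline) to show the aliasing and a defensive copy; for curated changes that should persist, edit_dictionary is the documented route.

```python
from __future__ import annotations


class DictHolder:
    """Hypothetical stand-in for Pipeline's dictionary properties (illustration only)."""

    def __init__(self, expanded: dict[str, list[str]]) -> None:
        self._expanded = expanded

    @property
    def expanded_dict(self) -> dict[str, list[str]]:
        return self._expanded  # no copy: callers get the live object

    @property
    def culture_dict(self) -> dict[str, list[str]]:
        return self.expanded_dict  # alias: the very same object


def snapshot(d: dict[str, list[str]]) -> dict[str, list[str]]:
    """Copy the dict AND each word list, so edits to the copy cannot leak back."""
    return {dim: list(words) for dim, words in d.items()}
```

A shallow dict(d) would not be enough here, because the inner word lists would still be shared.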
expanded_dict: dict[str, list[str]] (property)
The expanded dictionary, rank-sorted, under a theory-agnostic name.

w2v: Word2Vec (property)
The trained Word2Vec model.
scores_path(method: ScoringMethod) -> Path
Path to the document-level scores CSV for a given method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| method | ScoringMethod | One of the scoring methods accepted by score(). | required |

Returns:

| Type | Description |
|---|---|
| Path | Expected CSV path under work_dir. |
parse(*, force: bool = False) -> None
Run Phase 1: preprocess raw documents with the configured backend.
Each sentence is written as one line of space-joined tokens. The tokens of an MWE group are joined with _, and named entities are replaced with [NER:TYPE] placeholders. Sentence IDs go to a parallel file.
If config.mwe_list is set, a static-MWE post-pass runs after the main preprocessor to catch MWEs the parser missed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| force | bool | Rerun even if output files exist. | False |
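The static-MWE post-pass amounts to rescanning each tokenized sentence for known multi-word expressions and joining them with _. A minimal greedy, longest-match-first sketch, assuming mwe_list entries are space-separated phrases matched case-sensitively; join_mwes is a hypothetical name, and the library's own matching rules may differ:

```python
from __future__ import annotations


def join_mwes(tokens: list[str], mwes: list[str]) -> list[str]:
    """Greedily join known multi-word expressions with '_' (longest match first)."""
    # Index MWEs by token tuple; assumed format is space-separated, e.g. "supply chain".
    patterns = {tuple(m.split()) for m in mwes if len(m.split()) > 1}
    max_len = max((len(p) for p in patterns), default=0)
    out: list[str] = []
    i = 0
    while i < len(tokens):
        # Try the longest candidate span first, so "supply chain risk" beats "supply chain".
        for n in range(min(max_len, len(tokens) - i), 1, -1):
            if tuple(tokens[i:i + n]) in patterns:
                out.append("_".join(tokens[i:i + n]))
                i += n
                break
        else:
            # No MWE starts here: keep the single token.
            out.append(tokens[i])
            i += 1
    return out
```

Longest-match-first keeps nested entries such as "supply chain" and "supply chain risk" from splitting a longer phrase.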
clean(*, force: bool = False) -> None
Lowercase, drop stopwords and punctuation, strip CoreNLP tags.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| force | bool | Rerun even if output exists. | False |
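A minimal sketch of the per-line cleaning rules, assuming whitespace-tokenized input and a caller-supplied stopword set. CoreNLP tag stripping is omitted because the tag format is not described here, and clean_line is a hypothetical helper rather than the library's implementation:

```python
from __future__ import annotations

import string


def clean_line(line: str, stopwords: set[str]) -> str:
    """Lowercase, strip edge punctuation, drop stopwords, keep [NER:*] placeholders."""
    kept: list[str] = []
    for tok in line.split():
        # Entity placeholders pass through untouched (case and brackets preserved).
        if tok.startswith("[NER:") and tok.endswith("]"):
            kept.append(tok)
            continue
        # strip() only trims the ends, so interior '_' in MWEs like supply_chain survives.
        tok = tok.lower().strip(string.punctuation)
        if tok and tok not in stopwords:
            kept.append(tok)
    return " ".join(kept)
```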
phrase(*, force: bool = False) -> Path
Run gensim Phrases (if enabled) and return the final corpus path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| force | bool | Rerun even if output exists. | False |

Returns:

| Type | Description |
|---|---|
| Path | Path to the final phrase-expanded sentence file. |
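By default, gensim's Phrases decides whether to merge a bigram using the scoring formula from Mikolov et al. (2013): score(a, b) = (count(a b) - min_count) / (count(a) * count(b)) * vocab_size. A bigram is merged when its score exceeds threshold. The sketch below is a simplified, single-pass version of that statistic in plain Python. find_phrases is hypothetical, and taking vocab_size as distinct unigrams plus bigrams only approximates gensim's bookkeeping.

```python
from __future__ import annotations

from collections import Counter


def find_phrases(
    sentences: list[list[str]], min_count: int = 5, threshold: float = 10.0
) -> dict[tuple[str, str], float]:
    """Return bigrams whose Mikolov-style score exceeds threshold."""
    unigrams: Counter[str] = Counter()
    bigrams: Counter[tuple[str, str]] = Counter()
    for sent in sentences:
        unigrams.update(sent)
        bigrams.update(zip(sent, sent[1:]))
    vocab_size = len(unigrams) + len(bigrams)  # approximation, see lead-in
    phrases: dict[tuple[str, str], float] = {}
    for (a, b), ab_count in bigrams.items():
        if ab_count < min_count:
            continue  # too rare to trust
        score = (ab_count - min_count) / unigrams[a] / unigrams[b] * vocab_size
        if score > threshold:
            phrases[(a, b)] = score
    return phrases
```

Trigrams are typically learned by running a second pass over sentences in which the first pass's bigrams have already been joined.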
train(*, force: bool = False) -> Word2Vec
Train (or load) the Word2Vec model.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| force | bool | Retrain even if a saved model already exists. | False |

Returns:

| Type | Description |
|---|---|
| Word2Vec | The loaded or trained model. |
expand_dictionary(*, force: bool = False) -> dict[str, list[str]]
Expand seeds into a per-dimension culture dictionary.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| force | bool | Rebuild even if the dictionary CSV exists. | False |

Returns:

| Type | Description |
|---|---|
| dict[str, list[str]] | Mapping of dimension name to rank-sorted word list. |
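One way to picture the nearest-neighbor expansion is to rank every vocabulary word by cosine similarity to the mean of the unit-normalized seed vectors. A minimal pure-Python sketch under that assumption. The library runs its search on the trained Word2Vec vectors, and its exact ranking rule, cutoffs, and filters are not documented here. expand_seeds is a hypothetical name:

```python
from __future__ import annotations

import math


def _unit(vec: list[float]) -> list[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else list(vec)


def expand_seeds(vectors: dict[str, list[float]], seeds: list[str], top_n: int = 10) -> list[str]:
    """Return in-vocab seeds followed by the top_n words closest to the seed centroid."""
    in_vocab = [s for s in seeds if s in vectors]
    if not in_vocab:
        return []
    # Centroid of the unit-normalized seed vectors, then normalized itself.
    centroid = [0.0] * len(vectors[in_vocab[0]])
    for seed in in_vocab:
        for i, x in enumerate(_unit(vectors[seed])):
            centroid[i] += x / len(in_vocab)
    centroid = _unit(centroid)
    # Cosine similarity is the dot product of unit vectors.
    scored = [
        (sum(a * b for a, b in zip(centroid, _unit(vec))), word)
        for word, vec in vectors.items()
        if word not in in_vocab
    ]
    # Highest similarity first; ties broken alphabetically for determinism.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return in_vocab + [word for _, word in scored[:top_n]]
```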
score(methods: Sequence[ScoringMethod] = ('TF', 'TFIDF', 'WFIDF'), *, force: bool = False) -> dict[str, pd.DataFrame]
Score every document under each requested method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| methods | Sequence[ScoringMethod] | Any subset of TF, TFIDF, WFIDF, TFIDF+SIMWEIGHT, WFIDF+SIMWEIGHT. | ('TF', 'TFIDF', 'WFIDF') |
| force | bool | Recompute even if CSV outputs exist. | False |

Returns:

| Type | Description |
|---|---|
| dict[str, DataFrame] | Mapping of method name to scores DataFrame. |
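For orientation, here is a minimal sketch of the three base weightings as they are commonly defined. TF counts dictionary hits, TFIDF weights each hit count by log(N / df), and WFIDF dampens the count to 1 + log(tf) before applying the same IDF. The length normalization, the natural-log IDF, and the function name score_documents are all assumptions. The SIMWEIGHT variants are not sketched because their weighting is not described here.

```python
from __future__ import annotations

import math
from collections import Counter


def score_documents(docs: list[list[str]], dictionary: list[str], method: str = "TFIDF") -> list[float]:
    """Score each tokenized document against one dimension's word list."""
    if method not in {"TF", "TFIDF", "WFIDF"}:
        raise ValueError(f"unsupported method: {method!r}")
    words = set(dictionary)
    n_docs = len(docs)
    # Document frequency: how many documents contain each dictionary word.
    df: Counter[str] = Counter()
    for doc in docs:
        df.update(set(doc) & words)
    scores: list[float] = []
    for doc in docs:
        tf = Counter(tok for tok in doc if tok in words)
        total = 0.0
        for word, count in tf.items():
            idf = math.log(n_docs / df[word])
            if method == "TF":
                total += count
            elif method == "TFIDF":
                total += count * idf
            else:  # WFIDF: log-dampened (sublinear) term frequency
                total += (1 + math.log(count)) * idf
        # Normalize by document length so long documents are not favored.
        scores.append(total / len(doc) if doc else 0.0)
    return scores
```

Under this IDF, a dictionary word that appears in every document contributes nothing to TFIDF or WFIDF.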
run(*, methods: Sequence[ScoringMethod] = ('TF', 'TFIDF', 'WFIDF'), force: bool = False) -> dict[str, pd.DataFrame]
Run every stage end to end.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| methods | Sequence[ScoringMethod] | Scoring methods to compute. | ('TF', 'TFIDF', 'WFIDF') |
| force | bool | Rerun every stage regardless of existing outputs. | False |

Returns:

| Type | Description |
|---|---|
| dict[str, DataFrame] | Mapping of method name to scores DataFrame. |
score_df(method: ScoringMethod = 'TFIDF') -> pd.DataFrame
Return the scores DataFrame for one method.
firm_year(id_to_firm: pd.DataFrame, method: ScoringMethod = 'TFIDF') -> pd.DataFrame
Aggregate scores to firm-year level.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| id_to_firm | DataFrame | DataFrame linking document IDs to firm and year (columns: …). | required |
| method | ScoringMethod | Which scores to aggregate. | 'TFIDF' |

Returns:

| Type | Description |
|---|---|
| DataFrame | Firm-year DataFrame. |
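Firm-year aggregation joins document scores to firm and year, then combines the scores within each group. A stdlib sketch, assuming a mean aggregation and a mapping supplied as (doc_id, firm, year) tuples. Both the aggregation rule and firm_year_means are hypothetical, and documents without a score are skipped:

```python
from __future__ import annotations

from collections import defaultdict


def firm_year_means(
    doc_scores: dict[str, float],
    id_to_firm: list[tuple[str, str, int]],
) -> dict[tuple[str, int], float]:
    """Average document scores within each (firm, year) group."""
    sums: dict[tuple[str, int], float] = defaultdict(float)
    counts: dict[tuple[str, int], int] = defaultdict(int)
    for doc_id, firm, year in id_to_firm:
        if doc_id not in doc_scores:
            continue  # no score for this document
        key = (firm, year)
        sums[key] += doc_scores[doc_id]
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}
```

With pandas, the equivalent step is usually a merge on the document ID followed by a groupby over firm and year with .mean().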
dictionary_preview(top_k: int = 10) -> pd.DataFrame
Return a per-dimension preview of the expanded dictionary.
Useful for notebook display: rendering the returned DataFrame shows the seeds and the first top_k expanded words side by side.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| top_k | int | How many expanded words to show per dimension. | 10 |

Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame with one row per dimension and columns for the seeds and the first top_k expanded words. |
show_dictionary(top_k: int = 10) -> None
Pretty-print the expanded dictionary, dimension by dimension.
Replaces the boilerplate for dim in DIMS: print(...) loop. For each dimension, prints the seeds (each with an in-vocab marker) and the first top_k expanded words.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| top_k | int | How many expanded words to show per dimension. | 10 |
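A minimal sketch of the kind of report show_dictionary prints, assuming the seeds, the expanded lists, and the model vocabulary are available as plain Python objects. The layout and the "(not in vocab)" marker text are hypothetical, so the real method's output will likely look different. Seeds are assumed to be excluded from the top_k expansion list:

```python
from __future__ import annotations


def format_dictionary(
    expanded: dict[str, list[str]],
    seeds: dict[str, list[str]],
    vocab: set[str],
    top_k: int = 10,
) -> str:
    """Build a per-dimension text report: seeds with vocab status, then top_k expansions."""
    lines: list[str] = []
    for dim, words in expanded.items():
        dim_seeds = seeds.get(dim, [])
        seed_set = set(dim_seeds)
        # Flag seeds the model never learned a vector for.
        marked = [s if s in vocab else f"{s} (not in vocab)" for s in dim_seeds]
        # Show only genuine expansions, skipping entries that are themselves seeds.
        extra = [w for w in words if w not in seed_set][:top_k]
        lines.append(f"== {dim} ==")
        lines.append("seeds: " + ", ".join(marked))
        lines.append(f"top {top_k}: " + ", ".join(extra))
    return "\n".join(lines)
```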
edit_dictionary(remove: dict[str, Iterable[str]] | None = None, add: dict[str, Iterable[str]] | None = None) -> dict[str, list[str]]
Manually curate the expanded dictionary.
The 2021 paper let researchers drop noisy expansion candidates (and occasionally append domain words the model missed) before scoring. This method does both atomically: the in-memory dict and the on-disk CSV are updated together.
Use it programmatically from a notebook, or pair it with reload_dictionary() to drive curation from a spreadsheet.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| remove | dict[str, Iterable[str]] \| None | Mapping of dimension name to words to drop. Words not present are silently ignored. | None |
| add | dict[str, Iterable[str]] \| None | Mapping of dimension name to words to append (at the end, after the existing rank-sorted entries). Duplicates within a dimension are removed. | None |

Returns:

| Type | Description |
|---|---|
| dict[str, list[str]] | The updated dict. |

Raises:

| Type | Description |
|---|---|
| KeyError | If a referenced dimension is not in the expanded dictionary. |
reload_dictionary() -> dict[str, list[str]]
Reread the dictionary CSV from disk.
Use this after editing pipeline.dict_path in a spreadsheet or text editor. Both the in-memory score cache and any score CSVs on disk are dropped, since they were computed against the previous dict; the next call to score() recomputes them.
Returns:

| Type | Description |
|---|---|
| dict[str, list[str]] | The reloaded dict. |
from_text_file(text_path: str | Path, id_path: str | Path | None = None, *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline (classmethod)
Construct a pipeline from a one-document-per-line text file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| text_path | str \| Path | Input text file, one document per line. | required |
| id_path | str \| Path \| None | Optional matching IDs file. | None |
| work_dir | str \| Path | Where to write artifacts. | 'runs' |
| config | Config \| None | Pipeline config. | None |

Returns:

| Type | Description |
|---|---|
| Pipeline | A new Pipeline instance. |
from_directory(dir_path: str | Path, pattern: str = '*.txt', *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline (classmethod)
Construct a pipeline from a directory with one text file per document.
A common case is SEC filings stored one 10-K per file, e.g. 10k_AAPL_2024.txt. Each file's contents become one document, and each file's stem (the filename without its extension) becomes the document ID.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dir_path | str \| Path | Directory to scan. | required |
| pattern | str | Glob pattern selecting which files to read. | '*.txt' |
| work_dir | str \| Path | Where to write artifacts. | 'runs' |
| config | Config \| None | Pipeline config. | None |

Returns:

| Type | Description |
|---|---|
| Pipeline | A new Pipeline instance. |
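Reading one file per document, with the stem as the ID, takes only a few lines of pathlib. A sketch, assuming UTF-8 files and sorted (deterministic) order; load_directory is hypothetical, and the library's own ordering and encoding handling are not documented here. Its output fits the documented constructor as Pipeline(texts=texts, doc_ids=ids, ...).

```python
from __future__ import annotations

from pathlib import Path


def load_directory(dir_path: str | Path, pattern: str = "*.txt") -> tuple[list[str], list[str]]:
    """Return (texts, ids) for every file matching pattern; ID = filename stem."""
    # Sort for a reproducible document order; skip any directories the glob matches.
    files = sorted(p for p in Path(dir_path).glob(pattern) if p.is_file())
    texts = [p.read_text(encoding="utf-8") for p in files]
    ids = [p.stem for p in files]  # "10k_AAPL_2024.txt" -> "10k_AAPL_2024"
    return texts, ids
```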
from_dataframe(df: pd.DataFrame, text_col: str = 'text', id_col: str | None = 'id', *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline (classmethod)
Construct a pipeline from a pandas DataFrame.
Most workshop and teaching examples load a CSV with pd.read_csv, which yields a DataFrame with one row per document. This is the no-nonsense way in.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | DataFrame with at least a text column. | required |
| text_col | str | Name of the column with document text. | 'text' |
| id_col | str \| None | Name of the column with document IDs, or None if there is no ID column. | 'id' |
| work_dir | str \| Path | Where to write artifacts. | 'runs' |
| config | Config \| None | Pipeline config. | None |

Returns:

| Type | Description |
|---|---|
| Pipeline | A new Pipeline instance. |
from_csv(csv_path: str | Path, text_col: str = 'text', id_col: str | None = 'id', *, work_dir: str | Path = 'runs', config: Config | None = None, **read_csv_kwargs: Any) -> Pipeline (classmethod)
Construct a pipeline from a CSV file.
Equivalent to from_dataframe(pd.read_csv(csv_path, **read_csv_kwargs)). Extra keyword arguments pass through to pandas.read_csv, so you can specify delimiters, encodings, dtypes, and so on.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| csv_path | str \| Path | Path to the CSV file. | required |
| text_col | str | Name of the column with document text. | 'text' |
| id_col | str \| None | Name of the column with document IDs. | 'id' |
| work_dir | str \| Path | Where to write artifacts. | 'runs' |
| config | Config \| None | Pipeline config. | None |
| **read_csv_kwargs | Any | Forwarded to pandas.read_csv. | {} |

Returns:

| Type | Description |
|---|---|
| Pipeline | A new Pipeline instance. |
from_jsonl(jsonl_path: str | Path, text_key: str = 'text', id_key: str | None = 'id', *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline (classmethod)
Construct a pipeline from a JSON Lines file.
Each line of the file is a standalone JSON object. This is useful for records exported from an API or a database in a streaming format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| jsonl_path | str \| Path | Path to the JSON Lines file. | required |
| text_key | str | JSON key holding document text. | 'text' |
| id_key | str \| None | JSON key holding the document ID, or None. | 'id' |
| work_dir | str \| Path | Where to write artifacts. | 'runs' |
| config | Config \| None | Pipeline config. | None |

Returns:

| Type | Description |
|---|---|
| Pipeline | A new Pipeline instance. |
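Parsing JSON Lines takes one json.loads call per non-blank line. The stdlib sketch below returns (texts, ids) in the shape the documented constructor takes. It assumes UTF-8 input, skips blank lines, and falls back to positional IDs when id_key is None. load_jsonl and that fallback are hypothetical, not the library's documented behavior.

```python
from __future__ import annotations

import json
from pathlib import Path


def load_jsonl(
    jsonl_path: str | Path, text_key: str = "text", id_key: str | None = "id"
) -> tuple[list[str], list[str]]:
    """Read one JSON object per line and return (texts, ids)."""
    texts: list[str] = []
    ids: list[str] = []
    with open(jsonl_path, encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue  # tolerate blank lines, e.g. a trailing newline
            record = json.loads(line)
            # Positional fallback IDs ("0", "1", ...) are an assumption for id_key=None.
            ids.append(str(record[id_key]) if id_key is not None else str(len(ids)))
            texts.append(record[text_key])
    return texts, ids
```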