Pipeline

Pipeline is the end-to-end orchestrator for the Word2Vec dictionary expansion and scoring workflow. It wraps six stages behind a single class and writes every intermediate artifact under work_dir, so reruns skip any stage whose outputs already exist unless force=True is passed.
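
The skip-unless-forced behaviour can be pictured with a small sketch. This is not the library's code: run_stage and write_cleaned are hypothetical names, and the sketch assumes a stage counts as done as soon as its single output file exists.

```python
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Callable


def run_stage(output: Path, build: Callable[[Path], None], *, force: bool = False) -> bool:
    """Run `build` unless `output` already exists; return True if it ran.

    Hypothetical helper that only illustrates the caching idea.
    """
    if output.exists() and not force:
        return False  # cached artifact found, skip the stage
    output.parent.mkdir(parents=True, exist_ok=True)
    build(output)
    return True


def write_cleaned(path: Path) -> None:
    # Stand-in for a real stage: writes one cleaned sentence.
    path.write_text("risk uncertainty\n", encoding="utf-8")


with TemporaryDirectory() as tmp:
    out = Path(tmp) / "demo" / "cleaned.txt"
    first = run_stage(out, write_cleaned)               # runs: no output yet
    second = run_stage(out, write_cleaned)              # skipped: output exists
    forced = run_stage(out, write_cleaned, force=True)  # runs again despite the cache
```

Checking for the artifact on disk, rather than keeping an in-memory flag, is what lets a fresh Python session resume a half-finished run.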

The six stages:

  • parse: Phase 1 preprocessing (MWE join, NER mask) via the configured backend.
  • clean: lowercase, strip punctuation, drop stopwords, keep [NER:*] placeholders.
  • phrase: gensim Phrases statistical bigram / trigram learning (Phase 2).
  • train: Word2Vec training on the phrase-expanded corpus.
  • expand_dictionary: grow each seed list into a per-dimension dictionary via nearest-neighbor search on the trained vectors.
  • score: compute document-level TF, TFIDF, WFIDF, and optional SIMWEIGHT variants.

The two preprocessing phases (Phase 1 in parse, Phase 2 in phrase) are covered in Two-phase preprocessing. The settings for each stage live on Config.

End-to-end seed-expansion measurement pipeline.

Typical usage:

from lmsy_w2v_rfs import Pipeline, Config

seeds = {
    "risk":   ["risk", "uncertainty", "volatility"],
    "growth": ["growth", "expansion", "scale"],
}
p = Pipeline(
    texts=my_texts,
    doc_ids=my_ids,
    work_dir="runs/demo",
    config=Config(seeds=seeds, preprocessor="none"),
)
p.run()
p.show_dictionary(top_k=10)
scores = p.score_df("TFIDF")

Attributes:

  • texts: Raw document strings.
  • doc_ids: IDs matching texts, one per document.
  • work_dir: Directory for all intermediate and output files.
  • config: Hyperparameters. Required.

parsed_sents_path: Path property

Sentence output from Phase 1a, one preprocessed sentence per line.

parsed_ids_path: Path property

Parallel sentence IDs file; one docID_sentN ID per line.

cleaned_path: Path property

Cleaned sentences after stopword and punctuation removal.

phrase_corpus_path: Path property

Sentences after the last gensim Phrases pass.

training_corpus_path: Path property

Final sentence file fed into Word2Vec.

w2v_path: Path property

Saved gensim Word2Vec model.

dict_path: Path property

Expanded per-dimension word list as CSV.

culture_dict: dict[str, list[str]] property

The expanded dictionary, rank-sorted.

Alias for expanded_dict; kept for compatibility with prose from the 2021 paper. The returned dict is the SAME object held on the pipeline; mutating it mutates pipeline state.
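
The aliasing warning is easy to demonstrate with a toy class. DictHolder below is a hypothetical stand-in, not the library's Pipeline; it only mirrors the documented fact that both properties hand back the same dict object.

```python
import copy


class DictHolder:
    """Toy stand-in for the pipeline; not the library's class."""

    def __init__(self) -> None:
        self._expanded = {"risk": ["risk", "uncertainty"]}

    @property
    def expanded_dict(self) -> dict[str, list[str]]:
        return self._expanded

    @property
    def culture_dict(self) -> dict[str, list[str]]:
        # Alias: returns the very same object, not a copy.
        return self.expanded_dict


holder = DictHolder()
same_object = holder.culture_dict is holder.expanded_dict

# Mutating through one name is visible through the other.
holder.culture_dict["risk"].append("hazard")
after_append = list(holder.expanded_dict["risk"])

# A deep copy can be edited without touching the holder's state.
scratch = copy.deepcopy(holder.expanded_dict)
scratch["risk"].clear()
still_intact = holder.expanded_dict["risk"]
```

When exploring in a notebook, working on a deep copy avoids changing pipeline state by accident.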

expanded_dict: dict[str, list[str]] property

The expanded dictionary, rank-sorted, theory-agnostic name.

w2v: Word2Vec property

The trained Word2Vec model.

scores_path(method: ScoringMethod) -> Path

Path to the document-level scores CSV for a given method.

Parameters:

  • method (ScoringMethod, required): One of TF, TFIDF, WFIDF, TFIDF+SIMWEIGHT, WFIDF+SIMWEIGHT.

Returns:

  • Path: Expected CSV path under work_dir/outputs/.

parse(*, force: bool = False) -> None

Run Phase 1: preprocess raw documents with the configured backend.

Each sentence is written as one line of space-joined tokens. MWE groups are joined by _ and named entities are replaced with [NER:TYPE] placeholders. Sentence IDs go to a parallel file.

If config.mwe_list is set, a static-MWE post-pass runs after the main preprocessor to catch MWEs the parser missed.

Parameters:

  • force (bool, default False): Rerun even if output files exist.
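
A static-MWE post-pass of the kind described above could look roughly like the sketch below. It is an illustration, not the library's implementation: join_static_mwes is a hypothetical name, and the sketch assumes each entry of the MWE list is a space-separated phrase matched greedily, longest first.

```python
def join_static_mwes(tokens: list[str], mwe_list: list[str]) -> list[str]:
    """Join known multi-word expressions with "_" (greedy, longest match first).

    Assumption: each MWE is given as a space-separated phrase.
    """
    phrases = sorted((tuple(m.split()) for m in mwe_list), key=len, reverse=True)
    out: list[str] = []
    i = 0
    while i < len(tokens):
        for phrase in phrases:
            n = len(phrase)
            if n > 1 and tuple(tokens[i:i + n]) == phrase:
                out.append("_".join(phrase))  # e.g. "supply chain" -> "supply_chain"
                i += n
                break
        else:
            out.append(tokens[i])  # no listed MWE starts at this position
            i += 1
    return out


sentence = "[NER:ORGANIZATION] improved supply chain risk management".split()
joined = join_static_mwes(sentence, ["supply chain", "risk management"])
```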

clean(*, force: bool = False) -> None

Lowercase, drop stopwords and punctuation, and strip CoreNLP tags; [NER:*] placeholders are kept.

Parameters:

  • force (bool, default False): Rerun even if output exists.
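
As a rough picture of the cleaning rules, the sketch below lowercases tokens, strips punctuation, drops stopwords, and leaves [NER:*] placeholders untouched. The stopword set and the clean_tokens name are made up for the illustration, and CoreNLP tag stripping is not modelled.

```python
import re
import string

STOPWORDS = {"the", "of", "a", "and", "is"}  # tiny illustrative list, not the library's
NER_PLACEHOLDER = re.compile(r"^\[NER:[^\]]+\]$")
# Remove all ASCII punctuation except "_" so that joined MWEs survive.
PUNCT_TABLE = str.maketrans("", "", string.punctuation.replace("_", ""))


def clean_tokens(tokens: list[str]) -> list[str]:
    cleaned = []
    for tok in tokens:
        if NER_PLACEHOLDER.match(tok):
            cleaned.append(tok)  # keep placeholders verbatim
            continue
        tok = tok.lower().translate(PUNCT_TABLE)
        if tok and tok not in STOPWORDS:
            cleaned.append(tok)
    return cleaned


raw = "The board of [NER:ORGANIZATION] , discussed Risk_Management .".split()
result = clean_tokens(raw)
```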

phrase(*, force: bool = False) -> Path

Run gensim Phrases (if enabled) and return the final corpus path.

Parameters:

  • force (bool, default False): Rerun even if output exists.

Returns:

  • Path: Path to the final phrase-expanded sentence file.

train(*, force: bool = False) -> Word2Vec

Train (or load) the Word2Vec model.

Parameters:

  • force (bool, default False): Retrain even if w2v.mod exists.

Returns:

  • Word2Vec: The loaded or trained model.

expand_dictionary(*, force: bool = False) -> dict[str, list[str]]

Expand seeds into a per-dimension culture dictionary.

Parameters:

  • force (bool, default False): Rebuild even if the dictionary CSV exists.

Returns:

  • dict[str, list[str]]: Mapping of dimension name to rank-sorted word list.
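
To make nearest-neighbor expansion concrete, here is a self-contained sketch on hand-made 2-D vectors. It assumes one common scheme (rank candidates by cosine similarity to the mean seed vector); this page does not say which rule the library applies, and expand, cosine, and VECTORS are hypothetical names.

```python
import math

# Toy 2-D "embeddings"; real vectors would come from the trained Word2Vec model.
VECTORS = {
    "risk": (1.0, 0.1),
    "uncertainty": (0.9, 0.2),
    "hazard": (0.8, 0.3),
    "growth": (0.1, 1.0),
    "expansion": (0.2, 0.9),
}


def cosine(u, v):
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.hypot(*u) * math.hypot(*v))


def expand(seeds, vectors, top_n):
    """Rank non-seed words by cosine similarity to the mean seed vector."""
    in_vocab = [s for s in seeds if s in vectors]
    if not in_vocab:
        return []  # nothing to anchor the search on
    dims = len(vectors[in_vocab[0]])
    centroid = tuple(
        sum(vectors[s][d] for s in in_vocab) / len(in_vocab) for d in range(dims)
    )
    candidates = [w for w in vectors if w not in in_vocab]
    ranked = sorted(candidates, key=lambda w: cosine(vectors[w], centroid), reverse=True)
    return ranked[:top_n]


risk_words = expand(["risk", "uncertainty"], VECTORS, top_n=1)
```

Seeds that are missing from the vocabulary are skipped here rather than raising, which matches the idea of marking seeds as in-vocab or not before expansion.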

score(methods: Sequence[ScoringMethod] = ('TF', 'TFIDF', 'WFIDF'), *, force: bool = False) -> dict[str, pd.DataFrame]

Score every document under each requested method.

Parameters:

  • methods (Sequence[ScoringMethod], default ('TF', 'TFIDF', 'WFIDF')): Any subset of TF, TFIDF, WFIDF, TFIDF+SIMWEIGHT, WFIDF+SIMWEIGHT.
  • force (bool, default False): Recompute even if CSV outputs exist.

Returns:

  • dict[str, DataFrame]: Mapping of method name to scores DataFrame.
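
The three base methods differ only in how a dictionary word's count is weighted. The sketch below uses textbook definitions (raw count, count times log(N/df), and (1 + log count) times log(N/df)) on a toy corpus. It is an assumption that the library follows these exact formulas; length normalisation and the SIMWEIGHT variants are left out, and score_docs is a hypothetical name.

```python
import math
from collections import Counter

docs = {
    "d1": "risk risk growth".split(),
    "d2": "growth expansion".split(),
    "d3": "profit margin".split(),
}
dictionary = {"risk": ["risk", "uncertainty"], "growth": ["growth", "expansion"]}


def score_docs(docs, dictionary, method="TFIDF"):
    """Sum per-word weights over each dimension's word list, per document."""
    n_docs = len(docs)
    # Document frequency: number of documents containing each word.
    df = Counter(w for toks in docs.values() for w in set(toks))
    scores = {}
    for doc_id, toks in docs.items():
        tf = Counter(toks)
        row = {}
        for dim, words in dictionary.items():
            total = 0.0
            for w in words:
                if tf[w] == 0:
                    continue  # word absent from this document
                idf = math.log(n_docs / df[w])
                if method == "TF":
                    total += tf[w]
                elif method == "TFIDF":
                    total += tf[w] * idf
                elif method == "WFIDF":
                    total += (1 + math.log(tf[w])) * idf
                else:
                    raise ValueError(f"unsupported method: {method}")
            row[dim] = total
        scores[doc_id] = row
    return scores


tf_scores = score_docs(docs, dictionary, "TF")
tfidf_scores = score_docs(docs, dictionary, "TFIDF")
wfidf_scores = score_docs(docs, dictionary, "WFIDF")
```

WFIDF dampens repeated mentions: a word used twice counts for less than double, so long documents that repeat one term do not dominate.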

run(*, methods: Sequence[ScoringMethod] = ('TF', 'TFIDF', 'WFIDF'), force: bool = False) -> dict[str, pd.DataFrame]

Run every stage end to end.

Parameters:

  • methods (Sequence[ScoringMethod], default ('TF', 'TFIDF', 'WFIDF')): Scoring methods to compute.
  • force (bool, default False): Rerun every stage regardless of existing outputs.

Returns:

  • dict[str, DataFrame]: Mapping of method name to scores DataFrame.

score_df(method: ScoringMethod = 'TFIDF') -> pd.DataFrame

Return the scores DataFrame for one method.

firm_year(id_to_firm: pd.DataFrame, method: ScoringMethod = 'TFIDF') -> pd.DataFrame

Aggregate scores to firm-year level.

Parameters:

  • id_to_firm (DataFrame, required): DataFrame with document_id, firm_id, time.
  • method (ScoringMethod, default 'TFIDF'): Which scores to aggregate.

Returns:

  • DataFrame: Firm-year DataFrame.

dictionary_preview(top_k: int = 10) -> pd.DataFrame

Return a per-dimension preview of the expanded dictionary.

Useful for notebook display: rendering the returned DataFrame shows each dimension's seeds next to its top_k highest-ranked expanded words.

Parameters:

  • top_k (int, default 10): How many expanded words to show per dimension.

Returns:

  • DataFrame: A DataFrame with one row per dimension and columns dimension, seeds, expanded_top_k, n_expanded.
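
The shape of the preview can be sketched with plain dicts, one per row. This is only an illustration of the four documented columns: preview_rows is a hypothetical helper, the real method returns a pandas DataFrame, and it is an assumption that n_expanded is simply the length of the expanded list.

```python
def preview_rows(seeds, expanded, top_k=10):
    """Build one row per dimension with the documented column names."""
    rows = []
    for dim, seed_words in seeds.items():
        words = expanded.get(dim, [])
        rows.append({
            "dimension": dim,
            "seeds": ", ".join(seed_words),
            "expanded_top_k": ", ".join(words[:top_k]),  # first top_k rank-sorted words
            "n_expanded": len(words),                    # assumed: full list length
        })
    return rows


seeds = {"risk": ["risk", "uncertainty"]}
expanded = {"risk": ["risk", "uncertainty", "hazard", "exposure"]}
rows = preview_rows(seeds, expanded, top_k=3)
```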

show_dictionary(top_k: int = 10) -> None

Pretty-print the expanded dictionary, dimension by dimension.

Replaces the boilerplate "for dim in DIMS: print(...)" loop. Prints each dimension's seeds with an in-vocab marker, followed by its top_k highest-ranked expanded words.

Parameters:

  • top_k (int, default 10): How many expanded words to show per dimension.

edit_dictionary(remove: dict[str, Iterable[str]] | None = None, add: dict[str, Iterable[str]] | None = None) -> dict[str, list[str]]

Manually curate the expanded dictionary.

The 2021 paper allowed researchers to drop noisy expansion candidates (and occasionally append domain words the model missed) before scoring. This method does both atomically: in-memory dict and on-disk CSV are updated together.

Use programmatically from a notebook, or pair with reload_dictionary() to drive curation from a spreadsheet.

Parameters:

  • remove (dict[str, Iterable[str]] | None, default None): Mapping of dimension name to words to drop. Words not present are silently ignored.
  • add (dict[str, Iterable[str]] | None, default None): Mapping of dimension name to words to append (at the end, after existing rank-sorted entries). Duplicates within a dimension are deduped.

Returns:

  • dict[str, list[str]]: The updated dict.

Raises:

  • KeyError: If a referenced dimension is not in config.seeds.
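
The documented remove/add semantics can be restated as a small pure function. This is a sketch of the in-memory behaviour only: curate is a hypothetical name, the on-disk CSV update is not modelled, and returning a new dict instead of mutating in place is a choice made for the example.

```python
def curate(expanded, seeds, remove=None, add=None):
    """Return a curated copy of `expanded` following the documented rules."""
    # Unknown dimensions are an error, as with the documented KeyError.
    for mapping in (remove, add):
        for dim in (mapping or {}):
            if dim not in seeds:
                raise KeyError(dim)

    updated = {dim: list(words) for dim, words in expanded.items()}

    for dim, words in (remove or {}).items():
        drop = set(words)
        # Words that are not present are silently ignored.
        updated[dim] = [w for w in updated.get(dim, []) if w not in drop]

    for dim, words in (add or {}).items():
        current = updated.setdefault(dim, [])
        for w in words:
            if w not in current:  # dedupe within the dimension
                current.append(w)  # append after the rank-sorted entries
    return updated


seeds = {"risk": ["risk"], "growth": ["growth"]}
expanded = {"risk": ["risk", "uncertainty", "weather"], "growth": ["growth"]}
curated = curate(
    expanded,
    seeds,
    remove={"risk": ["weather", "absent"]},
    add={"risk": ["hazard", "risk"]},
)
```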

reload_dictionary() -> dict[str, list[str]]

Reread the dictionary CSV from disk.

Use this after editing pipeline.dict_path in a spreadsheet or text editor. Both the in-memory score cache and any score CSVs on disk are dropped, since they were computed against the previous dict; the next call to score() will recompute.

Returns:

  • dict[str, list[str]]: The reloaded dict.

from_text_file(text_path: str | Path, id_path: str | Path | None = None, *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline classmethod

Construct a pipeline from a one-document-per-line text file.

Parameters:

  • text_path (str | Path, required): Input text file, one document per line.
  • id_path (str | Path | None, default None): Optional matching IDs file.
  • work_dir (str | Path, default 'runs'): Where to write artifacts.
  • config (Config | None, default None): Pipeline config.

Returns:

  • Pipeline: A new Pipeline.

from_directory(dir_path: str | Path, pattern: str = '*.txt', *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline classmethod

Construct a pipeline from a directory of one-file-per-document text.

Common pattern: SEC filings where each 10-K is stored as its own file, e.g. 10k_AAPL_2024.txt. Each file's contents become one document; each file's stem (filename without extension) becomes the document ID.

Parameters:

  • dir_path (str | Path, required): Directory to scan.
  • pattern (str, default '*.txt'): Glob pattern, e.g. "*.txt" or "**/*.md".
  • work_dir (str | Path, default 'runs'): Where to write artifacts.
  • config (Config | None, default None): Pipeline config.

Returns:

  • Pipeline: A new Pipeline.
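
The file-to-document mapping can be reproduced with pathlib alone. The sketch below is not the library's loader: load_directory is a hypothetical name, and sorting the paths and reading them as UTF-8 are assumptions made to keep the example deterministic.

```python
from pathlib import Path
from tempfile import TemporaryDirectory


def load_directory(dir_path, pattern="*.txt"):
    """Return (texts, doc_ids); each ID is the file's stem."""
    # Sorted for a stable order (assumption; the real ordering is not documented).
    paths = sorted(p for p in Path(dir_path).glob(pattern) if p.is_file())
    texts = [p.read_text(encoding="utf-8") for p in paths]
    doc_ids = [p.stem for p in paths]
    return texts, doc_ids


with TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "10k_AAPL_2024.txt").write_text("We face supply risk.", encoding="utf-8")
    (root / "10k_MSFT_2024.txt").write_text("Cloud growth continued.", encoding="utf-8")
    (root / "notes.md").write_text("Not matched by *.txt", encoding="utf-8")
    texts, doc_ids = load_directory(root)
```

Because IDs come from stems, two files with the same stem in different subdirectories (possible with a pattern like "**/*.txt") would produce duplicate IDs, so it is worth checking for collisions before scoring.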

from_dataframe(df: pd.DataFrame, text_col: str = 'text', id_col: str | None = 'id', *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline classmethod

Construct a pipeline from a pandas DataFrame.

Most workshop and teaching examples load a CSV with pd.read_csv and end up with a DataFrame holding one row per document. This constructor takes that DataFrame directly.

Parameters:

  • df (DataFrame, required): DataFrame with at least a text column.
  • text_col (str, default 'text'): Name of the column with document text.
  • id_col (str | None, default 'id'): Name of the column with document IDs. Pass None to use the DataFrame's row index as IDs.
  • work_dir (str | Path, default 'runs'): Where to write artifacts.
  • config (Config | None, default None): Pipeline config.

Returns:

  • Pipeline: A new Pipeline.

from_csv(csv_path: str | Path, text_col: str = 'text', id_col: str | None = 'id', *, work_dir: str | Path = 'runs', config: Config | None = None, **read_csv_kwargs: Any) -> Pipeline classmethod

Construct a pipeline from a CSV file.

Equivalent to from_dataframe(pd.read_csv(csv_path, **read_csv_kwargs), ...), with the remaining arguments passed through. Extra keyword arguments go to pandas.read_csv, so you can specify delimiters, encodings, dtypes, and so on.

Parameters:

  • csv_path (str | Path, required): Path to the CSV file.
  • text_col (str, default 'text'): Name of the column with document text.
  • id_col (str | None, default 'id'): Name of the column with document IDs.
  • work_dir (str | Path, default 'runs'): Where to write artifacts.
  • config (Config | None, default None): Pipeline config.
  • **read_csv_kwargs (Any, default {}): Forwarded to pandas.read_csv.

Returns:

  • Pipeline: A new Pipeline.

from_jsonl(jsonl_path: str | Path, text_key: str = 'text', id_key: str | None = 'id', *, work_dir: str | Path = 'runs', config: Config | None = None) -> Pipeline classmethod

Construct a pipeline from a JSON Lines file.

Each line of the file is a standalone JSON object. Useful for records exported from an API or a database in a streaming format.

Parameters:

  • jsonl_path (str | Path, required): Path to .jsonl file.
  • text_key (str, default 'text'): JSON key holding document text.
  • id_key (str | None, default 'id'): JSON key holding document ID, or None to use the 1-based line number.
  • work_dir (str | Path, default 'runs'): Where to write artifacts.
  • config (Config | None, default None): Pipeline config.

Returns:

  • Pipeline: A new Pipeline.
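
Reading a JSON Lines file into texts and IDs takes only the standard library. The sketch below mirrors the documented text_key / id_key behaviour, including the 1-based line number fallback. load_jsonl is a hypothetical helper rather than the library's code, and skipping blank lines is an assumption.

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory


def load_jsonl(jsonl_path, text_key="text", id_key="id"):
    """Return (texts, doc_ids) from a file with one JSON object per line."""
    texts, doc_ids = [], []
    with open(jsonl_path, encoding="utf-8") as fh:
        for line_no, line in enumerate(fh, start=1):
            if not line.strip():
                continue  # tolerate blank lines (assumption)
            record = json.loads(line)
            texts.append(record[text_key])
            # Fall back to the 1-based line number when no ID key is given.
            doc_ids.append(line_no if id_key is None else record[id_key])
    return texts, doc_ids


with TemporaryDirectory() as tmp:
    path = Path(tmp) / "docs.jsonl"
    path.write_text(
        '{"id": "a", "text": "Risk rose."}\n{"id": "b", "text": "Growth slowed."}\n',
        encoding="utf-8",
    )
    texts, keyed_ids = load_jsonl(path)
    _, numbered_ids = load_jsonl(path, id_key=None)
```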