"""Topic modeling using NMF on PPMI matrices with support for temporal alignment."""
from dataclasses import dataclass
import numpy as np
from scipy.optimize import linear_sum_assignment
from scipy.sparse import csr_matrix
from scipy.spatial.distance import cosine
from sklearn.decomposition import NMF
[docs]
@dataclass
class Topic:
"""Container for topic information.
Fields:
id: Unique topic identifier
words: List of (word, weight) pairs for top words
distribution: Full probability distribution over vocabulary
Examples
--------
>>> import numpy as np
>>> dist = np.array([0.5, 0.3, 0.2])
>>> topic = Topic(1, [('cat', 0.5), ('dog', 0.3)], dist)
>>> topic.id
1
>>> topic.words
[('cat', 0.5), ('dog', 0.3)]
>>> np.allclose(topic.distribution, [0.5, 0.3, 0.2])
True
"""
id: int
words: list[tuple[str, float]] # (word, weight) pairs
distribution: np.ndarray # Full word distribution
[docs]
@dataclass
class AlignedTopic:
"""Container for aligned topic pairs.
Fields:
source_topic: Topic from source time period
target_topic: Topic from target time period
similarity: Cosine similarity between topics
Examples
--------
>>> import numpy as np
>>> dist = np.array([0.5, 0.3, 0.2])
>>> topic1 = Topic(1, [('cat', 0.5)], dist)
>>> topic2 = Topic(2, [('dog', 0.4)], dist)
>>> aligned = AlignedTopic(topic1, topic2, 0.8)
>>> aligned.source_topic.id
1
>>> aligned.target_topic.id
2
>>> aligned.similarity
0.8
"""
source_topic: Topic
target_topic: Topic
similarity: float
[docs]
class TopicModel:
"""Topic model using NMF on PPMI matrices.
Supports temporal alignment of topics between different time periods.
"""
[docs]
def __init__(
self,
n_topics: int = 10,
max_iter: int = 500,
min_similarity: float = 0.1,
) -> None:
"""Initialize topic model.
Args:
----
n_topics: Number of topics to extract
max_iter: Maximum number of iterations for NMF
min_similarity: Minimum similarity for topic alignment
Examples:
--------
>>> model = TopicModel(n_topics=5, max_iter=100)
>>> model.n_topics
5
>>> model.max_iter
100
"""
self.n_topics = n_topics
self.max_iter = max_iter
self.min_similarity = min_similarity
self.nmf = NMF(
n_components=n_topics,
max_iter=max_iter,
init="nndsvd", # Better initialization for sparse data
)
self.vocabulary: list[str] = []
self.topics: list[Topic] = []
self.topic_word_matrix: np.ndarray | None = None
[docs]
def fit(
self, ppmi_matrix: csr_matrix, vocabulary: list[str], top_n_words: int = 10
) -> None:
"""Fit topic model to PPMI matrix.
Args:
----
ppmi_matrix: Sparse PPMI matrix from word embeddings
vocabulary: List of words corresponding to matrix columns
top_n_words: Number of top words to store per topic
Examples:
--------
>>> import numpy as np
>>> from scipy.sparse import csr_matrix
>>> model = TopicModel(n_topics=2)
>>> ppmi = csr_matrix([[1, 0], [0, 1]])
>>> model.fit(ppmi, ['word1', 'word2'])
>>> len(model.topics)
2
>>> isinstance(model.topics[0], Topic)
True
>>> len(model.vocabulary)
2
"""
self.vocabulary = vocabulary
# Run NMF
self.topic_word_matrix = self.nmf.fit_transform(ppmi_matrix)
word_topic_matrix = self.nmf.components_
# Create topic objects
self.topics = []
for topic_idx in range(self.n_topics):
# Get word weights for this topic
word_weights = word_topic_matrix[topic_idx]
# Normalize weights
word_weights = (
word_weights / word_weights.sum()
if word_weights.sum() > 0
else word_weights
)
# Get top words
top_indices = np.argsort(word_weights)[-top_n_words:][::-1]
top_words = [
(vocabulary[idx], float(word_weights[idx])) for idx in top_indices
]
# Create topic object with normalized distribution
topic = Topic(
id=topic_idx,
words=top_words,
distribution=word_weights,
)
self.topics.append(topic)
[docs]
def get_document_topics(
self, doc_vector: np.ndarray, threshold: float = 0.1
) -> list[tuple[int, float]]:
"""Get topic distribution for a document vector.
Args:
----
doc_vector: Document vector in vocabulary space
threshold: Minimum topic proportion to include
Returns:
-------
List of (topic_id, weight) pairs above threshold, sorted by weight
Examples:
--------
>>> import numpy as np
>>> from scipy.sparse import csr_matrix
>>> model = TopicModel(n_topics=2)
>>> ppmi = csr_matrix([[1, 0], [0, 1]])
>>> model.fit(ppmi, ['word1', 'word2'])
>>> doc = np.array([0.8, 0.2])
>>> topics = model.get_document_topics(doc, threshold=0.1)
>>> len(topics) > 0
True
>>> all(w >= 0.1 for _, w in topics)
True
"""
if self.topic_word_matrix is None:
raise ValueError("Model must be fit before getting document topics")
# Project document into topic space
doc_topics = self.nmf.transform(doc_vector.reshape(1, -1))[0]
# Normalize
doc_topics = (
doc_topics / np.sum(doc_topics) if np.sum(doc_topics) > 0 else doc_topics
)
# Get topics above threshold
topic_weights = [
(idx, float(weight))
for idx, weight in enumerate(doc_topics)
if weight > threshold
]
return sorted(topic_weights, key=lambda x: x[1], reverse=True)
[docs]
def _align_distributions(
self, topic1: Topic, topic2: Topic, vocab1: list[str], vocab2: list[str]
) -> tuple[np.ndarray, np.ndarray]:
"""Align two topic distributions to use the same vocabulary space.
Args:
----
topic1: First topic
topic2: Second topic
vocab1: Vocabulary for first topic
vocab2: Vocabulary for second topic
Returns:
-------
Tuple of aligned distributions
Examples:
--------
>>> import numpy as np
>>> model = TopicModel()
>>> dist1 = np.array([0.6, 0.4])
>>> dist2 = np.array([0.3, 0.7])
>>> t1 = Topic(1, [('cat', 0.6), ('dog', 0.4)], dist1)
>>> t2 = Topic(2, [('dog', 0.3), ('bird', 0.7)], dist2)
>>> aligned1, aligned2 = model._align_distributions(
... t1, t2, ['cat', 'dog'], ['dog', 'bird']
... )
>>> len(aligned1) == len(aligned2) # Same length after alignment
True
>>> np.allclose(aligned1.sum(), 1.0) # Still normalized
True
>>> np.allclose(aligned2.sum(), 1.0)
True
"""
# Create unified vocabulary and mapping
unified_vocab: set[str] = set(vocab1) | set(vocab2)
vocab1_idx = {word: idx for idx, word in enumerate(vocab1)}
vocab2_idx = {word: idx for idx, word in enumerate(vocab2)}
# Create aligned distributions
dist1_aligned = np.zeros(len(unified_vocab))
dist2_aligned = np.zeros(len(unified_vocab))
# Fill in values using original distributions
for idx, word in enumerate(sorted(unified_vocab)):
if word in vocab1_idx:
dist1_aligned[idx] = topic1.distribution[vocab1_idx[word]]
if word in vocab2_idx:
dist2_aligned[idx] = topic2.distribution[vocab2_idx[word]]
# Renormalize if necessary
if dist1_aligned.sum() > 0:
dist1_aligned /= dist1_aligned.sum()
if dist2_aligned.sum() > 0:
dist2_aligned /= dist2_aligned.sum()
return dist1_aligned, dist2_aligned
[docs]
def _compute_topic_similarity(self, topic1: Topic, topic2: Topic) -> float:
"""Compute cosine similarity between topic distributions.
Args:
----
topic1: First topic
topic2: Second topic
Returns:
-------
Cosine similarity between the topics
Examples:
--------
>>> import numpy as np
>>> model = TopicModel()
>>> dist1 = np.array([1, 0])
>>> dist2 = np.array([0, 1])
>>> t1 = Topic(1, [('cat', 1.0)], dist1)
>>> t2 = Topic(2, [('dog', 1.0)], dist2)
>>> sim = model._compute_topic_similarity(t1, t2)
>>> round(sim, 1)
0.0
"""
# Align distributions before computing similarity
dist1_aligned, dist2_aligned = self._align_distributions(
topic1, topic2, self.vocabulary, self.vocabulary
)
# Handle edge cases: if either vector is all zeros, similarity is 0
if np.all(dist1_aligned == 0) or np.all(dist2_aligned == 0):
return 0.0
# Calculate cosine similarity, handling potential numerical issues
try:
similarity = 1 - cosine(dist1_aligned, dist2_aligned)
# Handle edge case where cosine similarity is slightly outside [-1, 1]
if np.isnan(similarity):
return 0.0
return float(np.clip(similarity, -1.0, 1.0))
except Exception:
return 0.0
[docs]
def align_with(self, other: "TopicModel") -> list[AlignedTopic]:
"""Align topics with another model using Hungarian algorithm.
Args:
----
other: Another fitted TopicModel
Returns:
-------
List of aligned topic pairs sorted by similarity
Raises:
------
ValueError: If either model is not fitted
Examples:
--------
>>> import numpy as np
>>> from scipy.sparse import csr_matrix
>>> model1 = TopicModel(n_topics=2)
>>> model2 = TopicModel(n_topics=2)
>>> ppmi = csr_matrix([[1, 0], [0, 1]])
>>> model1.fit(ppmi, ['word1', 'word2'])
>>> model2.fit(ppmi, ['word1', 'word2'])
>>> aligned = model1.align_with(model2)
>>> len(aligned) > 0
True
>>> isinstance(aligned[0], AlignedTopic)
True
"""
if not self.topics or not other.topics:
raise ValueError("Both models must be fit before alignment")
# Pre-compute unified vocabulary mappings once
unified_vocab = sorted(set(self.vocabulary) | set(other.vocabulary))
vocab1_idx = {word: idx for idx, word in enumerate(self.vocabulary)}
vocab2_idx = {word: idx for idx, word in enumerate(other.vocabulary)}
unified_size = len(unified_vocab)
word_to_unified = {word: idx for idx, word in enumerate(unified_vocab)}
# Compute cost matrix using pre-computed mappings
cost_matrix = np.zeros((self.n_topics, other.n_topics))
for i, topic1 in enumerate(self.topics):
for j, topic2 in enumerate(other.topics):
dist1 = np.zeros(unified_size)
dist2 = np.zeros(unified_size)
for word, uidx in word_to_unified.items():
if word in vocab1_idx:
dist1[uidx] = topic1.distribution[vocab1_idx[word]]
if word in vocab2_idx:
dist2[uidx] = topic2.distribution[vocab2_idx[word]]
if dist1.sum() > 0:
dist1 /= dist1.sum()
if dist2.sum() > 0:
dist2 /= dist2.sum()
similarity = 1 - cosine(dist1, dist2)
cost_matrix[i, j] = 1 - similarity
# Find optimal matching
source_indices, target_indices = linear_sum_assignment(cost_matrix)
# Create aligned topic pairs
aligned_topics = []
for source_idx, target_idx in zip(source_indices, target_indices, strict=False):
similarity = 1 - cost_matrix[source_idx, target_idx]
if similarity >= self.min_similarity:
aligned_topics.append(
AlignedTopic(
source_topic=self.topics[source_idx],
target_topic=other.topics[target_idx],
similarity=float(similarity),
)
)
return sorted(aligned_topics, key=lambda x: x.similarity, reverse=True)
[docs]
def print_topics(self, top_n: int = 10) -> None:
"""Print top words for each topic.
Args:
----
top_n: Number of top words to print per topic
Examples:
--------
>>> from scipy.sparse import csr_matrix
>>> model = TopicModel(n_topics=1)
>>> ppmi = csr_matrix([[1, 0], [0, 1]])
>>> model.fit(ppmi, ['word1', 'word2'])
>>> model.print_topics(top_n=2) # doctest: +ELLIPSIS
<BLANKLINE>
Topic 0:
word...: 1.0000
word...: 0.0000
"""
if not self.topics:
print("No topics available. Model needs to be fit first.")
return
for topic in self.topics:
print(f"\nTopic {topic.id}:")
n_words = min(top_n, len(topic.words))
for word, weight in topic.words[:n_words]:
print(f" {word}: {weight:.4f}")