lda
Classes¶
LDAModule (KiaraModule)¶
Perform Latent Dirichlet Allocation (LDA) on a tokenized corpus.
The module trains one topic model for each number of topics in a user-provided range and can optionally score each model with a coherence value.
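A minimal, self-contained sketch (not part of the module) of what this amounts to in plain gensim, using a made-up two-document corpus; the later snippets on this page reuse the tokens, id2word and corpus names defined here:

    from gensim import corpora
    from gensim.models import LdaModel

    # hypothetical tokenized corpus; in kiara this arrives as the "tokens_array" input
    tokens = [
        ["topic", "models", "need", "tokenized", "text"],
        ["coherence", "scores", "help", "compare", "topic", "models"],
    ]

    id2word = corpora.Dictionary(tokens)                 # word <-> id mapping
    corpus = [id2word.doc2bow(text) for text in tokens]  # bag-of-words counts per document

    model = LdaModel(corpus, id2word=id2word, num_topics=2, eval_every=None)
    print(model.print_topics(num_words=5))  # [(topic_id, '0.12*"word" + ...'), ...]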
Source code in language_processing/modules/lda.py
class LDAModule(KiaraModule):
    """Perform Latent Dirichlet Allocation on a tokenized corpus.

    This module computes models for a range of number of topics provided by the user.
    """

    _module_type_name = "generate.LDA.for.tokens_array"

    KIARA_METADATA = {
        "tags": ["LDA", "tokens"],
    }

    def create_inputs_schema(
        self,
    ) -> ValueSetSchema:

        inputs: Dict[str, Dict[str, Any]] = {
            "tokens_array": {"type": "array", "doc": "The text corpus."},
            "num_topics_min": {
                "type": "integer",
                "doc": "The minimal number of topics.",
                "default": 7,
            },
            "num_topics_max": {
                "type": "integer",
                "doc": "The max number of topics.",
                "optional": True,
            },
            "compute_coherence": {
                "type": "boolean",
                "doc": "Whether to compute the coherence score for each model.",
                "default": False,
            },
            "words_per_topic": {
                "type": "integer",
                "doc": "How many words per topic to put in the result model.",
                "default": 10,
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueSetSchema:

        outputs = {
            "topic_models": {
                "type": "dict",
                "doc": "A dictionary with one coherence model table for each number of topics.",
            },
            "coherence_table": {
                "type": "table",
                "doc": "Coherence details.",
                "optional": True,
            },
            "coherence_map": {
                "type": "dict",
                "doc": "A map with the coherence value for every number of topics.",
            },
        }
        return outputs

    def create_model(self, corpus, num_topics: int, id2word: Mapping[str, int]):

        from gensim.models import LdaModel

        model = LdaModel(
            corpus, id2word=id2word, num_topics=num_topics, eval_every=None
        )
        return model

    def compute_coherence(self, model, corpus_model, id2word: Mapping[str, int]):

        from gensim.models import CoherenceModel

        coherencemodel = CoherenceModel(
            model=model,
            texts=corpus_model,
            dictionary=id2word,
            coherence="c_v",
            processes=1,
        )
        coherence_value = coherencemodel.get_coherence()
        return coherence_value

    def assemble_coherence(self, models_dict: Mapping[int, Any], words_per_topic: int):

        import pandas as pd
        import pyarrow as pa

        # Create list with topics and topic words for each number of topics
        num_topics_list = []
        topics_list = []
        for (
            num_topics,
            model,
        ) in models_dict.items():
            num_topics_list.append(num_topics)
            topic_print = model.print_topics(num_words=words_per_topic)
            topics_list.append(topic_print)

        df_coherence_table = pd.DataFrame(columns=["topic_id", "words", "num_topics"])
        idx = 0
        for i in range(len(topics_list)):
            for j in range(len(topics_list[i])):
                df_coherence_table.loc[idx] = ""
                df_coherence_table["topic_id"].loc[idx] = j + 1
                df_coherence_table["words"].loc[idx] = ", ".join(
                    re.findall(r'"(\w+)"', topics_list[i][j][1])
                )
                df_coherence_table["num_topics"].loc[idx] = num_topics_list[i]
                idx += 1

        coherence_table = pa.Table.from_pandas(df_coherence_table, preserve_index=False)

        return coherence_table

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        from gensim import corpora

        logging.getLogger("gensim").setLevel(logging.ERROR)

        tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
        tokens = tokens_array.arrow_array.to_pylist()

        words_per_topic = inputs.get_value_data("words_per_topic")

        num_topics_min = inputs.get_value_data("num_topics_min")
        num_topics_max = inputs.get_value_data("num_topics_max")

        if num_topics_max is None:
            num_topics_max = num_topics_min

        compute_coherence = inputs.get_value_data("compute_coherence")

        id2word = corpora.Dictionary(tokens)
        corpus = [id2word.doc2bow(text) for text in tokens]

        # model = gensim.models.ldamulticore.LdaMulticore(
        #     corpus, id2word=id2word, num_topics=num_topics, eval_every=None
        # )

        models = {}
        model_tables = {}
        coherence = {}

        # multi_threaded = False
        # if not multi_threaded:
        for nt in range(num_topics_min, num_topics_max + 1):
            model = self.create_model(corpus=corpus, num_topics=nt, id2word=id2word)
            models[nt] = model
            topic_print_model = model.print_topics(num_words=words_per_topic)
            # dbg(topic_print_model)
            # df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
            # TODO: create table directly
            # result_table = Table.from_pandas(df)
            model_tables[nt] = topic_print_model

            if compute_coherence:
                coherence_result = self.compute_coherence(
                    model=model, corpus_model=tokens, id2word=id2word
                )
                coherence[nt] = coherence_result

        # else:
        #     def create_model(num_topics):
        #         model = self.create_model(corpus=corpus, num_topics=num_topics, id2word=id2word)
        #         topic_print_model = model.print_topics(num_words=30)
        #         df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
        #         # TODO: create table directly
        #         result_table = Table.from_pandas(df)
        #         coherence_result = None
        #         if compute_coherence:
        #             coherence_result = self.compute_coherence(model=model, corpus_model=tokens, id2word=id2word)
        #         return (num_topics, model, result_table, coherence_result)
        #
        #     executor = ThreadPoolExecutor()
        #     results: typing.Any = executor.map(create_model, range(num_topics_min, num_topics_max+1))
        #     executor.shutdown(wait=True)
        #     for r in results:
        #         models[r[0]] = r[1]
        #         model_tables[r[0]] = r[2]
        #         if compute_coherence:
        #             coherence[r[0]] = r[3]

        # df_coherence = pd.DataFrame(coherence.keys(), columns=["Number of topics"])
        # df_coherence["Coherence"] = coherence.values()

        if compute_coherence:
            coherence_table = self.assemble_coherence(
                models_dict=models, words_per_topic=words_per_topic
            )
        else:
            coherence_table = None

        coherence_map = {k: v.item() for k, v in coherence.items()}

        outputs.set_values(
            topic_models=model_tables,
            coherence_table=coherence_table,
            coherence_map=coherence_map,
        )
KIARA_METADATA¶
Methods¶
assemble_coherence(self, models_dict, words_per_topic)¶
Source code in language_processing/modules/lda.py
def assemble_coherence(self, models_dict: Mapping[int, Any], words_per_topic: int):

    import pandas as pd
    import pyarrow as pa

    # Create list with topics and topic words for each number of topics
    num_topics_list = []
    topics_list = []
    for (
        num_topics,
        model,
    ) in models_dict.items():
        num_topics_list.append(num_topics)
        topic_print = model.print_topics(num_words=words_per_topic)
        topics_list.append(topic_print)

    df_coherence_table = pd.DataFrame(columns=["topic_id", "words", "num_topics"])
    idx = 0
    for i in range(len(topics_list)):
        for j in range(len(topics_list[i])):
            df_coherence_table.loc[idx] = ""
            df_coherence_table["topic_id"].loc[idx] = j + 1
            df_coherence_table["words"].loc[idx] = ", ".join(
                re.findall(r'"(\w+)"', topics_list[i][j][1])
            )
            df_coherence_table["num_topics"].loc[idx] = num_topics_list[i]
            idx += 1

    coherence_table = pa.Table.from_pandas(df_coherence_table, preserve_index=False)

    return coherence_table
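The regular expression in assemble_coherence extracts the plain words from the string representation that gensim's print_topics returns. A small standalone sketch with a hard-coded (made-up) topic string:

    import re

    # print_topics() yields (topic_id, weighted-word-string) tuples; weights here are invented
    topic = (0, '0.042*"network" + 0.031*"model" + 0.027*"topic"')
    words = ", ".join(re.findall(r'"(\w+)"', topic[1]))
    print(words)  # network, model, topic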
compute_coherence(self, model, corpus_model, id2word)¶
Source code in language_processing/modules/lda.py
def compute_coherence(self, model, corpus_model, id2word: Mapping[str, int]):

    from gensim.models import CoherenceModel

    coherencemodel = CoherenceModel(
        model=model,
        texts=corpus_model,
        dictionary=id2word,
        coherence="c_v",
        processes=1,
    )
    coherence_value = coherencemodel.get_coherence()
    return coherence_value
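Outside the module, the same c_v coherence computation might look like the following sketch, assuming a trained LdaModel plus the tokens and id2word objects from the snippet near the top of this page:

    from gensim.models import CoherenceModel

    coherencemodel = CoherenceModel(
        model=model,      # a trained gensim LdaModel
        texts=tokens,     # the original tokenized documents (required for c_v coherence)
        dictionary=id2word,
        coherence="c_v",
        processes=1,      # keep the coherence computation single-process, as the module does
    )
    print(coherencemodel.get_coherence())  # a single float; higher usually indicates better topics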
create_inputs_schema(self)¶
Return the schema for this type's inputs.
Source code in language_processing/modules/lda.py
def create_inputs_schema(
    self,
) -> ValueSetSchema:

    inputs: Dict[str, Dict[str, Any]] = {
        "tokens_array": {"type": "array", "doc": "The text corpus."},
        "num_topics_min": {
            "type": "integer",
            "doc": "The minimal number of topics.",
            "default": 7,
        },
        "num_topics_max": {
            "type": "integer",
            "doc": "The max number of topics.",
            "optional": True,
        },
        "compute_coherence": {
            "type": "boolean",
            "doc": "Whether to compute the coherence score for each model.",
            "default": False,
        },
        "words_per_topic": {
            "type": "integer",
            "doc": "How many words per topic to put in the result model.",
            "default": 10,
        },
    }
    return inputs
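For illustration only, a set of input values matching this schema might look like the hypothetical mapping below (plain data, not a kiara API call):

    inputs = {
        "tokens_array": tokens,   # one list of tokens per document
        "num_topics_min": 5,
        "num_topics_max": 9,      # optional; when omitted, only num_topics_min is used
        "compute_coherence": True,
        "words_per_topic": 10,
    }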
create_model(self, corpus, num_topics, id2word)¶
Source code in language_processing/modules/lda.py
def create_model(self, corpus, num_topics: int, id2word: Mapping[str, int]):

    from gensim.models import LdaModel

    model = LdaModel(
        corpus, id2word=id2word, num_topics=num_topics, eval_every=None
    )
    return model
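create_model relies on gensim defaults apart from eval_every=None, which disables perplexity evaluation during training. Outside the module, a comparable but reproducible call could additionally pin the random seed and the number of passes, both standard LdaModel parameters (the values are illustrative):

    from gensim.models import LdaModel

    model = LdaModel(
        corpus,
        id2word=id2word,
        num_topics=7,
        eval_every=None,   # as in create_model: skip perplexity evaluation during training
        random_state=42,   # fix the seed so repeated runs produce the same topics
        passes=10,         # more passes over the corpus than gensim's default of 1
    )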
create_outputs_schema(self)¶
Return the schema for this type's outputs.
Source code in language_processing/modules/lda.py
def create_outputs_schema(
    self,
) -> ValueSetSchema:

    outputs = {
        "topic_models": {
            "type": "dict",
            "doc": "A dictionary with one coherence model table for each number of topics.",
        },
        "coherence_table": {
            "type": "table",
            "doc": "Coherence details.",
            "optional": True,
        },
        "coherence_map": {
            "type": "dict",
            "doc": "A map with the coherence value for every number of topics.",
        },
    }
    return outputs
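Based on how process() fills these outputs, a result for a run over two topic counts could look roughly like the dictionary below (all numbers and words invented, and only the first topics of each model shown):

    outputs = {
        "topic_models": {
            5: [(0, '0.042*"network" + 0.031*"model"'), (1, '0.050*"text" + 0.022*"corpus"')],
            6: [(0, '0.039*"model" + 0.028*"topic"')],
        },
        "coherence_table": None,              # only populated when compute_coherence is True
        "coherence_map": {5: 0.41, 6: 0.44},  # c_v coherence per number of topics
    }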
process(self, inputs, outputs)¶
Source code in language_processing/modules/lda.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    from gensim import corpora

    logging.getLogger("gensim").setLevel(logging.ERROR)

    tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
    tokens = tokens_array.arrow_array.to_pylist()

    words_per_topic = inputs.get_value_data("words_per_topic")

    num_topics_min = inputs.get_value_data("num_topics_min")
    num_topics_max = inputs.get_value_data("num_topics_max")

    if num_topics_max is None:
        num_topics_max = num_topics_min

    compute_coherence = inputs.get_value_data("compute_coherence")

    id2word = corpora.Dictionary(tokens)
    corpus = [id2word.doc2bow(text) for text in tokens]

    # model = gensim.models.ldamulticore.LdaMulticore(
    #     corpus, id2word=id2word, num_topics=num_topics, eval_every=None
    # )

    models = {}
    model_tables = {}
    coherence = {}

    # multi_threaded = False
    # if not multi_threaded:
    for nt in range(num_topics_min, num_topics_max + 1):
        model = self.create_model(corpus=corpus, num_topics=nt, id2word=id2word)
        models[nt] = model
        topic_print_model = model.print_topics(num_words=words_per_topic)
        # dbg(topic_print_model)
        # df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
        # TODO: create table directly
        # result_table = Table.from_pandas(df)
        model_tables[nt] = topic_print_model

        if compute_coherence:
            coherence_result = self.compute_coherence(
                model=model, corpus_model=tokens, id2word=id2word
            )
            coherence[nt] = coherence_result

    # else:
    #     def create_model(num_topics):
    #         model = self.create_model(corpus=corpus, num_topics=num_topics, id2word=id2word)
    #         topic_print_model = model.print_topics(num_words=30)
    #         df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
    #         # TODO: create table directly
    #         result_table = Table.from_pandas(df)
    #         coherence_result = None
    #         if compute_coherence:
    #             coherence_result = self.compute_coherence(model=model, corpus_model=tokens, id2word=id2word)
    #         return (num_topics, model, result_table, coherence_result)
    #
    #     executor = ThreadPoolExecutor()
    #     results: typing.Any = executor.map(create_model, range(num_topics_min, num_topics_max+1))
    #     executor.shutdown(wait=True)
    #     for r in results:
    #         models[r[0]] = r[1]
    #         model_tables[r[0]] = r[2]
    #         if compute_coherence:
    #             coherence[r[0]] = r[3]

    # df_coherence = pd.DataFrame(coherence.keys(), columns=["Number of topics"])
    # df_coherence["Coherence"] = coherence.values()

    if compute_coherence:
        coherence_table = self.assemble_coherence(
            models_dict=models, words_per_topic=words_per_topic
        )
    else:
        coherence_table = None

    coherence_map = {k: v.item() for k, v in coherence.items()}

    outputs.set_values(
        topic_models=model_tables,
        coherence_table=coherence_table,
        coherence_map=coherence_map,
    )
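Putting the pieces together outside of kiara, a sweep over a topic-count range with coherence-based selection might look like this sketch, assuming a tokenized corpus of realistic size plus the tokens, id2word and corpus objects built as in the snippet near the top of this page:

    from gensim.models import CoherenceModel, LdaModel

    coherence_map = {}
    for nt in range(5, 10):  # num_topics_min .. num_topics_max
        model = LdaModel(corpus, id2word=id2word, num_topics=nt, eval_every=None)
        cm = CoherenceModel(
            model=model, texts=tokens, dictionary=id2word, coherence="c_v", processes=1
        )
        coherence_map[nt] = cm.get_coherence()

    # pick the topic count with the highest c_v coherence
    best_num_topics = max(coherence_map, key=coherence_map.get)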