Top-level package for kiara_plugin.language_processing.
KIARA_METADATA
¶
find_data_types: Union[Type, Tuple, Callable]
¶
find_model_classes: Union[Type, Tuple, Callable]
¶
find_modules: Union[Type, Tuple, Callable]
¶
find_pipelines: Union[Type, Tuple, Callable]
¶
get_version()
¶
Source code in language_processing/__init__.py
def get_version():
    import os

    from pkg_resources import DistributionNotFound, get_distribution

    __version__ = None
    try:
        # Change here if project is renamed and does not equal the package name
        dist_name = __name__
        __version__ = get_distribution(dist_name).version
    except DistributionNotFound:
        try:
            version_file = os.path.join(os.path.dirname(__file__), "version.txt")
            if os.path.exists(version_file):
                with open(version_file, encoding="utf-8") as vf:
                    __version__ = vf.read()
            else:
                __version__ = "unknown"
        except Exception:
            pass

    if __version__ is None:
        __version__ = "unknown"
    return __version__
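Note that pkg_resources is deprecated in recent setuptools releases; on Python 3.8+ the same lookup can be written with importlib.metadata. A minimal sketch (not part of the package), assuming the distribution name matches kiara_plugin.language_processing:

def get_version_via_importlib(dist_name: str = "kiara_plugin.language_processing") -> str:
    # Ask the installed distribution for its version; fall back to "unknown".
    from importlib.metadata import PackageNotFoundError, version

    try:
        return version(dist_name)
    except PackageNotFoundError:
        return "unknown"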
Modules¶
data_types
¶
This module contains the value type classes that are used in the kiara_plugin.language_processing
package.
models
¶
This module contains the metadata (and other) models that are used in the kiara_plugin.language_processing
package.
These models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata (and other kinds of models) that are attached to data, as well as to kiara modules.
Metadata models must be a subclass of [kiara.metadata.MetadataModel][]. Other models usually subclass a pydantic BaseModel or implement custom base classes.
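As a rough illustration of that pattern (a hypothetical model, not one shipped by this package), such a model is usually a small pydantic class whose typed fields kiara can serialize, hash and version:

from pydantic import BaseModel, Field

class CorpusMetadataSketch(BaseModel):
    """Hypothetical metadata describing a tokenized corpus."""
    num_documents: int = Field(description="Number of documents in the corpus.")
    language: str = Field(description="Language code of the corpus.", default="en")

md = CorpusMetadataSketch(num_documents=120)
print(md.dict())  # use .model_dump() on pydantic v2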
modules
special
¶
Modules¶
lda
¶
Classes¶
LDAModule (KiaraModule)
¶Perform Latent Dirichlet Allocation on a tokenized corpus.
This module computes models for a range of numbers of topics provided by the user.
Source code in language_processing/modules/lda.py
class LDAModule(KiaraModule):
"""Perform Latent Dirichlet Allocation on a tokenized corpus.
This module computes models for a range of numbers of topics provided by the user.
"""
_module_type_name = "generate.LDA.for.tokens_array"
KIARA_METADATA = {
"tags": ["LDA", "tokens"],
}
def create_inputs_schema(
self,
) -> ValueSetSchema:
inputs: Dict[str, Dict[str, Any]] = {
"tokens_array": {"type": "array", "doc": "The text corpus."},
"num_topics_min": {
"type": "integer",
"doc": "The minimal number of topics.",
"default": 7,
},
"num_topics_max": {
"type": "integer",
"doc": "The max number of topics.",
"optional": True,
},
"compute_coherence": {
"type": "boolean",
"doc": "Whether to compute the coherence score for each model.",
"default": False,
},
"words_per_topic": {
"type": "integer",
"doc": "How many words per topic to put in the result model.",
"default": 10,
},
}
return inputs
def create_outputs_schema(
self,
) -> ValueSetSchema:
outputs = {
"topic_models": {
"type": "dict",
"doc": "A dictionary with one coherence model table for each number of topics.",
},
"coherence_table": {
"type": "table",
"doc": "Coherence details.",
"optional": True,
},
"coherence_map": {
"type": "dict",
"doc": "A map with the coherence value for every number of topics.",
},
}
return outputs
def create_model(self, corpus, num_topics: int, id2word: Mapping[str, int]):
from gensim.models import LdaModel
model = LdaModel(
corpus, id2word=id2word, num_topics=num_topics, eval_every=None
)
return model
def compute_coherence(self, model, corpus_model, id2word: Mapping[str, int]):
from gensim.models import CoherenceModel
coherencemodel = CoherenceModel(
model=model,
texts=corpus_model,
dictionary=id2word,
coherence="c_v",
processes=1,
)
coherence_value = coherencemodel.get_coherence()
return coherence_value
def assemble_coherence(self, models_dict: Mapping[int, Any], words_per_topic: int):
import pandas as pd
import pyarrow as pa
# Create list with topics and topic words for each number of topics
num_topics_list = []
topics_list = []
for (
num_topics,
model,
) in models_dict.items():
num_topics_list.append(num_topics)
topic_print = model.print_topics(num_words=words_per_topic)
topics_list.append(topic_print)
df_coherence_table = pd.DataFrame(columns=["topic_id", "words", "num_topics"])
idx = 0
for i in range(len(topics_list)):
for j in range(len(topics_list[i])):
df_coherence_table.loc[idx] = ""
df_coherence_table["topic_id"].loc[idx] = j + 1
df_coherence_table["words"].loc[idx] = ", ".join(
re.findall(r'"(\w+)"', topics_list[i][j][1])
)
df_coherence_table["num_topics"].loc[idx] = num_topics_list[i]
idx += 1
coherence_table = pa.Table.from_pandas(df_coherence_table, preserve_index=False)
return coherence_table
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
from gensim import corpora
logging.getLogger("gensim").setLevel(logging.ERROR)
tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
tokens = tokens_array.arrow_array.to_pylist()
words_per_topic = inputs.get_value_data("words_per_topic")
num_topics_min = inputs.get_value_data("num_topics_min")
num_topics_max = inputs.get_value_data("num_topics_max")
if num_topics_max is None:
num_topics_max = num_topics_min
compute_coherence = inputs.get_value_data("compute_coherence")
id2word = corpora.Dictionary(tokens)
corpus = [id2word.doc2bow(text) for text in tokens]
# model = gensim.models.ldamulticore.LdaMulticore(
# corpus, id2word=id2word, num_topics=num_topics, eval_every=None
# )
models = {}
model_tables = {}
coherence = {}
# multi_threaded = False
# if not multi_threaded:
for nt in range(num_topics_min, num_topics_max + 1):
model = self.create_model(corpus=corpus, num_topics=nt, id2word=id2word)
models[nt] = model
topic_print_model = model.print_topics(num_words=words_per_topic)
# dbg(topic_print_model)
# df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
# TODO: create table directly
# result_table = Table.from_pandas(df)
model_tables[nt] = topic_print_model
if compute_coherence:
coherence_result = self.compute_coherence(
model=model, corpus_model=tokens, id2word=id2word
)
coherence[nt] = coherence_result
# else:
# def create_model(num_topics):
# model = self.create_model(corpus=corpus, num_topics=num_topics, id2word=id2word)
# topic_print_model = model.print_topics(num_words=30)
# df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
# # TODO: create table directly
# result_table = Table.from_pandas(df)
# coherence_result = None
# if compute_coherence:
# coherence_result = self.compute_coherence(model=model, corpus_model=tokens, id2word=id2word)
# return (num_topics, model, result_table, coherence_result)
#
# executor = ThreadPoolExecutor()
# results: typing.Any = executor.map(create_model, range(num_topics_min, num_topics_max+1))
# executor.shutdown(wait=True)
# for r in results:
# models[r[0]] = r[1]
# model_tables[r[0]] = r[2]
# if compute_coherence:
# coherence[r[0]] = r[3]
# df_coherence = pd.DataFrame(coherence.keys(), columns=["Number of topics"])
# df_coherence["Coherence"] = coherence.values()
if compute_coherence:
coherence_table = self.assemble_coherence(
models_dict=models, words_per_topic=words_per_topic
)
else:
coherence_table = None
coherence_map = {k: v.item() for k, v in coherence.items()}
outputs.set_values(
topic_models=model_tables,
coherence_table=coherence_table,
coherence_map=coherence_map,
)
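Outside of kiara, the core of the process method above can be reproduced with gensim directly. The following is a minimal sketch on a toy corpus (training one model per topic count and scoring it with c_v coherence, mirroring the loop above); the corpus and topic range are made up for illustration:

from gensim import corpora
from gensim.models import CoherenceModel, LdaModel

# toy tokenized corpus (one token list per document)
tokens = [
    ["topic", "modelling", "finds", "themes", "in", "documents"],
    ["latent", "dirichlet", "allocation", "assigns", "topics", "to", "documents"],
    ["coherence", "scores", "help", "compare", "topic", "models"],
]

id2word = corpora.Dictionary(tokens)
corpus = [id2word.doc2bow(text) for text in tokens]

models, coherence = {}, {}
for num_topics in range(2, 4):  # e.g. num_topics_min=2, num_topics_max=3
    model = LdaModel(corpus, id2word=id2word, num_topics=num_topics, eval_every=None)
    models[num_topics] = model
    cm = CoherenceModel(
        model=model, texts=tokens, dictionary=id2word, coherence="c_v", processes=1
    )
    coherence[num_topics] = cm.get_coherence()

print({k: m.print_topics(num_words=5) for k, m in models.items()})
print(coherence)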
KIARA_METADATA
¶assemble_coherence(self, models_dict, words_per_topic)
¶Source code in language_processing/modules/lda.py
def assemble_coherence(self, models_dict: Mapping[int, Any], words_per_topic: int):
import pandas as pd
import pyarrow as pa
# Create list with topics and topic words for each number of topics
num_topics_list = []
topics_list = []
for (
num_topics,
model,
) in models_dict.items():
num_topics_list.append(num_topics)
topic_print = model.print_topics(num_words=words_per_topic)
topics_list.append(topic_print)
df_coherence_table = pd.DataFrame(columns=["topic_id", "words", "num_topics"])
idx = 0
for i in range(len(topics_list)):
for j in range(len(topics_list[i])):
df_coherence_table.loc[idx] = ""
df_coherence_table["topic_id"].loc[idx] = j + 1
df_coherence_table["words"].loc[idx] = ", ".join(
re.findall(r'"(\w+)"', topics_list[i][j][1])
)
df_coherence_table["num_topics"].loc[idx] = num_topics_list[i]
idx += 1
coherence_table = pa.Table.from_pandas(df_coherence_table, preserve_index=False)
return coherence_table
compute_coherence(self, model, corpus_model, id2word)
¶Source code in language_processing/modules/lda.py
def compute_coherence(self, model, corpus_model, id2word: Mapping[str, int]):
from gensim.models import CoherenceModel
coherencemodel = CoherenceModel(
model=model,
texts=corpus_model,
dictionary=id2word,
coherence="c_v",
processes=1,
)
coherence_value = coherencemodel.get_coherence()
return coherence_value
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/lda.py
def create_inputs_schema(
self,
) -> ValueSetSchema:
inputs: Dict[str, Dict[str, Any]] = {
"tokens_array": {"type": "array", "doc": "The text corpus."},
"num_topics_min": {
"type": "integer",
"doc": "The minimal number of topics.",
"default": 7,
},
"num_topics_max": {
"type": "integer",
"doc": "The max number of topics.",
"optional": True,
},
"compute_coherence": {
"type": "boolean",
"doc": "Whether to compute the coherence score for each model.",
"default": False,
},
"words_per_topic": {
"type": "integer",
"doc": "How many words per topic to put in the result model.",
"default": 10,
},
}
return inputs
create_model(self, corpus, num_topics, id2word)
¶Source code in language_processing/modules/lda.py
def create_model(self, corpus, num_topics: int, id2word: Mapping[str, int]):
from gensim.models import LdaModel
model = LdaModel(
corpus, id2word=id2word, num_topics=num_topics, eval_every=None
)
return model
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/lda.py
def create_outputs_schema(
self,
) -> ValueSetSchema:
outputs = {
"topic_models": {
"type": "dict",
"doc": "A dictionary with one coherence model table for each number of topics.",
},
"coherence_table": {
"type": "table",
"doc": "Coherence details.",
"optional": True,
},
"coherence_map": {
"type": "dict",
"doc": "A map with the coherence value for every number of topics.",
},
}
return outputs
process(self, inputs, outputs)
¶Source code in language_processing/modules/lda.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
from gensim import corpora
logging.getLogger("gensim").setLevel(logging.ERROR)
tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
tokens = tokens_array.arrow_array.to_pylist()
words_per_topic = inputs.get_value_data("words_per_topic")
num_topics_min = inputs.get_value_data("num_topics_min")
num_topics_max = inputs.get_value_data("num_topics_max")
if num_topics_max is None:
num_topics_max = num_topics_min
compute_coherence = inputs.get_value_data("compute_coherence")
id2word = corpora.Dictionary(tokens)
corpus = [id2word.doc2bow(text) for text in tokens]
# model = gensim.models.ldamulticore.LdaMulticore(
# corpus, id2word=id2word, num_topics=num_topics, eval_every=None
# )
models = {}
model_tables = {}
coherence = {}
# multi_threaded = False
# if not multi_threaded:
for nt in range(num_topics_min, num_topics_max + 1):
model = self.create_model(corpus=corpus, num_topics=nt, id2word=id2word)
models[nt] = model
topic_print_model = model.print_topics(num_words=words_per_topic)
# dbg(topic_print_model)
# df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
# TODO: create table directly
# result_table = Table.from_pandas(df)
model_tables[nt] = topic_print_model
if compute_coherence:
coherence_result = self.compute_coherence(
model=model, corpus_model=tokens, id2word=id2word
)
coherence[nt] = coherence_result
# else:
# def create_model(num_topics):
# model = self.create_model(corpus=corpus, num_topics=num_topics, id2word=id2word)
# topic_print_model = model.print_topics(num_words=30)
# df = pd.DataFrame(topic_print_model, columns=["topic_id", "words"])
# # TODO: create table directly
# result_table = Table.from_pandas(df)
# coherence_result = None
# if compute_coherence:
# coherence_result = self.compute_coherence(model=model, corpus_model=tokens, id2word=id2word)
# return (num_topics, model, result_table, coherence_result)
#
# executor = ThreadPoolExecutor()
# results: typing.Any = executor.map(create_model, range(num_topics_min, num_topics_max+1))
# executor.shutdown(wait=True)
# for r in results:
# models[r[0]] = r[1]
# model_tables[r[0]] = r[2]
# if compute_coherence:
# coherence[r[0]] = r[3]
# df_coherence = pd.DataFrame(coherence.keys(), columns=["Number of topics"])
# df_coherence["Coherence"] = coherence.values()
if compute_coherence:
coherence_table = self.assemble_coherence(
models_dict=models, words_per_topic=words_per_topic
)
else:
coherence_table = None
coherence_map = {k: v.item() for k, v in coherence.items()}
outputs.set_values(
topic_models=model_tables,
coherence_table=coherence_table,
coherence_map=coherence_map,
)
lemmatize
¶
tokens
¶
log
¶Classes¶
AssembleStopwordsModule (KiaraModule)
¶Create a list of stopwords from one or multiple sources.
This will download nltk stopwords if necessary, and merge all input lists into a single, sorted list without duplicates.
Source code in language_processing/modules/tokens.py
class AssembleStopwordsModule(KiaraModule):
"""Create a list of stopwords from one or multiple sources.
This will download nltk stopwords if necessary, and merge all input lists into a single, sorted list without duplicates.
"""
_module_type_name = "create.stopwords_list"
def create_inputs_schema(
self,
) -> ValueSetSchema:
return {
"languages": {
"type": "list",
"doc": "A list of languages, will be used to retrieve language-specific stopword from nltk.",
"optional": True,
},
"stopword_lists": {
"type": "list",
"doc": "A list of lists of stopwords.",
"optional": True,
},
}
def create_outputs_schema(
self,
) -> ValueSetSchema:
return {
"stopwords_list": {
"type": "list",
"doc": "A sorted list of unique stopwords.",
}
}
def process(self, inputs: ValueMap, outputs: ValueMap):
stopwords = set()
_languages = inputs.get_value_obj("languages")
if _languages.is_set:
all_stopwords = get_stopwords()
languages: ListModel = _languages.data
for language in languages.list_data:
if language not in all_stopwords.fileids():
raise KiaraProcessingException(
f"Invalid language: {language}. Available: {', '.join(all_stopwords.fileids())}."
)
stopwords.update(get_stopwords().words(language))
_stopword_lists = inputs.get_value_obj("stopword_lists")
if _stopword_lists.is_set:
stopword_lists: ListModel = _stopword_lists.data
for stopword_list in stopword_lists.list_data:
if isinstance(stopword_list, str):
stopwords.add(stopword_list)
else:
stopwords.update(stopword_list)
outputs.set_value("stopwords_list", sorted(stopwords))
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
def create_inputs_schema(
self,
) -> ValueSetSchema:
return {
"languages": {
"type": "list",
"doc": "A list of languages, will be used to retrieve language-specific stopword from nltk.",
"optional": True,
},
"stopword_lists": {
"type": "list",
"doc": "A list of lists of stopwords.",
"optional": True,
},
}
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
def create_outputs_schema(
self,
) -> ValueSetSchema:
return {
"stopwords_list": {
"type": "list",
"doc": "A sorted list of unique stopwords.",
}
}
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
def process(self, inputs: ValueMap, outputs: ValueMap):
stopwords = set()
_languages = inputs.get_value_obj("languages")
if _languages.is_set:
all_stopwords = get_stopwords()
languages: ListModel = _languages.data
for language in languages.list_data:
if language not in all_stopwords.fileids():
raise KiaraProcessingException(
f"Invalid language: {language}. Available: {', '.join(all_stopwords.fileids())}."
)
stopwords.update(get_stopwords().words(language))
_stopword_lists = inputs.get_value_obj("stopword_lists")
if _stopword_lists.is_set:
stopword_lists: ListModel = _stopword_lists.data
for stopword_list in stopword_lists.list_data:
if isinstance(stopword_list, str):
stopwords.add(stopword_list)
else:
stopwords.update(stopword_list)
outputs.set_value("stopwords_list", sorted(stopwords))
PreprocessModule (KiaraModule)
¶Preprocess lists of tokens, incl. lowercasing, removal of special characters, etc.
Lowercasing: Lowercase the words. This operation is a double-edged sword. It can be effective at yielding potentially better results in the case of relatively small datasets or datasets with a high percentage of OCR mistakes. For instance, if lowercasing is not performed, the algorithm will treat USA, Usa, usa, UsA, uSA, etc. as distinct tokens, even though they may all refer to the same entity. On the other hand, if the dataset does not contain such OCR mistakes, then it may become difficult to distinguish between homonyms, and interpreting the topics may become much harder.
Removing stopwords and words with fewer than three characters: Remove low-information words. These are typically words such as articles, pronouns, prepositions, conjunctions, etc. which are not semantically salient. There are numerous stopword lists available for many, though not all, languages which can easily be adapted to the individual researcher's needs. Removing words with fewer than three characters may additionally remove many OCR mistakes. Both these operations have the dual advantage of yielding more reliable results while reducing the size of the dataset, which in turn reduces the required processing power. This step can therefore hardly be considered optional in topic modelling.
Noise removal: Remove elements such as punctuation marks, special characters, numbers, HTML formatting, etc. This operation is again concerned with removing elements that may not be relevant to the text analysis and may in fact interfere with it. Depending on the dataset and research question, this operation can become essential.
Source code in language_processing/modules/tokens.py
class PreprocessModule(KiaraModule):
"""Preprocess lists of tokens, incl. lowercasing, remove special characers, etc.
Lowercasing: Lowercase the words. This operation is a double-edged sword. It can be effective at yielding potentially better results in the case of relatively small datasets or datatsets with a high percentage of OCR mistakes. For instance, if lowercasing is not performed, the algorithm will treat USA, Usa, usa, UsA, uSA, etc. as distinct tokens, even though they may all refer to the same entity. On the other hand, if the dataset does not contain such OCR mistakes, then it may become difficult to distinguish between homonyms and make interpreting the topics much harder.
Removing stopwords and words with less than three characters: Remove low information words. These are typically words such as articles, pronouns, prepositions, conjunctions, etc. which are not semantically salient. There are numerous stopword lists available for many, though not all, languages which can be easily adapted to the individual researcher's needs. Removing words with less than three characters may additionally remove many OCR mistakes. Both these operations have the dual advantage of yielding more reliable results while reducing the size of the dataset, thus in turn reducing the required processing power. This step can therefore hardly be considered optional in TM.
Noise removal: Remove elements such as punctuation marks, special characters, numbers, html formatting, etc. This operation is again concerned with removing elements that may not be relevant to the text analysis and in fact interfere with it. Depending on the dataset and research question, this operation can become essential.
"""
_module_type_name = "preprocess.tokens_array"
KIARA_METADATA = {
"tags": ["tokens", "preprocess"],
}
def create_inputs_schema(
self,
) -> ValueSetSchema:
return {
"tokens_array": {
"type": "array",
"doc": "The tokens array to pre-process.",
},
"to_lowercase": {
"type": "boolean",
"doc": "Apply lowercasing to the text.",
"default": False,
},
"remove_alphanumeric": {
"type": "boolean",
"doc": "Remove all tokens that include numbers (e.g. ex1ample).",
"default": False,
},
"remove_non_alpha": {
"type": "boolean",
"doc": "Remove all tokens that include punctuation and numbers (e.g. ex1a.mple).",
"default": False,
},
"remove_all_numeric": {
"type": "boolean",
"doc": "Remove all tokens that contain numbers only (e.g. 876).",
"default": False,
},
"remove_short_tokens": {
"type": "integer",
"doc": "Remove tokens shorter than a certain length. If value is <= 0, no filtering will be done.",
"default": False,
},
"remove_stopwords": {
"type": "list",
"doc": "Remove stopwords.",
"optional": True,
},
}
def create_outputs_schema(
self,
) -> ValueSetSchema:
return {
"tokens_array": {
"type": "array",
"doc": "The pre-processed content, as an array of lists of strings.",
}
}
def process(self, inputs: ValueMap, outputs: ValueMap):
import polars as pl
import pyarrow as pa
tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
lowercase: bool = inputs.get_value_data("to_lowercase")
remove_alphanumeric: bool = inputs.get_value_data("remove_alphanumeric")
remove_non_alpha: bool = inputs.get_value_data("remove_non_alpha")
remove_all_numeric: bool = inputs.get_value_data("remove_all_numeric")
remove_short_tokens: int = inputs.get_value_data("remove_short_tokens")
if remove_short_tokens is None:
remove_short_tokens = -1
_remove_stopwords = inputs.get_value_obj("remove_stopwords")
if _remove_stopwords.is_set:
stopword_list: Optional[Iterable[str]] = _remove_stopwords.data.list_data
else:
stopword_list = None
# it's better to have one method every token goes through, than to do every test separately on the token list
# because that way each token only needs to be touched once (which is more efficient)
def check_token(token: str) -> Optional[str]:
# remove short tokens first, since we can save ourselves all the other checks (which are more expensive)
if remove_short_tokens > 0:
if len(token) <= remove_short_tokens:
return None
_token: str = token
if lowercase:
_token = _token.lower()
if remove_non_alpha:
match = _token if _token.isalpha() else None
if match is None:
return None
# if remove_non_alpha was set, we don't need to worry about tokens that include numbers, since they are already filtered out
if remove_alphanumeric and not remove_non_alpha:
match = _token if _token.isalnum() else None
if match is None:
return None
# all-number tokens are already filtered out if the remove_non_alpha methods above ran
if remove_all_numeric and not remove_non_alpha:
match = None if _token.isdigit() else _token
if match is None:
return None
if stopword_list and _token and _token.lower() in stopword_list:
return None
return _token
series = pl.Series(name="tokens", values=tokens_array.arrow_array)
result = series.apply(
lambda token_list: [
x for x in (check_token(token) for token in token_list) if x is not None
]
)
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(tokens_array=chunked)
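Stripped of the kiara, polars and pyarrow plumbing, the single-pass filter implemented by check_token boils down to the following sketch (pure Python, with the options hard-coded for illustration):

from typing import List, Optional

STOPWORDS = {"the", "and"}
MIN_LENGTH = 3  # drop tokens of length <= 3, like remove_short_tokens=3

def check_token(token: str) -> Optional[str]:
    # cheap length check first, then lowercase, then the character-class tests
    if len(token) <= MIN_LENGTH:
        return None
    token = token.lower()
    if not token.isalpha():  # drops tokens containing digits or punctuation
        return None
    if token in STOPWORDS:
        return None
    return token

def preprocess(token_list: List[str]) -> List[str]:
    # touch every token exactly once, keeping only those that survive all checks
    return [t for t in (check_token(x) for x in token_list) if t is not None]

print(preprocess(["The", "Quick", "br0wn", "fox", "and", "1234", "jumps"]))
# -> ['quick', 'jumps']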
KIARA_METADATA
¶create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
def create_inputs_schema(
self,
) -> ValueSetSchema:
return {
"tokens_array": {
"type": "array",
"doc": "The tokens array to pre-process.",
},
"to_lowercase": {
"type": "boolean",
"doc": "Apply lowercasing to the text.",
"default": False,
},
"remove_alphanumeric": {
"type": "boolean",
"doc": "Remove all tokens that include numbers (e.g. ex1ample).",
"default": False,
},
"remove_non_alpha": {
"type": "boolean",
"doc": "Remove all tokens that include punctuation and numbers (e.g. ex1a.mple).",
"default": False,
},
"remove_all_numeric": {
"type": "boolean",
"doc": "Remove all tokens that contain numbers only (e.g. 876).",
"default": False,
},
"remove_short_tokens": {
"type": "integer",
"doc": "Remove tokens shorter than a certain length. If value is <= 0, no filtering will be done.",
"default": False,
},
"remove_stopwords": {
"type": "list",
"doc": "Remove stopwords.",
"optional": True,
},
}
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
def create_outputs_schema(
self,
) -> ValueSetSchema:
return {
"tokens_array": {
"type": "array",
"doc": "The pre-processed content, as an array of lists of strings.",
}
}
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
def process(self, inputs: ValueMap, outputs: ValueMap):
import polars as pl
import pyarrow as pa
tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
lowercase: bool = inputs.get_value_data("to_lowercase")
remove_alphanumeric: bool = inputs.get_value_data("remove_alphanumeric")
remove_non_alpha: bool = inputs.get_value_data("remove_non_alpha")
remove_all_numeric: bool = inputs.get_value_data("remove_all_numeric")
remove_short_tokens: int = inputs.get_value_data("remove_short_tokens")
if remove_short_tokens is None:
remove_short_tokens = -1
_remove_stopwords = inputs.get_value_obj("remove_stopwords")
if _remove_stopwords.is_set:
stopword_list: Optional[Iterable[str]] = _remove_stopwords.data.list_data
else:
stopword_list = None
# it's better to have one method every token goes through, than to do every test separately on the token list
# because that way each token only needs to be touched once (which is more efficient)
def check_token(token: str) -> Optional[str]:
# remove short tokens first, since we can save ourselves all the other checks (which are more expensive)
if remove_short_tokens > 0:
if len(token) <= remove_short_tokens:
return None
_token: str = token
if lowercase:
_token = _token.lower()
if remove_non_alpha:
match = _token if _token.isalpha() else None
if match is None:
return None
# if remove_non_alpha was set, we don't need to worry about tokens that include numbers, since they are already filtered out
if remove_alphanumeric and not remove_non_alpha:
match = _token if _token.isalnum() else None
if match is None:
return None
# all-number tokens are already filtered out if the remove_non_alpha methods above ran
if remove_all_numeric and not remove_non_alpha:
match = None if _token.isdigit() else _token
if match is None:
return None
if stopword_list and _token and _token.lower() in stopword_list:
return None
return _token
series = pl.Series(name="tokens", values=tokens_array.arrow_array)
result = series.apply(
lambda token_list: [
x for x in (check_token(token) for token in token_list) if x is not None
]
)
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(tokens_array=chunked)
RemoveStopwordsModule (KiaraModule)
¶Remove stopwords from an array of token-lists.
Source code in language_processing/modules/tokens.py
class RemoveStopwordsModule(KiaraModule):
"""Remove stopwords from an array of token-lists."""
_module_type_name = "remove_stopwords.from.tokens_array"
def create_inputs_schema(
self,
) -> ValueSetSchema:
# TODO: do something smart and check whether languages are already downloaded, if so, display selection in doc
inputs: Dict[str, Dict[str, Any]] = {
"tokens_array": {
"type": "array",
"doc": "An array of string lists (a list of tokens).",
},
"languages": {
"type": "list",
# "doc": f"A list of language names to use default stopword lists for. Available: {', '.join(get_stopwords().fileids())}.",
"doc": "A list of language names to use default stopword lists for.",
"optional": True,
},
"additional_stopwords": {
"type": "list",
"doc": "A list of additional, custom stopwords.",
"optional": True,
},
}
return inputs
def create_outputs_schema(
self,
) -> ValueSetSchema:
outputs = {
"tokens_array": {
"type": "array",
"doc": "An array of string lists, with the stopwords removed.",
}
}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
custom_stopwords = inputs.get_value_data("additional_stopwords")
if inputs.get_value_obj("languages").is_set:
_languages: ListModel = inputs.get_value_data("languages")
languages = _languages.list_data
else:
languages = []
stopwords = set()
if languages:
for language in languages:
if language not in get_stopwords().fileids():
raise KiaraProcessingException(
f"Invalid language: {language}. Available: {', '.join(get_stopwords().fileids())}."
)
stopwords.update(get_stopwords().words(language))
if custom_stopwords:
stopwords.update(custom_stopwords)
orig_array = inputs.get_value_obj("tokens_array") # type: ignore
if not stopwords:
outputs.set_value("tokens_array", orig_array)
return
# if hasattr(orig_array, "to_pylist"):
# token_lists = orig_array.to_pylist()
tokens_array = orig_array.data.arrow_array
# TODO: use vaex for this
result = []
for token_list in tokens_array:
cleaned_list = [x for x in token_list.as_py() if x.lower() not in stopwords]
result.append(cleaned_list)
outputs.set_value("tokens_array", pa.chunked_array(pa.array(result)))
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
def create_inputs_schema(
self,
) -> ValueSetSchema:
# TODO: do something smart and check whether languages are already downloaded, if so, display selection in doc
inputs: Dict[str, Dict[str, Any]] = {
"tokens_array": {
"type": "array",
"doc": "An array of string lists (a list of tokens).",
},
"languages": {
"type": "list",
# "doc": f"A list of language names to use default stopword lists for. Available: {', '.join(get_stopwords().fileids())}.",
"doc": "A list of language names to use default stopword lists for.",
"optional": True,
},
"additional_stopwords": {
"type": "list",
"doc": "A list of additional, custom stopwords.",
"optional": True,
},
}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
def create_outputs_schema(
self,
) -> ValueSetSchema:
outputs = {
"tokens_array": {
"type": "array",
"doc": "An array of string lists, with the stopwords removed.",
}
}
return outputs
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
custom_stopwords = inputs.get_value_data("additional_stopwords")
if inputs.get_value_obj("languages").is_set:
_languages: ListModel = inputs.get_value_data("languages")
languages = _languages.list_data
else:
languages = []
stopwords = set()
if languages:
for language in languages:
if language not in get_stopwords().fileids():
raise KiaraProcessingException(
f"Invalid language: {language}. Available: {', '.join(get_stopwords().fileids())}."
)
stopwords.update(get_stopwords().words(language))
if custom_stopwords:
stopwords.update(custom_stopwords)
orig_array = inputs.get_value_obj("tokens_array") # type: ignore
if not stopwords:
outputs.set_value("tokens_array", orig_array)
return
# if hasattr(orig_array, "to_pylist"):
# token_lists = orig_array.to_pylist()
tokens_array = orig_array.data.arrow_array
# TODO: use vaex for this
result = []
for token_list in tokens_array:
cleaned_list = [x for x in token_list.as_py() if x.lower() not in stopwords]
result.append(cleaned_list)
outputs.set_value("tokens_array", pa.chunked_array(pa.array(result)))
TokenizeTextArrayeModule (KiaraModule)
¶Split sentences into words, or words into characters. In other words, this operation establishes the word boundaries (i.e., tokens), a very helpful way of finding patterns. It is also the typical step prior to stemming and lemmatization.
Source code in language_processing/modules/tokens.py
class TokenizeTextArrayeModule(KiaraModule):
"""Split sentences into words or words into characters.
In other words, this operation establishes the word boundaries (i.e., tokens) a very helpful way of finding patterns. It is also the typical step prior to stemming and lemmatization
"""
_module_type_name = "tokenize.texts_array"
KIARA_METADATA = {
"tags": ["tokenize", "tokens"],
}
def create_inputs_schema(
self,
) -> ValueSetSchema:
return {
"texts_array": {
"type": "array",
"doc": "An array of text items to be tokenized.",
},
"tokenize_by_word": {
"type": "boolean",
"doc": "Whether to tokenize by word (default), or character.",
"default": True,
},
}
def create_outputs_schema(
self,
) -> ValueSetSchema:
return {
"tokens_array": {
"type": "array",
"doc": "The tokenized content, as an array of lists of strings.",
}
}
def process(self, inputs: ValueMap, outputs: ValueMap):
import nltk
import polars as pl
import pyarrow as pa
array: KiaraArray = inputs.get_value_data("texts_array")
# tokenize_by_word: bool = inputs.get_value_data("tokenize_by_word")
column: pa.ChunkedArray = array.arrow_array
# warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)
def word_tokenize(word):
result = nltk.word_tokenize(word)
return result
series = pl.Series(name="tokens", values=column)
result = series.apply(word_tokenize)
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(tokens_array=chunked)
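The per-item work here is plain nltk.word_tokenize. A minimal sketch without the polars/pyarrow plumbing (the "punkt" tokenizer data has to be downloaded once):

import nltk

nltk.download("punkt", quiet=True)

texts = ["This is the first document.", "And this, the second one!"]
tokens = [nltk.word_tokenize(text) for text in texts]
print(tokens[0])  # ['This', 'is', 'the', 'first', 'document', '.']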
KIARA_METADATA
¶create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
def create_inputs_schema(
self,
) -> ValueSetSchema:
return {
"texts_array": {
"type": "array",
"doc": "An array of text items to be tokenized.",
},
"tokenize_by_word": {
"type": "boolean",
"doc": "Whether to tokenize by word (default), or character.",
"default": True,
},
}
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
def create_outputs_schema(
self,
) -> ValueSetSchema:
return {
"tokens_array": {
"type": "array",
"doc": "The tokenized content, as an array of lists of strings.",
}
}
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
def process(self, inputs: ValueMap, outputs: ValueMap):
import nltk
import polars as pl
import pyarrow as pa
array: KiaraArray = inputs.get_value_data("texts_array")
# tokenize_by_word: bool = inputs.get_value_data("tokenize_by_word")
column: pa.ChunkedArray = array.arrow_array
# warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)
def word_tokenize(word):
result = nltk.word_tokenize(word)
return result
series = pl.Series(name="tokens", values=column)
result = series.apply(word_tokenize)
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(tokens_array=chunked)
TokenizeTextConfig (KiaraModuleConfig)
pydantic-model
¶Source code in language_processing/modules/tokens.py
class TokenizeTextConfig(KiaraModuleConfig):
filter_non_alpha: bool = Field(
description="Whether to filter out non alpha tokens.", default=True
)
min_token_length: int = Field(description="The minimum token length.", default=3)
to_lowercase: bool = Field(
description="Whether to lowercase the tokens.", default=True
)
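Because the config is a pydantic model, it can be instantiated and validated on its own, for example to configure a non-default tokenizer (a minimal usage sketch, assuming no other required fields on the base config):

cfg = TokenizeTextConfig(filter_non_alpha=False, min_token_length=2)
print(cfg.min_token_length)  # 2
print(cfg.to_lowercase)      # True (default)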
TokenizeTextModule (KiaraModule)
¶Tokenize a string.
Source code in language_processing/modules/tokens.py
class TokenizeTextModule(KiaraModule):
"""Tokenize a string."""
_config_cls = TokenizeTextConfig
_module_type_name = "tokenize.string"
def create_inputs_schema(
self,
) -> ValueSetSchema:
inputs = {"text": {"type": "string", "doc": "The text to tokenize."}}
return inputs
def create_outputs_schema(
self,
) -> ValueSetSchema:
outputs = {
"token_list": {
"type": "list",
"doc": "The tokenized version of the input text.",
}
}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import nltk
# TODO: module-independent caching?
# language = inputs.get_value_data("language")
#
text = inputs.get_value_data("text")
tokenized = nltk.word_tokenize(text)
result = tokenized
if self.get_config_value("min_token_length") > 0:
result = (
x
for x in tokenized
if len(x) >= self.get_config_value("min_token_length")
)
if self.get_config_value("filter_non_alpha"):
result = (x for x in result if x.isalpha())
if self.get_config_value("to_lowercase"):
result = (x.lower() for x in result)
outputs.set_value("token_list", list(result))
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶Source code in language_processing/modules/tokens.py
class TokenizeTextConfig(KiaraModuleConfig):
filter_non_alpha: bool = Field(
description="Whether to filter out non alpha tokens.", default=True
)
min_token_length: int = Field(description="The minimum token length.", default=3)
to_lowercase: bool = Field(
description="Whether to lowercase the tokens.", default=True
)
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
def create_inputs_schema(
self,
) -> ValueSetSchema:
inputs = {"text": {"type": "string", "doc": "The text to tokenize."}}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
def create_outputs_schema(
self,
) -> ValueSetSchema:
outputs = {
"token_list": {
"type": "list",
"doc": "The tokenized version of the input text.",
}
}
return outputs
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import nltk
# TODO: module-independent caching?
# language = inputs.get_value_data("language")
#
text = inputs.get_value_data("text")
tokenized = nltk.word_tokenize(text)
result = tokenized
if self.get_config_value("min_token_length") > 0:
result = (
x
for x in tokenized
if len(x) >= self.get_config_value("min_token_length")
)
if self.get_config_value("filter_non_alpha"):
result = (x for x in result if x.isalpha())
if self.get_config_value("to_lowercase"):
result = (x.lower() for x in result)
outputs.set_value("token_list", list(result))
get_stopwords()
¶Source code in language_processing/modules/tokens.py
def get_stopwords():
# TODO: make that smarter
import nltk
output = io.StringIO()
nltk.download("punkt", print_error_to=output)
nltk.download("stopwords", print_error_to=output)
log.debug("external.message", source="nltk", msg=output.getvalue())
from nltk.corpus import stopwords
return stopwords
pipelines
special
¶
Default (empty) module that is used as a base path for pipelines contained in this package.