modules
Modules¶
        lda
¶
    Classes¶
        
LDAModule            (KiaraModule)
        
¶
    Perform Latent Dirichlet Allocation on a tokenized corpus.
This module computes models for a range of number of topics provided by the user.
Source code in language_processing/modules/lda.py
          class LDAModule(KiaraModule):
              """Perform Latent Dirichlet Allocation on a tokenized corpus.

              This module computes models for a range of number of topics provided by the user.
              """

              _module_type_name = "generate.LDA.for.tokens_array"

              KIARA_METADATA = {
                  "tags": ["LDA", "tokens"],
              }

              def create_inputs_schema(
                  self,
              ) -> ValueSetSchema:
                  """Return the schema describing the inputs of this module."""
                  inputs: Dict[str, Dict[str, Any]] = {
                      "tokens_array": {"type": "array", "doc": "The text corpus."},
                      "num_topics_min": {
                          "type": "integer",
                          "doc": "The minimal number of topics.",
                          "default": 7,
                      },
                      "num_topics_max": {
                          "type": "integer",
                          "doc": "The max number of topics.",
                          "optional": True,
                      },
                      "compute_coherence": {
                          "type": "boolean",
                          "doc": "Whether to compute the coherence score for each model.",
                          "default": False,
                      },
                      "words_per_topic": {
                          "type": "integer",
                          "doc": "How many words per topic to put in the result model.",
                          "default": 10,
                      },
                  }
                  return inputs

              def create_outputs_schema(
                  self,
              ) -> ValueSetSchema:
                  """Return the schema describing the outputs of this module."""
                  outputs = {
                      "topic_models": {
                          "type": "dict",
                          "doc": "A dictionary with one coherence model table for each number of topics.",
                      },
                      "coherence_table": {
                          "type": "table",
                          "doc": "Coherence details.",
                          "optional": True,
                      },
                      "coherence_map": {
                          "type": "dict",
                          "doc": "A map with the coherence value for every number of topics.",
                      },
                  }
                  return outputs

              def create_model(self, corpus, num_topics: int, id2word: Mapping[str, int]):
                  """Train and return a gensim ``LdaModel`` with ``num_topics`` topics.

                  ``eval_every=None`` is passed through to gensim unchanged.
                  """
                  # Imported lazily so gensim is only loaded when a model is trained.
                  from gensim.models import LdaModel

                  model = LdaModel(
                      corpus, id2word=id2word, num_topics=num_topics, eval_every=None
                  )
                  return model

              def compute_coherence(self, model, corpus_model, id2word: Mapping[str, int]):
                  """Return the ``c_v`` coherence of ``model`` computed over ``corpus_model``.

                  The coherence model is created with ``processes=1``.
                  """
                  from gensim.models import CoherenceModel

                  coherencemodel = CoherenceModel(
                      model=model,
                      texts=corpus_model,
                      dictionary=id2word,
                      coherence="c_v",
                      processes=1,
                  )
                  coherence_value = coherencemodel.get_coherence()
                  return coherence_value

              def assemble_coherence(self, models_dict: Mapping[int, Any], words_per_topic: int):
                  """Build an Arrow table listing the top words of every topic of every model.

                  Args:
                      models_dict: maps a number of topics to the model trained with it;
                          each model must offer ``print_topics(num_words=...)``.
                      words_per_topic: how many words to request per topic.

                  Returns:
                      A ``pyarrow.Table`` with the columns ``topic_id`` (1-based position of
                      the topic in the ``print_topics`` result), ``words`` (comma-separated
                      topic words) and ``num_topics``.
                  """
                  import pandas as pd
                  import pyarrow as pa

                  # Collect plain rows first and build the frame once. The previous
                  # cell-by-cell chained assignment (``df[col].loc[idx] = value``) is not
                  # written back to the frame when pandas Copy-on-Write is active, and
                  # growing a frame row by row is quadratic.
                  rows = []
                  for num_topics, model in models_dict.items():
                      topics = model.print_topics(num_words=words_per_topic)
                      for position, topic in enumerate(topics, start=1):
                          # topic[1] is the printed topic string; the words are the
                          # double-quoted parts of it.
                          words = ", ".join(re.findall(r'"(\w+)"', topic[1]))
                          rows.append(
                              {
                                  "topic_id": position,
                                  "words": words,
                                  "num_topics": num_topics,
                              }
                          )

                  df_coherence_table = pd.DataFrame(
                      rows, columns=["topic_id", "words", "num_topics"]
                  )
                  return pa.Table.from_pandas(df_coherence_table, preserve_index=False)

              def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
                  """Train one LDA model per topic count and publish the results.

                  One model is trained for every value from ``num_topics_min`` to
                  ``num_topics_max`` (inclusive); when ``num_topics_max`` is not provided
                  only ``num_topics_min`` is used. Coherence values and the coherence
                  table are only produced when ``compute_coherence`` is set; otherwise
                  ``coherence_table`` is ``None`` and ``coherence_map`` is empty.
                  """
                  from gensim import corpora

                  # Only let gensim log errors.
                  logging.getLogger("gensim").setLevel(logging.ERROR)

                  tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
                  tokens = tokens_array.arrow_array.to_pylist()

                  words_per_topic = inputs.get_value_data("words_per_topic")
                  num_topics_min = inputs.get_value_data("num_topics_min")
                  num_topics_max = inputs.get_value_data("num_topics_max")
                  if num_topics_max is None:
                      num_topics_max = num_topics_min
                  compute_coherence = inputs.get_value_data("compute_coherence")

                  id2word = corpora.Dictionary(tokens)
                  corpus = [id2word.doc2bow(text) for text in tokens]

                  models = {}
                  model_tables = {}
                  coherence = {}

                  # Models are trained sequentially, one per topic count.
                  for nt in range(num_topics_min, num_topics_max + 1):
                      model = self.create_model(corpus=corpus, num_topics=nt, id2word=id2word)
                      models[nt] = model
                      # TODO: create table directly
                      model_tables[nt] = model.print_topics(num_words=words_per_topic)

                      if compute_coherence:
                          coherence[nt] = self.compute_coherence(
                              model=model, corpus_model=tokens, id2word=id2word
                          )

                  if compute_coherence:
                      coherence_table = self.assemble_coherence(
                          models_dict=models, words_per_topic=words_per_topic
                      )
                  else:
                      coherence_table = None

                  # ``.item()`` is called on every coherence value before it is stored.
                  coherence_map = {k: v.item() for k, v in coherence.items()}

                  outputs.set_values(
                      topic_models=model_tables,
                      coherence_table=coherence_table,
                      coherence_map=coherence_map,
                  )
KIARA_METADATA
¶Methods¶
assemble_coherence(self, models_dict, words_per_topic)
¶Source code in language_processing/modules/lda.py
          def assemble_coherence(self, models_dict: Mapping[int, Any], words_per_topic: int):
              """Build an Arrow table listing the top words of every topic of every model.

              Args:
                  models_dict: maps a number of topics to the model trained with it;
                      each model must offer ``print_topics(num_words=...)``.
                  words_per_topic: how many words to request per topic.

              Returns:
                  A ``pyarrow.Table`` with the columns ``topic_id`` (1-based position of
                  the topic in the ``print_topics`` result), ``words`` (comma-separated
                  topic words) and ``num_topics``.
              """
              import pandas as pd
              import pyarrow as pa

              # Collect plain rows first and build the frame once. The previous
              # cell-by-cell chained assignment (``df[col].loc[idx] = value``) is not
              # written back to the frame when pandas Copy-on-Write is active, and
              # growing a frame row by row is quadratic.
              rows = []
              for num_topics, model in models_dict.items():
                  topics = model.print_topics(num_words=words_per_topic)
                  for position, topic in enumerate(topics, start=1):
                      # topic[1] is the printed topic string; the words are the
                      # double-quoted parts of it.
                      words = ", ".join(re.findall(r'"(\w+)"', topic[1]))
                      rows.append(
                          {
                              "topic_id": position,
                              "words": words,
                              "num_topics": num_topics,
                          }
                      )

              df_coherence_table = pd.DataFrame(
                  rows, columns=["topic_id", "words", "num_topics"]
              )
              return pa.Table.from_pandas(df_coherence_table, preserve_index=False)
compute_coherence(self, model, corpus_model, id2word)
¶Source code in language_processing/modules/lda.py
          def compute_coherence(self, model, corpus_model, id2word: Mapping[str, int]):
              """Return the ``c_v`` coherence of ``model`` computed over ``corpus_model``.

              The gensim ``CoherenceModel`` is created with ``processes=1`` and
              ``id2word`` as its dictionary.
              """
              from gensim.models import CoherenceModel

              scorer_options = {
                  "model": model,
                  "texts": corpus_model,
                  "dictionary": id2word,
                  "coherence": "c_v",
                  "processes": 1,
              }
              return CoherenceModel(**scorer_options).get_coherence()
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/lda.py
          def create_inputs_schema(self) -> ValueSetSchema:
              """Describe the inputs of the LDA module.

              Returns a mapping from input name to its schema: the token corpus, the
              minimal and (optional) maximal number of topics, the coherence switch
              and the number of words to report per topic.
              """
              schema: Dict[str, Dict[str, Any]] = {}

              schema["tokens_array"] = {"type": "array", "doc": "The text corpus."}

              # Range of topic counts; the upper bound may be left unset.
              schema["num_topics_min"] = {
                  "type": "integer",
                  "doc": "The minimal number of topics.",
                  "default": 7,
              }
              schema["num_topics_max"] = {
                  "type": "integer",
                  "doc": "The max number of topics.",
                  "optional": True,
              }

              schema["compute_coherence"] = {
                  "type": "boolean",
                  "doc": "Whether to compute the coherence score for each model.",
                  "default": False,
              }
              schema["words_per_topic"] = {
                  "type": "integer",
                  "doc": "How many words per topic to put in the result model.",
                  "default": 10,
              }
              return schema
create_model(self, corpus, num_topics, id2word)
¶Source code in language_processing/modules/lda.py
          def create_model(self, corpus, num_topics: int, id2word: Mapping[str, int]):
              """Train and return a gensim ``LdaModel`` with ``num_topics`` topics.

              ``corpus`` is passed positionally; ``eval_every=None`` is handed to
              gensim unchanged.
              """
              from gensim.models import LdaModel

              return LdaModel(
                  corpus,
                  id2word=id2word,
                  num_topics=num_topics,
                  eval_every=None,
              )
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/lda.py
          def create_outputs_schema(self) -> ValueSetSchema:
              """Describe the outputs of the LDA module.

              Returns a mapping from output name to its schema: the per-topic-count
              models, the optional coherence table and the coherence map.
              """
              topic_models = {
                  "type": "dict",
                  "doc": "A dictionary with one coherence model table for each number of topics.",
              }
              # The table is declared optional.
              coherence_table = {
                  "type": "table",
                  "doc": "Coherence details.",
                  "optional": True,
              }
              coherence_map = {
                  "type": "dict",
                  "doc": "A map with the coherence value for every number of topics.",
              }
              return {
                  "topic_models": topic_models,
                  "coherence_table": coherence_table,
                  "coherence_map": coherence_map,
              }
process(self, inputs, outputs)
¶Source code in language_processing/modules/lda.py
          def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
              """Train one LDA model per topic count and publish the results.

              One model is trained for every value from ``num_topics_min`` to
              ``num_topics_max`` (inclusive); when ``num_topics_max`` is not provided
              only ``num_topics_min`` is used. Coherence values and the coherence
              table are only produced when ``compute_coherence`` is set; otherwise
              ``coherence_table`` is ``None`` and ``coherence_map`` is empty.
              """
              from gensim import corpora

              # Only let gensim log errors.
              logging.getLogger("gensim").setLevel(logging.ERROR)

              token_values: KiaraArray = inputs.get_value_data("tokens_array")
              documents = token_values.arrow_array.to_pylist()

              top_n_words = inputs.get_value_data("words_per_topic")
              first_count = inputs.get_value_data("num_topics_min")
              last_count = inputs.get_value_data("num_topics_max")
              if last_count is None:
                  last_count = first_count
              with_coherence = inputs.get_value_data("compute_coherence")

              dictionary = corpora.Dictionary(documents)
              bow_corpus = [dictionary.doc2bow(document) for document in documents]

              trained_models = {}
              printed_topics = {}
              scores = {}

              # Models are trained sequentially, one per topic count.
              for topic_count in range(first_count, last_count + 1):
                  lda = self.create_model(
                      corpus=bow_corpus, num_topics=topic_count, id2word=dictionary
                  )
                  trained_models[topic_count] = lda
                  # TODO: create table directly
                  printed_topics[topic_count] = lda.print_topics(num_words=top_n_words)

                  if not with_coherence:
                      continue
                  scores[topic_count] = self.compute_coherence(
                      model=lda, corpus_model=documents, id2word=dictionary
                  )

              details = (
                  self.assemble_coherence(
                      models_dict=trained_models, words_per_topic=top_n_words
                  )
                  if with_coherence
                  else None
              )

              outputs.set_values(
                  topic_models=printed_topics,
                  coherence_table=details,
                  # ``.item()` is called on every coherence value before it is stored.
                  coherence_map={count: score.item() for count, score in scores.items()},
              )
        lemmatize
¶
    
        tokens
¶
    
log
¶
    Classes¶
        
AssembleStopwordsModule            (KiaraModule)
        
¶
    Create a list of stopwords from one or multiple sources.
This will download nltk stopwords if necessary, and merge all input lists into a single, sorted list without duplicates.
Source code in language_processing/modules/tokens.py
          class AssembleStopwordsModule(KiaraModule):
              """Create a list of stopwords from one or multiple sources.

              This will download nltk stopwords if necessary, and merge all input lists into a single, sorted list without duplicates.
              """

              _module_type_name = "create.stopwords_list"

              def create_inputs_schema(
                  self,
              ) -> ValueSetSchema:
                  """Return the schema describing the (both optional) inputs of this module."""
                  return {
                      "languages": {
                          "type": "list",
                          "doc": "A list of languages, will be used to retrieve language-specific stopword from nltk.",
                          "optional": True,
                      },
                      "stopword_lists": {
                          "type": "list",
                          "doc": "A list of lists of stopwords.",
                          "optional": True,
                      },
                  }

              def create_outputs_schema(
                  self,
              ) -> ValueSetSchema:
                  """Return the schema describing the single ``stopwords_list`` output."""
                  return {
                      "stopwords_list": {
                          "type": "list",
                          "doc": "A sorted list of unique stopwords.",
                      }
                  }

              def process(self, inputs: ValueMap, outputs: ValueMap):
                  """Merge language-specific and custom stopwords into one sorted list.

                  Raises:
                      KiaraProcessingException: if a requested language is not among the
                          file ids of the stopwords corpus.
                  """
                  stopwords = set()

                  _languages = inputs.get_value_obj("languages")
                  if _languages.is_set:
                      # Fetch the corpus and its language ids once instead of once per
                      # requested language.
                      all_stopwords = get_stopwords()
                      available = all_stopwords.fileids()

                      languages: ListModel = _languages.data
                      for language in languages.list_data:
                          if language not in available:
                              raise KiaraProcessingException(
                                  f"Invalid language: {language}. Available: {', '.join(available)}."
                              )
                          stopwords.update(all_stopwords.words(language))

                  _stopword_lists = inputs.get_value_obj("stopword_lists")
                  if _stopword_lists.is_set:
                      stopword_lists: ListModel = _stopword_lists.data
                      for stopword_list in stopword_lists.list_data:
                          if isinstance(stopword_list, str):
                              # A bare string is one stopword, not a sequence of characters.
                              stopwords.add(stopword_list)
                          else:
                              stopwords.update(stopword_list)

                  outputs.set_value("stopwords_list", sorted(stopwords))
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
          def create_inputs_schema(self) -> ValueSetSchema:
              """Describe the inputs of the stopword-assembly module.

              Both inputs are declared optional: a list of language names and a list
              of custom stopword lists.
              """
              languages = {
                  "type": "list",
                  "doc": "A list of languages, will be used to retrieve language-specific stopword from nltk.",
                  "optional": True,
              }
              stopword_lists = {
                  "type": "list",
                  "doc": "A list of lists of stopwords.",
                  "optional": True,
              }
              return {"languages": languages, "stopword_lists": stopword_lists}
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
          def create_outputs_schema(self) -> ValueSetSchema:
              """Describe the single ``stopwords_list`` output of the module."""
              stopwords_list = {
                  "type": "list",
                  "doc": "A sorted list of unique stopwords.",
              }
              return {"stopwords_list": stopwords_list}
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
          def process(self, inputs: ValueMap, outputs: ValueMap):
              """Merge language-specific and custom stopwords into one sorted list.

              Raises:
                  KiaraProcessingException: if a requested language is not among the
                      file ids of the stopwords corpus.
              """
              stopwords = set()

              _languages = inputs.get_value_obj("languages")
              if _languages.is_set:
                  # Fetch the corpus and its language ids once instead of once per
                  # requested language.
                  all_stopwords = get_stopwords()
                  available = all_stopwords.fileids()

                  languages: ListModel = _languages.data
                  for language in languages.list_data:
                      if language not in available:
                          raise KiaraProcessingException(
                              f"Invalid language: {language}. Available: {', '.join(available)}."
                          )
                      stopwords.update(all_stopwords.words(language))

              _stopword_lists = inputs.get_value_obj("stopword_lists")
              if _stopword_lists.is_set:
                  stopword_lists: ListModel = _stopword_lists.data
                  for stopword_list in stopword_lists.list_data:
                      if isinstance(stopword_list, str):
                          # A bare string is one stopword, not a sequence of characters.
                          stopwords.add(stopword_list)
                      else:
                          stopwords.update(stopword_list)

              outputs.set_value("stopwords_list", sorted(stopwords))
        
PreprocessModule            (KiaraModule)
        
¶
    Preprocess lists of tokens, incl. lowercasing, removing special characters, etc.
Lowercasing: Lowercase the words. This operation is a double-edged sword. It can be effective at yielding potentially better results in the case of relatively small datasets or datasets with a high percentage of OCR mistakes. For instance, if lowercasing is not performed, the algorithm will treat USA, Usa, usa, UsA, uSA, etc. as distinct tokens, even though they may all refer to the same entity. On the other hand, if the dataset does not contain such OCR mistakes, then it may become difficult to distinguish between homonyms and make interpreting the topics much harder.
Removing stopwords and words with less than three characters: Remove low information words. These are typically words such as articles, pronouns, prepositions, conjunctions, etc. which are not semantically salient. There are numerous stopword lists available for many, though not all, languages which can be easily adapted to the individual researcher's needs. Removing words with less than three characters may additionally remove many OCR mistakes. Both these operations have the dual advantage of yielding more reliable results while reducing the size of the dataset, thus in turn reducing the required processing power. This step can therefore hardly be considered optional in TM.
Noise removal: Remove elements such as punctuation marks, special characters, numbers, html formatting, etc. This operation is again concerned with removing elements that may not be relevant to the text analysis and in fact interfere with it. Depending on the dataset and research question, this operation can become essential.
Source code in language_processing/modules/tokens.py
          class PreprocessModule(KiaraModule):
              """Preprocess lists of tokens, incl. lowercasing, removing special characters, etc.

              Lowercasing: Lowercase the words. This operation is a double-edged sword. It can be effective at yielding potentially better results in the case of relatively small datasets or datasets with a high percentage of OCR mistakes. For instance, if lowercasing is not performed, the algorithm will treat USA, Usa, usa, UsA, uSA, etc. as distinct tokens, even though they may all refer to the same entity. On the other hand, if the dataset does not contain such OCR mistakes, then it may become difficult to distinguish between homonyms and make interpreting the topics much harder.

              Removing stopwords and words with less than three characters: Remove low information words. These are typically words such as articles, pronouns, prepositions, conjunctions, etc. which are not semantically salient. There are numerous stopword lists available for many, though not all, languages which can be easily adapted to the individual researcher's needs. Removing words with less than three characters may additionally remove many OCR mistakes. Both these operations have the dual advantage of yielding more reliable results while reducing the size of the dataset, thus in turn reducing the required processing power. This step can therefore hardly be considered optional in TM.

              Noise removal: Remove elements such as punctuation marks, special characters, numbers, html formatting, etc. This operation is again concerned with removing elements that may not be relevant to the text analysis and in fact interfere with it. Depending on the dataset and research question, this operation can become essential.
              """

              _module_type_name = "preprocess.tokens_array"

              KIARA_METADATA = {
                  "tags": ["tokens", "preprocess"],
              }

              def create_inputs_schema(
                  self,
              ) -> ValueSetSchema:
                  """Return the schema describing the inputs of this module."""
                  return {
                      "tokens_array": {
                          "type": "array",
                          "doc": "The tokens array to pre-process.",
                      },
                      "to_lowercase": {
                          "type": "boolean",
                          "doc": "Apply lowercasing to the text.",
                          "default": False,
                      },
                      "remove_alphanumeric": {
                          "type": "boolean",
                          "doc": "Remove all tokens that include numbers (e.g. ex1ample).",
                          "default": False,
                      },
                      "remove_non_alpha": {
                          "type": "boolean",
                          "doc": "Remove all tokens that include punctuation and numbers (e.g. ex1a.mple).",
                          "default": False,
                      },
                      "remove_all_numeric": {
                          "type": "boolean",
                          "doc": "Remove all tokens that contain numbers only (e.g. 876).",
                          "default": False,
                      },
                      "remove_short_tokens": {
                          "type": "integer",
                          "doc": "Remove tokens shorter than a certain length. If value is <= 0, no filtering will be done.",
                          # An integer input needs an integer default; 0 disables the
                          # filter exactly like the previous boolean ``False`` did.
                          "default": 0,
                      },
                      "remove_stopwords": {
                          "type": "list",
                          "doc": "Remove stopwords.",
                          "optional": True,
                      },
                  }

              def create_outputs_schema(
                  self,
              ) -> ValueSetSchema:
                  """Return the schema describing the single ``tokens_array`` output."""
                  return {
                      "tokens_array": {
                          "type": "array",
                          "doc": "The pre-processed content, as an array of lists of strings.",
                      }
                  }

              def process(self, inputs: ValueMap, outputs: ValueMap):
                  """Filter (and optionally lowercase) every token list of the input array.

                  Each token is passed once through ``check_token``, which applies all
                  enabled filters; tokens it rejects are dropped from their list. The
                  result is written to the ``tokens_array`` output as a chunked array.
                  """
                  import polars as pl
                  import pyarrow as pa

                  tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
                  lowercase: bool = inputs.get_value_data("to_lowercase")
                  remove_alphanumeric: bool = inputs.get_value_data("remove_alphanumeric")
                  remove_non_alpha: bool = inputs.get_value_data("remove_non_alpha")
                  remove_all_numeric: bool = inputs.get_value_data("remove_all_numeric")
                  remove_short_tokens: int = inputs.get_value_data("remove_short_tokens")
                  if remove_short_tokens is None:
                      remove_short_tokens = -1

                  # Build the stopword lookup once as a set: membership is tested for
                  # every token, and testing against a list is linear in its length.
                  _remove_stopwords = inputs.get_value_obj("remove_stopwords")
                  if _remove_stopwords.is_set:
                      stopword_set = frozenset(_remove_stopwords.data.list_data or ())
                  else:
                      stopword_set = frozenset()

                  # it's better to have one method every token goes through, than to do every test separately for the token list
                  # because that way each token only needs to be touched once (which is more efficient)
                  def check_token(token: str) -> Optional[str]:
                      """Return the (possibly lowercased) token, or ``None`` to drop it."""
                      # remove short tokens first, since we can save ourselves all the other checks (which are more expensive)
                      # NOTE(review): tokens whose length EQUALS the threshold are dropped
                      # too, while the input doc says "shorter than" - confirm intent.
                      if remove_short_tokens > 0 and len(token) <= remove_short_tokens:
                          return None

                      _token: str = token.lower() if lowercase else token

                      if remove_non_alpha and not _token.isalpha():
                          return None

                      # if remove_non_alpha was set, we don't need to worry about tokens that include numbers, since they are already filtered out
                      # NOTE(review): this keeps purely alphanumeric tokens such as
                      # "ex1ample", while the input doc says tokens that include numbers
                      # are removed - confirm intent.
                      if remove_alphanumeric and not remove_non_alpha and not _token.isalnum():
                          return None

                      # all-number tokens are already filtered out if the remove_non_alpha methods above ran
                      if remove_all_numeric and not remove_non_alpha and _token.isdigit():
                          return None

                      # Stopwords are always compared in lowercase.
                      if stopword_set and _token and _token.lower() in stopword_set:
                          return None

                      return _token

                  series = pl.Series(name="tokens", values=tokens_array.arrow_array)
                  result = series.apply(
                      lambda token_list: [
                          x for x in (check_token(token) for token in token_list) if x is not None
                      ]
                  )
                  result_array = result.to_arrow()

                  # TODO: remove this cast once the array data type can handle non-chunked arrays
                  chunked = pa.chunked_array(result_array)
                  outputs.set_values(tokens_array=chunked)
KIARA_METADATA
¶Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
          def create_inputs_schema(
              self,
          ) -> ValueSetSchema:
              """Return the schema describing the inputs of the preprocess module.

              All filters are disabled by default: the boolean switches default to
              ``False`` and ``remove_short_tokens`` defaults to ``0``.
              """
              return {
                  "tokens_array": {
                      "type": "array",
                      "doc": "The tokens array to pre-process.",
                  },
                  "to_lowercase": {
                      "type": "boolean",
                      "doc": "Apply lowercasing to the text.",
                      "default": False,
                  },
                  "remove_alphanumeric": {
                      "type": "boolean",
                      "doc": "Remove all tokens that include numbers (e.g. ex1ample).",
                      "default": False,
                  },
                  "remove_non_alpha": {
                      "type": "boolean",
                      "doc": "Remove all tokens that include punctuation and numbers (e.g. ex1a.mple).",
                      "default": False,
                  },
                  "remove_all_numeric": {
                      "type": "boolean",
                      "doc": "Remove all tokens that contain numbers only (e.g. 876).",
                      "default": False,
                  },
                  "remove_short_tokens": {
                      "type": "integer",
                      "doc": "Remove tokens shorter than a certain length. If value is <= 0, no filtering will be done.",
                      # An integer input needs an integer default; per the doc above,
                      # 0 means no filtering, just like the previous boolean ``False``.
                      "default": 0,
                  },
                  "remove_stopwords": {
                      "type": "list",
                      "doc": "Remove stopwords.",
                      "optional": True,
                  },
              }
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
          def create_outputs_schema(self) -> ValueSetSchema:
              """Describe the single ``tokens_array`` output of the preprocess module."""
              tokens_array = {
                  "type": "array",
                  "doc": "The pre-processed content, as an array of lists of strings.",
              }
              return {"tokens_array": tokens_array}
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
          def process(self, inputs: ValueMap, outputs: ValueMap):
              """Filter (and optionally lowercase) every token list of the input array.

              Each token is passed once through ``check_token``, which applies all
              enabled filters; tokens it rejects are dropped from their list. The
              result is written to the ``tokens_array`` output as a chunked array.
              """
              import polars as pl
              import pyarrow as pa

              tokens_array: KiaraArray = inputs.get_value_data("tokens_array")
              lowercase: bool = inputs.get_value_data("to_lowercase")
              remove_alphanumeric: bool = inputs.get_value_data("remove_alphanumeric")
              remove_non_alpha: bool = inputs.get_value_data("remove_non_alpha")
              remove_all_numeric: bool = inputs.get_value_data("remove_all_numeric")
              remove_short_tokens: int = inputs.get_value_data("remove_short_tokens")
              if remove_short_tokens is None:
                  remove_short_tokens = -1

              # Build the stopword lookup once as a set: membership is tested for
              # every token, and testing against a list is linear in its length.
              _remove_stopwords = inputs.get_value_obj("remove_stopwords")
              if _remove_stopwords.is_set:
                  stopword_set = frozenset(_remove_stopwords.data.list_data or ())
              else:
                  stopword_set = frozenset()

              # it's better to have one method every token goes through, than to do every test separately for the token list
              # because that way each token only needs to be touched once (which is more efficient)
              def check_token(token: str) -> Optional[str]:
                  """Return the (possibly lowercased) token, or ``None`` to drop it."""
                  # remove short tokens first, since we can save ourselves all the other checks (which are more expensive)
                  # NOTE(review): tokens whose length EQUALS the threshold are dropped
                  # too - confirm this matches the documented "shorter than" intent.
                  if remove_short_tokens > 0 and len(token) <= remove_short_tokens:
                      return None

                  _token: str = token.lower() if lowercase else token

                  if remove_non_alpha and not _token.isalpha():
                      return None

                  # if remove_non_alpha was set, we don't need to worry about tokens that include numbers, since they are already filtered out
                  # NOTE(review): this keeps purely alphanumeric tokens such as
                  # "ex1ample" and drops tokens containing other characters - confirm
                  # this is the intended meaning of ``remove_alphanumeric``.
                  if remove_alphanumeric and not remove_non_alpha and not _token.isalnum():
                      return None

                  # all-number tokens are already filtered out if the remove_non_alpha methods above ran
                  if remove_all_numeric and not remove_non_alpha and _token.isdigit():
                      return None

                  # Stopwords are always compared in lowercase.
                  if stopword_set and _token and _token.lower() in stopword_set:
                      return None

                  return _token

              series = pl.Series(name="tokens", values=tokens_array.arrow_array)
              result = series.apply(
                  lambda token_list: [
                      x for x in (check_token(token) for token in token_list) if x is not None
                  ]
              )
              result_array = result.to_arrow()

              # TODO: remove this cast once the array data type can handle non-chunked arrays
              chunked = pa.chunked_array(result_array)
              outputs.set_values(tokens_array=chunked)
        
RemoveStopwordsModule            (KiaraModule)
        
¶
    Remove stopwords from an array of token-lists.
Source code in language_processing/modules/tokens.py
          class RemoveStopwordsModule(KiaraModule):
    """Remove stopwords from an array of token-lists."""
    _module_type_name = "remove_stopwords.from.tokens_array"
    def create_inputs_schema(
        self,
    ) -> ValueSetSchema:
        # TODO: do something smart and check whether languages are already downloaded, if so, display selection in doc
        inputs: Dict[str, Dict[str, Any]] = {
            "tokens_array": {
                "type": "array",
                "doc": "An array of string lists (a list of tokens).",
            },
            "languages": {
                "type": "list",
                # "doc": f"A list of language names to use default stopword lists for. Available: {', '.join(get_stopwords().fileids())}.",
                "doc": "A list of language names to use default stopword lists for.",
                "optional": True,
            },
            "additional_stopwords": {
                "type": "list",
                "doc": "A list of additional, custom stopwords.",
                "optional": True,
            },
        }
        return inputs
    def create_outputs_schema(
        self,
    ) -> ValueSetSchema:
        outputs = {
            "tokens_array": {
                "type": "array",
                "doc": "An array of string lists, with the stopwords removed.",
            }
        }
        return outputs
    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
        import pyarrow as pa
        custom_stopwords = inputs.get_value_data("additional_stopwords")
        if inputs.get_value_obj("languages").is_set:
            _languages: ListModel = inputs.get_value_data("languages")
            languages = _languages.list_data
        else:
            languages = []
        stopwords = set()
        if languages:
            for language in languages:
                if language not in get_stopwords().fileids():
                    raise KiaraProcessingException(
                        f"Invalid language: {language}. Available: {', '.join(get_stopwords().fileids())}."
                    )
                stopwords.update(get_stopwords().words(language))
        if custom_stopwords:
            stopwords.update(custom_stopwords)
        orig_array = inputs.get_value_obj("tokens_array")  # type: ignore
        if not stopwords:
            outputs.set_value("tokens_array", orig_array)
            return
        # if hasattr(orig_array, "to_pylist"):
        #     token_lists = orig_array.to_pylist()
        tokens_array = orig_array.data.arrow_array
        # TODO: use vaex for this
        result = []
        for token_list in tokens_array:
            cleaned_list = [x for x in token_list.as_py() if x.lower() not in stopwords]
            result.append(cleaned_list)
        outputs.set_value("tokens_array", pa.chunked_array(pa.array(result)))
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
          def create_inputs_schema(
    self,
) -> ValueSetSchema:
    # TODO: do something smart and check whether languages are already downloaded, if so, display selection in doc
    inputs: Dict[str, Dict[str, Any]] = {
        "tokens_array": {
            "type": "array",
            "doc": "An array of string lists (a list of tokens).",
        },
        "languages": {
            "type": "list",
            # "doc": f"A list of language names to use default stopword lists for. Available: {', '.join(get_stopwords().fileids())}.",
            "doc": "A list of language names to use default stopword lists for.",
            "optional": True,
        },
        "additional_stopwords": {
            "type": "list",
            "doc": "A list of additional, custom stopwords.",
            "optional": True,
        },
    }
    return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
          def create_outputs_schema(
    self,
) -> ValueSetSchema:
    outputs = {
        "tokens_array": {
            "type": "array",
            "doc": "An array of string lists, with the stopwords removed.",
        }
    }
    return outputs
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
          def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    import pyarrow as pa
    custom_stopwords = inputs.get_value_data("additional_stopwords")
    if inputs.get_value_obj("languages").is_set:
        _languages: ListModel = inputs.get_value_data("languages")
        languages = _languages.list_data
    else:
        languages = []
    stopwords = set()
    if languages:
        for language in languages:
            if language not in get_stopwords().fileids():
                raise KiaraProcessingException(
                    f"Invalid language: {language}. Available: {', '.join(get_stopwords().fileids())}."
                )
            stopwords.update(get_stopwords().words(language))
    if custom_stopwords:
        stopwords.update(custom_stopwords)
    orig_array = inputs.get_value_obj("tokens_array")  # type: ignore
    if not stopwords:
        outputs.set_value("tokens_array", orig_array)
        return
    # if hasattr(orig_array, "to_pylist"):
    #     token_lists = orig_array.to_pylist()
    tokens_array = orig_array.data.arrow_array
    # TODO: use vaex for this
    result = []
    for token_list in tokens_array:
        cleaned_list = [x for x in token_list.as_py() if x.lower() not in stopwords]
        result.append(cleaned_list)
    outputs.set_value("tokens_array", pa.chunked_array(pa.array(result)))
        
TokenizeTextArrayeModule            (KiaraModule)
        
¶
    Split sentences into words or words into characters. In other words, this operation establishes the word boundaries (i.e., tokens), which is a very helpful way of finding patterns. It is also the typical step prior to stemming and lemmatization.
Source code in language_processing/modules/tokens.py
          class TokenizeTextArrayeModule(KiaraModule):
    """Split sentences into words or words into characters.
    In other words, this operation establishes the word boundaries (i.e., tokens) a very helpful way of finding patterns. It is also the typical step prior to stemming and lemmatization
    """
    _module_type_name = "tokenize.texts_array"
    KIARA_METADATA = {
        "tags": ["tokenize", "tokens"],
    }
    def create_inputs_schema(
        self,
    ) -> ValueSetSchema:
        return {
            "texts_array": {
                "type": "array",
                "doc": "An array of text items to be tokenized.",
            },
            "tokenize_by_word": {
                "type": "boolean",
                "doc": "Whether to tokenize by word (default), or character.",
                "default": True,
            },
        }
    def create_outputs_schema(
        self,
    ) -> ValueSetSchema:
        return {
            "tokens_array": {
                "type": "array",
                "doc": "The tokenized content, as an array of lists of strings.",
            }
        }
    def process(self, inputs: ValueMap, outputs: ValueMap):
        pass
        import nltk
        import polars as pl
        import pyarrow as pa
        array: KiaraArray = inputs.get_value_data("texts_array")
        # tokenize_by_word: bool = inputs.get_value_data("tokenize_by_word")
        column: pa.ChunkedArray = array.arrow_array
        # warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)
        def word_tokenize(word):
            result = nltk.word_tokenize(word)
            return result
        series = pl.Series(name="tokens", values=column)
        result = series.apply(word_tokenize)
        result_array = result.to_arrow()
        # TODO: remove this cast once the array data type can handle non-chunked arrays
        chunked = pa.chunked_array(result_array)
        outputs.set_values(tokens_array=chunked)
KIARA_METADATA
¶Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
          def create_inputs_schema(
    self,
) -> ValueSetSchema:
    return {
        "texts_array": {
            "type": "array",
            "doc": "An array of text items to be tokenized.",
        },
        "tokenize_by_word": {
            "type": "boolean",
            "doc": "Whether to tokenize by word (default), or character.",
            "default": True,
        },
    }
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
          def create_outputs_schema(
    self,
) -> ValueSetSchema:
    return {
        "tokens_array": {
            "type": "array",
            "doc": "The tokenized content, as an array of lists of strings.",
        }
    }
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
          def process(self, inputs: ValueMap, outputs: ValueMap):
    pass
    import nltk
    import polars as pl
    import pyarrow as pa
    array: KiaraArray = inputs.get_value_data("texts_array")
    # tokenize_by_word: bool = inputs.get_value_data("tokenize_by_word")
    column: pa.ChunkedArray = array.arrow_array
    # warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)
    def word_tokenize(word):
        result = nltk.word_tokenize(word)
        return result
    series = pl.Series(name="tokens", values=column)
    result = series.apply(word_tokenize)
    result_array = result.to_arrow()
    # TODO: remove this cast once the array data type can handle non-chunked arrays
    chunked = pa.chunked_array(result_array)
    outputs.set_values(tokens_array=chunked)
        
TokenizeTextConfig            (KiaraModuleConfig)
        
  
      pydantic-model
  
¶
    Source code in language_processing/modules/tokens.py
          class TokenizeTextConfig(KiaraModuleConfig):
    filter_non_alpha: bool = Field(
        description="Whether to filter out non alpha tokens.", default=True
    )
    min_token_length: int = Field(description="The minimum token length.", default=3)
    to_lowercase: bool = Field(
        description="Whether to lowercase the tokens.", default=True
    )
        
TokenizeTextModule            (KiaraModule)
        
¶
    Tokenize a string.
Source code in language_processing/modules/tokens.py
          class TokenizeTextModule(KiaraModule):
    """Tokenize a string."""
    _config_cls = TokenizeTextConfig
    _module_type_name = "tokenize.string"
    def create_inputs_schema(
        self,
    ) -> ValueSetSchema:
        inputs = {"text": {"type": "string", "doc": "The text to tokenize."}}
        return inputs
    def create_outputs_schema(
        self,
    ) -> ValueSetSchema:
        outputs = {
            "token_list": {
                "type": "list",
                "doc": "The tokenized version of the input text.",
            }
        }
        return outputs
    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
        import nltk
        # TODO: module-independent caching?
        # language = inputs.get_value_data("language")
        #
        text = inputs.get_value_data("text")
        tokenized = nltk.word_tokenize(text)
        result = tokenized
        if self.get_config_value("min_token_length") > 0:
            result = (
                x
                for x in tokenized
                if len(x) >= self.get_config_value("min_token_length")
            )
        if self.get_config_value("filter_non_alpha"):
            result = (x for x in result if x.isalpha())
        if self.get_config_value("to_lowercase"):
            result = (x.lower() for x in result)
        outputs.set_value("token_list", list(result))
Classes¶
_config_cls            (KiaraModuleConfig)
        
  
      private
      pydantic-model
  
¶Source code in language_processing/modules/tokens.py
          class TokenizeTextConfig(KiaraModuleConfig):
    filter_non_alpha: bool = Field(
        description="Whether to filter out non alpha tokens.", default=True
    )
    min_token_length: int = Field(description="The minimum token length.", default=3)
    to_lowercase: bool = Field(
        description="Whether to lowercase the tokens.", default=True
    )
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in language_processing/modules/tokens.py
          def create_inputs_schema(
    self,
) -> ValueSetSchema:
    inputs = {"text": {"type": "string", "doc": "The text to tokenize."}}
    return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in language_processing/modules/tokens.py
          def create_outputs_schema(
    self,
) -> ValueSetSchema:
    outputs = {
        "token_list": {
            "type": "list",
            "doc": "The tokenized version of the input text.",
        }
    }
    return outputs
process(self, inputs, outputs)
¶Source code in language_processing/modules/tokens.py
          def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    import nltk
    # TODO: module-independent caching?
    # language = inputs.get_value_data("language")
    #
    text = inputs.get_value_data("text")
    tokenized = nltk.word_tokenize(text)
    result = tokenized
    if self.get_config_value("min_token_length") > 0:
        result = (
            x
            for x in tokenized
            if len(x) >= self.get_config_value("min_token_length")
        )
    if self.get_config_value("filter_non_alpha"):
        result = (x for x in result if x.isalpha())
    if self.get_config_value("to_lowercase"):
        result = (x.lower() for x in result)
    outputs.set_value("token_list", list(result))
get_stopwords()
¶
    Source code in language_processing/modules/tokens.py
          def get_stopwords():
    # TODO: make that smarter
    import nltk
    output = io.StringIO()
    nltk.download("punkt", print_error_to=output)
    nltk.download("stopwords", print_error_to=output)
    log.debug("external.message", source="nltk", msg=output.getvalue())
    from nltk.corpus import stopwords
    return stopwords