Skip to content

array

FORCE_NON_NULL_DOC

MAX_INDEX_DOC

MIN_INDEX_DOC

REMOVE_TOKENS_DOC

Classes

DeserializeArrayModule (DeserializeValueModule)

Deserialize array data.

Source code in tabular/modules/array/__init__.py
class DeserializeArrayModule(DeserializeValueModule):
    """Load a serialized 'array' value back into a `KiaraArray` instance.

    The module declares 'array' as the serialized value type it handles,
    'feather' as the serialization profile it reads, and 'python_object' as
    its only target profile.
    """

    _module_type_name = "load.array"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        """Map each supported target profile name to the Python type it yields."""
        target_profiles = {"python_object": KiaraArray}
        return target_profiles

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        """Name the value type whose serialized form this module reads."""
        value_type = "array"
        return value_type

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        """Name the serialization profile this module reads."""
        profile = "feather"
        return profile

    def to__python_object(self, data: SerializedData, **config: Any):
        """Wrap the single serialized chunk file of `data` in a `KiaraArray`.

        The payload must contain exactly one key ('array.arrow') holding
        exactly one chunk; anything else trips an assertion. `config` is
        accepted but not used here.
        """
        # The payload must consist of the 'array.arrow' key and nothing else.
        assert "array.arrow" in data.get_keys()
        assert len(list(data.get_keys())) == 1

        serialized = data.get_serialized_data("array.arrow")

        # TODO: support multiple chunks
        assert serialized.get_number_of_chunks() == 1
        chunk_files = list(serialized.get_chunks(as_files=True, symlink_ok=True))
        assert len(chunk_files) == 1

        return KiaraArray(data_path=chunk_files[0])
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    """Name the value type whose serialized form this module reads."""
    value_type = "array"
    return value_type
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    """Name the serialization profile this module reads."""
    profile = "feather"
    return profile
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    """Map each supported target profile name to the Python type it yields."""
    target_profiles = {"python_object": KiaraArray}
    return target_profiles
to__python_object(self, data, **config)
Source code in tabular/modules/array/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
    """Wrap the single serialized chunk file of `data` in a `KiaraArray`.

    The payload must contain exactly one key ('array.arrow') holding exactly
    one chunk; anything else trips an assertion. `config` is accepted but not
    used here.
    """
    # The payload must consist of the 'array.arrow' key and nothing else.
    assert "array.arrow" in data.get_keys()
    assert len(list(data.get_keys())) == 1

    serialized = data.get_serialized_data("array.arrow")

    # TODO: support multiple chunks
    assert serialized.get_number_of_chunks() == 1
    chunk_files = list(serialized.get_chunks(as_files=True, symlink_ok=True))
    assert len(chunk_files) == 1

    return KiaraArray(data_path=chunk_files[0])

ExtractDateConfig (KiaraInputsConfig) pydantic-model

Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):
    """Options that control how strings are cut, cleaned and parsed into dates."""

    # Fail on un-parsable strings (enabled unless explicitly switched off).
    force_non_null: bool = Field(default=True, description=FORCE_NON_NULL_DOC)
    # Optional start position within each string; unset by default.
    min_index: Union[None, int] = Field(default=None, description=MIN_INDEX_DOC)
    # Optional end position within each string; unset by default.
    max_index: Union[None, int] = Field(default=None, description=MAX_INDEX_DOC)
    # Tokens that get replaced by a single space before parsing; empty by default.
    remove_tokens: List[str] = Field(
        default_factory=list,
        description=REMOVE_TOKENS_DOC,
    )

Attributes

force_non_null: bool pydantic-field

If set to 'True', raise an error if any of the strings in the array can't be parsed.

max_index: int pydantic-field

The maximum index until which to parse the string(s).

min_index: int pydantic-field

The minimum index from where to start parsing the string(s).

remove_tokens: List[str] pydantic-field

A list of tokens/characters to replace with a single white-space before parsing the input.

ExtractDateModule (AutoInputsKiaraModule)

Create an array of date objects from an array of strings.

This module is very simplistic at the moment, more functionality and options will be added in the future.

At its core, this module uses the standard parser from the dateutil package to parse strings into dates. As this parser can't handle complex strings, the input strings can be pre-processed in the following ways:

  • 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
  • remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

By default, if an input string can't be parsed, this module will raise an exception. This can be prevented by setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings will appear as 'NULL' values in the resulting array.

Source code in tabular/modules/array/__init__.py
class ExtractDateModule(AutoInputsKiaraModule):
    """Create an array of date objects from an array of strings.

    This module is very simplistic at the moment, more functionality and options will be added in the future.

    At its core, this module uses the standard parser from the
    [dateutil](https://github.com/dateutil/dateutil) package to parse strings into dates. As this parser can't handle
    complex strings, the input strings can be pre-processed in the following ways:

    - 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
    - remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

    By default, if an input string can't be parsed this module will raise an exception. This can be prevented by
    setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings
    will appear as 'NULL' values in the resulting array.
    """

    _module_type_name = "parse.date_array"
    _config_cls = ExtractDateConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the single 'array' input holding the strings to parse."""

        inputs = {"array": {"type": "array", "doc": "The input array."}}
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the single 'date_array' output holding the parsed dates."""

        return {
            "date_array": {
                "type": "array",
                "doc": "The resulting array with items of a date data type.",
            }
        }

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
        """Parse the items of the 'array' input into dates and set 'date_array'.

        The options 'force_non_null', 'min_index', 'max_index' and
        'remove_tokens' are looked up via `get_data_for_field`. Each item is
        optionally cut and cleaned, then handed to `dateutil.parser.parse`
        with `fuzzy=True`.

        Raises:
            KiaraProcessingException: if an item can't be parsed while
                'force_non_null' is truthy. The underlying parser error is
                attached as the exception's cause.
        """

        import polars as pl
        import pyarrow as pa
        from dateutil import parser

        force_non_null: bool = self.get_data_for_field(
            field_name="force_non_null", inputs=inputs
        )
        min_pos: Union[None, int] = self.get_data_for_field(
            field_name="min_index", inputs=inputs
        )
        if min_pos is None:
            min_pos = 0
        max_pos: Union[None, int] = self.get_data_for_field(
            field_name="max_index", inputs=inputs
        )
        remove_tokens: Iterable[str] = self.get_data_for_field(
            field_name="remove_tokens", inputs=inputs
        )

        def parse_date(_text: str):
            """Cut, clean and parse one item; return the date or None."""

            text = _text
            if min_pos:
                try:
                    text = text[min_pos:]  # type: ignore
                except Exception:
                    # Item can't be sliced: treat it as un-parsable.
                    return None
            # NOTE: a 'max_index' of 0 is falsy and therefore handled like "not set".
            if max_pos:
                try:
                    # 'text' already starts at 'min_pos', so shift the end accordingly.
                    text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
                except Exception:
                    # Best effort: continue with the uncut text.
                    pass

            if remove_tokens:
                for t in remove_tokens:
                    text = text.replace(t, " ")

            try:
                d_obj = parser.parse(text, fuzzy=True)
            except Exception as e:
                if force_non_null:
                    # Chain explicitly so the parser error is kept as the cause.
                    raise KiaraProcessingException(e) from e
                return None

            # Defensive check; presumably the parser raises instead of returning None.
            if d_obj is None:
                if force_non_null:
                    raise KiaraProcessingException(
                        f"Can't parse date from string: {text}"
                    )
                return None

            return d_obj

        value = inputs.get_value_obj("array")
        array: KiaraArray = value.data

        series = pl.Series(name="tokens", values=array.arrow_array)
        job_log.add_log(f"start parsing date for {len(array)} items")
        result = series.apply(parse_date)
        job_log.add_log(f"finished parsing date for {len(array)} items")
        result_array = result.to_arrow()

        # TODO: remove this cast once the array data type can handle non-chunked arrays
        chunked = pa.chunked_array(result_array)
        outputs.set_values(date_array=chunked)

Classes

_config_cls (KiaraInputsConfig) private pydantic-model
Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):
    """Options that control how strings are cut, cleaned and parsed into dates."""

    # Fail on un-parsable strings (enabled unless explicitly switched off).
    force_non_null: bool = Field(default=True, description=FORCE_NON_NULL_DOC)
    # Optional start position within each string; unset by default.
    min_index: Union[None, int] = Field(default=None, description=MIN_INDEX_DOC)
    # Optional end position within each string; unset by default.
    max_index: Union[None, int] = Field(default=None, description=MAX_INDEX_DOC)
    # Tokens that get replaced by a single space before parsing; empty by default.
    remove_tokens: List[str] = Field(
        default_factory=list,
        description=REMOVE_TOKENS_DOC,
    )
Attributes
force_non_null: bool pydantic-field

If set to 'True', raise an error if any of the strings in the array can't be parsed.

max_index: int pydantic-field

The maximum index until which to parse the string(s).

min_index: int pydantic-field

The minimum index from where to start parsing the string(s).

remove_tokens: List[str] pydantic-field

A list of tokens/characters to replace with a single white-space before parsing the input.

Methods

create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/array/__init__.py
def create_inputs_schema(self) -> ValueMapSchema:
    """Declare the single 'array' input holding the strings to parse."""
    array_field = {"type": "array", "doc": "The input array."}
    return {"array": array_field}
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/array/__init__.py
def create_outputs_schema(self) -> ValueMapSchema:
    """Declare the single 'date_array' output holding the parsed dates."""
    date_array_field = {
        "type": "array",
        "doc": "The resulting array with items of a date data type.",
    }
    outputs = {"date_array": date_array_field}
    return outputs
process(self, inputs, outputs, job_log)
Source code in tabular/modules/array/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
    """Parse the items of the 'array' input into dates and set 'date_array'.

    The options 'force_non_null', 'min_index', 'max_index' and
    'remove_tokens' are looked up via `get_data_for_field`. Each item is
    optionally cut and cleaned, then handed to `dateutil.parser.parse` with
    `fuzzy=True`.

    Raises:
        KiaraProcessingException: if an item can't be parsed while
            'force_non_null' is truthy. The underlying parser error is
            attached as the exception's cause.
    """

    import polars as pl
    import pyarrow as pa
    from dateutil import parser

    force_non_null: bool = self.get_data_for_field(
        field_name="force_non_null", inputs=inputs
    )
    min_pos: Union[None, int] = self.get_data_for_field(
        field_name="min_index", inputs=inputs
    )
    if min_pos is None:
        min_pos = 0
    max_pos: Union[None, int] = self.get_data_for_field(
        field_name="max_index", inputs=inputs
    )
    remove_tokens: Iterable[str] = self.get_data_for_field(
        field_name="remove_tokens", inputs=inputs
    )

    def parse_date(_text: str):
        """Cut, clean and parse one item; return the date or None."""

        text = _text
        if min_pos:
            try:
                text = text[min_pos:]  # type: ignore
            except Exception:
                # Item can't be sliced: treat it as un-parsable.
                return None
        # NOTE: a 'max_index' of 0 is falsy and therefore handled like "not set".
        if max_pos:
            try:
                # 'text' already starts at 'min_pos', so shift the end accordingly.
                text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
            except Exception:
                # Best effort: continue with the uncut text.
                pass

        if remove_tokens:
            for t in remove_tokens:
                text = text.replace(t, " ")

        try:
            d_obj = parser.parse(text, fuzzy=True)
        except Exception as e:
            if force_non_null:
                # Chain explicitly so the parser error is kept as the cause.
                raise KiaraProcessingException(e) from e
            return None

        # Defensive check; presumably the parser raises instead of returning None.
        if d_obj is None:
            if force_non_null:
                raise KiaraProcessingException(
                    f"Can't parse date from string: {text}"
                )
            return None

        return d_obj

    value = inputs.get_value_obj("array")
    array: KiaraArray = value.data

    series = pl.Series(name="tokens", values=array.arrow_array)
    job_log.add_log(f"start parsing date for {len(array)} items")
    result = series.apply(parse_date)
    job_log.add_log(f"finished parsing date for {len(array)} items")
    result_array = result.to_arrow()

    # TODO: remove this cast once the array data type can handle non-chunked arrays
    chunked = pa.chunked_array(result_array)
    outputs.set_values(date_array=chunked)