Importer

DataImporter()

Class used to import data from multiple sources to Echoenergia's performance_db.

Source code in echo_dataimporter/importer.py
def __init__(self) -> None:
    """DataImporter constructor."""
    self.perfdb = PerfDB(application_name=f"{type(self).__name__}:{socket.gethostname()}")
    self.baze = Baze()
    self.data_sources: dict[str, dict[str, Any]] = self.perfdb.datasources.instances.get(
        data_source_types_names=list(HANDLER_MAPPING.keys()),
        get_attributes=True,
        output_type="dict",
    )
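
A minimal instantiation sketch (the import path is inferred from the source location above; this is illustrative, not part of the library's documented examples). The constructor connects to performance_db, creates a Baze client, and caches the data source instances it can handle.

from echo_dataimporter.importer import DataImporter

importer = DataImporter()

# data_sources is keyed by data source name, with each instance's attributes as values
print(sorted(importer.data_sources))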

alarms_history(period, objects=None, overlap_interval=None, recalc_features=False, sort_objects='no', data_sources=None, **kwargs)

Imports alarms from a data source into performance_db for a specified period. It takes a time range, a list of objects, and an overlap interval as parameters. If no objects are specified, it retrieves all objects connected to data sources of the desired type.

Parameters:

  • period

    (DateTimeRange | PeriodDict | None) –

    Desired period to import the data

  • objects

    (list[str] | None, default: None ) –

    Wanted objects as defined in performance_db.

    If [], all objects connected to data sources of the desired type will be retrieved, as defined in __init__ of subclasses.

    By default [].

  • overlap_interval

    (relativedelta | None, default: None ) –

    How many days in the past to get data that was already acquired.

    This is useful to make sure alarms already acquired will have a correct end.

    By default relativedelta(days=5)

  • recalc_features

    (bool, default: False ) –

    If set to True, recalculates all turbine features related to alarms in performance_db for the period, by default False

  • sort_objects

    (Literal['no', 'alphabetical', 'random'], default: 'no' ) –

    How objects should be sorted, which will impact the order in which they are imported. Can be one of:

      - "no": no sorting
      - "alphabetical": sort by name
      - "random": sort randomly

    By default "no"

  • data_sources

    (list[str] | None, default: None ) –

    If you just want to import some data sources, select them here.

    It's expected that you provide data source names as defined in performance_db.

    By default [].

  • **kwargs

    Additional keyword arguments to be passed to the alarm_history method of the handler class.

Returns:

  • tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]

    Summary of errors that occurred during the import.

    Dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}

Source code in echo_dataimporter/importer.py
@validate_call
def alarms_history(
    self,
    period: DateTimeRange | PeriodDict | None,
    objects: list[str] | None = None,
    overlap_interval: relativedelta | None = None,
    recalc_features: bool = False,
    sort_objects: Literal["no", "alphabetical", "random"] = "no",
    data_sources: list[str] | None = None,
    **kwargs,  # pylint: disable=unused-argument  # noqa
) -> tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]:
    """Function imports alarms from a data source to performance_db within a specified period. It takes in a range of time, a list of objects, and an overlap interval as parameters. If no objects are specified, it will retrieve all connected objects to data sources of the desired type.

    Parameters
    ----------
    period : DateTimeRange | PeriodDict | None
        Desired period to import the data
    objects : list[str] | None, optional
        Wanted objects as defined in performance_db.

        If [], all objects connected to data sources of the desired type will be retrieved, as defined in __init__ of subclasses
        By default [].
    overlap_interval : relativedelta | None, optional
        How many days in the past to get data that was already acquired.

        This is useful to make sure alarms already acquired will have a correct end.

        By default relativedelta(days=5)
    recalc_features : bool, optional
        If set to True, recalculates all turbine features related to alarms in performance_db for the period, by default False
    sort_objects : Literal["no", "alphabetical", "random"], optional
        How objects should be sorted, which will impact the order in which they are imported. Can be one of:
            - "no": no sorting
            - "alphabetical": sort by name
            - "random": sort randomly

        By default "no"
    data_sources : list[str] | None, optional
        If you just want to import some data sources, select them here.

        It's expected that you provide data source names as defined in performance_db.

        By default [].
    **kwargs
        Additional keyword arguments to be passed to the alarm_history method of the handler class.

    Returns
    -------
    tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]
        Summary of errors that occurred during the import
        Dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}

    """
    if objects is None:
        objects = []
    if overlap_interval is None:
        overlap_interval = relativedelta(days=5)
    logger.info(f"Starting {type(self).__name__}.{self.alarms_history.__name__}")

    errs = ErrorSummary("DataImporter.alarms_history")

    # getting wanted objects
    wanted_objects = self._get_wanted_objects(objects)

    # sorting objects
    wanted_objects = self._sort_objects(wanted_objects, sort_objects)

    periods_dict = {}

    # TODO if possible, it would be good to parallelize this code
    for object_name in wanted_objects:
        logger.info(f"Getting alarms for '{object_name}': {wanted_objects.index(object_name) + 1} of {len(wanted_objects)} objects")
        try:
            requested_period, acquired_period = self.single_obj_alarms_history(
                period=period,
                object_name=object_name,
                overlap_interval=overlap_interval,
                data_sources=data_sources,
            )

            periods_dict[object_name] = {
                "acquired_period": acquired_period,
                "requested_period": requested_period,
                "import_success": True,
            }
        except Exception as e:
            logger.exception(f"Failed to get feature values for {object_name}")
            err_obj = ErrorObject(object_name, [e])
            errs.add_child(err_obj)

            periods_dict[object_name] = {"acquired_period": None, "requested_period": period, "import_success": False}

    # recalculating server_calc features
    if recalc_features:
        for object_name, this_period in periods_dict.items():
            if isinstance(this_period["requested_period"], dict):
                this_period["requested_period"] = DateTimeRange.from_dict(this_period["requested_period"])
            calc = CalculationHandler.from_calc_types(calc_types=["alarm_active_time"], object_filters={"object_names": [object_name]})
            calc.calculate(period=this_period["requested_period"])

    return errs, periods_dict
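
A minimal usage sketch of alarms_history (the object names and dates below are hypothetical; the import path is inferred from the source location above). The period can be given as a {"start", "end"} dict, and a small overlap_interval re-reads recent data so alarms that were still open get a correct end.

from datetime import datetime

from dateutil.relativedelta import relativedelta

from echo_dataimporter.importer import DataImporter

importer = DataImporter()

errs, periods = importer.alarms_history(
    period={"start": datetime(2024, 1, 1), "end": datetime(2024, 2, 1)},
    objects=["WTG-01", "WTG-02"],            # hypothetical object names from performance_db
    overlap_interval=relativedelta(days=3),  # re-read the last 3 days of already-acquired alarms
    recalc_features=True,                    # recalculate alarm-related features afterwards
    sort_objects="alphabetical",
)

# periods maps each object to its requested/acquired period and a success flag
for object_name, result in periods.items():
    print(object_name, result["import_success"], result["acquired_period"])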

define_calc_period(periods, mode='single', fallback=None) staticmethod

Method to define the period to be used in calculations based on the periods of the objects.

It tries to create the largest possible period based on the periods of the objects. If mode is "single", it returns a single DateTimeRange spanning the minimum start and maximum end of the acquired and requested periods of all objects. If mode is "multiple", it does the same for each individual object.

Parameters:

  • periods

    (dict[str, CalcPeriodDict]) –

    Dict containing the periods requested and acquired for each object.

    It is in the format: {object_name: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None, "import_success": bool}}

    Where:

      - requested_period is the period that was requested by the importer to the data source based on missing data in performance_db
      - acquired_period is the period that was actually acquired from the data source and can be None if no data was acquired
      - import_success is a boolean that indicates if the import was successful or not

  • mode

    (str, default: 'single' ) –

    Defines how the calculation period is determined. Can be one of:

      - "single": returns a single DateTimeRange object or None (in case it failed to define the calculation period)
      - "multiple": returns a dict with the calculation period for each object, in the format: {object_name: {"calc_period": DateTimeRange | None}}

  • fallback

    (DateTimeRange | PeriodDict | None, default: None ) –

    Period to be used in case it is not possible to define the calculation period based on the periods of the objects. By default None

Returns:

  • dict[str, dict[str, DateTimeRange]] | DateTimeRange | None

    Period to be used in calculations. In "single" mode, a single DateTimeRange (or None, in which case no calculations should be performed). In "multiple" mode, a dict with the calculation period for each object.

Source code in echo_dataimporter/importer.py
@staticmethod
@validate_call
def define_calc_period(
    periods: dict[str, CalcPeriodDict],
    mode: str = "single",
    fallback: DateTimeRange | PeriodDict | None = None,
) -> dict[str, dict[str, DateTimeRange]] | DateTimeRange | None:
    """Method to define the period to be used in calculations based on the periods of the objects.

    It tries to create the largest possible period based on the periods of the objects. If mode is "single", it returns a single DateTimeRange spanning the minimum start and maximum end of the acquired and requested periods of all objects. If mode is "multiple", it does the same for each individual object.

    Parameters
    ----------
    periods : dict[str, CalcPeriodDict]
        Dict containing the periods requested and acquired for each object.

        It is in the format: {object_name: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None, "import_success": bool}}

        Where:
            - requested_period is the period that was requested by the importer to the data source based on missing data in performance_db
            - acquired_period is the period that was actually acquired from the data source and can be None if no data was acquired
            - import_success is a boolean that indicates if the import was successful or not
    mode : str, optional
        Defines how the calculation period is determined. Can be one of:
            - "single": will return a single DateTimeRange object or None (in case it failed to define the calculation period)
            - "multiple": will return a dict with the calculation period for each object.
                The dict will be in the format: {object_name: {"calc_period": DateTimeRange | None}}
    fallback : DateTimeRange | PeriodDict | None, optional
        Period to be used in case it is not possible to define the calculation period based on the periods of the objects.
        By default None

    Returns
    -------
    dict[str, dict[str, DateTimeRange]] | DateTimeRange | None
        Period to be used in calculations. In "single" mode, a single DateTimeRange (or None, in which case no calculations should be performed). In "multiple" mode, a dict with the calculation period for each object.

    """
    if not isinstance(periods, dict):
        raise TypeError(f"periods must be a dictionary, not {type(periods)}")
    if not all(isinstance(x, str) for x in periods):
        raise TypeError("periods keys must be strings")
    if not all(isinstance(x, dict) for x in periods.values()):
        raise TypeError("periods values must be dictionaries")

    # defining format of output
    match mode:
        case "single":
            output = {"start": datetime(2100, 1, 1), "end": datetime(1900, 1, 1)}
        case "multiple":
            output = copy.deepcopy(periods)
        case _:
            raise ValueError(f"mode must be 'single' or 'multiple', not '{mode}'")

    for object_name, calc_periods in periods.items():
        # checking if calc_periods has the required keys
        if {"requested_period", "acquired_period"} in set(calc_periods.keys()):
            raise ValueError(
                f"'{object_name}' - calc_periods must have keys 'requested_period' and 'acquired_period' but it only has {set(calc_periods.keys())}",
            )
        # checking if requested_period is a DateTimeRange or a dict with start and end
        if not isinstance(calc_periods["requested_period"], DateTimeRange | dict):
            raise TypeError(
                f"'{object_name}' - requested_period must be a DateTimeRange or dict, not {type(calc_periods['requested_period'])}",
            )
        if isinstance(calc_periods["requested_period"], dict) and any(
            x not in calc_periods["requested_period"] for x in ["start", "end"]
        ):
            raise ValueError(f"'{object_name}' - requested_period must have keys 'start' and 'end'")
        # checking if acquired_period is a DateTimeRange or None
        if not isinstance(calc_periods["acquired_period"], DateTimeRange | type(None)):
            raise TypeError(
                f"'{object_name}' - acquired_period must be a DateTimeRange or None, not {type(calc_periods['acquired_period'])}",
            )
        # defining period based on this object
        this_obj_end = None
        this_obj_start = None
        if calc_periods["acquired_period"] is not None:
            this_obj_end = max(calc_periods["acquired_period"]["end"], calc_periods["requested_period"]["end"])
            this_obj_start = min(calc_periods["acquired_period"]["start"], calc_periods["requested_period"]["start"])
        elif calc_periods["requested_period"]["end"] is not None or calc_periods["requested_period"]["start"] is not None:
            if calc_periods["requested_period"]["end"] is not None:
                this_obj_end = calc_periods["requested_period"]["end"]
            if calc_periods["requested_period"]["start"] is not None:
                this_obj_start = calc_periods["requested_period"]["start"]
        # correcting period based on fallback if needed
        if fallback is not None:
            if fallback["end"] is not None and this_obj_end is None:
                this_obj_end = fallback["end"]
            if fallback["start"] is not None and this_obj_start is None:
                this_obj_start = fallback["start"]

        # updating output
        match mode:
            case "single":
                if this_obj_end is None or this_obj_start is None:
                    continue
                output["start"] = min(output["start"], this_obj_start)
                output["end"] = max(output["end"], this_obj_end)
            case "multiple":
                if this_obj_end is None or this_obj_start is None:
                    logger.warning(f"Could not determine calculation period for {object_name}. {calc_periods=}, {fallback=}")
                    output[object_name]["calc_period"] = None
                else:
                    output[object_name]["calc_period"] = DateTimeRange(this_obj_start, this_obj_end)
            case _:
                raise ValueError(f"mode must be 'single' or 'multiple', not '{mode}'")

    if mode == "single":
        if output["start"] == datetime(2100, 1, 1) or output["end"] == datetime(1900, 1, 1):
            output = None
        else:
            output = DateTimeRange.from_dict(output)

    return output
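
A small worked example of the two modes (hypothetical object names; acquired_period is left as None and the requested periods are given as {"start", "end"} dicts, which the validation in the method body accepts, so the caller does not need to construct DateTimeRange objects).

from datetime import datetime

from echo_dataimporter.importer import DataImporter

periods = {
    "WTG-01": {
        "requested_period": {"start": datetime(2024, 1, 1), "end": datetime(2024, 1, 10)},
        "acquired_period": None,
        "import_success": True,
    },
    "WTG-02": {
        "requested_period": {"start": datetime(2024, 1, 5), "end": datetime(2024, 1, 20)},
        "acquired_period": None,
        "import_success": True,
    },
}

# "single": one DateTimeRange spanning 2024-01-01 to 2024-01-20
calc_period = DataImporter.define_calc_period(periods, mode="single")

# "multiple": a copy of periods with a "calc_period" entry added for each object
calc_periods = DataImporter.define_calc_period(periods, mode="multiple")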

feature_values(period, objects=None, features=None, data_sources=None, overwrite=True, overlap_interval=None, recalc_features=False, upload_to_bazefield=True, bazefield_opc_backup=False, sort_objects='no', **kwargs)

Imports feature values from a data source into performance_db. It allows the user to specify the time period, objects, and features to import. The method can overwrite existing data in performance_db and can also upload features to a corresponding Bazefield point.

Parameters:

  • period

    (DateTimeRange | PeriodDict | None) –

    Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.

  • objects

    (list[str] | None, default: None ) –

    Wanted objects as defined in performance_db.

    If [], all objects connected to data sources of the desired type will be retrieved, as defined in __init__ of subclasses

    By default [].

  • features

    (list[str] | None, default: None ) –

    If you just want to import some features, select them here.

    It's expected that you provide names as defined in performance_db.

    By default []

  • data_sources

    (list[str] | None, default: None ) –

    If you just want to import some data sources, select them here.

    It's expected that you provide data source names as defined in performance_db.

    By default [].

  • overwrite

    (bool, default: True ) –

    If set to True, overwrites data in performance_db, by default True

  • overlap_interval

    (relativedelta, default: None ) –

    How many days in the past to get data that was already acquired.

    By default relativedelta(days=2)

  • recalc_features

    (bool, default: False ) –

    If set to True, recalculates all turbine features in performance_db for the period, by default False

  • upload_to_bazefield

    (bool, default: True ) –

    If set to True will upload features with a corresponding Bazefield point to Bazefield, by default True

  • bazefield_opc_backup

    (bool, default: False ) –

    If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", when there is missing data will try to get aggregated data from Bazefield OPC tag to fill the gaps, by default False

  • sort_objects

    (Literal['no', 'alphabetical', 'random'], default: 'no' ) –

    How objects should be sorted, which will impact the order in which they are imported. Can be one of:

      - "no": no sorting
      - "alphabetical": sort by name
      - "random": sort randomly

    By default "no"

  • **kwargs

    Additional keyword arguments to be passed to the feature_values method of the handler class.

Returns:

  • tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]

    Summary of errors that occurred during the import.

    Dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}

Source code in echo_dataimporter/importer.py
@validate_call
def feature_values(
    self,
    period: DateTimeRange | PeriodDict | None,
    objects: list[str] | None = None,
    features: list[str] | None = None,
    data_sources: list[str] | None = None,
    overwrite: bool = True,
    overlap_interval: relativedelta | None = None,
    recalc_features: bool = False,
    upload_to_bazefield: bool = True,
    bazefield_opc_backup: bool = False,
    sort_objects: Literal["no", "alphabetical", "random"] = "no",
    **kwargs,  # pylint: disable=unused-argument # noqa
) -> tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]:
    """Method that imports feature values from a data source to performance database. It allows the user to specify the time period, objects, and features to import. The method can overwrite existing data in the performance database, and it can also upload features to a corresponding Bazefield point.

    Parameters
    ----------
    period : DateTimeRange | PeriodDict | None
        Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.
    objects : list[str] | None, optional
        Wanted objects as defined in performance_db.

        If [], all objects connected to data sources of the desired type will be retrieved, as defined in __init__ of subclasses

        By default [].
    features : list[str] | None, optional
        If you just want to import some features, select them here.

        It's expected that you provide names as defined in performance_db.

        By default []
    data_sources : list[str] | None, optional
        If you just want to import some data sources, select them here.

        It's expected that you provide data source names as defined in performance_db.

        By default [].
    overwrite : bool, optional
        If set to True, overwrites data in performance_db, by default True
    overlap_interval : relativedelta, optional
        How many days in the past to get data that was already acquired.

        By default relativedelta(days=2)
    recalc_features : bool, optional
        If set to True, recalculates all turbine features in performance_db for the period, by default False
    upload_to_bazefield : bool, optional
        If set to True will upload features with a corresponding Bazefield point to Bazefield, by default True
    bazefield_opc_backup : bool, optional
        If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", when there is missing data will try to get aggregated data from Bazefield OPC tag to fill the gaps, by default False
    sort_objects : Literal["no", "alphabetical", "random"], optional
        How objects should be sorted, which will impact the order in which they are imported. Can be one of:
            - "no": no sorting
            - "alphabetical": sort by name
            - "random": sort randomly

        By default "no"
    **kwargs
        Additional keyword arguments to be passed to the feature_values method of the handler class.

    Returns
    -------
    tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]
        Summary of errors that occurred during the import
        Dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}

    """
    if features is None:
        features = []
    if objects is None:
        objects = []
    if data_sources is None:
        data_sources = []
    if overlap_interval is None:
        overlap_interval = relativedelta(days=2)

    logger.info(f"Starting {type(self).__name__}.{self.feature_values.__name__}")

    errs = ErrorSummary("DataImporter.feature_values")

    # getting wanted objects
    wanted_objects = self._get_wanted_objects(objects)

    # sorting objects
    wanted_objects = self._sort_objects(wanted_objects, sort_objects)

    periods_dict = {}

    # converting period to a fixed end in case is None
    # this is done so all objects will have the same period end while importing
    if period is None:
        period = {"start": None, "end": datetime.now()}

    # TODO if possible, it would be good to parallelize this code
    for object_name in wanted_objects:
        logger.info(
            f"Getting feature values for '{object_name}': {wanted_objects.index(object_name) + 1} of {len(wanted_objects)} objects",
        )
        try:
            resulting_periods = self.single_obj_feature_values(
                period=period,
                object_name=object_name,
                features=features,
                data_sources=data_sources,
                overwrite=overwrite,
                overlap_interval=overlap_interval,
                upload_to_bazefield=upload_to_bazefield,
                bazefield_opc_backup=bazefield_opc_backup,
            )

            final_resulting_periods = {}
            final_resulting_periods["requested_period"] = resulting_periods[next(iter(resulting_periods))]["requested_period"]
            final_resulting_periods["acquired_period"] = resulting_periods[next(iter(resulting_periods))]["acquired_period"]
            for result in resulting_periods.values():
                for period_type in ["requested_period", "acquired_period"]:
                    if result[period_type] is not None:
                        final_resulting_periods[period_type] = DateTimeRange(
                            min(final_resulting_periods[period_type].start, result[period_type].start),
                            max(final_resulting_periods[period_type].end, result[period_type].end),
                        )

            periods_dict[object_name] = final_resulting_periods | {"import_success": True}

        except Exception as e:
            logger.exception(f"'{object_name}' - Failed to get feature values")
            err_obj = ErrorObject(object_name, [e])
            errs.add_child(err_obj)

            periods_dict[object_name] = {"acquired_period": None, "requested_period": period, "import_success": False}

    # recalculating server_calc features
    if recalc_features:
        # defining calculation period
        calc_periods = DataImporter.define_calc_period(periods_dict, mode="multiple", fallback=period)

        for object_name, obj_calc_periods in calc_periods.items():
            if obj_calc_periods["calc_period"] is not None:
                calc = CalculationHandler.from_type_defaults(object_type="infer", object_filters={"object_names": [object_name]})
                calc_errs = calc[0].calculate(period=obj_calc_periods["calc_period"])
            else:
                error = ValueError("Calculation period is None")
                calc_errs = ErrorDataSource("server_calc", [error])

            errs.add_child(calc_errs)

    return errs, periods_dict
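
A minimal usage sketch of feature_values (the object and feature names below are hypothetical examples): import a week of data for one object without overwriting existing rows and without pushing the result to Bazefield.

from datetime import datetime

from dateutil.relativedelta import relativedelta

from echo_dataimporter.importer import DataImporter

importer = DataImporter()

errs, periods = importer.feature_values(
    period={"start": datetime(2024, 1, 1), "end": datetime(2024, 1, 8)},
    objects=["WTG-01"],                      # hypothetical object name
    features=["WindSpeed_10min.AVG"],        # feature name as defined in performance_db
    overwrite=False,                         # keep existing rows, only fill gaps
    overlap_interval=relativedelta(days=1),
    upload_to_bazefield=False,
)

for object_name, result in periods.items():
    print(object_name, result["import_success"], result["requested_period"], result["acquired_period"])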

single_obj_alarms_history(period, object_name, overlap_interval=None, data_sources=None, **kwargs)

Method to get alarms history for a single object.

Parameters:

  • period

    (DateTimeRange | PeriodDict | None) –

    Desired period to import the data. Can be one of the following:

      - PeriodDict: dict with start and end timestamps
      - DateTimeRange
      - None

    In case None, the period will go from the last timestamp available for the object to the current time.

  • object_name

    (str) –

    Name of the object in performance_db.

  • overlap_interval

    (relativedelta | None, default: None ) –

    How many days in the past to get data that was already acquired.

    This is useful to make sure alarms already acquired will have a correct end.

    By default relativedelta(days=0)

  • data_sources

    (list[str] | None, default: None ) –

    If you just want to import some data sources, select them here.

    It's expected that you provide data source names as defined in performance_db.

    By default [].

  • **kwargs

    Additional keyword arguments to be passed to the alarm_history method of the handler class.

Returns:

  • tuple[DateTimeRange, DateTimeRange | None]

    Requested period and period that was actually imported

Source code in echo_dataimporter/importer.py
@validate_call
def single_obj_alarms_history(
    self,
    period: DateTimeRange | PeriodDict | None,
    object_name: str,
    overlap_interval: relativedelta | None = None,
    data_sources: list[str] | None = None,
    **kwargs,  # pylint: disable=unused-argument
) -> tuple[DateTimeRange, DateTimeRange | None]:
    """Method to get alarms history for a single object.

    Parameters
    ----------
    period : DateTimeRange | PeriodDict | None
        Desired period to import the data. Can be one of the following:
            - PeriodDict: Dict with start and end timestamps.
            - DateTimeRange
            - None

        In case None, the period will go from the last timestamp available for the object to the current time.
    object_name : str
        Name of the object in performance_db.
    overlap_interval : relativedelta | None, optional
        How many days in the past to get data that was already acquired.

        This is useful to make sure alarms already acquired will have a correct end.

        By default relativedelta(days=0)
    data_sources : list[str] | None, optional
        If you just want to import some data sources, select them here.

        It's expected that you provide data source names as defined in performance_db.

        By default [].
    **kwargs
        Additional keyword arguments to be passed to the alarm_history method of the handler class.

    Returns
    -------
    tuple[DateTimeRange, DateTimeRange | None]
        Requested period and period that was actually imported

    """
    if overlap_interval is None:
        overlap_interval = relativedelta(days=0)
    if data_sources is None:
        data_sources = []

    # checking if period is correct
    if not isinstance(period, dict | DateTimeRange | type(None)):
        raise TypeError(f"period must be a dict, DateTimeRange, or None, not {type(period)}")
    # checking in case of dict
    if isinstance(period, dict):
        # keys
        if set(period.keys()) != {"start", "end"}:
            raise ValueError(f"period dict must have keys 'start' and 'end', not {set(period.keys())}")
        # values
        if not all(isinstance(x, datetime | type(None)) for x in period.values()):
            wrong_values = [x for x in period.values() if not isinstance(x, datetime | type(None))]
            raise TypeError(
                f"period values must be datetime or None. Wrong values {wrong_values}",
            )

    logger.info(f"'{object_name}' - Importing alarms considering period {period}")

    # getting data source of the object
    # TODO this assumes that the object is only connected to one data source (which is true for now), but in the future we might have to change this
    data_source = self._get_object_data_sources(object_name=object_name, data_sources=data_sources)[0]
    ds_properties = self.data_sources[data_source]
    ds_type = ds_properties["data_source_type_name"]

    # connecting to data source
    try:
        data_handler = HANDLER_MAPPING[ds_type]["handler_class"](data_source_name=data_source, **kwargs)
    except Exception as e:
        raise ConnectionError(f"Failed to connect to {data_source}") from e

    # defining period
    request_period = copy.deepcopy(period)

    # if period is not defined, set it to start from 2010-01-01
    if request_period is None:
        request_period = {"start": None, "end": None}

    first_time_stamp = request_period["start"]
    if first_time_stamp is None:
        first_time_stamp = datetime(2010, 1, 1)

    # if start timestamp is not defined, set it to the last alarm timestamp
    if request_period["start"] is None:
        latest_alarm = self.perfdb.alarms.history.get_latest(object_names=[object_name], only_timestamp=True)
        request_period["start"] = latest_alarm.get(object_name, None)
    if request_period["start"] is None:
        request_period["start"] = first_time_stamp

    # make sure start timestamp is greater than or equal to the defined period
    request_period["start"] = max(first_time_stamp, request_period["start"])

    # subtract overlap interval from start timestamp
    request_period["start"] -= overlap_interval

    # if end timestamp is not defined, set it to the current datetime
    if request_period["end"] is None:
        request_period["end"] = datetime.now()

    # convert request_period to DateTimeRange
    if isinstance(request_period, dict):
        request_period = DateTimeRange.from_dict(request_period)

    logger.info(f"'{object_name}' - Getting alarms for the period {request_period}")

    # getting data
    try:
        # get alarm history for object within defined period
        df = data_handler.alarm_history(object_name=object_name, period=request_period, **kwargs)

        # drop any duplicate alarms based on start, manufacturer_id, alarm_type, and object_name
        df = df.drop_duplicates(subset=["start", "manufacturer_id", "alarm_type", "object_name"], keep="first")

        # check for any alarms where end < start - timedelta(seconds=0.1) and set end to None
        wrong_idx = df[df["end"] < df["start"] - timedelta(seconds=0.1)].index
        df.loc[wrong_idx, "end"] = None

        # set end equal to start if end < start (this assumes that all alarms with end before start - timedelta(seconds=0.1) were already set to None)
        wrong_idx = df[df["end"] < df["start"]].index
        df.loc[wrong_idx, "end"] = df.loc[wrong_idx, "start"]

    except Exception as e:
        raise RuntimeError(f"Failed to get alarm history for object '{object_name}'") from e

    # uploading alarms history
    self.perfdb.alarms.history.insert(df, create_definitions=True, on_conflict="update")

    if len(df) >= 1:
        min_date = df["start"].min()
        max_date = df["end"].max() if df["end"].count() > 0 else df["start"].max()
        acquired_period = DateTimeRange(min_date, max_date)
    else:
        acquired_period = None

    logger.info(
        f"'{object_name}' - Finished getting alarms for the period {request_period} - Acquired data for period {acquired_period} - Got {len(df)} alarms",
    )

    return request_period, acquired_period
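
A minimal usage sketch for a single object (hypothetical object name). Leaving the period start as None resumes from the latest alarm already stored in performance_db, and leaving the end as None uses the current time:

from dateutil.relativedelta import relativedelta

from echo_dataimporter.importer import DataImporter

importer = DataImporter()

requested_period, acquired_period = importer.single_obj_alarms_history(
    period={"start": None, "end": None},     # resume from the latest stored alarm up to now
    object_name="WTG-01",                    # hypothetical object name
    overlap_interval=relativedelta(days=2),  # re-read the last 2 days of alarms
)

# acquired_period is None when no alarms were returned for the requested period
print(requested_period, acquired_period)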

single_obj_feature_values(period, object_name, features, data_sources=None, overwrite=True, overlap_interval=None, upload_to_bazefield=True, bazefield_opc_backup=False, **kwargs)

Method used to import feature values of a single object.

Parameters:

  • period

    (DateTimeRange | PeriodDict | None) –

    Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.

  • object_name

    (str) –

    Name of the object in performance_db.

  • features

    (list[str]) –

    Features to import, as named in performance_db.

    Pass [] to import all features mapped to the object.

  • data_sources

    (list[str] | None, default: None ) –

    If you just want to import some data sources, select them here.

    It's expected that you provide data source names as defined in performance_db.

    By default [].

  • overwrite

    (bool, default: True ) –

    If set to True, overwrites data in performance_db, by default True

  • overlap_interval

    (relativedelta | None, default: None ) –

    How many days in the past to get data that was already acquired.

    By default relativedelta(days=0)

  • upload_to_bazefield

    (bool, default: True ) –

    If set to True will upload features with a corresponding Bazefield point to Bazefield, by default True

  • bazefield_opc_backup

    (bool, default: False ) –

    If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", when there is missing data will try to get aggregated data from Bazefield OPC tag to fill the gaps, by default False

  • **kwargs

    Additional keyword arguments to be passed to the feature_values method of the handler class.

Returns:

  • dict[str, DateTimeRange | None]

    Dict with the requested and acquired periods in the format {data_source: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None}}

Source code in echo_dataimporter/importer.py
@validate_call
def single_obj_feature_values(
    self,
    period: DateTimeRange | PeriodDict | None,
    object_name: str,
    features: list[str],
    data_sources: list[str] | None = None,
    overwrite: bool = True,
    overlap_interval: relativedelta | None = None,
    upload_to_bazefield: bool = True,
    bazefield_opc_backup: bool = False,
    **kwargs,
) -> dict[str, DateTimeRange | None]:
    """Method used to import feature values of a single object.

    Parameters
    ----------
    period : DateTimeRange | PeriodDict | None
        Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.
    object_name : str
        Name of the object in performance_db.
    features : list[str]
        Features to import, as named in performance_db.

        Pass [] to import all features mapped to the object.
    data_sources : list[str] | None, optional
        If you just want to import some data sources, select them here.

        It's expected that you provide data source names as defined in performance_db.

        By default [].
    overwrite : bool, optional
        If set to True, overwrites data in performance_db, by default True
    overlap_interval : relativedelta | None, optional
        How many days in the past to get data that was already acquired.

        By default relativedelta(days=0)
    upload_to_bazefield : bool, optional
        If set to True will upload features with a corresponding Bazefield point to Bazefield, by default True
    bazefield_opc_backup : bool, optional
        If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", when there is missing data will try to get aggregated data from Bazefield OPC tag to fill the gaps, by default False
    **kwargs
        Additional keyword arguments to be passed to the feature_values method of the handler class.

    Returns
    -------
    dict[str, DateTimeRange | None]
        Dict with the requested and acquired periods in the format {data_source: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None}}

    """
    if overlap_interval is None:
        overlap_interval = relativedelta(days=0)

    logger.info(f"'{object_name}' - Importing feature values considering period {period}")

    # getting data source of the object
    data_sources = self._get_object_data_sources(object_name=object_name, data_sources=data_sources)
    logger.info(f"'{object_name}' - Found data sources: {data_sources}")

    period_results = {}

    for data_source in data_sources:
        logger.info(f"'{object_name}' - Importing from data source '{data_source}'")

        ds_properties = self.data_sources[data_source]
        ds_type = ds_properties["data_source_type_name"]

        # connecting to data source
        try:
            data_handler: DataHandler = HANDLER_MAPPING[ds_type]["handler_class"](data_source_name=data_source, **kwargs)
        except Exception as e:
            raise ConnectionError(f"Failed to connect to {data_source}") from e

        # getting features
        features_df = self.perfdb.features.definitions.get(
            object_names=[object_name],
            get_attributes=True,
            output_type="DataFrame",
        ).reset_index(drop=False)

        features_df = features_df[features_df["data_source_type_name"] == ds_type].copy()
        original_features_df = features_df.copy()
        features_df = features_df.drop_duplicates(subset=["name"], keep="first")

        # filtering features
        if features:
            features_df = features_df[features_df["name"].isin(features)].copy()
        if features_df.empty:
            logger.warning(f"features_df has no values for {object_name=} and {features=}")
            continue

        # defining period
        request_period = copy.deepcopy(period)

        if request_period is None:
            request_period = {"start": None, "end": None}

        first_time_stamp = request_period["start"]
        if first_time_stamp is None:
            first_time_stamp = datetime(2010, 1, 1)

        if request_period["start"] is None:
            request_period["start"] = datetime(3000, 1, 1)
            for feature in HANDLER_MAPPING[ds_type]["last_timestamp_features"]:
                latest_value = self.perfdb.features.values.latest.get(features={object_name: [feature]})
                feature_start = None if len(latest_value) == 0 else latest_value["timestamp"].iloc[0]
                if feature_start is None:
                    continue
                request_period["start"] = min(request_period["start"], feature_start)
            if request_period["start"] is None or request_period["start"] == datetime(3000, 1, 1):
                request_period["start"] = first_time_stamp

        request_period["start"] = max(first_time_stamp, request_period["start"])

        start_without_overlap_interval = request_period["start"]
        request_period["start"] -= overlap_interval

        if request_period["end"] is None:
            request_period["end"] = datetime.now()

        # converting this_period to DateTimeRange
        if isinstance(request_period, dict):
            request_period = DateTimeRange.from_dict(request_period)

        logger.info(f"'{object_name}' - Getting data for the period {request_period}")

        # getting data
        df = pd.DataFrame()
        try:
            df = data_handler.feature_values(object_name=object_name, period=request_period, features_df=features_df, **kwargs)
        except Exception as e:
            raise RuntimeError(f"Failed to acquire feature values for object '{object_name}'") from e

        # getting index frequency
        # inferring frequency if reindex is "infer"
        if len(df) > 3:
            reindex = pd.infer_freq(df.index)
        else:
            logger.debug("Cannot infer frequency from data because it has less than 3 timestamps. Using '10min' as default.")
            reindex = "10min"

        # if failed (returned None), lets try other function
        if reindex is None:
            reindex = get_index_freq(df)

        # if still failed, lets raise an error
        if reindex is None:
            raise RuntimeError("Cannot infer frequency from data.")

        # expected index so all timestamps match the index frequency
        complete_index = pd.date_range(
            pd.Timestamp(request_period["start"]).floor(timedelta(minutes=10)).to_pydatetime(),
            pd.Timestamp(request_period["end"]).ceil(timedelta(minutes=10)).to_pydatetime(),
            freq=reindex,
        )

        df.index = df.index.astype("datetime64[ns]")
        df = df.reindex(
            complete_index,
            method="nearest",
            tolerance=timedelta(minutes=1),
        )

        df = df[(df.index >= request_period["start"]) & (df.index <= request_period["end"])]

        # checking if needs data from Bazefield OPC point
        if bazefield_opc_backup and "feature_bazefield_backup_opc_point" in features_df.columns:
            try:
                for _, backup_feature_row in features_df[features_df["feature_bazefield_backup_opc_point"].notna()].iterrows():
                    if backup_feature_row["name"] not in df.columns:
                        df[backup_feature_row["name"]] = np.nan

                    missing_index = complete_index.difference(df[df[backup_feature_row["name"]].notna()].index)

                    # only getting from bazefield if at least 3 timestamps are needed
                    if len(missing_index) < 3:
                        continue

                    missing_period = DateTimeRange(missing_index[0] - timedelta(minutes=10), missing_index[-1] + timedelta(minutes=10))

                    baze_df = self.baze.points.values.series.get(
                        points={object_name: [backup_feature_row["feature_bazefield_backup_opc_point"]["point"]]},
                        period=missing_period,
                        aggregation=backup_feature_row["feature_bazefield_backup_opc_point"]["aggregate"],
                        aggregation_interval=timedelta(minutes=10),
                        filter_quality="Good",
                    )
                    baze_df = baze_df.droplevel(0, axis=1)

                    if baze_df.empty:
                        continue

                    # moving to 10 minutes after as SCADA data has data labeled at the end of the 10 min interval
                    # TODO this needs to be evaluated as might not be the true for all data sources
                    baze_df.index = baze_df.index + timedelta(minutes=10)

                    # getting intersection of indexes
                    baze_df = baze_df.loc[baze_df.index.intersection(missing_index), :]

                    # renaming columns
                    baze_df = baze_df.rename(
                        columns={backup_feature_row["feature_bazefield_backup_opc_point"]["point"]: backup_feature_row["name"]},
                    )

                    # updating on df
                    df.update(baze_df, overwrite=False)

                    logger.debug(
                        f"{object_name} {backup_feature_row['name']}: Got backup data from Bazefield point {backup_feature_row['feature_bazefield_backup_opc_point']} Timestamps={len(baze_df)} {missing_period}",
                    )

            except Exception:
                logger.exception(
                    f"{object_name} {backup_feature_row['name']}: Failed to get backup data from Bazefield point {backup_feature_row['feature_bazefield_backup_opc_point']} {missing_period}",
                )

        df = df.astype("double[pyarrow]")
        df = df.dropna(how="all")

        # deleting repeated values based on wind speed
        wind_speed_df = features_df[features_df["name"] == "WindSpeed_10min.AVG"]
        wind_speed_cols = wind_speed_df["name"].to_list()
        if wind_speed_cols and wind_speed_cols[0] in df.columns:
            df, _ = filters.flatline_filter(
                df,
                target_feature=wind_speed_cols[0],
                thresh=0.00001,
                features_to_remove=list(df.columns),
                consecutive_periods=12,
            )

        # checking if got data
        if (len(df[df.index > start_without_overlap_interval]) == 0 if isinstance(df, pd.DataFrame) else False) or (df is None):
            logger.warning(f"No new data found for '{object_name}'")

        # uploading to postgres

        # converting df to have a multindex column with levels object_name and feature_name
        pg_df = df.copy()
        pg_df.columns = pd.MultiIndex.from_product([[object_name], pg_df.columns], names=["object_name", "feature_name"])

        self.perfdb.features.values.series.insert(
            df=pg_df,
            bazefield_upload=upload_to_bazefield,
            on_conflict="update" if overwrite else "ignore",
        )


        # getting acquired period
        for col in HANDLER_MAPPING[ds_type]["last_timestamp_features"]:
            if col not in df.columns:
                df[col] = np.nan
        not_nan_df = df[HANDLER_MAPPING[ds_type]["last_timestamp_features"]].dropna(how="any")

        acquired_period = DateTimeRange(not_nan_df.index[0], not_nan_df.index[-1]) if len(not_nan_df) >= 2 else None

        logger.info(
            f"'{object_name}' - Finished getting data for the period {request_period} - Acquired data for period {acquired_period} - Got {len(df)} timestamps and {len(df.columns)} features",
        )

        period_results[data_source] = {"requested_period": request_period, "acquired_period": acquired_period}

    return period_results
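
A minimal usage sketch (hypothetical object name). Passing period=None resumes from the last stored timestamp for the object up to now, and an empty features list imports every feature mapped to it; the result has one entry per data source the object is connected to:

from echo_dataimporter.importer import DataImporter

importer = DataImporter()

results = importer.single_obj_feature_values(
    period=None,                  # resume from the last stored timestamp up to now
    object_name="WTG-01",         # hypothetical object name
    features=[],                  # [] imports every feature mapped to the object
    bazefield_opc_backup=True,    # fill gaps from the backup Bazefield OPC point, when configured
)

for data_source, result in results.items():
    print(data_source, result["requested_period"], result["acquired_period"])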