Importer
DataImporter()
Class used to import data from multiple sources to Echoenergia's performance_db.
Source code in echo_dataimporter/importer.py
def __init__(self) -> None:
"""DataImporter constructor."""
self.perfdb = PerfDB(application_name=f"{type(self).__name__}:{socket.gethostname()}")
self.baze = Baze()
self.data_sources: dict[str, dict[str, Any]] = self.perfdb.datasources.instances.get(
data_source_types_names=list(HANDLER_MAPPING.keys()),
get_attributes=True,
output_type="dict",
)
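A minimal instantiation sketch (a hedged example, not part of the library docs: it assumes the module path from the source listing above and that performance_db and Bazefield credentials are already configured in your environment):

from echo_dataimporter.importer import DataImporter  # path assumed from the source listing above

# Instantiating connects to performance_db (PerfDB) and Bazefield (Baze) and caches the
# attributes of every data source whose type has a registered handler in HANDLER_MAPPING.
importer = DataImporter()

# data_sources is a dict keyed by data source name; each value is a dict of attributes.
for name, attrs in importer.data_sources.items():
    print(name, attrs.get("data_source_type_name"))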
alarms_history(period, objects=None, overlap_interval=None, recalc_features=False, sort_objects='no', data_sources=None, **kwargs)
Function imports alarms from a data source to performance_db within a specified period. It takes in a range of time, a list of objects, and an overlap interval as parameters. If no objects are specified, it will retrieve all connected objects to data sources of the desired type.
Parameters:
- period (DateTimeRange | PeriodDict | None) – Desired period to import the data.
- objects (list[str] | None, default: None) – Wanted objects as defined in performance_db. If [], will get all objects connected to data sources of the desired type, as defined in __init__ of subclasses. By default [].
- overlap_interval (relativedelta | None, default: None) – How many days in the past to get data that was already acquired. This is useful to make sure alarms already acquired will have a correct end. By default relativedelta(days=5).
- recalc_features (bool, default: False) – If set to True, will recalculate all turbine features related to alarms in performance_db for the period. By default False.
- sort_objects (Literal['no', 'alphabetical', 'random'], default: 'no') – How objects should be sorted, which impacts the order in which they are imported. Can be one of: "no" (no sorting), "alphabetical" (sort by name), "random" (sort randomly). By default "no".
- data_sources (list[str] | None, default: None) – If you only want to import some data sources, select them here. It's expected that you provide data source names as defined in performance_db. By default [].
- **kwargs – Additional keyword arguments to be passed to the alarm_history method of the handler class.
Returns:
- tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]] – Summary of errors that occurred during the import, and a dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}.
Source code in echo_dataimporter/importer.py
@validate_call
def alarms_history(
self,
period: DateTimeRange | PeriodDict | None,
objects: list[str] | None = None,
overlap_interval: relativedelta | None = None,
recalc_features: bool = False,
sort_objects: Literal["no", "alphabetical", "random"] = "no",
data_sources: list[str] | None = None,
**kwargs, # pylint: disable=unused-argument# noqa
) -> tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]:
"""Function imports alarms from a data source to performance_db within a specified period. It takes in a range of time, a list of objects, and an overlap interval as parameters. If no objects are specified, it will retrieve all connected objects to data sources of the desired type.
Parameters
----------
period : DateTimeRange | PeriodDict | None
Desired period to import the data
objects : list[str] | None, optional
Wanted objects as defined in performance_db.
If [], will get all connected objects to data sources of the desired type, as defined in __init__ of subclasses
By default [].
overlap_interval : relativedelta | None, optional
How many days in the past to get data that was already acquired.
This is useful to make sure alarms already acquired will have a correct end.
By default relativedelta(days=5)
recalc_features : bool, optional
If set to true will recalc all turbine features related to alarms in performance_db for the period, by default False
sort_objects : Literal["no", "alphabetical", "random"], optional
How objects should be sorted, which will impact the order in which they are imported. Can be one of:
- "no": no sorting
- "alphabetical": sort by name
- "random": sort randomly
By default "no"
data_sources : list[str] | None, optional
If you just want to import some data sources, select them here.
It's expected that you provide data source names as defined in performance_db.
By default [].
**kwargs
Additional keyword arguments to be passed to the alarm_history method of the handler class.
Returns
-------
tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]
Summary of errors that occurred during the import
Dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}
"""
if objects is None:
objects = []
if overlap_interval is None:
overlap_interval = relativedelta(days=5)
logger.info(f"Starting {type(self).__name__}.{self.alarms_history.__name__}")
errs = ErrorSummary("DataImporter.alarms_history")
# getting wanted objects
wanted_objects = self._get_wanted_objects(objects)
# sorting objects
wanted_objects = self._sort_objects(wanted_objects, sort_objects)
periods_dict = {}
# TODO if possible, it would be good to parallelize this code
for object_name in wanted_objects:
logger.info(f"Getting alarms for '{object_name}': {wanted_objects.index(object_name) + 1} of {len(wanted_objects)} objects")
try:
requested_period, acquired_period = self.single_obj_alarms_history(
period=period,
object_name=object_name,
overlap_interval=overlap_interval,
data_sources=data_sources,
)
periods_dict[object_name] = {
"acquired_period": acquired_period,
"requested_period": requested_period,
"import_success": True,
}
except Exception as e:
logger.exception(f"Failed to get feature values for {object_name}")
err_obj = ErrorObject(object_name, [e])
errs.add_child(err_obj)
periods_dict[object_name] = {"acquired_period": None, "requested_period": period, "import_success": False}
# recalculating server_calc features
if recalc_features:
for object_name, this_period in periods_dict.items():
if isinstance(this_period["requested_period"], dict):
this_period["requested_period"] = DateTimeRange.from_dict(this_period["requested_period"])
calc = CalculationHandler.from_calc_types(calc_types=["alarm_active_time"], object_filters={"object_names": [object_name]})
calc.calculate(period=this_period["requested_period"])
return errs, periods_dict
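A usage sketch for alarms_history, reusing the importer instance from the constructor example above. The object names are placeholders, and since the ErrorSummary API is not shown on this page only the returned periods dict is inspected:

from datetime import datetime

from dateutil.relativedelta import relativedelta

errs, periods = importer.alarms_history(
    period={"start": datetime(2024, 1, 1), "end": datetime(2024, 2, 1)},
    objects=["WTG-01", "WTG-02"],            # placeholder object names from performance_db
    overlap_interval=relativedelta(days=5),  # re-read the last 5 days so open alarms get a correct end
    recalc_features=True,                    # recalculate alarm_active_time for the imported period
    sort_objects="alphabetical",
)

# Each object reports what was requested, what was actually stored, and whether it succeeded.
for obj, result in periods.items():
    print(obj, result["import_success"], result["acquired_period"])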
define_calc_period(periods, mode='single', fallback=None)
staticmethod
Method to define the period to be used in calculations based on the periods of the objects.
It will try to create the largest possible period based on the periods of the objects. In case mode is "single", it will return a single DateTimeRange that spans the minimum start and maximum end of both acquired and requested periods of all objects. In case mode is "multiple", it will do the same but for each individual object.
Parameters:
- periods (dict[str, CalcPeriodDict]) – Dict containing the periods requested and acquired for each object, in the format {object_name: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None, "import_success": bool}}. Where: requested_period is the period the importer requested from the data source, based on missing data in performance_db; acquired_period is the period actually acquired from the data source, and can be None if no data was acquired; import_success is a boolean indicating whether the import was successful.
- mode (str, default: 'single') – Defines how the calculation period will be defined. Can be one of: "single" (returns a single DateTimeRange object, or None if the calculation period could not be defined) or "multiple" (returns a dict with the calculation period for each object, in the format {object_name: {"calc_period": DateTimeRange | None}}).
- fallback (DateTimeRange | PeriodDict | None, default: None) – Period to be used in case it is not possible to define the calculation period based on the periods of the objects. By default None.
Returns:
- dict[str, dict[str, DateTimeRange]] | DateTimeRange | None – Period to be used in calculations: a single DateTimeRange (or None) in "single" mode, or a dict with one entry per object in "multiple" mode. If None, no calculations should be performed.
Source code in echo_dataimporter/importer.py
@staticmethod
@validate_call
def define_calc_period(
periods: dict[str, CalcPeriodDict],
mode: str = "single",
fallback: DateTimeRange | PeriodDict | None = None,
) -> dict[str, dict[str, DateTimeRange]] | DateTimeRange | None:
"""Method to define the period to be used in calculations based on the periods of the objects.
It will try to create the largest possible period based on the periods of the objects. In case mode is "single", it will return a single DateTimeRange that spans the minimum start and maximum end of both acquired and requested periods of all objects. In case mode is "multiple", it will do the same but for each individual object.
Parameters
----------
periods : dict[str, CalcPeriodDict]
Dict containing the periods requested and acquired for each object.
It is in the format: {object_name: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None, "import_success": bool}}
Where:
- requested_period is the period that was requested by the importer to the data source based on missing data in performance_db
- acquired_period is the period that was actually acquired from the data source and can be None if no data was acquired
- import_success is a boolean that indicates if the import was successful or not
mode : str, optional
Defines how the calculation period will be defined. Can be one of:
- "single": will return a single DateTimeRange object or None (in case it failed to define the calculation period)
- "multiple": will return a dict with the calculation period for each object.
The dict will be in the format: {object_name: {"calc_period": DateTimeRange | None}}
fallback : DateTimeRange | PeriodDict | None, optional
Period to be used in case it is not possible to define the calculation period based on the periods of the objects.
By default None
Returns
-------
dict[str, dict[str, DateTimeRange]] | DateTimeRange | None
Period to be used in calculations: a single DateTimeRange (or None) in "single" mode, or a dict with one entry per object in "multiple" mode. If None, no calculations should be performed.
"""
if not isinstance(periods, dict):
raise TypeError(f"periods must be a dictionary, not {type(periods)}")
if not all(isinstance(x, str) for x in periods):
raise TypeError("periods keys must be strings")
if not all(isinstance(x, dict) for x in periods.values()):
raise TypeError("periods values must be dictionaries")
# defining format of output
match mode:
case "single":
output = {"start": datetime(2100, 1, 1), "end": datetime(1900, 1, 1)}
case "multiple":
output = copy.deepcopy(periods)
case _:
raise ValueError(f"mode must be 'single' or 'multiple', not '{mode}'")
for object_name, calc_periods in periods.items():
# checking if calc_periods has the required keys
if {"requested_period", "acquired_period"} in set(calc_periods.keys()):
raise ValueError(
f"'{object_name}' - calc_periods must have keys 'requested_period' and 'acquired_period' but it only has {set(calc_periods.keys())}",
)
# checking if requested_period is a DateTimeRange or a dict with start and end
if not isinstance(calc_periods["requested_period"], DateTimeRange | dict):
raise TypeError(
f"'{object_name}' - requested_period must be a DateTimeRange or dict, not {type(calc_periods['requested_period'])}",
)
if isinstance(calc_periods["requested_period"], dict) and any(
x not in calc_periods["requested_period"] for x in ["start", "end"]
):
raise ValueError(f"'{object_name}' - requested_period must have keys 'start' and 'end'")
# checking if acquired_period is a DateTimeRange or None
if not isinstance(calc_periods["acquired_period"], DateTimeRange | type(None)):
raise TypeError(
f"'{object_name}' - acquired_period must be a DateTimeRange or None, not {type(calc_periods['acquired_period'])}",
)
# defining period based on this object
this_obj_end = None
this_obj_start = None
if calc_periods["acquired_period"] is not None:
this_obj_end = max(calc_periods["acquired_period"]["end"], calc_periods["requested_period"]["end"])
this_obj_start = min(calc_periods["acquired_period"]["start"], calc_periods["requested_period"]["start"])
elif calc_periods["requested_period"]["end"] is not None or calc_periods["requested_period"]["start"] is not None:
if calc_periods["requested_period"]["end"] is not None:
this_obj_end = calc_periods["requested_period"]["end"]
if calc_periods["requested_period"]["start"] is not None:
this_obj_start = calc_periods["requested_period"]["start"]
# correcting period based on fallback if needed
if fallback is not None:
if fallback["end"] is not None and this_obj_end is None:
this_obj_end = fallback["end"]
if fallback["start"] is not None and this_obj_start is None:
this_obj_start = fallback["start"]
# updating output
match mode:
case "single":
if this_obj_end is None or this_obj_start is None:
continue
output["start"] = min(output["start"], this_obj_start)
output["end"] = max(output["end"], this_obj_end)
case "multiple":
if this_obj_end is None or this_obj_start is None:
logger.warning(f"Could not determine calculation period for {object_name}. {calc_periods=}, {fallback=}")
output[object_name]["calc_period"] = None
else:
output[object_name]["calc_period"] = DateTimeRange(this_obj_start, this_obj_end)
case _:
raise ValueError(f"mode must be 'single' or 'multiple', not '{mode}'")
if mode == "single":
if output["start"] == datetime(2100, 1, 1) or output["end"] == datetime(1900, 1, 1):
output = None
else:
output = DateTimeRange.from_dict(output)
return output
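An illustrative call (object names and dates are made up; DateTimeRange is the range class used throughout importer.py, so import it from wherever it lives in your codebase, as its module is not shown on this page):

from datetime import datetime

periods = {
    "WTG-01": {
        "requested_period": DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 10)),
        "acquired_period": DateTimeRange(datetime(2024, 1, 2), datetime(2024, 1, 9)),
        "import_success": True,
    },
    "WTG-02": {
        "requested_period": DateTimeRange(datetime(2024, 1, 5), datetime(2024, 1, 12)),
        "acquired_period": None,  # the data source returned nothing for this object
        "import_success": False,
    },
}

# "single": one DateTimeRange spanning all objects (2024-01-01 .. 2024-01-12 here), or None.
calc_period = DataImporter.define_calc_period(periods, mode="single")

# "multiple": adds a "calc_period" entry per object; fallback is only consulted when an
# object's own periods leave start or end undefined.
per_object = DataImporter.define_calc_period(
    periods,
    mode="multiple",
    fallback={"start": datetime(2024, 1, 1), "end": datetime(2024, 1, 12)},
)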
feature_values(period, objects=None, features=None, data_sources=None, overwrite=True, overlap_interval=None, recalc_features=False, upload_to_bazefield=True, bazefield_opc_backup=False, sort_objects='no', **kwargs)
Method that imports feature values from a data source to performance database. It allows the user to specify the time period, objects, and features to import. The method can overwrite existing data in the performance database, and it can also upload features to a corresponding Bazefield point.
Parameters:
- period (DateTimeRange | PeriodDict | None) – Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.
- objects (list[str] | None, default: None) – Wanted objects as defined in performance_db. If [], will get all objects connected to data sources of the desired type, as defined in __init__ of subclasses. By default [].
- features (list[str] | None, default: None) – If you only want to import some features, select them here. It's expected that you provide names as defined in performance_db. By default [].
- data_sources (list[str] | None, default: None) – If you only want to import some data sources, select them here. It's expected that you provide data source names as defined in performance_db. By default [].
- overwrite (bool, default: True) – If set to True, will overwrite data in performance_db. By default True.
- overlap_interval (relativedelta | None, default: None) – How many days in the past to get data that was already acquired. By default relativedelta(days=2).
- recalc_features (bool, default: False) – If set to True, will recalculate all turbine features in performance_db for the period. By default False.
- upload_to_bazefield (bool, default: True) – If set to True, will upload features with a corresponding Bazefield point to Bazefield. By default True.
- bazefield_opc_backup (bool, default: False) – If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", missing data will be filled with aggregated data from the Bazefield OPC tag. By default False.
- sort_objects (Literal['no', 'alphabetical', 'random'], default: 'no') – How objects should be sorted, which impacts the order in which they are imported. Can be one of: "no" (no sorting), "alphabetical" (sort by name), "random" (sort randomly). By default "no".
- **kwargs – Additional keyword arguments to be passed to the feature_values method of the handler class.
Returns:
- tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]] – Summary of errors that occurred during the import, and a dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}.
Source code in echo_dataimporter/importer.py
@validate_call
def feature_values(
self,
period: DateTimeRange | PeriodDict | None,
objects: list[str] | None = None,
features: list[str] | None = None,
data_sources: list[str] | None = None,
overwrite: bool = True,
overlap_interval: relativedelta | None = None,
recalc_features: bool = False,
upload_to_bazefield: bool = True,
bazefield_opc_backup: bool = False,
sort_objects: Literal["no", "alphabetical", "random"] = "no",
**kwargs, # pylint: disable=unused-argument # noqa
) -> tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]:
"""Method that imports feature values from a data source to performance database. It allows the user to specify the time period, objects, and features to import. The method can overwrite existing data in the performance database, and it can also upload features to a corresponding Bazefield point.
Parameters
----------
period : DateTimeRange | PeriodDict | None
Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.
objects : list[str] | None, optional
Wanted objects as defined in performance_db.
If [], will get all connected objects to data sources of the desired type, as defined in __init__ of subclasses
By default [].
features : list[str] | None, optional
If you just want to import some features, select them here.
It's expected that you provide names as defined in performance_db.
By default []
data_sources : list[str] | None, optional
If you just want to import some data sources, select them here.
It's expected that you provide data source names as defined in performance_db.
By default [].
overwrite : bool, optional
If set to true will overwrite data in performance_db, by default True
overlap_interval : relativedelta, optional
How many days in the past to get data that was already acquired.
By default relativedelta(days=2)
recalc_features : bool, optional
If set to true will recalc all turbine features in performance_db for the period, by default False
upload_to_bazefield : bool, optional
If set to True will upload features with a corresponding Bazefield point to Bazefield, by default True
bazefield_opc_backup : bool, optional
If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", when there is missing data will try to get aggregated data from Bazefield OPC tag to fill the gaps, by default False
sort_objects : Literal["no", "alphabetical", "random"], optional
How objects should be sorted, which will impact the order in which they are imported. Can be one of:
- "no": no sorting
- "alphabetical": sort by name
- "random": sort randomly
By default "no"
**kwargs
Additional keyword arguments to be passed to the feature_values method of the handler class.
Returns
-------
tuple[ErrorSummary, dict[str, DateTimeRange | None | bool]]
Summary of errors that occurred during the import
Dict with imported periods in the format {object_name: {"acquired_period": DateTimeRange, "requested_period": DateTimeRange, "import_success": bool}}
"""
if features is None:
features = []
if objects is None:
objects = []
if data_sources is None:
data_sources = []
if overlap_interval is None:
overlap_interval = relativedelta(days=2)
logger.info(f"Starting {type(self).__name__}.{self.feature_values.__name__}")
errs = ErrorSummary("DataImporter.feature_values")
# getting wanted objects
wanted_objects = self._get_wanted_objects(objects)
# sorting objects
wanted_objects = self._sort_objects(wanted_objects, sort_objects)
periods_dict = {}
# converting period to a fixed end in case is None
# this is done so all objects will have the same period end while importing
if period is None:
period = {"start": None, "end": datetime.now()}
# TODO if possible, it would be good to parallelize this code
for object_name in wanted_objects:
logger.info(
f"Getting feature values for '{object_name}': {wanted_objects.index(object_name) + 1} of {len(wanted_objects)} objects",
)
try:
resulting_periods = self.single_obj_feature_values(
period=period,
object_name=object_name,
features=features,
data_sources=data_sources,
overwrite=overwrite,
overlap_interval=overlap_interval,
upload_to_bazefield=upload_to_bazefield,
bazefield_opc_backup=bazefield_opc_backup,
)
final_resulting_periods = {}
final_resulting_periods["requested_period"] = resulting_periods[next(iter(resulting_periods))]["requested_period"]
final_resulting_periods["acquired_period"] = resulting_periods[next(iter(resulting_periods))]["acquired_period"]
for result in resulting_periods.values():
for period_type in ["requested_period", "acquired_period"]:
if result[period_type] is not None:
final_resulting_periods[period_type] = DateTimeRange(
min(final_resulting_periods[period_type].start, result[period_type].start),
max(final_resulting_periods[period_type].end, result[period_type].end),
)
periods_dict[object_name] = final_resulting_periods | {"import_success": True}
except Exception as e:
logger.exception(f"'{object_name}' - Failed to get feature values")
err_obj = ErrorObject(object_name, [e])
errs.add_child(err_obj)
periods_dict[object_name] = {"acquired_period": None, "requested_period": period, "import_success": False}
# recalculating server_calc features
if recalc_features:
# defining calculation period
calc_periods = DataImporter.define_calc_period(periods_dict, mode="multiple", fallback=period)
for object_name, obj_calc_periods in calc_periods.items():
if obj_calc_periods["calc_period"] is not None:
calc = CalculationHandler.from_type_defaults(object_type="infer", object_filters={"object_names": [object_name]})
calc_errs = calc[0].calculate(period=obj_calc_periods["calc_period"])
else:
error = ValueError("Calculation period is None")
calc_errs = ErrorDataSource("server_calc", [error])
errs.add_child(calc_errs)
return errs, periods_dict
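A sketch of a typical feature_values call, again reusing the importer instance from the first example; object and feature names are placeholders:

from dateutil.relativedelta import relativedelta

# period=None means: from the last timestamp stored for each object up to now.
errs, periods = importer.feature_values(
    period=None,
    objects=["WTG-01"],                      # placeholder object name
    features=["WindSpeed_10min.AVG"],        # only this feature; [] would mean all features of the source type
    overwrite=True,                          # update rows that already exist in performance_db
    overlap_interval=relativedelta(days=2),
    recalc_features=True,                    # run server_calc calculations over the imported span
    upload_to_bazefield=True,
    bazefield_opc_backup=False,
)

for obj, result in periods.items():
    if not result["import_success"]:
        print(f"{obj}: import failed for {result['requested_period']}")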
single_obj_alarms_history(period, object_name, overlap_interval=None, data_sources=None, **kwargs)
Method to get alarms history for a single object.
Parameters:
- period (DateTimeRange | PeriodDict | None) – Desired period to import the data. Can be a PeriodDict (dict with start and end timestamps), a DateTimeRange, or None. If None, the period will go from the last timestamp available for the object to the current time.
- object_name (str) – Name of the object in performance_db.
- overlap_interval (relativedelta | None, default: None) – How many days in the past to get data that was already acquired. This is useful to make sure alarms already acquired will have a correct end. By default relativedelta(days=0).
- data_sources (list[str] | None, default: None) – If you only want to import some data sources, select them here. It's expected that you provide data source names as defined in performance_db. By default [].
- **kwargs – Additional keyword arguments to be passed to the alarm_history method of the handler class.
Returns:
- tuple[DateTimeRange, DateTimeRange | None] – Requested period and the period that was actually imported.
Source code in echo_dataimporter/importer.py
@validate_call
def single_obj_alarms_history(
self,
period: DateTimeRange | PeriodDict | None,
object_name: str,
overlap_interval: relativedelta | None = None,
data_sources: list[str] | None = None,
**kwargs, # pylint: disable=unused-argument
) -> tuple[DateTimeRange, DateTimeRange | None]:
"""Method to get alarms history for a single object.
Parameters
----------
period : DateTimeRange | PeriodDict | None
Desired period to import the data. Can be one of the following:
- PeriodDict: Dict with start and end timestamps.
- DateTimeRange
- None
In case None, the period will go from the last timestamp available for the object to the current time.
object_name : str
Name of the object in performance_db.
overlap_interval : relativedelta | None, optional
How many days in the past to get data that was already acquired.
This is useful to make sure alarms already acquired will have a correct end.
By default relativedelta(days=0)
data_sources : list[str] | None, optional
If you just want to import some data sources, select them here.
It's expected that you provide data source names as defined in performance_db.
By default [].
**kwargs
Additional keyword arguments to be passed to the alarm_history method of the handler class.
Returns
-------
tuple[DateTimeRange, DateTimeRange | None]
Requested period and period that was actually imported
"""
if overlap_interval is None:
overlap_interval = relativedelta(days=0)
if data_sources is None:
data_sources = []
# checking if period is correct
if not isinstance(period, dict | DateTimeRange | type(None)):
raise TypeError(f"period must be a dict, DateTimeRange, or None, not {type(period)}")
# checking in case of dict
if isinstance(period, dict):
# keys
if set(period.keys()) != {"start", "end"}:
raise ValueError(f"period dict must have keys 'start' and 'end', not {set(period.keys())}")
# values
if not all(isinstance(x, datetime | type(None)) for x in period.values()):
wrong_values = [x for x in period.values() if not isinstance(x, datetime | type(None))]
raise TypeError(
f"period values must be datetime or None. Wrong values {wrong_values}",
)
logger.info(f"'{object_name}' - Importing alarms considering period {period}")
# getting data source of the object
# TODO this assumes that the object is only connected to one data source (which is true for now), but in the future we might have to change this
data_source = self._get_object_data_sources(object_name=object_name, data_sources=data_sources)[0]
ds_properties = self.data_sources[data_source]
ds_type = ds_properties["data_source_type_name"]
# connecting to data source
try:
data_handler = HANDLER_MAPPING[ds_type]["handler_class"](data_source_name=data_source, **kwargs)
except Exception as e:
raise ConnectionError(f"Failed to connect to {data_source}") from e
# defining period
request_period = copy.deepcopy(period)
# if period is not defined, set it to start from 2010-01-01
if request_period is None:
request_period = {"start": None, "end": None}
first_time_stamp = request_period["start"]
if first_time_stamp is None:
first_time_stamp = datetime(2010, 1, 1)
# if start timestamp is not defined, set it to the last alarm timestamp
if request_period["start"] is None:
latest_alarm = self.perfdb.alarms.history.get_latest(object_names=[object_name], only_timestamp=True)
request_period["start"] = latest_alarm.get(object_name, None)
if request_period["start"] is None:
request_period["start"] = first_time_stamp
# make sure start timestamp is greater than or equal to the defined period
request_period["start"] = max(first_time_stamp, request_period["start"])
# subtract overlap interval from start timestamp
request_period["start"] -= overlap_interval
# if end timestamp is not defined, set it to the current datetime
if request_period["end"] is None:
request_period["end"] = datetime.now()
# convert request_period to DateTimeRange
if isinstance(request_period, dict):
request_period = DateTimeRange.from_dict(request_period)
logger.info(f"'{object_name}' - Getting alarms for the period {request_period}")
# getting data
try:
# get alarm history for object within defined period
df = data_handler.alarm_history(object_name=object_name, period=request_period, **kwargs)
# drop any duplicate alarms based on start, manufacturer_id, alarm_type, and object_name
df = df.drop_duplicates(subset=["start", "manufacturer_id", "alarm_type", "object_name"], keep="first")
# check for any alarms where end < start - timedelta(seconds=0.1) and set end to None
wrong_idx = df[df["end"] < df["start"] - timedelta(seconds=0.1)].index
df.loc[wrong_idx, "end"] = None
# set end equal to start if end < start (this assumes that all rows with end before start - timedelta(seconds=0.1) were already set to None)
wrong_idx = df[df["end"] < df["start"]].index
df.loc[wrong_idx, "end"] = df.loc[wrong_idx, "start"]
except Exception as e:
raise RuntimeError(f"Failed to get alarm history for object '{object_name}'") from e
# uploading alarms history
self.perfdb.alarms.history.insert(df, create_definitions=True, on_conflict="update")
if len(df) >= 1:
min_date = df["start"].min()
max_date = df["end"].max() if df["end"].count() > 0 else df["start"].max()
acquired_period = DateTimeRange(min_date, max_date)
else:
acquired_period = None
logger.info(
f"'{object_name}' - Finished getting alarms for the period {request_period} - Acquired data for period {acquired_period} - Got {len(df)} alarms",
)
return request_period, acquired_period
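Per-object usage, e.g. when backfilling a single turbine; the object name is a placeholder and importer is the instance from the first example:

from dateutil.relativedelta import relativedelta

# period=None resumes from the latest alarm already stored in performance_db
# (or from 2010-01-01 if the object has no alarms yet).
requested, acquired = importer.single_obj_alarms_history(
    period=None,
    object_name="WTG-01",                    # placeholder
    overlap_interval=relativedelta(days=5),
)
print(f"requested {requested}, actually acquired {acquired}")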
single_obj_feature_values(period, object_name, features, data_sources=None, overwrite=True, overlap_interval=None, upload_to_bazefield=True, bazefield_opc_backup=False, **kwargs)
Method used to import feature values of a single object.
Parameters:
- period (DateTimeRange | PeriodDict | None) – Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.
- object_name (str) – Name of the object in performance_db.
- features (list[str]) – If you only want to import some features, select them here. It's expected that you provide names as defined in performance_db. Pass [] to import all features available for the object's data source types.
- data_sources (list[str] | None, default: None) – If you only want to import some data sources, select them here. It's expected that you provide data source names as defined in performance_db. By default [].
- overwrite (bool, default: True) – If set to True, will overwrite data in performance_db. By default True.
- overlap_interval (relativedelta | None, default: None) – How many days in the past to get data that was already acquired. By default relativedelta(days=0).
- upload_to_bazefield (bool, default: True) – If set to True, will upload features with a corresponding Bazefield point to Bazefield. By default True.
- bazefield_opc_backup (bool, default: False) – If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", missing data will be filled with aggregated data from the Bazefield OPC tag. By default False.
- **kwargs – Additional keyword arguments to be passed to the feature_values method of the handler class.
Returns:
- dict[str, DateTimeRange | None] – Dict with the requested and acquired periods per data source, in the format {data_source: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None}}.
Source code in echo_dataimporter/importer.py
@validate_call
def single_obj_feature_values(
self,
period: DateTimeRange | PeriodDict | None,
object_name: str,
features: list[str],
data_sources: list[str] | None = None,
overwrite: bool = True,
overlap_interval: relativedelta | None = None,
upload_to_bazefield: bool = True,
bazefield_opc_backup: bool = False,
**kwargs,
) -> dict[str, DateTimeRange | None]:
"""Method used to import feature values of a single object.
Parameters
----------
period : DateTimeRange | PeriodDict | None
Desired period to import the data. If None, the period will go from the last timestamp available for the object to the current time.
object_name : str
Name of the object in performance_db.
features : list[str]
If you just want to import some features, select them here.
It's expected that you provide names as defined in performance_db.
Pass [] to import all features available for the object's data source types.
data_sources : list[str] | None, optional
If you just want to import some data sources, select them here.
It's expected that you provide data source names as defined in performance_db.
By default [].
overwrite : bool, optional
If set to true will overwrite data in performance_db, by default True
overlap_interval : relativedelta | None, optional
How many days in the past to get data that was already acquired.
By default relativedelta(days=0)
upload_to_bazefield : bool, optional
If set to True will upload features with a corresponding Bazefield point to Bazefield, by default True
bazefield_opc_backup : bool, optional
If set to True and the feature has an attribute called "feature_bazefield_backup_opc_point", when there is missing data will try to get aggregated data from Bazefield OPC tag to fill the gaps, by default False
**kwargs
Additional keyword arguments to be passed to the feature_values method of the handler class.
Returns
-------
dict[str, DateTimeRange | None]
Dict with the requested and acquired periods per data source, in the format {data_source: {"requested_period": DateTimeRange, "acquired_period": DateTimeRange | None}}
"""
if overlap_interval is None:
overlap_interval = relativedelta(days=0)
logger.info(f"'{object_name}' - Importing feature values considering period {period}")
# getting data source of the object
data_sources = self._get_object_data_sources(object_name=object_name, data_sources=data_sources)
logger.info(f"'{object_name}' - Found data sources: {data_sources}")
period_results = {}
for data_source in data_sources:
logger.info(f"'{object_name}' - Importing from data source '{data_source}'")
ds_properties = self.data_sources[data_source]
ds_type = ds_properties["data_source_type_name"]
# connecting to data source
try:
data_handler: DataHandler = HANDLER_MAPPING[ds_type]["handler_class"](data_source_name=data_source, **kwargs)
except Exception as e:
raise ConnectionError(f"Failed to connect to {data_source}") from e
# getting features
features_df = self.perfdb.features.definitions.get(
object_names=[object_name],
get_attributes=True,
output_type="DataFrame",
).reset_index(drop=False)
features_df = features_df[features_df["data_source_type_name"] == ds_type].copy()
original_features_df = features_df.copy()
features_df = features_df.drop_duplicates(subset=["name"], keep="first")
# filtering features
if features:
features_df = features_df[features_df["name"].isin(features)].copy()
if features_df.empty:
logger.warning(f"features_df has no values for {object_name=} and {features=}")
continue
# defining period
request_period = copy.deepcopy(period)
if request_period is None:
request_period = {"start": None, "end": None}
first_time_stamp = request_period["start"]
if first_time_stamp is None:
first_time_stamp = datetime(2010, 1, 1)
if request_period["start"] is None:
request_period["start"] = datetime(3000, 1, 1)
for feature in HANDLER_MAPPING[ds_type]["last_timestamp_features"]:
latest_value = self.perfdb.features.values.latest.get(features={object_name: [feature]})
feature_start = None if len(latest_value) == 0 else latest_value["timestamp"].iloc[0]
if feature_start is None:
continue
request_period["start"] = min(request_period["start"], feature_start)
if request_period["start"] is None or request_period["start"] == datetime(3000, 1, 1):
request_period["start"] = first_time_stamp
request_period["start"] = max(first_time_stamp, request_period["start"])
start_without_overlap_interval = request_period["start"]
request_period["start"] -= overlap_interval
if request_period["end"] is None:
request_period["end"] = datetime.now()
# converting this_period to DateTimeRange
if isinstance(request_period, dict):
request_period = DateTimeRange.from_dict(request_period)
logger.info(f"'{object_name}' - Getting data for the period {request_period}")
# getting data
df = pd.DataFrame()
try:
df = data_handler.feature_values(object_name=object_name, period=request_period, features_df=features_df, **kwargs)
except Exception as e:
raise RuntimeError(f"Failed to acquire feature values for object '{object_name}'") from e
# getting index frequency
# inferring frequency if reindex is "infer"
if len(df) > 3:
reindex = pd.infer_freq(df.index)
else:
logger.debug("Cannot infer frequency from data because it has less than 3 timestamps. Using '10min' as default.")
reindex = "10min"
# if failed (returned None), lets try other function
if reindex is None:
reindex = get_index_freq(df)
# if still failed, lets raise an error
if reindex is None:
raise RuntimeError("Cannot infer frequency from data.")
# expected index so all timestamps match the index frequency
complete_index = pd.date_range(
pd.Timestamp(request_period["start"]).floor(timedelta(minutes=10)).to_pydatetime(),
pd.Timestamp(request_period["end"]).ceil(timedelta(minutes=10)).to_pydatetime(),
freq=reindex,
)
df.index = df.index.astype("datetime64[ns]")
df = df.reindex(
complete_index,
method="nearest",
tolerance=timedelta(minutes=1),
)
df = df[(df.index >= request_period["start"]) & (df.index <= request_period["end"])]
# checking if needs data from Bazefield OPC point
if bazefield_opc_backup and "feature_bazefield_backup_opc_point" in features_df.columns:
try:
for _, backup_feature_row in features_df[features_df["feature_bazefield_backup_opc_point"].notna()].iterrows():
if backup_feature_row["name"] not in df.columns:
df[backup_feature_row["name"]] = np.nan
missing_index = complete_index.difference(df[df[backup_feature_row["name"]].notna()].index)
# only getting from bazefield if at least 3 timestamps are needed
if len(missing_index) < 3:
continue
missing_period = DateTimeRange(missing_index[0] - timedelta(minutes=10), missing_index[-1] + timedelta(minutes=10))
baze_df = self.baze.points.values.series.get(
points={object_name: [backup_feature_row["feature_bazefield_backup_opc_point"]["point"]]},
period=missing_period,
aggregation=backup_feature_row["feature_bazefield_backup_opc_point"]["aggregate"],
aggregation_interval=timedelta(minutes=10),
filter_quality="Good",
)
baze_df = baze_df.droplevel(0, axis=1)
if baze_df.empty:
continue
# moving to 10 minutes after as SCADA data has data labeled at the end of the 10 min interval
# TODO this needs to be evaluated as might not be the true for all data sources
baze_df.index = baze_df.index + timedelta(minutes=10)
# getting intersection of indexes
baze_df = baze_df.loc[baze_df.index.intersection(missing_index), :]
# renaming columns
baze_df = baze_df.rename(
columns={backup_feature_row["feature_bazefield_backup_opc_point"]["point"]: backup_feature_row["name"]},
)
# updating on df
df.update(baze_df, overwrite=False)
logger.debug(
f"{object_name} {backup_feature_row['name']}: Got backup data from Bazefield point {backup_feature_row['feature_bazefield_backup_opc_point']} Timestamps={len(baze_df)} {missing_period}",
)
except Exception:
logger.exception(
f"{object_name} {backup_feature_row['name']}: Failed to get backup data from Bazefield point {backup_feature_row['feature_bazefield_backup_opc_point']} {missing_period}",
)
df = df.astype("double[pyarrow]")
df = df.dropna(how="all")
# deleting repeated values based on wind speed
wind_speed_df = features_df[features_df["name"] == "WindSpeed_10min.AVG"]
wind_speed_cols = wind_speed_df["name"].to_list()
if wind_speed_cols and wind_speed_cols[0] in df.columns:
df, _ = filters.flatline_filter(
df,
target_feature=wind_speed_cols[0],
thresh=0.00001,
features_to_remove=list(df.columns),
consecutive_periods=12,
)
# checking if got data
if df is None or (isinstance(df, pd.DataFrame) and len(df[df.index > start_without_overlap_interval]) == 0):
logger.warning(f"No new data found for '{object_name}'")
# uploading to postgres
# converting df to have a multindex column with levels object_name and feature_name
pg_df = df.copy()
pg_df.columns = pd.MultiIndex.from_product([[object_name], pg_df.columns], names=["object_name", "feature_name"])
self.perfdb.features.values.series.insert(
df=pg_df,
bazefield_upload=upload_to_bazefield,
on_conflict="update" if overwrite else "ignore",
)
# getting acquired period
for col in HANDLER_MAPPING[ds_type]["last_timestamp_features"]:
if col not in df.columns:
df[col] = np.nan
not_nan_df = df[HANDLER_MAPPING[ds_type]["last_timestamp_features"]].dropna(how="any")
acquired_period = DateTimeRange(not_nan_df.index[0], not_nan_df.index[-1]) if len(not_nan_df) >= 2 else None
logger.info(
f"'{object_name}' - Finished getting data for the period {request_period} - Acquired data for period {acquired_period} - Got {len(df)} timestamps and {len(df.columns)} features",
)
period_results[data_source] = {"requested_period": request_period, "acquired_period": acquired_period}
return period_results
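And the per-object, per-data-source variant; the returned dict is keyed by data source name (object name is again a placeholder, importer is the instance from the first example):

from datetime import datetime

results = importer.single_obj_feature_values(
    period={"start": datetime(2024, 1, 1), "end": datetime(2024, 1, 7)},
    object_name="WTG-01",    # placeholder
    features=[],             # [] imports every feature mapped to this object's data source types
    overwrite=False,         # keep existing rows; only insert what is missing
)
for data_source, spans in results.items():
    print(data_source, spans["requested_period"], spans["acquired_period"])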