Huawei FTP¶
TODO @LucasCosta97:
- Add a description of the Huawei FTP data importer.
- Explain how to use it, including any necessary configurations on the database, smart logger, and FileZilla server. If you are not able to detail these steps, leave a section with a TODO for me (Bruno) to complete later.
Class Definition¶
HuaweiFTPDataHandler(data_source_name=None, connection_properties=None, **kwargs)
¶
Subclass of DataHandler used to interact with a Huawei FTP server.
It can be initialized with the data_source_name or connection_properties attributes (at least one of them must not be None).
The connection is an FTP login to the configured host using the user and password from the connection properties; the resulting session is reused by all subsequent requests.
Parameters:
- data_source_name (str | None, default: None) – Name of data source as defined in performance_db. If used, the connection_properties will be created automatically based on the data source attributes in performance_db.
- connection_properties (FtpConnProperties | None, default: None) – If you want to connect manually, provide the connection properties.
Other Parameters:
- timeout (int) – FTP connection timeout in seconds. By default 30.
- passive (bool) – Whether to use passive FTP mode. By default True.
- data_source_types_names – Data source type names used to filter the data source lookup in performance_db. By default None.
Source code in echo_dataimporter/huawei_handler.py
def __init__(
    self,
    data_source_name: str | None = None,
    connection_properties: FtpConnProperties | None = None,
    **kwargs,
) -> None:
    """Constructor of the HuaweiFTPDataHandler class.

    It can be initialized with the data_source_name or connection_properties attributes (at least one of them must not be None).
    The connection is an FTP login to the configured host using the user and password from the connection properties; the resulting session is reused by all subsequent requests.

    Parameters
    ----------
    data_source_name : str | None, optional
        Name of data source as defined in performance_db.
        If used, the connection_properties will be created automatically based on the data source attributes in performance_db.
        By default None.
    connection_properties : FtpConnProperties | None, optional
        If you want to connect manually, provide the connection properties. By default None.

    Other Parameters
    ----------------
    timeout : int, optional
        FTP connection timeout in seconds. By default 30.
    passive : bool, optional
        Whether to use passive FTP mode. By default True.
    data_source_types_names : optional
        Data source type names used to filter the data source lookup in performance_db. By default None.
    """
    # calling __init__ from base class
    super().__init__()
    if data_source_name is None and connection_properties is None:
        raise ValueError("Either data_source_name or connection_properties must be specified")
    if data_source_name is not None:
        # getting data sources
        ds_dict = self.perfdb.datasources.instances.get(
            data_source_types_names=kwargs.get("data_source_types_names"),
            get_attributes=True,
            output_type="dict",
        )
        if data_source_name not in ds_dict:
            raise RuntimeError(f"Data source '{data_source_name}' not defined in performance_db")
        ds_dict = ds_dict[data_source_name]
    if connection_properties is not None:
        self.connection_properties = connection_properties
    else:
        self.connection_properties = FtpConnProperties(
            host=ds_dict["host_address"],
            user=ds_dict["user"],
            password=ds_dict["password"],
            timeout=kwargs.get("timeout", 30),
            passive=kwargs.get("passive", True),
        )
    # FTP client that wraps the connection and is reused by all data-fetching methods
    self.ftp_client = FtpHandler(
        connection_properties=self.connection_properties,
    )
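For reference, here is a minimal usage sketch. The import paths for HuaweiFTPDataHandler and FtpConnProperties are assumptions based on the source paths and signature above, and the data source name, host, and credentials are placeholders.
from echo_dataimporter.huawei_handler import HuaweiFTPDataHandler
from echo_dataimporter.ftp_handler import FtpConnProperties  # assumed import path

# Option 1: let the handler build the connection from performance_db
handler = HuaweiFTPDataHandler(data_source_name="huawei_ftp_plant_a")  # hypothetical data source name

# Option 2: connect manually with explicit connection properties
props = FtpConnProperties(
    host="ftp.example.com",  # placeholder host
    user="ftp_user",  # placeholder credentials
    password="ftp_password",
    timeout=30,
    passive=True,
)
handler = HuaweiFTPDataHandler(connection_properties=props)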
feature_values(object_name, features_df, period, **kwargs)
¶
Method that returns the time series of features for an object in the data source.
Parameters:
- object_name (str) – Name of desired object as defined in performance_db.
- features_df (DataFrame) – DataFrame with the wanted features. Must contain the columns ["name", "standardized_name", "name_in_data_source", "aggregation_type"] (see the sketch after this list).
- period (DateTimeRange) – Desired period of the data.
- **kwargs – Other parameters to be passed to the FTP client.
Returns:
- DataFrame – DataFrame with time series of features for an object.
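As a sketch of the expected input, the features_df below carries the four required columns; the feature names and data source column names are hypothetical.
import pandas as pd

# Hypothetical feature mapping; only the four required columns matter here.
# "name_in_data_source" must match the column headers in the SmartLogger CSV files.
features_df = pd.DataFrame(
    {
        "name": ["active_power", "internal_temperature"],
        "standardized_name": ["ActivePower", "InternalTemperature"],
        "name_in_data_source": ["Active Power(kW)", "Internal Temp.(C)"],
        "aggregation_type": ["mean", "mean"],
    }
)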
Source code in echo_dataimporter/huawei_handler.py
def feature_values(
    self,
    object_name: str,
    features_df: pd.DataFrame,
    period: DateTimeRange,
    **kwargs,  # noqa: ARG002
) -> pd.DataFrame:
    """Method that returns the time series of features for an object in the data source.

    Parameters
    ----------
    object_name : str
        Name of desired object as defined in performance_db.
    features_df : pd.DataFrame
        DataFrame with the wanted features. Must contain columns ["name", "standardized_name", "name_in_data_source", "aggregation_type"].
    period : DateTimeRange
        Desired period of the data.
    **kwargs
        Other parameters to be passed to the FTP client.

    Returns
    -------
    pd.DataFrame
        DataFrame with time series of features for an object.
    """
    # general information about the object -----------------------------------------------------------------------
    object_df_attributes = self.perfdb.objects.instances.attributes.get(object_names=[object_name])
    esn = object_df_attributes[object_name]["serial_number"]["attribute_value"]
    folder_name = object_df_attributes[object_name]["ftp_folder_name"]["attribute_value"]
    # list of folders in the FTP repository ----------------------------------------------------------------------
    folder_list = self.ftp_client.list_contents()
    if folder_name is not None and folder_name not in folder_list:
        raise ValueError(f"Folder '{folder_name}' not found in the FTP server. Available folders: {folder_list}")
    self.ftp_client.change_directory(folder_name)
    files_list = self.ftp_client.list_contents()
    # convert the requested period into daily file names (one 'minYYYYMMDD.csv' file per day)
    daily_dates = pd.date_range(start=period.start, end=period.end, freq="D")
    files_to_read = [f"min{date.strftime('%Y%m%d')}.csv" for date in daily_dates]
    files_filtered = [file for file in files_list if file in files_to_read]
    if not files_filtered:
        raise ValueError(f"No files found in the folder '{folder_name}' for the specified period: {period}")

    # reading the downloaded files and building the dataframe -----------------------------------------------------
    def start_end_lines(path: Path) -> pd.DataFrame:
        """Scan a data file to map each inverter ESN to its data block boundaries.

        This utility function reads a file line by line to find all inverter identifier lines (e.g., '#INV... ESN:...').
        It then calculates the start and end line numbers for each inverter's data block, returning this information as a DataFrame.
        This is useful for parsing files that contain data from multiple inverters.

        Parameters
        ----------
        path : Path
            The full path to the data file to be scanned.

        Returns
        -------
        pd.DataFrame
            A DataFrame where the index is the inverter ESN (str) and the columns are "start_line" and "end_line".
            The end_line for the last inverter will be the total number of lines in the file.
        """
        inverter_map = {}
        if not path.exists():
            raise FileNotFoundError(f"File '{path}' does not exist")
        with path.open(encoding="utf-8") as f:
            for number_line, line in enumerate(f, start=1):
                if "ESN:" in line:  # if "#INV" in line and "ESN:" in line:
                    block_esn = line.split(":")[-1].strip()
                    if block_esn:
                        inverter_map[block_esn] = number_line
        if not inverter_map:
            raise ValueError(f"No ESN found in the file '{path}'")
        df_lines = pd.DataFrame.from_dict(inverter_map, orient="index", columns=["start_line"])
        # sentinel row so that, after the shift, the last inverter's end_line is the last line of the file
        df_lines.loc["sentinel"] = number_line
        df_lines["end_line"] = df_lines["start_line"].shift(-1)
        df_lines = df_lines.drop("sentinel")
        return df_lines

    def read_smartlogger_files(esn: str, df_lines: pd.DataFrame) -> pd.DataFrame:
        """Read a specific data block for an inverter from a SmartLogger file.

        This function isolates a data block within a larger file using the provided line map (df_lines).
        It then reads it, cleans the headers, sets a datetime index, and renames the columns based on the externally defined features_df DataFrame.

        Parameters
        ----------
        esn : str
            The ESN (serial number) of the specific inverter whose data block needs to be read.
        df_lines : pd.DataFrame
            A DataFrame, indexed by ESN, containing "start_line" and "end_line" columns that map each inverter to its data block within the file.

        Returns
        -------
        pd.DataFrame
            A clean, formatted DataFrame containing the time-series data for the single, specified inverter.
            The DataFrame has a datetime index and standardized column names.
            Returns an empty DataFrame if the data block for the ESN has no data rows.
        """
        logger.info(f"Starting block processing for ESN '{esn}'.")
        if esn not in df_lines.index:
            raise ValueError(f"ESN '{esn}' not found in the file '{file}'")
        start_line = int(df_lines.loc[esn, "start_line"])
        end_line = int(df_lines.loc[esn, "end_line"])
        # data rows sit between the ESN header line (start_line) plus the column-header line
        # and the next ESN header line (end_line), hence end_line - start_line - 2 rows
        rows_to_read = (end_line - 2) - start_line
        if rows_to_read <= 0:
            logger.warning("Data block for ESN '%s' is empty or malformed. Skipping.", esn)
            return pd.DataFrame()  # return an empty DataFrame
        try:
            logger.info(f"Reading data block for ESN '{esn}' from file '{path}'...")
            df = pd.read_csv(
                path,
                skiprows=start_line,
                nrows=rows_to_read,
                sep=";",
                encoding="utf-8",
                index_col=False,
            )
            logger.info(f"Data block for ESN '{esn}' read successfully. Found {len(df)} rows.")
        except Exception as e:
            raise OSError(f"Failed to read data block for ESN '{esn}'.") from e
        df = df.rename(columns=lambda col: col.strip().lstrip("#"))
        time_column_name = df.columns[0]
        if time_column_name == "Time":
            df[time_column_name] = pd.to_datetime(df[time_column_name], format="%Y-%m-%d %H:%M:%S")
            df = df.set_index(time_column_name)
        else:
            logger.warning(f"Expected first column to be 'Time', but found '{time_column_name}'.")
        logger.info(f"Renaming columns for ESN '{esn}'...")
        df = df.sort_index()
        # resample to the most common sampling interval so gaps show up as NaN rows
        diff_index = df.index.to_series().diff()
        frequency_index = diff_index.mode().iloc[0]
        df = df.asfreq(frequency_index)
        # when 'Cycle Time' reports a longer interval than the base frequency, the row aggregates
        # several missing timestamps; back-fill the covered interval with that row's values
        values_in_cycle_time = df["Cycle Time"].dropna().unique()
        frequency_index_minute = int(frequency_index.total_seconds() / 60)
        values_in_cycle_time = values_in_cycle_time[(values_in_cycle_time != 0) & (values_in_cycle_time != frequency_index_minute)]
        for value in values_in_cycle_time:
            index = df[df["Cycle Time"] == value].index
            filler_row = df.loc[index[0]].values
            fill_end = index[0] - frequency_index
            fill_start = fill_end - relativedelta(minutes=int(value - frequency_index_minute))
            df.loc[fill_start:fill_end] = filler_row
        # keep only the requested features and rename them to their performance_db names
        original_col_name = features_df["name_in_data_source"].tolist()
        new_col_name = features_df["name"].tolist()
        if len(original_col_name) != len(new_col_name):
            raise ValueError("The length of original_col_name and new_col_name must be the same.")
        rename_dict = dict(zip(original_col_name, new_col_name, strict=False))
        df = df[original_col_name]
        df = df.rename(columns=rename_dict)
        df = df[new_col_name]
        return df

    with tempfile.TemporaryDirectory(dir=".") as temp_folder:
        temp_folder_path = Path(temp_folder)
        df_list = []
        for file in files_filtered:
            self.ftp_client.get_file(file, dest_directory=temp_folder_path)
            path = temp_folder_path / file
            df_lines = start_end_lines(path=path)
            df_temp = read_smartlogger_files(esn, df_lines)
            df_list.append(df_temp)
        df = pd.concat(df_list)
        df = df.sort_index()
        df = df.loc[period.start : period.end]
    return df
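Putting it together, a hedged end-to-end sketch: the object name and dates are placeholders, and DateTimeRange is assumed to be the same class used in the signature above (exposing .start and .end).
import pandas as pd

# Placeholder period; the DateTimeRange construction shown here is an assumption.
period = DateTimeRange(start=pd.Timestamp("2024-05-01"), end=pd.Timestamp("2024-05-03"))

df = handler.feature_values(
    object_name="INV-01",  # hypothetical object name in performance_db
    features_df=features_df,
    period=period,
)
print(df.head())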
object_types()
¶
Method not applicable to Huawei
Source code in echo_dataimporter/huawei_handler.py
def object_types(self) -> pd.DataFrame:
    """Method not applicable to Huawei."""
    raise NotImplementedError("Method not applicable to Huawei FTP")
objects()
¶
Method not applicable to Huawei
Source code in echo_dataimporter/huawei_handler.py
def objects(self) -> pd.DataFrame:
    """Method not applicable to Huawei."""
    raise NotImplementedError("Method not applicable to Huawei FTP")