Huawei FTP¶
The HuaweiFTPDataHandler is a subclass of DataHandler designed to connect to, read, and standardize data from an FTP server populated by a Huawei Smart Logger device.
The class can navigate the FTP directory structure, identify the correct files for a given period, and extract data for a specific piece of equipment (inverter), even when the source files contain information from multiple devices.
Its key features are:
- Fetching Operational (Performance) Data: through the feature_values method, the class reads daily performance files (e.g., minYYYYMMDD.csv) to extract inverter operational data (such as power, frequency, voltage, and current). It isolates the data block for the desired inverter and transforms it into a standardized time series.
- Fetching Alarm History: using the alarm_history method, the class locates and processes alarm files (e.g., alarmg_major.csv), consolidating a complete history of faults and alerts for a specific device.
How to Use¶
1. Handler Initialization¶
The handler can be initialized in two ways:
By data source name (recommended)¶
The handler fetches the credentials (host, user, password) from the performance_db database. The data source name must exist in the database.
from echo_dataimporter import HuaweiFTPDataHandler
import pandas as pd
from datetime import datetime
from echo_datetimerange import DateTimeRange
# Initialize using a pre-defined data source from the database
handler = HuaweiFTPDataHandler(data_source_name="MyHuaweiFTPServer")
By direct connection properties¶
You can provide a configuration object with the connection details.
from echo_dataimporter import HuaweiFTPDataHandler
from echo_dataimporter.huawei_handler import FtpConnProperties  # assumed import path; adjust to your package layout
import pandas as pd
from datetime import datetime
from echo_datetimerange import DateTimeRange
# Manually define the connection properties
conn_props = FtpConnProperties(
host="192.168.1.100",
user="ftp_user",
password="ftp_password"
)
handler = HuaweiFTPDataHandler(connection_properties=conn_props)
2. Fetching Performance Data¶
To fetch time-series data from an inverter, use the feature_values method.
# Define the object, model, and features
object_name = "TEST-TS1-INV01"
object_model = "DG-330KTL-H1"
feature_names = ["ActivePower_5min.AVG", "DcInput02Current_5min.AVG", "CycleTime_5min.REP"]
# Define the desired period (e.g., June 10 to June 11, 2025)
period = DateTimeRange(datetime(2025, 6, 10), datetime(2025, 6, 11))
# Get the full feature definitions from the database
features_df = handler.perfdb.features.definitions.get(
    object_names=[object_name], feature_names=feature_names, object_models=[object_model]
).reset_index(drop=False)
# Fetch the data
performance_data = handler.feature_values(
    object_name=object_name,
    features_df=features_df,
    period=period
)
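The result is a DataFrame indexed by timestamp, with one column per standardized feature name. A quick, illustrative check (actual output depends on your data):
# Illustrative only: inspect the standardized result
print(performance_data.columns.tolist())
# e.g. ['ActivePower_5min.AVG', 'DcInput02Current_5min.AVG', 'CycleTime_5min.REP']
print(performance_data.index.min(), performance_data.index.max())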
3. Fetching Alarm History¶
To get a consolidated DataFrame with the alarm history for a piece of equipment, use the alarm_history method.
# Define the object
object_name = "RG1-TS2-IN21"
# Define the desired period for the alarm search
alarm_period = DateTimeRange(datetime(2025, 6, 10), datetime(2025, 6, 11))
# Fetch the history
alarm_history = handler.alarm_history(
object_name=object_name,
period=alarm_period
)
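The returned DataFrame uses the standardized alarm columns listed under Class Definition. As an illustration (assuming the fetch above succeeded), alarm durations can be derived directly from the start and end columns:
# Illustrative only: derive alarm durations from the standardized columns
alarm_history["duration"] = alarm_history["end"] - alarm_history["start"]
print(alarm_history[["object_name", "alarm_name", "start", "end", "duration"]].head())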
Required Configuration¶
For the HuaweiFTPDataHandler to work correctly, the following three components must be configured in an integrated manner.
1. Database (performance_db)¶

The configuration in the performance_db is based on the relationship between three main entities and their essential attributes:
- The Data Source: represents the connection to the FTP server.
  - It must have the connection attributes: host, user, and password.
  - It must belong to a specific data source type (e.g., ftp_huawei_smartlogger).
- The Object (Inverter): represents the physical equipment.
  - It must have the identification attributes:
    - serial_number: the ESN (Equipment Serial Number) of the equipment.
    - ftp_folder_name: the name of the equipment's folder on the FTP server. This is usually a folder for the whole TS/Smart Logger, not one per inverter.
- The Feature: represents a metric to be read.
  - It must contain the name mapping information:
    - name_in_data_source: the exact name of the metric in the source file (e.g., "Active Power").
    - name: the standardized name to which the metric will be converted (e.g., "ActivePower_5min.AVG").
  - A set of features must be associated with an Object Model.
The crucial link is that the features (associated with an object model) must also be linked to the same data source type as the instance containing the connection credentials. This ensures the handler uses the correct feature definitions for the correct data source.
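A hedged way to check this wiring from Python is to reuse the same perfdb call the handler makes internally during initialization (the names below are the examples used earlier on this page):
# Sketch: confirm the data source and its connection attributes are registered
ds_dict = handler.perfdb.datasources.instances.get(
    data_source_types_names="ftp_huawei_smartlogger",
    get_attributes=True,
    output_type="dict",
)
print("MyHuaweiFTPServer" in ds_dict)  # True if the data source exists
print(ds_dict.get("MyHuaweiFTPServer", {}).keys())  # should include host_address, user, password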
2. Huawei Smart Logger¶
Configuring the Smart Logger is a critical step, as it is responsible for sending data from all inverters connected to its transformer.
Accessing the Smart Logger¶
The device can only be accessed from the plant's local network. The standard procedure is:
- Remotely connect to a computer that is on the plant's network.
- From this computer, open a web browser and access the Smart Logger's web interface via its local IP address.
FTP Configuration¶
In the Smart Logger interface, navigate to the Settings > FTP menu and fill in the fields as described below.
- Basic parameters:
  - FTP server: the IP address or hostname of the FTP server. This value must be identical to the host attribute of your Data Source in performance_db.
  - User name: the username for the FTP connection. It must be identical to the user attribute in performance_db.
  - Password: the password for the FTP connection. It must be identical to the password attribute in performance_db.
  - Remote directory: the name of the destination folder on the server. Important: this field defines the folder for all data generated by this Smart Logger. The end of the name must exactly match the ftp_folder_name attribute of the inverter object in performance_db, and each Smart Logger must have a unique folder.
- Report settings:
  - Data reporting: Enable
  - File format: Format 4
  - File name: minYYYYMMDD.csv
  - Time format: YYYY-MM-DD
  - Reporting mode: Cyclic
  - Reporting interval: 180 (minutes)
  - File mode: Accumulated data
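These report settings determine which files the handler expects to find. With File name set to minYYYYMMDD.csv, feature_values derives one candidate file per day of the requested period (this mirrors the handler's own logic; period is a DateTimeRange as in the earlier examples):
import pandas as pd
# One expected file per day in the requested period, e.g. min20250610.csv
daily_dates = pd.date_range(start=period.start, end=period.end, freq="D")
files_to_read = [f"min{date.strftime('%Y%m%d')}.csv" for date in daily_dates]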
3. FTP Server (e.g., FileZilla Server)¶
Data access relies on a FileZilla FTP server configured to host the data files. The relevant configuration is shown in the images below.
(Image: FileZilla Server configuration, first screen)
(Image: FileZilla Server configuration, second screen, showing the mount point's Native path)
Please note that the Native path in the second image must be the directory where the Smart Logger stores its data files (see the FTP Configuration step above).
Note
Here we are using FileZilla for Windows, but any FTP server will work if configured correctly. For the DG solar farms, a Linux-based FTP server will be used.
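Whichever server you use, a minimal reachability check with Python's standard ftplib (reusing the illustrative credentials from the initialization example) can confirm the server is set up before involving the handler:
from ftplib import FTP
# Minimal sketch: confirm the FTP server is reachable and the credentials work
with FTP() as ftp:
    ftp.connect("192.168.1.100", 21, timeout=30)
    ftp.login("ftp_user", "ftp_password")
    ftp.set_pasv(True)
    print(ftp.nlst())  # top-level folders; should include the Smart Logger's remote directory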
Class Definition¶
HuaweiFTPDataHandler(data_source_name=None, connection_properties=None, **kwargs)¶
Subclass of DataHandler used to interact with Huawei FTP.
The initializer sets up the handler by establishing the necessary FTP connection parameters. It can operate in two ways:
- Using data_source_name: fetches connection details from the performance database (performance_db).
- Using connection_properties: uses a pre-configured FtpConnProperties object directly.
If both are provided, connection_properties takes precedence.
Parameters:
- data_source_name (str | None, default: None): the name of the data source as defined in performance_db. This is used to look up host, user, and password.
- connection_properties (FtpConnProperties | None, default: None): a pre-configured object containing all necessary FTP connection properties (host, user, password, etc.).
- **kwargs: arbitrary keyword arguments. The following are recognized:
  - data_source_types_names (str): used when querying the performance_db to filter data source types.
  - timeout (int, optional): the connection timeout in seconds. Defaults to 30.
  - passive (bool, optional): whether to use FTP passive mode. Defaults to True.
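For example, an initialization that exercises the recognized keyword arguments (values are illustrative):
# Illustrative: initialization with the recognized keyword arguments
handler = HuaweiFTPDataHandler(
    data_source_name="MyHuaweiFTPServer",
    data_source_types_names="ftp_huawei_smartlogger",
    timeout=60,
    passive=True,
)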
Source code in echo_dataimporter/huawei_handler.py
def __init__(
self,
data_source_name: str | None = None,
connection_properties: FtpConnProperties | None = None,
**kwargs,
) -> None:
"""Initialize the handler for a Huawei FTP data source.
This method sets up the handler by establishing the necessary FTP
connection parameters. It can operate in two ways:
1. Using `data_source_name`: Fetches connection details from the
performance database (`performance_db`).
2. Using `connection_properties`: Uses a pre-configured
`FtpConnProperties` object directly.
If both are provided, `connection_properties` will take precedence.
Parameters
----------
data_source_name : str | None, optional
The name of the data source as defined in `performance_db`.
This is used to look up host, user, and password.
Defaults to None.
connection_properties : FtpConnProperties | None, optional
A pre-configured object containing all necessary FTP connection
properties (host, user, password, etc.). Defaults to None.
**kwargs
Arbitrary keyword arguments. The following are recognized:
* `data_source_types_names` (str): Used when querying the
`performance_db` to filter data source types.
* `timeout` (int, optional): The connection timeout in seconds.
Defaults to 30 if not provided.
* `passive` (bool, optional): Whether to use FTP passive mode.
Defaults to True if not provided.
"""
# calling __init__ from base class
super().__init__()
if data_source_name is None and connection_properties is None:
raise ValueError("Either data_source_name or connection_properties must be specified")
if data_source_name is not None:
# getting data sources
ds_dict = self.perfdb.datasources.instances.get(
data_source_types_names=kwargs.get("data_source_types_names"),
get_attributes=True,
output_type="dict",
)
if data_source_name not in ds_dict:
raise RuntimeError(f"Data source '{data_source_name}' not defined in performance_db")
ds_dict = ds_dict[data_source_name]
if connection_properties is not None:
self.connection_properties = connection_properties
else:
self.connection_properties = FtpConnProperties(
host=ds_dict["host_address"],
user=ds_dict["user"],
password=ds_dict["password"],
timeout=kwargs.get("timeout", 30),
passive=kwargs.get("passive", True),
)
    self.ftp_client = FtpHandler(  # build the FTP client from the resolved connection properties
connection_properties=self.connection_properties,
)
alarm_history(object_name, period)¶
Retrieves and consolidates the alarm history for a specific object over a given period.
This method acts as the main entry point for fetching alarm data. It locates the object's specific alarm folder on the FTP server, processes all relevant alarm files within that folder using a helper function, and returns a single, sorted DataFrame containing the complete alarm history for the period.
Parameters:
- object_name (str): name of the desired object as defined in performance_db. This object must have the ftp_folder_name attribute defined.
- period (DateTimeRange): the desired period for which to retrieve alarms. Any alarm active during this period will be included.
Returns:
- DataFrame: a single DataFrame containing all alarms that occurred during the specified period, sorted by start and end time, with the standardized columns object_name, manufacturer_id, alarm_name, start, end, data_source_type, and alarm_responsibility.
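As an illustration of working with the standardized output (assuming the example object and period from the How to Use section):
# Illustrative only: most frequent alarms in the period
alarms = handler.alarm_history(object_name="RG1-TS2-IN21", period=alarm_period)
print(alarms.groupby("alarm_name").size().sort_values(ascending=False).head())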
Source code in echo_dataimporter/huawei_handler.py
@validate_call
def alarm_history(
self,
object_name: str,
period: DateTimeRange,
) -> pd.DataFrame:
"""Retrieves and consolidates the alarm history for a specific object over a given period.
This method acts as the main entry point for fetching alarm data. It locates the object's specific alarm folder on the FTP server, processes all relevant alarm files within that folder using a helper function, and returns a single, sorted DataFrame containing the complete alarm history for the period.
Parameters
----------
object_name : str
Name of the desired object as defined in `performance_db`. This object must have the "ftp_folder_name" attribute defined.
period : DateTimeRange
The desired period for which to retrieve alarms. Any alarm active during this period will be included.
Returns
-------
pd.DataFrame
A single DataFrame containing all alarms that occurred during the specified period, sorted by 'start' and 'end' time. The DataFrame has the following standardized columns:
- `object_name`, `manufacturer_id`, `alarm_name`, `start`, `end`, `data_source_type`, `alarm_responsibility`.
"""
def read_alarm_files(files_list: list[str], object: str, period: DateTimeRange) -> dict[str, pd.DataFrame]: # noqa: A002
"""Helper function to download, parse, and standardize alarm data files.
For a given list of filenames, this function filters for actual alarm files (containing "alarmg_*"), downloads them, and extracts alarm data corresponding to the specified object and period.
Parameters
----------
files_list : list[str]
A list of all filenames present in the object's FTP directory.
object : str
The name of the device to filter alarms for, corresponding to the "Device" column in the raw files.
period : DateTimeRange
The time period to filter alarms against. Alarms overlapping with this period are returned.
Returns
-------
dict[str, pd.DataFrame]
A dictionary mapping each processed filename to a DataFrame of its standardized alarm data.
"""
with tempfile.TemporaryDirectory(dir=".") as temp_folder:
dict_df = {}
for file in files_list:
if "alarmg_major" in file or "alarmg_minor" in file or "alarmg_warning" in file:
try:
self.ftp_client.get_file(file, dest_directory=Path(temp_folder))
path = Path(temp_folder) / file
df_temp = pd.read_csv(
path,
sep=",",
encoding="utf-8",
index_col=False,
)
df_temp = df_temp.applymap(lambda x: x.strip() if isinstance(x, str) else x)
                        df_temp = df_temp[df_temp["Device"] == object]
dict_rename = {
"Device": "object_name",
"Alarm ID": "manufacturer_id",
"Alarm Name": "alarm_name",
"Generation time": "start",
"End time": "end",
}
df_temp = df_temp.rename(columns=dict_rename)
df_temp = df_temp[dict_rename.values()]
df_temp["data_source_type"] = "ftp_huawei_smartlogger"
df_temp["alarm_responsibility"] = "U"
df_temp["start"] = pd.to_datetime(df_temp["start"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
df_temp["end"] = pd.to_datetime(df_temp["end"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
df_temp = df_temp[(df_temp["start"] < period.end) & (df_temp["end"] > period.start)]
dict_df[file] = df_temp
except pd.errors.EmptyDataError:
logger.warning(f"File '{file}' is empty. Skipping.")
continue
return dict_df
object_df_attributes = self.perfdb.objects.instances.attributes.get(object_names=[object_name])
folder_name = "./Alarmes/" + object_df_attributes[object_name]["ftp_folder_name"]["attribute_value"]
self.ftp_client.change_directory(folder_name)
files_list = self.ftp_client.list_contents()
    dict_df = read_alarm_files(files_list, object_name, period)
df = pd.concat(dict_df.values(), ignore_index=True)
df = df.sort_values(by=["start", "end"]).reset_index(drop=True)
return df
feature_values(object_name, features_df, period)¶
Retrieves time-series feature data for a specific object from a Huawei FTP server.
This method orchestrates the entire process of fetching data. It first queries the performance database for object metadata (like ESN and FTP folder), then connects to the FTP server to find and download the relevant daily CSV files. Each file, which may contain data for multiple devices, is parsed to extract the specific data block for the requested object. Finally, data from all files is concatenated and cleaned.
Parameters:
- object_name (str): name of the desired object (e.g., an inverter) as defined in performance_db.
- features_df (DataFrame): a DataFrame defining the features to be extracted. It must contain the columns "name", "standardized_name", "name_in_data_source", and "aggregation_type".
- period (DateTimeRange): the start and end datetimes for the desired data period.
Returns:
- DataFrame: a pandas DataFrame indexed by datetime, with columns corresponding to the standardized feature names. The data is sorted and clipped to the requested period.
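Because the returned frame is datetime-indexed with standardized column names, it plugs directly into pandas time-series tooling. Illustrative only, with the feature names from the earlier example:
# Illustrative only: daily mean of the 5-minute active power averages
data = handler.feature_values(object_name="TEST-TS1-INV01", features_df=features_df, period=period)
daily_power = data["ActivePower_5min.AVG"].resample("1D").mean()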
Source code in echo_dataimporter/huawei_handler.py
@validate_call
def feature_values(
self,
object_name: str,
features_df: pd.DataFrame,
period: DateTimeRange,
) -> pd.DataFrame:
"""Retrieves time-series feature data for a specific object from a Huawei FTP server.
This method orchestrates the entire process of fetching data. It first queries the performance database for object metadata (like ESN and FTP folder), then connects to the FTP server to find and download the relevant daily CSV files.
Each file, which may contain data for multiple devices, is parsed to extract the specific data block for the requested object. Finally, data from all files is concatenated and cleaned.
Parameters
----------
object_name : str
Name of the desired object (e.g., an inverter) as defined in `performance_db`.
features_df : pd.DataFrame
A DataFrame defining the features to be extracted. It must contain the columns: "name", "standardized_name", "name_in_data_source", and "aggregation_type".
period : DateTimeRange
The start and end datetimes for the desired data period.
Returns
-------
pd.DataFrame
A pandas DataFrame indexed by datetime, with columns corresponding to the standardized feature names. The data is sorted and clipped to the requested `period`.
"""
    # general information about the object -----------------------------------------------------------------
object_df_attributes = self.perfdb.objects.instances.attributes.get(object_names=[object_name])
esn = object_df_attributes[object_name]["serial_number"]["attribute_value"]
folder_name = object_df_attributes[object_name]["ftp_folder_name"]["attribute_value"]
    # list of folders in the FTP repository ----------------------------------------------------------------
folder_list = self.ftp_client.list_contents()
if folder_name is not None and folder_name not in folder_list:
raise ValueError(f"Folder '{folder_name}' not found in the FTP server. Available folders: {folder_list}")
self.ftp_client.change_directory(folder_name)
files_list = self.ftp_client.list_contents()
    # convert the requested dates into expected daily file names
daily_dates = pd.date_range(start=period.start, end=period.end, freq="D")
files_to_read = [f"min{date.strftime('%Y%m%d')}.csv" for date in daily_dates]
    files_filtered = [file for file in files_list if file in files_to_read]
    if not files_filtered:
        raise ValueError(f"No files found in the folder '{folder_name}' for the specified period: {period}")
    # read the downloaded files and build the dataframe -----------------------------------------------------
def start_end_lines(path: Path) -> pd.DataFrame:
"""Scans a data file to map each inverter ESN to its data block boundaries.
This utility function reads a file line by line to find all inverter identifier lines (e.g., '#INV... ESN:...'). It calculates the start and end line numbers for each inverter's data block, which is essential for parsing files containing data from multiple inverters.
Parameters
----------
path : Path
The full path to the data file to be scanned.
Returns
-------
pd.DataFrame
A DataFrame where the index is the inverter ESN (str) and columns are 'start_line' and 'end_line'. The end_line for the last inverter is the total number of lines in the file.
"""
inverter_map = {}
if not path.exists():
raise FileNotFoundError(f"File '{path}' does not exist")
with Path(path).open(encoding="utf-8") as f:
for number_line, line in enumerate(f, start=1):
if "ESN:" in line: # if "#INV" in line and "ESN:" in line:
try:
esn = line.split(":")[-1].strip()
if esn:
inverter_map[esn] = number_line
except IndexError:
continue
if not inverter_map:
raise ValueError(f"No ESN found in the file '{path}'")
        # map each ESN to the line number where its header row appears
        df_lines = pd.DataFrame.from_dict(inverter_map, orient="index", columns=["start_line"])
        # sentinel row: after shift(-1), the last inverter's end_line is the file's last line
        df_lines.loc["_sentinel"] = number_line
        df_lines["end_line"] = df_lines["start_line"].shift(-1)
        df_lines = df_lines.drop("_sentinel")
return df_lines
def read_smartlogger_files(esn: str, df_lines: pd.DataFrame) -> pd.DataFrame:
"""Reads a specific data block for one inverter from a smartlogger file.
This function uses a pre-generated line map (`df_lines`) to isolate and read only the data block corresponding to a specific ESN. It then performs several data cleaning and formatting operations:
- Cleans column headers.
- Sets a proper datetime index.
- Resamples the data to a consistent frequency.
- Fills data gaps by forward-filling, using 'Cycle Time' as a guide.
- Renames columns to a standard format using `features_df` from the parent scope.
Parameters
----------
esn : str
The ESN (serial number) of the specific inverter whose data is to be read.
df_lines : pd.DataFrame
A DataFrame, indexed by ESN, containing 'start_line' and 'end_line' columns that map each inverter to its data block within the file.
Returns
-------
pd.DataFrame
A clean, formatted DataFrame with a datetime index and standardized column names for the specified inverter. Returns an empty DataFrame if the data block for the ESN contains no data rows.
"""
logger.info(f"Starting block processing for ESN '{esn}.")
if esn not in df_lines.index:
raise ValueError(f"ESN '{esn}' not found in the file '{file}'")
        start_line = int(df_lines.loc[esn, "start_line"])
        end_line = int(df_lines.loc[esn, "end_line"])
rows_to_read = (end_line - 2) - start_line
if rows_to_read <= 0:
logger.warning("Data block for ESN '%s' is empty or malformed. Skipping.", esn)
            return pd.DataFrame()  # return an empty DataFrame
try:
logger.info(f"Reading data block for ESN '{esn}' from file '{path}'...")
df = pd.read_csv(
path,
skiprows=start_line,
nrows=rows_to_read,
sep=";",
encoding="utf-8",
index_col=False,
)
logger.info(f"Data block for ESN '{esn}' read successfully. Found {len(df)} rows.")
except Exception as e:
raise OSError(f"Failed to read data block for ESN '{esn}'.") from e
df = df.rename(columns=lambda col: col.strip().lstrip("#"))
time_column_name = df.columns[0]
if time_column_name == "Time":
df[time_column_name] = pd.to_datetime(df[time_column_name], format="%Y-%m-%d %H:%M:%S")
df = df.set_index(time_column_name)
else:
logger.warning(f"Expected first column to be 'Time', but found '{time_column_name}'.")
logger.info(f"Renaming columns for ESN '{esn}'...")
df = df.sort_index()
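        # infer the sampling frequency as the most common spacing between consecutive timestamps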
diff_index = df.index.to_series().diff()
frequency_index = diff_index.mode().iloc[0]
df = df.asfreq(frequency_index)
values_in_cycle_time = df["Cycle Time"].dropna().unique()
frequency_index_minute = int(frequency_index.total_seconds() / 60)
values_in_cycle_time = values_in_cycle_time[(values_in_cycle_time != 0) & (values_in_cycle_time != frequency_index_minute)]
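        # each remaining Cycle Time value marks an accumulated block whose reported values are spread backwards over the gap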
for value in values_in_cycle_time:
if value is not None:
index = df[df["Cycle Time"] == value].index
df_preenchedor = df.loc[index[0]].values
end_line_fill = index[0] - frequency_index
start_line_fill = end_line_fill - relativedelta(minutes=int(value - frequency_index_minute))
df.loc[start_line_fill:end_line_fill] = df_preenchedor
else:
logger.warning(f"Found None value in 'Cycle Time' for ESN '{esn}'. Skipping filling.")
original_col_name = features_df["name_in_data_source"].tolist()
new_col_name = features_df["name"].tolist()
if len(original_col_name) != len(new_col_name):
raise ValueError("The length of original_col_name and new_col_name must be the same.")
rename_dict = dict(zip(original_col_name, new_col_name, strict=False))
df = df[original_col_name]
df = df.rename(columns=rename_dict)
df = df[new_col_name]
return df
with tempfile.TemporaryDirectory(dir=".") as temp_folder:
temp_folder_path = Path(temp_folder)
df_list = []
        for file in files_filtered:
self.ftp_client.get_file(file, dest_directory=temp_folder_path)
path = temp_folder_path / file
df_lines = start_end_lines(path=path)
df_temp = read_smartlogger_files(esn, df_lines)
df_list.append(df_temp)
df = pd.concat(df_list)
df = df.sort_index()
df = df.loc[period.start : period.end]
return df
object_types()¶
Method not applicable to Huawei
Source code in echo_dataimporter/huawei_handler.py
def object_types(self) -> pd.DataFrame:
"""Method not applicable to Huawei"""
    raise NotImplementedError("Method not applicable to Huawei FTP data sources")
objects()¶
Method not applicable to Huawei
Source code in echo_dataimporter/huawei_handler.py
def objects(self) -> pd.DataFrame:
"""Method not applicable to Huawei"""
    raise NotImplementedError("Method not applicable to Huawei FTP data sources")