Huawei FTP

The HuaweiFTPDataHandler is a subclass of DataHandler designed to connect to, read, and standardize data from an FTP server populated by a Huawei Smart Logger device.

The class can navigate the FTP directory structure, identify the correct files for a given period, and extract data for a specific piece of equipment (inverter), even when the source files contain information from multiple devices.

Its key features are:

  • Fetching Operational (Performance) Data: Through the feature_values method, the class reads daily performance files (e.g., minYYYYMMDD.csv) to extract inverter operational data (such as power, frequency, voltage, and current). It isolates the data block for the desired inverter and transforms it into a standardized time series.

  • Fetching Alarm History: Using the alarm_history method, the class locates and processes alarm files (e.g., alarmg_major.csv), consolidating a complete history of faults and alerts for a specific device.

How to Use

1. Handler Initialization

The handler can be initialized in two ways:

By data source name

The handler fetches the credentials (host, user, password) from the performance_db database. The data source name must exist in the database.

from echo_dataimporter import HuaweiFTPDataHandler
import pandas as pd
from datetime import datetime
from echo_datetimerange import DateTimeRange

# Initialize using a pre-defined data source from the database
handler = HuaweiFTPDataHandler(data_source_name="MyHuaweiFTPServer")

By direct connection properties

Alternatively, you can provide an FtpConnProperties object with the connection details.

# FtpConnProperties must be imported as well; the exact module path is assumed here
from echo_dataimporter import FtpConnProperties, HuaweiFTPDataHandler

# Manually define the connection properties
conn_props = FtpConnProperties(
    host="192.168.1.100",
    user="ftp_user",
    password="ftp_password"
)

handler = HuaweiFTPDataHandler(connection_properties=conn_props)

2. Fetching Performance Data

To fetch time-series data from an inverter, use the feature_values method.

# Define the object, model, and features
object_name = "TEST-TS1-INV01"
object_model = "DG-330KTL-H1"
feature_names = ["ActivePower_5min.AVG", "DcInput02Current_5min.AVG", "CycleTime_5min.REP"]

# Define the desired period (e.g., June 10 to June 11, 2025)
period = DateTimeRange(datetime(2025, 6, 10), datetime(2025, 6, 11))

# Get the full feature definitions from the database
features_df = handler.perfdb.features.definitions.get(
    object_names=[object_name], feature_names=feature_names, object_models=[object_model]
).reset_index(drop=False)

# Fetch the data
performance_data = handler.feature_values(
    object_name=object_name,
    features_df=features_df,
    period=period
)
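
The result is a DataFrame indexed by datetime with one column per standardized feature name. A quick inspection sketch (output is illustrative):

# Inspect the standardized output
print(performance_data.head())
print(performance_data.columns.tolist())  # e.g., ['ActivePower_5min.AVG', ...]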

3. Fetching Alarm History

To get a consolidated DataFrame with the alarm history for a piece of equipment, use the alarm_history method.

# Define the object
object_name="RG1-TS2-IN21"

# Define the desired period for the alarm search
alarm_period = DateTimeRange(datetime(2025, 6, 10), datetime(2025, 6, 11))

# Fetch the history
alarm_history = handler.alarm_history(
    object_name=object_name,
    period=alarm_period
)
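
The returned DataFrame is sorted by 'start' and 'end' and uses the standardized columns listed in the class reference below. A quick inspection sketch:

# Columns: object_name, manufacturer_id, alarm_name, start, end,
# data_source_type, alarm_responsibility
print(alarm_history[["alarm_name", "start", "end"]].head())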

Required Configuration

For the HuaweiFTPDataHandler to work correctly, the following three components must be configured consistently with one another.

1. Database (performance_db)

Diagram of relationships

The configuration in the performance_db is based on the relationship between three main entities and their essential attributes:

  • The Data Source: Represents the connection to the FTP server.

    • It must have the connection attributes: host, user, and password.
    • It must belong to a specific data source type (e.g., ftp_huawei_smartlogger).
  • The Object (Inverter): Represents the physical equipment.

    • It must have the identification attributes:
      • serial_number: The ESN (Equipment Serial Number) of the equipment.
      • ftp_folder_name: The name of the equipment's folder on the FTP server. This is usually a folder for the TS/Smart Logger, not one per inverter.
  • The Feature: Represents a metric to be read.

    • It must contain the name mapping information:
      • name_in_data_source: The exact name of the metric in the source file (e.g., "Active Power").
      • name: The standardized name to which the metric will be converted (e.g., "ActivePower_5min.AVG").
    • A set of features must be associated with an Object Model.

The crucial link is that the features (associated with an object model) must also be linked to the same data source type as the instance containing the connection credentials. This ensures the handler uses the correct feature definitions for the correct data source.
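
As a sanity check, you can read an object's attributes through the same API the handler uses internally (the object name below is illustrative):

# Verify that the inverter object exposes the attributes the handler needs
attrs = handler.perfdb.objects.instances.attributes.get(object_names=["TEST-TS1-INV01"])
print(attrs["TEST-TS1-INV01"]["serial_number"]["attribute_value"])    # ESN
print(attrs["TEST-TS1-INV01"]["ftp_folder_name"]["attribute_value"])  # FTP folder name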

2. Huawei Smart Logger

Configuring the Smart Logger is a critical step, as it is responsible for sending data from all inverters connected to its transformer.

Accessing the Smart Logger

The device can only be accessed from the plant's local network. The standard procedure is:

  1. Remotely connect to a computer that is on the plant's network.
  2. From this computer, open a web browser and access the Smart Logger's web interface via its local IP address.

FTP Configuration

In the Smart Logger interface, navigate to the Settings > FTP menu and fill in the fields as described below.

  • Basic parameters:

    • FTP server: The IP address or hostname of the FTP server.
      • This value must be identical to the host attribute of your Data Source in performance_db.
    • User name: The username for the FTP connection.
      • It must be identical to the user attribute in performance_db.
    • Password: The password for the FTP connection.
      • It must be identical to the password attribute in performance_db.
    • Remote directory: The name of the destination folder on the server.
      • Important: This field defines the folder for all data generated by this Smart Logger. The end of the name must exactly match the ftp_folder_name attribute of the inverter object in performance_db. Each Smart Logger must have a unique folder.
  • Report Settings (see the file-name sketch after this list):

    • Data reporting: Enable
    • File format: Format 4
    • File name: minYYYYMMDD.csv
    • Time format: YYYY-MM-DD
    • Reporting mode: Cyclic
    • Reporting interval: 180 (minutes)
    • File mode: Accumulated data
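
With these settings the Smart Logger produces one performance file per day, and the handler derives the expected file names from the requested period. A minimal sketch mirroring the logic in feature_values:

from datetime import date

# One file per report day: minYYYYMMDD.csv
expected_file = f"min{date(2025, 6, 10).strftime('%Y%m%d')}.csv"
print(expected_file)  # min20250610.csv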

3. FTP Server (e.g., FileZilla Server)

Access to the data is possible because a FileZilla FTP server has been configured to host the data files. The relevant settings are shown in the images below.

FileZilla Server Configuration 1

FileZilla Server Configuration 2

Please note that the Native path in the second image must be the directory where the Smart Logger stores its data files (see the FTP Configuration step above).

Note

Here we use FileZilla for Windows, but any FTP server will work if configured correctly. For the DG solar farms, a Linux-based FTP server will be used.

Class Definition

HuaweiFTPDataHandler(data_source_name=None, connection_properties=None, **kwargs)

Subclass of DataHandler used to interact with Huawei FTP.

The constructor sets up the handler by establishing the necessary FTP connection parameters. It can operate in two ways:

  1. Using data_source_name: Fetches connection details from the performance database (performance_db).
  2. Using connection_properties: Uses a pre-configured FtpConnProperties object directly.

If both are provided, connection_properties will take precedence.

Parameters:

  • data_source_name

    (str | None, default: None ) –

    The name of the data source as defined in performance_db. This is used to look up host, user, and password. Defaults to None.

  • connection_properties

    (FtpConnProperties | None, default: None ) –

    A pre-configured object containing all necessary FTP connection properties (host, user, password, etc.). Defaults to None.

  • **kwargs

    Arbitrary keyword arguments. The following are recognized (see the example after this list):

    • data_source_types_names (str): Used when querying the performance_db to filter data source types.
    • timeout (int, optional): The connection timeout in seconds. Defaults to 30 if not provided.
    • passive (bool, optional): Whether to use FTP passive mode. Defaults to True if not provided.
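
For example, a sketch combining a database-defined data source with the recognized keyword arguments (the data source and type names are illustrative):

handler = HuaweiFTPDataHandler(
    data_source_name="MyHuaweiFTPServer",
    data_source_types_names="ftp_huawei_smartlogger",
    timeout=60,    # connection timeout in seconds (default 30)
    passive=True,  # FTP passive mode (default True)
)
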
Source code in echo_dataimporter/huawei_handler.py
def __init__(
    self,
    data_source_name: str | None = None,
    connection_properties: FtpConnProperties | None = None,
    **kwargs,
) -> None:
    """Initialize the handler for a Huawei FTP data source.

    This method sets up the handler by establishing the necessary FTP
    connection parameters. It can operate in two ways:

    1.  Using `data_source_name`: Fetches connection details from the
        performance database (`performance_db`).
    2.  Using `connection_properties`: Uses a pre-configured
        `FtpConnProperties` object directly.

    If both are provided, `connection_properties` will take precedence.

    Parameters
    ----------
    data_source_name : str | None, optional
        The name of the data source as defined in `performance_db`.
        This is used to look up host, user, and password.
        Defaults to None.
    connection_properties : FtpConnProperties | None, optional
        A pre-configured object containing all necessary FTP connection
        properties (host, user, password, etc.). Defaults to None.
    **kwargs
        Arbitrary keyword arguments. The following are recognized:

        * `data_source_types_names` (str): Used when querying the
            `performance_db` to filter data source types.
        * `timeout` (int, optional): The connection timeout in seconds.
            Defaults to 30 if not provided.
        * `passive` (bool, optional): Whether to use FTP passive mode.
            Defaults to True if not provided.
    """
    # calling __init__ from base class
    super().__init__()

    if data_source_name is None and connection_properties is None:
        raise ValueError("Either data_source_name or connection_properties must be specified")

    if data_source_name is not None:
        # getting data sources
        ds_dict = self.perfdb.datasources.instances.get(
            data_source_types_names=kwargs.get("data_source_types_names"),
            get_attributes=True,
            output_type="dict",
        )

        if data_source_name not in ds_dict:
            raise RuntimeError(f"Data source '{data_source_name}' not defined in performance_db")

        ds_dict = ds_dict[data_source_name]

    if connection_properties is not None:
        self.connection_properties = connection_properties
    else:
        self.connection_properties = FtpConnProperties(
            host=ds_dict["host_address"],
            user=ds_dict["user"],
            password=ds_dict["password"],
            timeout=kwargs.get("timeout", 30),
            passive=kwargs.get("passive", True),
        )

    # create the FTP client using the resolved connection properties
    self.ftp_client = FtpHandler(
        connection_properties=self.connection_properties,
    )

alarm_history(object_name, period)

Retrieves and consolidates the alarm history for a specific object over a given period.

This method acts as the main entry point for fetching alarm data. It locates the object's specific alarm folder on the FTP server, processes all relevant alarm files within that folder using a helper function, and returns a single, sorted DataFrame containing the complete alarm history for the period.

Parameters:

  • object_name

    (str) –

    Name of the desired object as defined in performance_db. This object must have the "ftp_folder_name" attribute defined.

  • period

    (DateTimeRange) –

    The desired period for which to retrieve alarms. Any alarm active during this period will be included.

Returns:

  • DataFrame

    A single DataFrame containing all alarms that occurred during the specified period, sorted by 'start' and 'end' time. The DataFrame has the following standardized columns: object_name, manufacturer_id, alarm_name, start, end, data_source_type, alarm_responsibility.

Source code in echo_dataimporter/huawei_handler.py
@validate_call
def alarm_history(
    self,
    object_name: str,
    period: DateTimeRange,
) -> pd.DataFrame:
    """Retrieves and consolidates the alarm history for a specific object over a given period.

    This method acts as the main entry point for fetching alarm data. It locates the object's specific alarm folder on the FTP server, processes all relevant alarm files within that folder using a helper function, and returns a single, sorted DataFrame containing the complete alarm history for the period.

    Parameters
    ----------
    object_name : str
        Name of the desired object as defined in `performance_db`. This object must have the "ftp_folder_name" attribute defined.
    period : DateTimeRange
        The desired period for which to retrieve alarms. Any alarm active during this period will be included.

    Returns
    -------
    pd.DataFrame
        A single DataFrame containing all alarms that occurred during the specified period, sorted by 'start' and 'end' time. The DataFrame has the following standardized columns:
        - `object_name`, `manufacturer_id`, `alarm_name`, `start`, `end`, `data_source_type`, `alarm_responsibility`.
    """

    def read_alarm_files(files_list: list[str], object: str, period: DateTimeRange) -> dict[str, pd.DataFrame]:  # noqa: A002
        """Helper function to download, parse, and standardize alarm data files.

        For a given list of filenames, this function filters for actual alarm files (containing "alarmg_*"), downloads them, and extracts alarm data corresponding to the specified object and period.

        Parameters
        ----------
        files_list : list[str]
            A list of all filenames present in the object's FTP directory.
        object : str
            The name of the device to filter alarms for, corresponding to the "Device" column in the raw files.
        period : DateTimeRange
            The time period to filter alarms against. Alarms overlapping with this period are returned.

        Returns
        -------
        dict[str, pd.DataFrame]
            A dictionary mapping each processed filename to a DataFrame of its standardized alarm data.
        """
        with tempfile.TemporaryDirectory(dir=".") as temp_folder:
            dict_df = {}
            for file in files_list:
                if "alarmg_major" in file or "alarmg_minor" in file or "alarmg_warning" in file:
                    try:
                        self.ftp_client.get_file(file, dest_directory=Path(temp_folder))
                        path = Path(temp_folder) / file
                        df_temp = pd.read_csv(
                            path,
                            sep=",",
                            encoding="utf-8",
                            index_col=False,
                        )
                        df_temp = df_temp.applymap(lambda x: x.strip() if isinstance(x, str) else x)
                        df_temp = df_temp[df_temp["Device"] == object]  # use "RG1-TS2-IN21" for testing
                        dict_rename = {
                            "Device": "object_name",
                            "Alarm ID": "manufacturer_id",
                            "Alarm Name": "alarm_name",
                            "Generation time": "start",
                            "End time": "end",
                        }
                        df_temp = df_temp.rename(columns=dict_rename)
                        df_temp = df_temp[dict_rename.values()]
                        df_temp["data_source_type"] = "ftp_huawei_smartlogger"
                        df_temp["alarm_responsibility"] = "U"
                        df_temp["start"] = pd.to_datetime(df_temp["start"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
                        df_temp["end"] = pd.to_datetime(df_temp["end"], format="%Y-%m-%d %H:%M:%S", errors="coerce")
                        df_temp = df_temp[(df_temp["start"] < period.end) & (df_temp["end"] > period.start)]
                        dict_df[file] = df_temp
                    except pd.errors.EmptyDataError:
                        logger.warning(f"File '{file}' is empty. Skipping.")
                        continue
        return dict_df

    object_df_attributes = self.perfdb.objects.instances.attributes.get(object_names=[object_name])
    folder_name = "./Alarmes/" + object_df_attributes[object_name]["ftp_folder_name"]["attribute_value"]

    self.ftp_client.change_directory(folder_name)
    files_list = self.ftp_client.list_contents()

    dict_df = read_alarm_files(files_list, object_name, period)

    df = pd.concat(dict_df.values(), ignore_index=True)
    df = df.sort_values(by=["start", "end"]).reset_index(drop=True)

    return df

feature_values(object_name, features_df, period)

Retrieves time-series feature data for a specific object from a Huawei FTP server.

This method orchestrates the entire process of fetching data. It first queries the performance database for object metadata (like ESN and FTP folder), then connects to the FTP server to find and download the relevant daily CSV files. Each file, which may contain data for multiple devices, is parsed to extract the specific data block for the requested object. Finally, data from all files is concatenated and cleaned.

Parameters:

  • object_name

    (str) –

    Name of the desired object (e.g., an inverter) as defined in performance_db.

  • features_df

    (DataFrame) –

    A DataFrame defining the features to be extracted. It must contain the columns: "name", "standardized_name", "name_in_data_source", and "aggregation_type".

  • period

    (DateTimeRange) –

    The start and end datetimes for the desired data period.

Returns:

  • DataFrame

    A pandas DataFrame indexed by datetime, with columns corresponding to the standardized feature names. The data is sorted and clipped to the requested period.
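
If you are not fetching definitions from performance_db (as shown in How to Use), features_df can be assembled by hand. A minimal sketch with illustrative values, containing only the required columns:

import pandas as pd

# Illustrative mapping; real definitions come from performance_db
features_df = pd.DataFrame(
    {
        "name": ["ActivePower_5min.AVG"],
        "standardized_name": ["ActivePower_5min.AVG"],
        "name_in_data_source": ["Active Power"],
        "aggregation_type": ["AVG"],
    }
)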

Source code in echo_dataimporter/huawei_handler.py
@validate_call
def feature_values(
    self,
    object_name: str,
    features_df: pd.DataFrame,
    period: DateTimeRange,
) -> pd.DataFrame:
    """Retrieves time-series feature data for a specific object from a Huawei FTP server.

    This method orchestrates the entire process of fetching data. It first queries the performance database for object metadata (like ESN and FTP folder), then connects to the FTP server to find and download the relevant daily CSV files.
    Each file, which may contain data for multiple devices, is parsed to extract the specific data block for the requested object. Finally, data from all files is concatenated and cleaned.

    Parameters
    ----------
    object_name : str
        Name of the desired object (e.g., an inverter) as defined in `performance_db`.
    features_df : pd.DataFrame
        A DataFrame defining the features to be extracted. It must contain the columns: "name", "standardized_name", "name_in_data_source", and "aggregation_type".
    period : DateTimeRange
        The start and end datetimes for the desired data period.

    Returns
    -------
    pd.DataFrame
        A pandas DataFrame indexed by datetime, with columns corresponding to the standardized feature names. The data is sorted and clipped to the requested `period`.

    """
    # general information about the object --------------------------------------------------------------------------
    object_df_attributes = self.perfdb.objects.instances.attributes.get(object_names=[object_name])
    esn = object_df_attributes[object_name]["serial_number"]["attribute_value"]
    folder_name = object_df_attributes[object_name]["ftp_folder_name"]["attribute_value"]

    # list of folders in the FTP repository -------------------------------------------------------------------------
    folder_list = self.ftp_client.list_contents()

    if folder_name is not None and folder_name not in folder_list:
        raise ValueError(f"Folder '{folder_name}' not found in the FTP server. Available folders: {folder_list}")
    self.ftp_client.change_directory(folder_name)

    files_list = self.ftp_client.list_contents()

    # Convert the dates in the period into file names
    daily_dates = pd.date_range(start=period.start, end=period.end, freq="D")

    files_to_read = [f"min{date.strftime('%Y%m%d')}.csv" for date in daily_dates]
    files_filtred = [file for file in files_list if file in files_to_read]
    if not files_filtred:
        raise ValueError(f"No files found in the folder '{folder_name}' for the specified period: {period}")

    # read the downloaded files and build the dataframe --------------------------------------------------------

    def start_end_lines(path: Path) -> pd.DataFrame:
        """Scans a data file to map each inverter ESN to its data block boundaries.

        This utility function reads a file line by line to find all inverter identifier lines (e.g., '#INV... ESN:...'). It calculates the start and end line numbers for each inverter's data block, which is essential for parsing files containing data from multiple inverters.

        Parameters
        ----------
        path : Path
            The full path to the data file to be scanned.

        Returns
        -------
        pd.DataFrame
            A DataFrame where the index is the inverter ESN (str) and columns are 'start line' and 'end line'. The end line for the last inverter is the total number of lines in the file.
        """
        inverter_map = {}

        if not path.exists():
            raise FileNotFoundError(f"File '{path}' does not exist")

        with Path(path).open(encoding="utf-8") as f:
            for number_line, line in enumerate(f, start=1):
                if "ESN:" in line:  # if "#INV" in line and "ESN:" in line:
                    try:
                        esn = line.split(":")[-1].strip()
                        if esn:
                            inverter_map[esn] = number_line
                    except IndexError:
                        continue

        if not inverter_map:
            raise ValueError(f"No ESN found in the file '{path}'")

        # Build the line map: each ESN's block ends where the next block starts.
        df_lines = pd.DataFrame.from_dict(inverter_map, orient="index", columns=["start line"])
        # Append a sentinel row holding the file's last line number, shift the
        # start lines up into an "end line" column, then drop the sentinel row.
        df_lines.loc["end line"] = number_line
        df_lines["end line"] = df_lines["start line"].shift(-1)
        df_lines = df_lines.drop("end line")

        return df_lines

    def read_smartlogger_files(esn: str, df_lines: pd.DataFrame) -> pd.DataFrame:
        """Reads a specific data block for one inverter from a smartlogger file.

        This function uses a pre-generated line map (`df_lines`) to isolate and read only the data block corresponding to a specific ESN. It then performs several data cleaning and formatting operations:
        - Cleans column headers.
        - Sets a proper datetime index.
        - Resamples the data to a consistent frequency.
        - Fills data gaps by forward-filling, using 'Cycle Time' as a guide.
        - Renames columns to a standard format using `features_df` from the parent scope.

        Parameters
        ----------
        esn : str
            The ESN (serial number) of the specific inverter whose data is to be read.
        df_lines : pd.DataFrame
            A DataFrame, indexed by ESN, containing 'start line' and 'end line' columns that map each inverter to its data block within the file.

        Returns
        -------
        pd.DataFrame
            A clean, formatted DataFrame with a datetime index and standardized column names for the specified inverter. Returns an empty DataFrame if the data block for the ESN contains no data rows.

        """
        logger.info(f"Starting block processing for ESN '{esn}.")

        if esn not in df_lines.index:
            raise ValueError(f"ESN '{esn}' not found in the file '{file}'")

        start_line = int(df_lines.loc[esn, "start line"])
        end_line = int(df_lines.loc[esn, "end line"])
        rows_to_read = (end_line - 2) - start_line

        if rows_to_read <= 0:
            logger.warning("Data block for ESN '%s' is empty or malformed. Skipping.", esn)
            return pd.DataFrame()  # return an empty DataFrame

        try:
            logger.info(f"Reading data block for ESN '{esn}' from file '{path}'...")
            df = pd.read_csv(
                path,
                skiprows=start_line,
                nrows=rows_to_read,
                sep=";",
                encoding="utf-8",
                index_col=False,
            )
            logger.info(f"Data block for ESN '{esn}' read successfully. Found {len(df)} rows.")

        except Exception as e:
            raise OSError(f"Failed to read data block for ESN '{esn}'.") from e

        df = df.rename(columns=lambda col: col.strip().lstrip("#"))

        time_column_name = df.columns[0]

        if time_column_name == "Time":
            df[time_column_name] = pd.to_datetime(df[time_column_name], format="%Y-%m-%d %H:%M:%S")
            df = df.set_index(time_column_name)
        else:
            logger.warning(f"Expected first column to be 'Time', but found '{time_column_name}'.")

        logger.info(f"Renaming columns for ESN '{esn}'...")

        df = df.sort_index()

        diff_index = df.index.to_series().diff()
        frequency_index = diff_index.mode().iloc[0]

        df = df.asfreq(frequency_index)

        values_in_cycle_time = df["Cycle Time"].dropna().unique()
        frequency_index_minute = int(frequency_index.total_seconds() / 60)
        values_in_cycle_time = values_in_cycle_time[(values_in_cycle_time != 0) & (values_in_cycle_time != frequency_index_minute)]

        for value in values_in_cycle_time:
            if value is not None:
                index = df[df["Cycle Time"] == value].index
                df_preenchedor = df.loc[index[0]].values  # row values used to fill the gap
                end_line_fill = index[0] - frequency_index
                start_line_fill = end_line_fill - relativedelta(minutes=int(value - frequency_index_minute))
                df.loc[start_line_fill:end_line_fill] = df_preenchedor
            else:
                logger.warning(f"Found None value in 'Cycle Time' for ESN '{esn}'. Skipping filling.")

        original_col_name = features_df["name_in_data_source"].tolist()
        new_col_name = features_df["name"].tolist()

        if len(original_col_name) != len(new_col_name):
            raise ValueError("The length of original_col_name and new_col_name must be the same.")

        rename_dict = dict(zip(original_col_name, new_col_name, strict=False))

        df = df[original_col_name]

        df = df.rename(columns=rename_dict)

        df = df[new_col_name]

        return df

    with tempfile.TemporaryDirectory(dir=".") as temp_folder:
        temp_folder_path = Path(temp_folder)
        df_list = []
        for file in files_filtred:
            self.ftp_client.get_file(file, dest_directory=temp_folder_path)
            path = temp_folder_path / file
            df_lines = start_end_lines(path=path)
            df_temp = read_smartlogger_files(esn, df_lines)
            df_list.append(df_temp)

        df = pd.concat(df_list)
        df = df.sort_index()
        df = df.loc[period.start : period.end]

        return df

object_types()

Method not applicable to Huawei

Source code in echo_dataimporter/huawei_handler.py
def object_types(self) -> pd.DataFrame:
    """Method not applicable to Huawei"""
    raise NotImplementedError("Method not applicable to Huawei FTP data sources")

objects()

Method not applicable to Huawei

Source code in echo_dataimporter/huawei_handler.py
def objects(self) -> pd.DataFrame:
    """Method not applicable to Huawei"""
    raise NotImplementedError("Method not applicable to Huawei FTP data sources")