Huawei FTP

TODO @LucasCosta97:

  • Add a description of the Huawei FTP data importer.
  • Explain how to use it, including any necessary configuration of the database, SmartLogger, and FileZilla server. If you are not able to detail these steps, leave a section with a TODO for me (Bruno) to complete later.

Class Definition

HuaweiFTPDataHandler(data_source_name=None, connection_properties=None, **kwargs)

Subclass of DataHandler used to interact with Huawei FTP.

It can be initialized with either the data_source_name or the connection_properties argument (at least one of them must not be None).

The connection is made to the Huawei FTP server using the host, user, and password defined in the connection properties. The resulting FTP session is reused for all subsequent requests.

Parameters:

  • data_source_name

    (str | None, default: None ) –

    Name of data source as defined in performance_db. If used, the connection_properties will be created automatically from the data source attributes in performance_db. By default None

  • connection_properties

    (FtpConnProperties | None, default: None ) –

    To connect manually, provide the connection properties directly. By default None

Other Parameters:

  • data_source_types_names (list[str] | None) –

    Data source type names used to filter the data sources fetched from performance_db. By default None

  • timeout (int) –

    Connection timeout in seconds. By default 30

  • passive (bool) –

    Whether to use passive FTP mode. By default True
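
An illustrative instantiation sketch follows; the import paths, data source name, and host/user/password values are assumptions, while the FtpConnProperties fields mirror their use in the constructor below.

from echo_dataimporter.huawei_handler import HuaweiFTPDataHandler

# import path for FtpConnProperties is an assumption
from echo_dataimporter.connection_properties import FtpConnProperties

# Option 1: resolve the connection from a data source registered in performance_db
handler = HuaweiFTPDataHandler(data_source_name="huawei_plant_1")  # hypothetical data source name

# Option 2: connect manually with explicit connection properties
properties = FtpConnProperties(
    host="ftp.example.com",  # placeholder host
    user="ftp_user",         # placeholder credentials
    password="secret",
    timeout=30,
    passive=True,
)
handler = HuaweiFTPDataHandler(connection_properties=properties)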

Source code in echo_dataimporter/huawei_handler.py
def __init__(
    self,
    data_source_name: str | None = None,
    connection_properties: FtpConnProperties | None = None,
    **kwargs,
) -> None:
    """Constructor of the HuaweiFTPDataHandler class.

    It can be initialized with either the data_source_name or the connection_properties argument (at least one of them must not be None).

    The connection is made to the Huawei FTP server using the host, user, and password defined in the connection properties. The resulting FTP session is reused for all subsequent requests.

    Parameters
    ----------
    data_source_name : str | None, optional
        Name of data source as defined in performance_db.
        If used, the connection_properties will be created automatically from the data source attributes in performance_db.
        By default None
    connection_properties : FtpConnProperties | None, optional
        To connect manually, provide the connection properties directly. By default None

    Other Parameters
    ----------------
    data_source_types_names : list[str] | None, optional
        Data source type names used to filter the data sources fetched from performance_db. By default None
    timeout : int, optional
        Connection timeout in seconds. By default 30
    passive : bool, optional
        Whether to use passive FTP mode. By default True

    """
    # calling __init__ from base class
    super().__init__()

    if data_source_name is None and connection_properties is None:
        raise ValueError("Either data_source_name or connection_properties must be specified")

    if data_source_name is not None:
        # getting data sources
        ds_dict = self.perfdb.datasources.instances.get(
            data_source_types_names=kwargs.get("data_source_types_names"),
            get_attributes=True,
            output_type="dict",
        )

        if data_source_name not in ds_dict:
            raise RuntimeError(f"Data source '{data_source_name}' not defined in performance_db")

        ds_dict = ds_dict[data_source_name]

    if connection_properties is not None:
        self.connection_properties = connection_properties
    else:
        self.connection_properties = FtpConnProperties(
            host=ds_dict["host_address"],
            user=ds_dict["user"],
            password=ds_dict["password"],
            timeout=kwargs.get("timeout", 30),
            passive=kwargs.get("passive", True),
        )

    self.ftp_client = FtpHandler(  # FTP client reused for all subsequent file operations
        connection_properties=self.connection_properties,
    )

feature_values(object_name, features_df, period, **kwargs)

Method that returns the time series of features for an object in the data source.

Parameters:

  • object_name

    (str) –

    Name of desired object as defined in performance_db.

  • features_df

    (DataFrame) –

    DataFrame with the wanted features. Must contain columns ["name", "standardized_name", "name_in_data_source", "aggregation_type"]

  • period

    (DateTimeRange) –

    Desired period of the data.

  • **kwargs

    Other parameters to be passed to the FTP client.

Returns:

  • DataFrame

    DataFrame with time series of features for an object.
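
A hedged call sketch, continuing from the handler above; the object name, the feature mapping values, and the DateTimeRange constructor are assumptions (the class is only known to expose start and end, as used in the source below).

import pandas as pd

# feature mapping with the columns required by feature_values
features_df = pd.DataFrame(
    {
        "name": ["active_power"],                     # hypothetical feature name in performance_db
        "standardized_name": ["active_power"],
        "name_in_data_source": ["Active Power(kW)"],  # hypothetical SmartLogger column name
        "aggregation_type": ["mean"],
    }
)

# constructor signature is an assumption; only .start and .end are relied upon
period = DateTimeRange(start=pd.Timestamp("2024-01-01"), end=pd.Timestamp("2024-01-07"))

values_df = handler.feature_values(
    object_name="INV-01",  # hypothetical object name
    features_df=features_df,
    period=period,
)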

Source code in echo_dataimporter/huawei_handler.py
def feature_values(
    self,
    object_name: str,
    features_df: pd.DataFrame,
    period: DateTimeRange,
    **kwargs,  # noqa: ARG002
) -> pd.DataFrame:
    """Method that returns the time series of features for an object in the data source

    Parameters
    ----------
    object_name : str
        Name of desired object as defined in performance_db.
    features_df : pd.DataFrame
        DataFrame with the wanted features. Must contain columns ["name", "standardized_name", "name_in_data_source", "aggregation_type"]
    period : DateTimeRange
        Desired period of the data.
    **kwargs
        Other parameters to be passed to the FTP client.

    Returns
    -------
    pd.DataFrame
        DataFrame with time series of features for an object.

    """
    # general information about the object ------------------------------------------------------------------------
    object_df_attributes = self.perfdb.objects.instances.attributes.get(object_names=[object_name])
    esn = object_df_attributes[object_name]["serial_number"]["attribute_value"]
    folder_name = object_df_attributes[object_name]["ftp_folder_name"]["attribute_value"]

    # list of folders in the FTP repository -----------------------------------------------------------------------
    folder_list = self.ftp_client.list_contents()

    if folder_name is not None and folder_name not in folder_list:
        raise ValueError(f"Folder '{folder_name}' not found in the FTP server. Available folders: {folder_list}")
    self.ftp_client.change_directory(folder_name)

    files_list = self.ftp_client.list_contents()

    # convert the period dates into daily file names
    daily_dates = pd.date_range(start=period.start, end=period.end, freq="D")

    files_to_read = [f"min{date.strftime('%Y%m%d')}.csv" for date in daily_dates]
    files_filtered = [file for file in files_list if file in files_to_read]
    if not files_filtered:
        raise ValueError(f"No files found in the folder '{folder_name}' for the specified period: {period}")

    # read the downloaded files and build the DataFrame -----------------------------------------------------------

    def start_end_lines(path: Path) -> pd.DataFrame:
        """Scans a data file to map each inverter ESN to its data block boundaries.

        This utility function reads a file line by line to find all inverter identifier lines (e.g., '#INV... ESN:...'). It then calculates the start and end line numbers for each inverter's data block, returning this information as a DataFrame. This is useful for parsing files that contain data from multiple inverters.

        Parameters
        ----------
        path : Path
            The full path to the data file to be scanned

        Returns
        -------
        pd.DataFrame
            A DataFrame where the index is the inverter ESN (str) and the columns are "start line" and "end line". The "end line" for the last inverter will be the total number of lines in the file.

        """
        inverter_map = {}

        if not path.exists():
            raise FileNotFoundError(f"File '{path}' does not exist")

        with path.open(encoding="utf-8") as f:
            for number_line, line in enumerate(f, start=1):
                if "ESN:" in line:  # inverter header lines look like '#INV... ESN:...'
                    try:
                        esn = line.split(":")[-1].strip()
                        if esn:
                            inverter_map[esn] = number_line
                    except IndexError:
                        continue

        if not inverter_map:
            raise ValueError(f"No ESN found in the file '{path}'")

        df_lines = pd.DataFrame.from_dict(inverter_map, orient="index", columns=["start line"])
        # each block ends where the next block starts; the last block ends at the last line of the file
        df_lines["end line"] = df_lines["start line"].shift(-1).fillna(number_line)

        return df_lines

    def read_smartlogger_files(esn: str, df_lines: pd.DataFrame) -> pd.DataFrame:
        """_Read a specific data block for an inverter from a smartlogger file.

        This function isolates a data block within a larger file using the provided line map (df_lines). It then reads, cleans the headers, sets a datetime index, and renames the columns based on an externally defined 'features_df' DataFrame.

        Parameters
        ----------
        esn : str
            The ESN (serial number) of the specific inverter whose data block needs to be read
        df_lines : pd.DataFrame
            A DataFrame, indexed by ESN, containing "start line" and "end line" columns that map each inverter to its data block within the file.

        Returns
        -------
        pd.DataFrame
            A clean, formatted DataFrame containing the time-series data for the single, specified inverter. The DataFrame has a datetime index and standardized column names. Returns an empty DataFrame if the data block for the ESN has no data rows.

        """
        logger.info(f"Starting block processing for ESN '{esn}.")

        if esn not in df_lines.index:
            raise ValueError(f"ESN '{esn}' not found in the file '{file}'")

        start_line = int(df_lines.loc[esn, "start line"])
        end_line = int(df_lines.loc[esn, "end line"])
        rows_to_read = (end_line - 2) - start_line  # data rows between this block's column header and the next inverter's header

        if rows_to_read <= 0:
            logger.warning("Data block for ESN '%s' is empty or malformed. Skipping.", esn)
            return pd.DataFrame()  # return an empty DataFrame

        try:
            logger.info(f"Reading data block for ESN '{esn}' from file '{path}'...")
            df = pd.read_csv(
                path,
                skiprows=start_line,
                nrows=rows_to_read,
                sep=";",
                encoding="utf-8",
                index_col=False,
            )
            logger.info(f"Data block for ESN '{esn}' read successfully. Found {len(df)} rows.")

        except Exception as e:
            raise OSError(f"Failed to read data block for ESN '{esn}'.") from e

        df = df.rename(columns=lambda col: col.strip().lstrip("#"))

        time_column_name = df.columns[0]

        if time_column_name == "Time":
            df[time_column_name] = pd.to_datetime(df[time_column_name], format="%Y-%m-%d %H:%M:%S")
            df = df.set_index(time_column_name)
        else:
            logger.warning(f"Expected first column to be 'Time', but found '{time_column_name}'.")

        logger.info(f"Renaming columns for ESN '{esn}'...")

        df = df.sort_index()

        # reindex to the dominant sampling frequency so missing records appear as NaN rows
        diff_index = df.index.to_series().diff()
        frequency_index = diff_index.mode().iloc[0]

        df = df.asfreq(frequency_index)

        values_in_cycle_time = df["Cycle Time"].dropna().unique()
        frequency_index_minute = int(frequency_index.total_seconds() / 60)
        values_in_cycle_time = values_in_cycle_time[(values_in_cycle_time != 0) & (values_in_cycle_time != frequency_index_minute)]

        for value in values_in_cycle_time:
            if value is not None:
                index = df[df["Cycle Time"] == value].index
                df_preenchedor = df.loc[index[0]].values
                end_line_fill = index[0] - frequency_index
                start_line_fill = end_line_fill - relativedelta(minutes=int(value - frequency_index_minute))
                df.loc[start_line_fill:end_line_fill] = df_preenchedor
            else:
                logger.warning(f"Found None value in 'Cycle Time' for ESN '{esn}'. Skipping filling.")

        logger.info(f"Renaming columns for ESN '{esn}'...")

        original_col_name = features_df["name_in_data_source"].tolist()
        new_col_name = features_df["name"].tolist()

        if len(original_col_name) != len(new_col_name):
            raise ValueError("The length of original_col_name and new_col_name must be the same.")

        rename_dict = dict(zip(original_col_name, new_col_name, strict=False))

        df = df[original_col_name]

        df = df.rename(columns=rename_dict)

        df = df[new_col_name]

        return df

    with tempfile.TemporaryDirectory(dir=".") as temp_folder:
        temp_folder_path = Path(temp_folder)
        df_list = []
        for file in files_filtered:
            self.ftp_client.get_file(file, dest_directory=temp_folder_path)
            path = temp_folder_path / file
            df_lines = start_end_lines(path=path)
            df_temp = read_smartlogger_files(esn, df_lines)
            df_list.append(df_temp)

        df = pd.concat(df_list)
        df = df.sort_index()
        df = df.loc[period.start : period.end]

        return df
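
The parsing above assumes daily SmartLogger exports named min<YYYYMMDD>.csv (for example, min20240101.csv) in which the blocks of several inverters are concatenated into one semicolon-separated file. A schematic of the assumed layout (illustrative only; ESNs and column names other than Time and Cycle Time are placeholders):

#INV1 ESN:INV001SERIAL
Time;Cycle Time;Active Power(kW);...
2024-01-01 00:05:00;5;0.0;...
2024-01-01 00:10:00;5;0.0;...
#INV2 ESN:INV002SERIAL
Time;Cycle Time;Active Power(kW);...
...

start_end_lines maps each ESN to the line range of its block, read_smartlogger_files extracts only the block for the requested inverter, and records whose Cycle Time exceeds the dominant sampling interval are used to backfill the rows they aggregate.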

object_types()

Method not applicable to Huawei.

Source code in echo_dataimporter/huawei_handler.py
def object_types(self) -> pd.DataFrame:
    """Method not applicable to Huawei"""
    raise NotImplementedError("Method not implemented yet")

objects()

Method not applicable to Huawei.

Source code in echo_dataimporter/huawei_handler.py
def objects(self) -> pd.DataFrame:
    """Method not applicable to Huawei"""
    raise NotImplementedError("Method not implemented yet")