Skip to content

Solcast Web

The SolcastDataHandler is a specialized class, designed to automate the extraction of historical meteorological data from the Solcast web portal.

This class operates via web scraping. The initialization process simulates a user login to establish an authenticated session. Subsequently, to fetch data, it automates a series of interactions with the portal: it creates a request to generate a data file, monitors the status of that request, and finally, downloads the file when the data set is ready.

Its main feature is:

  • Extracting Historical Data (Features): Through the feature_values method, the handler executes the multi-step process of requesting, polling, and downloading data for a specific location (defined by latitude and longitude in the database) and period. It handles the processing of the final ZIP/CSV file, renaming columns to a standard format, and adjusting timestamps to the local timezone.

How to Use

1. Initialization and Authentication

The initialization of this class requires a data_source_name defined in performance_db, as it performs an authentication process (login) with the Solcast portal upon its creation. The resulting instance will already contain an authenticated session, ready to make data requests.

from echo_dataimporter import SolcastDataHandler

# Initializes and authenticates the session with the Solcast portal
# Requires the "MySolcastDataSource" to be configured in performance_db
handler = SolcastDataHandler(data_source_name="MySolcastDataSource")

2. Extracting Historical Data

After initialization, use the feature_values method to request, monitor, and download the data file. Remember that this process can take several minutes, depending on the requested period.

import pandas as pd
from datetime import datetime
from echo_datetimerange import DateTimeRange

# 1. Define the search parameters
# This object needs to have 'latitude' and 'longitude' attributes in the DB
object_name = "MyVirtualMetMast"
period = DateTimeRange(datetime(2025, 8, 1), datetime(2025, 8, 5))

# 2. Define the features (variables) to be fetched.
# 'name_in_data_source' is the exact name in the Solcast portal
features_to_fetch = pd.DataFrame([
    {"name_in_data_source": "Ghi", "name": "ghi_w_m2"},
    {"name_in_data_source": "Temperature", "name": "temperature_c"},
])

# 3. Call the method to fetch the data
# This process can be time-consuming
historical_data = handler.feature_values(
    object_name=object_name,
    features_df=features_to_fetch,
    period=period,
    time_interval="PT30M" # Optional: Sets the data interval to 30 minutes
)

Required Configuration

Diagram of relationships

For the SolcastDataHandler to work correctly, the entities in performance_db must be configured to support the web scraping process and the search for data by geographic coordinates.

  • The Data Source:

    • A data source instance must be created to store the login credentials for the Solcast portal.
    • It must have the following attributes:
      • host_address: The base URL of the Solcast portal (e.g., https://toolkit.solcast.com.au/).
      • user: The username for the login.
      • password: The password for the login.
      • provider: Additional parameter required for authentication.
      • rememberme: Additional parameter required for authentication.
    • This instance must belong to a specific Data Source Type (e.g., solcast_api).
  • The Object (Virtual Meteorological Mast):

    • Represents the location for which meteorological data will be extracted.
    • As with the TempoOkDataHandler, this object can be of the virtual_met_mast Model.
    • It needs to have the location attributes:
      • latitude: The latitude of the point of interest.
      • longitude: The longitude of the point of interest.
  • Features and Links:

    • The features define the variables to be read (e.g., Ghi, Temperature) and contain the mapping between the name_in_data_source (the name in Solcast) and the name (the standardized name).
    • The crucial link is that the set of features associated with the virtual_met_mast model must also be linked to the same Data Source Type (e.g., solcast_api). This ensures that the handler uses the correct set of feature definitions for the Solcast portal.

Class Definition

SolcastDataHandler(data_source_name=None, connection_properties=None, **kwargs)

Subclass of DataHandler used to interact with Solcast API.

This method establishes an authenticated session by simulating a user login. It sends a POST request to the /ToolkitAuthenticate endpoint with credentials and other parameters fetched from performance_db via the data_source_name.

The resulting handler instance holds a valid session cookie, enabling subsequent web scraping requests to pages that require a login.

Parameters:

  • data_source_name

    (str | None, default: None ) –

    The name of the data source as defined in performance_db. This is used to fetch all necessary connection and login parameters (host, user, password, provider, rememberme).

  • connection_properties

    (HttpConnProperties | None, default: None ) –

    Direct connection properties (host, user, password). Note: In the current implementation, login parameters like provider are still fetched using data_source_name. Defaults to None.

  • **kwargs

    Arbitrary keyword arguments. Currently unused by this method.

Source code in echo_dataimporter/solcast_handler.py
@validate_call
def __init__(self, data_source_name: str | None = None, connection_properties: HttpConnProperties | None = None, **kwargs) -> None:  # noqa: ARG002
    """Initializes and authenticates a session with the Solcast web portal.

    This method establishes an authenticated session by simulating a user
    login. It sends a POST request to the `/ToolkitAuthenticate` endpoint
    with credentials and other parameters fetched from `performance_db` via
    the `data_source_name`.

    The resulting handler instance holds a valid session cookie, enabling
    subsequent web scraping requests to pages that require a login.

    Parameters
    ----------
    data_source_name : str | None, optional
        The name of the data source as defined in `performance_db`. This is used to fetch all necessary connection and login parameters (`host`, `user`, `password`, `provider`, `rememberme`).
    connection_properties : HttpConnProperties | None, optional
        Direct connection properties (`host`, `user`, `password`).
        Note: In the current implementation, login parameters like `provider` are still fetched using `data_source_name`.
        Defaults to None.
    **kwargs
        Arbitrary keyword arguments. Currently unused by this method.
    """
    super().__init__()

    logger.info("Initializing SolcastDataHandler...")

    if data_source_name is None and connection_properties is None:
        raise ValueError("Either data_source_name or connection_properties must be specified")

    if data_source_name is not None:
        logger.info(f"Using data source name: {data_source_name}")
        ds_dict = self.perfdb.datasources.instances.get(
            data_source_types_names=["solcast_api"],
            get_attributes=True,
            output_type="dict",
        )

        if data_source_name not in ds_dict:
            raise RuntimeError(f"Data source '{data_source_name}' not defined in performance_db")

        ds_dict = ds_dict[data_source_name]

        logger.info(f"Data source details: {ds_dict}")

        connection_properties = HttpConnProperties(
            host=ds_dict["host_address"],
            user=ds_dict["user"],
            password=ds_dict["password"],
        )

    logger.info(f"Using connection properties: {connection_properties}")

    self.conn_props = connection_properties

    conn = HttpHandler(connection_properties=self.conn_props)

    logger.info("Setting up connection to Solcast API...")

    payload = {
        "password": self.conn_props.password,
        "provider": ds_dict["provider"],
        "rememberMe": ds_dict["rememberme"],
        "userName": self.conn_props.user,
    }
    logger.info("Connecting to Solcast API...")

    response = conn.post("ToolkitAuthenticate", json=payload, response_ok=None)

    response_json = response.json()

    logger.info(f"Response from Solcast API: {response_json}")

    try:
        if response_json["response_status"]["error_code"] == "Unauthorized":  # se é diferente de vazio
            raise RuntimeError("Error connecting to Solcast API: Unauthorized access. Check your credentials.")
    except KeyError:
        logger.warning(
            "Response from Solcast API does not contain 'error_code'. This may indicate a change in the API response format.",
        )

    logger.info("Successfully connected to Solcast API and set necessary cookies.")

    self.conn_handler = conn

feature_values(object_name, features_df, period, time_interval='PT5M', batch_request=True)

Scrapes historical weather data for a location from the Solcast web portal.

This method automates the process of generating and downloading a historical data report from Solcast. It operates via a multi-step, asynchronous-like process: 1. It submits a batch job request to generate the data file. 2. It polls a status endpoint periodically until the job is complete. 3. It downloads the resulting ZIP archive, extracts the CSV, and processes it into a standardized DataFrame.

Parameters:

  • object_name

    (str) –

    The name of the object. Its latitude and longitude attributes will be fetched from performance_db to define the location.

  • features_df

    (DataFrame) –

    DataFrame defining the features to query. Must contain the columns name_in_data_source (the variable name in Solcast) and name (the desired standardized name).

  • period

    (DateTimeRange) –

    The time range for the data query.

  • time_interval

    (str | None, default: 'PT5M' ) –

    The time resolution of the data, in ISO 8601 duration format. Defaults to "PT60M" (60 minutes).

  • batch_request

    (bool | None, default: True ) –

    If True, initiates a new batch request job. If False, uses a hardcoded batch ID for debugging. Defaults to True.

Returns:

  • DataFrame

    A pandas DataFrame containing the requested historical data. The index is a DatetimeIndex (adjusted to local time, GMT-3) and columns are the standardized feature names.

Source code in echo_dataimporter/solcast_handler.py
@validate_call
def feature_values(
    self,
    object_name: str,
    features_df: pd.DataFrame,
    period: DateTimeRange,
    time_interval: str | None = "PT5M",
    batch_request: bool | None = True,
) -> pd.DataFrame:
    """Scrapes historical weather data for a location from the Solcast web portal.

    This method automates the process of generating and downloading a historical data report from Solcast. It operates via a multi-step, asynchronous-like process:
    1. It submits a batch job request to generate the data file.
    2. It polls a status endpoint periodically until the job is complete.
    3. It downloads the resulting ZIP archive, extracts the CSV, and processes it into a standardized DataFrame.

    Parameters
    ----------
    object_name : str
        The name of the object. Its `latitude` and `longitude` attributes will be fetched from `performance_db` to define the location.
    features_df : pd.DataFrame
        DataFrame defining the features to query. Must contain the columns `name_in_data_source` (the variable name in Solcast) and `name` (the desired standardized name).
    period : DateTimeRange
        The time range for the data query.
    time_interval : str | None, optional
        The time resolution of the data, in ISO 8601 duration format.
        Defaults to "PT60M" (60 minutes).
    batch_request : bool | None, optional
        If `True`, initiates a new batch request job. If `False`, uses a hardcoded batch ID for debugging.
        Defaults to `True`.

    Returns
    -------
    pd.DataFrame
        A pandas DataFrame containing the requested historical data. The index is a `DatetimeIndex` (adjusted to local time, GMT-3) and columns are the standardized feature names.
    """
    logger.info("Starting feature values retrieval from Solcast API...")

    period.start = datetime(period.start.year, period.start.month, period.start.day, 0, 0, 0)
    period.end = datetime(period.end.year, period.end.month, period.end.day, 0, 0, 0)

    if period.start.date() > pd.Timestamp.now().date() or period.end.date() > pd.Timestamp.now().date():
        logger.error(f"Invalid period: {period}")
        raise ValueError("Invalid period: start date is after today")

    period.start = datetime(period.start.year, period.start.month, 1)

    if (pd.Timestamp.now().date() - period.end.date()).days < 7:
        if (period.end.date() - period.start.date()).days < 9:
            period.start = datetime(period.start.year, period.start.month - 1, 1)
            period.end = period.end - relativedelta(days=9)
        else:
            period.end = period.end - relativedelta(days=9)

    original_period = period
    period_1day_more = DateTimeRange(
        start=period.start,
        end=period.end + relativedelta(days=1),
    )

    end = period_1day_more.end.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    start = period_1day_more.start.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    logger.info(f"Retrieving feature values for object '{object_name}' from {start} to {end} with time interval '{time_interval}'.")

    try:
        logger.info(f"Fetching object info for '{object_name}' from performance_db...")
        object_info = self.perfdb.objects.instances.attributes.get(
            object_names=[object_name],
        )
        object_info = object_info[object_name]
    except KeyError as e:
        raise RuntimeError(f"Object '{object_name}' not defined in performance_db") from e

    logger.info(f"Object info retrieved: {object_info}")

    try:
        dict_names = features_df.set_index("name_in_data_source")["name"].to_dict()

        name_list = features_df["name"].tolist()
        name_list_solcast = [name for name in name_list if "Solcast" in name]
        output_parameters = features_df[features_df["name"].isin(name_list_solcast)]["name_in_data_source"].tolist()

        logger.info(f"Output parameters for Solcast API: {output_parameters}")

        dict_names = {name: dict_names[name] for name in output_parameters if name in dict_names}
        logger.info(f"Dictionary of names for renaming: {dict_names}")
    except KeyError as e:
        logger.error(f"Error in feature definitions: {e}")
        raise RuntimeError(f"Feature definitions are not correctly defined for object '{object_name}'") from e

    payload_request = {
        "end": end,
        "format": "csv",
        "locations": [
            {
                "latitude": object_info["latitude"]["attribute_value"],
                "longitude": object_info["longitude"]["attribute_value"],
                "array_type": "",
                "azimuth": "",
                "capacity": 5,
                "loss_factor": "",
                "tilt": "",
            },
        ],
        "output_parameters": output_parameters,
        "period": time_interval,
        "start": start,
        "time_zone": "utc",
        "type": "radiation_and_weather",
    }

    logger.info(f"Payload for Solcast API request: {payload_request}")

    if batch_request:
        logger.info("Creating batch request for Solcast API...")
        response_request = self.conn_handler.post(
            "CreateHistoricRadiationAndWeatherBatchRequest",
            json=payload_request,
            response_ok=None,
        )

        logger.info(f"Batch request response: {response_request.status_code} - {response_request.text}")

        if response_request.status_code != 200:
            logger.error(f"Error creating batch request: {response_request.status_code} - {response_request.text}")
            raise RuntimeError(f"Failed to create batch request: {response_request.status_code}")

        response_request_json = response_request.json()
    else:
        response_request_json = {}
        response_request_json["batch_id"] = "a5016602-74e4-485b-be7c-f7ba1e73eab5"

    logger.info(f"Batch request created with ID: {response_request_json['batch_id']}")
    endpoint = f"EvaluateHistoricBatch?batch_id={response_request_json['batch_id']}"
    status = "Pending"

    while status != "Ready":
        logger.info(f"Checking status of batch request with ID: {response_request_json['batch_id']}...")
        response_status = self.conn_handler.get(
            endpoint,
            response_ok=None,
        )

        if response_status.status_code != 200:
            logger.error(f"Error fetching batch status: {response_status.status_code} - {response_status.text}")
            raise RuntimeError(f"Failed to fetch batch status: {response_status.status_code}")

        status = response_status.json()["status"]

        logger.info(f"Batch status: {status}")

        if status == "Pending":
            logger.info("Batch is still pending, waiting for 60 seconds before checking again...")
            time.sleep(60)

    logger.info("Batch is ready, downloading the results...")

    uri = response_status.json()["uri"]
    response_download = httpx.get(uri, stream=True)

    with tempfile.TemporaryDirectory() as caminho_pasta_temporaria:
        parte_principal_uri = uri.split("?")[0]
        nome_arquivo = parte_principal_uri.split("/")[-1]
        caminho_completo_para_salvar = os.path.join(caminho_pasta_temporaria, nome_arquivo)  # noqa: PTH118
        response_download = httpx.get(uri, stream=True)
        logger.info(f"Downloading file from {uri} to {caminho_completo_para_salvar}...")
        with Path(caminho_completo_para_salvar).open("wb") as file:
            file.writelines(response_download.iter_content(chunk_size=8192))

        with zipfile.ZipFile(caminho_completo_para_salvar, "r") as zip_ref:
            nome_do_arquivo_interno = zip_ref.namelist()[0]

            # Extrai o arquivo para a mesma pasta temporária
            zip_ref.extract(nome_do_arquivo_interno, caminho_pasta_temporaria)

        caminho_arquivo_final = os.path.join(caminho_pasta_temporaria, nome_do_arquivo_interno)  # noqa: PTH118

        df = pd.read_csv(caminho_arquivo_final, sep=",")
        df.index = df["period_end"]
        df.index = pd.to_datetime(df.index, format="%Y-%m-%dT%H:%M:%S+00:00")
        df = df.drop(columns=["period_end", "period"])

    logger.info("Feature values DataFrame created successfully.")

    df = df.rename(columns=dict_names)

    df = df.shift(freq="-3H")

    if "WeatherTypeSolcast_1h.REP" in df.columns:
        dict_weather = {
            "MOSTLY CLOUDY": 1,
            "OVERCAST": 2,
            "PARTLY CLOUDY": 3,
            "MOSTLY SUNNY": 4,
            "SUNNY": 5,
            "CLEAR": 6,
            "MOSTLY CLEAR": 7,
        }

        df["WeatherTypeSolcast_1h.REP"] = df["WeatherTypeSolcast_1h.REP"].map(dict_weather)

    df = df[dict_names.values()]

    df = df.loc[original_period.start : original_period.end]

    logger.info("Feature values DataFrame shifted by -3 hours.")

    return df

object_types()

Method not applicable to Way2

Source code in echo_dataimporter/solcast_handler.py
def object_types(self) -> pd.DataFrame:
    """Method not applicable to Way2"""
    raise NotImplementedError("Method not implemented yet")