Solcast Web¶
The SolcastDataHandler is a specialized class designed to automate the extraction of historical meteorological data from the Solcast web portal.
This class operates via web scraping. The initialization process simulates a user login to establish an authenticated session. Subsequently, to fetch data, it automates a series of interactions with the portal: it creates a request to generate a data file, monitors the status of that request, and finally, downloads the file when the data set is ready.
Its main feature is:
- Extracting Historical Data (Features): Through the feature_values method, the handler executes the multi-step process of requesting, polling, and downloading data for a specific location (defined by latitude and longitude in the database) and period. It handles the processing of the final ZIP/CSV file, renaming columns to a standard format, and adjusting timestamps to the local timezone.
How to Use¶
1. Initialization and Authentication¶
The initialization of this class requires a data_source_name defined in performance_db, as it performs an authentication process (login) with the Solcast portal upon its creation. The resulting instance will already contain an authenticated session, ready to make data requests.
from echo_dataimporter import SolcastDataHandler
# Initializes and authenticates the session with the Solcast portal
# Requires the "MySolcastDataSource" to be configured in performance_db
handler = SolcastDataHandler(data_source_name="MySolcastDataSource")
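If the data source is missing from performance_db or the portal rejects the credentials, the constructor raises an exception instead of returning a half-initialized handler. A minimal defensive sketch, based only on the exception types visible in the class definition further below:
# Same initialization as above, wrapped to surface authentication problems early
try:
    handler = SolcastDataHandler(data_source_name="MySolcastDataSource")
except RuntimeError as exc:
    # Raised when the data source is not defined in performance_db
    # or the portal answers "Unauthorized" to the login request
    print(f"Could not authenticate with the Solcast portal: {exc}")
    raise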
2. Extracting Historical Data¶
After initialization, use the feature_values method to request, monitor, and download the data file. Remember that this process can take several minutes, depending on the requested period.
import pandas as pd
from datetime import datetime
from echo_datetimerange import DateTimeRange
# 1. Define the search parameters
# This object needs to have 'latitude' and 'longitude' attributes in the DB
object_name = "MyVirtualMetMast"
period = DateTimeRange(datetime(2025, 8, 1), datetime(2025, 8, 5))
# 2. Define the features (variables) to be fetched.
# 'name_in_data_source' is the exact name in the Solcast portal
features_to_fetch = pd.DataFrame([
{"name_in_data_source": "Ghi", "name": "ghi_w_m2"},
{"name_in_data_source": "Temperature", "name": "temperature_c"},
])
# 3. Call the method to fetch the data
# This process can be time-consuming
historical_data = handler.feature_values(
object_name=object_name,
features_df=features_to_fetch,
period=period,
time_interval="PT30M" # Optional: Sets the data interval to 30 minutes
)
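The call returns a pandas DataFrame indexed by timestamp (already shifted to local time) with one column per standardized feature name. A short follow-up sketch; the file name is only an illustrative placeholder:
# 4. Inspect and persist the result
print(historical_data.head())
print(historical_data.index.min(), historical_data.index.max())
historical_data.to_csv("solcast_myvirtualmetmast_2025-08.csv")  # illustrative file name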
Required Configuration¶

For the SolcastDataHandler to work correctly, the entities in performance_db must be configured to support the web scraping process and the search for data by geographic coordinates.
- The Data Source:
  - A data source instance must be created to store the login credentials for the Solcast portal.
  - It must have the following attributes (illustrated in the placeholder sketch after this list):
    - host_address: The base URL of the Solcast portal (e.g., https://toolkit.solcast.com.au/).
    - user: The username for the login.
    - password: The password for the login.
    - provider: Additional parameter required for authentication.
    - rememberme: Additional parameter required for authentication.
  - This instance must belong to a specific Data Source Type (e.g., solcast_api).
- The Object (Virtual Meteorological Mast):
  - Represents the location for which meteorological data will be extracted.
  - As with the TempoOkDataHandler, this object can be of the virtual_met_mast Model.
  - It needs to have the location attributes:
    - latitude: The latitude of the point of interest.
    - longitude: The longitude of the point of interest.
- Features and Links:
  - The features define the variables to be read (e.g., Ghi, Temperature) and contain the mapping between the name_in_data_source (the name in Solcast) and the name (the standardized name).
  - The crucial link is that the set of features associated with the virtual_met_mast model must also be linked to the same Data Source Type (e.g., solcast_api). This ensures that the handler uses the correct set of feature definitions for the Solcast portal.
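As an illustration only (not an API call), the attributes the handler reads from performance_db and the feature mapping it expects look roughly like this; every value below is a placeholder:
# Placeholder values: replace with the real data source attributes in performance_db
solcast_data_source_attributes = {
    "host_address": "https://toolkit.solcast.com.au/",
    "user": "user@example.com",
    "password": "********",
    "provider": "...",    # additional login parameter required by the portal
    "rememberme": "...",  # additional login parameter required by the portal
}
# Feature mapping used by feature_values: name_in_data_source -> name
# (e.g. the Solcast variable "Ghi" mapped to the standardized name "ghi_w_m2")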
Class Definition¶
SolcastDataHandler(data_source_name=None, connection_properties=None, **kwargs)
¶
Subclass of DataHandler used to interact with the Solcast API.
This method establishes an authenticated session by simulating a user
login. It sends a POST request to the /ToolkitAuthenticate endpoint
with credentials and other parameters fetched from performance_db via
the data_source_name.
The resulting handler instance holds a valid session cookie, enabling subsequent web scraping requests to pages that require a login.
Parameters:
- data_source_name (str | None, default: None) – The name of the data source as defined in performance_db. This is used to fetch all necessary connection and login parameters (host, user, password, provider, rememberme).
- connection_properties (HttpConnProperties | None, default: None) – Direct connection properties (host, user, password). Note: In the current implementation, login parameters like provider are still fetched using data_source_name. Defaults to None.
- **kwargs – Arbitrary keyword arguments. Currently unused by this method.
Source code in echo_dataimporter/solcast_handler.py
@validate_call
def __init__(self, data_source_name: str | None = None, connection_properties: HttpConnProperties | None = None, **kwargs) -> None: # noqa: ARG002
"""Initializes and authenticates a session with the Solcast web portal.
This method establishes an authenticated session by simulating a user
login. It sends a POST request to the `/ToolkitAuthenticate` endpoint
with credentials and other parameters fetched from `performance_db` via
the `data_source_name`.
The resulting handler instance holds a valid session cookie, enabling
subsequent web scraping requests to pages that require a login.
Parameters
----------
data_source_name : str | None, optional
The name of the data source as defined in `performance_db`. This is used to fetch all necessary connection and login parameters (`host`, `user`, `password`, `provider`, `rememberme`).
connection_properties : HttpConnProperties | None, optional
Direct connection properties (`host`, `user`, `password`).
Note: In the current implementation, login parameters like `provider` are still fetched using `data_source_name`.
Defaults to None.
**kwargs
Arbitrary keyword arguments. Currently unused by this method.
"""
super().__init__()
logger.info("Initializing SolcastDataHandler...")
if data_source_name is None and connection_properties is None:
raise ValueError("Either data_source_name or connection_properties must be specified")
if data_source_name is not None:
logger.info(f"Using data source name: {data_source_name}")
ds_dict = self.perfdb.datasources.instances.get(
data_source_types_names=["solcast_api"],
get_attributes=True,
output_type="dict",
)
if data_source_name not in ds_dict:
raise RuntimeError(f"Data source '{data_source_name}' not defined in performance_db")
ds_dict = ds_dict[data_source_name]
logger.info(f"Data source details: {ds_dict}")
connection_properties = HttpConnProperties(
host=ds_dict["host_address"],
user=ds_dict["user"],
password=ds_dict["password"],
)
logger.info(f"Using connection properties: {connection_properties}")
self.conn_props = connection_properties
conn = HttpHandler(connection_properties=self.conn_props)
logger.info("Setting up connection to Solcast API...")
payload = {
"password": self.conn_props.password,
"provider": ds_dict["provider"],
"rememberMe": ds_dict["rememberme"],
"userName": self.conn_props.user,
}
logger.info("Connecting to Solcast API...")
response = conn.post("ToolkitAuthenticate", json=payload, response_ok=None)
response_json = response.json()
logger.info(f"Response from Solcast API: {response_json}")
try:
if response_json["response_status"]["error_code"] == "Unauthorized": # if it is different from empty
raise RuntimeError("Error connecting to Solcast API: Unauthorized access. Check your credentials.")
except KeyError:
logger.warning(
"Response from Solcast API does not contain 'error_code'. This may indicate a change in the API response format.",
)
logger.info("Successfully connected to Solcast API and set necessary cookies.")
self.conn_handler = conn
feature_values(object_name, features_df, period, time_interval='PT5M', batch_request=True)
¶
Scrapes historical weather data for a location from the Solcast web portal.
This method automates the process of generating and downloading a historical data report from Solcast. It operates via a multi-step, asynchronous-like process:
1. It submits a batch job request to generate the data file.
2. It polls a status endpoint periodically until the job is complete.
3. It downloads the resulting ZIP archive, extracts the CSV, and processes it into a standardized DataFrame.
Parameters:
- object_name (str) – The name of the object. Its latitude and longitude attributes will be fetched from performance_db to define the location.
- features_df (DataFrame) – DataFrame defining the features to query. Must contain the columns name_in_data_source (the variable name in Solcast) and name (the desired standardized name).
- period (DateTimeRange) – The time range for the data query.
- time_interval (str | None, default: 'PT5M') – The time resolution of the data, in ISO 8601 duration format. Defaults to "PT5M" (5 minutes).
- batch_request (bool | None, default: True) – If True, initiates a new batch request job. If False, uses a hardcoded batch ID for debugging. Defaults to True.
Returns:
- DataFrame – A pandas DataFrame containing the requested historical data. The index is a DatetimeIndex (adjusted to local time, GMT-3) and columns are the standardized feature names (see the sketch below).
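Because the returned index is timezone-naive but already shifted to GMT-3, downstream code may want to attach an explicit timezone before aggregating. A minimal sketch, assuming historical_data is the DataFrame returned by feature_values (as in the usage example above) and that the fixed -3 hour shift corresponds to a constant UTC-3 offset:
# Attach an explicit fixed offset (Etc/GMT+3 == UTC-3) and resample to daily means
localized = historical_data.tz_localize("Etc/GMT+3")
daily_means = localized.resample("1D").mean()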
Source code in echo_dataimporter/solcast_handler.py
@validate_call
def feature_values(
self,
object_name: str,
features_df: pd.DataFrame,
period: DateTimeRange,
time_interval: str | None = "PT5M",
batch_request: bool | None = True,
) -> pd.DataFrame:
"""Scrapes historical weather data for a location from the Solcast web portal.
This method automates the process of generating and downloading a historical data report from Solcast. It operates via a multi-step, asynchronous-like process:
1. It submits a batch job request to generate the data file.
2. It polls a status endpoint periodically until the job is complete.
3. It downloads the resulting ZIP archive, extracts the CSV, and processes it into a standardized DataFrame.
Parameters
----------
object_name : str
The name of the object. Its `latitude` and `longitude` attributes will be fetched from `performance_db` to define the location.
features_df : pd.DataFrame
DataFrame defining the features to query. Must contain the columns `name_in_data_source` (the variable name in Solcast) and `name` (the desired standardized name).
period : DateTimeRange
The time range for the data query.
time_interval : str | None, optional
The time resolution of the data, in ISO 8601 duration format.
Defaults to "PT60M" (60 minutes).
batch_request : bool | None, optional
If `True`, initiates a new batch request job. If `False`, uses a hardcoded batch ID for debugging.
Defaults to `True`.
Returns
-------
pd.DataFrame
A pandas DataFrame containing the requested historical data. The index is a `DatetimeIndex` (adjusted to local time, GMT-3) and columns are the standardized feature names.
"""
logger.info("Starting feature values retrieval from Solcast API...")
period.start = datetime(period.start.year, period.start.month, period.start.day, 0, 0, 0)
period.end = datetime(period.end.year, period.end.month, period.end.day, 0, 0, 0)
if period.start.date() > pd.Timestamp.now().date() or period.end.date() > pd.Timestamp.now().date():
logger.error(f"Invalid period: {period}")
raise ValueError("Invalid period: start date is after today")
period.start = datetime(period.start.year, period.start.month, 1)
if (pd.Timestamp.now().date() - period.end.date()).days < 7:
if (period.end.date() - period.start.date()).days < 9:
period.start = datetime(period.start.year, period.start.month - 1, 1)
period.end = period.end - relativedelta(days=9)
else:
period.end = period.end - relativedelta(days=9)
original_period = period
period_1day_more = DateTimeRange(
start=period.start,
end=period.end + relativedelta(days=1),
)
end = period_1day_more.end.strftime("%Y-%m-%dT%H:%M:%S.000Z")
start = period_1day_more.start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
logger.info(f"Retrieving feature values for object '{object_name}' from {start} to {end} with time interval '{time_interval}'.")
try:
logger.info(f"Fetching object info for '{object_name}' from performance_db...")
object_info = self.perfdb.objects.instances.attributes.get(
object_names=[object_name],
)
object_info = object_info[object_name]
except KeyError as e:
raise RuntimeError(f"Object '{object_name}' not defined in performance_db") from e
logger.info(f"Object info retrieved: {object_info}")
try:
dict_names = features_df.set_index("name_in_data_source")["name"].to_dict()
name_list = features_df["name"].tolist()
name_list_solcast = [name for name in name_list if "Solcast" in name]
output_parameters = features_df[features_df["name"].isin(name_list_solcast)]["name_in_data_source"].tolist()
logger.info(f"Output parameters for Solcast API: {output_parameters}")
dict_names = {name: dict_names[name] for name in output_parameters if name in dict_names}
logger.info(f"Dictionary of names for renaming: {dict_names}")
except KeyError as e:
logger.error(f"Error in feature definitions: {e}")
raise RuntimeError(f"Feature definitions are not correctly defined for object '{object_name}'") from e
payload_request = {
"end": end,
"format": "csv",
"locations": [
{
"latitude": object_info["latitude"]["attribute_value"],
"longitude": object_info["longitude"]["attribute_value"],
"array_type": "",
"azimuth": "",
"capacity": 5,
"loss_factor": "",
"tilt": "",
},
],
"output_parameters": output_parameters,
"period": time_interval,
"start": start,
"time_zone": "utc",
"type": "radiation_and_weather",
}
logger.info(f"Payload for Solcast API request: {payload_request}")
if batch_request:
logger.info("Creating batch request for Solcast API...")
response_request = self.conn_handler.post(
"CreateHistoricRadiationAndWeatherBatchRequest",
json=payload_request,
response_ok=None,
)
logger.info(f"Batch request response: {response_request.status_code} - {response_request.text}")
if response_request.status_code != 200:
logger.error(f"Error creating batch request: {response_request.status_code} - {response_request.text}")
raise RuntimeError(f"Failed to create batch request: {response_request.status_code}")
response_request_json = response_request.json()
else:
response_request_json = {}
response_request_json["batch_id"] = "a5016602-74e4-485b-be7c-f7ba1e73eab5"
logger.info(f"Batch request created with ID: {response_request_json['batch_id']}")
endpoint = f"EvaluateHistoricBatch?batch_id={response_request_json['batch_id']}"
status = "Pending"
while status != "Ready":
logger.info(f"Checking status of batch request with ID: {response_request_json['batch_id']}...")
response_status = self.conn_handler.get(
endpoint,
response_ok=None,
)
if response_status.status_code != 200:
logger.error(f"Error fetching batch status: {response_status.status_code} - {response_status.text}")
raise RuntimeError(f"Failed to fetch batch status: {response_status.status_code}")
status = response_status.json()["status"]
logger.info(f"Batch status: {status}")
if status == "Pending":
logger.info("Batch is still pending, waiting for 60 seconds before checking again...")
time.sleep(60)
logger.info("Batch is ready, downloading the results...")
uri = response_status.json()["uri"]
with tempfile.TemporaryDirectory() as caminho_pasta_temporaria:
parte_principal_uri = uri.split("?")[0]
nome_arquivo = parte_principal_uri.split("/")[-1]
caminho_completo_para_salvar = os.path.join(caminho_pasta_temporaria, nome_arquivo) # noqa: PTH118
response_download = httpx.get(uri, stream=True)
logger.info(f"Downloading file from {uri} to {caminho_completo_para_salvar}...")
with Path(caminho_completo_para_salvar).open("wb") as file:
file.writelines(response_download.iter_content(chunk_size=8192))
with zipfile.ZipFile(caminho_completo_para_salvar, "r") as zip_ref:
nome_do_arquivo_interno = zip_ref.namelist()[0]
# Extract the file into the same temporary directory
zip_ref.extract(nome_do_arquivo_interno, caminho_pasta_temporaria)
caminho_arquivo_final = os.path.join(caminho_pasta_temporaria, nome_do_arquivo_interno) # noqa: PTH118
df = pd.read_csv(caminho_arquivo_final, sep=",")
df.index = df["period_end"]
df.index = pd.to_datetime(df.index, format="%Y-%m-%dT%H:%M:%S+00:00")
df = df.drop(columns=["period_end", "period"])
logger.info("Feature values DataFrame created successfully.")
df = df.rename(columns=dict_names)
df = df.shift(freq="-3H")
if "WeatherTypeSolcast_1h.REP" in df.columns:
dict_weather = {
"MOSTLY CLOUDY": 1,
"OVERCAST": 2,
"PARTLY CLOUDY": 3,
"MOSTLY SUNNY": 4,
"SUNNY": 5,
"CLEAR": 6,
"MOSTLY CLEAR": 7,
}
df["WeatherTypeSolcast_1h.REP"] = df["WeatherTypeSolcast_1h.REP"].map(dict_weather)
df = df[dict_names.values()]
df = df.loc[original_period.start : original_period.end]
logger.info("Feature values DataFrame shifted by -3 hours.")
return df
object_types()
¶
Method not applicable to SolcastDataHandler
Source code in echo_dataimporter/solcast_handler.py
def object_types(self) -> pd.DataFrame:
"""Method not applicable to Way2"""
raise NotImplementedError("Method not implemented yet")