Inquiry About Automated Data Download Using Giovanni Software
Hi,
I would like to download daily data for specific toll booth locations in Italy, observed from August 2013 to the present. I understand that this is possible using your Giovanni software with the Time-Series Averaged Area feature. However, manually accessing this data through the web interface can be a daunting process.
Could you please suggest a more straightforward method for achieving this using Python or any other programming language you recommend?
I look forward to your response.
Thank you!
Re: Inquiry About Automated Data Download Using Giovanni Software
Hello,
Thanks for your question. To further assist you and direct your question to the appropriate staff members, can you tell us what data product and variable you are seeking? You may be directed to bypass Giovanni and use one of our other data download methods, some of which can be scripted.
Re: Inquiry About Automated Data Download Using Giovanni Software
Dear NASA staff,
I have just prepared a draft script in Python, with the aid of AI, to download the data of interest for my research. The points.csv file has three fields with the following headers: ID (the record number), latitude, and longitude. You may insert a couple of points for testing purposes.
Please let me know whether this approach is feasible and how it could be improved to avoid overwhelming your servers.
Thanks for your constant guidance.
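For testing, a minimal points.csv could look like this (the coordinates below are only sample values, roughly Milan and Rome, not my actual toll booth locations):
ID,latitude,longitude
1,45.4642,9.1900
2,41.9028,12.4964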
# python program
# requirements.txt
# requests==2.31.0
# pandas==2.0.3
# netCDF4==1.6.4
# matplotlib==3.7.1
# xarray==2023.10.0
import requests
import os
import tempfile
import pandas as pd
import netCDF4 as nc
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

# NASA Earthdata credentials
USERNAME = "your_username"
PASSWORD = "your_password"

# NASA Earthdata Search API base URL
BASE_URL = "https://api.earthdata.nasa.gov/search/granules"

# Function to authenticate and get an access token
def get_access_token(username, password):
    auth_url = "https://urs.earthdata.nasa.gov/oauth/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "client_id": "app",
        "username": username,
        "password": password,
        "grant_type": "password",
    }
    response = requests.post(auth_url, headers=headers, data=data)
    if response.status_code == 200:
        return response.json().get("access_token")
    else:
        print("Authentication failed:", response.text)
        return None

# Function to query metadata for granules
def query_metadata(query, token, start_date, end_date):
    headers = {"Authorization": f"Bearer {token}"}
    params = {
        "keyword": query,
        "page_size": 100,  # Number of results to return
        "page_num": 1,  # Page number
        "temporal": f"{start_date}T00:00:00Z,{end_date}T23:59:59Z",
    }
    response = requests.get(BASE_URL, headers=headers, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print("Search failed:", response.text)
        return None

# Function to download a granule
def download_granule(url, token, temp_dir):
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers, stream=True)
    if response.status_code == 200:
        filename = os.path.join(temp_dir, url.split("/")[-1])
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Downloaded: {filename}")
        return filename
    else:
        print("Download failed:", response.text)
        return None

# Function to process a granule
def process_granule(granule_path, lat, lon):
    with nc.Dataset(granule_path) as ncfile:
        lats = ncfile.variables['latitude'][:]
        lons = ncfile.variables['longitude'][:]
        no2_var = ncfile.variables['ColumnAmountNO2'][:]
        lat_idx = (abs(lats - lat)).argmin()
        lon_idx = (abs(lons - lon)).argmin()
        no2_value = no2_var[lat_idx, lon_idx]
        return no2_value

# Main script
if __name__ == "__main__":
    # Authenticate and get access token
    token = get_access_token(USERNAME, PASSWORD)
    if not token:
        print("Failed to authenticate. Exiting.")
        exit()

    # Load coordinates from CSV
    points_df = pd.read_csv("points.csv")
    latitudes = points_df['latitude'].tolist()
    longitudes = points_df['longitude'].tolist()
    point_ids = points_df['ID'].tolist()  # Assuming your CSV has an 'ID' column

    # Define time period
    start_date = "2004-01-01"
    end_date = datetime.now().strftime("%Y-%m-%d")

    # Query metadata for OMNO2d_003 granules
    query = "OMNO2d_003"
    metadata = query_metadata(query, token, start_date, end_date)
    if metadata:
        print(f"Found {len(metadata.get('feed', {}).get('entry', []))} granules.")

        # Use a temporary directory for caching
        with tempfile.TemporaryDirectory() as temp_dir:
            # Process each point individually
            for point_id, lat, lon in zip(point_ids, latitudes, longitudes):
                print(f"Processing point ID: {point_id} (Lat: {lat}, Lon: {lon})")
                no2_values = []

                # Download and process granules for the current point
                with ThreadPoolExecutor(max_workers=5) as executor:
                    futures = []
                    for granule in metadata.get("feed", {}).get("entry", []):
                        granule_url = granule.get("links", [{}])[0].get("href")
                        if granule_url:
                            futures.append(executor.submit(download_granule, granule_url, token, temp_dir))
                    for future in futures:
                        granule_path = future.result()
                        if granule_path:
                            no2_value = process_granule(granule_path, lat, lon)
                            no2_values.append(no2_value)

                # Calculate average NO2 concentration for the current point
                if no2_values:
                    avg_no2 = sum(no2_values) / len(no2_values)

                    # Save results to a CSV file named ID_<point_id>.csv
                    output_file = f"ID_{point_id}.csv"
                    with open(output_file, "w") as f:
                        f.write("ID,Latitude,Longitude,Date,NO2_Concentration\n")
                        f.write(f"{point_id},{lat},{lon},{datetime.now().strftime('%Y-%m-%d')},{avg_no2}\n")
                    print(f"Results for point ID {point_id} saved to {output_file}")
                else:
                    print(f"No NO2 values extracted for point ID: {point_id}")
Re: Inquiry About Automated Data Download Using Giovanni Software
Hello,
I'm Chris Battisto from the GES DISC User Needs team. May I offer a few suggestions regarding your code approach?
First, your data download section is fine, but it can be shortened and made more efficient by using the "earthaccess" Python library. The following adaptation of your existing code authenticates, searches, and downloads the granules with this library. It also has built-in multiprocessing and will use a default of 8 workers for downloading into your temporary directory:
import earthaccess
import os
import tempfile

# Authenticate using Earthdata credentials
session = earthaccess.login()

# Define search parameters
start_date = "2004-10-01"
end_date = "2004-10-05"  # You may want to split the end date into chunks, rather than the full 20 years in one run
collection = "OMNO2d"
version = "003"

# Search for granules
print("Searching for granules...")
granules = earthaccess.search_data(
    short_name=collection,
    version=version,
    temporal=(start_date, end_date)
)
print(f"Found {len(granules)} granules.")

# Create a temporary directory for downloaded files
with tempfile.TemporaryDirectory() as temp_dir:
    print(f"Using temporary directory: {temp_dir}")
    try:
        earthaccess.download(granules, temp_dir)  # Downloads all granules searched in a single call
    except Exception as e:
        print(f"Failed to download granules: {e}")
    print("All granules have been downloaded.")
For your analysis portion: your existing code will correctly extract NO2 values for a single point and store the single value averaged over all granules for the current date. However, this is not the same as the Time-Series Averaged Area service, where a spatio-temporal area is specified rather than a single point, and averages are calculated over that area for each time step.
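If you do want to reproduce an area-averaged time series yourself, here is a minimal sketch of the idea. It assumes you have already combined the daily granules into an xarray DataArray named no2 with dimensions (time, lat, lon) and converted fill values to NaN; the bounding box values are only an example:
import xarray as xr

# Sketch only: average NO2 over a lat/lon box for each time step,
# assuming `no2` is an xarray.DataArray with dims (time, lat, lon).
def area_average(no2: xr.DataArray, lat_bounds, lon_bounds) -> xr.DataArray:
    subset = no2.sel(
        lat=slice(lat_bounds[0], lat_bounds[1]),
        lon=slice(lon_bounds[0], lon_bounds[1]),
    )
    # One spatial mean per time step; NaNs (former fill values) are ignored
    return subset.mean(dim=("lat", "lon"), skipna=True)

# Example: approximate bounding box around Italy
# ts = area_average(no2, lat_bounds=(36.6, 47.1), lon_bounds=(6.6, 18.5))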
Re: Inquiry About Automated Data Download Using Giovanni Software
Hello,
I would like to wish you a happy new year!
Following your hints, I have refined the program with the aid of artificial intelligence.
Since I have 797 points that I want to observe from 2004-01-01 up to now, I plan to divide the download into monthly chunks and load all the coordinates into memory.
Is this approach feasible?
If so, how could it be distributed over days to avoid stressing your servers? Do you think there's an even more efficient approach?
I am looking forward to your reply.
Thanks for your constant guidance.
import earthaccess
import os
import tempfile
import pandas as pd
import netCDF4 as nc
from datetime import datetime
from shapely.geometry import box
from concurrent.futures import ThreadPoolExecutor
import numpy as np

# Define bounding box for Italy
italy_bbox = box(6.6, 36.6, 18.5, 47.1)  # Approximate bounding box for Italy

def granule_overlaps_italy(granule):
    """Check if a granule overlaps with Italy's bounding box."""
    granule_bbox = box(
        granule['bounding_box']['west'],
        granule['bounding_box']['south'],
        granule['bounding_box']['east'],
        granule['bounding_box']['north']
    )
    return italy_bbox.intersects(granule_bbox)

def load_granule_data(granule_path):
    """Load granule data (latitude, longitude, NO2, time) into memory."""
    with nc.Dataset(granule_path) as ncfile:
        lats = ncfile.variables['latitude'][:]
        lons = ncfile.variables['longitude'][:]
        no2_var = ncfile.variables['ColumnAmountNO2'][:]
        time = ncfile.variables['time'][:]  # Assuming the time variable exists
    return lats, lons, no2_var, time

def extract_no2_value(lats, lons, no2_var, lat, lon):
    """Extract the NO2 value for a specific point."""
    lat_idx = (abs(lats - lat)).argmin()
    lon_idx = (abs(lons - lon)).argmin()
    return no2_var[lat_idx, lon_idx]

def process_point(point_id, lat, lon, granules, temp_dir):
    """Process a single point to extract NO2 values for each granule."""
    print(f"Processing point ID: {point_id} (Lat: {lat}, Lon: {lon})")
    time_series = []

    # Process each granule for the current point
    for granule in granules:
        granule_path = os.path.join(temp_dir, os.path.basename(granule['links'][0]['href']))
        if os.path.exists(granule_path):
            lats, lons, no2_var, time = load_granule_data(granule_path)
            no2_value = extract_no2_value(lats, lons, no2_var, lat, lon)
            time_series.append((time[0], no2_value))  # Use the first time value for the granule

    # Save time series to a CSV file for the current point
    output_file = f"ID_{point_id}_time_series.csv"
    with open(output_file, "w") as f:
        f.write("Time,NO2_Value\n")
        for time, no2_value in time_series:
            f.write(f"{time},{no2_value}\n")
    print(f"Time series for point ID {point_id} saved to {output_file}")

# Main script
if __name__ == "__main__":
    # Authenticate using Earthdata credentials
    session = earthaccess.login()

    # Load coordinates from CSV
    points_df = pd.read_csv("points.csv")
    latitudes = points_df['latitude'].tolist()
    longitudes = points_df['longitude'].tolist()
    point_ids = points_df['ID'].tolist()  # Assuming your CSV has an 'ID' column

    # Define time period (one month for testing)
    start_date = "2004-10-01"
    end_date = "2004-10-31"

    # Search for granules
    print("Searching for granules...")
    granules = earthaccess.search_data(
        short_name="OMNO2d",
        version="003",
        temporal=(start_date, end_date)
    )

    # Filter granules to only those overlapping Italy
    granules = [g for g in granules if granule_overlaps_italy(g)]
    print(f"Found {len(granules)} granules overlapping Italy.")

    # Use a temporary directory for caching
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"Using temporary directory: {temp_dir}")

        # Download all granules
        try:
            earthaccess.download(granules, temp_dir)
        except Exception as e:
            print(f"Failed to download granules: {e}")

        # Process points in parallel
        with ThreadPoolExecutor(max_workers=8) as executor:  # Adjust max_workers based on your system
            futures = [
                executor.submit(process_point, point_id, lat, lon, granules, temp_dir)
                for point_id, lat, lon in zip(point_ids, latitudes, longitudes)
            ]
            for future in futures:
                future.result()  # Wait for all tasks to complete

    print("All granules and points have been processed.")
- Attachments
- points.pdf
- A PDF version of the selected coordinates
Re: Inquiry About Automated Data Download Using Giovanni Software
Hello,
Your approach is feasible, and since you would be accessing the data from cloud servers (the default source for earthaccess), the risk of overloading our services is minimal. However, we do impose rate limiting on unusually high request volumes, so we recommend splitting large requests into several smaller ones, as you plan to do.
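As a rough illustration of that pacing (the pause length below is arbitrary, not an official limit, and the two chunks are just examples):
import time
import earthaccess

earthaccess.login()

# Illustrative only: request one month at a time and pause between chunks.
chunks = [("2024-10-01", "2024-10-31"), ("2024-11-01", "2024-11-30")]
for chunk_start, chunk_end in chunks:
    granules = earthaccess.search_data(
        short_name="OMNO2d", version="003", temporal=(chunk_start, chunk_end)
    )
    earthaccess.download(granules, "downloads")
    time.sleep(60)  # arbitrary courtesy pause between monthly requests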
Chris
Re: Inquiry About Automated Data Download Using Giovanni Software
You can automate data retrieval using Giovanni's API or other data access tools if supported. Python libraries like `requests` can handle API calls, and `pandas` can process the data. Check Giovanni's documentation for API endpoints or consider contacting their support for programmatic access options.
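As a generic illustration of that pattern (the endpoint URL and parameter names below are placeholders, not a documented Giovanni API; check the Giovanni documentation for the real interface):
import io
import requests
import pandas as pd

# Placeholder endpoint and parameters, for illustration only.
EXAMPLE_URL = "https://example.invalid/giovanni/timeseries"
params = {
    "variable": "ColumnAmountNO2",  # hypothetical parameter names
    "lat": 45.46, "lon": 9.19,
    "start": "2013-08-01", "end": "2013-08-31",
}

resp = requests.get(EXAMPLE_URL, params=params, timeout=60)
resp.raise_for_status()
df = pd.read_csv(io.StringIO(resp.text))  # assuming the service returns CSV
print(df.head())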
Re: Inquiry About Automated Data Download Using Giovanni Software
Hello,
I've tested two Python programs to download data for 797 selected points over the course of one month. The first program downloads the data for the selected month into a dedicated sub-folder on my local machine. The second program uses the offline data generated by the first program to extract daily data for the selected points in Italy.
Since you impose rate limiting on unusually high requests, I wanted to check with you: is it okay if I download one month's worth of data, split into daily chunks? What is the maximum number of days' worth of data that I can download in a single day?
Once I've processed the selected month's data into CSV format, I plan to erase the HE5 files from the local sub-folder to free up disk space. This would allow me to download a full year's worth of data in 12 days, which is necessary for my time series analysis.
I'm excited to analyze this data and see what insights I can gain. I've included the code for both programs below (generated with the aid of artificial intelligence).
Looking forward to your reply.
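For the clean-up step, this is the kind of helper I would run after each month has been processed to CSV (a sketch only; the "downloads" folder name matches the scripts below):
import glob
import os

def purge_he5(download_dir="downloads"):
    """Delete downloaded HE5 granules once they have been processed to CSV."""
    for path in glob.glob(os.path.join(download_dir, "*.he5")):
        os.remove(path)
        print(f"Deleted {path}")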
# pip install -r requirements.txt
requirements.txt
aiobotocore==2.17.0
aiohappyeyeballs==2.4.4
aiohttp==3.11.11
aioitertools==0.12.0
aiosignal==1.3.2
async-timeout==5.0.1
attrs==24.3.0
botocore==1.35.93
bounded-pool-executor==0.0.3
certifi==2024.12.14
cftime==1.6.4.post1
charset-normalizer==3.4.1
contourpy==1.3.1
cycler==0.12.1
earthaccess==0.12.0
fonttools==4.55.3
frozenlist==1.5.0
fsspec==2024.12.0
h5py==3.12.1
idna==3.10
importlib_resources==6.5.2
jmespath==1.0.1
kiwisolver==1.4.8
matplotlib==3.7.1
mpi4py==4.0.1
multidict==6.1.0
multimethod==2.0
netCDF4==1.7.2
numpy==2.2.1
packaging==24.2
pandas==2.2.3
pillow==11.1.0
pqdm==0.2.0
propcache==0.2.1
pyparsing==3.2.1
python-cmr==0.13.0
python-dateutil==2.9.0.post0
pytz==2024.2
requests==2.31.0
s3fs==2024.12.0
shapely==2.0.6
six==1.17.0
tinynetrc==1.3.1
tqdm==4.67.1
typing_extensions==4.12.2
tzdata==2024.2
urllib3==2.3.0
wrapt==1.17.1
xarray==2023.10.0
yarl==1.18.3
# download_granules.py
import earthaccess
import os
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,  # Set the logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s",  # Format of the log messages
    handlers=[
        logging.FileHandler("download_log.txt"),  # Log to a file
        logging.StreamHandler()  # Log to the terminal
    ]
)

def download_granules(start_date, end_date, download_dir):
    """Download granules for the specified time period."""
    logging.info("Authenticating with Earthdata...")
    username = os.getenv("EARTHDATA_USERNAME")
    password = os.getenv("EARTHDATA_PASSWORD")
    if not username or not password:
        raise ValueError("Please set EARTHDATA_USERNAME and EARTHDATA_PASSWORD environment variables.")
    session = earthaccess.login(username, password)

    # Search for granules
    logging.info("Searching for granules...")
    granules = earthaccess.search_data(
        short_name="OMNO2d",
        version="003",
        temporal=(start_date, end_date)
    )
    logging.info(f"Found {len(granules)} granules.")

    # Download only new granules
    new_granules = []
    for granule in granules:
        # Find the URL with the "GET DATA" type
        related_urls = granule['umm']['RelatedUrls']
        download_url = None
        for url_info in related_urls:
            if url_info['Type'] == 'GET DATA':
                download_url = url_info['URL']
                break
        if not download_url:
            logging.warning(f"Skipping granule {granule['meta']['native-id']}: No download URL found.")
            continue

        # Construct the expected filename
        granule_filename = os.path.basename(download_url)
        granule_path = os.path.join(download_dir, granule_filename)

        # Check if the file already exists
        if not os.path.exists(granule_path):
            new_granules.append(granule)  # Add to the list of granules to download
            logging.info(f"Adding granule {granule_filename} to download list.")
        else:
            logging.info(f"Skipping download: {granule_filename} already exists in {download_dir}.")

    # Download new granules
    if new_granules:
        logging.info(f"Downloading {len(new_granules)} new granules...")
        try:
            earthaccess.download(new_granules, download_dir)
        except Exception as e:
            logging.error(f"Failed to download granules: {e}")
    else:
        logging.info("All granules already downloaded. Skipping download.")

# Main script
if __name__ == "__main__":
    # Define time period (one month for testing)
    start_date = "2024-10-01"
    end_date = "2024-10-31"
    logging.info(f"Time period: {start_date} to {end_date}")

    # Create a persistent download directory
    download_dir = "downloads"
    os.makedirs(download_dir, exist_ok=True)
    logging.info(f"Download directory: {download_dir}")

    # Download granules
    download_granules(start_date, end_date, download_dir)
The second program, which runs locally with MPI, follows:
# process_granules_mpiv2.py -- run it on 20 cores: mpirun -np 20 python process_granules_mpiv2.py
import os
import pandas as pd
import numpy as np
import h5py
from datetime import datetime
import logging
from mpi4py import MPI  # Use MPI for parallel processing
from tqdm import tqdm  # For progress bar

# Configure logging
logging.basicConfig(
    level=logging.INFO,  # Set the logging level (INFO, DEBUG, WARNING, ERROR, CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s",  # Format of the log messages
    handlers=[
        logging.FileHandler("process_log.txt"),  # Log to a file
        logging.StreamHandler()  # Log to the terminal
    ]
)

# Initialize MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()  # Rank of the current process
size = comm.Get_size()  # Total number of processes

def extract_time_from_filename(filename):
    """Extract the date from the filename, ignoring the version number (e.g., v003)."""
    try:
        # Split the filename by underscores
        parts = filename.split("_")
        if len(parts) < 4:
            raise ValueError(f"Filename {filename} does not match expected format.")
        # Extract the date part (e.g., "2024m1008")
        date_part = parts[2]  # The date is in the third part (index 2)
        # Validate the date string format
        if len(date_part) != 9 or not date_part[4] == 'm':
            raise ValueError(f"Date part {date_part} in filename {filename} is invalid.")
        # Extract year, month, and day
        year = int(date_part[:4])  # First 4 characters: year
        month = int(date_part[5:7])  # Characters 5-6: month
        day = int(date_part[7:9])  # Characters 7-8: day
        # Validate month
        if month < 1 or month > 12:
            raise ValueError(f"Invalid month {month} in filename {filename}.")
        # Validate day based on month
        if month in {4, 6, 9, 11} and day > 30:  # Months with 30 days
            raise ValueError(f"Invalid day {day} for month {month} in filename {filename}.")
        elif month == 2:  # February
            is_leap_year = (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0)
            if (is_leap_year and day > 29) or (not is_leap_year and day > 28):
                raise ValueError(f"Invalid day {day} for February {year} in filename {filename}.")
        elif day > 31:  # All other months
            raise ValueError(f"Invalid day {day} for month {month} in filename {filename}.")
        # Format month and day with zero-padding
        formatted_date = f"{year}-{month:02d}-{day:02d}"
        logging.info(f"Extracted date from filename {filename}: {formatted_date}")
        return datetime(year, month, day)
    except Exception as e:
        logging.error(f"Error extracting time from filename {filename}: {e}")
        return None

def get_units_from_dataset(h5file, dataset_path):
    """Extract the units from a dataset in an HDF5 file."""
    try:
        dataset = h5file[dataset_path]
        if 'units' in dataset.attrs:
            return dataset.attrs['units'].decode('utf-8')  # Decode if necessary
        elif 'Units' in dataset.attrs:
            return dataset.attrs['Units'].decode('utf-8')
        else:
            return "Units not specified"
    except KeyError:
        return "Dataset not found"

def load_granule_data(granule_path):
    """Load granule data (latitude, longitude, NO2, time) into memory."""
    try:
        logging.info(f"Loading granule: {granule_path}")
        with h5py.File(granule_path, 'r') as h5file:
            grid_group = h5file['HDFEOS/GRIDS/ColumnAmountNO2']
            # Validate grid metadata
            try:
                # Parse GridSpacing
                grid_spacing_attr = grid_group.attrs['GridSpacing']
                if isinstance(grid_spacing_attr, np.ndarray):
                    grid_spacing_str = grid_spacing_attr[0].decode('utf-8').strip('()')
                else:
                    grid_spacing_str = grid_spacing_attr.decode('utf-8').strip('()')
                grid_spacing_values = [float(x.strip()) for x in grid_spacing_str.split(',')]
                grid_spacing = grid_spacing_values[0]  # Assuming single value
                # Parse NumberOfLatitudesInGrid
                num_lats_attr = grid_group.attrs['NumberOfLatitudesInGrid']
                if isinstance(num_lats_attr, np.ndarray):
                    num_lats = int(num_lats_attr[0])
                else:
                    num_lats = int(num_lats_attr)
                # Parse NumberOfLongitudesInGrid
                num_lons_attr = grid_group.attrs['NumberOfLongitudesInGrid']
                if isinstance(num_lons_attr, np.ndarray):
                    num_lons = int(num_lons_attr[0])
                else:
                    num_lons = int(num_lons_attr)
            except Exception as e:
                logging.error(f"Error parsing grid metadata: {e}")
                return None, None, None, None
            # Parse GridSpan into numerical values
            try:
                grid_span_attr = grid_group.attrs['GridSpan']
                if isinstance(grid_span_attr, np.ndarray):
                    grid_span_str = grid_span_attr[0].decode('utf-8').strip('()')
                else:
                    grid_span_str = grid_span_attr.decode('utf-8').strip('()')
                grid_span_values = [float(x.strip()) for x in grid_span_str.split(',')]
                if len(grid_span_values) != 4:
                    raise ValueError("GridSpan does not contain exactly 4 values.")
                lon_min, lon_max, lat_min, lat_max = grid_span_values
            except Exception as e:
                logging.error(f"Error parsing GridSpan: {e}")
                return None, None, None, None
            # Reconstruct latitude and longitude arrays
            lats = np.linspace(lat_min, lat_max, num_lats)
            lons = np.linspace(lon_min, lon_max, num_lons)
            # Extract NO2 data
            no2_var = grid_group['Data Fields/ColumnAmountNO2'][:]
            # Extract time from the filename
            filename = os.path.basename(granule_path)
            time = extract_time_from_filename(filename)
            if time is None:
                time = datetime(1970, 1, 1)  # Default time if extraction fails
                logging.warning(f"Using default time for granule {filename}")
            # Extract and log units
            units = get_units_from_dataset(h5file, 'HDFEOS/GRIDS/ColumnAmountNO2/Data Fields/ColumnAmountNO2')
            logging.info(f"Units for NO2 data: {units}")
            return lats, lons, no2_var, time
    except Exception as e:
        logging.error(f"Error loading granule {granule_path}: {e}")
        return None, None, None, None

def extract_no2_value(lats, lons, no2_var, lat, lon):
    """Extract the NO2 value for a specific point."""
    lat_idx = (abs(lats - lat)).argmin()
    lon_idx = (abs(lons - lon)).argmin()
    no2_value = no2_var[lat_idx, lon_idx]
    # Replace negative values with NaN
    if no2_value < 0:
        no2_value = np.nan
    logging.info(f"Extracted NO2 value at lat={lat}, lon={lon}: {no2_value}")
    return no2_value

def process_granule(granule_path, lat, lon):
    """Process a single granule to extract NO2 values for a specific point."""
    lats, lons, no2_var, time = load_granule_data(granule_path)
    if lats is not None and lons is not None and no2_var is not None and time is not None:
        no2_value = extract_no2_value(lats, lons, no2_var, lat, lon)
        return time, no2_value
    return None, None

def process_point(point_id, lat, lon, granule_paths, output_dir, start_date, end_date):
    """Process a single point to extract NO2 values for each granule."""
    logging.info(f"Processing point ID: {point_id} (Lat: {lat}, Lon: {lon})")
    time_series = []
    # Convert start_date and end_date to datetime objects
    start_date_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
    # Distribute granules across MPI processes
    for i in range(rank, len(granule_paths), size):
        granule_path = granule_paths[i]
        result = process_granule(granule_path, lat, lon)
        if result[0] is not None:
            # Filter data to include only entries within the specified time period
            if start_date_dt <= result[0] <= end_date_dt:
                time_series.append(result)
    # Gather results from all processes
    all_time_series = comm.gather(time_series, root=0)
    # Save results (only rank 0 writes to file)
    if rank == 0:
        # Flatten the list of results
        time_series = [item for sublist in all_time_series for item in sublist]
        # Sort the time series by time
        time_series.sort(key=lambda x: x[0])
        # Remove duplicate entries based on the date
        unique_time_series = []
        seen_dates = set()
        for time, no2_value in time_series:
            date_str = time.strftime('%Y-%m-%d')
            if date_str not in seen_dates:
                seen_dates.add(date_str)
                unique_time_series.append((time, no2_value))
        # Get units from the first granule
        with h5py.File(granule_paths[0], "r") as h5file:
            units = get_units_from_dataset(h5file, 'HDFEOS/GRIDS/ColumnAmountNO2/Data Fields/ColumnAmountNO2')
        # Save time series to a CSV file for the current point
        output_file = os.path.join(output_dir, f"ID_{point_id}_time_series.csv")
        with open(output_file, "w") as f:
            f.write(f"Time,NO2_Value ({units})\n")  # Include units in the header
            for time, no2_value in unique_time_series:
                f.write(f"{time.strftime('%Y-%m-%d')},{no2_value}\n")  # Format time as YYYY-MM-DD
        logging.info(f"Time series for point ID {point_id} saved to {output_file}")

# Main script
if __name__ == "__main__":
    # Only rank 0 loads the CSV and distributes work
    if rank == 0:
        # Load coordinates from CSV
        logging.info("Loading points from CSV...")
        points_df = pd.read_csv("points.csv")
        logging.info(f"Columns in CSV file: {points_df.columns.tolist()}")
        # Dynamically retrieve the correct spelling of latitude and longitude from the CSV header
        latitude_col = None
        longitude_col = None
        for col in points_df.columns:
            if "lat" in col.lower():
                latitude_col = col
            if "lon" in col.lower():
                longitude_col = col
        if not latitude_col or not longitude_col:
            raise ValueError("Could not find latitude and longitude columns in the CSV file.")
        logging.info(f"Using '{latitude_col}' as latitude and '{longitude_col}' as longitude.")
        # Extract latitude and longitude data
        latitudes = points_df[latitude_col].tolist()
        longitudes = points_df[longitude_col].tolist()
        # Generate IDs starting from 1
        point_ids = [i + 1 for i in points_df.index.tolist()]  # Start IDs from 1
        logging.info(f"Processing {len(point_ids)} points...")
        # Define time period (one month for testing)
        start_date = "2024-10-01"
        end_date = "2024-10-31"
        logging.info(f"Time period: {start_date} to {end_date}")
        # Extract year and month from start_date
        year = datetime.strptime(start_date, "%Y-%m-%d").strftime("%Y")
        month = datetime.strptime(start_date, "%Y-%m-%d").strftime("%m")
        # Create output directory based on year and month
        output_dir = os.path.join(year, month)
        os.makedirs(output_dir, exist_ok=True)  # Create the directory if it doesn't exist
        logging.info(f"Output directory: {output_dir}")
        # Create a persistent download directory
        download_dir = "downloads"
        os.makedirs(download_dir, exist_ok=True)
        logging.info(f"Download directory: {download_dir}")
        # Get list of downloaded granules
        granule_paths = [os.path.join(download_dir, f) for f in os.listdir(download_dir) if f.endswith(".he5")]
        logging.info(f"Found {len(granule_paths)} granules in {download_dir}.")
        # Log units from the first granule
        if granule_paths:
            with h5py.File(granule_paths[0], "r") as h5file:
                units = get_units_from_dataset(h5file, 'HDFEOS/GRIDS/ColumnAmountNO2/Data Fields/ColumnAmountNO2')
                logging.info(f"Units for NO2 data: {units}")
    else:
        # Initialize variables for other ranks
        point_ids, latitudes, longitudes, granule_paths, output_dir, start_date, end_date = None, None, None, None, None, None, None
    # Broadcast data from rank 0 to all other ranks
    point_ids = comm.bcast(point_ids, root=0)
    latitudes = comm.bcast(latitudes, root=0)
    longitudes = comm.bcast(longitudes, root=0)
    granule_paths = comm.bcast(granule_paths, root=0)
    output_dir = comm.bcast(output_dir, root=0)
    start_date = comm.bcast(start_date, root=0)
    end_date = comm.bcast(end_date, root=0)
    # Process points in parallel
    for point_id, lat, lon in zip(point_ids, latitudes, longitudes):
        process_point(point_id, lat, lon, granule_paths, output_dir, start_date, end_date)
    if rank == 0:
        logging.info("All granules and points have been processed.")