# air_quality / functions.py
# jdowling's picture
# Upload functions.py
# 3e03447
import os
import datetime
import time
import requests
import pandas as pd
import json
from geopy.geocoders import Nominatim
def convert_date_to_unix(x):
    """
    Convert a date/datetime value to unix time in milliseconds.

    Args:
        x: Any value whose ``str()`` form is ``YYYY-MM-DD HH:MM:SS`` or another
            ISO-8601 date/datetime understood by ``datetime.fromisoformat``
            (date only, fractional seconds, explicit UTC offset).

    Returns:
        int: Milliseconds since the unix epoch. Values without a UTC offset
        are interpreted in the local timezone of the running machine.

    Raises:
        ValueError: If the value cannot be parsed in either form.
    """
    text = str(x)
    try:
        # Try the original format first so existing callers see no change.
        dt_obj = datetime.datetime.strptime(text, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # Fallback covers date-only strings, microseconds and UTC offsets.
        dt_obj = datetime.datetime.fromisoformat(text)
    return int(dt_obj.timestamp() * 1000)
def get_city_coordinates(city_name: str):
    """
    Take a city name and return its latitude and longitude.

    The lookup is done through the Nominatim geocoder and both values are
    rounded to 2 digits after the decimal point.

    Args:
        city_name: Free-text city name passed to the geocoder.

    Returns:
        tuple: ``(latitude, longitude)``.

    Raises:
        ValueError: If the geocoder returns no match for ``city_name``.
    """
    # Initialize Nominatim API (for getting lat and long of the city)
    geolocator = Nominatim(user_agent="MyApp")
    city = geolocator.geocode(city_name)
    if city is None:
        # Without this guard the failure is an opaque AttributeError on None.
        raise ValueError(f"Could not find coordinates for city {city_name!r}.")

    return round(city.latitude, 2), round(city.longitude, 2)
##################################### EEA
def convert_to_daily(df, pollutant: str):
    """
    Return a DataFrame where the pollutant column is resampled to days and rounded.

    Args:
        df: DataFrame with a ``date`` column (anything ``pd.to_datetime``
            accepts) and a numeric column named ``pollutant``. It is not
            modified.
        pollutant: Name of the column to aggregate.

    Returns:
        DataFrame with columns ``date`` and ``pollutant``: one row per calendar
        day between the first and last observation, holding the daily mean
        rounded to 0 decimals. Days without observations are filled with the
        median of the daily means.
    """
    # I want data daily, not hourly (mean per each day = 1 datarow per 1 day).
    # 'D' is the canonical pandas alias for calendar-day frequency.
    daily = (
        df.assign(date=pd.to_datetime(df["date"]))
        .set_index("date")[pollutant]
        .resample("1D")
        .mean()
        .reset_index()
    )

    # Fill the gaps created by resampling, then round the whole column at once.
    daily[pollutant] = daily[pollutant].fillna(daily[pollutant].median()).round(0)

    return daily
def find_fullest_csv(csv_links: list, year: str):
    """
    Load every CSV whose link mentions ``year`` and return the one with most rows.

    Links are matched by plain substring search on ``str(year)``. When several
    files share the maximum row count, the first one in ``csv_links`` order wins.

    Raises:
        IndexError: If no link contains ``year``.
    """
    year_token = str(year)
    matching_links = [link for link in csv_links if year_token in link]
    if not matching_links:
        raise IndexError("list index out of range")

    # max() keeps the earliest frame among equally long ones.
    return max((pd.read_csv(link) for link in matching_links), key=len)
def get_air_quality_from_eea(city_name: str,
                             pollutant: str,
                             start_year: str,
                             end_year: str):
    """
    Takes city name, daterange and returns pandas DataFrame with daily air quality data.
    It parses data by 1-year batches, so please specify years, not dates. (example: "2014", "2022"...)

    EEA means European Environmental Agency. So it has data for Europe Union countries ONLY.

    Args:
        city_name: City name sent to the EEA download service.
        pollutant: Pollutant name, e.g. "no2" or "pm2.5" (case-insensitive).
        start_year: First year to load (inclusive).
        end_year: Last year to load (inclusive).

    Returns:
        DataFrame with columns ``city_name``, ``date`` and the lower-cased
        pollutant name ("pm2.5" becomes "pm2_5"), one row per day.

    Raises:
        KeyError: If no data could be loaded for any of the requested years.
    """
    start_of_cell = time.time()

    params = {
        'CountryCode': '',
        'CityName': city_name,
        'Pollutant': pollutant.upper(),
        'Year_from': start_year,
        'Year_to': end_year,
        'Station': '',
        'Source': 'All',
        'Samplingpoint': '',
        'Output': 'TEXT',
        'UpdateDate': '',
        'TimeCoverage': 'Year'
    }

    # observations endpoint
    base_url = "https://fme.discomap.eea.europa.eu/fmedatastreaming/AirQualityDownload/AQData_Extract.fmw?"
    try:
        response = requests.get(base_url, params=params)
    except (requests.exceptions.ConnectionError, ConnectionError):
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one; catch both so the single retry actually happens.
        response = requests.get(base_url, params=params)

    response.encoding = response.apparent_encoding
    # The service answers with one CSV link per line.
    csv_links = response.text.split("\r\n")

    yearly_frames = []
    for year in range(int(start_year), int(end_year) + 1):
        try:
            # find the fullest, the biggest csv file with observations for this particular year
            yearly_frames.append(find_fullest_csv(csv_links, year))
        except IndexError:
            print(f"!! Missing data for {year} for {city_name} city.")

    # Concatenate once; an empty result keeps the original "no data" failure mode.
    res_df = pd.concat(yearly_frames) if yearly_frames else pd.DataFrame()

    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"

    res_df = res_df.rename(columns={
        'DatetimeBegin': 'date',
        'Concentration': pollutant
    })

    # cut timezones info (the trailing 6 characters of each timestamp string)
    res_df['date'] = res_df['date'].str[:-6]
    # convert dates in 'date' column
    res_df['date'] = pd.to_datetime(res_df['date'])

    res_df = convert_to_daily(res_df, pollutant)

    res_df['city_name'] = city_name
    res_df = res_df[['city_name', 'date', pollutant]]

    end_of_cell = time.time()

    print(f"Processed {pollutant.upper()} for {city_name} since {start_year} till {end_year}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df
##################################### USEPA
# Cache of CBSA name -> CBSA code; filled on the first get_city_code() call.
city_code_dict = dict()

# Pollutant name -> code sent as the "param" field of USEPA AQS requests.
pollutant_dict = {
    "CO": "42101",     # carbon monoxide
    "SO2": "42401",    # sulfur dioxide
    "NO2": "42602",    # nitrogen dioxide
    "O3": "44201",     # ozone
    "PM10": "81102",   # particulate matter, 10 micrometers
    "PM2.5": "88101",  # particulate matter, 2.5 micrometers
}
def get_city_code(city_name: str):
    """
    Encodes city name to be used later for data parsing using USEPA.

    The list of CBSAs (core-based statistical areas) is downloaded from the
    USEPA AQS API on first use and cached in the module-level
    ``city_code_dict``. The city is matched by substring against the CBSA
    names, and the first match in API order is returned.

    Args:
        city_name: City name, or any substring of the CBSA name.

    Returns:
        The CBSA code of the first matching area.

    Raises:
        IndexError: If no CBSA name contains ``city_name`` (same exception
            type the previous list-index lookup produced).
    """
    if not city_code_dict:
        params = {
            "email": "test@aqs.api",
            "key": "test"
        }
        response = requests.get("https://aqs.epa.gov/data/api/list/cbsas?", params)
        for item in response.json()["Data"]:
            city_code_dict[item['value_represented']] = item['code']

    # Look up directly instead of recursing: an empty API answer used to
    # leave the cache empty and recurse without end.
    for cbsa_name, cbsa_code in city_code_dict.items():
        if city_name in cbsa_name:
            return cbsa_code

    raise IndexError(f"No USEPA CBSA found matching city {city_name!r}.")
def get_air_quality_from_usepa(city_name: str,
                               pollutant: str,
                               start_date: str,
                               end_date: str):
    """
    Takes city name, daterange and returns pandas DataFrame with daily air quality data.

    USEPA means United States Environmental Protection Agency. So it has data for US ONLY.

    Args:
        city_name: City name, resolved to a CBSA code via ``get_city_code``.
        pollutant: One of the ``pollutant_dict`` keys, case-insensitive;
            "pm2_5" is accepted as an alias of "pm2.5".
        start_date: First day, formatted ``YYYY-MM-DD``.
        end_date: Last day, formatted ``YYYY-MM-DD``.

    Returns:
        DataFrame with columns ``date``, ``city_name`` and the lower-cased
        pollutant name ("pm2.5" becomes "pm2_5"). Each row is the mean over
        all records of that day, rounded to 1 decimal. The frame is empty
        (with the same columns) when the API returned no records.
    """
    start_of_cell = time.time()

    # Normalise the pollutant name once, before it is used as a column name.
    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"

    param_code = pollutant_dict[pollutant.upper().replace("_", ".")]  # encoded pollutant
    cbsa_code = get_city_code(city_name)  # Core-based statistical area

    # observations endpoint
    base_url = "https://aqs.epa.gov/data/api/dailyData/byCBSA?"

    frames = []
    # The date range is queried in chunks that each stay within one calendar year.
    for start_date_, end_date_ in make_date_intervals(start_date, end_date):
        params = {
            "email": "test@aqs.api",
            "key": "test",
            "param": param_code,
            "bdate": start_date_,
            "edate": end_date_,
            "cbsa": cbsa_code
        }

        response = requests.get(base_url, params=params)
        df_ = pd.DataFrame(response.json()["Data"])
        if df_.empty:
            # No records for this interval: nothing to rename or select.
            continue

        df_ = df_.rename(columns={
            'date_local': 'date',
            'arithmetic_mean': pollutant
        })

        # convert dates in 'date' column
        df_['date'] = pd.to_datetime(df_['date'])
        df_['city_name'] = city_name
        frames.append(df_[['city_name', 'date', pollutant]])

    if frames:
        res_df = pd.concat(frames)
        # there are duplicated rows (several records for the same day and station). get rid of it.
        res_df = res_df.groupby(['date', 'city_name'], as_index=False)[pollutant].mean()
        res_df[pollutant] = res_df[pollutant].round(1)
    else:
        res_df = pd.DataFrame(columns=['date', 'city_name', pollutant])

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df
def make_date_intervals(start_date, end_date):
    """
    Split a date range into consecutive intervals that never cross a year boundary.

    Args:
        start_date: First day of the range, formatted ``YYYY-MM-DD``.
        end_date: Last day of the range (inclusive), formatted ``YYYY-MM-DD``.

    Returns:
        list: ``(start, end)`` tuples of ``YYYYMMDD`` strings, one per calendar
        year touched by the range, in chronological order. Empty when
        ``end_date`` is before ``start_date``.

    Raises:
        ValueError: If either date does not match ``YYYY-MM-DD``.
    """
    start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
    end_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d')

    date_intervals = []
    for year in range(start_dt.year, end_dt.year + 1):
        interval_start = max(start_dt, datetime.datetime(year, 1, 1))
        interval_end = min(end_dt, datetime.datetime(year, 12, 31))
        # "<=" keeps one-day intervals (e.g. a range starting on Dec 31),
        # which a strict "<" would silently drop.
        if interval_start <= interval_end:
            date_intervals.append(
                (interval_start.strftime('%Y%m%d'), interval_end.strftime('%Y%m%d'))
            )
    return date_intervals
##################################### Weather Open Meteo
def get_weather_data_from_open_meteo(city_name: str,
                                     start_date: str,
                                     end_date: str,
                                     coordinates: list = None,
                                     forecast: bool = False):
    """
    Takes [city name OR coordinates] and returns pandas DataFrame with weather data.

    Examples of arguments:
        coordinates=(47.755, -122.2806), start_date="2023-01-01"

    Args:
        city_name: City name; geocoded when ``coordinates`` is not given and
            always written to the ``city_name`` column.
        start_date: First day, passed to the API as ``start_date``.
        end_date: Last day, passed to the API as ``end_date``.
        coordinates: Optional ``(latitude, longitude)`` pair that skips geocoding.
        forecast: If True query the forecast endpoint, otherwise the archive
            (historical observations) endpoint.

    Returns:
        DataFrame with one row per day and the columns ``city_name``, ``date``,
        ``temperature_max``, ``temperature_min``, ``precipitation_sum``,
        ``rain_sum``, ``snowfall_sum``, ``precipitation_hours``,
        ``wind_speed_max``, ``wind_gusts_max``, ``wind_direction_dominant``.
    """
    start_of_cell = time.time()

    if coordinates:
        latitude, longitude = coordinates
    else:
        latitude, longitude = get_city_coordinates(city_name=city_name)

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'daily': ["temperature_2m_max", "temperature_2m_min",
                  "precipitation_sum", "rain_sum", "snowfall_sum",
                  "precipitation_hours", "windspeed_10m_max",
                  "windgusts_10m_max", "winddirection_10m_dominant"],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }

    if forecast:
        # historical forecast endpoint
        base_url = 'https://api.open-meteo.com/v1/forecast'
    else:
        # historical observations endpoint
        base_url = 'https://archive-api.open-meteo.com/v1/archive'

    try:
        response = requests.get(base_url, params=params)
    except (requests.exceptions.ConnectionError, ConnectionError):
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one; catch both so the single retry actually happens.
        response = requests.get(base_url, params=params)

    response_json = response.json()
    res_df = pd.DataFrame(response_json["daily"])
    res_df["city_name"] = city_name

    # rename columns
    res_df = res_df.rename(columns={
        "time": "date",
        "temperature_2m_max": "temperature_max",
        "temperature_2m_min": "temperature_min",
        "windspeed_10m_max": "wind_speed_max",
        "winddirection_10m_dominant": "wind_direction_dominant",
        "windgusts_10m_max": "wind_gusts_max"
    })

    # convert dates in 'date' column (done before the column subset below so
    # the assignment is never made on a slice of another frame)
    res_df["date"] = pd.to_datetime(res_df["date"])

    # change columns order
    res_df = res_df[
        ['city_name', 'date', 'temperature_max', 'temperature_min',
         'precipitation_sum', 'rain_sum', 'snowfall_sum',
         'precipitation_hours', 'wind_speed_max',
         'wind_gusts_max', 'wind_direction_dominant']
    ]

    end_of_cell = time.time()
    print(f"Parsed weather for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df
##################################### Air Quality data from Open Meteo
def get_aqi_data_from_open_meteo(city_name: str,
                                 start_date: str,
                                 end_date: str,
                                 coordinates: list = None,
                                 pollutant: str = "pm2_5"):
    """
    Takes [city name OR coordinates] and returns pandas DataFrame with AQI data.

    Examples of arguments:
        ...
        coordinates=(47.755, -122.2806),
        start_date="2023-01-01",
        pollutant="no2"
        ...

    Args:
        city_name: City name; geocoded when ``coordinates`` is not given and
            always written to the ``city_name`` column.
        start_date: First day, passed to the API as ``start_date``.
        end_date: Last day, passed to the API as ``end_date``.
        coordinates: Optional ``(latitude, longitude)`` pair that skips geocoding.
        pollutant: Hourly variable to request (case-insensitive). "pm2.5" is
            mapped to "pm2_5" and "no2" to "nitrogen_dioxide".

    Returns:
        DataFrame with columns ``city_name``, ``date`` and the normalised
        pollutant name. Each row is the mean of that day's hourly values,
        rounded to 1 decimal; ``date`` holds ``datetime.date`` objects.
    """
    start_of_cell = time.time()

    if coordinates:
        latitude, longitude = coordinates
    else:
        latitude, longitude = get_city_coordinates(city_name=city_name)

    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"

    # make it work with both "no2" and "nitrogen_dioxide" passed.
    if pollutant == "no2":
        pollutant = "nitrogen_dioxide"

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'hourly': [pollutant],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }

    # base endpoint
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    try:
        response = requests.get(base_url, params=params)
    except (requests.exceptions.ConnectionError, ConnectionError):
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one; catch both so the single retry actually happens.
        response = requests.get(base_url, params=params)

    response_json = response.json()
    res_df = pd.DataFrame(response_json["hourly"])

    # convert dates
    res_df["time"] = pd.to_datetime(res_df["time"])

    # resample to days
    res_df = res_df.groupby(res_df['time'].dt.date).mean(numeric_only=True).reset_index()
    res_df[pollutant] = res_df[pollutant].round(1)

    # rename columns
    res_df = res_df.rename(columns={
        "time": "date"
    })

    res_df["city_name"] = city_name

    # change columns order
    res_df = res_df[
        ['city_name', 'date', pollutant]
    ]

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df