"""Implements utility methods for interacting with ACN-data and GMMs.
When run on its own, this script downloads the default data found in
sustaingym/data/evcharging/acn_data.
"""
from __future__ import annotations
from collections.abc import Iterator
from datetime import timedelta, datetime
import os
import pickle
from typing import Any, Literal
import acnportal.acndata as acnd
import acnportal.acnsim as acns
import numpy as np
import pandas as pd
import pytz
import sklearn.mixture as mixture
from sustaingym.data.utils import read_csv, read_to_bytesio
# API Token for ACN-Data
[docs]
API_TOKEN = 'DEMO_TOKEN'
# Folder name when creating new GMMs
# NOTE(review): no assignment follows the comment above in this view, although
# GMMS_DIR is used by save_gmm_model() and load_gmm_model() -- confirm that it
# is defined.
# Timezones for converting charging events in ACN-Data
[docs]
AM_LA = pytz.timezone('America/Los_Angeles')
[docs]
GMT = pytz.timezone('GMT')
# Datetime formatting for printing and API call
# NOTE(review): DATE_FORMAT and DT_STRING_FORMAT are referenced by functions
# in this module but are not assigned in this view -- confirm that they are
# defined.
# One-day offset, added to end dates by the functions that treat them as
# inclusive
[docs]
ONE_DAY = timedelta(days=1)
# Normalization constant for while fitting GMMs
# NOTE(review): no assignment follows the comment above in this view.
# Start and end dates for real traces usage in simulations
# NOTE(review): pytz documents that passing one of its zones through the
# ``tzinfo=`` argument does not select the PST/PDT offset (``localize()``
# does) -- confirm that the resulting offset is the intended one.
[docs]
START_DATE = datetime(2018, 11, 1, tzinfo=AM_LA)
[docs]
END_DATE = datetime(2021, 8, 31, tzinfo=AM_LA)
# Default date ranges, as (start, end) pairs of '%Y-%m-%d' strings
[docs]
DEFAULT_DATE_RANGES = (
('2019-05-01', '2019-08-31'),
('2019-09-01', '2019-12-31'),
('2020-02-01', '2020-05-31'),
('2021-05-01', '2021-08-31'),
)
# Mapping between name of default period to dates; every range is reachable
# through two names (a season name and a COVID-19-relative name)
[docs]
DEFAULT_PERIOD_TO_RANGE = {
'Summer 2019': DEFAULT_DATE_RANGES[0],
'Pre-COVID-19 Summer': DEFAULT_DATE_RANGES[0],
'Fall 2019': DEFAULT_DATE_RANGES[1],
'Pre-COVID-19 Fall': DEFAULT_DATE_RANGES[1],
'Spring 2020': DEFAULT_DATE_RANGES[2],
'In-COVID-19': DEFAULT_DATE_RANGES[2],
'Summer 2021': DEFAULT_DATE_RANGES[3],
'Post-COVID-19': DEFAULT_DATE_RANGES[3],
}
# String typing definitions
# The names accepted here are the keys of DEFAULT_PERIOD_TO_RANGE
[docs]
DefaultPeriodStr = Literal['Summer 2019', 'Fall 2019', 'Spring 2020',
'Summer 2021', 'Pre-COVID-19 Summer',
'Pre-COVID-19 Fall', 'In-COVID-19', 'Post-COVID-19']
# Names of the supported charging sites
[docs]
SiteStr = Literal['caltech', 'jpl']
# Constants for storing pickled GMM model
# NOTE(review): GMM_KEY and COUNT_KEY are used next to STATION_USAGE_KEY in
# save_gmm_model() but are not assigned in this view -- confirm that they are
# defined.
[docs]
STATION_USAGE_KEY = 'station_usage'
[docs]
def to_la_dt(s: str) -> datetime:
    """Parses a '%Y-%m-%d' date string into a datetime tagged with LA time.

    Args:
        s: date string, parsed with ``DATE_FORMAT``

    Returns:
        the parsed datetime with its ``tzinfo`` set to ``AM_LA``
    """
    # NOTE(review): pytz documents that attaching one of its zones with
    # replace(tzinfo=...) does not select the PST/PDT offset (localize()
    # does). Kept unchanged so that results stay comparable with START_DATE /
    # END_DATE, which are built the same way -- confirm before changing.
    parsed = datetime.strptime(s, DATE_FORMAT)
    return parsed.replace(tzinfo=AM_LA)
[docs]
def site_str_to_site(site: SiteStr) -> acns.ChargingNetwork:
    """Builds the charging network that corresponds to a site name.

    Args:
        site: 'caltech' selects the Caltech network; any other value selects
            the JPL network

    Returns:
        the network object created by the matching acnsim site constructor
    """
    sites = acns.network.sites
    # Resolve only the constructor that is needed, then call it.
    build_network = sites.caltech_acn if site == 'caltech' else sites.jpl_acn
    return build_network()
[docs]
def get_sessions(start_date: datetime, end_date: datetime,
                 site: SiteStr = 'caltech',
                 ) -> Iterator[dict[str, Any]]:
    """Queries ACN-Data for the charging sessions of a site in a date window.

    Both bounds have their hour, minute and second set to zero, are converted
    to GMT, and are rendered with ``DT_STRING_FORMAT`` to build the query
    condition on ``connectionTime``.

    Args:
        start_date: beginning of the interval. Only year, month, and day
            are considered. The datetime is expected to be localized in
            LA time, the timezone of the charging garages.
        end_date: end of the interval, treated like ``start_date``. Note
            that the condition sent to the API compares with ``<=``.
        site: 'caltech' or 'jpl'

    Returns:
        sessions: iterator over the sessions returned by the ACN-Data client
            for the given site and time window

    Example::

        fall2020_sessions = get_sessions(
            to_la_dt('2020-09-01'), to_la_dt('2020-12-01'))
    """
    def _as_query_time(day: datetime) -> str:
        # Drop the time of day, then express the instant in GMT for the API.
        midnight = day.replace(hour=0, minute=0, second=0)
        return midnight.astimezone(GMT).strftime(DT_STRING_FORMAT)

    start_time = _as_query_time(start_date)
    end_time = _as_query_time(end_date)
    cond = f'connectionTime>="{start_time}" and connectionTime<="{end_time}"'
    client = acnd.DataClient(api_token=API_TOKEN)
    return client.get_sessions(site, cond=cond)
[docs]
def fetch_real_events(start_date: datetime, end_date: datetime, site: SiteStr
                      ) -> pd.DataFrame:
    """Downloads charging sessions from ACN-Data into a pandas DataFrame.

    See `get_sessions()` for arguments.

    Sessions without user inputs are marked as unclaimed; for those, the
    requested energy falls back to the delivered energy and the estimated
    departure falls back to the actual disconnect time. For claimed sessions
    the first user input supplies both values.

    Returns:
        events: DataFrame containing charging info

        .. code:: none

            arrival                   datetime64[ns, America/Los_Angeles]
            departure                 datetime64[ns, America/Los_Angeles]
            requested_energy (kWh)    float64
            delivered_energy (kWh)    float64
            station_id                str
            session_id                str
            estimated_departure       datetime64[ns, America/Los_Angeles]
            claimed                   bool
    """
    print(f'Fetching {site} sessions from {start_date.strftime(DATE_FORMAT)} '
          f'to {end_date.strftime(DATE_FORMAT)} from ACNData')

    # Column order here is the column order of the resulting DataFrame.
    columns = (
        'arrival',
        'departure',
        'requested_energy (kWh)',
        'delivered_energy (kWh)',
        'station_id',
        'session_id',
        'estimated_departure',
        'claimed',
    )
    # TODO(chris): find efficient way to convert JSON-like data to DataFrame
    records: dict[str, list[Any]] = {name: [] for name in columns}

    for session in get_sessions(start_date, end_date, site=site):
        user_inputs = session['userInputs']
        claimed = user_inputs is not None
        if claimed:
            first_input = user_inputs[0]
            requested_energy = first_input['kWhRequested']
            # The requested departure is parsed as an HTTP date in GMT and
            # then converted to LA time.
            est_depart_dt = acnd.utils.parse_http_date(
                first_input['requestedDeparture'], GMT).astimezone(AM_LA)
        else:
            requested_energy = session['kWhDelivered']
            est_depart_dt = session['disconnectTime']

        row = (
            session['connectionTime'],
            session['disconnectTime'],
            requested_energy,
            session['kWhDelivered'],
            session['spaceID'],
            session['sessionID'],
            est_depart_dt,
            claimed,
        )
        for name, value in zip(columns, row):
            records[name].append(value)

    return pd.DataFrame(records)
[docs]
def get_real_events(start_date: datetime, end_date: datetime,
                    site: SiteStr) -> pd.DataFrame:
    """Returns charging events, preferring data shipped with the package.

    The default date ranges are scanned in order; the first one that covers
    the requested window is read from its gzip-compressed CSV and filtered on
    arrival time. If no default range covers the window, the events are
    fetched from ACN-Data instead.

    See `fetch_real_events()` for arguments and return value, except function
    is now inclusive of ``end_date``.
    """
    # search in package
    for range_start, range_end in DEFAULT_DATE_RANGES:
        covers_window = (to_la_dt(range_start) <= start_date
                         and end_date <= to_la_dt(range_end) + ONE_DAY)
        if not covers_window:
            continue

        csv_path = os.path.join(
            'data', 'evcharging', 'acn_data', site,
            f'{range_start} {range_end}.csv.gz')
        events = read_csv(csv_path, compression='gzip')
        # Timestamps are parsed as UTC and then expressed in LA time.
        for col in ['arrival', 'departure', 'estimated_departure']:
            events[col] = pd.to_datetime(events[col], utc=True).dt.tz_convert(AM_LA)

        arrivals = events.arrival
        in_window = (start_date <= arrivals) & (arrivals <= end_date + ONE_DAY)
        return events[in_window].copy()

    # data not found in package, use API
    return fetch_real_events(start_date, end_date + ONE_DAY, site)
[docs]
def get_model_name(begin: datetime, end: datetime, n_components: int) -> str:
    """Returns the pickle file name used for a trained GMM.

    Args:
        begin: start of the training period
        end: end of the training period
        n_components: number of GMM components

    Returns:
        name of the form '<begin> <end> <n_components>.pkl', with both dates
        rendered using ``DATE_FORMAT``
    """
    date_span = ' '.join(day.strftime(DATE_FORMAT) for day in (begin, end))
    return f'{date_span} {n_components}.pkl'
[docs]
def save_gmm_model(site: SiteStr, gmm: mixture.GaussianMixture,
                   cnt: np.ndarray, sid: np.ndarray, begin: datetime,
                   end: datetime, n_components: int) -> None:
    """Pickles a GMM together with its session and station statistics.

    The file is written to ``GMMS_DIR/<site>/`` relative to the current
    working directory, under the name given by `get_model_name()`. The
    directory is created when it does not exist yet.

    Args:
        site: either 'caltech' or 'jpl'
        gmm: trained Gaussian Mixture Model
        cnt: a 1-D np.ndarray
            session counts per day during date period, expected to have
            the same length as the number of days, inclusive, in the date
            period
        sid: a 1-D np.ndarray
            stations' usage counts for entire date period, expected to
            have the same length as the number of stations in the network
        begin: beginning of training period, for file name
        end: ending of training period, for file name
        n_components: number of GMM components
    """
    # create directory as needed
    target_dir = os.path.join(GMMS_DIR, site)
    if not os.path.exists(target_dir):
        print('Creating directory:', target_dir)
        os.makedirs(target_dir, exist_ok=True)

    # save gmm, session counts and station id usage
    target_path = os.path.join(
        target_dir, get_model_name(begin, end, n_components))
    print(f'Saving to: {target_path}\n')
    with open(target_path, 'wb') as out_file:
        pickle.dump(
            {GMM_KEY: gmm, COUNT_KEY: cnt, STATION_USAGE_KEY: sid}, out_file)
[docs]
def load_gmm_model(site: SiteStr, begin: datetime, end: datetime,
                   n_components: int
                   ) -> dict[str, np.ndarray | mixture.GaussianMixture]:
    """Load pickled GMM and other data from a model file.

    A custom model is looked up first, relative to the current working
    directory, at ``GMMS_DIR/<site>/<model name>``. If that file does not
    exist, the default model with the same name is read from the data folder
    through ``read_to_bytesio``.

    Args:
        site: either 'caltech' or 'jpl'
        begin: start date of date range GMM is trained in
        end: end date of date range GMM is trained in
        n_components: number of GMM components

    Returns:
        data: dict containing the following key-value pairs:
            - 'gmm': mixture.GaussianMixture, trained gmm, date range and
              components are specified in the file name
            - 'count': np.ndarray, session counts per day
            - 'station_usage': np.ndarray, stations' usage counts for date range
    """
    filename = get_model_name(begin, end, n_components)

    # NOTE: pickle.load can execute arbitrary code contained in the file, so
    # only model files from trusted sources should be loaded.

    # search through custom folders; test for the model file itself, so that
    # an existing site folder that lacks this particular model still falls
    # back to the default models instead of raising FileNotFoundError
    custom_path = os.path.join(GMMS_DIR, site, filename)
    if os.path.isfile(custom_path):
        with open(custom_path, 'rb') as f:
            return pickle.load(f)

    # search through default models
    default_path = os.path.join('data', 'evcharging', GMMS_DIR, site, filename)
    return pickle.load(read_to_bytesio(default_path))
[docs]
def round(arr: np.ndarray, thresh: float = 0.7) -> np.ndarray:
    """Rounds values up only when their fractional part exceeds a threshold.

    An element is rounded up with ``np.ceil`` when its fractional part, as
    returned by ``np.modf``, is strictly greater than ``thresh``; otherwise it
    is rounded down with ``np.floor``. Because the comparison is strict, a
    fraction exactly equal to ``thresh`` rounds down, so ``thresh = 0.5``
    differs from ``np.round`` on exact halves. ``np.modf`` yields negative
    fractional parts for negative inputs, so negative values are always
    rounded down.

    Args:
        arr: input array
        thresh: decimal between 0 and 1

    Returns:
        rounded array
    """
    fractional, _ = np.modf(arr)
    rounded_down = np.floor(arr)
    rounded_up = np.ceil(arr)
    return np.where(fractional > thresh, rounded_up, rounded_down)
[docs]
def download_default_acndata() -> None:
    """Downloads default data from ACNData.

    For every default date range and for both sites, the charging events are
    fetched and written as a gzip-compressed CSV (without the index) to
    ``sustaingym/data/evcharging/acn_data/<site>/<start> <end>.csv.gz``,
    relative to the current working directory. Missing directories are
    created.
    """
    print(DEFAULT_DATE_RANGES)
    print(DATE_FORMAT)
    for start, end in DEFAULT_DATE_RANGES:
        # get_sessions() expects datetimes localized in LA time; a naive
        # datetime would be interpreted in the machine's local timezone when
        # it is converted to GMT, so the dates are localized here.
        start_dt = to_la_dt(start)
        # the default ranges include their end date, while the fetch treats
        # its end bound as the day after the last wanted day
        end_dt = to_la_dt(end) + ONE_DAY
        for site in ('caltech', 'jpl'):
            df = fetch_real_events(start_dt, end_dt, site=site)  # type: ignore
            fdir = os.path.join(
                'sustaingym', 'data', 'evcharging', 'acn_data', site)
            os.makedirs(fdir, exist_ok=True)
            fname = f'{start} {end}.csv.gz'
            fpath = os.path.join(fdir, fname)
            df.to_csv(fpath, compression='gzip', index=False)
if __name__ == '__main__':
    # When run as a script, download the default data from ACNData and write
    # it under sustaingym/data/evcharging/acn_data.
    download_default_acndata()