"""
This module implements trace generation for the EVCharging class.
Traces consist of EV plug-in and unplug events and marginal carbon emissions.
The module implements trace generation through the RealTraceGenerator and
ArtificialTraceGenerator classes, which generate traces either from real data
or from sampling from an artificial data model, respectively.
"""
from __future__ import annotations
from datetime import timedelta
import uuid
import acnportal.acnsim as acns
import numpy as np
import pandas as pd
import sklearn.mixture as mixture
from .train_gmm_model import create_gmms
from .utils import (COUNT_KEY, DATE_FORMAT, DEFAULT_PERIOD_TO_RANGE, GMM_KEY,
MINS_IN_DAY, REQ_ENERGY_SCALE, STATION_USAGE_KEY,
DefaultPeriodStr, SiteStr, get_real_events, load_gmm_model,
site_str_to_site, to_la_dt)
from sustaingym.data.load_moer import MOERLoader
[docs]
class AbstractTraceGenerator:
"""Abstract class for ``EventQueue`` generator.
Subclasses are expected to implement the methods `_create_events()` and
`__repr__()`.
Args:
site: garage to get events from, either 'caltech' or 'jpl'
date_period: either a pre-defined date period or a custom date period.
If custom, the input must be a 2-tuple of strings with both strings
in the format YYYY-MM-DD. Otherwise, should be a default period
string.
requested_energy_cap: max amount of requested energy allowed (kWh)
seed: seed for random sampling
Attributes:
site: either 'caltech' or 'jpl'
date_range_str: a 2-tuple of string elements describing date range to
generate from.
date_range: a 2-tuple of timezone-aware datetimes.
requested_energy_cap: maximum amount of requested energy allowed (kWh)
station_ids: list of strings of station identifiers
num_stations: number of charging stations at site
day: "day" of simulation, can be artificial
moer_loader: class for loading carbon emission rates data
rng: random number generator
"""
# Time step duration in minutes
# Each trace is one day (1440 minutes)
[docs]
MAX_STEPS_OF_TRACE = 288
# Maximum storage capacity of battery (kWh)
# Maximum charging power of battery (kW)
# CAISO Southern California Edison as balancing authority
[docs]
BA_CALTECH_JPL = 'SGIP_CAISO_SCE'
# Directory to MOER data
[docs]
MOER_DATA_DIR = 'sustaingym/data/moer'
def __init__(self,
site: SiteStr,
date_period: tuple[str, str] | DefaultPeriodStr,
requested_energy_cap: float = 100,
seed: int | None = None):
# Name of site, name of stations on site, and number of stations on site
self.site = site
self.station_ids = site_str_to_site(site).station_ids
self.num_stations = len(self.station_ids)
if isinstance(date_period, str):
# convert literal to actual date range
self.date_range_str = DEFAULT_PERIOD_TO_RANGE[date_period]
else:
self.date_range_str = date_period
# Convert strings to datetime objects
self.date_range = tuple(to_la_dt(s) for s in self.date_range_str)
# Number of days in date range used for sekecting random day
self.num_days_in_date_range = (self.date_range[1] - self.date_range[0]).days + 1
# Cap requested energy if it exceeds the maximum allowed
self.requested_energy_cap = requested_energy_cap
# Loader for marginal emissions data at the Caltech and JPL sites
self.moer_loader = MOERLoader(self.date_range[0], self.date_range[1],
self.BA_CALTECH_JPL, self.MOER_DATA_DIR)
# Internal random number generator
self.rng = np.random.default_rng(seed=seed)
[docs]
def site__repr__(self) -> str:
"""Returns string representation of site."""
if self.site == 'jpl':
site = 'JPL'
else:
site = self.site.capitalize()
return site + ' garage'
[docs]
def date_range__repr__(self) -> str:
"""Returns string representation of date range."""
return f'({self.date_range_str[0]} to {self.date_range_str[1]})'
def __repr__(self) -> str:
"""Returns string representation of generator object."""
raise NotImplementedError
def _update_day(self) -> None:
"""Randomly sets ``self.day`` to a day in the date range."""
self.day = self.date_range[0] + timedelta(days=self.rng.choice(self.num_days_in_date_range))
[docs]
def set_seed(self, seed: int | None) -> None:
"""Sets random seed to make sampling reproducible."""
self.rng = np.random.default_rng(seed=seed)
def _create_events(self) -> pd.DataFrame:
"""Creates a DataFrame of charging events for the current day.
Returns:
events: DataFrame containing charging info::
arrival int
departure int
requested_energy (kWh) float64
delivered_energy (kWh) float64
station_id str
session_id str
estimated_departure int
claimed bool
Notes:
The attributes ``arrival``, ``departure``, and
``estimated_departure`` must be integers representing the timestamp
during the day, which is the number of discrete periods that have
elapsed. The ``station_id`` attribute is expected to be included at
the site's charging network.
"""
raise NotImplementedError
[docs]
def get_event_queue(self) -> tuple[acns.EventQueue, list[acns.EV], int]:
"""Creates an ``EventQueue`` for the current day, and then updates the
day.
Sessions are added as Plugin events, and Recompute events are added
every period so that the algorithm can be continually called. Unplug
events are generated internally by the simulator and do not need to
be explicitly added.
Returns:
events: event queue of EV charging sessions
evs: list of all EVs in event queue
num_plugin: number of plug in events (not counting recompute events)
"""
samples = self._create_events()
non_recompute_timesteps = set()
events, evs = [], []
for i in range(len(samples)):
# Cap maximum requested energy
requested_energy = min(samples['requested_energy (kWh)'].iloc[i],
self.requested_energy_cap)
# Create battery with initial charge at a minimum zero
battery = acns.Linear2StageBattery(
capacity=self.BATTERY_CAPACITY,
init_charge=max(0, self.BATTERY_CAPACITY - requested_energy),
max_power=self.MAX_POWER)
# Create electric vehicle
ev = acns.EV(
arrival=samples['arrival'].iloc[i],
departure=samples['departure'].iloc[i],
requested_energy=requested_energy,
station_id=samples['station_id'].iloc[i],
session_id=samples['session_id'].iloc[i],
battery=battery,
estimated_departure=samples['estimated_departure'].iloc[i])
# Add PluginEvent and let the simulator take care of UnplugEvent
event = acns.PluginEvent(samples['arrival'].iloc[i], ev)
events.append(event)
evs.append(ev)
# Find timesteps where a recompute event is not necessary
non_recompute_timesteps.add(samples['arrival'].iloc[i])
num_plugin = len(events) # number of events before adding recompute events
# every timestep has an event - recompute if no EV events
for timestep in range(self.MAX_STEPS_OF_TRACE + 1):
# add recompute only if a timestep has no events
if timestep not in non_recompute_timesteps:
event = acns.RecomputeEvent(timestep)
events.append(event)
events = acns.EventQueue(events)
self._update_day()
return events, evs, num_plugin
[docs]
def get_moer(self) -> np.ndarray:
"""Retrieves MOER data from the `MOERLoader()`.
Returns:
data: array of shape (289, 37). The first column is the historical
MOER. The remaining columns are forecasts for the next 36
five-min time steps. Units kg CO2 per kWh. Rows are sorted
chronologically.
"""
return self.moer_loader.retrieve(self.day)
[docs]
class RealTraceGenerator(AbstractTraceGenerator):
"""Class for ``EventQueue`` generator using real traces from ACNData.
See `AbstractTraceGenerator` for more arguments and attributes
Args:
sequential: whether to draw simulated days sequentially from date
range or randomly
use_unclaimed: whether to use unclaimed sessions, which do not have
the "requested energy" or "estimated departure" attributes. If
True, the generator uses the energy delivered in the session and
the disconnect time in place of those attributes, eliminating
real-world uncertainty in user requests.
seed: if sequential, the seed determines which day to start on
Attributes:
sequential: whether to draw simulated days sequentially from date
range or randomly
use_unclaimed: whether to use unclaimed sessions, which do not have
the "requested energy" or "estimated departure" attributes. If
True, the generator uses the energy delivered in the session and
the disconnect time in place of those attributes, eliminating
real-world uncertainty in user requests.
"""
def __init__(self,
site: SiteStr,
date_period: tuple[str, str] | DefaultPeriodStr,
sequential: bool = True,
use_unclaimed: bool = False,
requested_energy_cap: float = 100,
seed: int | None = None):
super().__init__(site, date_period, requested_energy_cap, seed)
self.sequential = sequential
if sequential:
if seed is None:
seed = 0
self.set_seed(seed) # set day based on seed
else:
self._update_day() # pick a random day
self.use_unclaimed = use_unclaimed
# DataFrame of all events in date range
self.events_df = get_real_events(self.date_range[0], self.date_range[1], site)
def __repr__(self) -> str:
"""Returns string representation of RealTracesGenerator."""
return (f'Real trace generator for {self.site__repr__()} {self.date_range__repr__()}\n'
f'Sequential = {self.sequential}, Use unclaimed = {self.use_unclaimed}\n'
f'Current day: {self.day.strftime(DATE_FORMAT)}')
[docs]
def set_seed(self, seed: int | None) -> None:
"""If days are sequential, sets the day. Otherwise, seeds the random
number generator.
"""
if self.sequential:
if seed is None:
seed = 0
self.day = self.date_range[0] + timedelta(days=seed % self.num_days_in_date_range)
else:
super().set_seed(seed)
def _update_day(self) -> None:
"""Either increments day or randomly samples from date range."""
if self.sequential:
self.day += timedelta(days=1)
if self.day > self.date_range[1]: # cycle back when day has exceeded date range
self.day = self.date_range[0]
else:
super()._update_day() # pick a random day
def _create_events(self) -> pd.DataFrame:
"""Retrieves and filters real events from a given day.
See `_create_events()` in `AbstractTraceGenerator` for more info.
Returns:
df: DataFrame of real sessions with datetimes in terms of timestamps.
"""
df = self.events_df[(self.day <= self.events_df.arrival) &
(self.events_df.arrival < self.day + timedelta(days=1))]
if not self.use_unclaimed:
df = df[df['claimed']]
# remove sessions that are not in the set of station ids
df = df[df['station_id'].isin(self.station_ids)]
# if dataframe is empty, return before using dt attribute
if len(df) == 0:
return df.copy()
# remove sessions where estimated departure or departure is not the same day as arrival
max_depart = np.maximum(df['departure'], df['estimated_departure'])
mask = (self.day.day == max_depart.dt.day)
df = df[mask]
# if dataframe is empty, return before using dt attribute
if len(df) == 0:
return df.copy()
# convert arrival, departure, estimated departure to timestamps
for col in ['arrival', 'departure', 'estimated_departure']:
df[col] = (df[col].dt.hour * 60 + df[col].dt.minute) // self.TIME_STEP_DURATION
# remove sessions with estimated departure before connection
df = df[df['estimated_departure'] > df['arrival']]
return df.copy()
[docs]
class GMMsTraceGenerator(AbstractTraceGenerator):
"""Class for ``EventQueue`` generator by sampling from trained GMMs.
See `AbstractTraceGenerator` for more arguments and attributes
Args:
site: garage to get events from, either 'caltech' or 'jpl'
date_period: either a pre-defined date period or a
custom date period. If custom, the input must be a 2-tuple
of strings with both strings in the format YYYY-MM-DD.
Otherwise, should be a default period string.
n_components: number of components in GMM
requested_energy_cap: max amount of requested energy allowed (kWh)
seed: seed for random sampling
Attributes:
n_components: int, number of components in use for GMM
gmm: sklearn.mixture.GaussianMixture, models sessions distribution
cnt: np.ndarray, shape [num_days], empirical distribution for number of
sessions on each day
station_usage: np.ndarray, shape [num_stations], total number of
sessions during interval for each station
Notes about saved GMMs
.. code:: none
default gmm directory: in package sustaingym/data/evcharging/gmms
gmms
|----caltech
| |---2019-05-01 2019-08-31 30.pkl
| |---2019-09-01 2019-12-31 30.pkl
| |---2020-02-01 2020-05-31 30.pkl
| |---2021-05-01 2021-08-31 30.pkl
|----jpl
| |---2019-05-01 2019-08-31 30.pkl
| |---2019-09-01 2019-12-31 30.pkl
| |---2020-02-01 2020-05-31 30.pkl
| |---2021-05-01 2021-08-31 30.pkl
Each '*.pkl' file containing a trained GMM, station usage count,
and daily session count.
custom gmm directory: GMMs can also be trained on custom date ranges
and number components. These are saved in the 'gmms' folder
relative to the current working directory. See train_gmm_model.py
for how to train GMMs from the command line.
"""
ARRCOL, DEPCOL, ESTCOL, EREQCOL = 0, 1, 2, 3
def __init__(self,
site: SiteStr,
date_period: tuple[str, str] | DefaultPeriodStr,
n_components: int = 30,
requested_energy_cap: float = 100,
seed: int | None = None):
"""
Notes:
The generator first searches for a matching GMM directory. If
unfound, it creates one.
"""
super().__init__(site, date_period, requested_energy_cap, seed)
self.n_components = n_components
try:
data = load_gmm_model(site, self.date_range[0], self.date_range[1], n_components)
except FileNotFoundError:
create_gmms(site, n_components, date_ranges=[self.date_range_str])
data = load_gmm_model(site, self.date_range[0], self.date_range[1], n_components)
self.gmm: mixture.GaussianMixture = data[GMM_KEY]
self.cnt: np.ndarray = data[COUNT_KEY]
self.station_usage: np.ndarray = data[STATION_USAGE_KEY]
self.set_seed(seed)
self._update_day()
def __repr__(self) -> str:
"""Returns string representation of GMMsTraceGenerator."""
return (f'{self.n_components}-component GMM-based trace generator for '
f'{self.site__repr__()} {self.date_range__repr__()}')
[docs]
def set_seed(self, seed: int | None) -> None:
"""Sets random seed to make GMM sampling reproducible."""
super().set_seed(seed)
self.gmm.set_params(random_state=seed)
def _sample(self, n: int, oversample_factor: float = 0.2) -> np.ndarray:
"""Returns samples from GMM.
This function over-generates samples and discards those that are not in
bounds (i.e., arrival >= departure).
Args:
n: number of samples to generate.
oversample_factor: fractional amount of n to oversample.
Returns:
samples: array of shape (n, 4) whose columns are arrival time in
minutes, departure time in minutes, estimated departure time in
minutes, and requested energy in kWh.
"""
if n == 0:
return np.empty((0, 4))
# use while loop for quality check
all_samples: list[np.ndarray] = []
num_samples: int = 0
while num_samples < n:
samples = self.gmm.sample(int(n * (1 + oversample_factor)))[0] # shape (1.2*n, 4)
# discard sample if arrival, departure, estimated departure or
# requested energy not in bound
samples = samples[
(0 <= samples[:, self.ARRCOL]) & (samples[:, self.DEPCOL] < 1) &
(samples[:, self.ESTCOL] < 1) & (samples[:, self.EREQCOL] >= 0)
]
# rescale arrival, departure, estimated departure
samples[:, [self.ARRCOL, self.DEPCOL, self.ESTCOL]] = (
MINS_IN_DAY * samples[:, [self.ARRCOL, self.DEPCOL, self.ESTCOL]]
// self.TIME_STEP_DURATION)
# discard sample if arrival >= departure or arrival >= estimated_departure
samples = samples[
(samples[:, self.ARRCOL] < samples[:, self.DEPCOL]) &
(samples[:, self.ARRCOL] < samples[:, self.ESTCOL])
]
# rescale requested energy
samples[:, self.EREQCOL] *= REQ_ENERGY_SCALE
all_samples.append(samples)
num_samples += len(samples)
return np.concatenate(all_samples, axis=0)[:n]
def _create_events(self) -> pd.DataFrame:
"""Creates artificial events for the event queue for a single day.
This method first calls `_sample()` to generate the arrival, departure,
estimated departure, and requested energy fields. Then, it fills
in the other attributes, namely ``session_id`` and ``station_id``, that
were not included in modeling. The ``session_id`` is generated
randomly, and the ``station_id`` is sampled from the empirical
probability density distribution of stations on the date range.
Returns:
events: DataFrame of artificial sessions.
"""
# number of events from empirical pdf
n = int(self.rng.choice(self.cnt))
samples = self._sample(n)
events = pd.DataFrame({
'arrival': samples[:, self.ARRCOL].astype(int),
'departure': samples[:, self.DEPCOL].astype(int),
'estimated_departure': samples[:, self.ESTCOL].astype(int),
'requested_energy (kWh)': np.clip(samples[:, self.EREQCOL], 0, self.requested_energy_cap),
'session_id': [str(uuid.uuid4()) for _ in range(n)]
})
# sort by arrival time for probabilistic sampling of stations
events.sort_values('arrival', inplace=True)
# empirical distribution on stations
station_cnts = self.station_usage / self.station_usage.sum()
# array for last departure time of stations
station_dep = np.full(len(self.station_ids), -1, dtype=np.int32)
station_ids = []
for i in range(n):
avail = np.where(station_dep < events['arrival'].iloc[i])[0]
if len(avail) == 0: # all stations have been taken
station_ids.append('NOT_AVAIL')
else:
station_cnts_sum = station_cnts[avail].sum()
if station_cnts_sum <= 1e-5: # if probability distribution is too small, sample uniformly
idx = self.rng.choice(avail)
else:
# sample according to probability distribution
idx = self.rng.choice(avail, p=station_cnts[avail] / station_cnts_sum)
station_dep[idx] = max(events['departure'].iloc[i], station_dep[idx])
station_ids.append(self.station_ids[idx])
events['station_id'] = station_ids
# toss out EV if all stations are taken
events = events[events['station_id'] != 'NOT_AVAIL']
return events.reset_index()