Source code for sustaingym.algorithms.evcharging.baselines

"""
This module implements baseline algorithms for EVChargingEnv.
"""
from __future__ import annotations

from typing import Any

import cvxpy as cp
from gymnasium import spaces
import numpy as np
# from stable_baselines3.common.base_class import BaseAlgorithm

from sustaingym.algorithms.base import BaseAlgorithm
from sustaingym.envs.evcharging import EVChargingEnv
from sustaingym.envs.evcharging.env import magnitude_constraint
from sustaingym.envs.utils import solve_mosek

MAX_ACTION = 1    # maximum action for the default continuous (gym Box) action space
D_MAX_ACTION = 4  # maximum action for the discrete action wrapper
class GreedyAlgorithm(BaseAlgorithm):
    """Per-timestep greedy charging.

    Whether the action space is continuous or discrete, GreedyAlgorithm
    outputs the maximum pilot signal allowed.
    """
    def __init__(self, env: EVChargingEnv):
        super().__init__(env, multiagent=False)
        self.continuous_action_space = isinstance(env.action_space, spaces.Box)
        self.max_action = MAX_ACTION if self.continuous_action_space else D_MAX_ACTION
    def get_action(self, observation: dict[str, Any]) -> np.ndarray:
        """Returns greedy charging action."""
        # Send the full charging rate wherever demands are nonzero
        return np.where(observation['demands'] > 0, self.max_action, 0)
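# Example usage (illustrative sketch, not part of the module): rolling out a
# baseline for one episode. Assumes `env` is an already-constructed
# EVChargingEnv; the rollout loop follows the standard gymnasium API.
#
#     env = EVChargingEnv(...)  # construct with a trace generator of choice
#     algo = GreedyAlgorithm(env)
#     obs, _ = env.reset()
#     terminated = truncated = False
#     while not (terminated or truncated):
#         action = algo.get_action(obs)
#         obs, reward, terminated, truncated, info = env.step(action)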
class RandomAlgorithm(BaseAlgorithm):
    """Random action."""
    def __init__(self, env: EVChargingEnv):
        super().__init__(env, multiagent=False)
        self.continuous_action_space = isinstance(env.action_space, spaces.Box)
        self.rng = np.random.default_rng()
    def get_action(self, observation: dict[str, Any]) -> np.ndarray:
        """Returns random charging action."""
        if self.continuous_action_space:
            # Sample uniformly from [0, 1) for each station
            action = self.rng.random(size=self.env.num_stations)
        else:
            # Sample uniformly from the discrete levels {0, ..., D_MAX_ACTION}
            action = self.rng.choice(D_MAX_ACTION + 1, size=self.env.num_stations)
        return action.astype(np.float32)
class MPC(BaseAlgorithm):
    """Model predictive control.

    See `BaseAlgorithm` for more attributes.

    Args:
        env: EV charging environment
        lookahead: number of timesteps to forecast future trajectory

    Attributes:
        lookahead: number of timesteps to forecast future trajectory. Note
            that MPC cannot see future car arrivals and so does not take
            them into account.
    """
    def __init__(self, env: EVChargingEnv, lookahead: int = 12):
        super().__init__(env)
        assert isinstance(env.action_space, spaces.Box), \
            "MPC only supports a continuous action space"
        assert lookahead <= env.moer_forecast_steps, \
            "MPC lookahead must be at most the number of forecasted timesteps"
        self.lookahead = lookahead

        # Optimization problem setup

        # VARIABLES
        # MPC action trajectory
        self.traj = cp.Variable((env.num_stations, self.lookahead), nonneg=True)

        # PARAMETERS
        # Current timestep demands in kWh
        self.demands_cvx = cp.Parameter(env.num_stations, nonneg=True)
        # Forecasted MOERs (marginal operating emissions rates)
        self.moers = cp.Parameter(self.lookahead, nonneg=True)
        # Boolean mask: 1 until an EV's estimated departure, 0 afterwards
        self.mask = cp.Parameter((env.num_stations, self.lookahead), nonneg=True)

        # OBJECTIVE
        # Maximize profit and minimize carbon cost
        profit = cp.sum(self.traj) * env.ACTION_SCALE_FACTOR * env.PROFIT_FACTOR
        carbon_cost = cp.sum(self.traj @ self.moers) * env.ACTION_SCALE_FACTOR * env.CARBON_COST_FACTOR
        objective = cp.Maximize(profit - carbon_cost)

        # CONSTRAINTS
        constraints = [
            # Cannot charge after the EV leaves, using the estimated departure as a proxy
            self.traj <= self.mask,
            # Cannot overcharge demand (kWh converted to normalized action-periods)
            cp.sum(self.traj, axis=1) <= self.demands_cvx / env.A_PERS_TO_KWH / env.ACTION_SCALE_FACTOR,
            # Cannot break network constraints
            magnitude_constraint(action=self.traj, cn=env.cn)
        ]

        # Formulate problem
        self.prob = cp.Problem(objective, constraints)
        assert self.prob.is_dpp() and self.prob.is_dcp()
    def get_action(self, observation: dict[str, Any]) -> np.ndarray:
        """Returns the first action of the MPC trajectory."""
        self.demands_cvx.value = observation['demands']
        self.moers.value = observation['forecasted_moer'][:self.lookahead]

        # If the estimated departure has already passed, assume the car
        # will stay for 1 more timestep
        cur_est_dep = np.maximum(1., observation['est_departures']).astype(np.int32)
        cur_est_dep = np.where(observation['demands'] > 0, cur_est_dep, 0)
        mask = np.zeros((self.env.num_stations, self.lookahead))
        for i in range(self.env.num_stations):
            # Maximum action is always capped at 1
            mask[i, :cur_est_dep[i]] = MAX_ACTION
        self.mask.value = mask

        solve_mosek(self.prob)
        return self.traj.value[:, 0]  # take first action
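# Note on the cp.Parameter usage above: because the problem is DPP-compliant,
# CVXPY compiles it once, and each call to get_action() only updates parameter
# values before re-solving, keeping per-timestep solves cheap. A minimal
# standalone sketch of the same pattern (toy sizes, default solver in place of
# MOSEK; all names here are illustrative):
#
#     x = cp.Variable(3, nonneg=True)    # stands in for self.traj
#     d = cp.Parameter(3, nonneg=True)   # stands in for self.demands_cvx
#     toy_prob = cp.Problem(cp.Maximize(cp.sum(x)), [x <= d])
#     d.value = np.array([1., 0., 2.])
#     toy_prob.solve()                   # first solve compiles the problem
#     d.value = np.array([0., 3., 1.])
#     toy_prob.solve()                   # later solves reuse the compilation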
class OfflineOptimal(BaseAlgorithm):
    """Calculates the best performance of a controller that knows the future.

    Args:
        env: EV charging environment
    """
    TOTAL_TIMESTEPS = 288  # timesteps in an episode (24 h at 5-minute resolution)
    def __init__(self, env: EVChargingEnv):
        super().__init__(env)
        assert isinstance(env.action_space, spaces.Box), \
            "OfflineOptimal only supports a continuous action space"

        # Optimization problem setup, similar to MPC

        # VARIABLES
        # Oracle actions, normalized to the range [0, 1]
        self.traj = cp.Variable((env.num_stations, self.TOTAL_TIMESTEPS), nonneg=True)
        # EV charging demands, in action-periods
        self.demands = cp.Variable((env.num_stations, self.TOTAL_TIMESTEPS + 1), nonneg=True)

        # PARAMETERS
        # True MOERs
        self.moers = cp.Parameter(self.TOTAL_TIMESTEPS, nonneg=True)
        # Boolean mask: 1 while an EV is actually present, 0 after it leaves
        self.mask = cp.Parameter((env.num_stations, self.TOTAL_TIMESTEPS), nonneg=True)

        # OBJECTIVE
        # Maximize profit and minimize carbon cost
        profit = cp.sum(self.traj) * env.ACTION_SCALE_FACTOR * env.PROFIT_FACTOR
        carbon_cost = cp.sum(self.traj @ self.moers) * env.ACTION_SCALE_FACTOR * env.CARBON_COST_FACTOR
        self._objective = cp.Maximize(profit - carbon_cost)

        self.reset()
    def reset(self) -> None:
        """Resets timestep count."""
        self.timestep = 0

        # CONSTRAINTS (only a partial list; more are added in get_action())
        self._constraints = [
            # Cannot charge after the EV leaves
            self.traj <= self.mask,
            # Aggregate magnitude constraint
            magnitude_constraint(action=self.traj, cn=self.env.cn)
        ]
    def get_action(self, observation: dict[str, Any]) -> np.ndarray:
        """On the first call, solves for the actions at all timesteps
        (so it should be called directly after environment reset).
        """
        env = self.env
        if self.timestep == 0:
            # Set marginal emissions to true values
            self.moers.value = env.moer[1:self.TOTAL_TIMESTEPS + 1, 0]

            # Convert station id to index in charging network
            station_idx = {evse: i for i, evse in enumerate(env.cn.station_ids)}

            mask = np.zeros((env.num_stations, self.TOTAL_TIMESTEPS))
            for ev in env._evs:
                ev_idx = station_idx[ev.station_id]
                # Set mask using true arrivals and departures
                mask[ev_idx, ev.arrival:ev.departure] = MAX_ACTION

                # Set starting demand constraint
                self._constraints.append(
                    self.demands[ev_idx, ev.arrival]
                    == ev.requested_energy / env.A_PERS_TO_KWH / env.ACTION_SCALE_FACTOR)

                # Set inter-period charge and remaining-demand constraints
                if ev.arrival + 1 < ev.departure:
                    self._constraints.append(
                        self.demands[ev_idx, ev.arrival+1:ev.departure]
                        == self.demands[ev_idx, ev.arrival:ev.departure-1]
                        - self.traj[ev_idx, ev.arrival:ev.departure-1])
            self.mask.value = mask

            # Formulate problem
            self.prob = cp.Problem(self._objective, self._constraints)
            assert self.prob.is_dpp() and self.prob.is_dcp()
            solve_mosek(self.prob)

        try:
            action = self.traj.value[:, self.timestep]
        except IndexError as e:
            print(e)
            print(self.timestep)
            action = np.zeros(env.num_stations)  # fall back to no charging
        self.timestep += 1
        return action
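# Example usage (illustrative sketch): OfflineOptimal solves the whole-episode
# problem on its first get_action() call, so reset the algorithm together with
# the environment before each rollout.
#
#     algo = OfflineOptimal(env)  # assumes `env` is an EVChargingEnv
#     obs, _ = env.reset()
#     algo.reset()
#     action = algo.get_action(obs)  # first call solves for all timesteps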
# class RLAlgorithm(BaseAlgorithm):
#     """RL algorithm wrapper.
#
#     Attributes:
#         env (EVChargingEnv): EV charging environment
#         rl_model (BaseAlgorithm): can be from stable baselines
#     """
#     def __init__(self, env: EVChargingEnv, rl_model: BaseAlgorithm):
#         """
#         Args:
#             env (EVChargingEnv): EV charging environment
#             rl_model (BaseAlgorithm): can be from stable baselines
#         """
#         super().__init__(env)
#         self.rl_model = rl_model
#
#     def get_action(self, observation: dict[str, Any]) -> np.ndarray:
#         """Returns output of the RL model.
#
#         Args:
#             *See get_action() in BaseAlgorithm.
#
#         Returns:
#             *See get_action() in BaseAlgorithm.
#         """
#         return self.rl_model.predict(observation, deterministic=True)[0]
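# If the wrapper above were re-enabled, usage with stable-baselines3 might look
# like this sketch (assumes stable-baselines3 is installed; PPO's
# "MultiInputPolicy" supports dict observations like EVChargingEnv's):
#
#     from stable_baselines3 import PPO
#     model = PPO('MultiInputPolicy', env)
#     model.learn(total_timesteps=100_000)
#     algo = RLAlgorithm(env, model)
#     action = algo.get_action(obs)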