Source code for sustaingym.algorithms.building.mpc_controller

from __future__ import annotations

import cvxpy as cp
import numpy as np

from sustaingym.envs.building import BuildingEnv


class MPCAgent:
    """Model predictive control (MPC) agent for BuildingEnv.

    Args:
        env: BuildingEnv instance that the agent controls
        beta: temperature error penalty weight for reward function
        pnorm: p to use for norm in reward function
        safety_margin: safety margin factor for constraints
        planning_steps: number of steps over which to plan

    TODO: rewrite using cp.Parameters
    """
    def __init__(
        self,
        env: BuildingEnv,
        beta: float,
        pnorm: float,
        safety_margin: float = 0.9,
        planning_steps: int = 1,
    ):
        self.env = env
        self.beta = beta
        self.pnorm = pnorm
        self.safety_margin = safety_margin
        self.planning_steps = planning_steps

        self.action_dim: int = env.action_space.shape[0]

        # exogenous inputs at the current timestep (refreshed in predict())
        self.temp: float = env.out_temp[env.epoch]
        self.ground_temp: float = env.ground_temp[env.epoch]
        self.metabolism: float = env.metabolism[env.epoch]
        self.ghi: float = env.ghi[env.epoch]
    def predict(self) -> tuple[np.ndarray, np.ndarray]:
        """Solves the MPC problem for the current timestep.

        Returns:
            tuple: (optimal action, predicted state)
        """
        env = self.env
        A_d = env.A_d
        B_d = env.B_d
        n = env.n

        # refresh exogenous inputs for the current timestep
        self.temp = env.out_temp[env.epoch]
        self.ground_temp = env.ground_temp[env.epoch]
        self.metabolism = env.metabolism[env.epoch]
        self.ghi = env.ghi[env.epoch]

        action = np.zeros(self.action_dim)

        x0 = cp.Parameter(n, name="x0")
        u_max = cp.Parameter(self.action_dim, name="u_max")
        u_min = cp.Parameter(self.action_dim, name="u_min")
        x = cp.Variable((n, self.planning_steps + 1), name="x")
        u = cp.Variable((self.action_dim, self.planning_steps), name="u")

        x0.value = env.state[:n]
        u_max.value = 1.0 * np.ones(self.action_dim)
        u_min.value = -1.0 * np.ones(self.action_dim)
        x_desired = env.target

        obj = 0
        constr = [x[:, 0] == x0]

        # occupancy heat gain, modeled as a polynomial in the metabolic rate
        # and the average zone temperature
        avg_temp = np.sum(x0.value) / n
        Meta = self.metabolism
        self.Occupower = (
            6.461927
            + 0.946892 * Meta
            + 0.0000255737 * Meta**2
            - 0.0627909 * avg_temp * Meta
            + 0.0000589172 * avg_temp * Meta**2
            - 0.19855 * avg_temp**2
            + 0.000940018 * avg_temp**2 * Meta
            - 0.00000149532 * avg_temp**2 * Meta**2
        )

        for t in range(self.planning_steps):
            # linear zone-temperature dynamics with exogenous inputs
            # (outdoor temp, ground temp, occupancy power, solar GHI)
            constr += [
                x[:, t + 1] == A_d @ x[:, t].T
                + B_d[:, 3:-1] @ u[:, t]
                + B_d[:, 2] * self.temp
                + B_d[:, 1] * self.ground_temp
                + B_d[:, 0] * self.Occupower
                + B_d[:, -1] * self.ghi,
                u[:, t] <= u_max,
                u[:, t] >= u_min,
            ]
            # trade off temperature tracking error against control effort
            obj += self.beta * cp.norm(
                cp.multiply(x[:, t], env.ac_map) - x_desired * env.ac_map,
                self.pnorm,
            ) + (1 - self.beta) * 24 * cp.norm(u[:, t], self.pnorm)

        prob = cp.Problem(cp.Minimize(obj), constr)
        prob.solve(solver="ECOS_BB")

        state = x[:, 1].value
        if env.is_continuous_action:
            action = u.value[:, 0]
        else:
            # map the normalized continuous solution to a discrete action index
            action = (u.value[:, 0] * 100 - env.Qlow * 100).astype(int)
        return action, state
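
# A minimal usage sketch for MPCAgent (not part of the original module).
# It assumes `env` is a fully configured BuildingEnv following the Gymnasium
# reset/step API; the helper name and the beta/pnorm/planning_steps values
# below are illustrative choices, not prescribed by SustainGym.
def _example_mpc_rollout(env: BuildingEnv, num_steps: int = 24) -> float:
    """Rolls out MPCAgent for a few steps and returns the cumulative reward."""
    env.reset()
    agent = MPCAgent(env, beta=0.5, pnorm=2, planning_steps=4)
    total_reward = 0.0
    for _ in range(num_steps):
        action, _predicted_state = agent.predict()
        _obs, reward, terminated, truncated, _info = env.step(action)
        total_reward += float(reward)
        if terminated or truncated:
            break
    return total_reward
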
class MPCAgent_DataDriven:
    """MPC agent for BuildingEnv that plans with a data-driven dynamics model.

    Args:
        env: BuildingEnv instance that the agent controls
        beta: temperature error penalty weight for reward function
        pnorm: p to use for norm in reward function
        safety_margin: safety margin factor for constraints
        planning_steps: number of steps over which to plan

    TODO: rewrite using cp.Parameters
    """
    def __init__(
        self,
        env: BuildingEnv,
        beta: float,
        pnorm: float,
        safety_margin: float = 0.9,
        planning_steps: int = 1,
    ):
        self.env = env
        self.beta = beta
        self.pnorm = pnorm
        self.safety_margin = safety_margin
        self.planning_steps = planning_steps

        self.action_dim: int = env.action_space.shape[0]

        # exogenous inputs at the current timestep (refreshed in predict())
        self.temp: float = env.out_temp[env.epoch]
        self.ground_temp: float = env.ground_temp[env.epoch]
        self.metabolism: float = env.metabolism[env.epoch]
        self.ghi: float = env.ghi[env.epoch]
    def predict(self) -> tuple[np.ndarray, np.ndarray]:
        """Solves the MPC problem for the current timestep.

        Returns:
            tuple: (optimal action, predicted state)
        """
        env = self.env
        A_d = env.A_d
        B_d = env.B_d
        n = env.n

        # refresh exogenous inputs for the current timestep
        self.temp = env.out_temp[env.epoch]
        self.ground_temp = env.ground_temp[env.epoch]
        self.metabolism = env.metabolism[env.epoch]
        self.ghi = env.ghi[env.epoch]

        action = np.zeros(self.action_dim)

        x0 = cp.Parameter(n, name="x0")
        u_max = cp.Parameter(self.action_dim, name="u_max")
        u_min = cp.Parameter(self.action_dim, name="u_min")
        x = cp.Variable((n, self.planning_steps + 1), name="x")
        u = cp.Variable((self.action_dim, self.planning_steps), name="u")

        x0.value = env.state[:n]
        u_max.value = 1.0 * np.ones(self.action_dim)
        u_min.value = -1.0 * np.ones(self.action_dim)
        x_desired = env.target

        obj = 0
        constr = [x[:, 0] == x0]

        avg_temp = np.sum(x0.value) / n
        Meta = self.metabolism

        for t in range(self.planning_steps):
            # Data-driven dynamics: metabolic rate, its square, the average
            # zone temperature, and its square enter as separate exogenous
            # inputs instead of the fitted occupancy-power polynomial.
            constr += [
                x[:, t + 1] == A_d @ x[:, t].T
                + B_d[:, 6:-1] @ u[:, t]
                + B_d[:, 5] * self.temp
                + B_d[:, 4] * self.ground_temp
                + B_d[:, 3] * Meta
                + B_d[:, 2] * Meta**2
                + B_d[:, 1] * avg_temp
                + B_d[:, 0] * avg_temp**2
                + B_d[:, -1] * self.ghi,
                u[:, t] <= u_max,
                u[:, t] >= u_min,
            ]
            # trade off temperature tracking error against control effort
            obj += self.beta * cp.norm(
                cp.multiply(x[:, t], env.ac_map) - x_desired * env.ac_map,
                self.pnorm,
            ) + (1 - self.beta) * 24 * cp.norm(u[:, t], self.pnorm)

        prob = cp.Problem(cp.Minimize(obj), constr)
        prob.solve(solver="ECOS_BB")

        state = x[:, 1].value
        if env.is_continuous_action:
            action = u.value[:, 0]
        else:
            # map the normalized continuous solution to a discrete action index
            action = (u.value[:, 0] * 100 - env.Qlow * 100).astype(int)
        return action, state
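
# A minimal usage sketch for MPCAgent_DataDriven (not part of the original
# module). It assumes `env` was constructed with the data-driven system
# matrices this agent expects (so that B_d has the column layout used in
# predict()); the parameter values are illustrative only.
def _example_data_driven_step(env: BuildingEnv) -> tuple[np.ndarray, np.ndarray]:
    """Plans one step with MPCAgent_DataDriven and applies the action."""
    env.reset()
    agent = MPCAgent_DataDriven(env, beta=0.5, pnorm=2, planning_steps=2)
    action, predicted_state = agent.predict()
    env.step(action)
    return action, predicted_state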