Source code for do_dpc.dpc.gamma_ddpc
# pylint: disable=line-too-long
"""
gamma-DPC implementation.
This module implements gamma-DPC based on the methodology presented in:
- `Uncertainty-aware data-driven predictive control in a stochastic setting <https://arxiv.org/pdf/2211.10321>`_
- `Data-driven predictive control in a stochastic setting: a unified framework <https://www.sciencedirect.com/science/article/pii/S0005109823001139>`_
The gamma-DPC controller is a subclass of Data-Driven Predictive Control (DPC).
"""
import logging
from dataclasses import dataclass
from typing import Optional
import cvxpy as cp
import numpy as np
from numpy.linalg import pinv
from do_dpc.dpc.dpc import DPC
from do_dpc.dpc.dpc_structs import (
DPCParameters,
DPCClosedFormSolutionMatrices,
DPCPredictorMatrices,
DPCRegularizationMatrices,
)
from do_dpc.control_utils.control_structs import InputOutputTrajectory
from do_dpc.dpc.dpc_utils import DPCUtils
logger = logging.getLogger(__name__)
[docs]
@dataclass
class GammaDDPCPredictorMatrices(DPCPredictorMatrices):
r""""
Stores predictor matrices for the gamma-DPC controller.
The LQ decomposition of the Hankel Matrix is represented as:
.. math::
L =
\begin{pmatrix}
L_{11} & 0 & 0 \\
L_{21} & L_{22} & 0 \\
L_{31} & L_{32} & L_{33}
\end{pmatrix}
and the concatenated matrix :math:`L_{2,3}` is:
.. math::
L_{2,3} =
\begin{pmatrix}
L_{21} & L_{22} \\
L_{31} & L_{32}
\end{pmatrix}
Attributes:
pinv_L_11 (np.ndarray): Pseudo-inverse of :math:`L_{11}`.
L_2_3 (np.ndarray): Concatenated matrix from :math:`L_{21}, L_{22}, L_{31}, L_{32}`.
L_21 (np.ndarray): Part of :math:`L` representing :math:`\gamma_1` and :math:`U_f`.
L_22 (np.ndarray): Part of :math:`L` representing :math:`\gamma_2` and :math:`U_f`.
L_31 (np.ndarray): Part of :math:`L` representing :math:`\gamma_1` and :math:`Y_f`.
L_32 (np.ndarray): Part of :math:`L` representing :math:`\gamma_2` and :math:`Y_f`.
"""
pinv_L_11: np.ndarray
L_2_3: np.ndarray
L_21: np.ndarray
L_22: np.ndarray
L_31: np.ndarray
L_32: np.ndarray
[docs]
@dataclass
class GammaDDPCRegularizationMatrices(DPCRegularizationMatrices):
"""Currently there is no regularization cost function for the Gamma DPC algorithm."""
[docs]
@DPC.register("GammaDDPC")
class GammaDDPC(DPC):
"""
Implements gamma-DPC based on DPC.
Attributes:
gamma_2_cp (cp.Variable): Slack decision variable for the gamma parameter in the optimization.
lambda_gamma_2 (float): Tunable parameter associated with the gamma-2 term.
"""
def __init__(self, dpc_params: DPCParameters, training_data: InputOutputTrajectory):
"""
Initializes the gamma DPC controller.
"""
super().__init__(dpc_params, training_data)
# Additional slack variables
self.gamma_2_cp = cp.Variable(self.dims.n_u_f)
# Tunable parameters
self.lambda_gamma_2 = 0.0
def calculate_predictor_matrices(self) -> GammaDDPCPredictorMatrices:
"""
Computes the L matrices.
Returns:
GammaDDPCOfflineData
"""
H_stacked = np.vstack((self.hankel_matrices.Z_p, self.hankel_matrices.U_f, self.hankel_matrices.Y_f))
L = DPCUtils.save_lq_decomposition(H_stacked @ H_stacked.T)
mp_taup = self.dims.n_z_p
mp_taup_m_tauf = self.dims.n_z_p + self.dims.n_u_f
mp_taup_mp_tauf = self.dims.n_z_p + self.dims.n_u_f + self.dims.n_y_f
L_11 = L[:mp_taup, :mp_taup]
L_21 = L[mp_taup:mp_taup_m_tauf, :mp_taup]
L_22 = L[mp_taup:mp_taup_m_tauf, mp_taup:mp_taup_m_tauf]
L_31 = L[mp_taup_m_tauf:mp_taup_mp_tauf, :mp_taup]
L_32 = L[mp_taup_m_tauf:mp_taup_mp_tauf, mp_taup:mp_taup_m_tauf]
# L_33 = L[mp_taup_m_tauf:mp_taup_mp_tauf, mp_taup_m_tauf:mp_taup_mp_tauf]
L_2_3 = np.vstack((np.hstack((L_21, L_22)), np.hstack((L_31, L_32))))
return GammaDDPCPredictorMatrices(pinv_L_11=pinv(L_11), L_2_3=L_2_3, L_21=L_21, L_22=L_22, L_31=L_31, L_32=L_32)
def calculate_regularization_matrices(self) -> GammaDDPCRegularizationMatrices:
return GammaDDPCRegularizationMatrices()
def get_regularization_cost_expression(self) -> cp.Expression:
r"""
Calculates and returns the CVXPY expression for the regularization cost.
The regularization cost is calculated as follows:
.. math::
\text{cost} = \lambda_{\gamma_2} \|\gamma_2\|_2^2
Returns:
cp.Expression: The CVXPY expression for the regularization cost.
"""
cost = cp.Constant(0.0)
cost += self.lambda_gamma_2 * cp.norm(self.gamma_2_cp, 2) ** 2
return cost
def get_predictor_constraint_expression(self) -> cp.Constraint:
r"""
Calculates and returns the CVXPY constraint for the predictor constraint.
The predictor constraint is calculated as follows:
.. math::
\gamma_1 = L_{11}^{-1} z_p \\
\begin{bmatrix} u_f \\ y_f \end{bmatrix} = L_{23} \begin{bmatrix} \gamma_1 \\ \gamma_2 \end{bmatrix}
Returns:
cp.Constraint: The CVXPY constraint for the predictor constraint.
"""
# Setting gamma_1 = L_11^-1 z_p
gamma_1 = self.pred_matrices.pinv_L_11 @ self.z_p_cp # type: ignore
gamma = cp.hstack((gamma_1, self.gamma_2_cp)) # type: ignore
# Equality constraints for u_f, y_f
return cp.hstack((self.u_f_cp, self.y_f_cp)) == self.pred_matrices.L_2_3 @ gamma # type: ignore
def calculate_closed_form_solution_matrices(self) -> Optional[DPCClosedFormSolutionMatrices]:
"""
Calculates and returns the closed-form gains for the DPC controller.
The gains are calculated using the pseudo-inverse of the transformed matrices.
Returns:
Optional[DPCClosedFormSolutionMatrices]: The closed-form gains for the DPC controller,
"""
# pylint: disable=too-many-locals
pinv_L_11 = self.pred_matrices.pinv_L_11 # type: ignore
L_21 = self.pred_matrices.L_21 # type: ignore
L_22 = self.pred_matrices.L_22 # type: ignore
L_31 = self.pred_matrices.L_31 # type: ignore
L_32 = self.pred_matrices.L_32 # type: ignore
Q_h = self.dpc_params.Q_horizon
R_h = self.dpc_params.R_horizon
lambda_gamma_2 = getattr(self, "lambda_gamma_2", 0)
T_1 = pinv(L_22) @ L_21 @ pinv_L_11
T_2 = pinv(L_22)
T_3 = L_31 @ pinv_L_11 - L_32 @ T_1
T_4 = L_32 @ T_2
F_1 = pinv(T_4.T @ Q_h @ T_4 + R_h + lambda_gamma_2 * T_2.T @ T_2)
F_2 = -T_4.T @ Q_h @ T_3 + lambda_gamma_2 * T_2.T @ T_1
F_3 = T_4.T @ Q_h
return DPCClosedFormSolutionMatrices(K_z_p=F_1 @ F_2, K_y_r=F_1 @ F_3, K_u_r=F_1 @ R_h)
def set_lambda_gamma_2(self, new_lambda_gamma_2: float):
"""Set the value of lambda_gamma_2, ensuring it is positive.
Args:
new_lambda_gamma_2 (float): The new value for lambda_gamma_2. Must be a positive number.
Note:
The closed-form data will be recalculated as the matrices depend on lambda_gamma_2.
Raises:
TypeError: If new_lambda_gamma_2 is not a number.
ValueError: If new_lambda_gamma_2 is not positive.
"""
if not isinstance(new_lambda_gamma_2, (int, float)):
raise TypeError("new_lambda_gamma_2 must be a number.")
if new_lambda_gamma_2 <= 0:
raise ValueError("new_lambda_gamma_2 must be a positive number.")
self.lambda_gamma_2 = new_lambda_gamma_2
self.cf_matrices = self.calculate_closed_form_solution_matrices()