mavats.MAR

  1from typing import Tuple, Union
  2
  3import numpy as np
  4from numpy import linalg as LA
  5
  6
  7def estimate_mar1(
  8    X: np.ndarray, method: str = "least_square", **kwargs
  9) -> Union[
 10    Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
 11]:
 12    r"""
 13    Estimates the first order MAR model $X_t = A X_{t-1} B^T + E_t$
 14    from Chen, Xiao, Yang 2021 (https://doi.org/10.1016/j.jeconom.2020.07.015)
 15
 16    Parameters
 17    ----------
 18    X : (T, m, n) ndarray
 19        The observed data.
 20    method : str, optional
 21        The estimation method. Must be either "proj", "least_square" or "mle".
 22        Note MLE assumes Cov(vec($E_t$)) = $\Sigma_c \otimes \Sigma_r$
 23        where $\Sigma_c \in \mathbb{R}^{n \times n}$ and $\Sigma_r \in \mathbb{R}^{m \times m}$.
 24    **kwargs
 25        Additional arguments for the estimation method.
 26
 27    Returns
 28    -------
 29    A : (m, m) ndarray
 30        The estimated left matrix.
 31    B : (n, n) ndarray
 32        The estimated right matrix.
 33    If method is "mle":
 34    Sigc : (m, m) ndarray
 35        The estimated covariance $\Sigma_c$
 36    Sigr : (n, n) ndarray
 37        The estimated covariance $\Sigma_r$
 38
 39    """
 40    if method == "proj":
 41        return _proj_estimate(X)
 42    elif method == "least_square":
 43        niter = kwargs.get("niter", 50)
 44        A_init = kwargs.get("A_init", None)
 45        B_init = kwargs.get("B_init", None)
 46        init_method = kwargs.get("init_method", "proj")
 47        return _least_square_estimate(X, niter, A_init, B_init, init_method)
 48    elif method == "mle":
 49        niter = kwargs.get("niter", 50)
 50        A_init = kwargs.get("A_init", None)
 51        B_init = kwargs.get("B_init", None)
 52        init_method = kwargs.get("init_method", "proj")
 53        Sigc_init = kwargs.get("Sigc_init", None)
 54        Sigr_init = kwargs.get("Sigr_init", None)
 55        return _mle_estimate(
 56            X, niter, A_init, B_init, init_method, Sigc_init, Sigr_init
 57        )
 58    else:
 59        raise ValueError('method must be either "proj", "least_square" or "mle"')
 60
 61
 62def estimate_residual_cov(X: np.ndarray, A: np.ndarray, B: np.ndarray) -> np.ndarray:
 63    r"""
 64    Estimates the covariance of $\text{Vec}(E_t)$ where $X_t = A X_{t-1} B^T + E_t$
 65    by computing the sample covariance of the vectorized residuals.
 66
 67    Parameters
 68    ----------
 69    X : (T, m, n) ndarray
 70        The observed data.
 71    A : (m, m) ndarray
 72        The estimated left matrix.
 73    B : (n, n) ndarray
 74        The estimated right matrix.
 75
 76    Returns
 77    -------
 78    Sigma : (m*n, m*n) ndarray
 79        The estimated covariance matrix.
 80
 81    """
 82    T, m, n = X.shape
 83    X_tp1 = X[1:]
 84    X_t = X[:-1]
 85    X_tp1_hat = A @ X_t @ B.T
 86    res = X_tp1 - X_tp1_hat
 87    Sigma = np.cov(res.transpose(0, 2, 1).reshape(T - 1, -1).T)
 88    return Sigma
 89
 90
 91def _proj_estimate(X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
 92    T, m, n = X.shape
 93    X_flat = X.transpose(0, 2, 1).reshape(T, -1)
 94    X_flat_tp1, X_flat_t = X_flat[1:], X_flat[:-1]
 95    Phi = LA.lstsq(X_flat_tp1, X_flat_t, rcond=None)[0].T
 96    Phi_tilde = _rearrange_phi(Phi, m, n)
 97
 98    U, S, Vh = LA.svd(Phi_tilde, full_matrices=False)
 99    d = S[0]
100    u = U[:, 0]
101    v = Vh[0, :]
102    A = u.reshape(m, m, order="F")
103    B = v.reshape(n, n, order="F").T * d
104    A, B = _normalize(A, B)
105    return A, B
106
107
108def _normalize(A: np.ndarray, B: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
109    norm_A = LA.norm(A)
110    A /= norm_A
111    B *= norm_A
112    return A, B
113
114
115def _rearrange_phi(Phi: np.ndarray, m: int, n: int) -> np.ndarray:
116    """
117    Rearranges the mn x mn matrix Phi into an $m^2 \times n^2$ matrix as specified in the paper.
118    """
119    col_splits = np.split(Phi, n, axis=1)
120    blocks = [np.split(col, n, axis=0) for col in col_splits]
121    return np.column_stack(
122        [np.ravel(block, order="F") for row in blocks for block in row]
123    )
124
125
126def _initialize_AB(
127    X: np.ndarray,
128    A_init: Union[np.ndarray, None],
129    B_init: Union[np.ndarray, None],
130    init_method: str = "proj",
131) -> Tuple[np.ndarray, np.ndarray]:
132    T, m, n = X.shape
133    if A_init is None or B_init is None:
134        if init_method == "proj":
135            A_init, B_init = _proj_estimate(X)
136        elif init_method == "random":
137            A_init = np.random.randn(m, m)
138            B_init = np.random.randn(n, n)
139        else:
140            raise ValueError('init_method must be either "proj" or "random"')
141    return A_init, B_init
142
143
144def _least_square_estimate(
145    X: np.ndarray,
146    niter: int = 50,
147    A_init: Union[np.ndarray, None] = None,
148    B_init: Union[np.ndarray, None] = None,
149    init_method: str = "proj",
150) -> Tuple[np.ndarray, np.ndarray]:
151    T, m, n = X.shape
152    A_init, B_init = _initialize_AB(X, A_init, B_init, init_method)
153    A = A_init
154    B = B_init
155    X_tp1 = X[1:]
156    X_t = X[:-1]
157
158    for i in range(niter):
159        A, B = _update_lse(X_tp1, X_t, A, B)
160
161    return A, B
162
163
164def _update_lse(
165    X_tp1: np.ndarray, X_t: np.ndarray, A: np.ndarray, B: np.ndarray
166) -> Tuple[np.ndarray, np.ndarray]:
167    X_tp1_transpose = X_tp1.transpose(0, 2, 1)
168    X_t_transpose = X_t.transpose(0, 2, 1)
169
170    XBXT = np.sum(X_tp1 @ B @ X_t_transpose, axis=0)
171    XBTBXT = np.sum(X_t @ B.T @ B @ X_t_transpose, axis=0)
172
173    A = np.linalg.solve(XBTBXT, XBXT)
174
175    XTAX = np.sum(X_tp1_transpose @ A @ X_t, axis=0)
176    XTATAX = np.sum(X_t_transpose @ A.T @ A @ X_t, axis=0)
177
178    B = np.linalg.solve(XTATAX, XTAX)
179
180    A, B = _normalize(A, B)
181
182    return A, B
183
184
185def _mle_estimate(
186    X: np.ndarray,
187    niter: int = 50,
188    A_init: Union[np.ndarray, None] = None,
189    B_init: Union[np.ndarray, None] = None,
190    init_method: str = "proj",
191    Sigc_init: Union[np.ndarray, None] = None,
192    Sigr_init: Union[np.ndarray, None] = None,
193) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
194    T, m, n = X.shape
195    A_init, B_init = _initialize_AB(X, A_init, B_init, init_method)
196    A = A_init
197    B = B_init
198    if Sigc_init is None or Sigr_init is None:
199        Sigc_init = np.eye(n)
200        Sigr_init = np.eye(m)
201    Sigc = Sigc_init
202    Sigr = Sigr_init
203    X_tp1 = X[1:]
204    X_t = X[:-1]
205
206    for i in range(niter):
207        A, B, Sigc, Sigr = _update_mle(X_tp1, X_t, A, B, Sigc, Sigr)
208
209    return A, B, Sigc, Sigr
210
211
212def _update_mle(
213    X_tp1: np.ndarray,
214    X_t: np.ndarray,
215    A: np.ndarray,
216    B: np.ndarray,
217    Sigc: np.ndarray,
218    Sigr: np.ndarray,
219) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
220    A = _update_A_mle(X_tp1, X_t, A, B, Sigc, Sigr)
221    B = _update_B_mle(X_tp1, X_t, A, B, Sigc, Sigr)
222    R = _compute_R(X_tp1, X_t, A, B)
223    Sigc = _update_Sigc_mle(X_tp1, X_t, A, B, R, Sigc, Sigr)
224    Sigr = _update_Sigr_mle(X_tp1, X_t, A, B, R, Sigc, Sigr)
225
226    A, B = _normalize(A, B)
227    Sigr, Sigc = _normalize(Sigr, Sigc)
228
229    return A, B, Sigc, Sigr
230
231
232def _update_A_mle(
233    X_tp1: np.ndarray,
234    X_t: np.ndarray,
235    A: np.ndarray,
236    B: np.ndarray,
237    Sigc: np.ndarray,
238    Sigr: np.ndarray,
239) -> np.ndarray:
240    X_tp1_transpose = X_tp1.transpose(0, 2, 1)
241    X_t_transpose = X_t.transpose(0, 2, 1)
242    Sigc_inv = LA.inv(Sigc)
243
244    YSBX = np.sum(X_tp1 @ Sigc_inv @ B @ X_t_transpose, axis=0)
245    XBSBX = np.sum(X_t @ B.T @ Sigc_inv @ B @ X_t_transpose, axis=0)
246
247    A = np.linalg.solve(XBSBX, YSBX)
248    return A
249
250
251def _update_B_mle(
252    X_tp1: np.ndarray,
253    X_t: np.ndarray,
254    A: np.ndarray,
255    B: np.ndarray,
256    Sigc: np.ndarray,
257    Sigr: np.ndarray,
258) -> np.ndarray:
259    X_tp1_transpose = X_tp1.transpose(0, 2, 1)
260    X_t_transpose = X_t.transpose(0, 2, 1)
261    Sigr_inv = LA.inv(Sigr)
262    YSAX = np.sum(X_tp1_transpose @ Sigr_inv @ A @ X_t, axis=0)
263    XASAX = np.sum(X_t_transpose @ A.T @ Sigr_inv @ A @ X_t, axis=0)
264
265    B = np.linalg.solve(XASAX, YSAX)
266    return B
267
268
269def _compute_R(
270    X_tp1: np.ndarray, X_t: np.ndarray, A: np.ndarray, B: np.ndarray
271) -> np.ndarray:
272    return X_tp1 - A @ X_t @ B.T
273
274
275def _update_Sigc_mle(
276    X_tp1: np.ndarray,
277    X_t: np.ndarray,
278    A: np.ndarray,
279    B: np.ndarray,
280    R: np.ndarray,
281    Sigc: np.ndarray,
282    Sigr: np.ndarray,
283) -> np.ndarray:
284    m, _ = A.shape
285    T, _, _ = X_t.shape
286    R_transpose = R.transpose(0, 2, 1)
287    Sigr_inv = LA.inv(Sigr)
288    RSR = np.sum(R_transpose @ Sigr_inv @ R, axis=0)
289    Sigc = RSR / (T * m)
290    return Sigc
291
292
293def _update_Sigr_mle(
294    X_tp1: np.ndarray,
295    X_t: np.ndarray,
296    A: np.ndarray,
297    B: np.ndarray,
298    R: np.ndarray,
299    Sigc: np.ndarray,
300    Sigr: np.ndarray,
301) -> np.ndarray:
302    n, _ = B.shape
303    T, _, _ = X_t.shape
304    R_transpose = R.transpose(0, 2, 1)
305    Sigc_inv = LA.inv(Sigc)
306    RSR = np.sum(R @ Sigc_inv @ R_transpose, axis=0)
307    Sigr = RSR / (T * n)
308    return Sigr
def estimate_mar1( X: numpy.ndarray, method: str = 'least_square', **kwargs) -> Union[Tuple[numpy.ndarray, numpy.ndarray], Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray]]:
 8def estimate_mar1(
 9    X: np.ndarray, method: str = "least_square", **kwargs
10) -> Union[
11    Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
12]:
13    r"""
14    Estimates the first order MAR model $X_t = A X_{t-1} B^T + E_t$
15    from Chen, Xiao, Yang 2021 (https://doi.org/10.1016/j.jeconom.2020.07.015)
16
17    Parameters
18    ----------
19    X : (T, m, n) ndarray
20        The observed data.
21    method : str, optional
22        The estimation method. Must be either "proj", "least_square" or "mle".
23        Note MLE assumes Cov(vec($E_t$)) = $\Sigma_c \otimes \Sigma_r$
24        where $\Sigma_c \in \mathbb{R}^{n \times n}$ and $\Sigma_r \in \mathbb{R}^{m \times m}$.
25    **kwargs
26        Additional arguments for the estimation method.
27
28    Returns
29    -------
30    A : (m, m) ndarray
31        The estimated left matrix.
32    B : (n, n) ndarray
33        The estimated right matrix.
34    If method is "mle":
35    Sigc : (m, m) ndarray
36        The estimated covariance $\Sigma_c$
37    Sigr : (n, n) ndarray
38        The estimated covariance $\Sigma_r$
39
40    """
41    if method == "proj":
42        return _proj_estimate(X)
43    elif method == "least_square":
44        niter = kwargs.get("niter", 50)
45        A_init = kwargs.get("A_init", None)
46        B_init = kwargs.get("B_init", None)
47        init_method = kwargs.get("init_method", "proj")
48        return _least_square_estimate(X, niter, A_init, B_init, init_method)
49    elif method == "mle":
50        niter = kwargs.get("niter", 50)
51        A_init = kwargs.get("A_init", None)
52        B_init = kwargs.get("B_init", None)
53        init_method = kwargs.get("init_method", "proj")
54        Sigc_init = kwargs.get("Sigc_init", None)
55        Sigr_init = kwargs.get("Sigr_init", None)
56        return _mle_estimate(
57            X, niter, A_init, B_init, init_method, Sigc_init, Sigr_init
58        )
59    else:
60        raise ValueError('method must be either "proj", "least_square" or "mle"')

Estimates the first order MAR model $X_t = A X_{t-1} B^T + E_t$ from Chen, Xiao, Yang 2021 (https://doi.org/10.1016/j.jeconom.2020.07.015)

Parameters
  • X ((T, m, n) ndarray): The observed data.
  • method (str, optional): The estimation method. Must be either "proj", "least_square" or "mle". Note MLE assumes Cov(vec($E_t$)) = $\Sigma_c \otimes \Sigma_r$ where $\Sigma_c \in \mathbb{R}^{n \times n}$ and $\Sigma_r \in \mathbb{R}^{m \times m}$.
  • **kwargs: Additional arguments for the estimation method.
Returns
  • A ((m, m) ndarray): The estimated left matrix.
  • B ((n, n) ndarray): The estimated right matrix.
  • If method is "mle":
  • Sigc ((m, m) ndarray): The estimated covariance $\Sigma_c$
  • Sigr ((n, n) ndarray): The estimated covariance $\Sigma_r$
def estimate_residual_cov(X: numpy.ndarray, A: numpy.ndarray, B: numpy.ndarray) -> numpy.ndarray:
63def estimate_residual_cov(X: np.ndarray, A: np.ndarray, B: np.ndarray) -> np.ndarray:
64    r"""
65    Estimates the covariance of $\text{Vec}(E_t)$ where $X_t = A X_{t-1} B^T + E_t$
66    by computing the sample covariance of the vectorized residuals.
67
68    Parameters
69    ----------
70    X : (T, m, n) ndarray
71        The observed data.
72    A : (m, m) ndarray
73        The estimated left matrix.
74    B : (n, n) ndarray
75        The estimated right matrix.
76
77    Returns
78    -------
79    Sigma : (m*n, m*n) ndarray
80        The estimated covariance matrix.
81
82    """
83    T, m, n = X.shape
84    X_tp1 = X[1:]
85    X_t = X[:-1]
86    X_tp1_hat = A @ X_t @ B.T
87    res = X_tp1 - X_tp1_hat
88    Sigma = np.cov(res.transpose(0, 2, 1).reshape(T - 1, -1).T)
89    return Sigma

Estimates the covariance of $\text{Vec}(E_t)$ where $X_t = A X_{t-1} B^T + E_t$ by computing the sample covariance of the vectorized residuals.

Parameters
  • X ((T, m, n) ndarray): The observed data.
  • A ((m, m) ndarray): The estimated left matrix.
  • B ((n, n) ndarray): The estimated right matrix.
Returns
  • Sigma ((mn, mn) ndarray): The estimated covariance matrix.