import numpy as np def difference(x, d=1): if d == 0: return x else: x = np.r_[x[0], np.diff(x)] return difference(x, d - 1) def undo_difference(x, d=1): if d == 1: return np.cumsum(x) else: x = np.cumsum(x) return undo_difference(x, d - 1) def lag_view(x, order): """ For every value X_i create a row that lags k values: [X_i-1, X_i-2, ... X_i-k] """ y = x.copy() # Create features by shifting the window of `order` size by one step. # This results in a 2D array [[t1, t2, t3], [t2, t3, t4], ... [t_k-2, t_k-1, t_k]] x = np.array([y[-(i + order):][:order] for i in range(y.shape[0])]) # Reverse the array as we started at the end and remove duplicates. # Note that we truncate the features [order -1:] and the labels [order] # This is the shifting of the features with one time step compared to the labels x = np.stack(x)[::-1][order - 1: -1] y = y[order:] return x, y def least_squares(x, y): return np.linalg.inv((x.T @ x)) @ (x.T @ y) class LinearModel: def __init__(self, fit_intercept=True): self.fit_intercept = fit_intercept self.beta = None self.intercept_ = None self.coef_ = None def _prepare_features(self, x): if self.fit_intercept: x = np.hstack((np.ones((x.shape[0], 1)), x)) return x def fit(self, x, y): x = self._prepare_features(x) self.beta = least_squares(x, y) if self.fit_intercept: self.intercept_ = self.beta[0] self.coef_ = self.beta[1:] else: self.coef_ = self.beta def predict(self, x): x = self._prepare_features(x) return x @ self.beta def fit_predict(self, x, y): self.fit(x, y) return self.predict(x) class ARMA(LinearModel): def __init__(self, q, p): """ An ARIMA model. :param q: (int) Order of the MA model. :param p: (int) Order of the AR model. """ super().__init__(True) self.p = p self.q = q self.ar = None self.resid = None def prepare_features(self, x): ar_features = None ma_features = None # Determine the features and the epsilon terms for the MA process if self.q > 0: if self.ar is None: self.ar = ARIMA(0, 0, self.p) self.ar.fit_predict(x) eps = self.ar.resid eps[0] = 0 # prepend with zeros as there are no residuals_t-k in the first X_t ma_features, _ = lag_view(np.r_[np.zeros(self.q), eps], self.q) # Determine the features for the AR process if self.p > 0: # prepend with zeros as there are no X_t-k in the first X_t ar_features = lag_view(np.r_[np.zeros(self.p), x], self.p)[0] if ar_features is not None and ma_features is not None: n = min(len(ar_features), len(ma_features)) ar_features = ar_features[:n] ma_features = ma_features[:n] features = np.hstack((ar_features, ma_features)) elif ma_features is not None: n = len(ma_features) features = ma_features[:n] else: n = len(ar_features) features = ar_features[:n] return features, x[:n] def fit(self, x): features, x = self.prepare_features(x) super().fit(features, x) return features def fit_predict(self, x): """ Fit and transform input :param x: (array) with time series. """ features = self.fit(x) return self.predict(x, prepared=(features)) def predict(self, x, **kwargs): """ :param x: (array) :kwargs: prepared: (tpl) containing the features, eps and x """ features = kwargs.get('prepared', None) if features is None: features, x = self.prepare_features(x) y = super().predict(features) self.resid = x - y return y def forecast(self, x, n): """ Forecast the time series. :param x: (array) Current time steps. :param n: (int) Number of time steps in the future. """ features, x = self.prepare_features(x) y = super().predict(features) # Append n time steps as zeros. Because the epsilon terms are unknown y = np.r_[y, np.zeros(n)] for i in range(n): feat = np.r_[y[-(self.p + n) + i: -n + i], np.zeros(self.q)] y[x.shape[0] + i] = super().predict(feat[None, :]) return y class ARIMA(ARMA): def __init__(self, q, d, p): """ An ARIMA model. :param q: (int) Order of the MA model. :param p: (int) Order of the AR model. :param d: (int) Number of times the data needs to be differenced. """ super().__init__(q, p) self.d = d def return_output(self, x): if self.d > 0: x = undo_difference(x, self.d) return x def prepare_features(self, x): if self.d > 0: x = difference(x, self.d) return super().prepare_features(x) def fit_predict(self, x): return self.return_output(super().fit_predict(x)) def predict(self, x, **kwargs): return self.return_output(super().predict(x, **kwargs)) def forecast(self, x, n): return self.return_output(super().forecast(x, n))