"""ARMA / ARIMA time-series models built on ordinary least-squares regression."""
import numpy as np
|
|
|
|
|
|
def difference(x, d=1):
    """
    Difference a series d times while preserving its length.

    Each pass maps x to [x[0], x[1]-x[0], ..., x[-1]-x[-2]]; keeping the
    first element (rather than dropping it like a plain diff) makes the
    transform invertible with a cumulative sum (see `undo_difference`).

    :param x: (array) Time series.
    :param d: (int) Number of differencing passes; d=0 returns x unchanged.
    """
    for _ in range(d):
        x = np.r_[x[0], np.diff(x)]
    return x
def undo_difference(x, d=1):
    """
    Invert `difference`: integrate the series by cumulative-summing d times.

    The previous recursive version never terminated for d <= 0 (it always
    recursed with d - 1 from the else branch); iterating exactly `d` times
    makes d=0 a no-op, mirroring `difference(x, 0)`.

    :param x: (array) Differenced time series.
    :param d: (int) Number of integration passes; d=0 returns x unchanged.
    """
    for _ in range(d):
        x = np.cumsum(x)
    return x
def lag_view(x, order):
    """
    For every value X_i create a row that lags k values: [X_i-1, X_i-2, ... X_i-k]
    """
    series = x.copy()
    n = series.shape[0]

    # Row t of the index matrix holds positions [t, t+1, ..., t+order-1]:
    # the `order` observations immediately preceding the label series[t + order].
    offsets = np.arange(order)[None, :]
    starts = np.arange(n - order)[:, None]
    features = series[starts + offsets]

    # Labels are the values one step past each window, so the features are
    # shifted one time step relative to the labels.
    labels = series[order:]

    return features, labels
def least_squares(x, y):
    """
    Solve the normal equations (X'X) b = X'y for the OLS coefficients.

    Uses np.linalg.solve rather than forming the explicit inverse:
    mathematically identical for a non-singular X'X, but numerically more
    stable and cheaper (one factorization instead of inverse + matmul).

    :param x: (array) Design matrix, shape (n_samples, n_features).
    :param y: (array) Targets, shape (n_samples,).
    :return: (array) Coefficient vector, shape (n_features,).
    """
    return np.linalg.solve(x.T @ x, x.T @ y)
class LinearModel:
    """Ordinary least-squares linear regression, optionally with an intercept."""

    def __init__(self, fit_intercept=True):
        # When True, a constant column of ones is prepended to the design matrix.
        self.fit_intercept = fit_intercept
        self.beta = None
        self.intercept_ = None
        self.coef_ = None

    def _prepare_features(self, x):
        """Return x, with a leading column of ones when an intercept is fitted."""
        if not self.fit_intercept:
            return x
        ones = np.ones((x.shape[0], 1))
        return np.hstack((ones, x))

    def fit(self, x, y):
        """Estimate the coefficients for (x, y) via ordinary least squares."""
        design = self._prepare_features(x)
        self.beta = least_squares(design, y)
        # Split the intercept off the coefficient vector when one was fitted.
        if self.fit_intercept:
            self.intercept_ = self.beta[0]
            self.coef_ = self.beta[1:]
        else:
            self.coef_ = self.beta

    def predict(self, x):
        """Return the fitted values for x."""
        return self._prepare_features(x) @ self.beta

    def fit_predict(self, x, y):
        """Fit on (x, y), then return the in-sample predictions."""
        self.fit(x, y)
        return self.predict(x)
class ARMA(LinearModel):
    def __init__(self, q, p):
        """
        An ARMA(p, q) model, estimated as a linear regression (with
        intercept) on p lagged values and q lagged residual estimates.

        :param q: (int) Order of the MA model.
        :param p: (int) Order of the AR model.
        """
        super().__init__(True)
        self.p = p
        self.q = q
        # Auxiliary pure-AR model used to estimate the epsilon (residual)
        # series needed by the MA terms; lazily created in prepare_features.
        self.ar = None
        # Residuals of the most recent predict() call (x - y_hat).
        self.resid = None

    def prepare_features(self, x):
        """
        Build the regression design matrix from AR lags of x and MA lags of
        the estimated residuals.

        :param x: (array) Time series.
        :return: (tpl) (features, x truncated to the number of feature rows).
        """
        ar_features = None
        ma_features = None

        # Determine the features and the epsilon terms for the MA process
        if self.q > 0:
            if self.ar is None:
                # Fit a pure AR(p) model once; its residuals serve as the
                # epsilon estimates for the MA part.
                self.ar = ARIMA(0, 0, self.p)
                self.ar.fit_predict(x)
            eps = self.ar.resid
            # NOTE: mutates self.ar.resid in place; the first residual has no
            # meaningful lag history, so it is zeroed.
            eps[0] = 0

            # prepend with zeros as there are no residuals_t-k in the first X_t
            ma_features, _ = lag_view(np.r_[np.zeros(self.q), eps], self.q)

        # Determine the features for the AR process
        if self.p > 0:
            # prepend with zeros as there are no X_t-k in the first X_t
            ar_features = lag_view(np.r_[np.zeros(self.p), x], self.p)[0]

        # Align both feature sets to the same row count before stacking.
        if ar_features is not None and ma_features is not None:
            n = min(len(ar_features), len(ma_features))
            ar_features = ar_features[:n]
            ma_features = ma_features[:n]
            features = np.hstack((ar_features, ma_features))
        elif ma_features is not None:
            n = len(ma_features)
            features = ma_features[:n]
        else:
            n = len(ar_features)
            features = ar_features[:n]

        return features, x[:n]

    def fit(self, x):
        """
        Fit the ARMA coefficients on the series x.

        :param x: (array) Time series.
        :return: (array) The design matrix used for the fit, so callers can
            reuse it (see fit_predict).
        """
        features, x = self.prepare_features(x)
        super().fit(features, x)
        return features

    def fit_predict(self, x):
        """
        Fit and transform input

        :param x: (array) with time series.
        """
        features = self.fit(x)
        return self.predict(x, prepared=(features))

    def predict(self, x, **kwargs):
        """
        Predict in-sample values and store the residuals in self.resid.

        :param x: (array) Time series.
        :kwargs:
            prepared: (array) precomputed design matrix from fit(); when
                absent the features are rebuilt from x.
        """
        features = kwargs.get('prepared', None)
        if features is None:
            features, x = self.prepare_features(x)

        y = super().predict(features)
        self.resid = x - y

        return y

    def forecast(self, x, n):
        """
        Forecast the time series.

        :param x: (array) Current time steps.
        :param n: (int) Number of time steps in the future.
        """
        features, x = self.prepare_features(x)
        y = super().predict(features)

        # Append n time steps as zeros. Because the epsilon terms are unknown
        y = np.r_[y, np.zeros(n)]
        for i in range(n):
            # AR slots: the p values immediately preceding step x.shape[0] + i
            # (these include earlier forecasts). MA slots stay zero because
            # future residuals are unknown.
            feat = np.r_[y[-(self.p + n) + i: -n + i], np.zeros(self.q)]
            y[x.shape[0] + i] = super().predict(feat[None, :])
        return y
class ARIMA(ARMA):
    def __init__(self, q, d, p):
        """
        An ARIMA model: an ARMA model fitted on a d-times differenced
        series, with the differencing undone on every output.

        :param q: (int) Order of the MA model.
        :param p: (int) Order of the AR model.
        :param d: (int) Number of times the data needs to be differenced.
        """
        super().__init__(q, p)
        self.d = d

    def return_output(self, x):
        """Integrate a differenced-domain result back to the original scale."""
        return undo_difference(x, self.d) if self.d > 0 else x

    def prepare_features(self, x):
        """Difference the series d times, then build the ARMA feature matrix."""
        series = difference(x, self.d) if self.d > 0 else x
        return super().prepare_features(series)

    def fit_predict(self, x):
        """Fit on x and return in-sample predictions in the original scale."""
        predictions = super().fit_predict(x)
        return self.return_output(predictions)

    def predict(self, x, **kwargs):
        """Predict (see ARMA.predict) and undo the differencing."""
        predictions = super().predict(x, **kwargs)
        return self.return_output(predictions)

    def forecast(self, x, n):
        """Forecast n future steps and undo the differencing."""
        future = super().forecast(x, n)
        return self.return_output(future)