Files
vanilla-machine-learning/time_series/ARIMA.py
2018-10-03 14:53:48 +02:00

194 lines
5.4 KiB
Python

import numpy as np
def difference(x, d=1):
if d == 0:
return x
else:
x = np.r_[x[0], np.diff(x)]
return difference(x, d - 1)
def undo_difference(x, d=1):
if d == 1:
return np.cumsum(x)
else:
x = np.cumsum(x)
return undo_difference(x, d - 1)
def lag_view(x, order):
"""
For every value X_i create a row that lags k values: [X_i-1, X_i-2, ... X_i-k]
"""
y = x.copy()
# Create features by shifting the window of `order` size by one step.
# This results in a 2D array [[t1, t2, t3], [t2, t3, t4], ... [t_k-2, t_k-1, t_k]]
x = np.array([y[-(i + order):][:order] for i in range(y.shape[0])])
# Reverse the array as we started at the end and remove duplicates.
# Note that we truncate the features [order -1:] and the labels [order]
# This is the shifting of the features with one time step compared to the labels
x = np.stack(x)[::-1][order - 1: -1]
y = y[order:]
return x, y
def least_squares(x, y):
return np.linalg.inv((x.T @ x)) @ (x.T @ y)
class LinearModel:
def __init__(self, fit_intercept=True):
self.fit_intercept = fit_intercept
self.beta = None
self.intercept_ = None
self.coef_ = None
def _prepare_features(self, x):
if self.fit_intercept:
x = np.hstack((np.ones((x.shape[0], 1)), x))
return x
def fit(self, x, y):
x = self._prepare_features(x)
self.beta = least_squares(x, y)
if self.fit_intercept:
self.intercept_ = self.beta[0]
self.coef_ = self.beta[1:]
else:
self.coef_ = self.beta
def predict(self, x):
x = self._prepare_features(x)
return x @ self.beta
def fit_predict(self, x, y):
self.fit(x, y)
return self.predict(x)
class ARMA(LinearModel):
def __init__(self, q, p):
"""
An ARIMA model.
:param q: (int) Order of the MA model.
:param p: (int) Order of the AR model.
"""
super().__init__(True)
self.p = p
self.q = q
self.ar = None
self.resid = None
def prepare_features(self, x):
ar_features = None
ma_features = None
# Determine the features and the epsilon terms for the MA process
if self.q > 0:
if self.ar is None:
self.ar = ARIMA(0, 0, self.p)
self.ar.fit_predict(x)
eps = self.ar.resid
eps[0] = 0
# prepend with zeros as there are no residuals_t-k in the first X_t
ma_features, _ = lag_view(np.r_[np.zeros(self.q), eps], self.q)
# Determine the features for the AR process
if self.p > 0:
# prepend with zeros as there are no X_t-k in the first X_t
ar_features = lag_view(np.r_[np.zeros(self.p), x], self.p)[0]
if ar_features is not None and ma_features is not None:
n = min(len(ar_features), len(ma_features))
ar_features = ar_features[:n]
ma_features = ma_features[:n]
features = np.hstack((ar_features, ma_features))
elif ma_features is not None:
n = len(ma_features)
features = ma_features[:n]
else:
n = len(ar_features)
features = ar_features[:n]
return features, x[:n]
def fit(self, x):
features, x = self.prepare_features(x)
super().fit(features, x)
return features
def fit_predict(self, x):
"""
Fit and transform input
:param x: (array) with time series.
"""
features = self.fit(x)
return self.predict(x, prepared=(features))
def predict(self, x, **kwargs):
"""
:param x: (array)
:kwargs:
prepared: (tpl) containing the features, eps and x
"""
features = kwargs.get('prepared', None)
if features is None:
features, x = self.prepare_features(x)
y = super().predict(features)
self.resid = x - y
return y
def forecast(self, x, n):
"""
Forecast the time series.
:param x: (array) Current time steps.
:param n: (int) Number of time steps in the future.
"""
features, x = self.prepare_features(x)
y = super().predict(features)
# Append n time steps as zeros. Because the epsilon terms are unknown
y = np.r_[y, np.zeros(n)]
for i in range(n):
feat = np.r_[y[-(self.p + n) + i: -n + i], np.zeros(self.q)]
y[x.shape[0] + i] = super().predict(feat[None, :])
return y
class ARIMA(ARMA):
def __init__(self, q, d, p):
"""
An ARIMA model.
:param q: (int) Order of the MA model.
:param p: (int) Order of the AR model.
:param d: (int) Number of times the data needs to be differenced.
"""
super().__init__(q, p)
self.d = d
def return_output(self, x):
if self.d > 0:
x = undo_difference(x, self.d)
return x
def prepare_features(self, x):
if self.d > 0:
x = difference(x, self.d)
return super().prepare_features(x)
def fit_predict(self, x):
return self.return_output(super().fit_predict(x))
def predict(self, x, **kwargs):
return self.return_output(super().predict(x, **kwargs))
def forecast(self, x, n):
return self.return_output(super().forecast(x, n))