update arima
This commit is contained in:
@@ -5,6 +5,4 @@
|
||||
* [genetic algorithms](https://www.ritchievink.com/blog/2018/01/14/computer-build-me-a-bridge/)
|
||||
* [support vector machine](https://www.ritchievink.com/blog/2017/11/27/implementing-a-support-vector-machine-in-scala/)
|
||||
* [neural network](https://ritchievink.com/blog/2017/07/10/programming-a-neural-network-from-scratch/)
|
||||
|
||||
|
||||
|
||||
* [arima models](https://www.ritchievink.com/blog/2018/09/26/algorithm-breakdown-ar-ma-and-arima-models/)
|
||||
193
time_series/ARIMA.py
Normal file
193
time_series/ARIMA.py
Normal file
@@ -0,0 +1,193 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
def difference(x, d=1):
    """
    Difference a time series `d` times.

    The first value of the series is kept in front of each pass so that the
    result has the same length as the input and the operation can be exactly
    inverted by `undo_difference`.

    :param x: (array) Time series.
    :param d: (int) Number of differencing passes. d=0 returns x unchanged.
    :return: (array) Differenced series, same length as x.
    """
    # Iterative instead of recursive: the recursive form blows the Python
    # recursion limit for large d and never terminates for negative d.
    for _ in range(d):
        x = np.r_[x[0], np.diff(x)]
    return x
|
||||
|
||||
|
||||
def undo_difference(x, d=1):
    """
    Integrate a series `d` times, inverting `difference`.

    :param x: (array) Differenced time series.
    :param d: (int) Number of integration passes. d=0 returns x unchanged.
    :return: (array) Integrated series, same length as x.
    """
    # Iterative instead of recursive: the recursive form recursed forever for
    # d <= 0 (a value `difference` accepts) and hit the recursion limit for
    # large d. Each pass is a cumulative sum, the exact inverse of one
    # differencing pass that kept x[0] as the anchor value.
    for _ in range(d):
        x = np.cumsum(x)
    return x
|
||||
|
||||
|
||||
def lag_view(x, order):
    """
    Build a lagged feature matrix from a series.

    Each returned feature row holds the `order` values that immediately
    precede the corresponding label, so row t is
    [X_t-order, ..., X_t-1] and its label is X_t.

    :param x: (array) Time series.
    :param order: (int) Number of lagged values per feature row.
    :return: (tpl) (features 2D array, labels 1D array).
    """
    series = x.copy()
    # Slide a window of `order` values over the series, starting from the
    # tail and shifting one step per iteration.
    windows = [series[-(shift + order):][:order] for shift in range(series.shape[0])]
    # Flip back to chronological order, then trim: drop the duplicated
    # windows at the start and the last window, so every feature row sits
    # exactly one time step behind its label.
    features = np.stack(windows)[::-1][order - 1:-1]
    labels = series[order:]
    return features, labels
|
||||
|
||||
|
||||
def least_squares(x, y):
    """
    Ordinary least squares via the normal equations (X'X) b = X'y.

    :param x: (array) 2D design matrix.
    :param y: (array) Target values.
    :return: (array) Coefficient vector b.
    """
    # np.linalg.solve instead of explicitly inverting X'X: same result for a
    # non-singular normal matrix, but numerically more stable and cheaper.
    return np.linalg.solve(x.T @ x, x.T @ y)
|
||||
|
||||
|
||||
class LinearModel:
    """Ordinary least squares linear model with an optional intercept."""

    def __init__(self, fit_intercept=True):
        """
        :param fit_intercept: (bool) Learn an intercept term by prepending a
                              column of ones to the features.
        """
        self.fit_intercept = fit_intercept
        self.beta = None        # full coefficient vector (intercept first, if any)
        self.intercept_ = None  # learned bias, set after fit()
        self.coef_ = None       # feature coefficients, set after fit()

    def _prepare_features(self, x):
        # Add a leading ones column so the intercept is estimated as beta[0].
        if self.fit_intercept:
            ones = np.ones((x.shape[0], 1))
            x = np.hstack((ones, x))
        return x

    def fit(self, x, y):
        """Estimate the coefficients by ordinary least squares."""
        design = self._prepare_features(x)
        self.beta = least_squares(design, y)
        if self.fit_intercept:
            self.intercept_, self.coef_ = self.beta[0], self.beta[1:]
        else:
            self.coef_ = self.beta

    def predict(self, x):
        """Return the model output for the given features."""
        return self._prepare_features(x) @ self.beta

    def fit_predict(self, x, y):
        """Fit on (x, y) and return the in-sample predictions."""
        self.fit(x, y)
        return self.predict(x)
|
||||
|
||||
|
||||
class ARMA(LinearModel):
    # ARMA(p, q) process fitted as a linear regression on lagged values (AR
    # part) and lagged residuals of a plain AR fit (MA part).
    def __init__(self, q, p):
        """
        An ARMA model.

        :param q: (int) Order of the MA model.
        :param p: (int) Order of the AR model.
        """
        super().__init__(True)
        self.p = p
        self.q = q
        # Helper AR model fitted once to obtain residuals (epsilon terms)
        # for the MA features; lazily created in prepare_features.
        self.ar = None
        # Residuals of the most recent predict() call.
        self.resid = None

    def prepare_features(self, x):
        """
        Build the regression features for the ARMA fit.

        :param x: (array) Time series.
        :return: (tpl) (features 2D array, x truncated to the feature length).
        """
        ar_features = None
        ma_features = None

        # Determine the features and the epsilon terms for the MA process
        if self.q > 0:
            if self.ar is None:
                # Pure AR(p) model (q=0, d=0) used only to produce residuals.
                self.ar = ARIMA(0, 0, self.p)
                self.ar.fit_predict(x)
            # NOTE(review): eps aliases self.ar.resid, so eps[0] = 0 mutates
            # the helper model's residuals in place — presumably intentional.
            eps = self.ar.resid
            eps[0] = 0

            # prepend with zeros as there are no residuals_t-k in the first X_t
            ma_features, _ = lag_view(np.r_[np.zeros(self.q), eps], self.q)

        # Determine the features for the AR process
        if self.p > 0:
            # prepend with zeros as there are no X_t-k in the first X_t
            ar_features = lag_view(np.r_[np.zeros(self.p), x], self.p)[0]

        # Combine whichever feature sets exist, truncated to a common length.
        if ar_features is not None and ma_features is not None:
            n = min(len(ar_features), len(ma_features))
            ar_features = ar_features[:n]
            ma_features = ma_features[:n]
            features = np.hstack((ar_features, ma_features))
        elif ma_features is not None:
            n = len(ma_features)
            features = ma_features[:n]
        else:
            n = len(ar_features)
            features = ar_features[:n]

        return features, x[:n]

    def fit(self, x):
        """
        Fit the ARMA coefficients on the series.

        :param x: (array) Time series.
        :return: (array) The features used for the fit.
        """
        features, x = self.prepare_features(x)
        super().fit(features, x)
        return features

    def fit_predict(self, x):
        """
        Fit and transform input

        :param x: (array) with time series.
        """
        features = self.fit(x)
        return self.predict(x, prepared=(features))

    def predict(self, x, **kwargs):
        """
        Predict in-sample values and store the residuals in self.resid.

        :param x: (array) Time series.
        :kwargs:
            prepared: (array) precomputed features; if absent they are
                      rebuilt (and x truncated) via prepare_features.
        """
        features = kwargs.get('prepared', None)
        if features is None:
            features, x = self.prepare_features(x)

        y = super().predict(features)
        # Residuals feed the MA features on subsequent prepare_features calls.
        self.resid = x - y

        return y

    def forecast(self, x, n):
        """
        Forecast the time series.

        :param x: (array) Current time steps.
        :param n: (int) Number of time steps in the future.
        """
        features, x = self.prepare_features(x)
        y = super().predict(features)

        # Append n time steps as zeros. Because the epsilon terms are unknown
        y = np.r_[y, np.zeros(n)]
        for i in range(n):
            # AR features come from the last p (partly forecasted) values;
            # future epsilon terms are unknown, so the q MA features are zero.
            feat = np.r_[y[-(self.p + n) + i: -n + i], np.zeros(self.q)]
            y[x.shape[0] + i] = super().predict(feat[None, :])
        return y
|
||||
|
||||
|
||||
class ARIMA(ARMA):
    """ARMA model fitted on a d-times differenced series."""

    def __init__(self, q, d, p):
        """
        An ARIMA model.

        :param q: (int) Order of the MA model.
        :param p: (int) Order of the AR model.
        :param d: (int) Number of times the data needs to be differenced.
        """
        super().__init__(q, p)
        self.d = d

    def return_output(self, x):
        """Integrate the output back when the model fitted differenced data."""
        return undo_difference(x, self.d) if self.d > 0 else x

    def prepare_features(self, x):
        """Difference the series d times, then build the ARMA features."""
        if self.d > 0:
            x = difference(x, self.d)
        return super().prepare_features(x)

    def fit_predict(self, x):
        """Fit on x and return in-sample predictions on the original scale."""
        fitted = super().fit_predict(x)
        return self.return_output(fitted)

    def predict(self, x, **kwargs):
        """Predict and undo the differencing on the result."""
        predicted = super().predict(x, **kwargs)
        return self.return_output(predicted)

    def forecast(self, x, n):
        """Forecast n steps ahead on the original (undifferenced) scale."""
        forecasted = super().forecast(x, n)
        return self.return_output(forecasted)
|
||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user