diff --git a/aeon/forecasting/__init__.py b/aeon/forecasting/__init__.py
index 0b134857dd..7d7cb7e6c4 100644
--- a/aeon/forecasting/__init__.py
+++ b/aeon/forecasting/__init__.py
@@ -1,13 +1,15 @@
 """Forecasters."""
 
 __all__ = [
-    "NaiveForecaster",
     "BaseForecaster",
+    "NaiveForecaster",
     "RegressionForecaster",
     "ETSForecaster",
     "TVPForecaster",
+    "ARIMA",
 ]
 
+from aeon.forecasting._arima import ARIMA
 from aeon.forecasting._ets import ETSForecaster
 from aeon.forecasting._naive import NaiveForecaster
 from aeon.forecasting._regression import RegressionForecaster
diff --git a/aeon/forecasting/_arima.py b/aeon/forecasting/_arima.py
new file mode 100644
index 0000000000..6f08af4325
--- /dev/null
+++ b/aeon/forecasting/_arima.py
@@ -0,0 +1,301 @@
+"""ARIMA.
+
+An implementation of the ARIMA forecasting algorithm.
+"""
+
+__maintainer__ = ["alexbanwell1", "TonyBagnall"]
+__all__ = ["ARIMA"]
+
+import numpy as np
+from numba import njit
+
+from aeon.forecasting._extract_paras import _extract_arma_params
+from aeon.forecasting._nelder_mead import nelder_mead
+from aeon.forecasting.base import BaseForecaster
+
+
+class ARIMA(BaseForecaster):
+    """AutoRegressive Integrated Moving Average (ARIMA) forecaster.
+
+    ARIMA with a fixed model structure, whose parameters are fitted with a
+    Nelder-Mead optimiser that minimises the AIC.
+
+    Parameters
+    ----------
+    p : int, default=1
+        Autoregressive (p) order of the ARIMA model.
+    d : int, default=0
+        Differencing (d) order of the ARIMA model.
+    q : int, default=1
+        Moving average (q) order of the ARIMA model.
+    use_constant : bool, default=False
+        Presence of a constant/intercept term in the model.
+    iterations : int, default=200
+        Maximum number of iterations to use in the Nelder-Mead parameter search.
+
+    Attributes
+    ----------
+    residuals_ : np.ndarray
+        Residual errors from the fitted model.
+    aic_ : float
+        Akaike Information Criterion for the fitted model.
+    c_ : float, default=0
+        Intercept term.
+    phi_ : np.ndarray
+        Coefficients for autoregressive terms (length p).
+    theta_ : np.ndarray
+        Coefficients for moving average terms (length q).
+
+    References
+    ----------
+    .. [1] R. J. Hyndman and G. Athanasopoulos,
+       Forecasting: Principles and Practice. OTexts, 2014.
+       https://otexts.com/fpp3/
+    """
+
+    _tags = {
+        "capability:horizon": False,  # cannot fit to a horizon other than 1
+    }
+
+    def __init__(
+        self,
+        p: int = 1,
+        d: int = 0,
+        q: int = 1,
+        use_constant: bool = False,
+        iterations: int = 200,
+    ):
+        self.p = p
+        self.d = d
+        self.q = q
+        self.use_constant = use_constant
+        self.iterations = iterations
+        self.phi_ = 0
+        self.theta_ = 0
+        self.c_ = 0
+        self._series = []
+        self._differenced_series = []
+        self.residuals_ = []
+        self.fitted_values_ = []
+        self.aic_ = 0
+        self._model = []
+        self._parameters = []
+        super().__init__(horizon=1, axis=1)
+
+    def _fit(self, y, exog=None):
+        """Fit ARIMA forecaster to series y to predict one ahead using y.
+
+        Parameters
+        ----------
+        y : np.ndarray
+            A time series on which to learn a forecaster to predict horizon ahead.
+        exog : np.ndarray, default=None
+            Not allowed for this forecaster.
+
+        Returns
+        -------
+        self
+            Fitted ARIMA.
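+
+        Examples
+        --------
+        A minimal usage sketch via the public ``fit``; the fitted value
+        depends on the optimiser and is indicative only.
+
+        >>> import numpy as np
+        >>> from aeon.forecasting import ARIMA
+        >>> y = np.array([112.0, 118.0, 132.0, 129.0, 121.0, 135.0])
+        >>> forecaster = ARIMA(p=1, d=1, q=1)
+        >>> forecaster.fit(y).forecast_  # doctest: +SKIP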
+ """ + self._series = np.array(y.squeeze(), dtype=np.float64) + # Model is an array of the (c,p,q) + self._model = np.array( + (1 if self.use_constant else 0, self.p, self.q), dtype=np.int32 + ) + self._differenced_series = np.diff(self._series, n=self.d) + # Nelder Mead returns the parameters in a single array + (self._parameters, self.aic_) = nelder_mead( + 0, + np.sum(self._model[:3]), + self._differenced_series, + self._model, + max_iter=self.iterations, + ) + # + (self.aic_, self.residuals_, self.fitted_values_) = _arima_model( + self._parameters, + self._differenced_series, + self._model, + ) + formatted_params = _extract_arma_params( + self._parameters, self._model + ) # Extract + # parameters + differenced_forecast = self.fitted_values_[-1] + + if self.d == 0: + forecast_value = differenced_forecast + elif self.d == 1: + forecast_value = differenced_forecast + self._series[-1] + else: # for d > 1, iteratively undifference + forecast_value = differenced_forecast + last_vals = self._series[-self.d :] + for _ in range(self.d): + forecast_value += last_vals[-1] - last_vals[-2] + # Shift values to avoid appending to list (efficient) + last_vals = np.roll(last_vals, -1) + last_vals[-1] = forecast_value # Extract the parameter values + self.forecast_ = forecast_value + if self.use_constant: + self.c_ = formatted_params[0][0] + self.phi_ = formatted_params[1][: self.p] + self.theta_ = formatted_params[2][: self.q] + + return self + + def _predict(self, y, exog=None): + """ + Predict the next step ahead for y. + + Parameters + ---------- + y : np.ndarray, default = None + A time series to predict the value of. y can be independent of the series + seen in fit. + exog : np.ndarray, default =None + Optional exogenous time series data assumed to be aligned with y + + Returns + ------- + float + Prediction 1 step ahead of the last value in y. 
+ """ + y = y.squeeze() + p, q, d = self.p, self.q, self.d + phi, theta = self.phi_, self.theta_ + c = 0.0 + if self.use_constant: + c = self.c_ + + # Apply differencing + if d > 0: + if len(y) <= d: + raise ValueError("Series too short for differencing.") + y_diff = np.diff(y, n=d) + else: + y_diff = y + + n = len(y_diff) + if n < max(p, q): + raise ValueError("Series too short for ARMA(p,q) with given order.") + + # Estimate in-sample residuals using model (fixed parameters) + residuals = np.zeros(n) + for t in range(max(p, q), n): + ar_part = np.dot(phi, y_diff[t - np.arange(1, p + 1)]) if p > 0 else 0.0 + ma_part = ( + np.dot(theta, residuals[t - np.arange(1, q + 1)]) if q > 0 else 0.0 + ) + pred = c + ar_part + ma_part + residuals[t] = y_diff[t] - pred + + # Use most recent p values of y_diff and q values of residuals to forecast t+1 + ar_forecast = np.dot(phi, y_diff[-np.arange(1, p + 1)]) if p > 0 else 0.0 + ma_forecast = np.dot(theta, residuals[-np.arange(1, q + 1)]) if q > 0 else 0.0 + + forecast_diff = c + ar_forecast + ma_forecast + + # Undifference the forecast + if d == 0: + return forecast_diff + elif d == 1: + return forecast_diff + y[-1] + else: + return forecast_diff + np.sum(y[-d:]) + + def _forecast(self, y, exog=None): + """Forecast one ahead for time series y.""" + self.fit(y, exog) + return float(self.forecast_) + + def iterative_forecast(self, y, prediction_horizon): + self.fit(y) + n = len(self._differenced_series) + p, q = self.p, self.q + phi, theta = self.phi_, self.theta_ + h = prediction_horizon + c = 0.0 + if self.use_constant: + c = self.c_ + + # Start with a copy of the original series and residuals + residuals = np.zeros(len(self.residuals_) + h) + residuals[: len(self.residuals_)] = self.residuals_ + forecast_series = np.zeros(n + h) + forecast_series[:n] = self._differenced_series + for i in range(h): + # Get most recent p values (lags) + t = n + i + ar_term = 0.0 + if p > 0: + ar_term = np.dot(phi, forecast_series[t - np.arange(1, p + 1)]) + # Get most recent q residuals (lags) + ma_term = 0.0 + if q > 0: + ma_term = np.dot(theta, residuals[t - np.arange(1, q + 1)]) + next_value = c + ar_term + ma_term + # Append prediction and a zero residual (placeholder) + forecast_series[n + i] = next_value + # Can't compute real residual during prediction, leave as zero + + # Correct differencing using forecast values + y_forecast_diff = forecast_series[n : n + h] + d = self.d + if d == 0: + return y_forecast_diff + else: # Correct undifferencing + # Start with last d values from original y + undiff = list(self._series[-d:]) + for i in range(h): + # Take the last d values and sum them + reconstructed = y_forecast_diff[i] + sum(undiff[-d:]) + undiff.append(reconstructed) + return np.array(undiff[d:]) + + +@njit(cache=True, fastmath=True) +def _aic(residuals, num_params): + """Calculate the log-likelihood of a model.""" + variance = np.mean(residuals**2) + likelihood = len(residuals) * (np.log(2 * np.pi) + np.log(variance) + 1) + return likelihood + 2 * num_params + + +# Define the ARIMA(p, d, q) likelihood function +@njit(cache=True, fastmath=True) +def _arima_model(params, data, model): + """Calculate the log-likelihood of an ARIMA model given the parameters.""" + formatted_params = _extract_arma_params(params, model) # Extract parameters + + # Initialize residuals + n = len(data) + num_predictions = n + 1 + residuals = np.zeros(num_predictions - 1) + fitted_values = np.zeros(num_predictions) + # Leave first max(p,q) residuals and fitted as zero. 
+    for t in range(max(model[1], model[2]), num_predictions):
+        fitted_values[t] = _in_sample_forecast(
+            data, model, t, formatted_params, residuals
+        )
+        if t != num_predictions - 1:
+            # Only calculate residuals for the predictions we have data for
+            residuals[t] = data[t] - fitted_values[t]
+    return _aic(residuals, len(params)), residuals, fitted_values
+
+
+@njit(cache=True, fastmath=True)
+def _in_sample_forecast(data, model, t, formatted_params, residuals):
+    """Efficient ARMA one-step forecast at time t for fitted model."""
+    p = model[1]
+    q = model[2]
+    c = formatted_params[0][0] if model[0] else 0.0
+
+    ar_term = 0.0
+    for j in range(min(p, t)):
+        ar_term += formatted_params[1, j] * data[t - j - 1]
+
+    ma_term = 0.0
+    for j in range(min(q, t)):
+        ma_term += formatted_params[2, j] * residuals[t - j - 1]
+
+    return c + ar_term + ma_term
diff --git a/aeon/forecasting/_extract_paras.py b/aeon/forecasting/_extract_paras.py
new file mode 100644
index 0000000000..2e38a3e4dd
--- /dev/null
+++ b/aeon/forecasting/_extract_paras.py
@@ -0,0 +1,27 @@
+"""ARIMA Utility Function."""
+
+import numpy as np
+from numba import njit
+
+
+@njit(cache=True, fastmath=True)
+def _extract_arma_params(params, model):
+    """Extract ARIMA parameters from the parameter vector."""
+    n_parts = len(model)
+    starts = np.zeros(n_parts, dtype=np.int32)
+    for i in range(1, n_parts):
+        starts[i] = starts[i - 1] + model[i - 1]
+
+    max_len = np.max(model)
+    result = np.empty((n_parts, max_len), dtype=params.dtype)
+    for i in range(n_parts):
+        for j in range(max_len):
+            result[i, j] = np.nan
+
+    for i in range(n_parts):
+        length = model[i]
+        start = starts[i]
+        for j in range(length):
+            result[i, j] = params[start + j]
+
+    return result
diff --git a/aeon/forecasting/_hypo_tests.py b/aeon/forecasting/_hypo_tests.py
new file mode 100644
index 0000000000..198a4472a8
--- /dev/null
+++ b/aeon/forecasting/_hypo_tests.py
@@ -0,0 +1,94 @@
+"""Hypothesis tests for time series."""
+
+import numpy as np
+
+
+def kpss_test(y, regression="c", lags=None):  # Test if time series is stationary
+    """
+    Perform the KPSS (Kwiatkowski-Phillips-Schmidt-Shin) test for stationarity.
+
+    The KPSS test evaluates the null hypothesis that a time series is
+    (trend or level) stationary against the alternative of a unit root
+    (non-stationarity). It can test for either stationarity around a
+    constant (level stationarity) or around a deterministic trend
+    (trend stationarity).
+
+    Parameters
+    ----------
+    y : array-like
+        Time series data to test for stationarity.
+    regression : str, default="c"
+        Indicates the null hypothesis for stationarity:
+        - "c" : Stationary around a constant (level stationarity)
+        - "ct" : Stationary around a constant and linear trend (trend stationarity)
+    lags : int or None, optional
+        Number of lags to use for the HAC (heteroskedasticity and
+        autocorrelation consistent) variance estimator.
+        If None, defaults to sqrt(n), where n is the sample size.
+
+    Returns
+    -------
+    kpss_stat : float
+        The KPSS test statistic.
+    stationary : bool
+        True if the series is judged stationary at the 5% significance level
+        (i.e., the test statistic is below the critical value); False otherwise.
+
+    Notes
+    -----
+    - Uses asymptotic 5% critical values from Kwiatkowski et al. (1992):
+      0.463 for level stationarity, 0.146 for trend stationarity.
+    - Returns True for stationary if the test statistic is below the 5%
+      critical value.
+
+    References
+    ----------
+    Kwiatkowski, D., Phillips, P.C.B., Schmidt, P., & Shin, Y. (1992).
+    "Testing the null hypothesis of stationarity against the alternative
+    of a unit root."
+    Journal of Econometrics, 54(1–3), 159–178.
+    https://doi.org/10.1016/0304-4076(92)90104-Y
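+
+    Examples
+    --------
+    A minimal sketch; white noise should be judged level stationary:
+
+    >>> import numpy as np
+    >>> from aeon.forecasting._hypo_tests import kpss_test
+    >>> rng = np.random.default_rng(0)
+    >>> stat, stationary = kpss_test(rng.normal(size=200))  # doctest: +SKIP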
+ "Testing the null hypothesis of stationarity against the alternative + of a unit root." + Journal of Econometrics, 54(1–3), 159–178. + https://doi.org/10.1016/0304-4076(92)90104-Y + """ + y = np.asarray(y) + n = len(y) + + # Step 1: Fit regression model to estimate residuals + if regression == "c": # Constant + X = np.ones((n, 1)) + elif regression == "ct": # Constant + Trend + X = np.column_stack((np.ones(n), np.arange(1, n + 1))) + else: + raise ValueError("regression must be 'c' or 'ct'") + + beta = np.linalg.lstsq(X, y, rcond=None)[0] # Estimate regression coefficients + residuals = y - X @ beta # Get residuals (u_t) + + # Step 2: Compute cumulative sum of residuals (S_t) + S_t = np.cumsum(residuals) + + # Step 3: Estimate long-run variance (HAC variance) + if lags is None: + # lags = int(12 * (n / 100)**(1/4)) # Default statsmodels lag length + lags = int(np.sqrt(n)) # Default lag length + + gamma_0 = np.sum(residuals**2) / (n - X.shape[1]) # Lag-0 autocovariance + gamma = [np.sum(residuals[k:] * residuals[:-k]) / n for k in range(1, lags + 1)] + + # Bartlett weights + weights = [1 - (k / (lags + 1)) for k in range(1, lags + 1)] + + # Long-run variance + sigma_squared = gamma_0 + 2 * np.sum([w * g for w, g in zip(weights, gamma)]) + + # Step 4: Calculate the KPSS statistic + kpss_stat = np.sum(S_t**2) / (n**2 * sigma_squared) + + # 5% critical values for KPSS test + if regression == "ct": + # p. 162 Kwiatkowski et al. (1992): y_t = beta * t + r_t + e_t, + # where beta is the trend, r_t a random walk and e_t a stationary + # error term. + crit = 0.146 + else: # hypo == "c" + # special case of the model above, where beta = 0 (so the null + # hypothesis is that the data is stationary around r_0). + crit = 0.463 + + return kpss_stat, kpss_stat < crit diff --git a/aeon/forecasting/_loss_functions.py b/aeon/forecasting/_loss_functions.py new file mode 100644 index 0000000000..c88211b645 --- /dev/null +++ b/aeon/forecasting/_loss_functions.py @@ -0,0 +1,40 @@ +"""Loss functions for optimiser.""" + +import numpy as np +from numba import njit + +from aeon.forecasting._extract_paras import _extract_arma_params + +LOG_2PI = 1.8378770664093453 + + +@njit(cache=True, fastmath=True) +def _arima_fit(params, data, model): + """Calculate the AIC of an ARIMA model given the parameters.""" + formatted_params = _extract_arma_params(params, model) # Extract parameters + + # Initialize residuals + n = len(data) + residuals = np.zeros(n) + c = formatted_params[0][0] if model[0] else 0 + p = model[1] + q = model[2] + for t in range(n): + ar_term = 0.0 + max_ar = min(p, t) + for j in range(max_ar): + ar_term += formatted_params[1, j] * data[t - j - 1] + ma_term = 0.0 + max_ma = min(q, t) + for j in range(max_ma): + ma_term += formatted_params[2, j] * residuals[t - j - 1] + y_hat = c + ar_term + ma_term + residuals[t] = data[t] - y_hat + sse = 0.0 + start = max(p, q) + for i in range(start, n): + sse += residuals[i] * residuals[i] + variance = sse / (n - start) + likelihood = (n - start) * (LOG_2PI + np.log(variance) + 1.0) + k = len(params) + return likelihood + 2 * k diff --git a/aeon/forecasting/_nelder_mead.py b/aeon/forecasting/_nelder_mead.py new file mode 100644 index 0000000000..f617aa5727 --- /dev/null +++ b/aeon/forecasting/_nelder_mead.py @@ -0,0 +1,130 @@ +"""Optimisation algorithms for automatic parameter tuning.""" + +import numpy as np +from numba import njit + +from aeon.forecasting._loss_functions import _arima_fit + + +@njit(cache=True, fastmath=True) +def dispatch_loss(fn_id, params, data, 
+    if fn_id == 0:
+        return _arima_fit(params, data, model)
+    else:
+        raise ValueError("Unknown loss function ID")
+
+
+@njit(cache=True, fastmath=True)
+def nelder_mead(
+    loss_id,
+    num_params,
+    data,
+    model,
+    tol=1e-6,
+    max_iter=500,
+):
+    """
+    Perform optimisation using the Nelder-Mead simplex algorithm.
+
+    This function minimises a given loss (objective) function using the
+    Nelder-Mead algorithm, a derivative-free method that iteratively refines
+    a simplex of candidate solutions. The implementation supports
+    unconstrained minimisation of functions with a fixed number of parameters.
+
+    Parameters
+    ----------
+    loss_id : int
+        ID of the loss function to optimise, dispatched via ``dispatch_loss``.
+    num_params : int
+        The number of parameters (dimensions) in the optimisation problem.
+    data : np.ndarray
+        The input data used by the loss function. The shape and content depend
+        on the specific loss function being minimised.
+    model : np.ndarray
+        Model structure passed through to the loss function. For ``_arima_fit``
+        this is the (constant, p, q) order array; its exact type and structure
+        need only be compatible with the dispatched loss function.
+    tol : float, optional (default=1e-6)
+        Tolerance for convergence. The algorithm stops when the maximum difference
+        between function values at simplex vertices is less than `tol`.
+    max_iter : int, optional (default=500)
+        Maximum number of iterations to perform.
+
+    Returns
+    -------
+    best_params : np.ndarray, shape (`num_params`,)
+        The parameter vector that minimises the loss function.
+    best_value : float
+        The value of the loss function at the optimal parameter vector.
+
+    Notes
+    -----
+    - The initial simplex is constructed by setting each parameter to 0.5,
+      with one additional point per dimension at 0.6 for that dimension.
+    - This implementation does not support constraints or bounds on the parameters.
+    - The algorithm does not guarantee finding a global minimum.
+
+    References
+    ----------
+    .. [1] Nelder, J. A. and Mead, R. (1965).
+       A Simplex Method for Function Minimization.
+       The Computer Journal, 7(4), 308–313.
+       https://doi.org/10.1093/comjnl/7.4.308
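+
+    Examples
+    --------
+    A sketch of direct use with the ARIMA loss (``loss_id=0``); this is
+    normally only called internally by :class:`ARIMA`.
+
+    >>> import numpy as np
+    >>> from aeon.forecasting._nelder_mead import nelder_mead
+    >>> data = np.random.default_rng(0).normal(size=50)
+    >>> model = np.array([0, 1, 1], dtype=np.int32)
+    >>> params, aic = nelder_mead(0, 2, data, model)  # doctest: +SKIP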
+    """
+    points = np.full((num_params + 1, num_params), 0.5)
+    for i in range(num_params):
+        points[i + 1][i] = 0.6
+    values = np.empty(len(points), dtype=np.float64)
+    for i in range(len(points)):
+        values[i] = dispatch_loss(loss_id, points[i].copy(), data, model)
+    for i in range(max_iter):
+        # Order simplex by function values
+        order = np.argsort(values)
+        points = points[order]
+        values = values[order]
+
+        # Centroid of the best n points
+        centre_point = points[:-1].sum(axis=0) / len(points[:-1])
+
+        # Reflection
+        # centre + distance between centre and largest value
+        reflected_point = centre_point + (centre_point - points[-1])
+        reflected_value = dispatch_loss(loss_id, reflected_point, data, model)
+        # if between best and second best, use reflected value
+        if len(values) > 1 and values[0] <= reflected_value < values[-2]:
+            points[-1] = reflected_point
+            values[-1] = reflected_value
+            continue
+        # Expansion
+        # Otherwise if it is better than the best value
+        if reflected_value < values[0]:
+            expanded_point = centre_point + 2 * (reflected_point - centre_point)
+            expanded_value = dispatch_loss(loss_id, expanded_point, data, model)
+            # if less than reflected value use expanded, otherwise use reflected
+            if expanded_value < reflected_value:
+                points[-1] = expanded_point
+                values[-1] = expanded_value
+            else:
+                points[-1] = reflected_point
+                values[-1] = reflected_value
+            continue
+        # Contraction
+        # Otherwise if reflection is worse than all current values
+        contracted_point = centre_point - 0.5 * (centre_point - points[-1])
+        contracted_value = dispatch_loss(loss_id, contracted_point, data, model)
+        # If contraction is better use that, otherwise move to shrinkage
+        if contracted_value < values[-1]:
+            points[-1] = contracted_point
+            values[-1] = contracted_value
+            continue
+
+        # Shrinkage
+        for i in range(1, len(points)):
+            points[i] = points[0] - 0.5 * (points[0] - points[i])
+            values[i] = dispatch_loss(loss_id, points[i], data, model)
+
+        # Convergence check
+        if np.max(np.abs(values - values[0])) < tol:
+            break
+    return points[0], values[0]
diff --git a/aeon/forecasting/_seasonality.py b/aeon/forecasting/_seasonality.py
new file mode 100644
index 0000000000..356b1a40d2
--- /dev/null
+++ b/aeon/forecasting/_seasonality.py
@@ -0,0 +1,101 @@
+"""Seasonality Tools.
+
+Includes autocorrelation function (ACF) and seasonal period estimation.
+"""
+
+import numpy as np
+from numba import njit
+
+
+@njit(cache=True, fastmath=True)
+def acf(X, max_lag):
+    """
+    Compute the sample autocorrelation function (ACF) of a time series.
+
+    Up to a specified maximum lag.
+
+    The autocorrelation at lag k is defined as the Pearson correlation
+    coefficient between the series and a lagged version of itself.
+    If both segments at a given lag have zero variance, the function
+    returns 1 for that lag. If only one segment has zero variance,
+    the function returns 0.
+
+    Parameters
+    ----------
+    X : array-like, shape (n_samples,)
+        The input time series data.
+    max_lag : int
+        The maximum lag (number of steps) for which to
+        compute the autocorrelation.
+
+    Returns
+    -------
+    acf_values : np.ndarray, shape (max_lag,)
+        The autocorrelation values for lags 1 through `max_lag`.
+
+    Notes
+    -----
+    The function handles cases where the lagged segments have zero
+    variance to avoid division by zero.
+    The returned values correspond to
+    lags 1, 2, ..., `max_lag` (not including lag 0).
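+
+    Examples
+    --------
+    A small sketch; exact values depend on the input series.
+
+    >>> import numpy as np
+    >>> from aeon.forecasting._seasonality import acf
+    >>> x = np.sin(np.linspace(0, 8 * np.pi, 48))
+    >>> acf(x, max_lag=12)  # doctest: +SKIP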
+ """ + length = len(X) + X_t = np.zeros(max_lag, dtype=float) + for lag in range(1, max_lag + 1): + lag_length = length - lag + x1 = X[:-lag] + x2 = X[lag:] + s1 = np.sum(x1) + s2 = np.sum(x2) + m1 = s1 / lag_length + m2 = s2 / lag_length + ss1 = np.sum(x1 * x1) + ss2 = np.sum(x2 * x2) + v1 = ss1 - s1 * m1 + v2 = ss2 - s2 * m2 + v1_is_zero, v2_is_zero = v1 <= 1e-9, v2 <= 1e-9 + if v1_is_zero and v2_is_zero: # Both zero variance, + # so must be 100% correlated + X_t[lag - 1] = 1 + elif v1_is_zero or v2_is_zero: # One zero variance + # the other not + X_t[lag - 1] = 0 + else: + X_t[lag - 1] = np.sum((x1 - m1) * (x2 - m2)) / np.sqrt(v1 * v2) + return X_t + + +@njit(cache=True, fastmath=True) +def calc_seasonal_period(data): + """ + Estimate the seasonal period of a time series using autocorrelation analysis. + + This function computes the autocorrelation function (ACF) of + the input series up to lag 24. It then identifies peaks in the + ACF above the mean value, treating the first such peak + as the estimated seasonal period. If no peak is found, + a period of 1 is returned. + + Parameters + ---------- + data : array-like, shape (n_samples,) + The input time series data. + + Returns + ------- + period : int + The estimated seasonal period (lag) of the series. Returns 1 if no significant + peak is detected in the autocorrelation. + """ + lags = acf(data, 24) + lags = np.concatenate((np.array([1.0]), lags)) + peaks = [] + mean_lags = np.mean(lags) + for i in range(1, len(lags) - 1): # Skip the first (lag 0) and last elements + if lags[i] >= lags[i - 1] and lags[i] >= lags[i + 1] and lags[i] > mean_lags: + peaks.append(i) + if not peaks: + return 1 + else: + return peaks[0] diff --git a/aeon/forecasting/tests/test_arima.py b/aeon/forecasting/tests/test_arima.py new file mode 100644 index 0000000000..73d99d7b50 --- /dev/null +++ b/aeon/forecasting/tests/test_arima.py @@ -0,0 +1,113 @@ +"""Test the ARIMA forecaster.""" + +import numpy as np +import pytest + +from aeon.forecasting import ARIMA + +y = np.array( + [112, 118, 132, 129, 121, 135, 148, 148, 136, 119, 104, 118], dtype=np.float64 +) + + +def test_arima_zero_orders(): + """Test ARIMA(0,0,0) which should return the mean if constant is used.""" + model = ARIMA(p=0, d=0, q=0, use_constant=True) + model.fit(y) + forecast = model.predict(y) + assert np.isfinite(forecast) + assert abs(forecast - np.mean(y)) < 10 + + +@pytest.mark.parametrize( + "p, d, q, use_constant", + [ + (1, 0, 1, True), # basic ARIMA + (2, 1, 1, False), # no constant + (1, 2, 1, True), # higher-order differencing + ], +) +def test_arima_fit_and_predict_variants(p, d, q, use_constant): + """Test ARIMA fit and predict for various (p,d,q) and use_constant settings.""" + model = ARIMA(p=p, d=d, q=q, use_constant=use_constant) + model.fit(y) + forecast = model.forecast_ + assert isinstance(forecast, float) + assert np.isfinite(forecast) + + +def test_arima_iterative_forecast(): + """Test multi-step forecasting using iterative_forecast method.""" + model = ARIMA(p=1, d=1, q=1) + horizon = 3 + preds = model.iterative_forecast(y, prediction_horizon=horizon) + assert preds.shape == (horizon,) + assert np.all(np.isfinite(preds)) + + +@pytest.mark.parametrize( + "y_input, error_match", + [ + (np.array([1.0, 2.0]), "Series too short for differencing"), + (np.array([1.0, 2.0, 3.0]), "Series too short for ARMA"), + ], +) +def test_arima_too_short_series_errors(y_input, error_match): + """Test errors raised for too short input series.""" + model = ARIMA(p=3, d=2, q=3) + model.fit(y) + 
+    with pytest.raises(ValueError, match=error_match):
+        model._predict(y_input)
+
+
+def test_forecast_attribute_set():
+    """Test that calling _forecast sets the internal forecast_ attribute."""
+    model = ARIMA(p=1, d=1, q=1)
+    forecast = model._forecast(y)
+    assert hasattr(model, "forecast_")
+    assert np.isclose(forecast, model.forecast_)
+
+
+def test_iterative_forecast_with_d2():
+    """Test iterative forecast output shape and validity with d=2."""
+    model = ARIMA(p=1, d=2, q=1)
+    preds = model.iterative_forecast(y, prediction_horizon=5)
+    assert preds.shape == (5,)
+    assert np.all(np.isfinite(preds))
+
+
+@pytest.mark.parametrize(
+    "p, d, q, use_constant, expected_forecast",
+    [
+        (1, 0, 1, False, 118.47506756),  # precomputed from known ARIMA implementation
+        (2, 1, 1, False, 209.1099231455),  # precomputed
+        (3, 0, 0, True, 137.47368045155),  # precomputed
+    ],
+)
+def test_arima_fixed_paras(p, d, q, use_constant, expected_forecast):
+    """Test ARIMA fit/predict accuracy against known forecasts.
+
+    Expected values calculated with parameters fitted by Nelder-Mead:
+
+    1. phi = [0.99497524], theta = [0.0691515]
+    2. phi = [0.02898788, -0.4330671], theta = [1.26699252]
+    3. phi = [0.19202414, 0.05207654, -0.07367897], theta = [],
+       constant = 105.970867164
+    """
+    model = ARIMA(p=p, d=d, q=q, use_constant=use_constant)
+    model.fit(y)
+    forecast = model.forecast_
+    assert isinstance(forecast, float)
+    assert np.isfinite(forecast)
+    assert np.isclose(forecast, expected_forecast, atol=1e-6)
+
+
+def test_arima_known_output():
+    """Test ARIMA with fixed parameters against an externally generated forecast."""
+    model = ARIMA(p=1, d=0, q=1)
+    model.fit(y)
+    f = model.forecast_
+    assert np.isclose(118.47506756, f)
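+
+
+def test_iterative_forecast_first_step_matches_fit():
+    """Illustrative check: the first iterative forecast should equal forecast_.
+
+    Added as a usage sketch; it assumes iterative_forecast(y, 1)[0] repeats
+    the one-step forecast computed during fit, since both use the same
+    fitted parameters and undifferencing.
+    """
+    model = ARIMA(p=1, d=1, q=1)
+    preds = model.iterative_forecast(y, prediction_horizon=1)
+    assert np.isclose(preds[0], model.forecast_)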