From a112c650e6ccba6fa4128f152cdb8eb59e11010d Mon Sep 17 00:00:00 2001 From: Enzo Busseti Date: Wed, 21 Aug 2024 10:43:34 +0400 Subject: [PATCH 1/2] dataframe selector companion class design --- cvxportfolio/forecast.py | 259 ++++++++++++++++++++++++++------------- 1 file changed, 171 insertions(+), 88 deletions(-) diff --git a/cvxportfolio/forecast.py b/cvxportfolio/forecast.py index 68e977ae9..24d48b0e2 100644 --- a/cvxportfolio/forecast.py +++ b/cvxportfolio/forecast.py @@ -310,6 +310,116 @@ def _online_update(self, **kwargs): """Update forecast from period before.""" raise NotImplementedError # pragma: no cover +class DataFrameSelector: + """Companion class used to specify the DataFrame for the forecast.""" + + def past_history(self, **kwargs): + """Return all past history of the dataframe we work with. + + :param kwargs: All arguments passed to + :meth:`estimator.Estimator.values_in_time`. + :type kwargs: dict + + :returns: All past history (relative to the point in time in the + back-test) of the DataFrame. + :rtype: pd.DataFrame + """ + raise NotImplementedError # pragma: no cover + + def last_row(self, **kwargs): + """Return last row of the dataframe we work with. + + You may redefine it if obtaining the full dataframe is expensive, + during online update (in most cases) only this method is required. + + :param kwargs: All arguments passed to + :meth:`estimator.Estimator.values_in_time`. + :type kwargs: dict + + :returns: Most recent row, relative to the point in time in the + back-test, of the DataFrame returned by :meth:`past_history`. + :rtype: pd.Series + """ + return self.past_history(**kwargs).iloc[-1] + +class PastReturns(DataFrameSelector): + """Operate on past returns.""" + + def past_history( # pylint: disable=arguments-differ + self, past_returns, **kwargs): + """Past returns, skipping cash. + + :param past_returns: Past market returns, including cash. + :type past_returns: pd.DataFrame + :param kwargs: All other arguments passed to + :meth:`estimator.Estimator.values_in_time`. + :type kwargs: dict + + :raises DataError: If the forecaster is used in a policy without market + data server. + + :returns: Past market returns. + :rtype: pd.DataFrame + """ + if past_returns is None: + raise DataError( + "Forecaster needs past returns; can only be used if" + + " MarketData is not None.") + return past_returns.iloc[:, :-1] + +class PastReturnsSquared(PastReturns): + """Operate on past returns squared.""" + + def past_history( # pylint: disable=arguments-differ + self, **kwargs): + """Past returns squared, skipping cash. + + :param kwargs: All arguments passed to + :meth:`estimator.Estimator.values_in_time`. + :type kwargs: dict + + :returns: Past market returns, squared. + :rtype: pd.DataFrame + """ + return super().past_history(**kwargs)**2 + + def last_row(self, **kwargs): + """Most recent past returns. + + :param kwargs: All arguments passed to + :meth:`estimator.Estimator.values_in_time`. + :type kwargs: dict + + :returns: Most recent market returns, squared. + :rtype: pd.Series + """ + return super().past_history(**kwargs).iloc[-1]**2 + +class PastVolumes(DataFrameSelector): + """Operate on past volumes.""" + + def past_history( # pylint: disable=arguments-differ + self, past_volumes, **kwargs): + """Full history of past market volumes. + + :param past_volumes: Past market volumes. + :type past_volumes: pd.DataFrame + :param kwargs: All other arguments passed to + :meth:`estimator.Estimator.values_in_time`. + :type kwargs: dict + + :raises DataError: If the forecaster is used in a policy whose market + data server does not provides past volumes. + + :returns: Past market volumes. + :rtype: pd.DataFrame + """ + if past_volumes is None: + raise DataError( + "Forecaster needs past volumes; can only be used if MarketData" + + " provides market volumes.") + return past_volumes + class SumForecaster(UpdatingForecaster): """Base forecaster that implements a sum operation. @@ -318,13 +428,17 @@ class SumForecaster(UpdatingForecaster): these: they are means, so both their numerator and denominator are subclasses of this. + :param dataframe_selector: Instance of DataFrame selector for this + forecaster. + :type dataframe_selector: DataFrameSelector :param half_life: Length of the exponential smoothing half-life. :type half_life: pd.Timedelta or np.inf :param rolling: Length of the rolling window. :type rolling: pd.Timedelta or np.inf """ - def __init__(self, half_life=np.inf, rolling=np.inf): + def __init__(self, dataframe_selector, half_life=np.inf, rolling=np.inf): + self._dataframe_selector = dataframe_selector self.half_life = half_life self.rolling = rolling @@ -336,22 +450,22 @@ def _single_compute(self, last_row): """Compute the value for a single observation.""" raise NotImplementedError # pragma: no cover - def _dataframe_selector(self, **kwargs): - """Return dataframe we work with. + # def _dataframe_selector(self, **kwargs): + # """Return dataframe we work with. - This method receives the **kwargs passed to :meth:`values_in_time`. - """ - raise NotImplementedError # pragma: no cover + # This method receives the **kwargs passed to :meth:`values_in_time`. + # """ + # raise NotImplementedError # pragma: no cover - def _get_last_row(self, **kwargs): - """Return last row of the dataframe we work with. + # def _get_last_row(self, **kwargs): + # """Return last row of the dataframe we work with. - This method receives the **kwargs passed to :meth:`values_in_time`. + # This method receives the **kwargs passed to :meth:`values_in_time`. - You may redefine it if obtaining the full dataframe is expensive, - during online update (in most cases) only this method is required. - """ - return self._dataframe_selector(**kwargs).iloc[-1] + # You may redefine it if obtaining the full dataframe is expensive, + # during online update (in most cases) only this method is required. + # """ + # return self._dataframe_selector(**kwargs).iloc[-1] def _emw_weights(self, index, t): """Get weights to apply to the past observations for EMW.""" @@ -363,7 +477,7 @@ def _initial_compute(self, t, **kwargs): # pylint: disable=arguments-differ This method receives the **kwargs passed to :meth:`values_in_time`. """ - df = self._dataframe_selector(t=t, **kwargs) + df = self._dataframe_selector.past_history(t=t, **kwargs) # Moving average window logic if _is_timedelta_or_inf(_resolve_hyperpar(self.rolling)): @@ -387,7 +501,7 @@ def _online_update(self, t, **kwargs): # pylint: disable=arguments-differ This method receives the **kwargs passed to :meth:`values_in_time`. """ - last_row = self._get_last_row(t=t, **kwargs) + last_row = self._dataframe_selector.last_row(t=t, **kwargs) # if emw discount past if _is_timedelta_or_inf(_resolve_hyperpar(self.half_life)): @@ -405,7 +519,7 @@ def _online_update(self, t, **kwargs): # pylint: disable=arguments-differ # Moving average window logic: subtract elements that have gone out if _is_timedelta_or_inf(_resolve_hyperpar(self.rolling)): - df = self._dataframe_selector(t=t, **kwargs) + df = self._dataframe_selector.past_history(t=t, **kwargs) result = self._remove_part_gone_out_of_ma(result, df, t) # update internal timestamp @@ -431,53 +545,6 @@ def _remove_part_gone_out_of_ma(self, result, df, t): return result -class OnPastReturns(SumForecaster): # pylint: disable=abstract-method - """Intermediate class, operate on past returns.""" - - def _dataframe_selector( # pylint: disable=arguments-differ - self, past_returns, **kwargs): - """Past returns, skipping cash. - - This method receives the full arguments to :meth:`values_in_time`. - """ - if past_returns is None: - raise DataError( - f"{self.__class__.__name__} can only be used if MarketData is" - + " not None.") - return past_returns.iloc[:, :-1] - -class OnPastReturnsSquared(OnPastReturns): # pylint: disable=abstract-method - """Intermediate class, operate on past returns squared.""" - - def _dataframe_selector( # pylint: disable=arguments-differ - self, **kwargs): - """Past returns squared, skipping cash. - - This method receives the full arguments to :meth:`values_in_time`. - """ - return super()._dataframe_selector(**kwargs)**2 - - def _get_last_row(self, **kwargs): - """Most recent past returns. - - This method receives the full arguments to :meth:`values_in_time`. - """ - return super()._dataframe_selector(**kwargs).iloc[-1]**2 - -class OnPastVolumes(SumForecaster): # pylint: disable=abstract-method - """Intermediate class, operate on past volumes.""" - - def _dataframe_selector( # pylint: disable=arguments-differ - self, past_volumes, **kwargs): - """Past volumes. - - This method receives the full arguments to :meth:`values_in_time`. - """ - if past_volumes is None: - raise DataError( - f"{self.__class__.__name__} can only be used if MarketData" - + " provides market volumes.") - return past_volumes class VectorCount(SumForecaster): # pylint: disable=abstract-method """Intermediate class, count of non-NaN values of vectors.""" @@ -525,11 +592,11 @@ def values_in_time( # pylint: disable=arguments-differ return result -class CountPastReturns(VectorCount, OnPastReturns): - """Count non-nan past returns, excluding cash.""" +# class CountPastReturns(VectorCount, OnPastReturns): +# """Count non-nan past returns, excluding cash.""" -class CountPastVolumes(VectorCount, OnPastVolumes): - """Count non-nan past volumes.""" +# class CountPastVolumes(VectorCount, OnPastVolumes): +# """Count non-nan past volumes.""" class VectorSum(SumForecaster): # pylint: disable=abstract-method """Intermediate class, sum of non-NaN values of vectors.""" @@ -544,14 +611,14 @@ def _single_compute(self, last_row): """Update with last observation.""" return last_row.fillna(0.) -class SumPastReturns(VectorSum, OnPastReturns): - """Sum non-nan past returns, excluding cash.""" +# class SumPastReturns(VectorSum, OnPastReturns): +# """Sum non-nan past returns, excluding cash.""" -class SumPastReturnsSquared(VectorSum, OnPastReturnsSquared): - """Sum non-nan past returns squared, excluding cash.""" +# class SumPastReturnsSquared(VectorSum, OnPastReturnsSquared): +# """Sum non-nan past returns squared, excluding cash.""" -class SumPastVolumes(VectorSum, OnPastVolumes): - """Sum non-nan past volumes.""" +# class SumPastVolumes(VectorSum, OnPastVolumes): +# """Sum non-nan past volumes.""" class HistoricalMeanReturn(BaseForecast): r"""Historical means of non-cash returns. @@ -581,11 +648,14 @@ class HistoricalMeanReturn(BaseForecast): :type rolling: pandas.Timedelta or np.inf """ def __init__(self, half_life=np.inf, rolling=np.inf): + self._dataframe_selector = PastReturns() self.half_life = half_life self.rolling = rolling - self._numerator = SumPastReturns( + self._numerator = VectorSum( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) - self._denominator = CountPastReturns( + self._denominator = VectorCount( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) def values_in_time( # pylint: disable=arguments-differ @@ -637,12 +707,15 @@ class HistoricalVariance(BaseForecast): :type kelly: bool """ def __init__(self, half_life=np.inf, rolling=np.inf, kelly=True): + self._dataframe_selector = PastReturnsSquared() self.half_life = half_life self.rolling = rolling self.kelly = kelly - self._denominator = CountPastReturns( + self._denominator = VectorCount( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) - self._numerator = SumPastReturnsSquared( + self._numerator = VectorSum( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) if not self.kelly: self._correction = HistoricalMeanReturn( @@ -787,11 +860,14 @@ class HistoricalMeanVolume(BaseForecast): :type rolling: pandas.Timedelta or np.inf """ def __init__(self, half_life=np.inf, rolling=np.inf): + self._dataframe_selector = PastVolumes() self.half_life = half_life self.rolling = rolling - self._numerator = SumPastVolumes( + self._numerator = VectorSum( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) - self._denominator = CountPastVolumes( + self._denominator = VectorCount( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) def values_in_time( # pylint: disable=arguments-differ @@ -827,8 +903,8 @@ def _single_compute(self, last_row): return np.outer(nonnull, nonnull) -class JointCountPastReturns(JointCount, OnPastReturns): - """Compute denominator of (Kelly) covariance of past returns.""" +# class JointCountPastReturns(JointCount, OnPastReturns): +# """Compute denominator of (Kelly) covariance of past returns.""" class JointSum(SumForecaster): # pylint: disable=abstract-method """Intermediate class: joint sum for the numerator of covariances.""" @@ -846,8 +922,8 @@ def _single_compute(self, last_row): filled = last_row.fillna(0.) return np.outer(filled, filled) -class JointSumPastReturns(JointSum, OnPastReturns): - """Compute numerator of (Kelly) covariance of past returns.""" +# class JointSumPastReturns(JointSum, OnPastReturns): +# """Compute numerator of (Kelly) covariance of past returns.""" class JointMean(SumForecaster): # pylint: disable=abstract-method """Intermediate class: corrector for non-Kelly covariance.""" @@ -865,8 +941,8 @@ def _single_compute(self, last_row): \mathbf{E}[r^{i}]\mathbf{E}[r^{j}]`.""" return last_row.fillna(0.) -class JointMeanReturns(JointMean, OnPastReturns): - """Compute corrector for non-Kelly covariance.""" +# class JointMeanReturns(JointMean, OnPastReturns): +# """Compute corrector for non-Kelly covariance.""" class HistoricalCovariance(BaseForecast): r"""Historical covariance matrix. @@ -889,15 +965,19 @@ class HistoricalCovariance(BaseForecast): """ def __init__(self, half_life=np.inf, rolling=np.inf, kelly=True): + self._dataframe_selector = PastReturns() self.half_life = half_life self.rolling = rolling self.kelly = kelly - self._denominator = JointCountPastReturns( + self._denominator = JointCount( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) - self._numerator = JointSumPastReturns( + self._numerator = JointSum( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) if not self.kelly: - self._correction = JointMeanReturns( + self._correction = JointMean( + dataframe_selector = self._dataframe_selector, half_life=half_life, rolling=rolling) def values_in_time( # pylint: disable=arguments-differ @@ -964,6 +1044,9 @@ def name(self): # probably can restate this as another intermediate class, not inheriting # from OnPastReturns, and define specialized ones for other raw dataframes +class OnPastReturns: + pass # TMP + class OnWeightedPastReturns(OnPastReturns): # pylint: disable=abstract-method """Intermediate class, operate on past returns weighted by regressor.""" From ef243222f8aa6193066c079642b77d38aa9387a6 Mon Sep 17 00:00:00 2001 From: Enzo Busseti Date: Wed, 21 Aug 2024 10:53:25 +0400 Subject: [PATCH 2/2] passes tests, already cleaner --- cvxportfolio/forecast.py | 44 ++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/cvxportfolio/forecast.py b/cvxportfolio/forecast.py index 24d48b0e2..9baa7c410 100644 --- a/cvxportfolio/forecast.py +++ b/cvxportfolio/forecast.py @@ -1044,25 +1044,26 @@ def name(self): # probably can restate this as another intermediate class, not inheriting # from OnPastReturns, and define specialized ones for other raw dataframes -class OnPastReturns: - pass # TMP -class OnWeightedPastReturns(OnPastReturns): # pylint: disable=abstract-method +class WeightedPastReturns(PastReturns): # pylint: disable=abstract-method """Intermediate class, operate on past returns weighted by regressor.""" # could put the __init__ we use in derived classes here, but then # would have to be careful to use correct inheritance order for MRO # this needs to be populated by __init__ of derived class - regressor = None + # regressor = None - def _dataframe_selector( # pylint: disable=arguments-differ + def __init__(self, regressor): + self.regressor = regressor + + def past_history( # pylint: disable=arguments-differ self, **kwargs): """Past returns, skipping cash, weighted by regressor. This method receives the full arguments to :meth:`values_in_time`. """ - raw_past_df = super()._dataframe_selector(**kwargs) + raw_past_df = super().past_history(**kwargs) regressor_history = self.regressor.get_all_history( raw_past_df.index) # with the dropna we remove (old) observations for which regressor had @@ -1085,19 +1086,19 @@ def _dataframe_selector( # pylint: disable=arguments-differ # breakpoint() # return raw_last_row * regressor_on_last_row -class CountWeightedPastReturns(VectorCount, OnWeightedPastReturns): - """Count non-nan past returns, excluding cash, weighted by regressor.""" +# class CountWeightedPastReturns(VectorCount, OnWeightedPastReturns): +# """Count non-nan past returns, excluding cash, weighted by regressor.""" - def __init__(self, regressor, **kwargs): - self.regressor = regressor - super().__init__(**kwargs) # this goes to SumForecaster +# def __init__(self, regressor, **kwargs): +# self.regressor = regressor +# super().__init__(**kwargs) # this goes to SumForecaster -class SumWeightedPastReturns(VectorSum, OnWeightedPastReturns): - """Sum non-nan past returns, excluding cash, weighted by regressor.""" +# class SumWeightedPastReturns(VectorSum, OnWeightedPastReturns): +# """Sum non-nan past returns, excluding cash, weighted by regressor.""" - def __init__(self, regressor, **kwargs): - self.regressor = regressor - super().__init__(**kwargs) # this goes to SumForecaster +# def __init__(self, regressor, **kwargs): +# self.regressor = regressor +# super().__init__(**kwargs) # this goes to SumForecaster # We can reproduce this design pattern for other base forecasters @@ -1127,10 +1128,13 @@ def __init__(self, regressor, **kwargs): super().__init__(**kwargs) # regression part - self.regressor = regressor - self._numerator = SumWeightedPastReturns(regressor=regressor, **kwargs) - self._denominator = CountWeightedPastReturns( - regressor=regressor, **kwargs) + self._regressor = regressor + self._dataframe_selector = WeightedPastReturns( + regressor=self._regressor) + self._numerator = VectorSum( + dataframe_selector=self._dataframe_selector, **kwargs) + self._denominator = VectorCount( + dataframe_selector=self._dataframe_selector, **kwargs) # def _work_with(self, past_returns, **kwargs): # """Base DataFrame we work with."""