From 4d0554ba272b656ee4e2bbaa41a29f5ba8643402 Mon Sep 17 00:00:00 2001 From: John Bogaardt Date: Fri, 22 Sep 2023 17:54:51 -0600 Subject: [PATCH] polars scaffolding --- chainladder/core/__init__.py | 9 +- chainladder/core/_slicing.py | 309 ------ chainladder/core/base.py | 438 --------- chainladder/core/core.py | 889 +++++++++++------- chainladder/core/tests/test_triangle.py | 1 + chainladder/core/triangle.py | 225 +++-- chainladder/legacy/__init__.py | 5 + chainladder/legacy/base.py | 455 +++++++++ chainladder/legacy/common.py | 215 +++++ chainladder/legacy/correlation.py | 314 +++++++ chainladder/{core => legacy}/display.py | 0 chainladder/{core => legacy}/dunders.py | 0 chainladder/legacy/io.py | 82 ++ chainladder/{core => legacy}/pandas.py | 0 chainladder/{core => legacy}/slice.py | 0 .../{core/legacy.py => legacy/triangle.py} | 6 +- chainladder/utils/utility_functions.py | 2 + 17 files changed, 1777 insertions(+), 1173 deletions(-) delete mode 100644 chainladder/core/_slicing.py create mode 100644 chainladder/legacy/__init__.py create mode 100644 chainladder/legacy/base.py create mode 100644 chainladder/legacy/common.py create mode 100644 chainladder/legacy/correlation.py rename chainladder/{core => legacy}/display.py (100%) rename chainladder/{core => legacy}/dunders.py (100%) create mode 100644 chainladder/legacy/io.py rename chainladder/{core => legacy}/pandas.py (100%) rename chainladder/{core => legacy}/slice.py (100%) rename chainladder/{core/legacy.py => legacy/triangle.py} (99%) diff --git a/chainladder/core/__init__.py b/chainladder/core/__init__.py index 514d58cc..3901fef2 100644 --- a/chainladder/core/__init__.py +++ b/chainladder/core/__init__.py @@ -1,6 +1,5 @@ -from chainladder.core.legacy import Triangle as LegacyTriangle # noqa (API import) from chainladder.core.triangle import Triangle -from chainladder.core.correlation import ( - DevelopmentCorrelation, - ValuationCorrelation, -) # noqa (API import) \ No newline at end of file +#from chainladder.core.correlation import ( +# DevelopmentCorrelation, +# ValuationCorrelation, +#) # noqa (API import) \ No newline at end of file diff --git a/chainladder/core/_slicing.py b/chainladder/core/_slicing.py deleted file mode 100644 index 8191a905..00000000 --- a/chainladder/core/_slicing.py +++ /dev/null @@ -1,309 +0,0 @@ -# Most of this file is taken from https://github.com/dask/dask/blob/master/dask/array/slicing.py -# See license at https://github.com/dask/dask/blob/master/LICENSE.txt - -import math -from collections.abc import Iterable -from numbers import Integral, Number - -import numpy as np - - -def normalize_index(idx, shape): - """Normalize slicing indexes - 1. Replaces ellipses with many full slices - 2. Adds full slices to end of index - 3. Checks bounding conditions - 4. Replaces numpy arrays with lists - 5. Posify's slices integers and lists - 6. 
Normalizes slices to canonical form - Examples - -------- - >>> normalize_index(1, (10,)) - (1,) - >>> normalize_index(-1, (10,)) - (9,) - >>> normalize_index([-1], (10,)) - (array([9]),) - >>> normalize_index(slice(-3, 10, 1), (10,)) - (slice(7, 10, 1),) - >>> normalize_index((Ellipsis, None), (10,)) - (slice(0, 10, 1), None) - """ - if not isinstance(idx, tuple): - idx = (idx,) - idx = replace_ellipsis(len(shape), idx) - n_sliced_dims = 0 - for i in idx: - if hasattr(i, "ndim") and i.ndim >= 1: - n_sliced_dims += i.ndim - elif i is None: - continue - else: - n_sliced_dims += 1 - idx += (slice(None),) * (len(shape) - n_sliced_dims) - if len([i for i in idx if i is not None]) > len(shape): - raise IndexError("Too many indices for array") - - none_shape = [] - i = 0 - for ind in idx: - if ind is not None: - none_shape.append(shape[i]) - i += 1 - else: - none_shape.append(None) - - for i, d in zip(idx, none_shape): - if d is not None: - check_index(i, d) - idx = tuple(map(sanitize_index, idx)) - idx = tuple(map(replace_none, idx, none_shape)) - idx = posify_index(none_shape, idx) - idx = tuple(map(clip_slice, idx, none_shape)) - return idx - - -def replace_ellipsis(n, index): - """Replace ... with slices, :, : ,: - >>> replace_ellipsis(4, (3, Ellipsis, 2)) - (3, slice(None, None, None), slice(None, None, None), 2) - >>> replace_ellipsis(2, (Ellipsis, None)) - (slice(None, None, None), slice(None, None, None), None) - """ - # Careful about using in or index because index may contain arrays - isellipsis = [i for i, ind in enumerate(index) if ind is Ellipsis] - if not isellipsis: - return index - elif len(isellipsis) > 1: - raise IndexError("an index can only have a single ellipsis ('...')") - else: - loc = isellipsis[0] - extra_dimensions = n - (len(index) - sum(i is None for i in index) - 1) - return ( - index[:loc] + (slice(None, None, None),) * extra_dimensions + index[loc + 1 :] - ) - - -def check_index(ind, dimension): - """Check validity of index for a given dimension - Examples - -------- - >>> check_index(3, 5) - >>> check_index(5, 5) - Traceback (most recent call last): - ... - IndexError: Index is not smaller than dimension 5 >= 5 - >>> check_index(6, 5) - Traceback (most recent call last): - ... - IndexError: Index is not smaller than dimension 6 >= 5 - >>> check_index(-1, 5) - >>> check_index(-6, 5) - Traceback (most recent call last): - ... - IndexError: Negative index is not greater than negative dimension -6 <= -5 - >>> check_index([1, 2], 5) - >>> check_index([6, 3], 5) - Traceback (most recent call last): - ... 
- IndexError: Index out of bounds for dimension 5 - >>> check_index(slice(0, 3), 5) - """ - # unknown dimension, assumed to be in bounds - if isinstance(ind, Iterable): - x = np.asanyarray(ind) - if ( - np.issubdtype(x.dtype, np.integer) - and ((x >= dimension) | (x < -dimension)).any() - ): - raise IndexError("Index out of bounds for dimension {:d}".format(dimension)) - elif x.dtype == bool and len(x) != dimension: - raise IndexError( - "boolean index did not match indexed array; dimension is {:d} " - "but corresponding boolean dimension is {:d}".format(dimension, len(x)) - ) - elif isinstance(ind, slice): - return - elif not isinstance(ind, Integral): - raise IndexError( - "only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and " - "integer or boolean arrays are valid indices" - ) - - elif ind >= dimension: - raise IndexError( - "Index is not smaller than dimension {:d} >= {:d}".format(ind, dimension) - ) - - elif ind < -dimension: - msg = "Negative index is not greater than negative dimension {:d} <= -{:d}" - raise IndexError(msg.format(ind, dimension)) - - -def sanitize_index(ind): - """Sanitize the elements for indexing along one axis - >>> sanitize_index([2, 3, 5]) - array([2, 3, 5]) - >>> sanitize_index([True, False, True, False]) - array([0, 2]) - >>> sanitize_index(np.array([1, 2, 3])) - array([1, 2, 3]) - >>> sanitize_index(np.array([False, True, True])) - array([1, 2]) - >>> type(sanitize_index(np.int32(0))) # doctest: +SKIP - - >>> sanitize_index(0.5) # doctest: +SKIP - Traceback (most recent call last): - ... - IndexError: only integers, slices (`:`), ellipsis (`...`), - numpy.newaxis (`None`) and integer or boolean arrays are valid indices - """ - if ind is None: - return None - elif isinstance(ind, slice): - return slice( - _sanitize_index_element(ind.start), - _sanitize_index_element(ind.stop), - _sanitize_index_element(ind.step), - ) - elif isinstance(ind, Number): - return _sanitize_index_element(ind) - if not hasattr(ind, "dtype") and len(ind) == 0: - ind = np.array([], dtype=np.intp) - ind = np.asarray(ind) - if ind.dtype == np.bool_: - nonzero = np.nonzero(ind) - if len(nonzero) == 1: - # If a 1-element tuple, unwrap the element - nonzero = nonzero[0] - return np.asanyarray(nonzero) - elif np.issubdtype(ind.dtype, np.integer): - return ind - else: - raise IndexError( - "only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and " - "integer or boolean arrays are valid indices" - ) - - -def _sanitize_index_element(ind): - """Sanitize a one-element index.""" - if ind is None: - return None - - return int(ind) - - -def posify_index(shape, ind): - """Flip negative indices around to positive ones - >>> posify_index(10, 3) - 3 - >>> posify_index(10, -3) - 7 - >>> posify_index(10, [3, -3]) - array([3, 7]) - >>> posify_index((10, 20), (3, -3)) - (3, 17) - >>> posify_index((10, 20), (3, [3, 4, -3])) # doctest: +NORMALIZE_WHITESPACE - (3, array([ 3, 4, 17])) - """ - if isinstance(ind, tuple): - return tuple(map(posify_index, shape, ind)) - if isinstance(ind, Integral): - if ind < 0 and not math.isnan(shape): - return ind + shape - else: - return ind - if isinstance(ind, (np.ndarray, list)) and not math.isnan(shape): - ind = np.asanyarray(ind) - return np.where(ind < 0, ind + shape, ind) - if isinstance(ind, slice): - start, stop, step = ind.start, ind.stop, ind.step - - if start < 0: - start += shape - - if not (0 > stop >= step) and stop < 0: - stop += shape - - return slice(start, stop, ind.step) - - return ind - - -def clip_slice(idx, dim): - 
""" - Clip slice to its effective size given the shape. - - Parameters - ---------- - idx : The index. - dim : The size along the corresponding dimension. - - Returns - ------- - idx : slice - - Examples - -------- - >>> clip_slice(slice(0, 20, 1), 10) - slice(0, 10, 1) - """ - if not isinstance(idx, slice): - return idx - - start, stop, step = idx.start, idx.stop, idx.step - - if step > 0: - start = max(start, 0) - stop = min(stop, dim) - - if start > stop: - start = stop - else: - start = min(start, dim - 1) - stop = max(stop, -1) - - if start < stop: - start = stop - - return slice(start, stop, step) - - -def replace_none(idx, dim): - """ - Normalize slices to canonical form, i.e. - replace ``None`` with the appropriate integers. - - Parameters - ---------- - idx : slice or other index - dim : dimension length - - Examples - -------- - >>> replace_none(slice(None, None, None), 10) - slice(0, 10, 1) - """ - if not isinstance(idx, slice): - return idx - - start, stop, step = idx.start, idx.stop, idx.step - - if step is None: - step = 1 - - if step > 0: - if start is None: - start = 0 - - if stop is None: - stop = dim - else: - if start is None: - start = dim - 1 - - if stop is None: - stop = -1 - - return slice(start, stop, step) \ No newline at end of file diff --git a/chainladder/core/base.py b/chainladder/core/base.py index 4362069e..ea86a87b 100644 --- a/chainladder/core/base.py +++ b/chainladder/core/base.py @@ -1,444 +1,6 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -import pandas as pd -import numpy as np -from chainladder.utils.cupy import cp -from chainladder.utils.sparse import sp -from chainladder.utils.dask import dp -import warnings - -from chainladder.core.display import TriangleDisplay -from chainladder.core.dunders import TriangleDunders -from chainladder.core.pandas import TrianglePandas -from chainladder.core.slice import TriangleSlicer -from chainladder.core.io import TriangleIO -from chainladder.core.common import Common -from chainladder import options -from chainladder.utils.utility_functions import num_to_nan, concat - - -class TriangleBase( - TriangleIO, TriangleDisplay, TriangleSlicer, TriangleDunders, TrianglePandas, Common -): - """This class handles the initialization of a triangle""" - - @property - def shape(self): - return self.values.shape - - @staticmethod - def _input_validation(data, index, columns, origin, development): - """Validate/sanitize inputs""" - - def str_to_list(arg): - if arg is None: - return - if type(arg) in [str, pd.Period]: - return [arg] - else: - return list(arg) - - index = str_to_list(index) - columns = str_to_list(columns) - origin = str_to_list(origin) - development = str_to_list(development) - if "object" in data[columns].dtypes: - raise TypeError("column attribute must be numeric.") - if data[columns].shape[1] != len(columns): - raise AttributeError("Columns are required to have unique names") - return index, columns, origin, development - - @staticmethod - def _set_development(data, development, development_format, origin_date): - """Initialize development and its grain""" - if development: - development_date = TriangleBase._to_datetime( - data, development, period_end=True, format=development_format - ) - # if np.all(development_date.dt.strftime('%m-%d') == '01-01'): - # development_date = pd.Series(pd.PeriodIndex(development_date, freq='A').to_timestamp(how='e')) - else: 
- o_max = pd.Period( - origin_date.max(), freq=TriangleBase._get_grain(origin_date) - ).to_timestamp(how="e") - development_date = pd.Series([o_max] * len(origin_date)) - - development_date.name = "__development__" - if ( - pd.Series(development_date).dt.year.min() - == pd.Series(development_date).dt.year.max() - == 1970 - ): - raise ValueError( - "Development lags could not be determined. This may be because development" - "is expressed as an age where a date-like vector is required" - ) - return development_date - - @staticmethod - def _set_index(col, unique): - return col.map(dict(zip(unique, range(len(unique))))).values[None].T - - @staticmethod - def _aggregate_data(data, origin_date, development_date, index, columns): - """Summarize dataframe to the level specified in axes""" - if type(data) != pd.DataFrame: - # Dask dataframes are mutated - data["__origin__"] = origin_date - data["__development__"] = development_date - key_gr = ["__origin__", "__development__"] + [ - data[item] for item in ([] if not index else index) - ] - data_agg = data.groupby(key_gr)[columns].sum().reset_index().fillna(0) - data = data.drop(["__origin__", "__development__"], axis=1) - else: - # Summarize dataframe to the level specified in axes - key_gr = [origin_date, development_date] + [ - data[item] for item in ([] if not index else index) - ] - data_agg = data[columns].groupby(key_gr).sum().reset_index().fillna(0) - data_agg["__origin__"] = data_agg[origin_date.name] - data_agg["__development__"] = data_agg[development_date.name] - # origin <= development is required - truncate bad records if not true - valid = data_agg["__origin__"] <= data_agg["__development__"] - if sum(~valid) > 0: - warnings.warn( - """ - Observations with development before - origin start have been removed.""" - ) - valid = valid.compute() if hasattr(valid, "compute") else valid - data_agg = data_agg[valid] - return data_agg - - @staticmethod - def _set_kdims(data_agg, index): - kdims = data_agg[index].drop_duplicates().reset_index(drop=True).reset_index() - key_idx = ( - data_agg[index].merge(kdims, how="left", on=index)["index"].values[None].T - ) - return kdims.drop("index", axis=1).values, key_idx - - @staticmethod - def _set_odims(data_agg, date_axes): - odims = np.sort(date_axes["__origin__"].unique()) - orig_idx = TriangleBase._set_index(data_agg["__origin__"], odims) - return odims, orig_idx - - @staticmethod - def _set_ddims(data_agg, date_axes): - if date_axes["__development__"].nunique() > 1: - dev_lag = TriangleBase._development_lag( - data_agg["__origin__"], data_agg["__development__"] - ) - - ddims = np.sort( - TriangleBase._development_lag( - date_axes["__origin__"], date_axes["__development__"] - ).unique() - ) - - dev_idx = TriangleBase._set_index(dev_lag, ddims) - - else: - ddims = pd.DatetimeIndex( - [data_agg["__development__"].max()], name="valuation" - ) - dev_idx = np.zeros((len(data_agg), 1)) - - return ddims, dev_idx - - @staticmethod - def _set_values(data_agg, key_idx, columns, orig_idx, dev_idx): - val_idx = ( - ((np.ones(len(data_agg))[None].T) * range(len(columns))) - .reshape((1, -1), order="F") - .T - ) - coords = np.concatenate( - tuple([np.concatenate((orig_idx, dev_idx), 1)] * len(columns)), 0 - ) - coords = np.concatenate( - (np.concatenate(tuple([key_idx] * len(columns)), 0), val_idx, coords), 1 - ) - amts = np.concatenate( - [data_agg[col].fillna(0).values for col in data_agg[columns]] - ).astype("float64") - return coords.T.astype("int32"), amts - - def _len_check(self, x, y): - if len(x) != 
len(y): - raise ValueError( - "Length mismatch: Expected axis has", - "{} elements, new values have".format(len(x)), - "{} elements".format(len(y)), - ) - - def _get_date_axes( - self, origin_date, development_date, origin_grain, development_grain - ): - """Function to find any missing origin dates or development dates that - would otherwise mess up the origin/development dimensions. - """ - o = pd.period_range( - start=origin_date.min(), end=origin_date.max(), freq=origin_grain - ).to_timestamp(how="s") - - d = pd.period_range( - start=development_date.min(), - end=development_date.max(), - freq=development_grain, - ).to_timestamp(how="e") - - # If the development is semi-annual, we need to adjust further because of "2Q-DEC" - if development_grain == "2Q-DEC": - from pandas.tseries.offsets import DateOffset - - d = d + DateOffset(months=-3) - - c = pd.DataFrame( - TriangleBase._cartesian_product(o, d), - columns=["__origin__", "__development__"], - ) - - return c[c["__development__"] > c["__origin__"]] - - @property - def nan_triangle(self): - """Given the current triangle shape and valuation, it determines the - appropriate placement of NANs in the triangle for future valuations. - This becomes useful when managing array arithmetic. - """ - xp = self.get_array_module() - if min(self.values.shape[2:]) == 1: - return xp.ones(self.values.shape[2:], dtype="float16") - val_array = np.array(self.valuation).reshape(self.shape[-2:], order="f") - nan_triangle = np.array(pd.DataFrame(val_array) > self.valuation_date) - nan_triangle = xp.array(np.where(nan_triangle, xp.nan, 1), dtype="float16") - return nan_triangle - - @staticmethod - def _to_datetime(data, fields, period_end=False, format=None): - """For tabular form, this will take a set of data - column(s) and return a single date array. This function heavily - relies on pandas, but does two additional things: - 1. It extends the automatic inference using date_inference_list - 2. it allows pd_to_datetime on a set of columns - """ - # Concat everything into one field - if len(fields) > 1: - target_field = data[fields].astype(str).apply(lambda x: "-".join(x), axis=1) - else: - target_field = data[fields].iloc[:, 0] - if hasattr(target_field, "dt"): - target = target_field - if type(target.iloc[0]) == pd.Period: - return target.dt.to_timestamp(how={1: "e", 0: "s"}[period_end]) - else: - datetime_arg = target_field.unique() - format = [{"arg": datetime_arg, "format": format}] if format else [] - date_inference_list = format + [ - {"arg": datetime_arg, "format": "%Y%m"}, - {"arg": datetime_arg, "format": "%Y"}, - {"arg": datetime_arg, "infer_datetime_format": True}, - ] - for item in date_inference_list: - try: - arr = dict(zip(datetime_arg, pd.to_datetime(**item))) - break - except: - pass - target = target_field.map(arr) - return target - - @staticmethod - def _development_lag(origin, valuation): - """For tabular format, this will convert the origin/valuation - difference to a development lag""" - return ((valuation - origin) / (365.25/12)).round('1d').dt.days - - - @staticmethod - def _get_grain(dates, trailing=False, kind="origin"): - """Determines Grain of origin or valuation vector - - Parameters: - - dates: pd.Series[datetime64[ns]] - A Datetime Series - trailing: - Set to False if you want to treat December as period end. Set - to True if you want it inferred from the data. 
- """ - months = dates.dt.month.unique() - diffs = np.diff(np.sort(months)) - if len(dates.unique()) == 1: - grain = "A" - elif len(months) == 1: - grain = "A" - elif np.all(diffs == 6): - grain = "2Q" - elif np.all(diffs == 3): - grain = "Q" - else: - grain = "M" - if trailing and grain != "M": - if kind == "origin": - end = (dates.min() - pd.DateOffset(days=1)).strftime("%b").upper() - end = ( - "DEC" - if end in ["MAR", "JUN", "SEP", "DEC"] and grain == "Q" - else end - ) - end = "DEC" if end in ["JUN", "DEC"] and grain == "2Q" else end - else: - # If inferred to beginning of calendar period, 1/1 from YYYY, 4/1 from YYYYQQ - if ( - dates.dt.strftime("%m%d") - .isin(["0101", "0401", "0701", "1001"]) - .any() - ): - end = ( - (dates.min() - pd.DateOffset(days=1, years=-1)) - .strftime("%b") - .upper() - ) - else: - end = dates.max().strftime("%b").upper() - grain = grain + "-" + end - return grain - - @staticmethod - def _cartesian_product(*arrays): - """A fast implementation of cartesian product, used for filling in gaps - in triangles (if any)""" - arr = np.empty( - [len(a) for a in arrays] + [len(arrays)], dtype=np.result_type(*arrays) - ) - for i, a in enumerate(np.ix_(*arrays)): - arr[..., i] = a - arr = arr.reshape(-1, len(arrays)) - return arr - - def get_array_module(self, arr=None): - backend = ( - self.array_backend - if arr is None - else arr.__class__.__module__.split(".")[0] - ) - modules = {"cupy": cp, "sparse": sp, "numpy": np, "dask": dp} - if modules.get(backend, None): - return modules.get(backend, None) - else: - raise ValueError("Array backend is invalid or not properly set.") - - def _auto_sparse(self): - """Auto sparsifies at 30Mb or more and 20% density or less""" - if not options.AUTO_SPARSE: - return self - n = np.prod(list(self.shape) + [8 / 1e6]) - if ( - self.array_backend == "numpy" - and n > 30 - and 1 - np.isnan(self.values).sum() / n * (8 / 1e6) < 0.2 - ): - self.set_backend("sparse", inplace=True) - if self.array_backend == "sparse" and not ( - self.values.density < 0.2 and n > 30 - ): - self.set_backend("numpy", inplace=True) - return self - - @property - def valuation(self): - ddims = self.ddims - if self.is_val_tri: - out = pd.DataFrame(np.repeat(self.ddims.values[None], len(self.odims), 0)) - return pd.DatetimeIndex(out.unstack().values) - ddim_arr = ddims - ddims[0] - origin = np.minimum(self.odims, np.datetime64(self.valuation_date)) - val_array = origin.astype("datetime64[M]") + np.timedelta64(ddims[0], "M") - val_array = val_array.astype("datetime64[ns]") - np.timedelta64(1, "ns") - val_array = val_array[:, None] - s = slice(None, -1) if ddims[-1] == 9999 else slice(None, None) - val_array = ( - val_array.astype("datetime64[M]") + ddim_arr[s][None, :] + 1 - ).astype("datetime64[ns]") - np.timedelta64(1, "ns") - if ddims[-1] == 9999: - ult = np.repeat(np.datetime64(options.ULT_VAL), val_array.shape[0])[:, None] - val_array = np.concatenate( - ( - val_array, - ult, - ), - axis=1, - ) - return pd.DatetimeIndex(val_array.reshape(1, -1, order="F")[0]) - - def _drop_subtriangles(self): - """Removes subtriangles from a Triangle instance""" - sub_tris = [k for k, v in vars(self).items() if isinstance(v, TriangleBase)] - if "ldf_" in sub_tris: - del self.ldf_ - if "sigma_" in sub_tris: - del self.sigma_ - if "std_err_" in sub_tris: - del self.std_err_ - - @property - def subtriangles(self): - """Lists subtriangles from a Triangle instance""" - return [k for k, v in vars(self).items() if isinstance(v, TriangleBase)] - - def __array__(self): - return 
self.values - - def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): - obj = self.copy() - if method == "__call__": - inputs = [i.values if hasattr(i, "values") else i for i in inputs] - obj.values = ufunc(*inputs, **kwargs) - return obj - else: - raise NotImplementedError() - - def __array_function__(self, func, types, args, kwargs): - from chainladder.utils.utility_functions import concat - - methods_as_funcs = list( - set(dir(np)).intersection(set(dir(self))) - {"__dir__", "__doc__"} - ) - methods_as_funcs = {getattr(np, i): getattr(self, i) for i in methods_as_funcs} - HANDLED_FUNCTIONS = {np.concatenate: concat, np.round: self.__round__} - HANDLED_FUNCTIONS = {**HANDLED_FUNCTIONS, **methods_as_funcs} - if func not in HANDLED_FUNCTIONS: - return NotImplemented - if not all(issubclass(t, self.__class__) for t in types): - return NotImplemented - if func in methods_as_funcs: - args = args[1:] - return HANDLED_FUNCTIONS[func](*args, **kwargs) - - def compute(self, *args, **kwargs): - if hasattr(self.values, "chunks"): - obj = self.copy() - obj.values = obj.values.compute(*args, **kwargs) - m = obj.get_array_module(obj.values) - if m == sp: - obj.array_backend = "sparse" - if m == cp: - obj.array_backend = "cupy" - if m == np: - obj.array_backend = "numpy" - return obj - return self - - def _get_axis_value(self, axis): - axis = self._get_axis(axis) - return {0: self.index, 1: self.columns, 2: self.origin, 3: self.development}[ - axis - ] def is_chainladder(estimator): diff --git a/chainladder/core/core.py b/chainladder/core/core.py index 8995dea0..0744a99a 100644 --- a/chainladder/core/core.py +++ b/chainladder/core/core.py @@ -1,5 +1,4 @@ import polars as pl -from ._slicing import normalize_index dcol = ( (pl.col('__development__').dt.year() - @@ -26,60 +25,56 @@ class TriangleBase: def __init__( self, data=None, index=None, origin=None, columns=None, valuation=None, origin_format=None, valuation_format=None, - cumulative=None, pattern=False, trailing=False, lazy=False, + cumulative=None, pattern=False, lazy=False, trailing=False, *args, **kwargs ): if data is None: return + # Static attributes self.columns = [columns] if type(columns) is str else columns + index = [index] if type(index) is str else index or [] self.is_cumulative = cumulative self.is_pattern = pattern - self.origin_close = 'DEC' - - index = index or [] + self.is_lazy = lazy + self._properties = {} + if valuation is None: - data = ( - data.with_columns( - TriangleBase._format_origin( - data, origin, origin_format - ).dt.truncate("1mo").alias('__origin__'), - TriangleBase._format_valuation( - data, origin, origin_format - ).dt.month_end().max().alias('__development__')) - .select( - pl.col(index + self.columns + - ['__origin__', '__development__']))) + __development__ = TriangleBase._format_valuation( + data, origin, origin_format + ).dt.month_end().max().alias('__development__') else: - data = ( - data.with_columns( - TriangleBase._format_origin( - data, origin, origin_format - ).dt.truncate("1mo").alias('__origin__'), - TriangleBase._format_valuation( - data, valuation, valuation_format - ).dt.month_end().alias('__development__')) - .select( - pl.col(index + self.columns + - ['__origin__', '__development__']))) - if data.select('__development__').lazy().collect().n_unique() > 1: - data = data.select(pl.all().exclude('__development__'), dcol) + __development__ = TriangleBase._format_valuation( + data, valuation, valuation_format + ).dt.month_end().alias('__development__') + __origin__ = TriangleBase._format_origin( 
+ data, origin, origin_format + ).dt.truncate("1mo").alias('__origin__') + + data = data.with_columns(__origin__, __development__) + if data.select('__development__').lazy().collect().n_unique() > 1: + # Coerce to development triangle + data = data.select(pl.all().exclude('__development__'), dcol) self.data = ( data - .with_columns(pl.lit('Total').alias('Total') if not index else []) - .group_by(pl.all().exclude(columns)) # Needed for cum_to_incr/incr_to_cum + .group_by(pl.col(index + ['__origin__', '__development__'])) # Needed for to_incremental/to_cumulative .agg(pl.col(columns).sum()) - .sort(index + ['__origin__', '__development__']) - ) - self.is_lazy = lazy + .select( + pl.lit('Total').alias('Total') if index == [] else pl.col(index), + pl.col(['__origin__', '__development__'] + self.columns)) + .sort((index or ['Total']) + ['__origin__', '__development__'])) if not lazy: self.data = self.data.lazy().collect() + if not trailing: + self.origin_close = 'DEC' + else: + self.origin_close = self.data.select( + pl.col('__origin__').max().dt.offset_by( + {'Y': '12mo', 'M': '1mo', + 'Q': '3mo', '2Q': '6mo'}[self.origin_grain] + ).dt.offset_by('-1d').dt.strftime('%b').str.to_uppercase())[0, 0] - if trailing: - self.data = self.grain( - f'O{self.origin_grain}D{self.development_grain}', - trailing=True).data - self.properties = {} + @staticmethod def from_triangle(triangle): @@ -90,17 +85,15 @@ def from_triangle(triangle): obj.is_pattern = triangle.is_pattern obj.origin_close = triangle.origin_close obj.is_lazy = triangle.is_lazy - obj.properties = triangle.properties.copy() + obj._properties = triangle._properties.copy() return obj @property def key_labels(self): - if 'key_labels' not in self.properties.keys(): - self.properties['key_labels'] = [ - c for c in self.data.columns - if c not in self.columns + - ['__origin__', '__development__']] - return self.properties['key_labels'] + return [ + c for c in self.data.columns + if c not in self.columns + + ['__origin__', '__development__']] @property def shape(self): @@ -113,40 +106,37 @@ def shape(self): @property def valuation_date(self): - # requires valuation return self.valuation.max() @property def index(self): - if 'index' not in self.properties.keys(): - self.properties['index'] = ( + if 'index' not in self._properties.keys(): + self._properties['index'] = ( self.data.select(pl.col(self.key_labels)).unique().sort(pl.all()) ) - return self.properties['index'] + return self._properties['index'] @property def origin_grain(self): - if 'origin_grain' not in self.properties.keys(): - months = self.date_matrix.select( - pl.col('__origin__').dt.month().sort().unique() - )['__origin__'] - diffs = months.diff()[1:] - if len(months) == 1: - grain = "Y" - elif (diffs == 6).all(): - grain = "2Q" - elif (diffs == 3).all(): - grain = "Q" - else: - grain = "M" - self.properties['origin_grain'] = grain - return self.properties['origin_grain'] + months = self.date_matrix.select( + pl.col('__origin__').dt.month().sort().unique() + )['__origin__'] + diffs = months.diff()[1:] + if len(months) == 1: + grain = "Y" + elif (diffs == 6).all(): + grain = "2Q" + elif (diffs == 3).all(): + grain = "Q" + else: + grain = "M" + return grain @property def date_matrix(self): - if 'date_matrix' not in self.properties.keys(): + if 'date_matrix' not in self._properties.keys(): if self.is_val_tri: - self.properties['date_matrix'] = ( + self._properties['date_matrix'] = ( self.data .group_by('__origin__') .agg(pl.col('__development__').unique()) @@ -156,60 +146,89 @@ def 
date_matrix(self): dcol )).lazy().collect() else: - self.properties['date_matrix'] = ( + self._properties['date_matrix'] = ( self.data .group_by('__origin__') .agg(pl.col('__development__').unique()) .explode('__development__') .with_columns(vcol.alias('__valuation__'))).lazy().collect() - return self.properties['date_matrix'] + return self._properties['date_matrix'] @property def origin(self): - if 'origin' not in self.properties.keys(): - self.properties['origin'] = pl.date_range( + if 'origin' not in self._properties.keys(): + self._properties['origin'] = pl.date_range( start=self.date_matrix['__origin__'].min(), end=self.date_matrix['__origin__'].max(), interval={'Y': '12mo', 'M': '1mo', 'Q': '3mo', '2Q': '6mo'}[self.origin_grain], eager=True).alias('origin') - return self.properties['origin'] - - @property - def odims(self): - return pl.DataFrame({'odims': range(len(self.origin)), '__origin__': self.origin}) + return self._properties['origin'] - @property - def ddims(self): - values = self.valutaion if self.is_val_tri else self.development - return pl.DataFrame({'ddims': range(len(values)), '__development__': values}) + @origin.setter + def origin(self, value): + if type(value) is not pl.Series: + raise TypeError('The origin attribute must be of type pl.Series.') + if value.name != 'origin': + raise ValueError('The origin attribute must be a series with the name "origin".') + if value.dtype != pl.Date: + raise TypeError('The origin Series must be of dtype pl.Date.') + if len(set(value.unique())) != len(set(self.date_matrix['__origin__'].unique())): + raise ValueError('Duplicates values found in origin') + self._properties['origin'] = value + @property def development(self): - if 'development' not in self.properties.keys(): + if 'development' not in self._properties.keys(): interval = {'Y': 12, '2Q': 6, 'Q': 3, 'M': 1}[self.development_grain] - self.properties['development'] = pl.Series( + self._properties['development'] = pl.Series( 'development', range(self.date_matrix['__development__'].min(), self.date_matrix['__development__'].max() + interval, interval)).cast(pl.UInt16) - return self.properties['development'] + return self._properties['development'] + + @development.setter + def development(self, value : pl.Series): + if type(value) is not pl.Series: + raise TypeError('The development attribute must be of type pl.Series.') + if value.name != 'development': + raise ValueError('The development attribute must be a series with the name "development".') + if value.dtype != pl.UInt16: + raise TypeError('The development Series must be of dtype pl.Uint16.') + if len(set(value.unique())) != len(set(self.date_matrix['__development__'].unique())): + raise ValueError('Duplicates values found in development') + self._properties['development'] = value @property def valuation(self): - if 'valuation' not in self.properties.keys(): + if 'valuation' not in self._properties.keys(): interval={'Y': '12mo', 'M': '1mo', 'Q': '3mo', '2Q': '6mo'}[self.development_grain] valuation_range = self.date_matrix.select( pl.col('__valuation__').min().alias('vmin').dt.month_start(), pl.col('__valuation__').max().alias('vmax')) - self.properties['valuation'] = pl.date_range( + self._properties['valuation'] = pl.date_range( start=valuation_range['vmin'][0], end=valuation_range['vmax'][0], interval=interval, eager=True).dt.month_end().alias('valuation') - return self.properties['valuation'] + return self._properties['valuation'] + + @valuation.setter + def valuation(self, value): + if type(value) is not pl.Series: + 
raise TypeError('The valuation attribute must be of type pl.Series.') + if value.name != 'valuation': + raise ValueError('The valuation attribute must be a series with the name "valuation".') + if value.dtype != pl.Date: + raise TypeError('The valuation Series must be of dtype pl.Date.') + if len(set(value.unique())) != len(set(self.date_matrix['__valuation__'].unique())): + raise ValueError('Duplicates values found in valuation') + self._properties['valuation'] = value + @property def is_full(self): @@ -218,62 +237,94 @@ def is_full(self): @property def is_val_tri(self): - if 'is_val_tri' not in self.properties.keys(): - self.properties['is_val_tri'] = dict( + if 'is_val_tri' not in self._properties.keys(): + self._properties['is_val_tri'] = dict( zip(self.data.columns, self.data.dtypes) )['__development__'] != pl.UInt16 - return self.properties['is_val_tri'] + return self._properties['is_val_tri'] @property def development_grain(self): - if 'development_grain' not in self.properties.keys(): - if len(self.date_matrix['__valuation__'].unique()) == 1: - grain = 'M' + if len(self.date_matrix['__valuation__'].unique()) == 1: + grain = 'M' + else: + months = self.data.select( + self.date_matrix['__valuation__'] + .dt.month().unique().sort().alias('__development__') + ).lazy().collect()['__development__'] + diffs = months.diff()[1:] + if len(months) == 1: + grain = "Y" + elif (diffs == 6).all(): + grain = "2Q" + elif (diffs == 3).all(): + grain = "Q" else: - months = self.data.select( - self.date_matrix['__valuation__'] - .dt.month().unique().sort().alias('__development__') - ).lazy().collect()['__development__'] - diffs = months.diff()[1:] - if len(months) == 1: - grain = "Y" - elif (diffs == 6).all(): - grain = "2Q" - elif (diffs == 3).all(): - grain = "Q" - else: - grain = "M" - self.properties['development_grain'] = grain - return self.properties['development_grain'] + grain = "M" + return grain @property def latest_diagonal(self): - # requires valuation, valuation_date - triangle = self[self.valuation==self.valuation_date] - if not triangle.is_val_tri: - triangle.data = triangle.data.select( - pl.all().exclude('__development__'), vcol) - triangle.properties['is_val_tri'] = True - triangle.properties.pop('date_matrix', None) - triangle.properties.pop('valuation', None) - triangle.properties.pop('development_grain', None) - triangle.properties.pop('development', None) + return self[self.valuation==self.valuation_date].to_valuation() + + def _get_value_idx(self): + index = pl.concat( + (self.index, + pl.Series(range(len(self.index))).alias('index').cast(pl.UInt64).to_frame()), + how='horizontal') + origin = pl.concat( + (self.origin.alias('__origin__').to_frame(), + pl.Series(range(len(self.origin))).alias('origin').cast(pl.UInt64).to_frame()), + how='horizontal') + development = pl.concat( + ((self.valuation if self.is_val_tri else self.development).alias('__development__').to_frame() , + pl.Series( + range(len(self.valuation if self.is_val_tri else self.development)) + ).alias('development').cast(pl.UInt64).to_frame()), + how='horizontal') + return index, origin, development + + @property + def values(self) -> pl.DataFrame: + index, origin, development = self._get_value_idx() + return ( + self.data + .join(origin, how='left', on='__origin__') + .join(development, how='left', on='__development__') + .join(index, how='left', on=self.key_labels) + .select(['index', 'origin', 'development'] + self.columns) + .rename({i: str(num) for num, i in enumerate(self.columns)})) + + def 
apply_labels_to_values(self, other: pl.DataFrame): + index, origin, development = self._get_value_idx() + triangle = TriangleBase.from_triangle(self) + triangle.data = ( + other + .join(origin, how='left', on='origin') + .join(development, how='left', on='development') + .join(index, how='left', on='index') + .rename({str(num): i for num, i in enumerate(self.columns)}) + .select(self.key_labels + ['__origin__', '__development__'] + self.columns)) return triangle - def val_to_dev(self): + def to_development(self): if self.is_val_tri: obj = TriangleBase.from_triangle(self) - obj.data = obj.data.select(pl.all().exclude('__development__'), dcol) - obj.properties['is_val_tri'] = False + obj.data = obj.data.select( + pl.col(self.key_labels + ['__origin__']), + dcol, pl.col(self.columns)) + obj._properties['is_val_tri'] = False return obj else: return self - def dev_to_val(self): + def to_valuation(self): if not self.is_val_tri: obj = TriangleBase.from_triangle(self) - obj.data = obj.data.select(pl.all().exclude('__development__'), vcol) - obj.properties['is_val_tri'] = True + obj.data = obj.data.select( + pl.col(self.key_labels + ['__origin__']), + vcol, pl.col(self.columns)) + obj._properties['is_val_tri'] = True return obj else: return self @@ -289,11 +340,14 @@ def collect(self, *args, **kwargs): return self @staticmethod - def _format_origin(data, column, format): + def _format_origin( + data : pl.DataFrame, + column : str, + format: str) -> pl.Expr: if data.select(column).dtypes[0] in ([pl.Date, pl.Datetime]): return pl.col(column).cast(pl.Date).dt.month_start() else: - for f in ['%Y%m', '%Y', format]: + for f in ['%Y%m', '%Y-%m', '%Y', format]: c = ( pl.col(column) .cast(pl.Utf8).str.to_date(format=f) @@ -305,11 +359,14 @@ def _format_origin(data, column, format): pass @staticmethod - def _format_valuation(data, column, format) -> pl.Expr: + def _format_valuation( + data: pl.DataFrame, + column: str, + format: str) -> pl.Expr: if data.select(column).dtypes[0] in ([pl.Date, pl.Datetime]): return pl.col(column).cast(pl.Date).dt.month_end() else: - for f in ['%Y%m', '%Y', format]: + for f in ['%Y%m', '%Y-%m', '%Y', format]: c = ( pl.col(column) .cast(pl.Utf8).str.to_date(format=f) @@ -325,8 +382,7 @@ def _format_valuation(data, column, format) -> pl.Expr: .dt.offset_by('-1d').dt.month_end()) else: return c.dt.month_end() - - + def _agg(self, agg, axis=None, *args, **kwargs): if axis is None: if max(self.shape) == 1: @@ -342,34 +398,29 @@ def _agg(self, agg, axis=None, *args, **kwargs): .group_by(['__origin__', '__development__']) .agg(getattr(pl.col(self.columns).fill_null(0), agg)(*args, **kwargs)) .with_columns(*[pl.lit('(All)').alias(c) for c in self.key_labels]) + .select(self.key_labels + ['__origin__', '__development__'] + self.columns) ) - obj.properties.pop('index', None) - obj.properties.pop('key_labels', None) + obj._properties.pop('index', None) elif axis == 1: - obj.data = self.data.select( - pl.col(self.key_labels + ['__origin__', '__development__']), - pl.sum_horizontal(self.columns).alias('0')) - obj.columns = ['0'] + obj.data = self.select(pl.sum_horizontal(self.columns).alias('0')) elif axis == 2: obj.data = ( self.data .group_by(self.key_labels + ['__development__']) .agg(getattr(pl.col(self.columns).fill_null(0), agg)(*args, **kwargs)) .with_columns(pl.lit(self.origin.min()).alias('__origin__'))) - obj.properties.pop('date_matrix', None) - obj.properties.pop('origin', None) - obj.properties.pop('origin_grain', None) + obj._properties.pop('date_matrix', None) + 
obj._properties.pop('origin', None) elif axis == 3: obj.data = ( self.data .group_by(self.key_labels + ['__origin__']) .agg(getattr(pl.col(self.columns).fill_null(0), agg)(*args, **kwargs)) .with_columns(pl.lit(self.valuation_date).alias('__development__'))) - obj.properties['is_val_tri'] = True - obj.properties.pop('date_matrix', None) - obj.properties.pop('development', None) - obj.properties.pop('development_grain', None) - obj.properties.pop('valuation', None) + obj._properties['is_val_tri'] = True + obj._properties.pop('date_matrix', None) + obj._properties.pop('development', None) + obj._properties.pop('valuation', None) else: raise ValueError(f'axis {axis} is not supported') return obj @@ -402,12 +453,11 @@ def quantile(self, axis=None, q=0.5): return self._agg('quantile', axis, quantile=q) def _get_axis(self, axis): - ax = { + return { **{0: 0, 1: 1, 2: 2, 3: 3}, **{-1: 3, -2: 2, -3: 1, -4: 0}, **{"index": 0, "columns": 1, "origin": 2, "development": 3}, - } - return ax.get(axis, 0) + }.get(axis, 0) def group_by(self, by, axis=0, *args, **kwargs): """Group Triangle by index values. If the triangle is convertable to a @@ -424,14 +474,9 @@ def group_by(self, by, axis=0, *args, **kwargs): """ return PlTriangleGroupBy(self, by, axis) - def incr_to_cum(self, inplace=False): + def to_cumulative(self): """Method to convert an incremental triangle into a cumulative triangle. - Parameters - ---------- - inplace: bool - Set to True will update the instance data attribute inplace - Returns ------- Updated instance of triangle accumulated along the origin @@ -463,26 +508,26 @@ def incr_to_cum(self, inplace=False): .agg( pl.col('__development__'), pl.col(self.columns).fill_null(pl.lit(0)).cumsum()) - .explode(["__development__"] + self.columns)) + .explode(["__development__"] + self.columns) + .select(self.key_labels + ['__origin__', '__development__'] + self.columns)) if not self.is_lazy: triangle.data = triangle.data.collect() triangle.is_cumulative = True - triangle.properties['is_val_tri'] = True - triangle.properties.pop('date_matrix', None) + triangle._properties['is_val_tri'] = True + triangle._properties.pop('date_matrix', None) if self.is_val_tri: - triangle.properties.pop('valuation', None) + triangle._properties.pop('valuation', None) return triangle else: - triangle.properties.pop('development', None) - return triangle.val_to_dev() + triangle._properties.pop('development', None) + return triangle.to_development() - def cum_to_incr(self, filter_zeros=False): + def to_incremental(self, filter_zeros=False): """Method to convert an cumlative triangle into a incremental triangle. 
        Parameters
        ----------
-        inplace: bool
-            Set to True will update the instance data attribute inplace
+

        Returns
        -------
@@ -503,45 +548,26 @@
             .filter(pl.any_horizontal(pl.col(self.columns) != 0) if filter_zeros else True)
         )
         triangle.is_cumulative = False
-        triangle.properties.pop('date_matrix', None)
+        triangle._properties.pop('date_matrix', None)
         if self.is_val_tri:
-            triangle.properties.pop('valuation', None)
+            triangle._properties.pop('valuation', None)
         else:
-            triangle.properties.pop('development', None)
+            triangle._properties.pop('development', None)
         if not self.is_lazy:
             triangle.data = triangle.data.collect()
         return triangle
 
     @property
     def link_ratio(self):
-        triangle = TriangleBase.from_triangle(self.incr_to_cum().val_to_dev())
-        interval = {'Y': 12, '2Q': 6, 'Q': 3, 'M': 1}[self.development_grain]
-        triangle.data = (
-            triangle.data.lazy()
-            .sort(['__origin__', '__development__'])
-            .group_by(self.key_labels + ['__origin__'])
-            .agg(
-                (pl.col('__development__') -
-                 pl.lit(interval)).cast(pl.UInt16).alias('__development__'),
-                (pl.when(pl.col(self.columns).pct_change().is_infinite())
-                 .then(pl.lit(None))
-                 .otherwise(pl.col(self.columns).pct_change()) + pl.lit(1.0)
-                 ).keep_name())
-            .explode(["__development__"] + self.columns)
-            .filter(~pl.any_horizontal(pl.col(self.columns).is_null())))
-        if not self.is_lazy:
-            triangle.data = triangle.data.collect()
+        numer = self[..., 1:]
+        denom = self[..., :numer.shape[2], :-1]
+        triangle = 1 / denom * numer.values
+        triangle = triangle[triangle.valuation < triangle.valuation_date]
+        return triangle
 
-    def _get_idx(self, idx):
-        def _normalize_index(idx):
-            idx = normalize_index(idx, self.shape)
-            l = []
-            for n, i in enumerate(idx):
-                if type(i) is slice:
-                    start = i.start if i.start > 0 else None
-                    stop = i.stop if i.stop > -1 else None
-                    stop = None if stop == self.shape[n] else stop
-                    step = None if start is None and stop is None else i.step
-                    l.append(slice(start, stop, step))
-                else:
-                    l.append(i)
-            key = tuple(l)
-            return key
-
-        def _contig_slice(arr):
-            """ Try to make a contiguous slicer from an array of indices """
-            if type(arr) is slice:
-                return arr
-            if type(arr) in [int]:
-                arr = [arr]
-            if len(arr) == 1:
-                return slice(arr[0], arr[0] + 1)
-            if len(arr) == 0:
-                raise ValueError("Slice returns empty Triangle")
-            diff = pl.Series(arr).diff()
-            if max(diff) == min(diff):
-                step = max(diff)
-            else:
-                return arr
-            step = None if step == 1 else step
-            min_arr = None if min(arr) == 0 else min(arr)
-            max_arr = max(arr) + 1
-            if step and step < 0:
-                min_arr, max_arr = max_arr - 1, min_arr - 1 if min_arr else min_arr
-            return slice(min_arr, max_arr, step)
-
-        idx = _normalize_index(idx)
-        return (_contig_slice(idx[0]), _contig_slice(idx[1]),
-                _contig_slice(idx[2]), _contig_slice(idx[3]))
+    def _normalize_slice(self, key):
+        key = [key] if type(key) is not tuple else list(key)
+        key = [slice(item, item + 1 if item != -1 else None, None) if type(item) is int else item for item in key]
+        ellipsis_index = [num for num, i in enumerate(key) if i == Ellipsis]
+        if key[0] == Ellipsis:
+            key = [slice(None, None, None)]*(5 - len(key)) + key[1:]
+        if key[-1] == Ellipsis:
+            key = key[:-1] + [slice(None, None, None)]*(5 - len(key))
+        if len(ellipsis_index) > 0:
+            key = key[:ellipsis_index[0]] + [slice(None, None, None)]*(5 - len(key)) + key[ellipsis_index[0] + 1:]
+        if len(ellipsis_index) == 0 and len(key) < 4:
+            key = key + [slice(None, None, None)]*(4 - len(key))
+        return key if type(key) is tuple else tuple(key)
 
     def __getitem__(self, key):
-        """ Only returns polars expressions. """
+        """ Eager materialization.
Use select, with_columns and filter for optimized performance """ + triangle = TriangleBase.from_triangle(self) if type(key) is str: - key = [key] - if type(key) is tuple or type(key) is slice or type(key) is int: - s0, s1, s2, s3 = self._get_idx(key) - return ( - [pl.col(c).is_in(self.index[c]) for c in self.key_labels[s0]], - self.columns[s1], - [pl.col('__origin__').is_in(self.origin[s2])], - [pl.col('__development__').is_in(self.valuation[s3] if self.is_val_tri else self.development[s3])]) - elif type(key) is list: return self.select(key) - elif type(key) is pl.Series: - triangle = TriangleBase.from_triangle(self) - triangle.properties.pop('date_matrix', None) + if type(key) in [tuple, slice, int] or (type(key) is list and type(key[0]) is int): + s0, s1, s2, s3 = self._normalize_slice(key) + s0 = self.index[s0] if s0 != slice(None, None, None) else s0 + s1 = pl.col(self.columns[s1]) + s2 = self.origin[s2] + s3 = self.valuation[s3] if self.is_val_tri else self.development[s3] + triangle = triangle.filter( + pl.fold( + acc=pl.lit(True), + function=lambda acc, x: acc & x, + exprs=([pl.col('__origin__').is_in(s2)] + + [pl.col('__development__').is_in(s3)]))) + if type(s0) is not slice: + triangle = triangle.filter_by_df(s0) + triangle._properties['index'] = s0 + triangle._properties['origin'] = s2 + if self.is_val_tri: + triangle._properties['valuation'] = s3 + else: + triangle._properties['development'] = s3 + return triangle.select(s1) + elif type(key) is list: + triangle = triangle.select(pl.col(key)) + return triangle + elif type(key) is pl.Series and key.dtype == pl.Boolean: if key.name == 'valuation': key = self.valuation.filter(key) - triangle.properties.pop('development', None) - triangle.properties['valuation'] = key - return triangle.filter(pl.col('__development__').is_in(key) if self.is_val_tri else vcol.is_in(key)) + return triangle.filter( + pl.col('__development__').is_in(key) + if self.is_val_tri else vcol.is_in(key)) elif key.name == 'development': - triangle.properties.pop('valuation', None) key = self.development.filter(key) - triangle.properties['development'] = key - return triangle.filter(dcol.is_in(key) if self.is_val_tri else pl.col('__development__').is_in(key)) + return triangle.filter( + dcol.is_in(key) if self.is_val_tri else + pl.col('__development__').is_in(key)) elif key.name == 'origin': key = self.origin.filter(key) - triangle.properties['origin'] = key return triangle.filter(pl.col('__origin__').is_in(key)) else: raise NotImplementedError() @@ -741,13 +750,13 @@ def __getitem__(self, key): def __setitem__(self, key, value): """ Function for pandas style column setting """ if type(value) is pl.Expr: - self.data = self.data.select(pl.all().exclude(key), value.alias(key)) + self.data = self.with_columns(value.alias(key)).data elif type(value) != type(self): value = self._triangle_literal(value) value.data = value.data.rename({'__value__': key}) value.columns = [key] self.data = ( - self.data.select(pl.all().exclude(key)) + self.data.select(pl.col(self.columns).exclude(key)) .join(value.data.select([key, '__origin__', '__development__']), how='left', on=['__origin__', '__development__'])) else: @@ -760,19 +769,27 @@ def __setitem__(self, key, value): f"""Unable to assign triangle with multiple column values. 
Choose one of {value.columns}.""") value = TriangleBase.from_triangle(value) - index_intersection = list(set(self.key_labels).intersection(set(value.key_labels))) - if len(value.key_labels) == 1: - index_intersection = [] - value.data = value.data.rename({value.columns[0]: key}) - value.columns = [key] - self.data = ( - self.data.lazy().select(pl.all().exclude(key)) - .join( - value.data.lazy().select( - index_intersection + value.columns + ['__origin__', '__development__']), - how='left', on=index_intersection + ['__origin__', '__development__']) - .rename({value.columns[0]: key}) - ) + if self._is_aligned(value): + self.data = ( + pl.concat( + (self.data.lazy().collect(), + value.data.select(pl.col(value.columns[0]).alias(key)).lazy().collect()) + , how='horizontal') + .lazy()) + else: + index_intersection = list(set(self.key_labels).intersection(set(value.key_labels))) + if len(value.key_labels) == 1: + index_intersection = [] + value.data = value.data.rename({value.columns[0]: key}) + value.columns = [key] + self.data = ( + self.data.lazy().select(pl.col(self.columns).exclude(key)) + .join( + value.data.lazy().select( + index_intersection + value.columns + ['__origin__', '__development__']), + how='left', on=index_intersection + ['__origin__', '__development__']) + .rename({value.columns[0]: key}) + ) self.columns = self.columns + [key] if not self.is_lazy: self.data = self.data.lazy().collect() @@ -783,8 +800,9 @@ def broadcast_index(a, b): a = TriangleBase.from_triangle(a) a.data = (b.index.lazy().join( a.data.lazy().select( - pl.col(a.columns + ['__origin__', '__development__'])), + pl.col(['__origin__', '__development__'] + a.columns)), how='cross')) + a._properties.pop('index', None) return a, b def broadcast_columns(a, b): @@ -800,6 +818,10 @@ def broadcast_origin(a, b): a.data = a.data.drop('__origin__').join( b.origin.alias('__origin__').to_frame().lazy(), how='cross') + a._properties.pop('date_matrix', None) + a._properties.pop('origin', None) + a._properties.pop('development', None) + a._properties.pop('valuation', None) return a, b def broadcast_development(a, b): @@ -808,6 +830,11 @@ def broadcast_development(a, b): (b.valuation if b.is_val_tri else b.development ).alias('__development__').to_frame().lazy(), how='cross') + a._properties.pop('date_matrix', None) + a._properties.pop('origin', None) + a._properties.pop('development', None) + a._properties.pop('valuation', None) + a._properties.pop('is_val_tri', None) return a, b a.data = a.data.lazy() b.data = b.data.lazy() @@ -839,7 +866,7 @@ def head(self, n: 'int' = 5): self.index.head(n), how='semi', on=self.key_labels) - triangle.properties.pop('index', None) + triangle._properties.pop('index', None) return triangle def tail(self, n: 'int' = 5): @@ -848,31 +875,96 @@ def tail(self, n: 'int' = 5): self.index.tail(n), how='semi', on=self.key_labels) - triangle.properties.pop('index', None) + triangle._properties.pop('index', None) return triangle - def filter(self, key, *args, **kwargs): + def filter(self, *exprs): + """ Function to apply polars filtering and re-trigger + affected properties """ triangle = TriangleBase.from_triangle(self) - triangle.data = triangle.data.filter(key, *args, **kwargs) + triangle.data = triangle.data.filter(*exprs) + triangle._properties.pop('date_matrix', None) + triangle._properties['origin'] = triangle.origin.filter( + triangle.origin.is_in( + triangle.date_matrix['__origin__'].unique())) + triangle._properties['development'] = triangle.development.filter( + triangle.development.is_in( + 
                triangle.date_matrix['__development__'].unique()))
+        triangle._properties['valuation'] = triangle.valuation.filter(
+            triangle.valuation.is_in(
+                triangle.date_matrix['__valuation__'].unique()))
+        triangle._properties.pop('index', None)
+        triangle._properties['index'] = self.filter_index(triangle.index)
         return triangle
+
+    def filter_index(self, df):
+        return (
+            self.index
+            .with_columns(
+                pl.Series('__row__', range(len(self.index))))
+            .join(df, how='inner', on=df.columns)
+            .sort('__row__')
+            .drop('__row__'))
+
-    def select(self, key, *args, **kwargs):
+    def filter_by_df(self, df):
         triangle = TriangleBase.from_triangle(self)
-        if type(key) is str:
-            key = [key]
-        if len(set(key).intersection(self.key_labels)) ==len(key):
-            triangle.data = triangle.data.select(pl.col(key + ['__origin__', '__development__'] + self.columns, *args, **kwargs))
-            triangle.key_labels = key
-        elif len(set(key).intersection(self.columns)) ==len(key):
-            triangle.data = triangle.data.select(pl.col(self.key_labels + ['__origin__', '__development__'] + key, *args, **kwargs))
-            triangle.columns = key
-        else:
-            raise NotImplementedError()
+        triangle.data = triangle.data.join(
+            df, how='inner', on=df.columns)
+        triangle._properties.pop('date_matrix', None)
+        triangle._properties['origin'] = triangle.origin.filter(
+            triangle.origin.is_in(
+                triangle.date_matrix['__origin__'].unique()))
+        triangle._properties['development'] = triangle.development.filter(
+            triangle.development.is_in(
+                triangle.date_matrix['__development__'].unique()))
+        triangle._properties['valuation'] = triangle.valuation.filter(
+            triangle.valuation.is_in(
+                triangle.date_matrix['__valuation__'].unique()))
+        triangle._properties.pop('index', None)
+        triangle._properties['index'] = self.filter_index(triangle.index)
+        return triangle
+
+    def select(self, *exprs):
+        """ Function to apply polars selection and re-trigger
+        affected properties. Does not support pl.all """
+        triangle = TriangleBase.from_triangle(self)
+        dims = self.key_labels + ['__origin__', '__development__']
+        triangle.data = triangle.data.select(pl.col(dims), *exprs)
+        triangle.columns = [c for c in triangle.data.columns if c not in dims]
+        return triangle
+
+    def with_columns(self, *exprs):
+        """ Function to apply polars selection and re-trigger
+        affected properties """
+        triangle = TriangleBase.from_triangle(self)
+        dims = self.key_labels + ['__origin__', '__development__']
+        triangle.data = triangle.data.with_columns(*exprs)
+        triangle.columns = [c for c in triangle.data.columns if c not in dims]
+        return triangle
+
+    def join(self, other):
+        """ Method to join two triangles together """
+        shared_cols = set(self.columns).intersection(set(other.columns))
+        if len(shared_cols) > 0:
+            raise ValueError(
+                f"Column values must be unique, but both triangles have {shared_cols}."
+            )
+        triangle = TriangleBase.from_triangle(self)
+        on = (list(set(self.key_labels).intersection(other.key_labels)) +
+              ['__origin__', '__development__'])
+        triangle.data = self.data.join(other.data, on=on, how='inner')
+        triangle.columns = self.columns + other.columns
+        triangle._properties = {}
         return triangle
 
-    def join(self, other, on, how, *args, **kwargs):
+
+    def union(self, other):
+        """ Method to union two triangles together """
         triangle = TriangleBase.from_triangle(self)
-        triangle.data = triangle.data.join(other, on, how, *args, **kwargs)
+        triangle.data = pl.concat((self.data, other.data), how='align')
+        triangle._properties = {}
         return triangle
 
     def _compatibility_check(self, other):
@@ -889,9 +981,38 @@
                 """Triangle arithmetic requires triangles to be broadcastable
                 or on the same lag basis (development or valuation)."""
             )
+        if (self.origin_grain != other.origin_grain
+                or (self.development_grain != other.development_grain
+                    and min(self.shape[-1], other.shape[-1]) > 1)):
+            raise ValueError(
+                "Triangle arithmetic requires both triangles to be the same grain."
+            )
+        a, b = set(self.key_labels), set(other.key_labels)
+        common = a.intersection(b)
+        if common not in [a, b]:
+            raise ValueError('Index broadcasting is ambiguous between', str(a), 'and', str(b))
         return join_index, union_index, source_columns, destination_columns
 
+    def _is_aligned(self, other):
+        """ Helper to determine whether horizontal concat is feasible """
+        return (
+            not (self.is_lazy and other.is_lazy) and  # must be eager
+            len(self.data) == len(other.data) and  # must have same underlying rows
+            # must have all non-measure columns be equal
+            (self.data.select(self.key_labels + ['__origin__', '__development__']) ==
+             other.data.select(other.key_labels + ['__origin__', '__development__'])
+             ).min().min(axis=1)[0])
+
+    def rename(self, mapping):
+        triangle = TriangleBase.from_triangle(self)
+        triangle.data = triangle.data.rename(mapping)
+        triangle.columns = [mapping.get(c, c) for c in self.columns]
+        return triangle
+
     def __arithmetic__(self, other, operation):
+        if type(other) == pl.DataFrame:
+            other = self.apply_labels_to_values(other)
         if type(other) != type(self):
             other = self._triangle_literal(other)
         valuation = max(self.valuation_date, other.valuation_date)
@@ -899,9 +1020,7 @@
         join_index, union_index, source_columns, destination_columns = \
             a._compatibility_check(b)
         a = TriangleBase.from_triangle(a)
-        if (not (a.is_lazy and b.is_lazy) and len(a.data) == len(b.data) and
-            (a.data.select(a.key_labels + ['__origin__', '__development__']) ==
-             b.data.select(b.key_labels + ['__origin__', '__development__'])).min().min(axis=1)[0]):
+        if a._is_aligned(b):
             a.data = (
                 pl.concat(
                     (a.data.lazy().collect(),
@@ -934,7 +1053,7 @@
             )
         if not self.is_lazy:
             a.data = a.data.collect()
-        a.properties = {}
+        a._properties = {}
         return a
 
     def _triangle_literal(self, value):
@@ -1007,8 +1126,6 @@ def __round__(self, n):
 
     def __len__(self):
         return self.shape[0]
-
-
     def __contains__(self, value):
         raise NotImplementedError()
 
@@ -1020,6 +1137,17 @@ def __le__(self, value):
 
     def copy(self):
         return TriangleBase.from_triangle(self)
+
+    def __eq__(self, other):
+        if (type(other) != type(self) or
+                self.shape != other.shape
+                or len(self.data) != len(other.data)):
+            return False
+        return (
+            self.sort_data().data.select(self.columns).lazy().collect() ==
+            other.sort_data().data.select(other.columns).lazy().collect()
+        ).min().min(axis=1)[0]
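+
+    # Equality sketch (illustrative only; `df` and the column names below are
+    # assumed inputs, not defined anywhere in this patch):
+    #
+    #     a = TriangleBase(df, index=['company'], origin='acc_yr',
+    #                      valuation='val_date', columns=['paid'])
+    #     b = TriangleBase.from_triangle(a)
+    #     assert a == b  # same shape and identical sorted measure values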
     def _compatibility_check(self, other):
@@ -889,9 +981,38 @@
             """Triangle arithmetic requires triangles to be broadcastable
             or on the same lag basis (development or valuation)."""
         )
+        if (self.origin_grain != other.origin_grain
+                or (self.development_grain != other.development_grain
+                    and min(self.shape[-1], other.shape[-1]) > 1)):
+            raise ValueError(
+                "Triangle arithmetic requires both triangles to be the same grain."
+            )
+        a, b = set(self.key_labels), set(other.key_labels)
+        common = a.intersection(b)
+        if common not in [a, b]:
+            raise ValueError(
+                f"Index broadcasting is ambiguous between {a} and {b}.")
         return join_index, union_index, source_columns, destination_columns
 
+    def _is_aligned(self, other):
+        """ Helper to determine whether horizontal concat is feasible """
+        return (
+            not (self.is_lazy and other.is_lazy) and  # must be eager
+            len(self.data) == len(other.data) and  # must have the same number of rows
+            # all non-measure columns must be equal
+            (self.data.select(self.key_labels + ['__origin__', '__development__']) ==
+             other.data.select(other.key_labels + ['__origin__', '__development__'])
+             ).min().min(axis=1)[0])
+
+    def rename(self, mapping):
+        triangle = TriangleBase.from_triangle(self)
+        triangle.data = triangle.data.rename(mapping)
+        triangle.columns = [mapping.get(c, c) for c in self.columns]
+        return triangle
+
+    def __arithmetic__(self, other, operation):
+        if type(other) == pl.DataFrame:
+            other = self.apply_labels_to_values(other)
         if type(other) != type(self):
             other = self._triangle_literal(other)
         valuation = max(self.valuation_date, other.valuation_date)
@@ -899,9 +1020,7 @@ def __arithmetic__(self, other, operation):
         join_index, union_index, source_columns, destination_columns = \
             a._compatibility_check(b)
         a = TriangleBase.from_triangle(a)
-        if (not (a.is_lazy and b.is_lazy) and len(a.data) == len(b.data) and
-            (a.data.select(a.key_labels + ['__origin__', '__development__']) ==
-             b.data.select(b.key_labels + ['__origin__', '__development__'])).min().min(axis=1)[0]):
+        if a._is_aligned(b):
             a.data = (
                 pl.concat(
                     (a.data.lazy().collect(),
@@ -934,7 +1053,7 @@
             )
         if not self.is_lazy:
             a.data = a.data.collect()
-        a.properties = {}
+        a._properties = {}
         return a
 
     def _triangle_literal(self, value):
@@ -1007,8 +1126,6 @@ def __round__(self, n):
 
     def __len__(self):
         return self.shape[0]
-
-
     def __contains__(self, value):
         raise NotImplementedError()
@@ -1020,6 +1137,17 @@ def __le__(self, value):
 
     def copy(self):
         return TriangleBase.from_triangle(self)
+
+    def __eq__(self, other):
+        if (type(other) != type(self)
+                or self.shape != other.shape
+                or len(self.data) != len(other.data)):
+            return False
+        return (
+            self.sort_data().data.select(self.columns).lazy().collect() ==
+            other.sort_data().data.select(other.columns).lazy().collect()
+        ).min().min(axis=1)[0]
 
@@ -1037,45 +1165,81 @@
     def to_frame(self, keepdims=False, implicit_axis=False, *args, **kwargs):
         """ Converts a triangle to a pandas.DataFrame.
         Parameters
         ----------
         keepdims : bool
             If True, the triangle will be converted to a DataFrame with all
             dimensions intact. The argument will force a consistent DataFrame
             format regardless of whether any dimensions are of length 1.
         implicit_axis : bool
             When keepdims is True, this denotes whether to include the implicit
             valuation axis in addition to the origin and development.
         Returns
         -------
             pandas.DataFrame representation of the Triangle.
         """
         if self.shape[:2] == (1, 1) and not keepdims:
-            return self.wide()
-        if implicit_axis:
-            if self.is_val_tri:
-                return self.data.sort(
-                    pl.col(self.key_labels + ['__origin__', '__development__'])).select(
-                    pl.col(self.key_labels),
-                    pl.col('__origin__').alias('origin'),
-                    pl.col('__development__').alias('valuation'),
-                    dcol.alias('development'),
-                    pl.col(self.columns))
-            else:
-                return self.data.sort(
-                    pl.col(self.key_labels + ['__origin__', '__development__'])).select(
-                    pl.col(self.key_labels),
-                    pl.col('__origin__').alias('origin'),
-                    pl.col('__development__').alias('development'),
-                    vcol.alias('valuation'),
-                    pl.col(self.columns))
+            index = pl.concat((
+                pl.Series(range(len(self.origin))).alias('index').to_frame(),
+                self.origin.to_frame()), how='horizontal')
+            columns = (self.valuation
+                       if self.is_val_tri else
+                       self.development.cast(pl.Utf8))
+            return (
+                self.data
+                .with_columns(
+                    pl.col('__development__').alias('development'),
+                    pl.col('__origin__').alias('origin'),
+                    pl.col(self.columns))
+                .lazy().collect(streaming=True)
+                .pivot(
+                    index='origin',
+                    columns='development',
+                    values=self.columns,
+                    aggregate_function='first')
+                .join(index, how='left', on='origin')
+                .sort('index')
+                .select(pl.col(['origin'] + columns.to_list())))
         else:
+            alias = 'valuation' if self.is_val_tri else 'development'
+            implicit = dcol.alias('development') if self.is_val_tri else vcol.alias('valuation')
             return self.data.sort(
                 pl.col(self.key_labels + ['__origin__', '__development__'])).select(
                 pl.col(self.key_labels),
                 pl.col('__origin__').alias('origin'),
-                pl.col('__development__').alias('valuation' if self.is_val_tri else 'development'),
+                pl.col('__development__').alias(alias),
+                implicit if implicit_axis else pl.col([]),
                 pl.col(self.columns))
 
-    def sort(self):
+    def sort_data(self):
         self.data = self.data.sort(self.key_labels + ['__origin__', '__development__'])
         return self
+
+    def _summary_frame(self):
+        return pl.DataFrame({
+            "": ["Valuation:", "Grain:", "Shape:", "Index:", "Columns:"],
+            "Triangle Summary": [
+                self.valuation_date.strftime("%Y-%m"),
+                "O" + self.origin_grain + "D" + self.development_grain,
+                str(self.shape),
+                str(self.key_labels),
+                str(self.columns)]})
+
+    def __repr__(self):
+        if self.shape[:2] == (1, 1):
+            data = self.wide()
+            return data.__repr__()
+        else:
+            return self._summary_frame().__repr__()
+
+    def sort_index(self, by=None, descending=False):
+        self._properties['index'] = self.index.sort(by, descending=descending)
+        return self
 
 
 class PlTriangleGroupBy:
-    def __init__(self, obj, by, axis=0, **kwargs):
+    def __init__(self, obj, by, axis=0):
         self.obj = TriangleBase.from_triangle(obj)
         self.axis = self.obj._get_axis(axis)
         self.by = [by] if type(by) is str else by
         if self.axis == 0:
             self.groups = obj.data.group_by(
                 self.by + ['__origin__', '__development__'])
+        elif self.axis == 1:
+            if callable(by):
+                self.by = [by(c) for c in self.obj.columns]
+            elif len(by) == len(self.obj.columns):
+                self.by = by
+            else:
+                raise NotImplementedError()
         else:
             raise NotImplementedError()
         self.columns = self.obj.columns
@@ -1084,42 +1248,53 @@ def __getitem__(self, key):
         self.columns = [key] if type(key) is str else key
         return self
 
-    def _agg(self, agg, axis=1, *args, **kwargs):
-        axis = self.obj._get_axis(axis)
+    def _agg(self, agg, *args, **kwargs):
+        axis = self.obj._get_axis(self.axis)
         if axis == 0:
             self.obj.data = self.groups.agg(
-                getattr(pl.col(self.columns), agg)(*args, **kwargs))
-            self.obj.properties.pop('index', None)
-            self.obj.properties.pop('key_labels', None)
-        else:
+                getattr(pl.col(self.obj.columns), agg)(*args, **kwargs))
+            self.obj._properties.pop('index', None)
+        elif axis == 1:
+            maps = pl.DataFrame(
+                {'by': self.by, 'columns': list(self.obj.columns)}
+            ).group_by('by').agg(pl.col('columns'))
+            maps = dict(
+                zip(maps['by'].cast(pl.Utf8).to_list(),
+                    maps['columns'].to_list()))
+            self.obj.data = self.obj.data.select(
+                pl.col(self.obj.key_labels + ['__origin__', '__development__']),
+                *[getattr(pl, agg + '_horizontal')(pl.col(v)).alias(str(k))
+                  for k, v in maps.items()])
+            self.obj.columns = [
+                c for c in self.obj.data.columns
+                if c not in self.obj.key_labels + ['__origin__', '__development__']]
+        else:
             raise ValueError(f'axis {axis} is not supported')
-        self.obj.columns = self.columns
         return self.obj
 
-    def sum(self, axis=0):
-        return self._agg('sum', axis)
+    def sum(self):
+        return self._agg('sum')
 
-    def mean(self, axis=0):
-        return self._agg('mean', axis)
+    def mean(self):
+        return self._agg('mean')
 
-    def min(self, axis=0):
-        return self._agg('min', axis)
+    def min(self):
+        return self._agg('min')
 
-    def max(self, axis=0):
-        return self._agg('max', axis)
+    def max(self):
+        return self._agg('max')
 
-    def median(self, axis=0):
-        return self._agg('median', axis)
+    def median(self):
+        return self._agg('median')
 
-    def std(self, axis=0):
-        return self._agg('std', axis)
+    def std(self):
+        return self._agg('std')
 
-    def var(self, axis=0):
-        return self._agg('var', axis)
+    def var(self):
+        return self._agg('var')
 
-    def product(self, axis=0):
-        return self._agg('product', axis)
+    def product(self):
+        return self._agg('product')
 
-    def quantile(self, axis=0, quantile=0.5):
-        return self._agg('quantile', axis, quantile=quantile)
+    def quantile(self, quantile=0.5):
+        return self._agg('quantile', quantile=quantile)
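+
+# A hypothetical sketch of this group-by API; 'LOB' is an illustrative key
+# label, and the calls below assume the Triangle.groupby wrapper defined in
+# triangle.py is the entry point:
+#
+#     tri.groupby('LOB').sum()                       # axis 0: collapse the index
+#     tri.groupby(lambda c: 'total', axis=1).sum()   # axis 1: sum_horizontal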
diff --git a/chainladder/core/tests/test_triangle.py b/chainladder/core/tests/test_triangle.py
index 91539f94..7294fa36 100644
--- a/chainladder/core/tests/test_triangle.py
+++ b/chainladder/core/tests/test_triangle.py
@@ -217,6 +217,7 @@ def test_jagged_2_add(raa):
 def test_df_period_input(raa):
     d = raa.latest_diagonal
     df = d.to_frame(origin_as_datetime=False).reset_index()
+    df = df.rename(columns={'1990-12': 'values'})
     assert cl.Triangle(df, origin="index", columns=df.columns[-1]) == d
 
 
diff --git a/chainladder/core/triangle.py b/chainladder/core/triangle.py
index b165cfc6..eae8de2f 100644
--- a/chainladder/core/triangle.py
+++ b/chainladder/core/triangle.py
@@ -56,6 +56,10 @@ def origin(self):
     def is_val_tri(self):
         return self.triangle.is_val_tri
 
+    @property
+    def origin_close(self):
+        return self.triangle.origin_close
+
     def collect(self):
         self.triangle.data = self.triangle.data.collect()
         return self
@@ -126,25 +130,16 @@ def development_grain(self):
     def iloc(self):
         return Ilocation(self)
 
+    @property
+    def loc(self):
+        return Location(self)
+
     def __repr__(self):
         if self.shape[:2] == (1, 1):
             data = self._repr_format()
             return data.to_string()
         else:
-            return self._summary_frame().__repr__()
-
-    def _summary_frame(self):
-        return pd.Series(
-            [
-                self.valuation_date.strftime("%Y-%m"),
-                "O" + self.origin_grain + "D" + self.development_grain,
-                self.shape,
-                self.key_labels,
-                self.columns.tolist(),
-            ],
-            index=["Valuation:", "Grain:", "Shape:", "Index:", "Columns:"],
-            name="Triangle Summary",
-        ).to_frame()
+            return self.triangle._summary_frame().to_pandas().set_index('').__repr__()
 
     def _repr_html_(self):
         """ Jupyter/Ipython HTML representation """
@@ -163,7 +158,7 @@ def _repr_html_(self):
             )
             return default
         else:
-            return self._summary_frame().to_html(
+            return self.triangle._summary_frame().to_pandas().set_index('').to_html(
                 max_rows=pd.options.display.max_rows,
                 max_cols=pd.options.display.max_columns,
             )
@@ -265,12 +260,15 @@ def __getitem__(self, key):
         obj = self.copy()
         if type(key) is str:
             key = [key]
+        index = type(key) is list and len(set(self.key_labels).intersection(set(key))) == len(key)
         columns = type(key) is list and len(set(self.columns).intersection(set(key))) == len(key)
         development = type(key) is pd.Series
         origin = type(key) is np.ndarray and len(key) == len(self.origin)
         valuation = type(key) is np.ndarray and len(key) != len(self.origin)
+        if type(key) is pl.Expr:
+            obj.triangle = self.triangle.filter(key)
         if columns:
-            obj.triangle = self.triangle.select(key)
+            obj.triangle = self.triangle.select(pl.col(key))
         elif development:
             if self.is_val_tri:
                 formats = {"Y": "%Y", "S": "%YQ%q", "Q": "%YQ%q", "M": "%Y-%m"}
@@ -300,8 +298,8 @@ def __getitem__(self, key):
                 ))
                 .select(self.key_labels + ['__origin__', '__development__'] + s1))
             obj.triangle.columns = s1
-        else:
-            raise NotImplementedError()
+        elif index:
+            return pl.col(key[0] if len(key) == 1 else key)
         return obj
 
     def __setitem__(self, key, value):
@@ -311,8 +309,8 @@ def __setitem__(self, key, value):
 
 
     def to_sparse(self):
-        from chainladder.core.slice import VirtualColumns
-        from chainladder.core.triangle import Triangle
+        from chainladder.legacy.slice import VirtualColumns
+        from chainladder.legacy.triangle import Triangle
         import pandas as pd
         import sparse
         import polars as pl
@@ -343,42 +341,63 @@ def to_sparse(self):
 
     @property
     def values(self):
-        return self.triangle.data.select(self.columns)
+        return self.triangle.values
 
     def __eq__(self, other):
         return self.triangle == other.triangle
-
-    def __eq__(self, other):
-        return (
-            self.triangle.data.sort(
-                pl.col(self.key_labels + ['__origin__', '__development__'])
-            ).select(self.triangle.columns).lazy().collect() ==
-            other.triangle.data.sort(
-                pl.col(other.key_labels + ['__origin__', '__development__'])
-            ).select(other.columns).lazy().collect()
-        ).min(axis=0).min(axis=1)[0]
 
-    def __len__(self):
-        return len(self)
+    def __len__(self):
+        return len(self.triangle)
 
-    def to_frame(self, *args, **kwargs):
-        df = self.triangle.to_frame(*args, **kwargs).lazy().collect().to_pandas()
+    def to_frame(self, origin_as_datetime=True, keepdims=False,
+                 implicit_axis=False, *args, **kwargs):
+        """ Converts a triangle to a pandas.DataFrame.
+
+        Parameters
+        ----------
+        origin_as_datetime : bool
+            Whether the origin vector should be converted from PeriodIndex
+            into a datetime dtype. Default is True.
+        keepdims : bool
+            If True, the triangle will be converted to a DataFrame with all
+            dimensions intact. The argument will force a consistent DataFrame
+            format regardless of whether any dimensions are of length 1.
+        implicit_axis : bool
+            When keepdims is True, this denotes whether to include the implicit
+            valuation axis in addition to the origin and development.
+
+        Returns
+        -------
+            pandas.DataFrame representation of the Triangle.
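+
+        Examples
+        --------
+        A minimal sketch; ``tri`` is assumed to be an existing Triangle:
+
+        >>> tri.to_frame(origin_as_datetime=False)  # doctest: +SKIP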
+ """ + df = self.triangle.to_frame(keepdims=keepdims, implicit_axis=implicit_axis).lazy().collect().to_pandas() + if not origin_as_datetime: + df['origin'] = df['origin'].map(dict(zip(self.triangle.origin, self.origin))) shape = tuple([num for num, i in enumerate(self.shape) if i > 1]) - if shape == (0, 1): - df = df.set_index(self.key_labels)[self.columns] - if shape == (0, 2): - df = df.pivot(index=self.key_labels, columns='origin', values=self.columns) - if shape == (0, 3): - df = df.pivot(index=self.key_labels, columns='development', values=self.columns) - if shape == (1, 2): - df = df.set_index('origin')[self.columns].T - if shape == (1, 3): - df = df.set_index('development')[self.columns].T - if shape == (2, 3): - df = df.set_index('origin') + if len(shape) == 2 and not keepdims: + if shape == (0, 1): + df = df.set_index(self.key_labels)[self.columns] + if shape == (0, 2): + df = df.pivot(index=self.key_labels, columns='origin', values=self.columns[0]) + if shape == (0, 3): + df = df.pivot(index=self.key_labels, columns='development', values=self.columns[0]) + if shape == (1, 2): + df = df.set_index('origin')[self.columns].T + if shape == (1, 3): + df = df.set_index('development')[self.columns].T + if shape == (2, 3): + df = df.set_index('origin') + df.index.name = None + df.columns.name = None + if df.columns[0] == 'origin': + df = df.set_index('origin') + df.index.name = None + else: + df = df.set_index(self.key_labels) return df + @property + def T(self): + return self.to_frame(origin_as_datetime=False).T + def groupby(self, by, axis=0, *args, **kwargs): return TriangleGroupBy(self.triangle, by, axis) @@ -412,24 +431,107 @@ def sqrt(self): def exp(self): return np.exp(self) + def val_to_dev(self, *args, **kwargs): + obj = self.copy() + obj.triangle = self.triangle.to_development(*args, **kwargs) + return obj + + def dev_to_val(self, *args, **kwargs): + obj = self.copy() + obj.triangle = self.triangle.to_valuation(*args, **kwargs) + return obj + + def cum_to_incr(self, *args, **kwargs): + obj = self.copy() + obj.triangle = self.triangle.to_incremental(*args, **kwargs) + return obj + + def incr_to_cum(self, *args, **kwargs): + obj = self.copy() + obj.triangle = self.triangle.to_cumulative(*args, **kwargs) + return obj + + def grain(self, *args, **kwargs): + obj = self.copy() + obj.triangle = self.triangle.to_grain(*args, **kwargs) + return obj + + def pipe(self, func, *args, **kwargs): + return func(self, *args, **kwargs) + + def append(self, other): + from chainladder.utils.utility_functions import concat + obj = self.copy() + obj.triangle = concat((self.triangle, other.triangle), axis=0) + return obj + + + +class Location: + """ Base class for pandas style loc/iloc indexing """ + def __init__(self, obj): + self.obj = obj + + def _contig_slice(self, arr): + """ Try to make a contiguous slicer from an array of indices """ + if type(arr) is slice: + return arr + if type(arr) in [int, np.int64, np.int32]: + arr = [arr] + if len(arr) == 1: + return slice(arr[0], arr[0] + 1) + diff = np.diff(arr) + if len(diff) == 0: + raise ValueError("Slice returns empty Triangle") + if max(diff) == min(diff): + step = max(diff) + else: + return arr + step = None if step == 1 else step + min_arr = None if min(arr) == 0 else min(arr) + max_arr = max(arr) + 1 + if step and step < 0: + min_arr, max_arr = max_arr - 1, min_arr - 1 if min_arr else min_arr + return slice(min_arr, max_arr, step) + + def __getitem__(self, key): + key = self.obj.triangle._normalize_slice(key) + obj = self.obj.copy() + 
idx_slice = obj.index.reset_index().set_index(obj.key_labels).loc[key[0]] + + key = ( + self._contig_slice(idx_slice.values.flatten().tolist()), + self._contig_slice( + pd.Series(obj.columns).reset_index().set_index('columns') + .loc[key[1]].values.flatten().tolist()), + self._contig_slice( + pd.Series(obj.origin).reset_index().set_index('origin') + .loc[key[2]].values.flatten().tolist()), + self._contig_slice( + obj.development.reset_index().set_index('development') + .loc[key[3]].values.flatten().tolist())) + obj.triangle = obj.triangle[key] + if len(obj.key_labels) > 1: + obj.triangle.data = obj.triangle.data.drop(set(obj.key_labels)-set(idx_slice.index.names)) + obj.triangle._properties.pop('index', None) + return obj + + class Ilocation: def __init__(self, obj): self.obj = obj def __getitem__(self, key): - return self.obj.__getitem__(key) - -class TriangleGroupBy(PlTriangleGroupBy): - def _agg(self, agg, axis=1, *args, **kwargs): - axis = self.obj._get_axis(axis) - if axis == 0: - self.obj.data = self.groups.agg( - getattr(pl.col(self.columns), agg)(*args, **kwargs)) - else: - raise ValueError(f'axis {axis} is not supported') - self.obj.columns = self.columns + key = self.obj.triangle._normalize_slice(key) + obj = self.obj.copy() + obj.triangle = obj.triangle.__getitem__(key) + return obj + + +class TriangleGroupBy(PlTriangleGroupBy): + def _agg(self, agg, *args, **kwargs): obj = Triangle() - obj.triangle = self.obj + obj.triangle = super()._agg(agg, *args, **kwargs) return obj def add_tri_passthru(cls, k): @@ -454,9 +556,10 @@ def set_method(cls, func, k): passthru = [ '__abs__', '__neg__', '__pos__', '__pow__', '__round__', - 'collect', 'lazy', 'head', + 'collect', 'lazy', 'head', '_get_axis', 'max', 'mean', 'median', 'min', 'product', 'quantile', 'std', - 'sum', 'tail', 'val_to_dev', 'var', 'val_to_dev', 'dev_to_val', 'cum_to_incr', 'incr_to_cum', 'grain'] + 'sum', 'tail', 'var'] + for item in passthru: add_tri_passthru(Triangle, item) diff --git a/chainladder/legacy/__init__.py b/chainladder/legacy/__init__.py new file mode 100644 index 00000000..cd971676 --- /dev/null +++ b/chainladder/legacy/__init__.py @@ -0,0 +1,5 @@ +from chainladder.legacy.triangle import Triangle as LegacyTriangle # noqa (API import) +from chainladder.legacy.correlation import ( + DevelopmentCorrelation, + ValuationCorrelation, +) # noqa (API import) \ No newline at end of file diff --git a/chainladder/legacy/base.py b/chainladder/legacy/base.py new file mode 100644 index 00000000..05fad95f --- /dev/null +++ b/chainladder/legacy/base.py @@ -0,0 +1,455 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+import pandas as pd +import numpy as np +from chainladder.utils.cupy import cp +from chainladder.utils.sparse import sp +from chainladder.utils.dask import dp +import warnings + +from chainladder.legacy.display import TriangleDisplay +from chainladder.legacy.dunders import TriangleDunders +from chainladder.legacy.pandas import TrianglePandas +from chainladder.legacy.slice import TriangleSlicer +from chainladder.legacy.io import TriangleIO +from chainladder.legacy.common import Common +from chainladder import options +from chainladder.utils.utility_functions import num_to_nan, concat + + +class TriangleBase( + TriangleIO, TriangleDisplay, TriangleSlicer, TriangleDunders, TrianglePandas, Common +): + """This class handles the initialization of a triangle""" + + @property + def shape(self): + return self.values.shape + + @staticmethod + def _input_validation(data, index, columns, origin, development): + """Validate/sanitize inputs""" + + def str_to_list(arg): + if arg is None: + return + if type(arg) in [str, pd.Period]: + return [arg] + else: + return list(arg) + + index = str_to_list(index) + columns = str_to_list(columns) + origin = str_to_list(origin) + development = str_to_list(development) + if "object" in data[columns].dtypes: + raise TypeError("column attribute must be numeric.") + if data[columns].shape[1] != len(columns): + raise AttributeError("Columns are required to have unique names") + return index, columns, origin, development + + @staticmethod + def _set_development(data, development, development_format, origin_date): + """Initialize development and its grain""" + if development: + development_date = TriangleBase._to_datetime( + data, development, period_end=True, format=development_format + ) + # if np.all(development_date.dt.strftime('%m-%d') == '01-01'): + # development_date = pd.Series(pd.PeriodIndex(development_date, freq='A').to_timestamp(how='e')) + else: + o_max = pd.Period( + origin_date.max(), freq=TriangleBase._get_grain(origin_date) + ).to_timestamp(how="e") + development_date = pd.Series([o_max] * len(origin_date)) + + development_date.name = "__development__" + if ( + pd.Series(development_date).dt.year.min() + == pd.Series(development_date).dt.year.max() + == 1970 + ): + raise ValueError( + "Development lags could not be determined. 
This may be because development"
+                " is expressed as an age where a date-like vector is required"
+            )
+        return development_date
+
+    @staticmethod
+    def _set_index(col, unique):
+        return col.map(dict(zip(unique, range(len(unique))))).values[None].T
+
+    @staticmethod
+    def _aggregate_data(data, origin_date, development_date, index, columns):
+        """Summarize dataframe to the level specified in axes"""
+        if type(data) != pd.DataFrame:
+            # Dask dataframes are mutated
+            data["__origin__"] = origin_date
+            data["__development__"] = development_date
+            key_gr = ["__origin__", "__development__"] + [
+                data[item] for item in ([] if not index else index)
+            ]
+            data_agg = data.groupby(key_gr)[columns].sum().reset_index().fillna(0)
+            data = data.drop(["__origin__", "__development__"], axis=1)
+        else:
+            # Summarize dataframe to the level specified in axes
+            key_gr = [origin_date, development_date] + [
+                data[item] for item in ([] if not index else index)
+            ]
+            data_agg = data[columns].groupby(key_gr).sum().reset_index().fillna(0)
+            data_agg["__origin__"] = data_agg[origin_date.name]
+            data_agg["__development__"] = data_agg[development_date.name]
+        # origin <= development is required - truncate bad records if not true
+        valid = data_agg["__origin__"] <= data_agg["__development__"]
+        if sum(~valid) > 0:
+            warnings.warn(
+                "Observations with development before "
+                "origin start have been removed."
+            )
+            valid = valid.compute() if hasattr(valid, "compute") else valid
+            data_agg = data_agg[valid]
+        return data_agg
+
+    @staticmethod
+    def _set_kdims(data_agg, index):
+        kdims = data_agg[index].drop_duplicates().reset_index(drop=True).reset_index()
+        key_idx = (
+            data_agg[index].merge(kdims, how="left", on=index)["index"].values[None].T
+        )
+        return kdims.drop("index", axis=1).values, key_idx
+
+    @staticmethod
+    def _set_odims(data_agg, date_axes):
+        odims = np.sort(date_axes["__origin__"].unique())
+        orig_idx = TriangleBase._set_index(data_agg["__origin__"], odims)
+        return odims, orig_idx
+
+    @staticmethod
+    def _set_ddims(data_agg, date_axes):
+        if date_axes["__development__"].nunique() > 1:
+            dev_lag = TriangleBase._development_lag(
+                data_agg["__origin__"], data_agg["__development__"]
+            )
+
+            ddims = np.sort(
+                TriangleBase._development_lag(
+                    date_axes["__origin__"], date_axes["__development__"]
+                ).unique()
+            )
+
+            dev_idx = TriangleBase._set_index(dev_lag, ddims)
+
+        else:
+            ddims = pd.DatetimeIndex(
+                [data_agg["__development__"].max()], name="valuation"
+            )
+            dev_idx = np.zeros((len(data_agg), 1))
+
+        return ddims, dev_idx
+
+    @staticmethod
+    def _set_values(data_agg, key_idx, columns, orig_idx, dev_idx):
+        val_idx = (
+            ((np.ones(len(data_agg))[None].T) * range(len(columns)))
+            .reshape((1, -1), order="F")
+            .T
+        )
+        coords = np.concatenate(
+            tuple([np.concatenate((orig_idx, dev_idx), 1)] * len(columns)), 0
+        )
+        coords = np.concatenate(
+            (np.concatenate(tuple([key_idx] * len(columns)), 0), val_idx, coords), 1
+        )
+        amts = np.concatenate(
+            [data_agg[col].fillna(0).values for col in data_agg[columns]]
+        ).astype("float64")
+        return coords.T.astype("int32"), amts
+
+    def _len_check(self, x, y):
+        if len(x) != len(y):
+            raise ValueError(
+                f"Length mismatch: Expected axis has {len(x)} elements, "
+                f"new values have {len(y)} elements"
+            )
+
+    def _get_date_axes(
+        self, origin_date, development_date, origin_grain, development_grain
+    ):
+        """Function to find any missing origin dates or development dates that
+        would otherwise mess up the origin/development dimensions.
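+        For example, a triangle with origins 2019-2021 but no records in
+        origin 2020 still needs a 2020 row; the cartesian product of the
+        full origin and development ranges fills in any such gap.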
+ """ + o = pd.period_range( + start=origin_date.min(), end=origin_date.max(), freq=origin_grain + ).to_timestamp(how="s") + + d = pd.period_range( + start=development_date.min(), + end=development_date.max(), + freq=development_grain, + ).to_timestamp(how="e") + + # If the development is semi-annual, we need to adjust further because of "2Q-DEC" + if development_grain == "2Q-DEC": + from pandas.tseries.offsets import DateOffset + + d = d + DateOffset(months=-3) + + c = pd.DataFrame( + TriangleBase._cartesian_product(o, d), + columns=["__origin__", "__development__"], + ) + + return c[c["__development__"] > c["__origin__"]] + + @property + def nan_triangle(self): + """Given the current triangle shape and valuation, it determines the + appropriate placement of NANs in the triangle for future valuations. + This becomes useful when managing array arithmetic. + """ + xp = self.get_array_module() + if min(self.values.shape[2:]) == 1: + return xp.ones(self.values.shape[2:], dtype="float16") + val_array = np.array(self.valuation).reshape(self.shape[-2:], order="f") + nan_triangle = np.array(pd.DataFrame(val_array) > self.valuation_date) + nan_triangle = xp.array(np.where(nan_triangle, xp.nan, 1), dtype="float16") + return nan_triangle + + @staticmethod + def _to_datetime(data, fields, period_end=False, format=None): + """For tabular form, this will take a set of data + column(s) and return a single date array. This function heavily + relies on pandas, but does two additional things: + 1. It extends the automatic inference using date_inference_list + 2. it allows pd_to_datetime on a set of columns + """ + # Concat everything into one field + if len(fields) > 1: + target_field = data[fields].astype(str).apply(lambda x: "-".join(x), axis=1) + else: + target_field = data[fields].iloc[:, 0] + if hasattr(target_field, "dt"): + target = target_field + if type(target.iloc[0]) == pd.Period: + return target.dt.to_timestamp(how={1: "e", 0: "s"}[period_end]) + else: + datetime_arg = target_field.unique() + format = [{"arg": datetime_arg, "format": format}] if format else [] + date_inference_list = format + [ + {"arg": datetime_arg, "format": "%Y%m"}, + {"arg": datetime_arg, "format": "%Y"}, + {"arg": datetime_arg, "infer_datetime_format": True}, + ] + for item in date_inference_list: + try: + arr = dict(zip(datetime_arg, pd.to_datetime(**item))) + break + except: + pass + target = target_field.map(arr) + return target + + @staticmethod + def _development_lag(origin, valuation): + """For tabular format, this will convert the origin/valuation + difference to a development lag""" + return ((valuation - origin) / (365.25/12)).round('1d').dt.days + + + @staticmethod + def _get_grain(dates, trailing=False, kind="origin"): + """Determines Grain of origin or valuation vector + + Parameters: + + dates: pd.Series[datetime64[ns]] + A Datetime Series + trailing: + Set to False if you want to treat December as period end. Set + to True if you want it inferred from the data. 
+ """ + months = dates.dt.month.unique() + diffs = np.diff(np.sort(months)) + if len(dates.unique()) == 1: + grain = "A" + elif len(months) == 1: + grain = "A" + elif np.all(diffs == 6): + grain = "2Q" + elif np.all(diffs == 3): + grain = "Q" + else: + grain = "M" + if trailing and grain != "M": + if kind == "origin": + end = (dates.min() - pd.DateOffset(days=1)).strftime("%b").upper() + end = ( + "DEC" + if end in ["MAR", "JUN", "SEP", "DEC"] and grain == "Q" + else end + ) + end = "DEC" if end in ["JUN", "DEC"] and grain == "2Q" else end + else: + # If inferred to beginning of calendar period, 1/1 from YYYY, 4/1 from YYYYQQ + if ( + dates.dt.strftime("%m%d") + .isin(["0101", "0401", "0701", "1001"]) + .any() + ): + end = ( + (dates.min() - pd.DateOffset(days=1, years=-1)) + .strftime("%b") + .upper() + ) + else: + end = dates.max().strftime("%b").upper() + grain = grain + "-" + end + return grain + + @staticmethod + def _cartesian_product(*arrays): + """A fast implementation of cartesian product, used for filling in gaps + in triangles (if any)""" + arr = np.empty( + [len(a) for a in arrays] + [len(arrays)], dtype=np.result_type(*arrays) + ) + for i, a in enumerate(np.ix_(*arrays)): + arr[..., i] = a + arr = arr.reshape(-1, len(arrays)) + return arr + + def get_array_module(self, arr=None): + backend = ( + self.array_backend + if arr is None + else arr.__class__.__module__.split(".")[0] + ) + modules = {"cupy": cp, "sparse": sp, "numpy": np, "dask": dp} + if modules.get(backend, None): + return modules.get(backend, None) + else: + raise ValueError("Array backend is invalid or not properly set.") + + def _auto_sparse(self): + """Auto sparsifies at 30Mb or more and 20% density or less""" + if not options.AUTO_SPARSE: + return self + n = np.prod(list(self.shape) + [8 / 1e6]) + if ( + self.array_backend == "numpy" + and n > 30 + and 1 - np.isnan(self.values).sum() / n * (8 / 1e6) < 0.2 + ): + self.set_backend("sparse", inplace=True) + if self.array_backend == "sparse" and not ( + self.values.density < 0.2 and n > 30 + ): + self.set_backend("numpy", inplace=True) + return self + + @property + def valuation(self): + ddims = self.ddims + if self.is_val_tri: + out = pd.DataFrame(np.repeat(self.ddims.values[None], len(self.odims), 0)) + return pd.DatetimeIndex(out.unstack().values) + ddim_arr = ddims - ddims[0] + origin = np.minimum(self.odims, np.datetime64(self.valuation_date)) + val_array = origin.astype("datetime64[M]") + np.timedelta64(ddims[0], "M") + val_array = val_array.astype("datetime64[ns]") - np.timedelta64(1, "ns") + val_array = val_array[:, None] + s = slice(None, -1) if ddims[-1] == 9999 else slice(None, None) + val_array = ( + val_array.astype("datetime64[M]") + ddim_arr[s][None, :] + 1 + ).astype("datetime64[ns]") - np.timedelta64(1, "ns") + if ddims[-1] == 9999: + ult = np.repeat(np.datetime64(options.ULT_VAL), val_array.shape[0])[:, None] + val_array = np.concatenate( + ( + val_array, + ult, + ), + axis=1, + ) + return pd.DatetimeIndex(val_array.reshape(1, -1, order="F")[0]) + + def _drop_subtriangles(self): + """Removes subtriangles from a Triangle instance""" + sub_tris = [k for k, v in vars(self).items() if isinstance(v, TriangleBase)] + if "ldf_" in sub_tris: + del self.ldf_ + if "sigma_" in sub_tris: + del self.sigma_ + if "std_err_" in sub_tris: + del self.std_err_ + + @property + def subtriangles(self): + """Lists subtriangles from a Triangle instance""" + return [k for k, v in vars(self).items() if isinstance(v, TriangleBase)] + + def __array__(self): + return 
self.values + + def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): + obj = self.copy() + if method == "__call__": + inputs = [i.values if hasattr(i, "values") else i for i in inputs] + obj.values = ufunc(*inputs, **kwargs) + return obj + else: + raise NotImplementedError() + + def __array_function__(self, func, types, args, kwargs): + from chainladder.utils.utility_functions import concat + + methods_as_funcs = list( + set(dir(np)).intersection(set(dir(self))) - {"__dir__", "__doc__"} + ) + methods_as_funcs = {getattr(np, i): getattr(self, i) for i in methods_as_funcs} + HANDLED_FUNCTIONS = {np.concatenate: concat, np.round: self.__round__} + HANDLED_FUNCTIONS = {**HANDLED_FUNCTIONS, **methods_as_funcs} + if func not in HANDLED_FUNCTIONS: + return NotImplemented + if not all(issubclass(t, self.__class__) for t in types): + return NotImplemented + if func in methods_as_funcs: + args = args[1:] + return HANDLED_FUNCTIONS[func](*args, **kwargs) + + def compute(self, *args, **kwargs): + if hasattr(self.values, "chunks"): + obj = self.copy() + obj.values = obj.values.compute(*args, **kwargs) + m = obj.get_array_module(obj.values) + if m == sp: + obj.array_backend = "sparse" + if m == cp: + obj.array_backend = "cupy" + if m == np: + obj.array_backend = "numpy" + return obj + return self + + def _get_axis_value(self, axis): + axis = self._get_axis(axis) + return {0: self.index, 1: self.columns, 2: self.origin, 3: self.development}[ + axis + ] + + +def is_chainladder(estimator): + """Return True if the given estimator is a chainladder based method. + Parameters + ---------- + estimator : object + Estimator object to test. + Returns + ------- + out : bool + True if estimator is a chainladder based method and False otherwise. + """ + return getattr(estimator, "_estimator_type", None) == "chainladder" diff --git a/chainladder/legacy/common.py b/chainladder/legacy/common.py new file mode 100644 index 00000000..f32b3637 --- /dev/null +++ b/chainladder/legacy/common.py @@ -0,0 +1,215 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+import pandas as pd +from chainladder.utils.cupy import cp +from chainladder.utils.sparse import sp +from chainladder.utils.dask import dp +import numpy as np +from chainladder.utils.utility_functions import concat +from chainladder import options + + + +def _get_full_expectation(cdf_, ultimate_, is_cumulative=True): + """ Private method that builds full expectation""" + full = ultimate_ / cdf_ + + if is_cumulative: + return concat((full, ultimate_.copy().rename("development", [9999])), axis=3) + + else: + tail_ = full.iloc[:, :, :, -1] - ultimate_ + + return concat( + (full.cum_to_incr(), tail_.copy().rename("development", [9999])), axis=3 + ) + + +def _get_full_triangle(X, ultimate, is_cumulative=True): + """ Private method that builds full triangle""" + # Getting the LDFs and expand for all origins + from chainladder.utils.utility_functions import num_to_nan + emergence = X.ldf_.copy() * (ultimate / ultimate) + + # Setting LDFs for all of the known diagonals as 1 + emergence = ( + emergence[emergence.valuation < X.valuation_date] * 0 + + 1 + + emergence[emergence.valuation >= X.valuation_date] + ) + + emergence.valuation_date = pd.to_datetime(options.ULT_VAL) + emergence.values = emergence.values.cumprod(axis=3) - 1 + + # Shifting the CDFs by development age, and renaming the last column as 9999 + emergence.ddims = emergence.ddims + \ + {"Y": 12, "Q": 3, "S": 6, "M": 1}[emergence.development_grain] + emergence.ddims[-1] = 9999 + emergence.values = emergence.values / num_to_nan(emergence.values[..., -1:]) + ld = X.incr_to_cum().latest_diagonal + cum_run_off = ld + emergence * (ultimate - ld) + cum_run_off = cum_run_off[cum_run_off.valuation > X.valuation_date] + cum_run_off.is_cumulative = True + + if is_cumulative: + return X + cum_run_off + else: + return (X.incr_to_cum() + cum_run_off).cum_to_incr() + + +class Common: + """ Class that contains common properties of a "fitted" Triangle. 
""" + + @property + def has_ldf(self): + if hasattr(self, "ldf_"): + return True + else: + return False + + @property + def has_zeta(self): + if hasattr(self, "zeta_"): + return True + else: + return False + + @property + def cdf_(self): + if not self.has_ldf: + x = self.__class__.__name__ + raise AttributeError("'" + x + "' object has no attribute 'cdf_'") + return self.ldf_.incr_to_cum() + + @property + def cum_zeta_(self): + if not self.has_zeta: + x = self.__class__.__name__ + raise AttributeError("'" + x + "' object has no attribute 'cum_zeta_'") + return self.zeta_.incr_to_cum() + + @property + def ibnr_(self): + if not hasattr(self, "ultimate_"): + x = self.__class__.__name__ + raise AttributeError("'" + x + "' object has no attribute 'ibnr_'") + if hasattr(self, "X_"): + ld = self.latest_diagonal + else: + ld = self.latest_diagonal if self.is_cumulative else self.sum( + axis=3) + ibnr = self.ultimate_ - ld + ibnr.vdims = self.ultimate_.vdims + return ibnr + + @property + def full_expectation_(self): + if not hasattr(self, "ultimate_"): + raise AttributeError( + "'" + + self.__class__.__name__ + + "' object has no attribute 'full_expectation_'" + ) + + return _get_full_expectation(self.cdf_, self.ultimate_, self.X_.is_cumulative) + + @property + def full_triangle_(self): + if not hasattr(self, "ultimate_"): + raise AttributeError( + "'" + + self.__class__.__name__ + + "' object has no attribute 'full_triangle_'" + ) + + if hasattr(self, "X_"): + X = self.X_ + else: + X = self + return _get_full_triangle(X, self.ultimate_, X.is_cumulative) + + + def pipe(self, func, *args, **kwargs): + return func(self, *args, **kwargs) + + def set_backend(self, backend, inplace=False, deep=False, **kwargs): + """ Converts triangle array_backend. + + Parameters + ---------- + backend : str + Currently supported options are 'numpy', 'sparse', and 'cupy' + inplace : bool + Whether to mutate the existing Triangle instance or return a new + one. + + Returns + ------- + Triangle with updated array_backend + """ + if hasattr(self, "array_backend"): + old_backend = self.array_backend + else: + if hasattr(self, "ldf_"): + old_backend = self.ldf_.array_backend + else: + raise ValueError("Unable to determine array backend.") + if inplace: + # Coming from dask - compute and then recall this method + # going to dask - + if old_backend == "dask" and backend != "dask": + self = self.compute() + old_backend = self.array_backend + if backend in ["numpy", "sparse", "cupy", "dask"]: + lookup = { + "numpy": { + "sparse": lambda x: x.todense(), + "cupy": lambda x: cp.asnumpy(x), + }, + "cupy": { + "numpy": lambda x: cp.array(x), + "sparse": lambda x: cp.array(x.todense()), + }, + "sparse": { + "numpy": lambda x: sp.array(x), + "cupy": lambda x: sp.array(cp.asnumpy(x)), + }, + "dask": { + # should this be chunked? 
+                    "numpy": lambda x: dp.from_array(x, **kwargs),
+                    "cupy": lambda x: dp.from_array(x, **kwargs),
+                    "sparse": lambda x: dp.from_array(x, **kwargs),
+                },
+            }
+                if hasattr(self, "values"):
+                    self.values = lookup[backend].get(old_backend, lambda x: x)(
+                        self.values
+                    )
+                if deep:
+                    for k, v in vars(self).items():
+                        if isinstance(v, Common):
+                            v.set_backend(backend, inplace=True, deep=True)
+                if hasattr(self, "array_backend"):
+                    self.array_backend = backend
+            else:
+                raise AttributeError(backend, "backend is not supported.")
+            return self
+        else:
+            obj = self.copy()
+            return obj.set_backend(backend=backend, inplace=True, deep=deep, **kwargs)
+
+    def _validate_assumption(self, triangle, value, axis):
+        if type(value) in (int, float, str):
+            arr = np.repeat(value, triangle.shape[axis])
+        if type(value) in (list, tuple, set, np.ndarray):
+            arr = np.array(value)
+        if type(value) is dict:
+            arr = np.array([value[a] for a in triangle._get_axis_value(axis)])
+        if callable(value):
+            arr = np.array([value(a) for a in triangle._get_axis_value(axis)])
+        if axis == 3:
+            arr = arr[None, None, None]
+        if axis == 2:
+            arr = arr[None, None, :, None]
+        return arr
\ No newline at end of file
diff --git a/chainladder/legacy/correlation.py b/chainladder/legacy/correlation.py
new file mode 100644
index 00000000..01626106
--- /dev/null
+++ b/chainladder/legacy/correlation.py
@@ -0,0 +1,314 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+from __future__ import annotations
+
+import numpy as np
+import pandas as pd
+
+from scipy.special import comb
+
+from scipy.stats import (
+    binom,
+    norm,
+    rankdata
+)
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from chainladder.core.triangle import Triangle
+
+
+class DevelopmentCorrelation:
+    """
+    Mack (1997) test for correlations between subsequent development
+    factors. Results should fall within the confidence interval range;
+    otherwise, subsequent development factors are too correlated.
+
+    Parameters
+    ----------
+    triangle: Triangle
+        Triangle on which to estimate correlation between subsequent development
+        factors.
+    p_critical: float (default=0.5)
+        Value between 0 and 1 representing the confidence level for the test. A
+        value of 0.5 implies a 50% confidence. The default value is based on the example
+        provided in the Mack 97 paper, the selection of which is justified on the basis of the
+        test being only an approximate measure of correlations and the desire to detect
+        correlations already in a substantial part of the triangle.
+
+    Attributes
+    ----------
+    t_critical: DataFrame
+        Boolean value for whether correlation is too high based on ``p_critical``
+        confidence level.
+    t_expectation: DataFrame
+        Values representing the Spearman rank correlation
+    t_variance: float
+        Variance measure of Spearman rank correlation
+    confidence_interval: tuple
+        Range within which ``t_expectation`` must fall for independence assumption
+        to be significant.
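+
+    Notes
+    -----
+    A hypothetical usage sketch; ``tri`` is assumed to be a development
+    Triangle created elsewhere::
+
+        corr = DevelopmentCorrelation(tri, p_critical=0.5)
+        corr.t_expectation   # weighted average T of the T_k statistics
+        corr.reject          # True if independence is rejected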
+    """
+
+    def __init__(
+            self,
+            triangle,
+            p_critical: float = 0.5
+    ):
+        self.p_critical = p_critical
+
+        # Check that critical value is a probability
+        validate_critical(p_critical=p_critical)
+
+        if triangle.array_backend != "numpy":
+            triangle = triangle.set_backend("numpy")
+        xp = triangle.get_array_module()
+
+        m1 = triangle.link_ratio
+
+        # Rank link ratios by development period, assigning a score of 1 for the lowest
+        m1_val = xp.apply_along_axis(
+            func1d=rankdata,
+            axis=2,
+            arr=m1.values
+        ) * (m1.values * 0 + 1)
+
+        # Remove the last element from each column, and then rank again
+        m2 = triangle[triangle.valuation < triangle.valuation_date].link_ratio
+        m2.values = xp.apply_along_axis(
+            func1d=rankdata,
+            axis=2,
+            arr=m2.values
+        ) * (m2.values * 0 + 1)
+
+        m1 = m2.copy()
+
+        # remove the first column from m1 since it is not used in the comparison to m2
+        m1.values = m1_val[..., : m2.shape[2], 1:]
+
+        # Apply Spearman Rank Correlation formula
+        # numerator is the one in formula G4 of the Mack 97 paper
+        numerator = ((m1 - m2) ** 2).sum("origin")
+
+        # remove last column because it was not part of the comparison with m2
+        numerator.values = numerator.values[..., :-1]
+        numerator.ddims = numerator.ddims[:-1]
+
+        # I is the number of development periods in the triangle
+        I = len(triangle.development)
+
+        # k values are the column indexes for which we are calculating T_k
+        k = xp.array(range(2, 2 + numerator.shape[3]))
+
+        # denominator is the one in formula G4 of the Mack 97 paper
+        denominator = ((I - k) ** 3 - I + k)[None, None, None]
+
+        # complete formula G4, results in array of each T_k value
+        self.t = 1 - 6 * xp.nan_to_num(numerator.values) / denominator
+
+        # per Mack, weight is one less than the number of pairs for each T_k
+        weight = (I - k - 1)[None, None, None]
+
+        # Calculate big T, the weighted average of the T_k values
+        t_expectation = (
+            xp.sum(xp.nan_to_num(weight * self.t), axis=3) / xp.sum(weight, axis=3)
+        )[..., None]
+
+        idx = triangle.index.set_index(triangle.key_labels).index
+
+        # variance is result of formula G6
+        self.t_variance = 2 / ((I - 2) * (I - 3))
+
+        # array of t values
+        self.t = pd.DataFrame(
+            self.t[0, 0, ...],
+            columns=k,
+            index=["T_k"]
+        )
+
+        # array of weights
+        self.weights = pd.DataFrame(
+            weight[0, 0, ...],
+            columns=k,
+            index=["I-k-1"]
+        )
+
+        # final big T
+        self.t_expectation = pd.DataFrame(
+            t_expectation[..., 0, 0],
+            columns=triangle.vdims,
+            index=idx
+        )
+
+        # table of Spearman's rank coefficients T_k, can be used to verify consistency with the paper
+        self.corr = pd.concat([
+            self.t,
+            self.weights
+        ])
+
+        self.corr.columns.names = ['k']
+
+        # construct confidence interval based on selection of p_critical
+        self.confidence_interval = (
+            norm.ppf(0.5 - (1 - p_critical) / 2) * xp.sqrt(self.t_variance),
+            norm.ppf(0.5 + (1 - p_critical) / 2) * xp.sqrt(self.t_variance),
+        )
+
+        # if T lies outside this range, we reject the null hypothesis
+        self.t_critical = (self.t_expectation < self.confidence_interval[0]) | (
+            self.t_expectation > self.confidence_interval[1]
+        )
+
+        # hypothesis test result, False means fail to reject the null hypothesis
+        self.reject = self.t_critical.values[0][0]
+
+
+class ValuationCorrelation:
+    """
+    Mack (1997) test for calendar year effect. A calendar period has impact
+    across developments if the probability of the number of small (or large)
+    development factors, Z, in that period occurring randomly is less than
+    ``p_critical``.
+
+    Parameters
+    ----------
+    triangle: Triangle
+        Triangle on
which to test whether the calendar effects violate independence + requirements of the chainladder method. + p_critical: float (default=0.10) + Value between 0 and 1 representing the confidence level for the test. 0.1 + implies 90% confidence. + total: boolean + Whether to calculate valuation correlation in total across all + years (True) consistent with Mack 1993 or for each year separately + (False) consistent with Mack 1997. + + Attributes + ---------- + z : Triangle or DataFrame + Z values for each Valuation Period + z_critical : Triangle or DataFrame + Boolean value for whether correlation is too high based on ``p_critical`` + confidence level. + z_expectation : Triangle or DataFrame + The expected value of Z. + z_variance : Triangle or DataFrame + The variance value of Z. + """ + + def __init__( + self, + triangle: Triangle, + p_critical: float = 0.1, + total: bool = True + ): + + def pZlower( + z: int, + n: int, + p: float = 0.5 + ) -> float: + return min(1, 2 * binom.cdf(z, n, p)) + + self.p_critical = p_critical + + # Check that critical value is a probability + validate_critical(p_critical=p_critical) + + self.total = total + triangle = triangle.set_backend("numpy") + xp = triangle.get_array_module() + lr = triangle.link_ratio + + # Rank link ratios for each column + m1 = xp.apply_along_axis( + func1d=rankdata, + axis=2, + arr=lr.values) * (lr.values * 0 + 1) + + med = xp.nanmedian( + a=m1, + axis=2, + keepdims=True + ) + + m1large = (xp.nan_to_num(m1) > med) + (lr.values * 0) + m1small = (xp.nan_to_num(m1) < med) + (lr.values * 0) + m2large = triangle.link_ratio + m2large.values = m1large + m2small = triangle.link_ratio + m2small.values = m1small + S = xp.nan_to_num(m2small.dev_to_val().sum(axis=2).set_backend('numpy').values) + L = xp.nan_to_num(m2large.dev_to_val().sum(axis=2).set_backend('numpy').values) + z = xp.minimum(L, S) + n = L + S + m = xp.floor((n - 1) / 2) + c = comb(n - 1, m) + EZ = (n / 2) - c * n / (2 ** n) + VarZ = n * (n - 1) / 4 - c * n * (n - 1) / (2 ** n) + EZ - EZ ** 2 + if not self.total: + T = [] + for i in range(0, xp.max(m1large.shape[2:]) + 1): + T.append( + [ + pZlower(i, j, 0.5) + for j in range(0, xp.max(m1large.shape[2:]) + 1) + ] + ) + T = np.array(T) + z_idx, n_idx = z.astype(int), n.astype(int) + self.probs = T[z_idx, n_idx] + z_critical = triangle[triangle.valuation > triangle.valuation.min()] + # z_critical = z_critical[z_critical.development > z_critical.development.min()].dev_to_val().sum( + # "origin") * 0 + z_critical = z_critical.dev_to_val().dropna().sum("origin") * 0 + z_critical.values = np.array(self.probs) < p_critical + z_critical.odims = triangle.odims[0:1] + self.z_critical = z_critical + self.z = self.z_critical.copy() + self.z.values = z + self.z_expectation = self.z_critical.copy() + self.z_expectation.values = EZ + self.z_variance = self.z_critical.copy() + self.z_variance.values = VarZ + else: + ci2 = norm.ppf(0.5 - (1 - p_critical) / 2) * xp.sqrt(xp.sum(VarZ, axis=-1)) + self.range = (xp.sum(VarZ, axis=-1) + ci2, xp.sum(VarZ, axis=-1) - ci2) + idx = triangle.index.set_index(triangle.key_labels).index + self.z_critical = pd.DataFrame( + ( + (self.range[0] > VarZ.sum(axis=-1)) + | (VarZ.sum(axis=-1) > self.range[1]) + )[..., 0], + columns=triangle.vdims, + index=idx, + ) + self.z = pd.DataFrame( + z.sum(axis=-1)[..., 0], columns=triangle.vdims, index=idx + ) + self.z_expectation = pd.DataFrame( + EZ.sum(axis=-1)[..., 0], columns=triangle.vdims, index=idx + ) + self.z_variance = pd.DataFrame( + VarZ.sum(axis=-1)[..., 0], 
columns=triangle.vdims, index=idx + ) + + +def validate_critical( + p_critical: float +) -> None: + """ + Checks whether value passed to the p_critical parameter in ValuationCorrelation or DevelopmentCorrelation + classes is a percentage, that is, between 0 and 1. + + Parameters + ---------- + p_critical: float + Critical value used to test null hypothesis in Mack correlation tests. + """ + if 0 <= p_critical <= 1: + pass + else: + raise ValueError('p_critical must be between 0 and 1.') diff --git a/chainladder/core/display.py b/chainladder/legacy/display.py similarity index 100% rename from chainladder/core/display.py rename to chainladder/legacy/display.py diff --git a/chainladder/core/dunders.py b/chainladder/legacy/dunders.py similarity index 100% rename from chainladder/core/dunders.py rename to chainladder/legacy/dunders.py diff --git a/chainladder/legacy/io.py b/chainladder/legacy/io.py new file mode 100644 index 00000000..127c1f77 --- /dev/null +++ b/chainladder/legacy/io.py @@ -0,0 +1,82 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +import pandas as pd +from sklearn.base import BaseEstimator +import json +import joblib +import dill + + +class TriangleIO: + def to_pickle(self, path, protocol=None): + """ Serializes triangle object to pickle. + + Parameters + ---------- + path : str + File path and name of pickle object. + protocol : + The pickle protocol to use. + + """ + with open(path, "wb") as pkl: + dill.dump(self, pkl) + + def to_json(self): + """ Serializes triangle object to json format + + Returns + ------- + string representation of object in json format + """ + metadata = { + "is_val_tri": self.is_val_tri, + "is_cumulative": self.is_cumulative, + "is_pattern": self.is_pattern, + "columns": list(self.columns), + } + out = self.cum_to_incr().dev_to_val().to_frame( + keepdims=True, origin_as_datetime=True).fillna(0) + x = out.reset_index().to_json(orient="split", date_unit="ns") + json_dict = {"metadata": json.dumps(metadata), "data": x} + sub_tris = [k for k, v in vars(self).items() if isinstance(v, TriangleIO)] + json_dict["sub_tris"] = { + sub_tri: getattr(self, sub_tri).to_json() for sub_tri in sub_tris + } + dfs = [k for k, v in vars(self).items() if isinstance(v, pd.DataFrame)] + json_dict["dfs"] = {df: getattr(self, df).to_json() for df in dfs} + dfs = [k for k, v in vars(self).items() if isinstance(v, pd.Series)] + json_dict["dfs"].update( + {df: getattr(self, df).to_frame().to_json() for df in dfs} + ) + return json.dumps(json_dict) + + +class EstimatorIO: + """ Class intended to allow persistence of estimator objects """ + + def to_pickle(self, path, protocol=None): + """ Serializes triangle object to pickle. + + Parameters + ---------- + path : str + File path and name of pickle object. + protocol : + The pickle protocol to use. 
+ """ + with open(path, "wb") as pkl: + dill.dump(self, pkl) + + def to_json(self): + """ Serializes triangle object to json format + + Returns + ------- + string representation of object in json format + """ + params = self.get_params(deep=False) + j = lambda v: v.to_json() if isinstance(v, BaseEstimator) else v + params = {k: j(v) for k, v in params.items()} + return json.dumps({"params": params, "__class__": self.__class__.__name__}) diff --git a/chainladder/core/pandas.py b/chainladder/legacy/pandas.py similarity index 100% rename from chainladder/core/pandas.py rename to chainladder/legacy/pandas.py diff --git a/chainladder/core/slice.py b/chainladder/legacy/slice.py similarity index 100% rename from chainladder/core/slice.py rename to chainladder/legacy/slice.py diff --git a/chainladder/core/legacy.py b/chainladder/legacy/triangle.py similarity index 99% rename from chainladder/core/legacy.py rename to chainladder/legacy/triangle.py index 12b0afca..f00b26d1 100644 --- a/chainladder/core/legacy.py +++ b/chainladder/legacy/triangle.py @@ -6,10 +6,10 @@ import numpy as np import copy import warnings -from chainladder.core.base import TriangleBase +from chainladder.legacy.base import TriangleBase from chainladder.utils.sparse import sp -from chainladder.core.slice import VirtualColumns -from chainladder.core.correlation import DevelopmentCorrelation, ValuationCorrelation +from chainladder.legacy.slice import VirtualColumns +from chainladder.legacy.correlation import DevelopmentCorrelation, ValuationCorrelation from chainladder.utils.utility_functions import concat, num_to_nan, num_to_value from chainladder import options diff --git a/chainladder/utils/utility_functions.py b/chainladder/utils/utility_functions.py index 698d2d69..8d67597f 100644 --- a/chainladder/utils/utility_functions.py +++ b/chainladder/utils/utility_functions.py @@ -296,6 +296,7 @@ def concat(objs, axis, ignore_index: bool = False, sort: bool = False): ------- Updated triangle """ + from chainladder.core.core import TriangleBase if type(objs) not in (list, tuple): raise TypeError("objects to be concatenated must be in a list or tuple") if type(objs) is tuple: @@ -321,6 +322,7 @@ def concat(objs, axis, ignore_index: bool = False, sort: bool = False): l0 = l0.join(lf, how='outer', on=objs[0].key_labels + ['__origin__', '__development__']) triangle.data = l0.collect() triangle.columns=[col for obj in objs for col in obj.columns] + triangle._properties = {} return triangle def num_to_value(arr, value):