Source code for feature_encoders.models.grouped

# -*- coding: utf-8 -*-
# Copyright (c) Hebes Intelligence Private Company

# This source code is licensed under the Apache License, Version 2.0 found in the
# LICENSE file in the root directory of this source tree.

import copy
from collections import OrderedDict, defaultdict
from functools import reduce
from typing import Dict, Union

import pandas as pd
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.exceptions import NotFittedError
from sklearn.utils.validation import check_is_fitted

from ..compose import ModelStructure
from ..compose._parse import parse_encoder_definition
from ..encode import CategoricalEncoder
from ..utils import as_list, check_X, check_y
from .linear import LinearPredictor


[docs]class GroupedPredictor(RegressorMixin, BaseEstimator): """Construct one predictor per data group. The predictor splits data by the different values of a single column and fits one estimator per group. Since each of the models in the ensemble predicts on a different subset of the input data (an observation cannot belong to more than one clusters), the final prediction is generated by vertically concatenating all the individual models’ predictions. Args: group_feature (str): The name of the column of the input dataframe to use as the grouping set. model_conf (Dict[str, Dict]): A dictionary that includes information about the base model's structure. feature_conf (Dict[str, Dict], optional): A dictionary that maps feature generator names to the classes for the generators' validation and creation. Defaults to None. estimator_params (dict or tuple of tuples, optional): The parameters to use when instantiating a new base estimator. If none are given, default parameters are used. Defaults to tuple(). fallback (bool, optional): Whether or not to fall back to a global model in case a group parameter is not found during `.predict()`. Otherwise, an exception will be raised. Defaults to False. """ def __init__( self, *, group_feature: str, model_conf: Dict[str, Dict], feature_conf: Dict[str, Dict] = None, estimator_params=tuple(), fallback=False, ): self.group_feature = group_feature self.model_conf = model_conf self.feature_conf = feature_conf self.estimator_params = estimator_params self.fallback = fallback self.components_ = ModelStructure.from_config( model_conf, feature_conf ).components self.estimators_ = OrderedDict({}) self.added_features_ = [] self.encoders_ = { "main_effects": defaultdict(dict), "interactions": defaultdict(dict), } @property def n_parameters(self): try: check_is_fitted(self, "fitted_") except NotFittedError as exc: raise ValueError( "The number of parameters is acceccible only after " "the model has been fitted" ) from exc else: n_parameters = 0 for name, est in self.estimators_.items(): if name != "_global_": n_parameters += est.n_parameters return n_parameters @property def dof(self): try: check_is_fitted(self, "fitted_") except NotFittedError as exc: raise ValueError( "The degrees of freedom are acceccible only after " "the model has been fitted" ) from exc else: dof = 0 for name, est in self.estimators_.items(): if name != "_global_": dof += est.dof return dof def _fit_single_group(self, group_name, model_structure, X, y): try: params = ( dict(self.estimator_params) if self.estimator_params is not None else {} ) estimator = LinearPredictor(model_structure=model_structure, **params) estimator = estimator.fit(X, y) except Exception as e: raise type(e)(f"Exception for group {group_name}: {e}") else: return estimator def _predict_single_group(self, group_name, X, include_components): """Predict a single group by getting its estimator""" try: estimator = self.estimators_[group_name] except KeyError: if self.fallback: estimator = self.estimators_["_global_"] else: raise ValueError(f"Found new group {group_name} during predict") finally: pred = estimator.predict(X, include_components=include_components) if not isinstance(pred, (pd.Series, pd.DataFrame)): pred = pd.DataFrame( data=pred, index=X.index, columns=[self.target_name_] ) return pred def _update_local_conf(self, conf, X, y=None, fitting=True): for name, props in conf["main_effects"].items(): if props["type"] == "categorical": if fitting: stratify_by = ( None if not props["stratify_by"] else [self.group_feature] + props["stratify_by"] ) enc = CategoricalEncoder( **dict( {key: val for key, val in props.items() if key != "type"}, encode_as="ordinal", stratify_by=stratify_by, ) ) encoded = enc.fit_transform(X, y).squeeze() self.encoders_["main_effects"][name] = enc else: enc = self.encoders_["main_effects"][name] encoded = enc.transform(X).squeeze() new_name = "__for__".join((props.get("feature"), name)) X[new_name] = encoded props.update( { "feature": new_name, "max_n_categories": None, "stratify_by": None, } ) for pair_name, pair_props in conf["interactions"].items(): for name in pair_name: props = pair_props[name] if props["type"] == "categorical": if fitting: stratify_by = ( None if not props["stratify_by"] else [self.group_feature] + props["stratify_by"] ) enc = CategoricalEncoder( **dict( { key: val for key, val in props.items() if key != "type" }, encode_as="ordinal", stratify_by=stratify_by, ) ) encoded = enc.fit_transform(X, y).squeeze() self.encoders_["interactions"][pair_name].update({name: enc}) else: enc = self.encoders_["interactions"][pair_name][name] encoded = enc.transform(X).squeeze() new_name = "__for__".join( (props.get("feature"), ":".join(pair_name)) ) X[new_name] = encoded props.update( { "feature": new_name, "max_n_categories": None, "stratify_by": None, } ) return (conf, X) if fitting else X def _create_new_features(self): added_features = self.components_.pop("add_features") self.components_["add_features"] = {} if added_features: for _, props in added_features.items(): fgen_type = props.pop("type") if isinstance(fgen_type, str): if self.feature_conf is None: raise ValueError( "A mapping between types and classes has not been provided." ) targets = self.feature_conf.get(fgen_type) if targets is None: raise ValueError( f"Type {fgen_type} not found in provided mapping" ) class_obj = parse_encoder_definition(targets["generate"]) self.added_features_.append(class_obj(**props)) else: self.added_features_.append(fgen_type)
[docs] def fit(self, X: pd.DataFrame, y: Union[pd.DataFrame, pd.Series]): """Fit the estimator with the available data. Args: X (pandas.DataFrame): Input data. y (pandas.Series or pandas.DataFrame): Target data. Raises: Exception: If the estimator is re-fitted. An estimator object can only be fitted once. ValueError: If the input data does not pass the checks of `utils.check_X`. ValueError: If the target data does not pass the checks of `utils.check_y`. Returns: GroupedPredictor: Fitted estimator. """ try: check_is_fitted(self, "fitted_") except NotFittedError: pass else: raise Exception( "Estimator object can only be fit once. Instantiate a new object." ) self._create_new_features() # Apply the feature generators if self.added_features_: X = reduce( lambda _df, trans: trans.fit_transform(_df), self.added_features_, X ) X = check_X(X, exists=self.group_feature) if self.fallback and ("_global_" in X[self.group_feature]): raise ValueError( "Name `_global_` is reserved and cannot be used as a group name" ) y = check_y(y, index=X.index) self.target_name_ = y.columns[0] local_model_conf = copy.deepcopy(self.components_) local_model_conf, X = self._update_local_conf( local_model_conf, X, y=y, fitting=True ) for group_name, group_data in X.groupby(self.group_feature): self.estimators_[group_name] = self._fit_single_group( group_name=group_name, model_structure=ModelStructure(local_model_conf, self.feature_conf), X=group_data.drop(self.group_feature, axis=1), y=y.loc[group_data.index], ) if self.fallback: self.estimators_["_global_"] = self._fit_single_group( group_name="_global_", model_structure=ModelStructure(self.components_, self.feature_conf), X=X.drop(self.group_feature, axis=1), y=y, ) self.groups_ = as_list(self.estimators_.keys()) self.fitted_ = True return self
[docs] def predict( self, X: pd.DataFrame, include_clusters=False, include_components=False ): """Predict given new input data. Args: X (pandas.DataFrame): Input data. include_clusters (bool, optional): Whether to include the added clusters in the returned prediction. Defaults to False. include_components (bool, optional): Whether to include the contribution of the individual components of the model structure in the returned prediction. Defaults to False. Raises: ValueError: If the input data does not pass the checks of `utils.check_X`. Returns: pandas.DataFrame: The predicted values. """ check_is_fitted(self, "fitted_") if self.added_features_: X = reduce( lambda _df, trans: trans.fit_transform(_df), self.added_features_, X ) X = check_X(X, exists=self.group_feature) local_model_conf = copy.deepcopy(self.components_) X = self._update_local_conf(local_model_conf, X, fitting=False) pred = None for group_name, group_data in X.groupby(self.group_feature): group_pred = self._predict_single_group( group_name=group_name, X=group_data.drop(self.group_feature, axis=1), include_components=include_components, ) if include_clusters: group_pred = pd.concat( (group_pred, group_data[[self.group_feature]]), axis=1, ignore_index=False, ) pred = pd.concat((pred, group_pred), axis=0, ignore_index=False) pred = pred.reindex(X.index).dropna() return pred