"""
DOENUT
Design of Experiments Numerical Utility Toolkit
"""
# first we import some useful libraries
import logging
import numpy as np
import pandas as pd
import copy
from sklearn.linear_model import LinearRegression
from typing import Tuple, List, TYPE_CHECKING, Any
from doenut.utils import initialise_log
from doenut.data import ModifiableDataSet
from doenut.models import AveragedModel
if TYPE_CHECKING:
import sklearn
logger = initialise_log(__name__, logging.DEBUG)
[docs]
def set_log_level(level: "str|int") -> None:
"""Sets the global log level for the module
Parameters
----------
level : "str|int"
logging module value representing the desired log level
"""
loggers = [logger for name, logger in logging.root.manager.loggerDict.items() if name.startswith('doenut')]
for l in loggers:
if isinstance(l, logging.PlaceHolder):
# can't set placeholders
continue
l.setLevel(level)
[docs]
def orthogonal_scaling(
inputs: pd.DataFrame, axis: int = 0
) -> Tuple[pd.DataFrame, float, float]:
"""Calculates the orthoganal scaling of an array along an axis
Parameters
----------
inputs : pd.DataFrame
the dataframe to scale
axis : int, default 0
the axis to scale around (defaults to 0)
Returns
-------
pd.DataFrame:
The scaled inputs
float:
the Mj scaling parameter
float:
the Rj scaling parameter
"""
# the scaling thingy that Modde uses
inputs_max = np.max(inputs, axis)
inputs_min = np.min(inputs, axis)
Mj = (inputs_min + inputs_max) / 2
Rj = (inputs_max - inputs_min) / 2
scaled_inputs = (inputs - Mj) / Rj
return scaled_inputs, Mj, Rj
[docs]
def scale_1D_data(scaler, data, do_fit=True):
"""ELLA TODO: What does this do what it does?
Parameters
----------
scaler :
the scaler to transform the data with
data :
the data to scale
do_fit :
whether to fit the data first (default true)
Returns
-------
pd.DataFrame:
The scaled data
sklearn.scalar?
The scaler object
"""
if do_fit:
scaler.fit(data.reshape(-1, 1))
data_scaled = scaler.transform(data.reshape(-1, 1))
data_scaled = data_scaled.reshape(-1)
return data_scaled, scaler
[docs]
def scale_by(new_data: pd.DataFrame, mj: float, rj: float) -> pd.DataFrame:
"""Scales a dataframe orthogonally using the supplied parameters according to
the equation::
result = (data - Mj) / Rj
Parameters
----------
new_data : pd.DataFrame
the data to scale
mj : float
the Mj parameter
rj : float
the Rj parameter
Returns
-------
pd.DataFrame
the scaled data
"""
# the scaling thingy that Modde uses
# TODO:: Any form of sanity checking whatsoever.
new_data = (new_data - mj) / rj
return new_data
[docs]
def find_replicates(inputs: pd.DataFrame) -> np.array:
"""Find experimental settings that are replicates
Parameters
----------
inputs : pd.DataFrame
The dataframe to parse
Returns
-------
np.array:
A series of indices of all the rows which are replicates
"""
# list comps ftw!
a = [x for x in inputs[inputs.duplicated()].index]
b = [x for x in inputs[inputs.duplicated(keep="last")].index]
replicate_row_list = np.unique(np.array(a + b))
return replicate_row_list
[docs]
def train_model(
inputs: pd.DataFrame,
responses: pd.DataFrame,
test_responses: pd.DataFrame,
do_scaling_here: bool = False,
fit_intercept: bool = False,
) -> Tuple["sklearn.linear_model", pd.DataFrame, float, List[Any]]:
"""A simple function to train a model
Parameters
----------
inputs :
full set of terms for the model (x_n)
responses :
expected responses for the inputs (ground truth, y)
test_responses :
expected responses for separate test data (if used)
do_scaling_here :
whether to scale the data (Default value = False)
fit_intercept :
whether to fit the intercept (Default value = False)
Returns
-------
sklearn.linear_model:
A model fitted to the data,
pd.DataFrame:
the inputs used
float:
the R2 of that model
List[Any]:
the predictions that model makes for the original inputs
"""
if do_scaling_here:
inputs, _, _ = orthogonal_scaling(inputs, axis=0)
if test_responses is None:
test_responses = responses
model = LinearRegression(fit_intercept=fit_intercept)
model.fit(inputs, responses)
predictions = model.predict(inputs)
R2 = model.score(inputs, test_responses)
logger.debug("R squared for this model is {:.3}".format(R2))
return model, inputs, R2, predictions
[docs]
def Calculate_R2(
ground_truth: pd.DataFrame,
predictions: pd.DataFrame,
key: str,
word: str = "test",
) -> float:
"""Calculates R2 from input data
You can use this to calculate q2 if you're
using the test ground truth as the mean
else use calculate Q2
I think this is what Modde uses for PLS fitting
Parameters
----------
ground_truth :
The actual response values
predictions :
What the model guessed as the response values
key :
the column name into ground_truth that we predicted
word :
What mode we were working on
ground_truth: pd.DataFrame :
predictions: pd.DataFrame :
key: str :
word: str :
(Default value = "test")
Returns
-------
type
the R2 of the model on this data, or the Q2 if in test mode.
"""
errors = ground_truth[[key]] - predictions[[key]]
test_mean = np.mean(ground_truth[[key]], axis=0)
logger.debug(f"Mean of {word} set: {test_mean[0]}")
errors["squared"] = errors[key] * errors[key]
sum_squares_residuals = sum(errors["squared"])
logger.debug(
"Sum of squares of the residuals (explained variance) is"
f"{sum_squares_residuals}"
)
sum_squares_total = sum((ground_truth[key] - test_mean[0]) ** 2)
logger.debug(
f"Sum of squares total (total variance) is {sum_squares_total}"
)
r2 = 1 - (sum_squares_residuals / sum_squares_total)
if word == "test":
logger.debug("{} is {:.3}".format("Q2", r2))
else:
logger.debug("{} is {:.3}".format("R2", r2))
return r2
[docs]
def Calculate_Q2(
ground_truth: pd.DataFrame,
predictions: pd.DataFrame,
train_responses: pd.DataFrame,
key: str,
word: str = "test",
) -> float:
"""A different way of calculating Q2
this uses the mean from the training data, not the
test ground truth
Parameters
----------
ground_truth :
The actual response values of the test set
predictions :
The predictions of the model for the test set
train_responses :
The response values of the training set
key :
Which column in the ground_truth we are predicting
word :
The mode to run in
ground_truth: pd.DataFrame :
predictions: pd.DataFrame :
train_responses: pd.DataFrame :
key: str :
word: str :
(Default value = "test")
Returns
-------
type
The calculated Coefficient (R2/Q1)
"""
errors = ground_truth[[key]] - predictions[[key]]
train_mean = np.mean(train_responses[[key]], axis=0)
test_mean = np.mean(ground_truth[[key]], axis=0)
logger.debug(f"Mean of {word} set: {test_mean.iloc[0]}")
logger.debug(f"Mean being used: {train_mean.iloc[0]}")
errors["squared"] = errors[key] * errors[key]
sum_squares_residuals = sum(errors["squared"])
logger.debug(
"Sum of squares of the residuals (explained variance) is"
f"{sum_squares_residuals}"
)
sum_squares_total = sum((ground_truth[key] - train_mean.iloc[0]) ** 2)
# stuff from Modde
# errors/1
logger.debug(
f"Sum of squares total (total variance) is {sum_squares_total}"
)
result = 1 - (sum_squares_residuals / sum_squares_total)
logger.info(f"{'Q2' if word=='test' else 'R2'} is {round(result,3)}")
return result
[docs]
def dunk(setting: "str|None" = None) -> None:
"""dunk your doenut
Parameters
----------
setting : str, default None
what you are dunking it into
"""
if setting == "coffee":
print("Splash!")
else:
print("Requires coffee")
return
[docs]
def average_replicates(
inputs: pd.DataFrame, responses: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""averages inputs that are the same
Parameters
----------
inputs :
The input data to average
responses :
The responses to averaged
inputs: pd.DataFrame :
responses: pd.DataFrame :
Returns
-------
type
A tuple of the averaged inputs and responses
"""
whole_inputs = inputs
averaged_responses = pd.DataFrame()
averaged_inputs = pd.DataFrame()
duplicates = [x for x in whole_inputs[whole_inputs.duplicated()].index]
duplicates_for_averaging = {}
non_duplicate_list = [x for x in whole_inputs.index if x not in duplicates]
for non_duplicate in non_duplicate_list:
this_duplicate_list = []
non_duplicate_row = whole_inputs.loc[[non_duplicate]]
for duplicate in duplicates:
duplicate_row = whole_inputs.loc[[duplicate]]
if non_duplicate_row.equals(duplicate_row):
this_duplicate_list.append(duplicate)
logger.debug(
f"found duplicate pairs: {non_duplicate}, {duplicate}"
)
if len(this_duplicate_list) > 0:
duplicates_for_averaging[non_duplicate] = this_duplicate_list
else:
averaged_inputs = pd.concat([averaged_inputs, non_duplicate_row])
averaged_responses = pd.concat(
[averaged_responses, responses.iloc[[non_duplicate]]]
)
for non_duplicate, duplicates in duplicates_for_averaging.items():
# print(f"nd: {non_duplicate}")
to_average = whole_inputs.loc[[non_duplicate]]
to_average_responses = responses.loc[[non_duplicate]]
for duplicate in duplicates:
to_average = pd.concat([to_average, whole_inputs.loc[[duplicate]]])
to_average_responses = pd.concat(
[to_average_responses, responses.loc[[duplicate]]]
)
meaned = to_average.mean(axis=0)
meaned_responses = to_average_responses.mean(axis=0)
try:
averaged_inputs = pd.concat(
[averaged_inputs, pd.DataFrame(meaned).transpose()],
ignore_index=True,
)
averaged_responses = pd.concat(
[
averaged_responses,
pd.DataFrame(meaned_responses).transpose(),
],
ignore_index=True,
)
except TypeError:
averaged_inputs = pd.DataFrame(meaned).transpose()
averaged_responses = pd.DataFrame(meaned_responses).transpose()
return averaged_inputs, averaged_responses
[docs]
def calc_ave_coeffs_and_errors(coeffs, labels, errors="std", normalise=False):
"""Coefficient plot
set error to 'std' for standard deviation
set error to 'p95' for 95th percentile (
approximated by 2*std)
Parameters
----------
coeffs :
The coefficents to calculate from
labels :
No longer used?
errors :
The type of error to calculate, C{std} or C{p95} (Default value = "std")
normalise :
Whether to normalise the data prior to calculation (Default value = False)
Returns
-------
type
A tuple of the averaged coefficients and their error bars
"""
ave_coeffs = np.mean(coeffs, axis=0)[0]
stds = np.std(coeffs, axis=0)[0]
if normalise:
ave_coeffs = ave_coeffs / stds
stds = np.ones_like(ave_coeffs)
if errors == "std":
error_bars = stds
elif errors == "p95":
# this is an approximation assuming a gaussian distribution in your coeffs
error_bars = 2 * stds
else:
raise ValueError(
f"Error: errors setting {errors} not known, chose std or p95"
)
return ave_coeffs, error_bars
[docs]
def autotune_model(
inputs,
responses,
source_list,
response_selector=[0],
use_scaled_inputs=True,
do_scaling_here=True,
drop_duplicates="average",
errors="p95",
normalise=True,
do_hierarchical=True,
remove_significant=False,
):
"""Attempts to automatically tune a parsimonious model
TODO:: update to new code and remove redundant parameters
Parameters
----------
inputs :
The input data to train on
responses :
The response values for the input data
source_list :
param response_selector: (Optional) Which columns in responses to use
use_scaled_inputs :
Optional) Whether to scale the inputs before calculations (Default value = True)
do_scaling_here :
Optional) Whether to scale each set of train/test data (Default value = True)
drop_duplicates :
Optional) Do we ingnore (C{'no'}), C{'average'}, C{'Drop'} duplicate input values (Default value = "average")
errors :
Optional) C{'p95'} for 95th percentile or C{'std'} for standard deviation for error calculation (Default value = "p95")
normalise :
Optional) Whether to normalise the coefficents for error calculation (Default value = True)
do_hierarchical :
Optional) Do we maintain a hierarchical model? (Default value = True)
remove_significant :
Optional) Model will continue removing terms until only one is left (Default value = False)
response_selector :
(Default value = [0])
Returns
-------
type
A tuple of the terms used in the final model and the final model.
"""
sat_inputs = inputs
logger.debug(f"Source list is {source_list}")
input_selector = [i for i in range(len(sat_inputs.columns))]
output_indices = input_selector
# global list of all column names.
input_terms = list(sat_inputs.columns)
output_terms = input_terms
logger.debug("numbers\tnames")
for i, v in enumerate(input_terms):
logger.debug(f"{i}\t{v}")
this_model = None
have_removed = True
R2_over_opt = []
Q2_over_opt = []
n_terms_over_opt = []
terms = []
while have_removed:
logger.info("Beginning loop")
selected_input_indices = output_indices
selected_input_terms = output_terms
if len(selected_input_indices) == 0:
break
data = ModifiableDataSet(sat_inputs, responses)
if selected_input_terms:
data.filter(selected_input_terms)
this_model = AveragedModel(
data, scale_run_data=True, drop_duplicates=drop_duplicates
)
selected_inputs = sat_inputs.iloc[:, selected_input_indices]
selected_input_terms = list(selected_inputs.columns)
R2_over_opt.append(this_model.r2)
Q2_over_opt.append(this_model.q2)
# cell 2
# print("Cell 2:")
logger.info(f"Selected terms {selected_input_terms}")
logger.info(f"Source List: {source_list}")
# build a dictionary mapping from input term to the
# set of derived term's indices.
# Note that we are only caring about indices still in
# selected_input_indices (I.e. ones we have not thrown out!)
dependency_dict = {}
for i in selected_input_indices:
dependency_dict[i] = set()
# ignore 1st order terms. They have no antecedents
if isinstance(source_list[i], str):
continue
# single term - a direct power of a 1st order term.
if isinstance(source_list[i], int):
if i in selected_input_indices:
dependency_dict[source_list[i]].add(i)
# some other 2nd+ term.
if isinstance(source_list[i], list):
for x in source_list[i]:
if i in selected_input_indices:
try:
dependency_dict[x].add(i)
except: # TODO:: fix blank except. I _think_ KeyError
if do_hierarchical:
print(
"Error: Heirarchical model missing lower level terms!!!!"
)
logger.info(f"Dependencies: {dependency_dict}")
# Handy shortcut - since the empty set is considered false,
# we can just test dependency_dict[some_term] to see if there
# are still dependents.
# cell 3
# enforce_hierarchical_model = do_hierarchical
# whether to enforce hierarchy over terms (i.e. a lower order term
# must be present for a higher order one)
ave_coeffs, error_bars = calc_ave_coeffs_and_errors(
coeffs=this_model.coeffs,
labels=selected_input_terms,
errors="p95",
normalise=True,
)
n_terms_over_opt.append(len(ave_coeffs))
# create a copy of source_list that we will modify for the next iteration
output_indices = list(selected_input_indices)
# build a list of all terms whose error bar crosses 0.
# Values are tuples of the form (source_list index, |error_bar_size|)
insignificant_terms = []
for i in range(len(ave_coeffs)):
if abs(ave_coeffs[i]) < abs(error_bars[i]):
# print("{:.2}- {:.2}", ave_coeffs[i], error_bars[i])
logger.debug(f"{i}:\t{input_terms[i]}\t{source_list[i]}")
insignificant_terms.append(
(selected_input_indices[i], abs(ave_coeffs[i]))
) # diffs.append(abs(ave_coeffs[i]) - abs(error_bars[i]))
# for removing smallest significant terms
if insignificant_terms == [] and remove_significant:
for i in range(len(ave_coeffs)):
# print("{:.2}- {:.2}", ave_coeffs[i], error_bars[i])
logger.info(f"{i}:\t{input_terms[i]}\t{source_list[i]}")
insignificant_terms.append(
(selected_input_indices[i], abs(ave_coeffs[i]))
) # diffs.append(abs(ave_coeffs[i]) - abs(error_bars[i]))
# Now sort in order of ave_coeff
insignificant_terms = sorted(insignificant_terms, key=lambda x: x[1])
logger.info(f"Insignificant Terms: {insignificant_terms}")
# Now find the first term we can remove (if any)
have_removed = False
for idx, error_value in insignificant_terms:
# If it has dependents, and you're doing an heirarchical model skip it
if do_hierarchical:
if dependency_dict[idx]:
continue
logger.info(
f"removing term {input_terms[idx]} ({idx}) with error {error_value}"
)
output_indices.remove(idx)
output_terms.remove(output_terms[idx])
have_removed = True
break
logger.info(f"output_indices are {output_indices}")
terms.append(output_indices)
return (
output_indices,
this_model,
)
[docs]
def map_chemical_space(
unscaled_model,
x_key,
y_key,
c_key,
x_limits,
y_limits,
constant,
n_points,
hook_function,
):
"""Calculates a three way map of chemical space for plotting
#TODO:: Should move this to doenut.plot
Parameters
----------
unscaled_model :
The model to plot
x_key :
What key to use for the X axis
y_key :
What key to use for the Y axis
c_key :
What key to use for the C axis
x_limits :
Tuple of min/max range of X to plot
y_limits :
Tuple of min/max range of y to plot
constant :
The value for C
n_points :
How many marks along each axis to generate
hook_function :
A custom data processing function for post processing the data
Returns
-------
type
Three meshes of the model's predictions for the keys/ranges predicted.
"""
min_x = x_limits[0]
max_x = x_limits[1]
min_y = y_limits[0]
max_y = y_limits[1]
x = np.linspace(min_x, max_x, n_points)
y = np.linspace(min_y, max_y, n_points)
mesh_x, mesh_y = np.meshgrid(x, y)
z_df = pd.DataFrame()
z_df[x_key] = mesh_x.reshape(-1)
z_df[y_key] = mesh_y.reshape(-1)
z_df[c_key] = constant
# add any extra terms
z_df = hook_function(z_df)
mesh_z = unscaled_model.predict(z_df).reshape(n_points, n_points)
return mesh_x, mesh_y, mesh_z
[docs]
def add_higher_order_terms(
inputs: pd.DataFrame,
add_squares: bool = True,
add_interactions: bool = True,
column_list: list = [],
) -> Tuple[pd.DataFrame, List]:
"""Generate a saturated set of inputs by adding the power and interaction terms
Currently does not go above power of 2
Parameters
----------
inputs :
The data to generate from
add_squares :
Optional) Whether to add square terms, e.g. x_1*2
add_interactions :
Optional) Whether to add interaction terms, e.g. x_1*x_2
column_list :
Optional) Which columns to generate from
inputs: pd.DataFrame :
add_squares: bool :
(Default value = True)
add_interactions: bool :
(Default value = True)
column_list: list :
(Default value = [])
Returns
-------
type
Tuple of the saturated inputs, and a list of which inputs created
which input column.
"""
sat_inputs = copy.deepcopy(inputs)
if not column_list:
# do all columns
column_list = [x for x in inputs.columns]
logger.debug(f"Input array has columns {column_list}")
source_list = [x for x in column_list]
if add_squares:
logger.debug("Adding square terms:")
for i in range(len(column_list)):
source_list.append(i)
input_name = column_list[i]
new_name = input_name + "**2"
logger.debug(new_name)
sat_inputs[new_name] = inputs[input_name] * inputs[input_name]
if add_interactions:
logger.debug("Adding interaction terms:")
for i in range(len(column_list)):
for j in range(i + 1, len(column_list)):
source_list.append([i, j])
input_name_1 = column_list[i]
input_name_2 = column_list[j]
new_name = input_name_1 + "*" + input_name_2
logger.debug(new_name)
sat_inputs[new_name] = (
inputs[input_name_1] * inputs[input_name_2]
)
return sat_inputs, source_list
[docs]
def predict_from_model(model, inputs, input_selector):
"""Reorgs the inputs and does a prediction
Parameters
----------
model :
the model to use
inputs :
the saturated inputs
input_selector :
the subset of inputs the model is using
Returns
-------
type
Tuple of the predictions and the terms used to generate them
"""
list_of_terms = [inputs.columns[x] for x in input_selector]
model_inputs = inputs[list_of_terms]
predictions = model.predict(model_inputs)
return predictions, list_of_terms