在计算机编程中,类是一种有用的方式,用于组织数据(属性)和函数(方法)。例如,你可以定义一个类,该类定义了与机器学习模型相关的属性和方法。此类的实例可以具有训练数据文件名、模型类型等属性。与这些属性相关联的方法可以是拟合(fit)、预测(predict)和验证(validate)。
除了机器学习之外,类在数据科学的各个领域都有广泛的应用。你可以使用类来组织各种EDA任务、特征工程操作和机器学习模型训练。这非常理想,因为如果编写得当,类可以使现有的属性和方法易于理解、修改和调试。尤其是如果类方法被定义为完成单个明确定义的任务,这一点尤为明显。通常最佳的做法是定义只执行一项任务的函数,而类可以更直观地理解和维护这些方法。
尽管使用类可以使代码维护更简单直接,但随着复杂性的增加,理解起来也可能变得更加困难。如果你希望为基本的EDA、特征工程和模型训练组织属性和方法,一个单独的类可能足够了。但是,随着每种任务类型添加更多属性和方法,这些对象的初始化可能会变得相当模糊,特别是对于阅读你的代码的合作者。
基于此,最好针对每种任务类型(EDA、特征工程、机器学习)拥有助手类,而不是随着复杂性的增加使用单个类。在开发复杂的机器学习工作流时,应该有单独的EDA、特征工程和机器学习类,而不是单个类。
在这里,我们将考虑这些任务类型中的每一种,并了解如何编写一个单一类,使我们能够执行这些任务。
对于EDA,我们的类将允许我们读取数据、生成直方图和散点图。对于特征工程,我们的类将具有进行对数变换的方法。最后,对于机器学习,我们的类将具有拟合、预测和验证方法。
我们将看到随着我们添加额外的属性和方法,类的实例化和方法调用变得越来越难以阅读。我们将为每种任务类型添加额外的方法和属性,并说明随着复杂性的增加,可读性如何受到影响。然后,我们将看到如何将类的部分拆分为更易于理解和管理的助手类。
在这个工作中,我将在Deepnote中编写代码,Deepnote是一个协作的数据科学笔记本,可以轻松运行可重复的实验。
我们将使用医疗费用数据集进行工作。我们将使用患者的属性,如年龄、身体质量指数和子女数量来预测医疗费用。该数据在数据库内容许可证(DbCL:公共领域)下是公开免费使用、修改和共享的。
首先,让我们转到Deepnote并创建一个新项目(如果你还没有帐户,可以免费注册)。
让我们创建一个名为'helper_classes'的项目,并在该项目中创建一个名为'helper_classes_ds'的笔记本。还要将insurance.csv文件拖放到页面左侧面板上的“FILES”处:
接下来,我们将定义一个类,该类包含机器学习工作流程中一些基本步骤的高级概述。让我们首先导入我们将使用的所有软件包:
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
让我们定义一个名为'MLworkflow'的类,其中包含一个初始化方法,该方法初始化我们将用于存储模型预测和性能的字典。我们还将定义一个存储医疗费用数据的类属性:
class MLworkflow(object):
def __init__(self):
self._performance = {}
self._predictions = {}
self.data = pd.read_csv("insurance.csv")
接下来,我们将定义一个名为'eda'的方法,执行一些简单的可视化操作。如果你为变量histogram传递'True',它将生成指定的数值特征的直方图。如果你为变量scatter_plot传递'True',它将生成数值特征与目标之间的散点图:
class MLworkflow(object):
...
def eda(self, feature, target, histogram, scatter_plot):
self.corr = self.data[feature].corr(self.data[target])
if histogram:
self.data[feature].hist()
plt.show()
if scatter_plot:
plt.scatter(self.data[feature], self.data[target])
plt.show()
接下来,我们将定义另一个名为'data_prep'的方法,该方法定义了我们的输入和输出。我们还将定义一个名为transform的参数,可以用它来对数值列进行对数转换:
class MLworkflow(object):
...
def data_prep(self, features, target, transform):
for feature in features:
if transform:
self.data[feature] = np.log(self.data[feature])
self.X = self.data[features]
self.y = self.data[target]
我们还将定义一个fit方法。它将拆分用于训练和测试的数据,其中测试大小可以由'split'参数指定。我们还提供了将模型拟合为线性回归或随机森林模型的选项。这显然可以扩展到任意数量的模型类型:
class MLworkflow(object):
...
def fit(self, model_name, split):
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, random_state=42, test_size=split)
self.X_test = X_test
self.y_test = y_test
if model_name == 'lr':
self.model = LinearRegression()
self.model.fit(X_train, y_train)
elif model_name == 'rf':
self.model = RandomForestRegressor(random_state=42)
self.model.fit(X_train, y_train)
然后,我们将定义一个predict方法,在我们的测试集上生成预测结果。我们将把结果存储在我们的预测字典中,其中字典键将是模型类型:
class MLworkflow(object):
...
def predict(self, model_name):
self._predictions[model_name] = self.model.predict(self.X_test)
最后,对于每种模型类型计算性能。我们将使用平均绝对误差作为性能指标,并使用一个名为validate的方法将这些值存储在我们的性能字典中:
class MLworkflow(object):
...
def validate(self, model_name):
self._performance[model_name] = mean_absolute_error(self._predictions[model_name], self.y_test)
完整的类如下所示:
class MLworkflow(object):
def __init__(self):
self._performance = {}
self._predictions = {}
self.data = pd.read_csv("insurance.csv")
def eda(self, feature, target, histogram, scatter_plot):
self.corr = self.data[feature].corr(self.data[target])
if histogram:
self.data[feature].hist()
plt.show()
if scatter_plot:
plt.scatter(self.data[feature], self.data[target])
plt.show()
def data_prep(self, features, target, transform):
for feature in features:
if transform:
self.data[feature] = np.log(self.data[feature])
self.X = self.data[features]
self.y = self.data[target]
def fit(self, model_name, split):
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, random_state=42, test_size=split)
self.X_test = X_test
self.y_test = y_test
if model_name == 'lr':
self.model = LinearRegression()
self.model.fit(X_train, y_train)
elif model_name == 'rf':
self.model = RandomForestRegressor(random_state=42)
self.model.fit(X_train, y_train)
def predict(self, model_name):
self._predictions[model_name] = self.model.predict(self.X_test)
def validate(self, model_name):
self._performance[model_name] = mean_absolute_error(self._predictions[model_name], self.y_test)
我们可以定义该类的一个实例并生成一些可视化效果:
mlworkflow = MLworkflow()
mlworkflow.eda('age', 'charges', True, True)
print(mlworkflow.corr)
mlworkflow.eda('bmi', 'charges', True, True)
print(mlworkflow.corr)
然后,我们可以定义一个实例并构建线性回归和随机森林模型。我们首先定义一个类的实例,并调用数据准备方法,使用我们希望使用的输入和输出:
model = MLworkflow()
features = ['bmi', 'age']
model.data_prep(features, 'charges', True)
然后,我们可以通过调用fit方法来构建一个线性回归模型,使用model_name参数值'lr'表示线性回归,测试集大小为20%。然后在模型实例上调用predict和validate方法:
model.fit('lr', 0.2)
model.predict('lr')
model.validate('lr')
我们可以对随机森林模型做同样的操作:
model.fit('rf', 0.2)
model.predict('rf')
model.validate('rf')
结果是,我们的模型对象将具有一个名为_performance的属性。我们可以通过模型对象访问它,并打印出字典:
model = MLworkflow()
features = ['bmi', 'age']
model.data_prep(features, 'charges', True)
model.fit('lr', 0.2)
model.predict('lr')
model.validate('lr')
model.fit('rf', 0.2)
model.predict('rf')
model.validate('rf')
print(model._performance)
{'lr': 9232.307984997156, 'rf': 9161.66313279731}
我们可以看到,我们有一个带有键'lr'和'rf'的字典,其平均绝对误差值分别为9232和9161。
虽然用于定义该类别的代码足够简单,但随着复杂性的增加,阅读和解释它可能变得困难。
例如,除了能够监控模型类型之外,如果我们希望能够在数据中的不同类别上构建模型怎么办。例如,如果我们希望仅在女性患者上训练线性回归模型或仅在男性患者上训练随机森林模型。让我们看看如何编写这个修改过的类别。
与之前类似,我们定义了一个初始化方法,在该方法中初始化必要的字典。我们将添加一个名为models的新字典:
class MLworkflowExtended(object):
def __init__(self):
self._performance = {}
self._predictions = {}
self._models = {}
self.data = pd.read_csv("insurance.csv")
eda和数据准备方法基本保持不变:
class MLworkflowExtended(object):
...
def eda(self, feature, target, histogram, scatter_plot):
self.corr = self.data[feature].corr(self.data[target])
if histogram:
self.data[feature].hist()
plt.show()
if scatter_plot:
plt.scatter(self.data[feature], self.data[target])
plt.show()
def data_prep(self, features, target, transform):
self.target = target
for feature in features:
if transform:
self.data[feature] = np.log(self.data[feature])
fit方法有很多变化。现在它接受model_category和category_values两个变量,以及我们随机森林算法的默认值。它还检查初始化的字典中是否存在类别值。如果不存在,它们将被初始化为空字典。结果是一个字典的字典,最外层的键是类别值。它们映射到的值是包含算法类型和性能的字典。
结构如下所示:
_performance = {'category1':{'algorithm1':100, 'algorithm2':200}, 'category2':{'algorithm1':300, 'algorithm2':500}
我们还将根据指定的类别筛选数据。
对应此逻辑的代码如下所示:
def fit(self, model_name, model_category, category_value, split, n_estimators=10, max_depth=10):
self.split = split
self.model_category = model_category
self.category_value = category_value
if category_value not in self._predictions:
self._predictions[category_value]= {}
if category_value not in self._performance:
self._performance[category_value] = {}
if category_value not in self._models:
self._models[category_value] = {}
self.data_cat = self.data[self.data[model_category] == category_value]
其余逻辑与之前类似。完整函数如下所示:
def fit(self, model_name, model_category, category_value, split, n_estimators=10, max_depth=10):
self.split = split
self.model_category = model_category
self.category_value = category_value
if category_value not in self._predictions:
self._predictions[category_value]= {}
if category_value not in self._performance:
self._performance[category_value] = {}
if category_value not in self._models:
self._models[category_value] = {}
self.data_cat = self.data[self.data[model_category] == category_value]
self.X = self.data_cat[features]
self.y = self.data_cat[self.target]
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, random_state=42, test_size=split)
self.X_test = X_test
self.y_test = y_test
if model_name == 'lr':
self.model = LinearRegression()
self.model.fit(X_train, y_train)
elif model_name == 'rf':
self.model = RandomForestRegressor(n_estimators=n_estimators, max_depth = max_depth, random_state=42)
self.model.fit(X_train, y_train)
self._models[category_value] = self.model
请注意,此函数的复杂性显著增加。
预测和验证方法类似。不同之处在于我们现在按类别存储预测和性能:
def predict(self, model_name):
self._predictions[self.category_value][model_name] = self._models[self.category_value].predict(self.X_test)
def validate(self, model_name):
self._performance[self.category_value][model_name] = mean_absolute_error(self._predictions[self.category_value][model_name], self.y_test)
完整的类别如下所示:
class MLworkflowExtended(object):
def __init__(self):
self._performance = {}
self._predictions = {}
self._models = {}
self.data = pd.read_csv("insurance.csv")
def eda(self, feature, target, histogram, scatter_plot):
self.corr = self.data[feature].corr(self.data[target])
if histogram:
self.data[feature].hist()
plt.show()
if scatter_plot:
plt.scatter(self.data[feature], self.data[target])
plt.show()
def data_prep(self, features, target, transform):
self.target = target
for feature in features:
if transform:
self.data[feature] = np.log(self.data[feature])
def fit(self, model_name, model_category, category_value, split, n_estimators=10, max_depth=10):
self.split = split
self.model_category = model_category
self.category_value = category_value
if category_value not in self._predictions:
self._predictions[category_value]= {}
if category_value not in self._performance:
self._performance[category_value] = {}
if category_value not in self._models:
self._models[category_value] = {}
self.data_cat = self.data[self.data[model_category] == category_value]
self.X = self.data_cat[features]
self.y = self.data_cat[self.target]
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, random_state=42, test_size=split)
self.X_test = X_test
self.y_test = y_test
if model_name == 'lr':
self.model = LinearRegression()
self.model.fit(X_train, y_train)
elif model_name == 'rf':
self.model = RandomForestRegressor(n_estimators=n_estimators, max_depth = max_depth, random_state=42)
self.model.fit(X_train, y_train)
self._models[category_value] = self.model
def predict(self, model_name):
self._predictions[self.category_value][model_name] = self._models[self.category_value].predict(self.X_test)
def validate(self, model_name):
self._performance[self.category_value][model_name] = mean_absolute_error(self._predictions[self.category_value][model_name], self.y_test)
然后,我们可以运行根据模型类型和类别变化的实验。例如,让我们在单独的女性和男性数据集上构建一些线性回归和随机森林模型:
model = MLworkflowExtended()
features = ['bmi', 'age']
model.data_prep(features, 'charges', True)
model.fit('lr', 'sex', 'female', 0.2)
model.predict('lr')
model.validate('lr')
print(model._performance)
model.fit('rf', 'sex', 'female', 0.2)
model.predict('rf')
model.validate('rf')
print(model._performance)
model.fit('rf','sex', 'male', 0.2, 100, 100)
model.predict('rf')
model.validate('rf')
print(model._performance)
model.fit('lr','sex', 'male', 0.2)
model.predict('lr')
model.validate('lr')
print(model._performance)
{'female': {'lr': 8016.511847126879}}
{'female': {'lr': 8016.511847126879, 'rf': 8626.57969374399}}
{'female': {'lr': 8016.511847126879, 'rf': 8626.57969374399}, 'male': {'rf': 10547.991737227838}}
{'female': {'lr': 8016.511847126879, 'rf': 8626.57969374399}, 'male': {'rf': 10547.991737227838, 'lr': 9604.81470061645}}
同样,我们可以对地区类别进行相同的操作。让我们针对西南和西北运行实验:
model = MLworkflowExtended()
features = ['bmi', 'age']
model.data_prep(features, 'charges', True)
model.fit('lr', 'region', 'southwest', 0.2)
model.predict('lr')
model.validate('lr')
print(model._performance)
model.fit('rf', 'region', 'southwest', 0.2)
model.predict('rf')
model.validate('rf')
print(model._performance)
model.fit('rf','region', 'northwest', 0.2, 100, 100)
model.predict('rf')
model.validate('rf')
print(model._performance)
model.fit('lr','region', 'northwest', 0.2)
model.predict('lr')
model.validate('lr')
print(model._performance)
{'southwest': {'lr': 8899.213068898414}}
{'southwest': {'lr': 8899.213068898414, 'rf': 8600.643187882553}}
{'southwest': {'lr': 8899.213068898414, 'rf': 8600.643187882553}, 'northwest': {'rf': 7070.996465990001}}
{'southwest': {'lr': 8899.213068898414, 'rf': 8600.643187882553}, 'northwest': {'rf': 7070.996465990001, 'lr': 7481.114681045734}}
虽然这样做完全可以,但是运行特定实验的代码变得难以阅读。例如,在拟合我们的随机森林时,初次阅读我们的代码的人可能不清楚fit方法中的所有传递值的含义:
model.fit('rf','region', 'northwest', 0.2, 100, 100)
随着我们增加类的功能,这可能变得更加复杂。
为了避免不断增加的复杂性,通常可以使用辅助类,这些类是基于ML工作流的每个部分进行定义的。
我们可以从定义一个EDA辅助类开始:
class EDA(object):
def __init__(self):
self.data = pd.read_csv("insurance.csv")
def eda(self, feature, target, histogram, scatter_plot):
self.target = target
self.corr = self.data[feature].corr(self.data[target])
if histogram:
self.data[feature].hist()
plt.show()
if scatter_plot:
plt.scatter(self.data[feature], self.data[target])
plt.show()
然后,我们可以在特征工程类中使用eda类来访问我们的数据:
class FeatureEngineering(object):
def __init__(self):
eda = EDA()
self.data = eda.data
def engineer(self, features, target, transform, display):
self.target = target
for feature in features:
if transform and display:
print(f"{feature}/{target} correlation Before log-tranform:", self.data[feature].corr(self.data[self.target]))
self.data[feature] = np.log(self.data[feature])
print(f"{feature}/{target} correlation After log-tranform:", self.data[feature].corr(self.data[self.target]))
接下来,我们将定义数据准备类。在数据准备类的init方法中,我们将初始化用于存储模型、预测结果和性能的字典。我们还将使用特征工程类来对bmi和age应用对数变换。最后,我们将存储修改后的数据和目标变量在数据准备属性中:
class DataPrep(object):
def __init__(self):
self._performance = {}
self._predictions = {}
self._models = {}
feature_engineering = FeatureEngineering()
feature_engineering.engineer(['bmi', 'age'], 'charges', True, False)
self.data = feature_engineering.data
self.target = feature_engineering.target
def dataprep(self, model_name, model_category, category_value, split):
self.split = split
self.model_category = model_category
self.category_value = category_value
if category_value not in self._predictions:
self._predictions[category_value]= {}
if category_value not in self._performance:
self._performance[category_value] = {}
if category_value not in self._models:
self._models[category_value] = {}
接下来,在数据准备类中定义一个数据准备方法。我们将首先为训练/测试集划分、模型类别和类别值定义属性。然后,我们将检查预测、性能和模型字典中是否存在类别值。如果不存在,我们将为新的类别存储一个空字典:
class DataPrep(object):
...
def dataprep(self, model_name, model_category, category_value, split):
self.split = split
self.model_category = model_category
self.category_value = category_value
if category_value not in self._predictions:
self._predictions[category_value]= {}
if category_value not in self._performance:
self._performance[category_value] = {}
if category_value not in self._models:
self._models[category_value] = {}
然后,我们将根据类别进行过滤,定义输入和输出,将数据划分为训练集和测试集,并将结果存储在数据准备属性中:
class DataPrep(object):
...
def dataprep(self, model_name, model_category, category_value, split):
...
self.data_cat = self.data[self.data[model_category] == category_value]
self.X = self.data_cat[features]
self.y = self.data_cat[self.target]
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, random_state=42, test_size=split)
self.X_test = X_test
self.y_test = y_test
self.X_train = X_train
self.y_train = y_train
完整的数据准备类如下:
class DataPrep(object):
def __init__(self):
self._performance = {}
self._predictions = {}
self._models = {}
feature_engineering = FeatureEngineering()
feature_engineering.engineer(['bmi', 'age'], 'charges', True, False)
self.data = feature_engineering.data
self.target = feature_engineering.target
def dataprep(self, model_name, model_category, category_value, split):
self.split = split
self.model_category = model_category
self.category_value = category_value
if category_value not in self._predictions:
self._predictions[category_value]= {}
if category_value not in self._performance:
self._performance[category_value] = {}
if category_value not in self._models:
self._models[category_value] = {}
self.data_cat = self.data[self.data[model_category] == category_value]
self.X = self.data_cat[features]
self.y = self.data_cat[self.target]
X_train, X_test, y_train, y_test = train_test_split(self.X, self.y, random_state=42, test_size=split)
self.X_test = X_test
self.y_test = y_test
self.X_train = X_train
self.y_train = y_train
最后,我们定义一个模型训练类,它允许我们访问准备好的数据、训练模型、生成预测和计算性能:
class ModelTraining(object):
def __init__(self, dataprep):
self._models = dataprep._models
self._predictions = dataprep._predictions
self._performance = dataprep._performance
def get_data(self, training_data, category_value):
self.X_train, self.X_test, self.y_train, self.y_test = training_data
self.category_value = category_value
def fit(self, model_name, n_estimators=10, max_depth=10):
if model_name == 'lr':
self.model = LinearRegression()
self.model.fit(self.X_train, self.y_train)
elif model_name == 'rf':
self.model = RandomForestRegressor(n_estimators=n_estimators, max_depth = max_depth, random_state=42)
self.model.fit(self.X_train, self.y_train)
self._models[self.category_value] = self.model
def predict(self, model_name):
self._predictions[self.category_value][model_name] = self._models[self.category_value].predict(self.X_test)
def validate(self, model_name):
self._performance[self.category_value][model_name] = mean_absolute_error(self._predictions[self.category_value][model_name], self.y_test)
现在,我们可以使用类层次结构运行一系列实验。例如,我们可以构建一个仅基于女性患者数据训练的随机森林模型:
dataprep = DataPrep()
dataprep.dataprep('rf', 'sex', 'female', 0.2)
training_data = dataprep.X_train, dataprep.X_test, dataprep.y_train, dataprep.y_test
category_value = dataprep.category_value
modeltraining = ModelTraining(dataprep)
modeltraining.get_data(training_data, category_value)
modeltraining.fit('rf', 200, 200)
modeltraining.predict('rf')
modeltraining.validate('rf')
print(modeltraining._performance)
{'female': {'rf': 8466.305049427323}}
我们还可以构建一个仅基于女性患者数据训练的线性回归模型。该模型的性能将添加到现有的性能字典中:
dataprep.dataprep('lr', 'sex', 'female', 0.2)
training_data = dataprep.X_train, dataprep.X_test, dataprep.y_train, dataprep.y_test
category_value = dataprep.category_value
modeltraining = ModelTraining(dataprep)
modeltraining.get_data(training_data, category_value)
modeltraining.fit('lr')
modeltraining.predict('lr')
modeltraining.validate('lr')
print(modeltraining._performance)
{'female': {'rf': 8466.305049427323, 'lr': 8034.741428854192}}
我们可以对男性患者做同样的操作。这是线性回归的结果:
dataprep.dataprep('lr', 'sex', 'male', 0.2)
training_data = dataprep.X_train, dataprep.X_test, dataprep.y_train, dataprep.y_test
category_value = dataprep.category_value
modeltraining = ModelTraining(dataprep)
modeltraining.get_data(training_data, category_value)
modeltraining.fit('lr')
modeltraining.predict('lr')
modeltraining.validate('lr')
print(modeltraining._performance)
{'female': {'rf': 8466.305049427323, 'lr': 8034.741428854192}, 'male': {'lr': 9583.028554450382}}
和随机森林的结果:
dataprep.dataprep('rf', 'sex', 'male', 0.2)
training_data = dataprep.X_train, dataprep.X_test, dataprep.y_train, dataprep.y_test
category_value = dataprep.category_value
modeltraining = ModelTraining(dataprep)
modeltraining.get_data(training_data, category_value)
modeltraining.fit('rf', 200, 200)
modeltraining.predict('rf')
modeltraining.validate('rf')
print(modeltraining._performance)
{'female': {'rf': 8466.305049427323, 'lr': 8034.741428854192}, 'male': {'lr': 9583.028554450382, 'rf': 10609.717391992232}}
我们可以看到,我们有一个包含多个实验及其对应的模型类型、类别级别和模型性能值的字典。
本文中使用的代码可以在GitHub上找到:https://github.com/spierre91/deepnote/blob/main/helper_class_ml.ipynb
在本文中,我们讨论了如何使用面向对象编程来简化数据科学工作流程的部分。
首先,我们定义了一个单一的ML工作流类,它可以进行简单的EDA、数据准备、模型训练和验证。
然后,我们看到随着我们向类添加功能,对类实例的方法调用变得难以阅读。为了避免阅读和解释代码时出现问题,我们设计了一个由一系列辅助类组成的类层次结构。每个辅助类对应于ML工作流程中的一个步骤。这样可以轻松理解方法与高级任务的关系,从而提高代码的可读性和可维护性。
我鼓励你在自己的ML项目中尝试使用这种方法。
✄-----------------------------------------------看到这里,说明你喜欢这篇文章,请点击「在看」或顺手「转发」「点赞」。
欢迎微信搜索「panchuangxx」,添加小编磐小小仙微信,每日朋友圈更新一篇高质量推文(无广告),为您提供更多精彩内容。
▼ ▼ 扫描二维码添加小编 ▼ ▼