包含编程资料、学习路线图、爬虫代码、安装包等!【点击领取】
引言
在机器学习领域,随着数据集规模的不断扩大和模型复杂度的增加,训练时间变得越来越长。Python的多线程技术为我们提供了一种有效利用现代多核CPU资源的方法,可以显著加速数据预处理、特征工程和模型训练过程。本文将介绍10个高级Python多线程脚本,帮助你在机器学习项目中实现性能飞跃。
1. 多线程数据预处理流水线
import concurrent.futures
import pandas as pd
from sklearn.preprocessing import StandardScaler
def preprocess_chunk(data_chunk):
    """Clean and scale one chunk of a DataFrame.

    Drops rows containing NaN, then standardizes every float64 column
    with a chunk-local StandardScaler and returns the cleaned chunk.

    NOTE(review): each chunk is scaled with its own mean/std rather
    than global statistics — confirm this is intended.
    """
    cleaned = data_chunk.dropna()
    float_cols = cleaned.select_dtypes(include=['float64']).columns
    cleaned[float_cols] = StandardScaler().fit_transform(cleaned[float_cols])
    return cleaned
def parallel_preprocessing(data, chunk_size=10000, workers=4):
    """Split *data* into row chunks and preprocess them concurrently.

    Each chunk is handed to preprocess_chunk on a thread pool; the
    cleaned chunks are concatenated back into a single DataFrame.
    """
    slices = [data[start:start + chunk_size]
              for start in range(0, len(data), chunk_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        cleaned = list(pool.map(preprocess_chunk, slices))
    return pd.concat(cleaned)
# 使用示例
# data = pd.read_csv('large_dataset.csv')
# processed_data = parallel_preprocessing(data)
应用场景:大规模数据集的特征缩放、缺失值处理等预处理操作。
2. 并行特征工程生成
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd
def generate_feature(args):
    """Compute one derived feature from a (col1, col2, operation) triple.

    Supported operations: 'add', 'mul', 'sub', 'div'. Division is
    guarded so zero denominators yield 0 instead of inf/NaN.

    Raises:
        ValueError: on an unknown operation name. (The original fell
        through and returned None, silently producing a None column
        downstream when a config had a typo.)
    """
    col1, col2, operation = args
    if operation == 'add':
        return col1 + col2
    if operation == 'mul':
        return col1 * col2
    if operation == 'sub':
        return col1 - col2
    if operation == 'div':
        # guard against division by zero: emit 0 where col2 == 0
        return np.where(col2 != 0, col1 / col2, 0)
    raise ValueError(f"unknown feature operation: {operation!r}")
def parallel_feature_engineering(data, feature_configs, workers=4):
    """Build all configured derived features on a thread pool.

    Each config is a dict with 'name', 'col1', 'col2' and 'op'; the
    resulting columns are appended to *data* and the combined frame
    is returned.
    """
    task_args = [(data[cfg['col1']], data[cfg['col2']], cfg['op'])
                 for cfg in feature_configs]
    new_cols = pd.DataFrame()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for cfg, column in zip(feature_configs,
                               pool.map(generate_feature, task_args)):
            new_cols[cfg['name']] = column
    return pd.concat([data, new_cols], axis=1)
# 使用示例
# configs = [
# {'name': 'feat1', 'col1': 'age', 'col2': 'income', 'op': 'mul'},
# {'name': 'feat2', 'col1': 'height', 'col2': 'weight', 'op': 'div'}
# ]
# enhanced_data = parallel_feature_engineering(data, configs)
应用场景:需要生成大量交互特征或派生特征时。
3. 多线程超参数搜索
from sklearn.model_selection import ParameterGrid
from sklearn.ensemble import RandomForestClassifier
from concurrent.futures import ThreadPoolExecutor
from sklearn.metrics import accuracy_score
def train_model(params, X_train, y_train, X_val, y_val):
    """Fit a RandomForestClassifier with *params*.

    Returns a (validation accuracy, params) pair so the caller can
    rank parameter sets by score.
    """
    clf = RandomForestClassifier(**params)
    clf.fit(X_train, y_train)
    val_preds = clf.predict(X_val)
    return accuracy_score(y_val, val_preds), params
def parallel_param_search(X_train, y_train, X_val, y_val, param_grid, workers=4):
    """Grid-search *param_grid*, training one candidate per thread.

    Returns (best_params, best_score) by validation accuracy.

    Bug fixed: the original iterated concurrent.futures.as_completed(),
    but this snippet only imports ThreadPoolExecutor, so the qualified
    name raised NameError at runtime. Every result is needed and the
    max is order-independent, so iterating the futures list directly
    is equivalent and needs no extra import.
    """
    best_score = -1
    best_params = None
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(train_model, params, X_train, y_train, X_val, y_val)
            for params in ParameterGrid(param_grid)
        ]
        for future in futures:
            score, params = future.result()
            if score > best_score:
                best_score = score
                best_params = params
    return best_params, best_score
# 使用示例
# param_grid = {
# 'n_estimators': [50, 100, 200],
# 'max_depth': [None, 10, 20],
# 'min_samples_split': [2, 5, 10]
# }
# best_params, best_score = parallel_param_search(X_train, y_train, X_val, y_val, param_grid)
应用场景:加速随机森林、梯度提升树等模型的超参数调优过程。
4. 并行模型集成
from sklearn.base import clone
from concurrent.futures import ThreadPoolExecutor
import numpy as np
class ParallelEnsemble:
    """Bagging-style ensemble that fits and queries clones of a base
    estimator on a thread pool.

    Bug fixed: fit() and predict_proba() iterated
    concurrent.futures.as_completed(), but this snippet only imports
    ThreadPoolExecutor, so the qualified name raised NameError. The
    futures lists are iterated directly instead — all results are
    collected and the probability average is order-independent.
    """

    def __init__(self, base_estimator, n_estimators=10, workers=4):
        self.base_estimator = base_estimator  # template estimator, cloned per member
        self.n_estimators = n_estimators      # ensemble size
        self.workers = workers                # thread-pool width
        self.estimators_ = []                 # fitted members, populated by fit()

    def fit(self, X, y):
        """Fit n_estimators clones of the base estimator in parallel."""
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = [
                executor.submit(clone(self.base_estimator).fit, X, y)
                for _ in range(self.n_estimators)
            ]
            # sklearn convention: fit() returns the estimator itself
            self.estimators_ = [future.result() for future in futures]
        return self

    def predict_proba(self, X):
        """Return class probabilities averaged over all members."""
        with ThreadPoolExecutor(max_workers=self.workers) as executor:
            futures = [executor.submit(est.predict_proba, X)
                       for est in self.estimators_]
            probas = [future.result() for future in futures]
        return np.mean(probas, axis=0)

    def predict(self, X):
        """Predict the class with the highest averaged probability."""
        return np.argmax(self.predict_proba(X), axis=1)
# 使用示例
# from sklearn.linear_model import LogisticRegression
# ensemble = ParallelEnsemble(LogisticRegression(), n_estimators=10, workers=4)
# ensemble.fit(X_train, y_train)
# predictions = ensemble.predict(X_test)
应用场景:创建并行化的bagging集成模型,适用于任何基础估计器。
5. 多线程交叉验证评估
from sklearn.model_selection import KFold
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from sklearn.metrics import get_scorer
def cross_val_score_parallel(estimator, X, y, cv=5, scoring='accuracy', workers=4):
    """K-fold cross-validation with one fold trained per thread.

    Returns a numpy array with one score per fold.

    Bugs fixed:
    - concurrent.futures.as_completed() was used, but this snippet only
      imports ThreadPoolExecutor, so the qualified name raised
      NameError; the futures list is iterated directly instead.
    - every thread called fit() on the SAME estimator object, so
      concurrent folds raced on its internal state; each fold now
      trains its own deep copy.
    """
    from copy import deepcopy

    kf = KFold(n_splits=cv)
    scorer = get_scorer(scoring)

    def train_eval(train_idx, test_idx):
        # independent copy per fold — fit() mutates the estimator
        fold_model = deepcopy(estimator)
        fold_model.fit(X[train_idx], y[train_idx])
        return scorer(fold_model, X[test_idx], y[test_idx])

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(train_eval, tr_idx, te_idx)
                   for tr_idx, te_idx in kf.split(X)]
        scores = [future.result() for future in futures]
    return np.array(scores)
# 使用示例
# from sklearn.ensemble import GradientBoostingClassifier
# model = GradientBoostingClassifier()
# scores = cross_val_score_parallel(model, X, y, cv=5, workers=4)
# print(f"平均准确率: {scores.mean():.4f}")
应用场景:加速模型的交叉验证过程,特别适用于计算密集型模型。
6. 并行时间序列特征提取
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from tsfresh import extract_features
def parallel_ts_feature_extraction(ts_data, column_id='id', column_sort='time', workers=4):
    """Run tsfresh feature extraction over id-partitions on a thread pool.

    The unique ids are split into chunks, each chunk's rows are
    extracted independently, and the per-chunk feature frames are
    concatenated.

    Bugs fixed:
    - len(ids) // workers is 0 when there are fewer ids than workers,
      which made range() raise "step must not be zero"; the chunk size
      is now clamped to at least 1.
    - concurrent.futures.as_completed() was referenced although only
      ThreadPoolExecutor is imported in this snippet (NameError); the
      futures list is iterated directly.
    """
    ids = ts_data[column_id].unique()
    chunk_size = max(1, len(ids) // workers)
    id_chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]

    def process_chunk(chunk_ids):
        chunk_data = ts_data[ts_data[column_id].isin(chunk_ids)]
        return extract_features(chunk_data, column_id=column_id,
                                column_sort=column_sort)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in id_chunks]
        features = [future.result() for future in futures]
    return pd.concat(features)
# 使用示例
# features = parallel_ts_feature_extraction(time_series_data, workers=4)
应用场景:处理大规模时间序列数据集的特征提取。
7. 多线程模型预测服务
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from queue import Queue
from threading import Thread
class PredictionServer:
    """Queue-based batch prediction service backed by worker threads.

    predict() splits the input into batches, pushes them through an
    input queue to worker threads that call model.predict, and
    reassembles the results in original order from an output queue.

    NOTE(review): model.predict is invoked concurrently from several
    threads — confirm the model is thread-safe. If predict() raises
    inside a worker, that batch is never answered and predict() blocks
    forever; worth confirming upstream error handling.
    """
    def __init__(self, model, max_workers=4, batch_size=32):
        self.model = model              # fitted estimator exposing .predict()
        self.max_workers = max_workers  # number of worker threads
        self.batch_size = batch_size    # samples per prediction batch
        self.input_queue = Queue()      # (ids, data) batches awaiting prediction
        self.output_queue = Queue()     # (ids, preds) results from workers
        self.workers = []               # live worker Thread objects
    def _worker(self):
        # Worker loop: consume batches until a None sentinel arrives.
        while True:
            batch = self.input_queue.get()
            if batch is None:
                break
            ids, data = batch
            preds = self.model.predict(data)
            self.output_queue.put((ids, preds))
            self.input_queue.task_done()
    def start(self):
        # Spawn max_workers threads running the worker loop.
        self.workers = []
        for _ in range(self.max_workers):
            t = Thread(target=self._worker)
            t.start()
            self.workers.append(t)
    def stop(self):
        # One None sentinel per worker, then wait for all to exit.
        for _ in range(self.max_workers):
            self.input_queue.put(None)
        for worker in self.workers:
            worker.join()
    def predict(self, X):
        """Predict X in batches across the workers; returns a numpy
        array of predictions aligned with the input order."""
        self.start()
        num_samples = len(X)
        predictions = [None] * num_samples
        # Submit prediction tasks in batches
        for i in range(0, num_samples, self.batch_size):
            batch = (list(range(i, min(i+self.batch_size, num_samples))),
                     X[i:i+self.batch_size])
            self.input_queue.put(batch)
        # Collect results; ids are absolute sample indices, so
        # completion order does not matter
        results_received = 0
        while results_received < num_samples:
            ids, preds = self.output_queue.get()
            for id_, pred in zip(ids, preds):
                predictions[id_] = pred
            results_received += len(ids)
            self.output_queue.task_done()
        self.stop()
        return np.array(predictions)
# 使用示例
# server = PredictionServer(trained_model, max_workers=4)
# predictions = server.predict(X_test)
应用场景:构建高性能的模型预测服务,适用于在线或批量预测场景。
8. 并行特征选择
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from concurrent.futures import ThreadPoolExecutor
import numpy as np
def parallel_feature_selection(X, y, k_features=10, workers=4):
    """Select up to *k_features* columns by mutual information.

    The column space is partitioned across a thread pool; each worker
    keeps the best columns of its partition, then a final global pass
    re-ranks the union of winners down to *k_features*. Returns the
    selected column indices.

    Bugs fixed:
    - concurrent.futures.as_completed() was used although only
      ThreadPoolExecutor is imported in this snippet (NameError); the
      futures list is iterated directly — order is irrelevant before
      the final re-ranking.
    - when X has fewer columns than *workers*, the per-worker share was
      0, producing empty index ranges; the worker count is now clamped
      to the feature count.
    """
    n_features = X.shape[1]
    n_workers = max(1, min(workers, n_features))
    features_per_worker = n_features // n_workers

    def select_features(feature_indices):
        # score this partition in isolation and keep its best columns
        selector = SelectKBest(mutual_info_classif,
                               k=min(k_features, len(feature_indices)))
        selector.fit(X[:, feature_indices], y)
        return [feature_indices[i] for i in selector.get_support(indices=True)]

    selected_features = []
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = []
        for i in range(n_workers):
            start = i * features_per_worker
            # last worker absorbs the remainder columns
            end = (i + 1) * features_per_worker if i != n_workers - 1 else n_features
            futures.append(executor.submit(select_features, list(range(start, end))))
        for future in futures:
            selected_features.extend(future.result())

    # Global second pass: re-rank the union of per-partition winners.
    if len(selected_features) > k_features:
        selector = SelectKBest(mutual_info_classif, k=k_features)
        selector.fit(X[:, selected_features], y)
        selected_features = [selected_features[i]
                             for i in selector.get_support(indices=True)]
    return selected_features
# 使用示例
# selected = parallel_feature_selection(X_train, y_train, k_features=20, workers=4)
# X_train_selected = X_train[:, selected]
# X_test_selected = X_test[:, selected]
应用场景:高维数据集的并行特征选择。
9. 多线程模型持久化
import concurrent.futures
import pickle
import gzip
from pathlib import Path
def save_model(model, filepath, compress=True):
    """Pickle *model* to *filepath*, gzip-compressed by default.

    Returns the path so callers can report where the model landed.
    """
    opener = gzip.open if compress else open
    with opener(filepath, 'wb') as fh:
        pickle.dump(model, fh)
    return filepath
def parallel_save_models(models_info, workers=4):
    """Persist every model in *models_info* ({name: model}) concurrently.

    Files land in ./saved_models/<name>.pkl.gz; each completed save is
    reported on stdout as it finishes.
    """
    Path("saved_models").mkdir(exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        pending = [
            pool.submit(save_model, model, f"saved_models/{name}.pkl.gz")
            for name, model in models_info.items()
        ]
        for done in concurrent.futures.as_completed(pending):
            print(f"模型已保存到: {done.result()}")
# 使用示例
# models = {
# 'random_forest': rf_model,
# 'gradient_boosting': gb_model,
# 'svm': svm_model
# }
# parallel_save_models(models, workers=4)
应用场景:同时保存多个训练好的模型,节省I/O时间。
10. 多线程数据增强
import concurrent.futures
import numpy as np
from albumentations import Compose, HorizontalFlip, Rotate, RandomBrightnessContrast
def augment_image(image, augmentations):
    """Apply an albumentations-style pipeline to one image.

    *augmentations* is called with image= and must return a dict with
    the transformed array under the 'image' key.
    """
    result = augmentations(image=image)
    return result['image']
def parallel_data_augmentation(images, labels, augmentations=None, multiplier=4, workers=4):
    """Generate *multiplier* augmented copies of every image in parallel.

    Returns (all_images, all_labels): the originals followed by the
    augmented copies, as concatenated numpy arrays.

    Args:
        images, labels: parallel sequences of images and their labels.
        augmentations: albumentations-style callable taking image= and
            returning {'image': ...}. Bug fixed: the original accepted
            this argument but silently IGNORED it, always building a
            hard-coded pipeline. It is now honored; only when it is
            None does the default pipeline apply (which also makes the
            documented example call, which omits it, work).
        multiplier: augmented copies produced per source image.
        workers: thread-pool size.
    """
    if augmentations is None:
        # default pipeline, identical to the original hard-coded one
        augmentations = Compose([
            HorizontalFlip(p=0.5),
            Rotate(limit=30, p=0.5),
            RandomBrightnessContrast(p=0.2),
        ])

    def _apply(img):
        # run the pipeline on one image
        return augmentations(image=img)['image']

    # one (image, label) task per augmented copy
    pairs = [(img, lbl)
             for _ in range(multiplier)
             for img, lbl in zip(images, labels)]

    augmented_images = []
    augmented_labels = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(_apply, img) for img, _ in pairs]
        for future, (_, lbl) in zip(futures, pairs):
            augmented_images.append(future.result())
            augmented_labels.append(lbl)

    # prepend the untouched originals
    return (np.concatenate([images, augmented_images]),
            np.concatenate([labels, augmented_labels]))
# 使用示例
# X_train_aug, y_train_aug = parallel_data_augmentation(X_train, y_train, multiplier=3, workers=4)
应用场景:图像数据的并行增强,特别适用于深度学习中的小数据集。
总结
本文介绍了10个Python多线程在机器学习中的高级应用脚本,涵盖了从数据预处理到模型训练、评估和部署的全流程。通过合理利用多线程技术,可以显著提升机器学习工作流的效率,特别是在处理大规模数据或计算密集型任务时。
最后:
希望你编程学习上不急不躁,按照计划有条不紊推进,把任何一件事做到极致,都是不容易的,加油,努力!相信自己!
文末福利
最后这里免费分享给大家一份Python全套学习资料,希望能帮到那些不满现状,想提升自己却又没有方向的朋友,也可以和我一起来学习交流呀。
包含编程资料、学习路线图、源代码、软件安装包等!【点击这里】领取!
① Python所有方向的学习路线图,清楚各个方向要学什么东西
② 100多节Python课程视频,涵盖必备基础、爬虫和数据分析
③ 100多个Python实战案例,学习不再是只会理论
④ 华为出品独家Python漫画教程,手机也能学习