
CDA数据分析师 出品
相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。以下给大家讲解关于数据并行化这方面的内容。
随着时间和处理器计算能力的增长,数据呈指数级增长,我们需要找到有效地处理数据的方法。那我们应该怎么办呢?
GPU是一种非常有效的解决方案。但是,GPU并不是为了机器学习而设计的,它是专门为复杂的图像处理和游戏而设计的。我们使算法能够在现有GPU上运行,并且确实取得了成果。现在,谷歌推出了一种名为TPU(张量处理单元)的新设备,该设备专门针对TensorFlow上的机器学习工作而量身定做的,其结果确实令人激动。同时英伟达在这方面也并没有退缩。
但是我们将来会在某个时候达到顶峰。即使我们我们现在拥有大量可用的数据集,但是单台机器或计算单元也不足以处理这样的负载。我们将不得不使用多台机器来完成我们的任务。我们将不得不并行化完成我们的任务。
接下来,我们将研究大多数情况下你将在Python中使用的一些方法。然后再介绍一下Dask和torch.multiprocessing。
Python库的Pool和Process方法都来自于multiprocessing它为我们的任务启动了一个新的过程,但是方式有所不同。Process每次调用仅执行一个进程:
import multiprocessing as mpp = mp.Process(target= ##目标函数, args= ##参数到函数)# 此调用将只生产一个进程,该进程将处理在后台使用给定的参数处理目标函数
但是这个过程还没有开始。要启动它,你必须执行以下操作:
p.start
现在,你可以将其保留在此处,或者通过以下方式检查该过程是否完成:
p.join#现在它将等待进程完成。
不检查过程是否已完成有许多用途。例如,在客户端-服务器应用程序中,数据包丢失的可能性或无响应进程的可能性确实很低,我们可以忽略它,这可以使我们的速度大大提高。[取决于申请程序]
对于多个进程,你必须创建多个Process。你想做多少就可以做多少。当你调用.start它们时,它们全部都将会启动。
processes =[mp.Process(target=func, args=(a, b)) for (a, b) in list]for p in processes: p.startfor p in processes: p.join
另一方面, Pool启动固定数量的进程,然后我们可以为这些进程分配一些任务。因此,在特定的时间实例中,只有固定数量的进程将在运行,其余的将在等待状态中。进程的数量通常被选作设备的内核数,如果此参数为空,也是可以作为默认的状态的。
pool = mp.Pool(processes=2)
现在有许多方法可以应用在Pool。在Data Science中,我们可以避免使用的是Pool.apply和Pool.map,因为它们会在任务完成后立即返回结果。Pool.apply仅采用一个参数,并且仅使用一个过程,而Pool.map将接受许多参数,并将其放入我们Pool的过程中。
results = [pool.apply(func, (x)) for x in X]# 或者 results = pool.map(func, (arg)) # 仅需要一个参数
考虑到我们前面的客户端-服务器应用程序的例子,此处预定义了要运行的最大进程数,因此,如果我们有很多请求/数据包,则n(仅在Pool中的最大进程)将运行一次,而其他将在等待其中一个进程插槽的队列中排队。
向量的所有元素的平方
# 我们如何使用数据框# A: 你可以使用一些可以并行化的函数df.shape# (100, 100)dfs = [df.iloc[i*25:i*25+25, 0] for i in range(4)]with Pool(4) as p: res = p.map(np.exp, dfs)for i in range(4): df.iloc[i*25:i*25+25, 0] = res[i]# 它可以方便的对数据进行预处理
什么时候使用什么?
如果你有很多任务,但其中很少的任务是计算密集型的,则应使用Process。因为如果它们需要大量计算,它们可能会阻塞你的CPU,并且你的系统可能会崩溃。如果你的系统可以一次处理所有这些操作,那么他们就不必在队列中等待机会了。
并且当你的任务数量固定且它们的计算量很大时,应使用Pool。因为你同时释放他们,那么你的系统很可能会崩溃。
什么!线程处理在python中进行?
python中的线程声誉。人们的这一点看法是对的。实际上,线程在大多数情况下是不起作用的。那么问题到底是什么呢?
问题就出在GIL(全局解释器锁定)上。GIL是在Python的开发初期就引入的,当时甚至在操作系统中都没有线程的概念。选择它是因为它的简单性。
GIL一次仅允许一个CPU进程。也就是说,它一次仅允许一个线程访问python解释器。因此,一个线程将整个解释器Lock,直到它完成。
对于单线程程序,它非常快,因为只有一个Lock要维护。随着python的流行,有效地推出GIL而不损害所有相关应用程序变得越来越困难。这就是为什么它仍然存在的原因。
但是,如果你的任务不受CPU限制,则仍然可以使用多线程并行(y)。也就是说,如果你的任务受I / O约束,则可以使用多个线程并获得加速。因为大多数时候这些任务都在等待其他代理(例如磁盘等)的响应,并且在这段时间内它们可以释放锁,而让其他任务同时获取它。⁴
NOTE: (来自于官方网页)The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.
以下是对官方网页的解释:
GIL是有争议的,因为它阻止多线程CPython程序在某些情况下充分利用多处理器系统。注意,潜在的阻塞或长时间运行的操作,如I/O、图像处理和NumPy数字处理,都发生在GIL之外。因此,只有在花费大量时间在GIL内部解释CPython字节码的多线程程序中,GIL才会成为瓶颈。
因此,如果你的任务受IO限制,例如从服务器下载一些数据,对磁盘进行读/写等操作,则可以使用多个线程并获得加速。
from threading import Thread as timport queueq = queue.Queue # 用于放置和获取线程的结果func_ = lambda q, args: q.put(func(args))threads = [t(target=func_, args=(q, args)) for args in args_array]for t in threads: t.startfor t in threads: t.joinres = for t in threads: res.append(q.get) # 这些结果不一定是按顺序排列的
要保存线程的结果,可以使用类似于Queue 的方法。为此,你将必须如上所示定义函数,或者可以在函数内部使用Queue.put,但是为此,你必须更改函数定义以Queue`做为参数。
现在,你在队列中的结果不一定是按顺序排列的。如果希望结果按顺序排列,则可以传入一些计数器作为参数,如id作为参数,然后使用这些id来标识结果的来源。
threads = [t(func_, args = (i, q, args)) for i, args in enumerate(args_array)]# 并相应地更新函数NOTE:在pandas中的多处理中由于某些原因 'read.csv' 的方法并没有提供太多的加速,你可以考虑使用Dask做为替代
线程还是进程?
一个进程是重量级的,因为它可能包含许多自己的线程(包含至少一个线程),并且分配了自己的内存空间,而线程是轻量级的,因为它在父进程的内存区域上工作,因此制作起来更快。
进程内的线程之间的通信比较容易,因为它们共享相同的内存空间。而进程间的通信(IPC-进程间通信)则比较慢。但是,共享相同数据的线程又可能进入竞争状态,应谨慎使用Locks或使用类似的解决方案。
Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。
他们也有一个单独的机器学习库dask-ml,这与如现有的库(如sklearn,xgboost和tensorflow)集成在一起。
from dask import delayed as delay@delaydef add(x, y): return x+y@delaydef sq(x): return x**2# 现在你可以以任何方式使用这些函数,Dask将使你的执行并行化。顾名思义,Dask不会立即执行函数调用,而是根据对输入和中间结果调用函数的方式生成计算图。计算最终结果:result.compute
Dask在做任何事情的时候都有一种内在的并行性。对于如何处理DataFrame的,你可以将其视为分而治之的方法,它将DataFrame分为多个块,然后并行应用给定的函数。
df = dask.DataFrame.read_csv("BigFile.csv", chunks=50000)# 你的DataFrame已经被划分为了多个块,你应用的每个函数将分别并行的应用所有的模块。它有大部分的Pandas功能,你可以使用:agg = df.groupby(["column"]).aggregate(["sum", "mean"])agg.columns = new_column_namesdf_new = df.merge(agg.reset_index, on="column", how="left")# 虽然到目前为止还没有计算结果,但是使用.compute可以并行计算。df_new.compute.head
它们还具有用于在计算机集群上运行它们的接口。
torch.multiprocessing是Python multiprocessing模块的封装函数,其API与原始模块100%兼容。因此,你可以在此处使用Python的 multiprocessing模块中的Queue',Pipe',Array'等。此外,为了使其更快,他们添加了一个方法,share_memory_该方法允许数据进入一个状态,在这个状态下任何进程都可以直接使用它,因此将该数据作为参数传递给不同的进程不会复制该数据。 。
你可以共享Tensors,模型的parameters,也可以根据需要在CPU或GPU上共享它们。
来自Pytorch的警告:(关于GPU上的共享) CUDA API要求导出到其他进程的分配在被其他进程使用时仍然有效。你应该小心,确保你共享的CUDA张量不会超出范围,只要有必要。这对于共享模型参数应该不是问题,但是传递其他类型的数据时应该小心。注意,这个限制不适用于共享CPU内存。
你可以在此处的"Pool and Process"部分中使用上面的方法,并且要获得更快的速度,可以使用share_memory_方法在所有进程之间共享一个Tensor(例如)而不被需要复制。
# 使用多个过程训练一个模型:import torch.multiprocessing as mpdef train(model): for data, labels in data_loader: optimizer.zero_grad loss_fn(model(data), labels).backward optimizer.step # 这将更新共享参数model = nn.Sequential(nn.Linear(n_in, n_h1), nn.ReLU, nn.Linear(n_h1, n_out))model.share_memory #需要"fork"方法工作processes = for i in range(4): # NO.的过程 p = mp.Process(target=train, args=(model,)) p.start processes.append(p)for p in processes: p.join
下一期继续看加快Python算法的第4种方法——Dask!
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
t 检验与 Wilcoxon 检验:数据差异比较的两大统计利器 在数据分析中,“比较差异” 是核心需求之一 —— 如新药疗效是否优于旧药 ...
2025-08-26季节性分解外推法:解锁时间序列预测的规律密码 在商业决策、资源调度、政策制定等领域,准确的预测是规避风险、提升效率的关键 ...
2025-08-26CDA 数据分析师:数据治理驱动下的企业数据价值守护者 在数字经济时代,数据已成为企业核心战略资产,其价值的释放离不开高 ...
2025-08-26基于 SPSS 的 ROC 曲线平滑调整方法与实践指南 摘要 受试者工作特征曲线(ROC 曲线)是评估诊断模型或预测指标效能的核心工具, ...
2025-08-25神经网络隐藏层神经元个数的确定方法与实践 摘要 在神经网络模型设计中,隐藏层神经元个数的确定是影响模型性能、训练效率与泛 ...
2025-08-25CDA 数据分析师与数据思维:驱动企业管理升级的核心力量 在数字化浪潮席卷全球的当下,数据已成为企业继人力、物力、财力之后的 ...
2025-08-25CDA数据分析师与数据指标:基础概念与协同逻辑 一、CDA 数据分析师:数据驱动时代的核心角色 1.1 定义与行业价值 CDA(Certified ...
2025-08-22Power Query 移动加权平均计算 Power Query 移动加权平均设置全解析:从原理到实战 一、移动加权平均法的核心逻辑 移动加权平均 ...
2025-08-22描述性统计:CDA数据分析师的基础核心与实践应用 一、描述性统计的定位:CDA 认证的 “入门基石” 在 CDA(Certified Data Analy ...
2025-08-22基于 Python response.text 的科技新闻数据清洗去噪实践 在通过 Python requests 库的 response.text 获取 API 数据后,原始数据 ...
2025-08-21基于 Python response.text 的科技新闻综述 在 Python 网络爬虫与 API 调用场景中,response.text 是 requests 库发起请求后获取 ...
2025-08-21数据治理新浪潮:CDA 数据分析师的战略价值与驱动逻辑 一、数据治理的多维驱动引擎 在数字经济与人工智能深度融合的时代,数据治 ...
2025-08-21Power BI 热力地图制作指南:从数据准备到实战分析 在数据可视化领域,热力地图凭借 “直观呈现数据密度与分布趋势” 的核心优势 ...
2025-08-20PyTorch 矩阵运算加速库:从原理到实践的全面解析 在深度学习领域,矩阵运算堪称 “计算基石”。无论是卷积神经网络(CNN)中的 ...
2025-08-20数据建模:CDA 数据分析师的核心驱动力 在数字经济浪潮中,数据已成为企业决策的核心资产。CDA(Certified Data Analyst)数据分 ...
2025-08-20KS 曲线不光滑:模型评估的隐形陷阱,从原因到破局的全指南 在分类模型(如风控违约预测、电商用户流失预警、医疗疾病诊断)的评 ...
2025-08-20偏态分布:揭开数据背后的非对称真相,赋能精准决策 在数据分析的世界里,“正态分布” 常被视为 “理想模型”—— 数据围绕均值 ...
2025-08-19CDA 数据分析师:数字化时代的价值创造者与决策智囊 在数据洪流席卷全球的今天,“数据驱动” 已从企业战略口号落地为核心 ...
2025-08-19CDA 数据分析师:善用 Power BI 索引列,提升数据处理与分析效率 在 Power BI 数据分析流程中,“数据准备” 是决定后续分析质量 ...
2025-08-18CDA 数据分析师:巧用 SQL 多个聚合函数,解锁数据多维洞察 在企业数据分析场景中,单一维度的统计(如 “总销售额”“用户总数 ...
2025-08-18