登录
首页精彩阅读加快python算法的四个方法:数据并行化篇
加快python算法的四个方法:数据并行化篇
2020-06-09
收藏

CDA数据分析师 出品

相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。以下给大家讲解关于数据并行化这方面的内容。

1.介绍

随着时间和处理器计算能力的增长,数据呈指数级增长,我们需要找到有效地处理数据的方法。那我们应该怎么办呢?

GPU是一种非常有效的解决方案。但是,GPU并不是为了机器学习而设计的,它是专门为复杂的图像处理和游戏而设计的。我们使算法能够在现有GPU上运行,并且确实取得了成果。现在,谷歌推出了一种名为TPU(张量处理单元)的新设备,该设备专门针对TensorFlow上的机器学习工作而量身定做的,其结果确实令人激动。同时英伟达在这方面也并没有退缩。

但是我们将来会在某个时候达到顶峰。即使我们我们现在拥有大量可用的数据集,但是单台机器或计算单元也不足以处理这样的负载。我们将不得不使用多台机器来完成我们的任务。我们将不得不并行化完成我们的任务。

接下来,我们将研究大多数情况下你将在Python中使用的一些方法。然后再介绍一下Dasktorch.multiprocessing

2.池和进程

Python库的Pool和Process方法都来自于multiprocessing它为我们的任务启动了一个新的过程,但是方式有所不同。Process每次调用仅执行一个进程:

import multiprocessing as mpp = mp.Process(target= ##目标函数, args= ##参数到函数)# 此调用将只生产一个进程,该进程将处理在后台使用给定的参数处理目标函数

但是这个过程还没有开始。要启动它,你必须执行以下操作:

p.start

现在,你可以将其保留在此处,或者通过以下方式检查该过程是否完成:

p.join#现在它将等待进程完成。

不检查过程是否已完成有许多用途。例如,在客户端-服务器应用程序中,数据包丢失的可能性或无响应进程的可能性确实很低,我们可以忽略它,这可以使我们的速度大大提高。[取决于申请程序]

对于多个进程,你必须创建多个Process。你想做多少就可以做多少。当你调用.start它们时,它们全部都将会启动。

processes =[mp.Process(target=func, args=(a, b)) for (a, b) in list]for p in processes: p.startfor p in processes: p.join

另一方面, Pool启动固定数量的进程,然后我们可以为这些进程分配一些任务。因此,在特定的时间实例中,只有固定数量的进程将在运行,其余的将在等待状态中。进程的数量通常被选作设备的内核数,如果此参数为空,也是可以作为默认的状态的。

pool = mp.Pool(processes=2)

现在有许多方法可以应用在Pool。在Data Science中,我们可以避免使用的是Pool.apply和Pool.map,因为它们会在任务完成后立即返回结果。Pool.apply仅采用一个参数,并且仅使用一个过程,而Pool.map将接受许多参数,并将其放入我们Pool的过程中。

results = [pool.apply(func, (x)) for x in X]# 或者 results = pool.map(func, (arg)) # 仅需要一个参数

考虑到我们前面的客户端-服务器应用程序的例子,此处预定义了要运行的最大进程数,因此,如果我们有很多请求/数据包,则n(仅在Pool中的最大进程)将运行一次,而其他将在等待其中一个进程插槽的队列中排队。

向量的所有元素的平方

# 我们如何使用数据框# A: 你可以使用一些可以并行化的函数df.shape# (100, 100)dfs = [df.iloc[i*25:i*25+25, 0] for i in range(4)]with Pool(4) as p: res = p.map(np.exp, dfs)for i in range(4): df.iloc[i*25:i*25+25, 0] = res[i]# 它可以方便的对数据进行预处理

什么时候使用什么?

如果你有很多任务,但其中很少的任务是计算密集型的,则应使用Process。因为如果它们需要大量计算,它们可能会阻塞你的CPU,并且你的系统可能会崩溃。如果你的系统可以一次处理所有这些操作,那么他们就不必在队列中等待机会了。

并且当你的任务数量固定且它们的计算量很大时,应使用Pool。因为你同时释放他们,那么你的系统很可能会崩溃。

3.线程处理

什么!线程处理在python中进行?

python中的线程声誉。人们的这一点看法是对的。实际上,线程在大多数情况下是不起作用的。那么问题到底是什么呢?

问题就出在GIL(全局解释器锁定)上。GIL是在Python的开发初期就引入的,当时甚至在操作系统中都没有线程的概念。选择它是因为它的简单性。

GIL一次仅允许一个CPU进程。也就是说,它一次仅允许一个线程访问python解释器。因此,一个线程将整个解释器Lock,直到它完成。

对于单线程程序,它非常快,因为只有一个Lock要维护。随着python的流行,有效地推出GIL而不损害所有相关应用程序变得越来越困难。这就是为什么它仍然存在的原因。

但是,如果你的任务不受CPU限制,则仍然可以使用多线程并行(y)。也就是说,如果你的任务受I / O约束,则可以使用多个线程并获得加速。因为大多数时候这些任务都在等待其他代理(例如磁盘等)的响应,并且在这段时间内它们可以释放锁,而让其他任务同时获取它。⁴

NOTE: (来自于官方网页)The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.

以下是对官方网页的解释:

GIL是有争议的,因为它阻止多线程CPython程序在某些情况下充分利用多处理器系统。注意,潜在的阻塞或长时间运行的操作,如I/O、图像处理和NumPy数字处理,都发生在GIL之外。因此,只有在花费大量时间在GIL内部解释CPython字节码的多线程程序中,GIL才会成为瓶颈。

因此,如果你的任务受IO限制,例如从服务器下载一些数据,对磁盘进行读/写等操作,则可以使用多个线程并获得加速。

from threading import Thread as timport queueq = queue.Queue # 用于放置和获取线程的结果func_ = lambda q, args: q.put(func(args))threads = [t(target=func_, args=(q, args)) for args in args_array]for t in threads: t.startfor t in threads: t.joinres = for t in threads: res.append(q.get) # 这些结果不一定是按顺序排列的

要保存线程的结果,可以使用类似于Queue 的方法。为此,你将必须如上所示定义函数,或者可以在函数内部使用Queue.put,但是为此,你必须更改函数定义以Queue`做为参数。

现在,你在队列中的结果不一定是按顺序排列的。如果希望结果按顺序排列,则可以传入一些计数器作为参数,如id作为参数,然后使用这些id来标识结果的来源。

threads = [t(func_, args = (i, q, args)) for i, args in enumerate(args_array)]# 并相应地更新函数NOTE:在pandas中的多处理中由于某些原因 'read.csv' 的方法并没有提供太多的加速,你可以考虑使用Dask做为替代

线程还是进程?

一个进程是重量级的,因为它可能包含许多自己的线程(包含至少一个线程),并且分配了自己的内存空间,而线程是轻量级的,因为它在父进程的内存区域上工作,因此制作起来更快。

进程内的线程之间的通信比较容易,因为它们共享相同的内存空间。而进程间的通信(IPC-进程间通信)则比较慢。但是,共享相同数据的线程又可能进入竞争状态,应谨慎使用Locks或使用类似的解决方案。

4.Dask

Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。

他们也有一个单独的机器学习库dask-ml,这与如现有的库(如sklearn,xgboost和tensorflow)集成在一起。

from dask import delayed as delay@delaydef add(x, y): return x+y@delaydef sq(x): return x**2# 现在你可以以任何方式使用这些函数,Dask将使你的执行并行化。顾名思义,Dask不会立即执行函数调用,而是根据对输入和中间结果调用函数的方式生成计算图。计算最终结果:result.compute

Dask在做任何事情的时候都有一种内在的并行性。对于如何处理DataFrame的,你可以将其视为分而治之的方法,它将DataFrame分为多个块,然后并行应用给定的函数。

df = dask.DataFrame.read_csv("BigFile.csv", chunks=50000)# 你的DataFrame已经被划分为了多个块,你应用的每个函数将分别并行的应用所有的模块。它有大部分的Pandas功能,你可以使用:agg = df.groupby(["column"]).aggregate(["sum", "mean"])agg.columns = new_column_namesdf_new = df.merge(agg.reset_index, on="column", how="left")# 虽然到目前为止还没有计算结果,但是使用.compute可以并行计算。df_new.compute.head

它们还具有用于在计算机集群上运行它们的接口。

5.torch.multiprocessing

torch.multiprocessing是Python multiprocessing模块的封装函数,其API与原始模块100%兼容。因此,你可以在此处使用Python的 multiprocessing模块中的Queue',Pipe',Array'等。此外,为了使其更快,他们添加了一个方法,share_memory_该方法允许数据进入一个状态,在这个状态下任何进程都可以直接使用它,因此将该数据作为参数传递给不同的进程不会复制该数据。 。

你可以共享Tensors,模型的parameters,也可以根据需要在CPU或GPU上共享它们。

来自Pytorch的警告:(关于GPU上的共享) CUDA API要求导出到其他进程的分配在被其他进程使用时仍然有效。你应该小心,确保你共享的CUDA张量不会超出范围,只要有必要。这对于共享模型参数应该不是问题,但是传递其他类型的数据时应该小心。注意,这个限制不适用于共享CPU内存。

你可以在此处的"Pool and Process"部分中使用上面的方法,并且要获得更快的速度,可以使用share_memory_方法在所有进程之间共享一个Tensor(例如)而不被需要复制。

# 使用多个过程训练一个模型:import torch.multiprocessing as mpdef train(model): for data, labels in data_loader: optimizer.zero_grad loss_fn(model(data), labels).backward optimizer.step # 这将更新共享参数model = nn.Sequential(nn.Linear(n_in, n_h1), nn.ReLU, nn.Linear(n_h1, n_out))model.share_memory #需要"fork"方法工作processes = for i in range(4): # NO.的过程 p = mp.Process(target=train, args=(model,)) p.start processes.append(p)for p in processes: p.join

下一期继续看加快Python算法的第4种方法——Dask!

数据分析咨询请扫描二维码

客服在线
立即咨询