登录
首页精彩阅读加快python算法的四个方法:Dask篇
加快python算法的四个方法:Dask篇
2020-06-08
收藏

CDA数据分析师 出品

相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下python的计算时间,减少大家在算法上的等待时间。今天给大家讲述最后一方面的内容,关于Dask的方法运用。

1.简介

随着对机器学习算法并行化的需求不断增加,由于数据大小甚至模型大小呈指数级增长,如果我们拥有一个工具,可以帮助我们并行化处理Pandas的DataFrame,可以并行化处理Numpy的计算,甚至并行化我们的机器学习算法(可能是来自sklearn和Tensorflow的算法)也没有太多的麻烦,那它对我们会非常有帮助。

好消息是确实存在这样的库,其名称为Dask。Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。

他们也有一个单独的机器学习库dask-ml,这与如现有的库集成如sklearn,xgboost和tensorflow。

Dask通过绘制任务之间的交互图来并行化分配给它的任务。使用Dask的.visualize方法来可视化你的工作将非常有帮助,该方法可用于所有数据类型以及你计算的复杂任务链。此方法将输出你的任务图,并且如果你的任务在每个级别具有多个节点(即,你的任务链结构在多个层次上具有许多独立的任务,例如数据块上的并行任务),然后Dask将能够并行化它们。

注意: Dask仍是一个相对较新的项目。它还有很长的路要走。不过,如果你不想学习全新的API(例如PySpark),Dask是你的最佳选择,将来肯定会越来越好。Spark / PySpark仍然遥遥领先,并且仍将继续改进。这是一个完善的Apache项目。

2.数据类型

Dask中的每种数据类型都提供现有数据类型的分布式版本,例如pandas中的DataFramenumpy中的ndarray和Python中的list。这些数据类型可以大于你的内存,Dask将以Blocked方式对数据并行(y)运行计算。Blocked从某种意义上说,它们是通过执行许多小的计算(即,以块为单位)来执行大型计算的,而块的数量为chunks的总数。

a)数组:

网格中的许多Numpy数组作为Dask数组

Dask Array对非常大的数组进行操作,将它们划分为块并并行执行这些块。它有许多可用的numpy方法,你可以使用这些方法来加快速度。但是其中一些没有实现。

只要支持numpy切片,Dask Array就可以从任何类似数组结构中读取数据,并且可以通过使用并且通过使用Dask . Array .from_array方法具有.shape属性。它还可以读取.npy和.zarr文件。

import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它会生成大小为(1000,1000)的块darr.npartitioins# 100

当你的数组真的很重时(即它们无法放入内存)并且numpy对此无能为力时,可以使用它。因此,Dask将它们分为数组块并为你并行处理它们。

现在,Dask对每种方法进行惰性评估。因此,要实际计算函数的值,必须使用.compute方法。它将以块为单位并行计算结果,同时并行化每个独立任务。

result = darr.compute

1)元素数量较少时,Numpy比Dask快;2)Dask接管了Numpy,耗时约1e7个元素;3)Numpy无法产生更多元素的结果,因为它无法将它们存储在内存中。

b)DataFrame

5个Pandas DataFrame在一个Dask DataFrame中提供每月数据(可以来自diff文件)

与Dask Arrays相似,Dask DataFrames通过将文件划分为块并将这些块的计算函数并行化,从而对不适合内存非常大的数据文件进行并行计算。

import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)

现在,你可以应用/使用pandas库中可用的大多数功能,并在此处应用。

agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #请查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head

c)Bag:

Dask Bag包并行处理包含多个数据类型元素Python的list相似对象。当你尝试处理一些半结构化数据(例如JSON Blob或日志文件)时,此功能很有用。

import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)

Daskbag逐行读取,.take方法输出指定行数的元组。

Dask Bag在这样的Python对象集合上实现例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成这个任务,占用的内存很小。它类似于PyToolz的并行版本或PySpark RDD的Python版本。

filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute

3.延时

如果你的任务有点简单,并且你不能或不想使用这些高级集合来执行操作,则可以使用低级调度程序,该程序可帮助你使用dask.delayed接口并行化代码/算法。dask.delayed也可以进行延迟计算。

import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum

你可以根据需要在这些函数之间添加复杂的交互,使用上一个任务的结果作为下一个任务的参数。Dask不会立即计算这些函数,而是会为你的任务绘制图形,有效地合并你使用的函数之间的交互。

inputs = list(np.arange(1, 11))#将外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 计算输入的sq并保存 # 延迟计算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加两个连续 # 结果从prev步骤inputs = tempresult = sum(inputs) # 将所有prev步骤的结果相加results.compute

你可以将延迟添加到具有许多可能的小块的任何可并行化代码中,从而获得加速的效果。它可以是你想计算的许多函数,例如上面的示例,或者可以使用并行读取多个文件pandas.read_csv。

4.分布式

首先,到目前为止,我们一直使用Dask的默认调度器来计算任务的结果。但是你可以根据需要从Dask提供的选项中更改它们。

Dask 带有四个可用的调度程序:

· threaded:由线程池支持的调度程序

· processes:由进程池支持的调度程序

· single-threaded(又名" sync"):同步调度程序,用于调试

· distributed:用于在多台计算机上执行图形的分布式调度程序

result.compute(scheduler="single-threaded") #用于调试# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(从官方网页)#当被称为GIL的函数释放时,线程任务将工作得很好,而多处理总是启动时间较慢,并且在任务之间需要大量的通信。# 你可以通过其中一个得到调度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#单线程的最后一个

但是,Dask还有一个调度器,dask.distributed由于以下原因它可能是首选使用的:

1. 它提供了异步API的访问,尤其是Future,

1. 它提供了一个诊断仪表板,可以提供有关性能和进度的宝贵见解

1. 它可以更复杂地处理数据位置,因此在需要多个流程的工作负载上,它比多处理调度程序更有效。

你可以创建一个Dask的dask.distributed调度程序,通过导入和创建客户端实现分布式调度器

from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以导航到http://localhost:8787/status 查看# 诊断仪表板,如果你有Bokeh安装的话

现在,你可以使用client.submit方法,将函数和参数作为其参数,从而将任务提交到此集群。然后我们可以使用client.gather或.result方法来收集结果。

sent = client.submit(sq, 4) # sq: square 函数result = client.gather(sent) # 或者 sent.result

你也可以仅使用dask.distributed.progress来查看当前单元格中任务的进度。你还可以明确选择使用dask.distributed.wait来等待任务完成。

Note: (Local Cluster)有时您会注意到Dask正在超出内存使用,即使它正在划分任务。它可能发生在您身上,因为您试图在数据集上使用的函数需要您的大部分数据进行处理,而多重处理可能使情况变得更糟,因为所有工作人员都可能试图将数据集复制到内存中。这可能发生在聚合的情况下。或者您可能想限制Dask只使用特定数量的内存。

在这些情况下,您可以使用Dask.distributed。LocalCluster参数,并将它们传递给Client,从而使用本地机器的核心构建LocalCluster。

from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client

'scheduler_port=0'和' stics_port=0'将为这个特定的客户端选择随机端口号。在'process =False'的情况下,dask的客户端不会复制数据集,这可能发生在您所创建的每个进程中。您可以根据自己的需要或限制对客户机进行调优,要了解更多信息,可以查看LocalCluster的参数。您还可以在同一台机器的不同端口上使用多个客户机。

5.机器学习

Dask也有一个库,可以帮助并允许大多数流行的机器学习库,例如sklearn,tensorflow和xgboost。

机器学习中,你可能会遇到几个不同的问题。而具体的策略取决于你面临的问题:

1. 大型模型:数据适合放入RAM,但是训练时间太长。许多超参数组合,许多模型的大型集合等。

1. 大型数据集:数据大于RAM,并且不能选择抽样。

因此,你应该:

· 对于内存中适合的问题,只需使用scikit-learn(或你最喜欢的ML库)即可;

· 对于大型模型,请使用dask_ml.joblib和你最喜欢的scikit-learn估算器

· 对于大型数据集,请使用dask_ml估算器。

a)预处理:

dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(稳健标量),StandardScalar(标准标量),LabelEncoder(标签编码器),OneHotEncoder(独热编码),PolynomialFeatures(多项式特性)等等,以及它的一些自己的如Categorizer(分类器),DummyEncoder(虚拟编码),OrdinalEncoder(序数编码器)等。

你可以像使用PandasDataFrame一样使用它们。

from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])

你可以使用Dask的DataFrame上的预处理方法,从Sklearn的Make_pipeline方法生成一个管道。

b)超参数搜索:

Dask具有sklearn用于进行超参数搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。

from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)

而且,如果要partial_fit与估算器一起使用,则可以使用dask-ml的IncrementalSearchCV。

注意:(来自Dask)如果要使用后拟合任务(如评分和预测),则使用基础估计量评分方法。如果你的估算器(可能来自sklearn )无法处理大型数据集,则将估算器包装在" dask_ml.wrappers.ParallelPostFit" 周围。它可以并行化" predict"," predict_proba"," transform"等方法。

c)模型/估计器:

Dask具有一些线性模型(的LinearRegression,LogisticRegression等),一些聚类模型(Kmeans和SpectralClustering),一种使用Tensorflow 的方法,使用Dask训练XGBoost模型的方法。

如果训练数据较小,则可以将sklearn的模型与结合使用Dask,如果ParallelPostFit包装数据较大,则可以与包装器一起使用(如果测试数据较大)。

from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)

如果数据集不大但模型很大,则可以使用joblib。sklearns编写了许多用于并行执行的算法(你可能使用过n_jobs=-1参数),joblib该算法利用线程和进程来并行化工作负载。要用于Dask并行化,你可以创建一个Client(客户端)(必须),然后使用with joblib.parallel_backend('dask'):包装代码。

import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代码

注意:DASK JOBLIB后端对于扩展CPU绑定的工作负载非常有用; 在RAM中包含数据集的工作负载,但具有许多可以并行完成的单独操作。要扩展到受RAM约束的工作负载(大于内存的数据集),你应该使用Dask的内置模型和方法。

而且,如果你训练的数据太大而无法容纳到内存中,那么你应该使用Dask的内置估算器来加快速度。你也可以使用Dask的wrapper.Incremental它使用基础估算器的partial_fit方法对整个数据集进行训练,但实际上是连续的。

Dask的内置估计器很好地扩展用于大型数据集与多种优化算法,如admm,lbfgs,gradient_descent等,并且正则化器如 L1,L2,ElasticNet等。

from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")

经过4期的内容讲解,你学会加快Python算法的四种方法了么?

数据分析咨询请扫描二维码

客服在线
立即咨询