京公网安备 11010802034615号
经营许可证编号:京B2-20210330
CDA数据分析师 出品
相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下python的计算时间,减少大家在算法上的等待时间。今天给大家讲述最后一方面的内容,关于Dask的方法运用。
随着对机器学习算法并行化的需求不断增加,由于数据大小甚至模型大小呈指数级增长,如果我们拥有一个工具,可以帮助我们并行化处理Pandas的DataFrame,可以并行化处理Numpy的计算,甚至并行化我们的机器学习算法(可能是来自sklearn和Tensorflow的算法)也没有太多的麻烦,那它对我们会非常有帮助。
好消息是确实存在这样的库,其名称为Dask。Dask是一个并行计算库,它不仅有助于并行化现有的机器学习工具(Pandas和Numpy)(即使用高级集合),而且还有助于并行化低级任务/功能,并且可以通过制作任务图来处理这些功能之间的复杂交互。[ 即使用低级调度程序 ]这类似于Python的线程或多处理模块。
他们也有一个单独的机器学习库dask-ml,这与如现有的库集成如sklearn,xgboost和tensorflow。
Dask通过绘制任务之间的交互图来并行化分配给它的任务。使用Dask的.visualize方法来可视化你的工作将非常有帮助,该方法可用于所有数据类型以及你计算的复杂任务链。此方法将输出你的任务图,并且如果你的任务在每个级别具有多个节点(即,你的任务链结构在多个层次上具有许多独立的任务,例如数据块上的并行任务),然后Dask将能够并行化它们。
注意: Dask仍是一个相对较新的项目。它还有很长的路要走。不过,如果你不想学习全新的API(例如PySpark),Dask是你的最佳选择,将来肯定会越来越好。Spark / PySpark仍然遥遥领先,并且仍将继续改进。这是一个完善的Apache项目。
Dask中的每种数据类型都提供现有数据类型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。这些数据类型可以大于你的内存,Dask将以Blocked方式对数据并行(y)运行计算。Blocked从某种意义上说,它们是通过执行许多小的计算(即,以块为单位)来执行大型计算的,而块的数量为chunks的总数。
a)数组:
网格中的许多Numpy数组作为Dask数组
Dask Array对非常大的数组进行操作,将它们划分为块并并行执行这些块。它有许多可用的numpy方法,你可以使用这些方法来加快速度。但是其中一些没有实现。
只要支持numpy切片,Dask Array就可以从任何类似数组结构中读取数据,并且可以通过使用并且通过使用Dask . Array .from_array方法具有.shape属性。它还可以读取.npy和.zarr文件。
import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它会生成大小为(1000,1000)的块darr.npartitioins# 100
当你的数组真的很重时(即它们无法放入内存)并且numpy对此无能为力时,可以使用它。因此,Dask将它们分为数组块并为你并行处理它们。
现在,Dask对每种方法进行惰性评估。因此,要实际计算函数的值,必须使用.compute方法。它将以块为单位并行计算结果,同时并行化每个独立任务。
result = darr.compute
1)元素数量较少时,Numpy比Dask快;2)Dask接管了Numpy,耗时约1e7个元素;3)Numpy无法产生更多元素的结果,因为它无法将它们存储在内存中。
b)DataFrame:
5个Pandas DataFrame在一个Dask DataFrame中提供每月数据(可以来自diff文件)
与Dask Arrays相似,Dask DataFrames通过将文件划分为块并将这些块的计算函数并行化,从而对不适合内存非常大的数据文件进行并行计算。
import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)
现在,你可以应用/使用pandas库中可用的大多数功能,并在此处应用。
agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #请查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head
c)Bag:
Dask Bag包并行处理包含多个数据类型元素Python的list相似对象。当你尝试处理一些半结构化数据(例如JSON Blob或日志文件)时,此功能很有用。
import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)
Daskbag逐行读取,.take方法输出指定行数的元组。
Dask Bag在这样的Python对象集合上实现例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成这个任务,占用的内存很小。它类似于PyToolz的并行版本或PySpark RDD的Python版本。
filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute
如果你的任务有点简单,并且你不能或不想使用这些高级集合来执行操作,则可以使用低级调度程序,该程序可帮助你使用dask.delayed接口并行化代码/算法。dask.delayed也可以进行延迟计算。
import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum
你可以根据需要在这些函数之间添加复杂的交互,使用上一个任务的结果作为下一个任务的参数。Dask不会立即计算这些函数,而是会为你的任务绘制图形,有效地合并你使用的函数之间的交互。
inputs = list(np.arange(1, 11))#将外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 计算输入的sq并保存 # 延迟计算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加两个连续 # 结果从prev步骤inputs = tempresult = sum(inputs) # 将所有prev步骤的结果相加results.compute
你可以将延迟添加到具有许多可能的小块的任何可并行化代码中,从而获得加速的效果。它可以是你想计算的许多函数,例如上面的示例,或者可以使用并行读取多个文件pandas.read_csv。
首先,到目前为止,我们一直使用Dask的默认调度器来计算任务的结果。但是你可以根据需要从Dask提供的选项中更改它们。
Dask 带有四个可用的调度程序:
· threaded:由线程池支持的调度程序
· processes:由进程池支持的调度程序
· single-threaded(又名" sync"):同步调度程序,用于调试
· distributed:用于在多台计算机上执行图形的分布式调度程序
result.compute(scheduler="single-threaded") #用于调试# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(从官方网页)#当被称为GIL的函数释放时,线程任务将工作得很好,而多处理总是启动时间较慢,并且在任务之间需要大量的通信。# 你可以通过其中一个得到调度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#单线程的最后一个
但是,Dask还有一个调度器,dask.distributed由于以下原因它可能是首选使用的:
1. 它提供了异步API的访问,尤其是Future,
1. 它提供了一个诊断仪表板,可以提供有关性能和进度的宝贵见解
1. 它可以更复杂地处理数据位置,因此在需要多个流程的工作负载上,它比多处理调度程序更有效。
你可以创建一个Dask的dask.distributed调度程序,通过导入和创建客户端实现分布式调度器
from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以导航到http://localhost:8787/status 查看# 诊断仪表板,如果你有Bokeh安装的话
现在,你可以使用client.submit方法,将函数和参数作为其参数,从而将任务提交到此集群。然后我们可以使用client.gather或.result方法来收集结果。
sent = client.submit(sq, 4) # sq: square 函数result = client.gather(sent) # 或者 sent.result
你也可以仅使用dask.distributed.progress来查看当前单元格中任务的进度。你还可以明确选择使用dask.distributed.wait来等待任务完成。
Note: (Local Cluster)有时您会注意到Dask正在超出内存使用,即使它正在划分任务。它可能发生在您身上,因为您试图在数据集上使用的函数需要您的大部分数据进行处理,而多重处理可能使情况变得更糟,因为所有工作人员都可能试图将数据集复制到内存中。这可能发生在聚合的情况下。或者您可能想限制Dask只使用特定数量的内存。
在这些情况下,您可以使用Dask.distributed。LocalCluster参数,并将它们传递给Client,从而使用本地机器的核心构建LocalCluster。
from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client
'scheduler_port=0'和' stics_port=0'将为这个特定的客户端选择随机端口号。在'process =False'的情况下,dask的客户端不会复制数据集,这可能发生在您所创建的每个进程中。您可以根据自己的需要或限制对客户机进行调优,要了解更多信息,可以查看LocalCluster的参数。您还可以在同一台机器的不同端口上使用多个客户机。
Dask也有一个库,可以帮助并允许大多数流行的机器学习库,例如sklearn,tensorflow和xgboost。
在机器学习中,你可能会遇到几个不同的问题。而具体的策略取决于你面临的问题:
1. 大型模型:数据适合放入RAM,但是训练时间太长。许多超参数组合,许多模型的大型集合等。
1. 大型数据集:数据大于RAM,并且不能选择抽样。
因此,你应该:
· 对于内存中适合的问题,只需使用scikit-learn(或你最喜欢的ML库)即可;
· 对于大型模型,请使用dask_ml.joblib和你最喜欢的scikit-learn估算器
· 对于大型数据集,请使用dask_ml估算器。
a)预处理:
dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(稳健标量),StandardScalar(标准标量),LabelEncoder(标签编码器),OneHotEncoder(独热编码),PolynomialFeatures(多项式特性)等等,以及它的一些自己的如Categorizer(分类器),DummyEncoder(虚拟编码),OrdinalEncoder(序数编码器)等。
你可以像使用PandasDataFrame一样使用它们。
from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])
你可以使用Dask的DataFrame上的预处理方法,从Sklearn的Make_pipeline方法生成一个管道。
b)超参数搜索:
Dask具有sklearn用于进行超参数搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。
from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)
而且,如果要partial_fit与估算器一起使用,则可以使用dask-ml的IncrementalSearchCV。
注意:(来自Dask)如果要使用后拟合任务(如评分和预测),则使用基础估计量评分方法。如果你的估算器(可能来自sklearn )无法处理大型数据集,则将估算器包装在" dask_ml.wrappers.ParallelPostFit" 周围。它可以并行化" predict"," predict_proba"," transform"等方法。
c)模型/估计器:
Dask具有一些线性模型(的LinearRegression,LogisticRegression等),一些聚类模型(Kmeans和SpectralClustering),一种使用Tensorflow 的方法,使用Dask训练XGBoost模型的方法。
如果训练数据较小,则可以将sklearn的模型与结合使用Dask,如果ParallelPostFit包装数据较大,则可以与包装器一起使用(如果测试数据较大)。
from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)
如果数据集不大但模型很大,则可以使用joblib。sklearns编写了许多用于并行执行的算法(你可能使用过n_jobs=-1参数),joblib该算法利用线程和进程来并行化工作负载。要用于Dask并行化,你可以创建一个Client(客户端)(必须),然后使用with joblib.parallel_backend('dask'):包装代码。
import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代码
注意:DASK JOBLIB后端对于扩展CPU绑定的工作负载非常有用; 在RAM中包含数据集的工作负载,但具有许多可以并行完成的单独操作。要扩展到受RAM约束的工作负载(大于内存的数据集),你应该使用Dask的内置模型和方法。
而且,如果你训练的数据太大而无法容纳到内存中,那么你应该使用Dask的内置估算器来加快速度。你也可以使用Dask的wrapper.Incremental它使用基础估算器的partial_fit方法对整个数据集进行训练,但实际上是连续的。
Dask的内置估计器很好地扩展用于大型数据集与多种优化算法,如admm,lbfgs,gradient_descent等,并且正则化器如 L1,L2,ElasticNet等。
from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")
经过4期的内容讲解,你学会加快Python算法的四种方法了么?
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
数字化经营时代,企业的市场竞争早已从经验决策转向数据决策。门店营收、用户转化、产品销量、成本损耗、存量资产等所有经营行为 ...
2026-05-22在MySQL数据库日常运维、业务数据校验、数据迁移与数据清洗场景中,自增主键ID的连续性校验是一项基础且关键的工作。MySQL的Auto ...
2026-05-22 很多企业团队并非缺乏指标,而是陷入“指标失控”:仪表盘上堆满实时跳动的数据,却无法回答“当前瓶颈在哪、下一步该做什么 ...
2026-05-22【核心关键词】大数据、可视化、存储、架构、客户、离线、产品、同步、实时、数据仓库、数据分析、数据可视化、存储数据、离线 ...
2026-05-21在电商流量红利消退、公域获客成本持续走高的当下,存量用户深度挖掘已成为店铺增收增效的核心抓手。相较于付费投放获取的陌生新 ...
2026-05-21 很多数据分析师每天盯着几十个指标,但当被问到“这套指标要支撑什么业务目标”“指标之间是什么逻辑关系”“业务变化时如何 ...
2026-05-21在数据驱动决策的时代,数据质量直接决定分析结果的可靠性与准确性,而异常值作为数据清洗中的核心痛点,往往会扭曲分析结论、误 ...
2026-05-20 很多数据分析师每天盯着GMV、DAU、转化率,但当被问到“哪些指标在所有行业都适用”“哪些指标只对电商有意义”“二者如何搭 ...
2026-05-20Agent的能力边界,很大程度上取决于其掌握的Skill质量和数量。传统做法是靠人工编写和维护Skill,但这条路很快会遇到瓶颈。业务 ...
2026-05-20在统计分析中,方差分析(ANOVA)是一种常用的假设检验方法,核心用于分析“一个或多个自变量对单个因变量的影响”,广泛应用于 ...
2026-05-19 很多数据分析师每天盯着GMV、DAU、转化率,但当被问到“什么是指标”“指标和维度有什么区别”“如何定义指标值的计算规则和 ...
2026-05-19想高效备考 CDA 一级,拒绝盲目刷题、冗余学习?《CDA 一级教材知识手册》重磅来袭!以官方教材为核心,浓缩 13 章 103 个核心考 ...
2026-05-19在数据统计分析中,卡方检验是一种常用的非参数检验方法,核心用于判断两个或多个分类变量之间是否存在显著关联,广泛应用于市场 ...
2026-05-18在企业数字化转型的浪潮中,很多企业陷入了“技术堆砌”的误区——上线了ERP、CRM、BI等各类系统,积累了海量数据,却依然面临“ ...
2026-05-18小陈是某电商平台的数据分析师。老板交给他一个任务:“我们平台的注册用户已经突破1000万了,想了解一下用户的平均月消费金额。 ...
2026-05-18【专访摘要】本次CDA持证专访邀请到拥有丰富物流供应链数据分析经验的赖尧,他结合自身在京东、华莱士、兰格赛等企业的从业经历 ...
2026-05-15在数字化时代,企业的每一次业务优化、每一项技术迭代,都需要回答一个核心问题:这个动作到底能带来多少价值?是提升了用户转化 ...
2026-05-15在数据仓库建设中,事实表与维度表是两大核心组件,二者相互关联、缺一不可,共同构成数据仓库的基础架构。事实表聚焦“发生了什 ...
2026-05-15 很多数据分析师沉迷于复杂的机器学习算法,却忽略了数据分析最基础也最核心的能力——描述性统计。事实上,80%的商业分析问 ...
2026-05-15【核心关键词】互联网、机会、运营、关键词、账户、数字化、后台、客户、成本、网络、数据分析、底层逻辑、市场推广、数据反馈 ...
2026-05-14