
使用Storm实现实时大数据分析
简单和明了,Storm让大变得轻松加愉快。
当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是Storm —— Twitter开发,通常被比作“实时的Hadoop”。然而Storm远比Hadoop来的简单,因为用它处理大数据不会带来新老技术的交替。
Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工作。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 —— 便触发一个trigger并把相关的数据存入数据库。
Storm
对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:
易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
每条信息的处理都可以得到保证。
Storm集群管理简易。
Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。
尽管通常使用Java,Storm中的topology可以用任何语言设计。
当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤:
从Storm官方下载Storm安装文件
将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。
Storm组件
Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。
主节点:
主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。
工作节点:
工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。
Zookeeper
Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。
Spout:
简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。
Bolt:
Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。
Stream Groupings:
Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:
1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。
2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。
3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。
4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。
6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。
当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。
项目实施
当下情况我们需要给Spout和Bolt设计一种能够处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件并且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本并且覆盖之前的tuple(可以被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就可以发现所有可能超临界的记录。
下一节将对用例进行详细介绍。
临界分析
这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。
瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。
时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟类,时速超过80KM两次及以上的车辆。
Listing One显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。
AB 12360North city
BC 12370South city
CD 23440South city
DE 12340East city
EF 12390South city
GH 12350West city
这里将创建一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。
XML文件和日志文件都存放在Spout可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。
Figure 1:Storm中建立的topology,用以实现数据实时处理
如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,然后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。
Spout的实现
Spout以日志文件和XML描述文件作为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)
Figure2:数据从日志文件到Spout的流程图
Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。
Listing Two:用以描述日志文件的XML文件。
<
TUPLEINFO
>
<
FIELDLIST
>
<
FIELD
>
<
COLUMNNAME
>
vehicle_number
COLUMNNAME
>
<
COLUMNTYPE
>
string
COLUMNTYPE
>
FIELD
>
<
FIELD
>
<
COLUMNNAME
>
speed
COLUMNNAME
>
<
COLUMNTYPE
>
int
COLUMNTYPE
>
FIELD
>
<
FIELD
>
<
COLUMNNAME
>
location
COLUMNNAME
>
<
COLUMNTYPE
>
string
COLUMNTYPE
>
FIELD
>
FIELDLIST
>
<
DELIMITER
>
,
DELIMITER
>
TUPLEINFO
>
通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过XSTream序列化XML时建立。
Spout的实现步骤:
对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。
在数据得到了字段的说明后,将其转换成tuple。
声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。
Spout的具体编码在Listing Three中显示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。
public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )
{
_collector = collector;
try
{
fileReader = new BufferedReader(new FileReader(new File(file)));
}
catch (FileNotFoundException e)
{
System.exit(1);
}
}
public void nextTuple()
{
protected void ListenFile(File file)
{
Utils.sleep(2000);
RandomAccessFile access = null;
String line = null;
try
{
while ((line = access.readLine()) != null)
{
if (line !=null)
{
String[] fields=null;
if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter());
else
fields = line.split (tupleInfo.getDelimiter());
if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields));
}
}
}
catch (IOException ex){ }
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
for(int i=0; i
{
fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
}
declarer.declare(new Fields(fieldsArr));
}
declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。
Bolt的实现
Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。
Figure 3:Spout到Bolt的数据流程。
ThresholdCalculatorBolt
Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:
临界值检查
临界值栏数检查(拆分成字段的数目)
临界值数据类型(拆分后字段的类型)
临界值出现的频数
临界值时间段检查
Listing Four中的类,定义用来保存这些值。
Listing Four:ThresholdInfo类
public class ThresholdInfo implementsSerializable
{
private String action;
private String rule;
private Object thresholdValue;
private int thresholdColNumber;
private Integer timeWindow;
private int frequencyOfOccurence;
}
基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。
Listing Five:临界值检测代码段
public void execute(Tuple tuple, BasicOutputCollector collector)
{
if(tuple!=null)
{
List
数据分析咨询请扫描二维码
若不方便扫码,搜微信号:CDAshujufenxi
机器学习解决实际问题的核心关键:从业务到落地的全流程解析 在人工智能技术落地的浪潮中,机器学习作为核心工具,已广泛应用于 ...
2025-09-09SPSS 编码状态区域中 Unicode 的功能与价值解析 在 SPSS(Statistical Product and Service Solutions,统计产品与服务解决方案 ...
2025-09-09CDA 数据分析师:驾驭商业数据分析流程的核心力量 在商业决策从 “经验驱动” 向 “数据驱动” 转型的过程中,商业数据分析总体 ...
2025-09-09R 语言:数据科学与科研领域的核心工具及优势解析 一、引言 在数据驱动决策的时代,无论是科研人员验证实验假设(如前文中的 T ...
2025-09-08T 检验在假设检验中的应用与实践 一、引言 在科研数据分析、医学实验验证、经济指标对比等领域,常常需要判断 “样本间的差异是 ...
2025-09-08在商业竞争日益激烈的当下,“用数据说话” 已从企业的 “加分项” 变为 “生存必需”。然而,零散的数据分析无法持续为业务赋能 ...
2025-09-08随机森林算法的核心特点:原理、优势与应用解析 在机器学习领域,随机森林(Random Forest)作为集成学习(Ensemble Learning) ...
2025-09-05Excel 区域名定义:从基础到进阶的高效应用指南 在 Excel 数据处理中,频繁引用单元格区域(如A2:A100、B3:D20)不仅容易出错, ...
2025-09-05CDA 数据分析师:以六大分析方法构建数据驱动业务的核心能力 在数据驱动决策成为企业共识的当下,CDA(Certified Data Analyst) ...
2025-09-05SQL 日期截取:从基础方法到业务实战的全维度解析 在数据处理与业务分析中,日期数据是连接 “业务行为” 与 “时间维度” 的核 ...
2025-09-04在卷积神经网络(CNN)的发展历程中,解决 “梯度消失”“特征复用不足”“模型参数冗余” 一直是核心命题。2017 年提出的密集连 ...
2025-09-04CDA 数据分析师:驾驭数据范式,释放数据价值 在数字化转型浪潮席卷全球的当下,数据已成为企业核心生产要素。而 CDA(Certified ...
2025-09-04K-Means 聚类:无监督学习中数据分群的核心算法 在数据分析领域,当我们面对海量无标签数据(如用户行为记录、商品属性数据、图 ...
2025-09-03特征值、特征向量与主成分:数据降维背后的线性代数逻辑 在机器学习、数据分析与信号处理领域,“降维” 是破解高维数据复杂性的 ...
2025-09-03CDA 数据分析师与数据分析:解锁数据价值的关键 在数字经济高速发展的今天,数据已成为企业核心资产与社会发展的重要驱动力。无 ...
2025-09-03解析 loss.backward ():深度学习中梯度汇总与同步的自动触发核心 在深度学习模型训练流程中,loss.backward()是连接 “前向计算 ...
2025-09-02要解答 “画 K-S 图时横轴是等距还是等频” 的问题,需先明确 K-S 图的核心用途(检验样本分布与理论分布的一致性),再结合横轴 ...
2025-09-02CDA 数据分析师:助力企业破解数据需求与数据分析需求难题 在数字化浪潮席卷全球的当下,数据已成为企业核心战略资产。无论是市 ...
2025-09-02Power BI 度量值实战:基于每月收入与税金占比计算累计税金分摊金额 在企业财务分析中,税金分摊是成本核算与利润统计的核心环节 ...
2025-09-01巧用 ALTER TABLE rent ADD INDEX:租房系统数据库性能优化实践 在租房管理系统中,rent表是核心业务表之一,通常存储租赁订单信 ...
2025-09-01