[大数据与分布式系统]大数据架构之数据处理

Posted by Yinhj on November 10, 2016

离线批量处理

海量处理计算框架:

MapReduce; 管理框架YARN改进了MapReduce的缺点; 计算框架Spark更迅速; 在此基础上,还提出了hive,pig,impala,spark sql等工具。

MapReduce:

1.分割Data splitting: 数据分片发送到Mapper。 2.映射Mapping: key-value映射。 (合并combining ,在每个mapper节点进行本地归约) 3.洗牌Shuffing: 将key-value的配对发给reducer归约。(可能还包括分配partitioning,例如专门的地点reducer) 4.归约reducing:分析配对,键内容相同,则将值合并。 图片发自简书App

图片发自简书App

1.工作跟踪节点:决定处理哪些文件、为不同任务分配节点,监控所有任务运行。 2.任务跟踪节点:从节点上运行分配的单项任务。

缺点: 1.工作节点完成太多功能、资源消耗,单点故障隐患。 2.任务节点用任务数量衡量负载过于简单,map和reduce任务严格划分,可能导致系统资源未充分利用。

YARN:

为上层应用提供统一hadoop资源管理和调度。 将主节点分成两个独立服务程序:全局资源管理和针对每个应用的主节点。让子任务的监测分布式处理。在YARN基础上,还可以运行spark和storm这样的流式计算和实时性作业。

Spark:

基于内存计算 弹性分布式数据集(RDD),容错机制 适用机器学习等需要多次迭代的算法 scala语言 兼容HDFS和HBase等分布式存储,可以运行在YARN中。

运用spark的公司

Hortonworks, IBM, Cloudera, MapR, Pivotal。

sparksql

基于catalyst的交互式sql 支持hive的多种数据格式

spark streaming

针对实时数据流处理,可对数据进行map, reduce, join等高级操作,将结果输出到文件系统或数据库。

spark graphX

提供用于图计算的api,定义了table和graph两种视图,有自己独自的操作符。

MLBase

提供简单声明方法指定机器学习任务,动态选择较优的学习算法。

spark R

开源数据分析软件,通过RDD提供的api,使用R在集群中提交并运行任务。

Hive:

hadoop基础上的数据库工具,存储、查询、分析存储在HDFS中的大数据。 1.采用HiveQL来实现数据的提取、转化和加载,将sql转化为mapreduce任务在后台运行。 2.也可自定义开发mapper和reducer来处理內建模块无法完成的复杂工作。

架构包括用户端、解释器、元数据存储和分析数据存储。 1.用户端:主要包含命令行(CLI)、客户端(Client)和Web图形化界面(WebGUI)。最常用的是CLI,它启动的时候会同时启动一个Hive守护进程服务,使用者可以交互式地输入命令并得到相应的结果输出。Client是Hive的客户端,用户通过它连接到Hive的服务器。Client模式启动的时候,需要启动Hive服务器所在的节点,并进行相应的配置。WebGUI工具允许用户通过浏览器访问Hive,使用前要启动HWI组件(Hive Web Interface)。

·解释器:主要包含执行编译器、优化器和执行器,它们完成HiveQL查询语句的词法分析、语法分析、编译、优化及计划的生成。生成的查询计划也会存储在HDFS中,并在随后通过MapReduce框架调用执行。这也体现了Hive的核心思想之一,就是尽量简化MapReduce开发的工作量,使得某些操作和查询的复杂逻辑对使用者完全透明。

·元数据存储:Hive中的元数据包括表的名字、表的列、表分区、表数据所在的目录、是否为外部表,等等。尽管Hive采用NoSQL的方式进行工作,但它仍然使用关系型数据库存储元数据,这点主要是考虑到元数据的规模较小,而对读写同步的要求很高。此外,将元数据的存储从Hive的数据服务中解耦出来,可以大大减少执行语义检查的时间,也能提高整个系统运行的健壮性。常用的关系型数据库配置是MySQL或Derby嵌入式数据库。

·分析数据存储:Hive用于分析的海量数据都存储在HDFS之中,支持不同的存储类型包括纯文本文件、HBase等文件。一旦解释器接受了HiveQL,那么Hive将直接读取HDFS的数据,并将查询逻辑转化成MapReduce计算来完成。

Hive的数据模型包括几个主要概念:数据库(Database)、表(Table)、分区(Partition)和桶(Bucket)。

·数据库:作用是将用户的应用隔离到不同的数据模式中,Hive 0.6.0之后的版本都支持数据库,相当于关系型数据库里的命名空间(Namespace)。

·表:Hive的表和数据库中的表在概念上非常接近,在逻辑上,其由描述表格形式的元数据和存储于其中的具体数据共同组成,可以分为托管表和外部表。对托管表执行DROP命令的时候,会同时删除元数据和其中存储的数据,而对外部表执行该命令的时候,则只能删除元数据。

·分区:Hive中的分区方式和数据库中的差异很大,它的概念是根据分区列对表中的数据进行大致地划分,表的分区在Hive存储上就体现为主目录下的多个子目录,而子目录的名称就是分区列的名称。使用分区的好处在于,查询某个具体分区列里的数据时不用进行全表扫描,可以大大加快范围内的查询。

·桶:表和分区都是在目录级别上进行数据的拆分,而桶则是对数据源数据文件本身进行数据拆分。 图片发自简书App

·查询语言:由于底层依赖于Hadoop的平台,因此HiveQL不支持更新操作,也不支持索引和事务,子查询和连接(Join)操作也很有限。另外,HiveQL还有些特点是关系型SQL所无法企及的,比如和MapReduce计算过程的集成和多表查询。

·存储方式和计算模型:Hive和关系型数据库相比,存储和计算的方式也不同。Hive使用的是Hadoop的HDFS分布式文件系统和mapreduce。

·实时性:由于架构在Hadoop之上,Hive也继承了其批处理的方式,因此在作业提交和调度的时候需要大量的开销,并且不能在大规模数据集上实现低延迟地快速查询,自然相较于关系型数据库而言其实时性就较差了。

·扩展性、并行性和容错性:好。

Pig, Impala, SPARK SQL:

有些开发者,虽然对SQL不甚理解,但是擅于MapReduce的编程。Pig就是在这样的背景下应运而生的,Pig为大型数据集的处理提供了更高层次的抽象,以及更丰富的数据结构。

另一个执行于现有Hadoop基础设施上的互动SQL查询引擎是Impala,它是Cloudera公司主导开发的查询系统,目前的最新版本是2.1。类似Apache Hive,Impala也能通过类SQL的语言查询存储在HDFS和HBase中的PB级大数据。不过,Impala考虑了实时性更强的需求,为了实现这一点,Impala参考了Google的交互式数据分析系统Dremel。Impala使用Parquet实现了列存储,并借鉴了MPP并行数据库的思想。同时,它采用HiveQL和JDBC等接口,进行全局统一的元数据存储和读取。对于用户查询则是直接进行分布式处理,在HDFS或HBase上本地读写,因此具有良好的扩展性和容错性。此外,由于放弃了MapReduce的运行框架,它也没有MapReduce作业启动、洗牌、排序等开销,无须将中间结果写入磁盘,节省了大量的I/O开销,也降低了网络传输的数据量。当然,Impala并不是用来取代现有的MapReduce框架的,而是作为MapReduce的一个强力补充。一般而言,Impala更适合处理输出数据较小的查询请求,而对于大数据量的批处理任务,MapReduce依然是更好的选择。

Spark SQL,它是基于Catalyst引擎的交互式SQL技术,主要优化了以下几个方面。

·执行策略:Spark SQL在Hive兼容层面仅仅依赖于HiveQL解析器和元数据存储。从HiveQL被解析成抽象语法树之后,就全部由Spark SQL来接管了。执行计划的生成和优化均由Catalyst引擎来负责,借助Scala的模式匹配等函数式语言的特性,其策略比Hive更为简洁。

·进入门槛:Spark SQL能够对原生RDD对象进行关系查询,因此大大降低了用户门槛。虽然在很多方面Spark的性能优于Hadoop的MapReduce,但其运行模型也比MapReduce精细不少,这就使得Spark应用的性能调优比较复杂。单纯使用Spark的接口开发是需要花些学习成本的。这时就体现出了Spark SQL的优势——相比底层接口,SQL语言的接受程度更高,这和Hive相对于MapReduce的情况类似。更何况Catalyst引擎还提供了一系列常见的优化策略来协助用户实现目标。

·对Hive的依赖:相对于Shark,Spark SQL进一步削减了对Hive的依赖,不再需要自行维护打了补丁的Hive分支。因此,Shark后续将全面采用Spark SQL作为引擎,而不仅仅是查询优化方面。

提升及时性:消息机制

两种模型:点对点、发布订阅模型。 无论是哪种消息传送模式,都可以提升数据更新的及时性,并对复杂的系统架构进行解耦。举个例子,对于重点观察的用户行为,如果还是通过Flume这样的批量采集方式,可能无法达到业务方提出的实时监控和分析的需求。而消息的机制可以很好地解决这个问题。另外,由于消息机制的实时性更强,通常它还会和稍后介绍的Spark Streaming、Storm这样的流式计算结合起来使用。 ApacheMQ: ActiveMQ的主要目标是在尽可能多的跨平台和跨语言上提供一个统一的、标准的消息驱动的应用集成。 ActiveMQ同样以异步的形式提供松耦合的应用架构。 JMS规范保证了同步和异步消息传递、一次和仅一次的传递、对于订阅者的消息持久化,等等。基于JMS规范使得ActiveMQ和其他消息提供者拥有类似的基本特性。 支持多种连接协议:ActiveMQ提供了各种连接选择,包括HTTP、HTTPS、SSL、TCP、UDP、XMPP、IP多点传送等。 ActiveMQ既支持标准的JDBC方案,也可以通过KahaDB提供超快的持久方案。 多个ActiveMQ代理可以通过代理网络(Network of Brokers)进行联合的工作。 ActiveMQ提供了各种简便而又强大的管理方式,除了Java语言中最基本的JConsole,还有ActiveMQ Web Console、消息报告和各种系统日志等。

Kafka: ·高性能存储:通过特别设计的磁盘数据结构,保证时间复杂度为O(1)的消息持久化,这样数以TB的消息存储也能够保持良好的稳定性能。此外,被保存的消息可以多次被消费,用于商务智能ETL和其他一些实时应用程序。 ·天生分布式:Kafka被设计为一个分布式系统,它利用ZooKeeper来管理多个代理(Broker),支持负载均衡和副本机制,易于横向地扩展。ZooKeeper旨在构建可靠的、分布式的数据结构,这里用于管理和协调Kafka代理。当系统中新增了代理,或者某个代理故障失效时,ZooKeeper服务会通知生产者和消费者,让它们据此开始与其他代理协调工作。 ·高吞吐量:由于存储性能的大幅提升,以及良好的横向扩展性,因此即使是非常普通的硬件Kafka也可以支持每秒数十万的消息流,同时为发布和订阅提供惊人的吞吐量。 ·无状态代理:与其他消息系统不同,Kafka代理是无状态的。代理不会记录消息被消费的状态,而是需要消费者各自维护。 ·主题(Topic)和分区(Partition):支持通过Kafka服务器和消费机集群来分区消息。一个主题可以认为是一类消息,而每个主题可以分成多个分区。通过分区,可以将数据分散到多个服务器上,避免达到单机瓶颈。更多的分区意味着可以容纳更多的消费者,有效提升并发消费的能力。基于副本方案,还能够对多个分区进行备份和调度。 ·消费者分组(Consumer Group):Kafka中每个消费者均属于一个分组,每个分组中可以有多个消费者。主题中的某条消息可以被多个分组获得,不过同一分组中,只有一个消费者会获得该消息。

考虑到重复接收数据总比丢失数据要好,通常情况下Kafka的“至少一次”机制是使用者的首选。整体而言,对于一些常规的消息系统,Kafka是个理想的选择。内在的分布式设计、分区和副本,使得其具有良好的扩展性、容错性和性能优势。不过,目前Kafka并没有提供JMS中的事务性消息传输,无法严格地保证消息一定被处理,或者只被处理一次,适合那些对一致性要求不高的应用场景。 kafka架构

在线实时处理:

Storm: Storm为分布式实时计算提供了一组通用原语,这是管理队列及工作者集群的另一种方式。在计算时就将结果以流的形式输出给用户,从而进一步提升实时性。

·元组:这是Storm中使用的一种数据结构,包含了若干个键–值对(Key-Value Pair)的列表,这里的键–值对的定义和第3章散列表中所提到的类似。元组以一种分布式的方式并行地在Storm集群上进行创建和处理。 ·数据流:数据流是Storm中非常重要的一个抽象概念,是一个没有边界的元组序列,由Spout和Bolt进行发送和转发。对数据流的定义主要就是对其中的元组进行定义,此外还需要为其分配唯一的标识ID。 ·Spout:英文单词Spout翻译过来就是水龙头的意思,顾名思义它是提供数据源的,是一个计算任务中数据的生产者。Spout可以从数据库或文件系统等加载数据,然后作为入口,向由若干节点组成的拓扑结构中发送数据流。每个Spout都可以发送多个数据流,同时也可以按照送达的可靠性划分等级。 ·Bolt:可以将其理解为运算或函数,用于将一个或多个数据流作为输入,实施加工处理后,再进行新数据流的输出。Bolt可以接受Spout或其他Bolt发送的数据,并据此建立复杂的流转网络,形成最终的拓扑结构,完成对整条流水线数据的操作。Storm计算中的逻辑几乎都在Bolt中完成,例如过滤、分类、聚集、计算、查询数据库等。 ·流量分组:它决定了Spout和Bolt节点之间相互连接的方式,主要分为以下几种类型。 ·洗牌分组(Shuffle Grouping):随机地将元组分发到各个Bolt上,理论上这样做的结果是每个Bolt都会接收到同样数量的元组。 ·按字段值分组(Fields Grouping):按照指定的元组字段来进行分组。例如,按照“水质”来划分,那么具有同等质量的水源会被分到一组,发送到同一个或同一组Bolt上。这个逻辑和Hadoop中的MapReduce框架非常相似,这样一来,数据流上游的Spout或Bolt节点就和Mapper比较接近,而下游的Bolt节点则和Reducer比较接近。 ·广播(All):所有的元组都会发送到所有的Bolt上。 ·全局(Global):所有的元组都发送到全局指定的某个Bolt上。 ·不做指定(None):目前等同于洗牌分组,将来可能会进行新的定义扩充。 ·指定分组(Direct):明确指定元组发送到哪个确切的Bolt上。 ·拓扑结构:它是由流量分组连接起来的Spout和Bolt节点网络。在Storm中,一个实时计算应用程序的逻辑被封装在一个拓扑对象中,也可以称为计算拓扑。如果和Hadoop的生态系统对比,拓扑结构类似于MapReduce的任务,但是它们之间的关键区别在于,一个MapReduce任务最终总是会结束的,然而一个Storm的拓扑结构会一直运行,直到使用者主动关闭或出现异常。 图片发自简书App

SparkStreaming: Spark Streaming会把每块数据作为一个RDD,每个块都会生成一个Spark的任务来进行处理,最终结果也会返回多块。由于使用了这种设计模式,因此Spark Streaming可以同时兼容批量和实时数据的处理逻辑,以便于历史数据的融合。 Spark Streaming是处理某个时间段窗口内的事件,而Storm处理的是每次传入的单个事件,理论上Spark Streaming的延时性要比Storm略高。不过RDD的机制赋予了Spark Streaming更高的灵活性和容错性。在Storm中,每个单独的记录通过系统时必须被跟踪,这样Storm才能够保证每个记录至少被处理一次。但是在从错误中恢复过来的时候Storm允许出现重复记录,这就意味着某些状态可能被错误地更新多次。而Spark Streaming只需要在批量级别进行跟踪处理,即便一个节点发生故障,也可以有效地保证每个时间窗内的小量数据被完整地处理一次。