流式处理框架 & Spark Streaming & Flume & Kafka

流式处理框架 Spark Streaming

  • 很多企业为了支持决策分析而构建的数据仓库系统,其中存放的大量历史数据就是静态数据。
  • 对于技术人员来说,可以利用数据挖掘和OLAP(On-Line Analytical Processing)分析工具从中找到对企业有价值的信息,这也是离线批处理的一般过程。

目录

  1. 流式处理框架基础
  2. Flume
  3. Kafka

1. 流式处理框架基础

1.1 流数据一般特征

流数据具有如下特征:

  • 数据快速持续到达,潜在大小也许是无穷无尽的;
    • 数据来源众多,格式复杂;
    • 数据量大,但是不十分关注存储,一旦经过处理,要么被丢弃,要么被归档存储;
    • 注重数据的整体价值,不过分关注个别数据;
    • 数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序;

因此,用于静态数据存储的存储工具和批处理计算框架便无法满足流式计算、实时分析的应用需求。

1.2 实时计算的核心框架

1.3 数据源

为何数据源在流式计算中显得异常重要

所谓数据源,是指为了满足不同需求而不断输出数据的框架。不同于针对静态数据的批处理,要进行实时计算,就必须有足够“实时”且能够稳定运行的消息“投递者”,它必须能够高效的整合更低一级数据源发送的实时数据,并根据处理框架的API对数据进行规范化处理和转存,同时,对于流式处理系统而言,很多时候经过流式处理框架处理后的数据也是流数据,也可视作一种数据源。

数据源的高低级之分

对于流式处理框架而言,数据源的高低级之分,其实就是数据“输出”框架的高低级之分。任何数据源,要进行收集、转存和传递等工作,就需要有与之对应的处理框架,其框架越完善、功能越强大、且能够提供更便于实时处理的数据,我们就称其高级数据源,反之就称为低级数据源。

数据源本身的“处理框架”

这其实并不是规范的说法,通常来说,低级数据源我们会直接称呼其数据流,如文件流、套接字流等,而高级数据源,我们更多的会根据其作用,称其为日志收集系统(如Flume),或分布式消息队列(如Kafka)。对于初学者而言,我们可以将这些复杂的框架名称统一想象成一个数据的中转站,这些框架的核心作用都是收集、暂存然后投递数据。

大数据分析师的知识边界

一方面,我们要进行流式计算,就必须了解数据源对计算框架投递消息的方法,如此才能根据数据源设计算法,或者根据计算需求调整数据源;另一方面,我们也要区别大数据分析师与ETL工程师的区别,我们只需要了解与计算框架最近一层的数据源是如何与计算框架相互嵌套的既可,而无需再深究该数据源更低一层数据源、甚至是数据产生第一层数据源是如何工作的。

2. Flume

2.1 Flume介绍

Flume是Cloudera公司开发的分布式、高可用的日志收集系统。现已开源给Apache。
目前比较有代表性的日志收集系统,除了Flume之外,还有FaceBook的Scribe。

Flume原始版本为OG,后经过整体架构的重新设计,以改名为Flume-NG。Flume发展至今,已不限于日志收集,还可以通过简单的配置,收集不同数据源的海量数据并将数据准确高效的传输到不同的处理框架或数据存储系统。
目前Flume可对接的主流大数据框架有Hadoop、Kafka、Hive、HBase、Spark等。

2.2 Flume基本架构

Flume-NG采用三层架构设计,分别对应其三个核心组件:Source、Channel和Sink,其中,Source主要功能为收集上一层数据源传输过来的数据,Channel用于数据的暂时存储,Sink则用于数据的处理,即数据的传输对象和传输方式。其基本架构如下图所示:

其中,一个Agent代表一个Java进程,上图表示一个Event(数据)在一个Agent的传输流程。

  • Event:一条消息或者一个数据,具有可选的头信息,在头信息中可以设置时间戳、主机名等信息;
  • Source:数据源,接收或者收集不同形式的数据源;
  • Channel:Event的临时缓冲区,Source接收的Event会先发送到Channel进行缓存,在缓冲区内等待Sink的消费;
  • Sink:用于处理Channel中缓存的数据,并发送至下一层(处理框架、存储中心或者下一个Agent);
  • Agent:包含了Source、Channel、Sink等组件的Flume进程;
  • Interceptor:Event拦截器,在数据进入Channel之前根据配置要求,对数据进行头信息(Header)编辑,添加时间戳、主机名称等;
  • Selector:Event的选择器,即决定Event流入Channel的方式,主要有复制(Replicating)和复用(Multiplexing)两种选方式;
  • Sink Processor:Event Sink处理器,Flume提供了故障转移处理器和负载均衡处理器两种。

2.3 Flume核心组件:Source

Source核心功能

  • 用于对接各种数据源,并将收集到的Event发送至用于临时存储的Channel中。Flume中每个Agent对应一个Source,而每个Agent的Source在启动前必须通过修改配置变量来设置其接受消息种类,此即称为Source种类

常用Scoure种类(依据数据类型分类)

  • Avro Source:Avro是Doug Cutting牵头开发的一个数据序列化系统,设计用于支持大批量数据交换的应用。Avro Source支持Avro协议,接收RPC请求,Avro Source通过监听Avro端口接收外部Avro客户端流事件,在Flume的多层架构中经常被使用接收上游Sink发送的event。

  • Kafka Source:对接分布式消息队列Kafka,作为Kafka的消费者持续从Kafka中拉取数据,如果多个kafka source同时消费Kafka中同一个Topic,则其会被设置为同组id,从而保证多个Kafka Source之间不会重复拉取数据。

  • Exec Source:支持Linux命令,收集标准输出数据或者通过tail -f file的方式监听指定文件。Exce Source可以实现实时的消息传输,但却不会记录已读取的文件的位置,不支持断点续传,如果Exce Source重启或者挂掉都会造成后续增加的消息无法接收,建议只在测试环境中使用。

  • Spooling Directory Source:用于监听一个文件夹,收集文件夹中新文件数据,手机玩新文件数据会将文件名称的后缀改为.completed,缺点是不支持老文件(已经completed的文件)中新增数据集的收集,并且不能够对嵌套文件夹进行递归监听。

  • Taildir Source:监听一个文件或文件夹,通过正则表达式匹配需要监听的数据源文件,支持文件夹嵌套递归监听,Taildir Source通过将监听文件的位置写入到文件中,从而实现断点续传,并且能够保证没有重复的数据读取。

2.4 Flume核心组件:Channel

Channel核心功能

  • Channel是Event的临时缓冲区,存储Source收集但尚未被Sink读取的Event,其目标是为平衡Source收集速度和Sink读取速度,可视为Flume内部的消息队列。Channel线程的安全性较高且具有事务性,支持Source写失败重复写和Sink读失败重复读等操作。同时,我们根据Channel存储方式划分Channel种类。

常用Channel种类 (依据存储方式分类)

  • Memory Channel:缓冲区所有数据都存于内存,Memory Channel读写速度快,但存储量受内存限制,且当Flume进程挂掉、服务器宕机或重启时都会造成数据丢失。建议在服务器内存充足,且不关心数据丢失的场景下使用。

  • File Channel:缓冲区所有数据写入磁盘,File Channel存储容量大,无数据丢失风险。File Channel数据存储路径可以配置多个磁盘文件路径,通过磁盘并行写入提高其性能,在写入磁盘时是顺序写入,且单个数据文件大小可通过配置文件中maxFileSize参数进行调整,当被写入文件大小超过上限时,Flume会自动创建新文件用来存储后续Event。但数据文件数量不会无限增长,一旦旧文件被Sink读取完成,就将被删除。Flume通过设置检查点和备份检查点实现Agent重启之后快速将File Channel中的数据按顺序回放到内存中,以保证Agent在失败重启后仍能快速安全地提供服务。

  • Kafka Channel:值得一提的是,Kafka可作为Flume中Channel存储方式,Kafka是分布式、可扩展、高容错、高吞吐的分布式系统,Kafka通过其优秀的架构设计充分利用磁盘顺序读写特性,在廉价的硬件条件下就能完成高效的消息发布和订阅,对比其他两种Channel,Kafka Channel在读取速度、存储量和容错性上完美的弥补了二者短板,若能合理利用Kafka性能,能够达到事半功倍的效果。

2.5 Flume核心组件:Sink

Sink核心功能

  • 用于处理Channel中缓存的数据,当数据经过Sink Processor处理后由Sink进行后续处理,Sink可将数据传输至静态数据存储中心进行数据保存,也可以将数据传输至实时处理框架进行数据处理,当然,也可以将数据传输至下一层数据源,进行进一步数据聚合、整理或推送。

常用Sink种类 (依据投递接收方划分)

  • Avro Sink:Avro Sink常用于对接下一层Arvo Source,通过发送RPC请求将Event发送到下一层的Avro Source,同时,Avro Sink提供了端到端的批量压缩数据传输,从而解决RPC传输过程中占用大量网络资源以及产生大量Socket连接等问题。

  • HDFS Sink:HDFS作为Hadoop生态中的最常用文件系统,具有高容错、可扩展、高性能、低成本等特点,HDFS Sink通过将Event写入HDFS进行数据存储,能够有效、长期存储大量数据。

  • Kafka Sink:在消息传递过程中采用Kafka框架能够从很大程度上降低系统耦合度,从而增加系统系统稳定性和容错机制,Flume可通过Kafka Sink将Event写入Kafka的Topic,其他应用通过Kafka获得数据。Flume从1.7.0开始支持Kafka 0.9及以上版本。

Interceptor(拦截器) (与选择器配合使用)

Interceptor(拦截器)功能介绍

  • Source将Event写入Channel之前,可以用拦截器对Event进行添加头信息等简单处理,Source和Channel之间可设多个拦截器,不同拦截器可根据自身规则对Event进行简单处理,注意,拦截器属于轻量级插件,无法应对复杂数据处理工作。

常用Interceptor

  • 主机拦截器(Host Interceptor):Flume通过主机拦截器在Event头信息中添加主机名称或者IP。通过主机拦截器,Channel可以根据不同的主机信息分区存储Event,后续Sink也可根据不同主机信息对Event进行分别处理。

  • 静态拦截器(Static Interceptor):主要用于修改、过滤Event在此被拦截之前所设置的信息。

Selector(选择器) (与拦截器配合使用)

功能介绍

  • Source发送的Event通过Channel选择器来决定以何种方式写入Channel,Flume提供了三种常用的选择器,分别是复制Channel选择器(Replicating Channel Selector)、复用Channel选择器(Multiplexing Channel Selector)和自定义选择器。

常用选择器

  • 复制选择器( Replicating Channel Selector ):是Flume选择器的默认模式,即不对选择器进行设置时采用的模式,此时Source将以复制的方式将一个Event写入多个Channel中,不同的Sink可从不同的的Channel中获得相同的数据。复制选择器用途较多,当一个Event要做多个用途时,可考虑用复制选择器。

  • 复用Channel选择器(Multiplexing Channel Selector):复用选择器需要配合拦截器共同使用,复用选择器会根据Event的头信息来判断每个Event应该写入哪个Channel中。

💡

1
2
3
4
关于负载均衡和故障排除的相关说明:
Flume为了提高整体容错能力和稳定性,提供了负载均衡故障转移功能
这两项功能配置较为简单,只要合理配置Sink组,并且在每组Sink中设置多个
子Sink,就能够自动进行负载均衡和故障转移。

Kafka

Kafka是由LinkedIn公司开源的分布式消息队列。现已加入到Apache软件基金会,并且凭借其高吞吐、可扩展、高可用、部署简单、开发接口丰富等特性,已在各大互联网公司的实际生产环境中广泛使用。
同时,大多数分布式处理系统都支持使用kafka,如Spark、Storm、Druid、Flume等

Kafka基本特点

  • Kafka其实就是消息“中转站”
  • Kafka本身不产生消息,也不对消息进行永久存储,它只是消息的“中转站”。
  • 数据“中转站”至关重要
  • 在实际工作中,我们广泛需要消息“中转站”的原因,是数据采集端种类各异,消息格式数据格式种类繁多,同时数据应用情景也非常复杂,提取数据可能为了实时计算,可能为了存储一段时间后进行批处理,也可能直接推送至数据产品前段,也可能中间夹杂各种其他过程以及各过程之间的复用。因此,我们需要“中转站”对数据进行暂存、简单处理、再次投递。
  • Kafka作为最优秀的消息“中转站”之一,并被广泛认可,完全得益于其优秀的架构。

Kafka构架特点

  1. 生产者和消费者不需要彼此了解
  2. 消费者的性能问题不会影响生产者
  3. 消费者受到保护,免受生产者的伤害
  4. 在处理负载方面有很大的灵活性
  5. 消息可供任何人使用 -
  6. 大量的新用例,监控,审计和故障排除
  • 高吞吐率
  1. Kafka利用顺序读写磁盘的设计,提供了可以和内存随机读写相匹敌的读写速度;
  2. 其灵活的客户端API设计,利用Linux操作系统提供的“零拷贝”特性,减少了消息网络传输的时间;
  3. 同时提供端到端的消息压缩传输,对同一主题下的消息采用分布式存储;
  • 高容错、高可用

Kafka在集群模式下允许用户对每个分区(partition)配置多个副本,且将副本均匀的分到各个节点(broker)内存储,保证同一个分区的副本不会再同一台机器上存储。同时,多副本之间采用Leader-Follower机制同步消息,只有Leader对外提供读写服务,当Leader意外失败、Broker进程关闭、服务器宕机等情况导致数据不可用时,kafka会从Follower中选择一个成为Leader继续提供读写服务。

  • 可拓展

理论上Kafka的性能随着Broker的增加而增加,增加一个Broker只需为新增Broker设置一个唯一编号,配置文件编写完成后,Kafka就能通过ZooKeeper发现新的Broker,并投入使用。

  • 接藕

Kafka内部能够将消息生产阶段和处理阶段分开,两个阶段互相独立,且各自能实现自己的处理逻辑,通过Kafka提供的消息写入和消费接口实现对消息的连接处理。两个阶段相互独立,不仅降低了自身复杂度,同时也实现了对外部框架提供部分服务的功能(如Flume中嵌入Kafka)。

  • 峰值处理

实际工作中,经常会遇见数据在某个时间点爆发式增长(如双11),此时如果后台处理系统无法及时处理峰值需求,就会导致数据积压,严重时会导致系统崩溃。而若能合理使用Kafka进行数据中转,就相当于给系统接入了一个巨大的缓冲区,其既能接收持续暴增的请求,也能根据后台需求提供服务,进而提高了系统数据处理能力。

Kafka基本架构

  • Producer代表消息生产者
  • Consumer代表消息消费者
  • Broker代表Kafka集群中各节点
  • Partition表示消息的一个分区
  • ZooKeeper为Kafka集群提供资源调度服务

Kafka基本概念

  • Broker:一个Kafka的实例就是一个Broker,相当于Flume中的Agent;

  • Topic:主题,Kafka中同一类型数据集的名称,相当于数据库中的表,Producer将同一类型数据写入同一个Topic中,同一个Consumer或Consumer Group从同一个Topic中消费数据,同时,一个Topic在物理上会被分成多分存储到不同的物理机上;

  • Partition:分区,一个Topic可设置多个分区,相当于把一个数据集分成多分,存储到不同分区中进行存储(类似于HDFS),分区命名规则为topicname-index;

  • Segment:段文件,Kafka中最小存储单位,一个topic包含多个Partition,一个Partition又包含多个Segment,Segment以其内部消息的起始偏移量进行索引;

  • Offset:消息的起始偏移量,可作为Segment的索引;

  • Replication:副本,一个Partition可有一个或多个副本,创建Topic时可设置副本数量;

  • Producer:消息生产者,负责向Kafka集群中发布消息;

  • Consumer Group:消费者组,一个Consumer Group可包含一个或多个Consumer,当一个topic被一个消费者组消费的时候,一条消息只能由其中一位消费者消费,不会出现多位消费者消费同一条信息的情况;

  • Consumer:消息消费者,可从指定topic中拉取消息;

  • ZooKeeper:kafka需要ZooKeeper对其进行协调管理,安装Kafka过程将自带一个ZooKeeper。

实操流程

单节点演示

  1. 启动zoopeeper服务
    bin/zookeeper-server-start.sh config/zookeeper.properties

  2. 启动Kafka服务
    bin/kafka-server-start.sh config/server.properties

  3. 创建topic取名cdatest
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdatest

  4. 查看topic
    bin/kafka-topics.sh --list --zookeeper localhost:2181

  5. 启动生产者指定topic,并在终端输入测试数据
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cdatest

  6. 启动消费者指定topic
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic cdatest --from-beginning

集群模式测试

  1. 修改参数server.properties

  2. 启动zookeeper集群服务

  3. 启动kafka集群(分别在各节点执行)
    ./bin/kafka-server-start.sh config/server.properties

  4. 任意一个节点创建topic
    ./bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 2 --partitions 3 --topic kktest

  5. 查看topic状态
    ./bin/kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic kktest

  6. 创建producer,手动输入测试数据(以master为例)
    ./bin/kafka-console-producer.sh --broker-list master:9092 --topic kktest

  7. 以slave1为例创建consumer消费数据
    ./bin/kafka-console-consumer.sh --bootstrap-server slave1:9092 --from-beginning --topic kktest

可以在不同节点创建多个生产者和消费者

使用外部数据源进行数据传输

  1. 启动Kafka,启动prodecer和consumer

  2. 在Kafka安装路径内创建test.txt然后执行:q

  3. ~/kafka/bin/connect-standalone.sh ~/kafka/config/connect-standalone.properties ~/kafka/config/connect-file-source.properties ~/kafka/config/connect-file-sink.properties

自定义模式读取外部文件时,有两个主要的配置文件
connect-file-source.properties
connect-file-sink.properties

根据需要修改文件读取模式、文件路径、Topic等

Kafka于Flume联合部署

创建flume配置文件:如kafkaSource.conf

1
# Name the components on this agent  
  a1.sources = r1  
  a1.sinks = k1  
  a1.channels = c1  
 
# Describe/configure the source  
  a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource  
  a1.sources.r1.kafka.topics = kafkaTopic
  a1.sources.r1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 
 
# Describe the sink  
  a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory  
  a1.channels.c1.type = memory  
  a1.channels.c1.capacity = 1000  
  a1.channels.c1.transactionCapacity = 100  
 
# Bind the source and sink to the channel  
  a1.sources.r1.channels = c1  
  a1.sinks.k1.channel = c1

启动Flume:

1
~/flume/bin/flume-ng agent --conf ~/flume/conf --conf-file ~/flume/conf/kafkaSource.conf --name a1 -Dflume.root.logger=INFO,console

创建kafkaTopic

~/kafka/bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 2 --partitions 3 --topic kafkaTopic

创建producer,输入测试信息
~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sparkMLLib

顺序出错的分析

  1. Kafka传出来的数据顺序就已经被打乱
  2. 计算资源(虽然传输的数据很微小,但是集群的启动成本高昂)