您的当前位置:首页正文

Flume+Kafka实现日志文件流处理

2024-11-18 来源:个人技术集锦


一、为什么要集成Flume和Kafka

 我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。

 因此数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。

二、概念剖析Flume+Kafka

 Source:Flume 搜集日志的方式多种多样,比如可以检测文件夹的变化spool Source,可以监测端口信息 Netcat Source,可以监控某各文件新增的内容 Exec Source等等,通常使用检测文件夹变化的方式来实时收集信息,所以本例中我们也将使用Spool Source。
 Channel:提供了一层缓冲机制,来实现数据的事务性传输,最大限度保证数据的安全传输。常用的有MemoryChannel:所有的events 被保存在内存中,优点是高吞吐,缺点是容量有限并且Agent 死掉时会丢失内存中的数据;FileChannel:所有的Events 被保存在文件中,优点是容量较大且死掉时数据可恢复,缺点是速度较慢。因此为了保证Event 在数据流点对点传输中是可靠地,要注意Channel 的选择。目前为了提高速度,我们暂时采用MemoryChannel,之后的目标是实现一个自定义channel—doubleChannel,解决上述的两个痛点问题。
 Sink:将数据转发到目的地,或者继续将数据转发到另外一个source,实现接力传输,多层之间通过AVRO Sink来实现。本例中,我们的最终目标是实现日志实时处理,因此实时的采集数据流就把数据发送到Kafka 中。
那么小结一下,使用的是对文件夹中文件变化进行监测的Spooling DirectorySource,channel 是用的MemoryChannel,sink 是自定义的kafkasink,用于向kafka 发送数据。

 Kafka 是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn 的活跃数据,说白了也就是用户访问日志数据。这些数据主要包括PV、UV、用户行为(登陆、浏览、搜索、分享、点击)、系统运行日志(CPU、内存、磁盘、进程、网络)等方面的数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。那么说到Kafka,就必须掌握三个原理部分:Producer、Topic、Consumer:

 Producer:消息和数据的生产者,向Kafka的一个topic发布消息的过程即为生产过程,在本例中Flume应该是Producer;
Topic:主题,Kafka处理的消息的不同分类(逻辑概念),可以根据Topic的不同,去区分处理不同的消息。说的更直白一些,Topic就是起到资源隔离的作用,Producer向指定Topic中产生消息,Consumer再从指定的Topic中消费消息。
 Consumer:消息和数据的消费者,订阅topic并处理其发布的消息的过程即为消费过程。
提示:以下是本篇文章正文内容,下面案例可供参考

三、Flume+Kafka实战(详细步骤)

 3.1、Netcat Source + Kafka Sink
首先进行Netcat Source,这个source 我们可以用来进行测试,最简单也是最直观,在被监控的端口输入测试消息,连接kafka之后便可在consumer界面上看到提示的测试信息。

2.将以下内容粘贴进去
#example.conf: A single-node flume configuration
#test kafka sink with spooldir source
 
#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
#Describe/configue the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /root/logs
a1.sources.r1.fileHeader = true
 
#Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置kafka的主题topic
a1.sinks.k1.topic = t3
#设置消费者编码为UTF-8
a1.sinks.k1.custom.encoding=UTF-8
#绑定kafka主机以及端口号
a1.sinks.k1.kafka.bootstrap.servers = a:9092,b:9092,c:9092
#设置kafka序列化方式
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
 
 
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.创建文件夹/root/logs,在logs里创建文件log-1,log-2,两个文件里面的内容随便写
4.启动kafka

cd /opt/dtc/software/kafka
bin/kafka-server-start.sh config/server.properties

5.kafka创建topic叫t3

kafka-topics.sh --zookeeper a:2181,b:2181,c:2181 --create --topic t3 --partitions 1 --replication-factor 1

6.启动Kafka Consumer,指定topic是t3

kafka-console-consumer.sh --bootstrap-server a:9092,b:9092,c:9092 --topic t3 --from-beginning

7.启动Flume,并指定配置文件

flume-ng agent -n a1 -c /opt/dtc/software/flume/conf/ -f /opt/dtc/software/flume/conf/kafka_spool.conf  -Dflume.root.logger=INFO,console

 3.2、Exec Source(文件souce)+ Kafka Sink
1.在logs文件夹下创建log-3,随便写入内容
2.新建flume的conf文件exec.conf


#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Describe/configue the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command  = tail -F /root/logs/log-3

#Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置kafka的主题topic
a1.sinks.k1.topic = t3
#设置消费者编码为UTF-8
a1.sinks.k1.custom.encoding=UTF-8
#绑定kafka主机以及端口号
a1.sinks.k1.kafka.bootstrap.servers = a:9092,b:9092,c:9092
#设置kafka序列化方式
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder


#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.剩下的更上面3.1一样,向log-3文件中不断写入内容,看kafka consumer变化
启动

cd /opt/dtc/software/flume/conf/
flume-ng agent -n a1 -c ../conf/ -f exec.conf  -Dflume.root.logger=INFO,console

总结

多实践

显示全文