您的当前位置:首页正文

Kafaka基础快速入门

2024-11-29 来源:个人技术集锦

Kafaka基本入门

一 基本认识

1.1 消息中间件(消息队列)

  • 消息(Message):是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

  • 队列(Queue):消息队列,用来保存消息直到发送给消费者。是一种数据结构,先进进出。

  • 消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。这也是消息中间件的意义所在。

1.2 常用消息中间件

  • ActiveMQ:是 Apache开源产品,完全支持 J M S 规范的消息中间件,是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。ActiveMQ可以很容易内嵌到使用Spring的系统里面去通过了常见J2EE服务器的测试。JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。其丰富的 API 、多种集群构建模式使得他成为业界老牌消息中间件,在中小型企业中应用广泛!
  • Kafka: 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。KAFKA基于TCP协议。
  • RocketMQ:阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0版本名称改为RocketMQ,是阿里参照kafka设计思想使用java实现的一套mq。同时将阿里系内部多款mq产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq的架构,目前主要多用于订单交易系统。
  • RabbitMQ:使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
  • ZeroMQ:号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。

1.3 通信协议

  • AMQP:Advanced Message Queuing Protocol一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

  • MQTT:(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

  • STOMP:(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

  • XMPP:(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

  • 其他:有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

1.4 基本术语

Broker

消息服务器,作为server提供消息核心服务

Producer

消息生产者,业务的发起方,负责生产消息传输给broker,

Consumer

消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

Topic

主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播

Queue

队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收

Message

消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

二 kafaka的基本介绍

2.1 概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展

2.2 消息系统介绍

一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式

2.3 点对点消息传递模式

在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:

生产者发送一条消息到queue,只有一个消费者能收到

2.4 发布-订阅消息传递模式

在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息

三 Kafka中的术语解释

3.1 概述

在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系:

3.2 broker

Kafka 集群包含一个或多个服务器,服务器节点称为broker。

3.3 Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,类似于数据库的表名。

3.4 Partition

topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

3.5 Producer

生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

3.6 Consumer

消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

3.7 Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

3.8 Leader

每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

3.9 Follower

Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。

四 Kafaka的安装

4.1 Zookeeper的安装

  • 首页:
  • 安装
# 解压 
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
# 修改配置文件
cd conf
cp coo_sample.cfg zoo.cfg
vim zoo.cfg
#启动
bin/zkServer.sh start
#查看
jps
#状态查看
bin/zkServer.sh status
#停止
bin/zkServer.sh stop
#启动客户端
bin/zkCli.sh
#退出
quit
  • tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
  • initLimit = 10:LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
  • syncLimit = 5:LF同步通信时限,Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死 掉,从服务器列表中删除Follwer
  • dataDir:保存Zookeeper中的数据,注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
  • clientPort = 2181:客户端连接端口,通常不做修改。

4.2 Kafka的安装

  • 官网:
#解压
tar -zxvf kafka_2.11-2.4.0.tgz
#修改配置文件
cd config
vim server.properties
# 修改以下配置
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号(内网)
#listeners=PLAINTEXT://服务器地址:9092 
#阿里云外网
advertised.listeners=PLAINTEXT://阿里云地址:9092
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
#是否可以删除
delete.topic.enable=true
# 启动
cd bin
./kafka-server-start.sh -daemon ../config/server.properties
# 检查是否启动
jps
#查看端口问题
netstat -an | grep 9092
#或者
lsof -i:9092
# 防火墙开发端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
#停止kafka
./kafka-server-stop.sh ../config/server.properties

4.3 基本命令

注:这些命令我们不需要记,因为我们是在代码中完成这些命令

4.3.1 创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my
4.3.2 查看创建的topic
./kafka-topics.sh --list --zookeeper localhost:2181
4.3.3 删除某个topic

删除topic的前提是需要将kafka的消费者和生产者停止

 ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my
4.3.4 查看某个topic的信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: my-replicated-topic	PartitionCount: 1	ReplicationFactor: 1	Configs: 
Topic: my-replicated-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
4.3.5 发送消息
./kafka-console-producer.sh --broker-list 服务器地址:9092 --topic my
4.3.6 接受消息
# 重头消费
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my --from-beginning
# :从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my 
4.3.7 消息的有序性
  • ⽣产者将消息发送给broker,broker会将消息保存在本地的⽇志⽂件中
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置

4.4 消费者组

4.4.1 单播消费

在⼀个kafka的topic中,启动两个消费者,⼀个⽣产者,问:⽣产者发送消息,这条消息是否 同时会被两个消费者消费? 如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。换⾔ 之,同⼀个消费组中只能有⼀个消费者收到⼀个topic中的消息。

./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup  --topic my --from-beginning
4.4.2 多播消费

不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也 是多个消费组中的多个消费者收到了同⼀个消息。

./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup01  --topic my --from-beginning
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup02  --topic my --from-beginning
4.4.3 查看消费组的信息
./kafka-consumer-groups.sh --bootstrap-server 服务器地海:9092 --describe --group testGroup

重点关注以下⼏个信息:

  • current-offset: 最后被消费的消息的偏移量
  • Log-end-offset: 消息总量(最后⼀条消息的偏移量)
  • Lag:积压了多少条消息

五 Kafka中主题和分区的概念

5.1 主题

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被 订阅该topic的消费者消费。 但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被 保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念

5.2 分区

通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:

  • 分区存储,可以解决统⼀存储⽂件过⼤的问题

  • 提供了读写的吞吐量:读和写可以同时在多个分区中进⾏

./kafka-topics.sh --create --zookeeper localhost:2181 --replicationfactor 1 --partitions 2 --topic test

5.3 日志信息

  • 00000.log: 这个⽂件中保存的就是消息
  • __consumer_offsets-49: kafka内部⾃⼰创建了__consumer_offsets主题包含了50个分区。这个主题⽤来存放消费 者消费某个主题的偏移量。因为每个消费者都会⾃⼰维护着消费的主题的偏移量,也就是 说每个消费者会把消费的主题的偏移量⾃主上报给kafka中的默认主题: consumer_offsets。
  • 因此kafka为了提升这个主题的并发性,默认设置了50个分区。 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前 offset的值 ⽂件中保存的消息,默认保存7天。
  • 七天到后消息会被删除。

六 Kafka集群的搭建

6.1 Zookeeper集群的搭建

  • 注意开放端口,以及关闭防火墙

  • ip:2181,ip:2182,ip:2183

  • 修改配置文件

cd conf
#修改配置文件
vim zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/environment/zookeeper/apache-zookeeper-3.6.3-bin/data_log
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=ip1:2888:3888
server.2=ip2:2888:3888
server.3=ip3:2888:3888
quorumListenOnAllIPs=true
#启动zookeeper,修改其他机器的配置文件
bin/zkServer.sh start、
# 等待一下,查看选举状态
bin/zkServer.sh status
[root@shu apache-zookeeper-3.6.3-bin]# bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /environment/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader
[root@shu apache-zookeeper-3.6.3-bin]#
# 问题:端口开发问题,防火墙问题
# 防火墙开发端口
firewall-cmd --zone=public --add-port=2182/tcp --permanent
firewall-cmd --reload
#关闭防火墙
systemctl stop firewalld

6.2 Kafka集群的搭建

  • 注意开放端口,以及关闭防火墙

  • ip:9092,ip:9093,ip:9094

  • 修改配置文件

cd config
#修改配置文件
vim server.properties
#修改zookeeper连接
zookeeper.connect=ip:2181,ip:2182,ip:2183
# 分布修改三台的机器的配置文件,并启动
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
./kafka-server-start.sh -daemon ../config/server.properties
# 检查是否启动
jps
#查看端口问题
netstat -an | grep 9092
#或者
lsof -i:9092
# 防火墙开发端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
#停止kafka
./kafka-server-stop.sh ../config/server.properties
# 验证,我们在lead机器上面创建一个topic
 ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic my
#查看其余机器上的topic
[root@xlc bin]# ./kafka-topics.sh --list --zookeeper localhost:2183
my
[root@shu bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
my

6.3 副本的概念

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个 副本作为leader,其他是follower(就是备份)

# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
# 查看topic详细信息
./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
[root@shu bin]# ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
Created topic my-replicated-topic.
[root@shu bin]# ./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
Topic: my-replicated-topic	PartitionCount: 2	ReplicationFactor: 3	Configs: 
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
	Topic: my-replicated-topic	Partition: 1	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1
  • leader: kafka的写和读的操作,都发⽣在leader上。

  • leader负责把数据同步给follower。当leader挂 了,经过主从选举,从多个follower中选举产⽣⼀个新的leader follower 接收leader的同步的数据

  • isr:可以同步和已同步的节点会被存⼊到isr集合中。这⾥有⼀个细节:如果isr中的节点性能较差,会被提出isr集合。

  • 集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存 储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。

6.4 集群消费

  • 我们在领导服务器中,创建主体,发送消息
# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
# 查看topic信息
./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
# 创建消息
./kafka-console-producer.sh --broker-list ip:9093 --topic my-replicated-topic
>nihao
  • 其余机器接受消息
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server ip:9093 --topic my-replicated-topic
  • 集群消费组命令,参考前面的消费者组命令
  • ⼀个partition只能被⼀个消费组中的⼀个消费者消费,⽬的是为了保证消费的顺序性,但 是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺 序性呢?
  • partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超 过partition的数量,否则多的消费者消费不到消息

6.5 集群中的controller

  • 集群中谁来充当controller 每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最⼩的那个broker将会作为集 群中的controller,
  • 负责这么⼏件事: 当集群中有⼀个副本的leader挂掉,需要在集群中选举出⼀个新的leader,选举的规则是 从isr集合中最左边获得。
  • 当集群中有broker新增或减少,controller会同步信息给其他broker 当集群中有分区新增或减少,controller会同步信息给其他broker

6.6 rebalance机制

  • 前提:消费组中的消费者没有指明分区来消费 触发的条件:当消费组中的消费者和分区的关系发⽣变化的时候
  • 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
  • range:根据公示计算得到每个消费消费哪⼏个分区:前⾯的消费者是分区总数/消费 者数量+1,之后的消费者是分区总数/消费者数量
  • 轮询:⼤家轮着来
  • sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之 前的分配情况。如果这个策略没有开,那么就要进⾏全部的重新分配。建议开启。

6.8 HW和LEO

  • LEO是某个副本最后消息的消息位置(log-end-offset)
  • HW是已完成同步的位置。消息在写⼊broker时,且每个broker完成这条消息的同步后,hw 才会变化。在这之前消费者是消费不到这条消息的。
  • 在同步完成之后,HW更新之后,消费者 才能消费到这条消息,这样的⽬的是防⽌消息的丢失。

七 代码中的实现

7.1 消息提供者

7.1 .1 Java消息提供者代码中的实现
  • 依赖
		<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
  • 代码
/**
 * @Author shu
 * @Date: 2021/10/22/ 16:25
 * @Description
 **/
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.设置参数
        Properties props = new Properties();
        //领导者主机
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //2.创建⽣产消息的客户端,传⼊参数
        Producer<String,String> producer = new KafkaProducer<String, String>(props);
        //3.创建消息
        //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
        ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"value","hello-kafka-ok");
        //4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    }
}
  • 查看结果

可以发现我们的消费者已经收到了消息

7.1.2 ⽣产者中的ack的配置
  • ack = 0 kafka-cluster:不需要任何的broker收到消息,就⽴即返回ack给⽣产者,最容易 丢消息的,效率是最⾼的

  • ack=1(默认): 多副本之间的leader已经收到消息,并把消息写⼊到本地的log中,才 会返回ack给⽣产者,性能和安全性是最均衡的

  • ack=-1/all:⾥⾯有默认的配置min.insync.replicas=2(默认为1,推荐配置⼤于等于2), 此时就需要leader和⼀个follower同步完后,才会返回ack给⽣产者(此时集群中有2个 broker已完成数据的接收),这种⽅式最安全,但性能最差。

props.put(ProducerConfig.ACKS_CONFIG, "1");
 /*
 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造
成消息重复发送,⽐如⽹络抖动,所以需要在
 接收者那边做好消息接收的幂等性处理
 */
 props.put(ProducerConfig.RETRIES_CONFIG, 3);
 //重试间隔设置
 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
  • producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader

  • producer 将消息发送给该 leader

  • leader 将消息写入本地 log

  • followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK

  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

异步

异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker在收到消息后异步调⽤⽣产 者提供的callback回调⽅法。但是容易造成消息丢失

//异步发送消息
producer.send(producerRecord, new Callback() {
 public void onCompletion(RecordMetadata metadata, Exception
exception) {
 if (exception != null) {
 System.err.println("发送消息失败:" +
exception.getStackTrace());
 }
 if (metadata != null) {
 System.out.println("异步⽅式发送消息结果:" + "topic-" +
metadata.topic() + "|partition-"
 + metadata.partition() + "|offset-" + metadata.offset());
 }
 }
 });
7.1.3 消息缓冲区

  • kafka默认会创建⼀个消息缓冲区,⽤来存放要发送的消息,缓冲区是32m
  • kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker
  • 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker 七、Java客户端消费者的实现细节
 		//缓存区默认大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //拉取数据默认大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //如果数据未满16k,也提交
        props.put(ProducerConfig.LINGER_MS_CONFIG,10);

7.2 消息消费者

7.2.1 java客服端基本实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Author shu
 * @Date: 2021/10/25/ 15:09
 * @Description 消费者
 **/
public class MySimpleConsumer {
    //主题名
    private final static String TOPIC_NAME = "my-replicated-topic";
    //分组
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props =new Properties();
        //消息地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.104.223.187:9093");
        //分组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        //序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //自动提交,拉取到信息之后,立马提交偏移量给consumer_offset,保证顺序消费,但是会造成消息丢失问题
//        // 是否⾃动提交offset,默认就是true
//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//        // ⾃动提交offset的间隔时间
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //手动提交,当消费者消费消息完毕之后,返回偏移量
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
        // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        //props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        //1.创建⼀个消费者的客户端
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //2. 消费者订阅主题列表
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            /*
             * 3.poll() API 是拉取消息的⻓轮询
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                //4.打印消息
                System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                record.offset(), record.key(), record.value());
            }
            //所有的消息已消费完
            if (records.count() > 0) {//有消息
                // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
                // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
                consumer.commitSync();//=======阻塞=== 提交成功
            }
        }
    }
}
7.2.1 自动提交与手动提交
  • 消费者⽆论是⾃动提交还是⼿动提交,都需要把所属的消费组+消费的某个主题+消费的某个 分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题⾥⾯,保证顺序。
  • 自动提交:消费者poll消息下来以后就会⾃动提交offset,但是会造成消失丢失。
      //自动提交,拉取到信息之后,立马提交偏移量给consumer_offset,保证顺序消费,但是会造成消息丢失问题
//        // 是否⾃动提交offset,默认就是true
//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//        // ⾃动提交offset的间隔时间
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  • 手动提交:当消费者消费完毕之后,提交偏移量给_consumer_offsets
		 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
        // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        //props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        while (true) {
            /*
             * 3.poll() API 是拉取消息的⻓轮询
             */
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                //4.打印消息
                System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                record.offset(), record.key(), record.value());
            }
            //所有的消息已消费完
            if (records.count() > 0) {//有消息
                // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
                // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
                consumer.commitSync();//=======阻塞=== 提交成功
            }
        }
7.2.3 ⻓轮询poll消息
  • 默认情况下,消费者⼀次会poll500条消息。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
 //如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢
出消费组。将分区分配给其他消费者。-rebalance
 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
while (true) {
 /*
 * poll() API 是拉取消息的⻓轮询
 */
 ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
 for (ConsumerRecord<String, String> record : records) {
 System.out.printf("收到消息:partition = %d,offset = %d, key = %s,
value = %s%n", record.partition(),
 record.offset(), record.key(), record.value());
 }
  • 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。
  • 且时间在1秒内,那么⻓轮询继续poll,要么到500 条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环
  • 如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消 费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数, 让⼀次poll的消息条数少⼀点
7.2.4 心跳检查

消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出 消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。

//consumer给broker发送⼼跳的间隔时间
 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
 //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏
rebalance,把分区分配给其他消费者。
 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
7.2.5 指定分区和偏移量、时间消费
  • 分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,
0)));
  • 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
  • 指定时间消费,根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该 offset之后的消息开始消费。
List<PartitionInfo> topicPartitions =
consumer.partitionsFor(TOPIC_NAME);
 //从1⼩时前开始消费
 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
 Map<TopicPartition, Long> map = new HashMap<>();
 for (PartitionInfo par : topicPartitions) {
 map.put(new TopicPartition(TOPIC_NAME, par.partition()),
fetchDataTime);
 }
 Map<TopicPartition, OffsetAndTimestamp> parMap =
consumer.offsetsForTimes(map);
 for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
parMap.entrySet()) {
 TopicPartition key = entry.getKey();
 OffsetAndTimestamp value = entry.getValue();
 if (key == null || value == null) continue;
 Long offset = value.offset();
 System.out.println("partition-" + key.partition() +
"|offset-" + offset);
 System.out.println();
 //根据消费⾥的timestamp确定offset
 if (value != null) {
 consumer.assign(Arrays.asList(key));
 consumer.seek(key, offset);
 }
 }

7.6 SpringBoot中代码的实现

  • 依赖导入
<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
  • 配置文件编写
server.port=8080
#########kafka配置#############
# lead机器
spring.kafka.bootstrap-servers=ip:9093
#########producer############
# ack
spring.kafka.producer.acks=1
# 拉取大小
spring.kafka.producer.batch-size=16384
# 重试次数
spring.kafka.producer.retries=10
# 缓冲区大小
spring.kafka.producer.buffer-memory=33554432
# 序列化
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#########consumer############
# 关闭自动提交
spring.kafka.consumer.enable-auto-commit=false
# 消费组
spring.kafka.consumer.group-id=default-group
#
spring.kafka.consumer.auto-offset-reset=earliest
# 反序列化
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.apache.kafka.common.serialization.StringDeserializer
# 最大消息
spring.kafka.consumer.max-poll-records=500

spring.kafka.listener.ack-mode=manual_immediate
# redis
spring.redis.host=ip
  • 服务端
package com.demo.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author shu
 * @Date: 2021/10/27/ 16:45
 * @Description
 **/
@RestController
public class KafkaProvide {
    private final static String TOPIC_NAME = "my-replicated-topic";
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/send")
    public String sendMessage(){
        kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
        return "send success!";
    }
}

  • 消费端
package com.demo.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * @Author shu
 * @Date: 2021/10/27/ 16:46
 * @Description
 **/
@Component
public class KafkaConsumer {
    /**
     * 单条消息消费
     * @param record
     * @param ack
     */
    @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //⼿动提交offset
        ack.acknowledge();
    }


    /**
     * 其他分区消费配置
     * @param record
     * @param ack
     */
    @KafkaListener(groupId = "testGroup", topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
            @TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = @PartitionOffset(partition = "1",
                            initialOffset = "100"))
    },concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
    public void listenGroupPro(ConsumerRecord<String, String> record,
                               Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        //⼿动提交offset
        ack.acknowledge();
    }
}
package com.demo.demo.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * @Author shu
 * @Date: 2021/10/29/ 9:49
 * @Description 消息实体类
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgInfo implements Serializable {
    private Long id;
    private String name;
    private Long msg;
    private Date time;
}



package com.demo.demo.kafka;

import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Author shu
 * @Date: 2021/10/28/ 19:55
 * @Description
 **/
@Component
public class KafkaTest {
    //topic
    private final static String TOPIC_NAME = "my-replicated-topic";
    //程序执行的初始时间,只会保留一份
    private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
    //时间转换
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //前缀
    private static final String KEY_PREFIX = "test";
    //缓存
    private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>();
    //json
    private final Gson gson = new GsonBuilder().create();
    //kafka
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 消息接受者(每隔1分钟执行)
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void Consumer() {
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (60 * 1000)){
            System.out.println(DataList);
            for (ConsumerRecord<String, String> consumerRecord : DataList) {
                MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
                System.out.println("消息:"+info);
            }
            DataList.clear();

        }
    }



    /**
     * 消息发送者(30s执行一次)
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    public void Provide(){
        long last = lastRecieveMessage.get();
        long current = System.currentTimeMillis();
        if ((current - last) > (30 * 1000) ){
            MsgInfo msgInfo=new MsgInfo(current-last,"测试",last,new Date());
            kafkaTemplate.send(TOPIC_NAME,"test",gson.toJson(msgInfo));
        }

    }


    /**
     * 单条消费
     * @param record
     * @param ack
     */
    @KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        DataList.add(record);
        //⼿动提交offset
        ack.acknowledge();
    }
}

显示全文