消息(Message):是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
队列(Queue):消息队列,用来保存消息直到发送给消费者。是一种数据结构,先进进出。
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。这也是消息中间件的意义所在。
ActiveMQ
:是 Apache开源产品,完全支持 J M S 规范的消息中间件,是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ
便可执行。ActiveMQ
可以很容易内嵌到使用Spring
的系统里面去通过了常见J2EE
服务器的测试。JMS
即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API
,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。其丰富的 API
、多种集群构建模式使得他成为业界老牌消息中间件,在中小型企业中应用广泛!Kafka
: 是由 Linkedin
公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper
的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。KAFKA
基于TCP协议。RocketMQ
:阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq
,3.0版本名称改为RocketMQ
,是阿里参照kafka
设计思想使用java
实现的一套mq
。同时将阿里系内部多款mq
产品(Notify、metaq
)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下mq
的架构,目前主要多用于订单交易系统。RabbitMQ
:使用Erlang
编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP
,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB
整合。ZeroMQ
:号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ
能够实现RabbitMQ
不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。AMQP
:Advanced Message Queuing Protocol
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
MQTT
:(Message Queuing Telemetry Transport,消息队列遥测传输)
是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
STOMP
:(Streaming Text Orientated Message Protocol)
是流文本定向消息协议,是一种为MOM(Message Oriented Middleware
,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
XMPP
:(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)
是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。
其他:有些特殊框架(如:redis、kafka、zeroMq
等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。
Broker
消息服务器,作为server提供消息核心服务
Producer
消息生产者,业务的发起方,负责生产消息传输给broker,
Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
Topic
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播
Queue
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
Message
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
Kafka是最初由Linkedin
公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper
协调的分布式日志系统(也可以当做MQ
系统),常见可以用于web/nginx
日志、访问日志,消息服务等等,Linkedin
于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
生产者发送一条消息到queue,只有一个消费者能收到。
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系:
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,类似于数据库的表名。
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
# 解压
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
# 修改配置文件
cd conf
cp coo_sample.cfg zoo.cfg
vim zoo.cfg
#启动
bin/zkServer.sh start
#查看
jps
#状态查看
bin/zkServer.sh status
#停止
bin/zkServer.sh stop
#启动客户端
bin/zkCli.sh
#退出
quit
tickTime = 2000
:通信心跳时间,Zookeeper
服务器与客户端心跳时间,单位毫秒initLimit = 10
:LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime
的数量)syncLimit = 5
:LF同步通信时限,Leader和Follower之间通信时间如果超过syncLimit * tickTime
,Leader认为Follwer
死 掉,从服务器列表中删除Follwer
。dataDir
:保存Zookeeper
中的数据,注意:默认的tmp
目录,容易被Linux系统定期删除,所以一般不用默认的tmp
目录。clientPort = 2181
:客户端连接端口,通常不做修改。#解压
tar -zxvf kafka_2.11-2.4.0.tgz
#修改配置文件
cd config
vim server.properties
# 修改以下配置
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号(内网)
#listeners=PLAINTEXT://服务器地址:9092
#阿里云外网
advertised.listeners=PLAINTEXT://阿里云地址:9092
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
#是否可以删除
delete.topic.enable=true
# 启动
cd bin
./kafka-server-start.sh -daemon ../config/server.properties
# 检查是否启动
jps
#查看端口问题
netstat -an | grep 9092
#或者
lsof -i:9092
# 防火墙开发端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
#停止kafka
./kafka-server-stop.sh ../config/server.properties
注:这些命令我们不需要记,因为我们是在代码中完成这些命令
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my
./kafka-topics.sh --list --zookeeper localhost:2181
删除topic的前提是需要将kafka的消费者和生产者停止
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
./kafka-console-producer.sh --broker-list 服务器地址:9092 --topic my
# 重头消费
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my --from-beginning
# :从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my
在⼀个kafka的topic中,启动两个消费者,⼀个⽣产者,问:⽣产者发送消息,这条消息是否 同时会被两个消费者消费? 如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。换⾔ 之,同⼀个消费组中只能有⼀个消费者收到⼀个topic中的消息。
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup --topic my --from-beginning
不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也 是多个消费组中的多个消费者收到了同⼀个消息。
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup01 --topic my --from-beginning
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup02 --topic my --from-beginning
./kafka-consumer-groups.sh --bootstrap-server 服务器地海:9092 --describe --group testGroup
重点关注以下⼏个信息:
主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被 订阅该topic的消费者消费。 但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被 保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念
通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:
分区存储,可以解决统⼀存储⽂件过⼤的问题
提供了读写的吞吐量:读和写可以同时在多个分区中进⾏
./kafka-topics.sh --create --zookeeper localhost:2181 --replicationfactor 1 --partitions 2 --topic test
注意开放端口,以及关闭防火墙
ip:2181,ip:2182,ip:2183
修改配置文件
cd conf
#修改配置文件
vim zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/environment/zookeeper/apache-zookeeper-3.6.3-bin/data_log
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=ip1:2888:3888
server.2=ip2:2888:3888
server.3=ip3:2888:3888
quorumListenOnAllIPs=true
#启动zookeeper,修改其他机器的配置文件
bin/zkServer.sh start、
# 等待一下,查看选举状态
bin/zkServer.sh status
[root@shu apache-zookeeper-3.6.3-bin]# bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /environment/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader
[root@shu apache-zookeeper-3.6.3-bin]#
# 问题:端口开发问题,防火墙问题
# 防火墙开发端口
firewall-cmd --zone=public --add-port=2182/tcp --permanent
firewall-cmd --reload
#关闭防火墙
systemctl stop firewalld
注意开放端口,以及关闭防火墙
ip:9092,ip:9093,ip:9094
修改配置文件
cd config
#修改配置文件
vim server.properties
#修改zookeeper连接
zookeeper.connect=ip:2181,ip:2182,ip:2183
# 分布修改三台的机器的配置文件,并启动
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
./kafka-server-start.sh -daemon ../config/server.properties
# 检查是否启动
jps
#查看端口问题
netstat -an | grep 9092
#或者
lsof -i:9092
# 防火墙开发端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
#停止kafka
./kafka-server-stop.sh ../config/server.properties
# 验证,我们在lead机器上面创建一个topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic my
#查看其余机器上的topic
[root@xlc bin]# ./kafka-topics.sh --list --zookeeper localhost:2183
my
[root@shu bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
my
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个 副本作为leader,其他是follower(就是备份)
# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
# 查看topic详细信息
./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
[root@shu bin]# ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
Created topic my-replicated-topic.
[root@shu bin]# ./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
leader: kafka的写和读的操作,都发⽣在leader上。
leader负责把数据同步给follower。当leader挂 了,经过主从选举,从多个follower中选举产⽣⼀个新的leader follower 接收leader的同步的数据
isr:可以同步和已同步的节点会被存⼊到isr集合中。这⾥有⼀个细节:如果isr中的节点性能较差,会被提出isr集合。
集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存 储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。
# 创建topic
./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic
# 查看topic信息
./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic
# 创建消息
./kafka-console-producer.sh --broker-list ip:9093 --topic my-replicated-topic
>nihao
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server ip:9093 --topic my-replicated-topic
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
/**
* @Author shu
* @Date: 2021/10/22/ 16:25
* @Description
**/
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.设置参数
Properties props = new Properties();
//领导者主机
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//2.创建⽣产消息的客户端,传⼊参数
Producer<String,String> producer = new KafkaProducer<String, String>(props);
//3.创建消息
//key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"value","hello-kafka-ok");
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
}
}
可以发现我们的消费者已经收到了消息
ack = 0 kafka-cluster
:不需要任何的broker收到消息,就⽴即返回ack给⽣产者,最容易 丢消息的,效率是最⾼的
ack=1(默认)
: 多副本之间的leader已经收到消息,并把消息写⼊到本地的log中,才 会返回ack给⽣产者,性能和安全性是最均衡的
ack=-1/all
:⾥⾯有默认的配置min.insync.replicas=2(默认为1,推荐配置⼤于等于2), 此时就需要leader和⼀个follower同步完后,才会返回ack给⽣产者(此时集群中有2个 broker已完成数据的接收),这种⽅式最安全,但性能最差。
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造
成消息重复发送,⽐如⽹络抖动,所以需要在
接收者那边做好消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
producer 将消息发送给该 leader
leader 将消息写入本地 log
followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
异步
异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker在收到消息后异步调⽤⽣产 者提供的callback回调⽅法。但是容易造成消息丢失。
//异步发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception
exception) {
if (exception != null) {
System.err.println("发送消息失败:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步⽅式发送消息结果:" + "topic-" +
metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
//缓存区默认大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//拉取数据默认大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//如果数据未满16k,也提交
props.put(ProducerConfig.LINGER_MS_CONFIG,10);
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* @Author shu
* @Date: 2021/10/25/ 15:09
* @Description 消费者
**/
public class MySimpleConsumer {
//主题名
private final static String TOPIC_NAME = "my-replicated-topic";
//分组
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props =new Properties();
//消息地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.104.223.187:9093");
//分组
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
//序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//自动提交,拉取到信息之后,立马提交偏移量给consumer_offset,保证顺序消费,但是会造成消息丢失问题
// // 是否⾃动提交offset,默认就是true
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// // ⾃动提交offset的间隔时间
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//手动提交,当消费者消费消息完毕之后,返回偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
//1.创建⼀个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2. 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3.poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {//有消息
// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
// ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
consumer.commitSync();//=======阻塞=== 提交成功
}
}
}
}
_consumer_offsets
主题⾥⾯,保证顺序。 //自动提交,拉取到信息之后,立马提交偏移量给consumer_offset,保证顺序消费,但是会造成消息丢失问题
// // 是否⾃动提交offset,默认就是true
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// // ⾃动提交offset的间隔时间
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
_consumer_offsets
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
// props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
while (true) {
/*
* 3.poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {//有消息
// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
// ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
consumer.commitSync();//=======阻塞=== 提交成功
}
}
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢
出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s,
value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出 消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏
rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,
0)));
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
List<PartitionInfo> topicPartitions =
consumer.partitionsFor(TOPIC_NAME);
//从1⼩时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()),
fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap =
consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +
"|offset-" + offset);
System.out.println();
//根据消费⾥的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
server.port=8080
#########kafka配置#############
# lead机器
spring.kafka.bootstrap-servers=ip:9093
#########producer############
# ack
spring.kafka.producer.acks=1
# 拉取大小
spring.kafka.producer.batch-size=16384
# 重试次数
spring.kafka.producer.retries=10
# 缓冲区大小
spring.kafka.producer.buffer-memory=33554432
# 序列化
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#########consumer############
# 关闭自动提交
spring.kafka.consumer.enable-auto-commit=false
# 消费组
spring.kafka.consumer.group-id=default-group
#
spring.kafka.consumer.auto-offset-reset=earliest
# 反序列化
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.apache.kafka.common.serialization.StringDeserializer
# 最大消息
spring.kafka.consumer.max-poll-records=500
spring.kafka.listener.ack-mode=manual_immediate
# redis
spring.redis.host=ip
package com.demo.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author shu
* @Date: 2021/10/27/ 16:45
* @Description
**/
@RestController
public class KafkaProvide {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
return "send success!";
}
}
package com.demo.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* @Author shu
* @Date: 2021/10/27/ 16:46
* @Description
**/
@Component
public class KafkaConsumer {
/**
* 单条消息消费
* @param record
* @param ack
*/
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}
/**
* 其他分区消费配置
* @param record
* @param ack
*/
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",
initialOffset = "100"))
},concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record,
Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}
}
package com.demo.demo.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @Author shu
* @Date: 2021/10/29/ 9:49
* @Description 消息实体类
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgInfo implements Serializable {
private Long id;
private String name;
private Long msg;
private Date time;
}
package com.demo.demo.kafka;
import com.demo.demo.pojo.MsgInfo;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* @Author shu
* @Date: 2021/10/28/ 19:55
* @Description
**/
@Component
public class KafkaTest {
//topic
private final static String TOPIC_NAME = "my-replicated-topic";
//程序执行的初始时间,只会保留一份
private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis());
//时间转换
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//前缀
private static final String KEY_PREFIX = "test";
//缓存
private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>();
//json
private final Gson gson = new GsonBuilder().create();
//kafka
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 消息接受者(每隔1分钟执行)
*/
@Scheduled(cron = "0 */1 * * * ?")
public void Consumer() {
long last = lastRecieveMessage.get();
long current = System.currentTimeMillis();
if ((current - last) > (60 * 1000)){
System.out.println(DataList);
for (ConsumerRecord<String, String> consumerRecord : DataList) {
MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class);
System.out.println("消息:"+info);
}
DataList.clear();
}
}
/**
* 消息发送者(30s执行一次)
*/
@Scheduled(cron = "0/30 * * * * ? ")
public void Provide(){
long last = lastRecieveMessage.get();
long current = System.currentTimeMillis();
if ((current - last) > (30 * 1000) ){
MsgInfo msgInfo=new MsgInfo(current-last,"测试",last,new Date());
kafkaTemplate.send(TOPIC_NAME,"test",gson.toJson(msgInfo));
}
}
/**
* 单条消费
* @param record
* @param ack
*/
@KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
DataList.add(record);
//⼿动提交offset
ack.acknowledge();
}
}