我们可以有三种方式
基本步骤如下:
流程如下:
流程如下:
方式一:同步调用
方式二:异步通知
方式三:监听binlog
现在我们使用消息队列完成mysql数据和es索引库数据的同步
这个工程是对mysql进行操作时,发送消息到mq消息队列中,然后hotel子工程接口消息队列的消息后,对es索引库的数据进行修改同步数据
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server:
port: 8099
spring:
datasource:
url: jdbc:mysql://192.168.230.130/hotel_db?useSSL=false
username: root
password: 1234
driver-class-name: com.mysql.jdbc.Driver
rabbitmq:
host: 192.168.230.100
password: 1234
virtual-host: /
username: hhh
port: 5672 #发送消息和接口接收消息的端口号
维护了一个常量类,保存交换机需要的名字和消息队列的名字和需要的routingKey
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
/**
* mq配置类,用于创建交换机和消息对列,并进行绑定
*/
@Configuration
public class MqConfig {
/**
* 声明了一个主题交换机
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
/**
* 对数据库进行插入操作或者更新操作时发消息到的消息队列,
* 因为对es进行操作时,如果数据不存在就插入,数据存在就直接更新,所以更新和插入操作只需要一个消息队列
*/
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
/**
* 删除数据时的消息对列
*/
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
/**
* 将插入数据的消息队列绑定到主题交换机上,并设置routingKey
* @return
*/
@Bean
public Binding bindQueue1(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding bindQueue2(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
查询数据时不需要消息队列,只有新增,更新和删除数据时需要消息队列,发送到消息对列的消息为hotel每个对象的id,这样才可以知道哪一个数据发生了改变
@RestController
@RequestMapping("hotel")
public class HotelController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IHotelService hotelService;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id){
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
){
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
//TODO:传递到消息队列的消息为插入数据的id,这样才可以知道哪个数据发生了改变
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
//TODO:传递到消息队列的消息为插入数据的id,这样才可以知道哪个数据发生了改变
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
//TODO:传递到消息队列的消息为插入数据的id,这样才可以知道哪个数据发生了改变
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
}
删除这个id为38812的数据
消息队列声明成功
现在删除数据的消息队列有一条数据
GET /hotel/_doc/38812
编写监听消息队列的类
@Component
public class MqListener {
@Autowired
private HotelService hotelService;
/**
* 监听插入消息的消息队列
* @param id 消息队列中的id消息
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
private void receiveInsertOrUpdateMsg(Long id){
hotelService.updateOrInsertMsg(id);
}
/**
* 监听删除消息的消息队列
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
private void receiveDeleteMsg(Long id){
hotelService.deleteMsg(id);
}
}
实现
/**
* 新增数据和更新数据
* @param id
*/
@Override
public void updateOrInsertMsg(Long id) {
//根据id去数据库查询最新的对象消息
Hotel hotel = hotelMapper.selectById(id);
//发送数据
HotelDoc hotelDoc = new HotelDoc(hotel);
String jsonData = JSON.toJSONString(hotelDoc);
IndexRequest request = new IndexRequest("hotel").id(id + "");
request.source(jsonData, XContentType.JSON);
try {
restHighLevelClient.index(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException("更新数据失败");
}
}
/**
* 删除数据
* @param id
*/
@Override
public void deleteMsg(Long id) {
DeleteRequest request = new DeleteRequest("hotel", id+"");
try {
restHighLevelClient.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException("删除数据失败");
}
}
es的数据也已经进行删除
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
解决的方法
ES集群相关概念:
集群(cluster):一组拥有共同的 cluster name 的 节点。
节点(node) :集群中的一个 Elasticearch 实例
分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中
创建多个es节点,然后将一个索引库的所有数据分到不同的节点中
此处,我们把数据分成3片:shard0、shard1、shard2
节点中可以存储自己的主分片(自己节点的数据)以及其他节点的副本分片,副本分片是其他节点数据的备份
主分片(Primary shard):相对于副本分片的定义。
副本分片(Replica shard)每个主分片可以有一个或者多个副本,数据和主分片一样。
我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间
编写docker-compose文件
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9202:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data03:/usr/share/elasticsearch/data
ports:
- 9203:9200
networks:
- elastic
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
networks:
elastic:
driver: bridge
运行命令
docker-compose up -d
elasticsearch中集群节点有不同的职责划分:
默认情况下,集群中的任何一个节点都同时具备上述四种角色。
但是真实的集群一定要将集群职责分离:
职责分离可以让我们根据不同节点的需求分配不同的硬件去部署。而且避免业务之间的互相干扰。
一个典型的es集群职责划分如图:
脑裂是因为集群中的节点失联导致的。
例如一个集群中,主节点与其它节点因为网络问题失联:
其他节点会一位主节点已经down掉,所以会从候选节点中选择一个新的主节点,然后之前的主节点网络正常后重新连接,这样一样在es集群中就出现了两个主节点,这就是脑裂问题
解决脑裂的方案是,要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
例如:3个节点形成的集群,选票必须超过 (3 + 1) / 2 ,也就是2票。
node1节点作为主节点因为网络问题失联时,node2,node3会选出一个新的主节点,假设新的主节点为node3,那么这个新的主节点就会得到node3,node2的选票,这样一来就算node1重新连接后,他只有一票,就会自动取消自己的主节点,这样就可以解决集群的脑裂问题
master eligible节点的作用是什么?
data节点的作用是什么?
coordinator节点的作用是什么?
路由请求到其它节点
合并查询到的结果,返回给用户
PUT /itcast
{
"settings": {
"number_of_shards": 3, // 分片数量
"number_of_replicas": 1 // 副本数量
},
"mappings": {
"properties": {
"title":{
"type":"text"
}
// mapping映射定义 ...
}
}
}
当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?
插入三条数据:
测试可以看到,三条数据分别在不同分片:
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
说明:
新增文档的流程如下:
过程细节描述:
集群写入时,会先随机选取一个节点(node),该节点可以称之为“协调节点”。 新文档写入前,es会对其id做hash取模,来确定该文档会分布在哪个分片上。 当分片位置确定好后,es会判图当前“协调节点”上是否有该主分片。如果有,直接写;如果没有,则会将数据路由到包含该主分片的节点上。 整个写入过程是,es会将文档先写入主分片上(如p0),写完后再将数据同步一份到副本上(如r0) 待副本数据也写完后,副本节点会通知协调节点,最后协调节点告知客户端,文档写入结束。
elasticsearch的查询分成两个阶段:
scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。
1)例如一个集群结构如图:
现在,node1是主节点,其它两个节点是从节点。
2)突然,node1发生了故障:
宕机后的第一件事,需要重新选主,例如选中了node2:
node2成为主节点后,会检测集群监控状态,然后就会发现分片0没有主分片,分片1没有副本分片,所以就会创建分片0的主分片和分片1的副本分片,最后分别存储在node2,node3节点中。因此需要将node1上的数据迁移到node2、node3:
如果node1节点恢复,就会删除与node1相同的分片数据,变回原来的样子,但是现在的主节点是node2