您的当前位置:首页正文

消息队列——数十万级消息的消费方案

2024-10-17 来源:个人技术集锦

背景:

​ 下游平台通过消息队列上报监控消息,但是消息量很大,在三分钟左右可以达到百万级别,而对于我的服务来说,我需要对这些消息进行一些业务处理,然后再存入es中。(为了简化场景,以下对于消息的处理只是单纯的存储到es中)

服务启动不到10s,es中写入的数据

青铜方案:

​ MQ只要收到消息,就直接调用es进行存储。

伪代码如下:

// 伪代码版本public void processRequestMessage(MessageInfo info) {    // 将接收到的信息对象复制为一个新对象(例如,监控数据对象)     MonitorData monitorData = Util.copyProperties(info, MonitorData.class);    // 将新对象的 JSON 字符串索引到 Elasticsearch 中     elasticClient.index("monitor_index", info.getId(), convertToJson(monitorData), false);}

存在的问题:

​ 不难发现,这样的实现方式,会导致消息消费速度非常慢,甚至导致消息积压和服务挂掉,因为这里对es的调用次数=消息条数,通过在本地的测试中也可以发现,即使在关闭掉消息生产者后,还是需要很长一段时间才能将消息消费完全消费掉。

白银方案:

​ 通过瓶颈,可以很自然的想到使用es的批量增加,那么只需要实现一个缓冲池,将消息暂存到缓冲池中,在达到一定大小的时候再统一在es存储

伪代码如下:

@Component@Slf4jpublic class ESOperationMonitorBuffer{    private static final int BUFFER_SIZE = 100;  // 缓冲池大小    private List<ElasticDoc> buffer;  // 用于存储消息的缓冲池    @Autowired    private ElasticClient elasticClient;    private String indexName = EsConstans.NODE_MONITOR_INDEX;  // Elasticsearch 索引名称    public ESOperationMonitorBuffer() {        this.buffer = new ArrayList<>();    }    // 添加消息到缓冲池    public void addMessage(WlwMessageShareInfo message)  {        ElasticDoc elasticDoc = new ElasticDoc();        elasticDoc.setIndex(indexName);        MonitorESData monitorESData = BeanUtil.copyProperties(message, MonitorESData.class);        elasticDoc.setDoc(JSONObject.toJSONString(monitorESData));        buffer.add(elasticDoc);        if(buffer.size() > BUFFER_SIZE){        	flush();        }    }    /**     * 执行 flush 操作     */    private void flush()  {       log.info("开始批量插入 Elasticsearch,共 {} 条数据", buffer.size());            if (buffer.isEmpty()) {                return;  // 如果缓冲池为空,不执行操作            }            BulkResponse index = elasticClient.index(buffer, false);// 批量插入            if (index.hasFailures()) {                log.error("批量插入 Elasticsearch 失败,失败原因:{}", index.buildFailureMessage());            }            // 清空缓冲池            buffer.clear();    }    // 如果程序关闭前有剩余数据,执行 flush 操作    public void close() throws IOException {    }}

存在的问题:

  1. 若消息数量一直没达到阈值,就一直不会保存到es

  2. 存在并发问题,ConcurrentModificationException(并发修改异常),是基于java集合中的 快速失败(fail-fast) 机制产生的,在使用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了增删改,就会抛出该异常。快速失败机制使得java的集合类不能在多线程下并发修改,也不能在迭代过程中被修改。在上面场景中的表现就是在flush操作中时,又有消息进入到了buffer中。

黄金方案:

解决问题1 :可以开启一个定时任务去执行flush方法

解决问题2 :可能大家第一时间会想到对buffer加锁,但是这样又会导致在存入buffer的时候速度慢,所以不难想到可以对 flush() 方法加锁, 但是这样一来还是无法解决buffer存在的并发问题,怎么办呢?其实很简单,我们可以用两个buffer来分别给add()方法和flush()方法使用,这样一来,就可以避免并发问题,并且继续对flush()方法加锁,避免和定时任务同时执行,导致数据重复。

伪代码如下:

@Component@Slf4jpublic class ESOperationMonitorBuffer implements CommandLineRunner {    private static final int BUFFER_SIZE = 100;  // 缓冲池大小    private List<ElasticDoc> buffer;  // 用于存储消息的缓冲池    private List<ElasticDoc> temBuffer;  // 用于存储临时消息的缓冲池    @Autowired    private ElasticClient elasticClient;    private String indexName = EsConstans.NODE_MONITOR_INDEX;  // Elasticsearch 索引名称    private Lock lock = new ReentrantLock();        public ESOperationMonitorBuffer() {        this.buffer = new ArrayList<>();        this.temBuffer = new ArrayList<>();    }    // 添加消息到缓冲池    public void addMessage(WlwMessageShareInfo message)  {        ElasticDoc elasticDoc = new ElasticDoc();        elasticDoc.setIndex(indexName);        MonitorESData monitorESData = BeanUtil.copyProperties(message, MonitorESData.class);        elasticDoc.setDoc(JSONObject.toJSONString(monitorESData));        temBuffer.add(elasticDoc);        // 当缓冲池达到设定大小时,批量插入到 Elasticsearch        if (temBuffer.size() >= BUFFER_SIZE) {            lock.lock();            try{                buffer.addAll(temBuffer);                temBuffer.clear();            }catch(Exception e){                log.error("添加消息到缓冲池失败",e);            }finally {                lock.unlock();            }            flush();        }    }    /**     * 执行 flush 操作     */    private void flush()  {        lock.lock();        try{            log.info("开始批量插入 Elasticsearch,共 {} 条数据", buffer.size());            if (buffer.isEmpty()) {                return;  // 如果缓冲池为空,不执行操作            }            BulkResponse index = elasticClient.index(buffer, false);// 批量插入            if (index.hasFailures()) {                log.error("批量插入 Elasticsearch 失败,失败原因:{}", index.buildFailureMessage());            }            // 清空缓冲池            buffer.clear();        }catch (Exception e){            log.info("批量插入 Elasticsearch 失败",e);        }finally {            lock.unlock();        }    }    // 如果程序关闭前有剩余数据,执行 flush 操作    public void close() throws IOException {    }    @Override    public void run(String... args) throws Exception {        log.info("启动 ESOperationMonitorBuffer 缓冲池,开启线程池定时执行flush操作");        // 定时执行 flush 操作        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);        scheduledExecutorService.scheduleAtFixedRate(()->{            try{                flush();            }catch (Exception e){                log.error("定时执行 flush 操作失败",e);            }        },1,5, TimeUnit.SECONDS);    }}

显示全文