第六章主要说了消息消费的方式,消息队列的负载,消息拉去 消息消费 消息消费进度存储,消息过滤,定时消息,顺序消息

RMQ消息消费的方式包含集群模式和广播模式

负载交由RebalanceService线程负责,按照负载算法进行重新分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一时间只会分配给一个消费者

然后是消息消费流程

消息消费中需要消息拉去,交给RebalanceService创建的PullRequest进行拉去,一次拉取32条消息,然后进行不断的提交,并交由Broker进行控流

并发的消息消费是并发的对同一个消息消费队列的消息进行消费,消费成功后,取出消息处理队列中最小的消息偏移量作为消息消费进度偏移量存在与消息消费进度存储文件中

如果消费者消费失败,就会返回RECOSUME_LATER,将消息存储在主题为SCHEDULE_TOPIC_XXX的消息消费队列中,等待一段时间的拉取

RMQ不支持任意精度的定时消息,只有提供特定的消息延迟级别,比如1S 2S 5S等,通过在broker配置文件中可以设置DelayLevel,其本质就是创建对应延迟级别的定时任务从消费队列中拉取消息并恢复消息的原主题再次存入commitLog方便拉取消费

消息的过滤则是支持表达式模式和类过滤模式,我们只说了表达式模式

而表达式模式分为了TAG模式和SQL92模式,TAG默认是指定一个TAG,然后订阅TAG,如果订阅的TAG包含消息TAG则消费,SQL表达式则是基于SQL表达式过滤模式

顺序消息,则是消息消费者内的线程池中的线程对消息消费队列只能串行的消费,即加上消息队列的锁

发表评论

邮箱地址不会被公开。 必填项已用*标注