适用场景

适用场景

ZMQ主要用于产品之间或者模块之间,将产品或者模块之间的直接依赖转变为间接依赖,保证各个产品的独立性和层次性。

不适用场景

  • 上游环节关注下游环节的结果

如果上游环节需要直接依赖下游环节,根据具体的结果推进后续的流程,则不适合使用ZMQ

  • 不能容忍异步操作

ZMQ将系统分隔开,需要2个系统之间进行异步的操作,对于某个完整的业务,无法从业务角度拆分开,则不适合使用ZMQ。

  • 不能容忍短暂的不一致

ZMQ引入后,跨系统的执行存在延迟,无论延迟的大小,对于业务来说,必然存在2个系统的数据出现短暂的不一致,在一定的延迟之后,才能达到最终一致。对于对系统要求强一致性的场景,则不适合使用ZMQ。

对于ZMQ不适用的场景,可以参考使用ZDubbo。

命名规范

Producer ID

定义:一类 Producer 的标识,这类 Producer 通常生产并发送一类消息,且发送逻辑一致。

规范:PID_<应用名>_<生产者组名>,使用大写的PID_作为前缀,应用名表示该组所属的应用,生产者组名由应用自己定义,只能包含字母,数字和下划线,长度64字节以内

Consumer ID

定义:一类 Consumer 的标识,这类 Consumer 通常接收并消费一类消息,且消费逻辑一致。

规范:CID_<应用名>_<消费者组名>,使用大写的CID_作为前缀,应用名表示该组所属的应用,消费者组名由应用自己定义,只能包含字母,数字和下划线,长度64字节以内

Topic

定义:消息主题,一级消息类型,通过 Topic 对消息进行分类。

规范:只能包含字母,数字和下划线,长度64字节以内

名词解释

Message

消息,消息队列中信息传递的载体。

Message Key

消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。

Topic

消息主题,一级消息类型,通过 Topic 对消息进行分类。

Producer

消息生产者,也称为消息发布者,负责生产并发送消息。

Producer ID

一类 Producer 的标识,这类 Producer 通常生产并发送一类消息,且发送逻辑一致。

Producer实例

Producer的一个对象实例,不同的 Producer 实例可以运行在不同进程内或者不同机器上。Producer 实例线程安全,可在同一进程内多线程之间共享。

Consumer

消息消费者,也称为消息订阅者,负责接收并消费消息。

Consumer ID

一类 Consumer 的标识,这类 Consumer 通常接收并消费一类消息,且消费逻辑一致。

Consumer实例

Consumer的一个对象实例,不同的 Consumer 实例可以运行在不同进程内或者不同机器上。一个 Consumer 实例内配置线程池消费消息。

集群消费

一个Consumer ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

消费消息幂等性处理

消息有可能出现重复消费的情况,由于服务端去重会损失ZMQ性能,所以ZMQ本身不做去重操作,具体的去重需要业务去做。

如果对消息重复消费比较敏感的情况下,业务发送消息的时候可以在message的key值设置一个全局唯一的ID,每次消费消息后可将消息记录本地数据库,下次消费时可以将消息的key与本地存储的消息的key比较,如果重复则不进行消费,不能根据msgId去做消费去重,因为可能出现msgId相同的情况。

订阅关系一致

订阅关系一致是指同一个消费组下的消费者的消费逻辑需要一致,即同一个Consumer_Id下的所有消费者实例的消费逻辑一致。

不然会出现同一个消费组下的消费者没有订阅此topic也会被分配消费队列,这样会导致一个topic下的队列有的可以被消费而有的队列不会被消费。

具体的消费一致性必须满足同一个消费组下的不同消费者实例订阅的Topic需要一致。

消费重试

业务端消费消息失败可以进行消费重试,只需要消费时return ProcessStatus.Retry即可对此条消息进行重试。客户端消费重试次数默认为16次,重试16次后消息将被丢入死信队里不能再被消费。

消费重试次数也可以自定义,可以创建订阅组的时候指定订阅组级别重试次数,同时可以设置具体消息的重试次数,设置方式如下:

msg.putProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES, “10”);

消费者消费起点设置

对于新加入的消费者消费会有一个消费起点,现在支持两种方式:

● 从最开始位置消费

● 从最新位置消费

默认方式是从最新位置消费,也就是,我们创建消费者的时候可以直接指定从哪里开始消费。建议从最开始位置消费,不会出现漏消费消息的问题,但是可能会出现重复消费的问题,对于重复消费业务端要做去重。

消费起点设置可以在创建消费者的时候在消费者的属性中设置设置,具体示例如下:

从最开始位置消费:
properties.put(PropertyKey.ConsumeFrom_Where, PropertyKey.ConsumeFrom_First_Value);
或是从最新位置消费
properties.put(PropertyKey.ConsumeFrom_Where, PropertyKey.ConsumeFrom_Last_Value);

消息优先级

由于ZMQ所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此RocketMQ没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可。

消息大小限制

消息中间件对消息的大小有限制,避免大消息对系统的性能的影响。因此producer和broker都对消息体大小进行了限制,如果超过最大大小,则返回失败。注意 producer和broker的消息限制必须设置一致,系统默认消息大小不能超过4M

消息查询

ZMQ支持对消息进行查询的功能,应用提供消息对应的条件,就可以查询出消息的相关信息。具体提供以下2种方式:

根据消息MessageKey

生产者发送消息时,可以指定消息的业务key,用来表示消息的特征,比如订单ID,充值流水等,具体表示格式类似OrderId:12345,这种key一般是基于应用的,可以重复,但是从业务上来说,可以保证唯一,保证消息查询时直接精确命中。

根据消息的MessageId

生产者发送消息时,在消息发送成功后,ZMQ会返回MessageId,这个是系统生成的ID,具体特殊的含义。应用只需要记录下MessageId,再后续查询时提供,就可以精确查询到这个消息。

顺序消息

ZMQ支持严格的顺序消息,使用顺序消息时需要在创建topic时设置topic的order属性为true, 使用顺序消息在集群模式下如果接收顺序消息的broker宕机了会导致发送消息失败,如果此broker不恢复提供服务则发送顺序消息会一直失败。

使用顺序消息注意事项:使用顺序消息时broker动态新增或是减少的情况下需要先停止客户端,然后在扩容完成后更新完顺序消息topic之后才能重新发送消息。

实现顺序消息还有另外一种方式:某个topic用于收发顺序消息时设置此topic的队列个数为1,使用一个消息生产者往这个topic发送消息然后使用一个消费者订阅此topic,但是这样会大大降低发送和消费消息的效率。

消息回溯

对于消费进度可以进行回溯,可以通过命令行进行操作,具体命令行如下:

./mqadmin resetOffsetByTime -g groupName -t topic -f true -s yyyy-MM-dd#HH:mm:ss:SSS,命令也可参看ZMQ命令行手册。

流量控制

流量控制通过broker级别和topic级别两个级别来控制,可以通过命令设置broker流量的最大值,在创建topic时也可以设置具体topic的流量最大值。

设置和修改broker流量命令:./mqadmin updateBrokerFluxConfig –c clusterName –b brekerName –l limitnumber。

设置topic流量限制只需要在创建topic时设置-l参数即可。

发送消息注意事项

  • 消息属性key设置

每个消息建议设置唯一的key,这样便于后期排查问题,消费时也可以根据key去重。

  • 消息发送失败处理

ZMQ内部具有发送失败重试机制,内部会重试三次,至多会重试三个broker,但是极端情况下也不能保证消息一定发送成功,对于发送消息不可丢的业务建议将发送失败的消息保存本地数据库或其他方式记录本地,之后定时尝试发送这些发送失败的消息,对于发送失败的消息记录日志务必打印sendresult和key字段。

  • 发送消息方式选择

ZMQ发送消息有三种方式,同步、异步、oneway三种方式,可以选择适合自己的方式,比如一些不重要的信息对于发送结果不太敏感的可以使用oneway方式发送,这样可以提高发送效率。

一个RPC调用,通常是这样一个过程

  1. 客户端发送请求到服务器

  2. 服务器处理该请求

  3. 服务器向客户端返回应答

所以一个RPC的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用

消费消息注意事项

绝大部分消息消费行为属于IO 密集型,即可能是操作数据库,或者调用RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降

如果发生消息堆积的情况下,消息消费速度赶不上消息发送速度可以尝试丢弃不重要信息。

生产组规划

生产组名用于标识生产者所属的组,一般根据业务系统来规划生产组,给不同的应用规划不同的组名,以区分相应的业务。

对于不同的消息,生产者组还有其他的含义:

  • 普通消息

仅仅用于标示生产者,无实质性的用途

  • 事务消息

对于事务消息,对于超时未确认的消息,需要进行事务回查。回查时,会根据发送事务消息的生产者组,寻找在线的生产者,从列表中随机选取一个组对应的生产者进行回查状态。

  • 授权

如果系统启用了认证授权功能,系统会根据生产组和主题进行权限设置,只有授权的生产组,才能访问授权的主题

消费组规划

消费组,又称订阅组,用于订阅主题,对主题的消息进行消费,一个消费组内的多个消费者共同消费消息,同时ZMQ按照消费组来记录消息的消费进度。

消费组用于标识消费者所属的组,一般根据业务系统以及消费逻辑进行规划,保证消费组内的所有消费者的语义是对等的。

另外,如果系统启用了认证授权功能,系统会根据生产组和主题进行权限设置,只有授权的生产组,才能访问授权的主题

注意:消费组内的所有消费者,必须是完全对等的,订阅相同的主题列表,消费相同主题内的消息。切勿一个消费组内的不同消费者订阅不同的主题。

消费端的并发度

消费端在消费消息时,对于一个订阅组内,可以用于多个运行的消费者实例。订阅组订阅的主题消息,会以负载均衡的方式推送到各个消费者实例。

但是订阅组内的消费者并发度,也就是说能够真正处理消息的消费者个数,是由主题的队列数决定的。主题的一个队列,有且仅会被一个消费者处理。超过主题队列数的消费者,处于等待状态,如果有正在工作的消费者宕机或者推出,等待状态的消费者就会接替退出的消费者的工作,继续处理消息。

主题的队列数,是在创建主题的时候指定的。如果消费者的消息处理性能较低,可以动态增加主题的队列数,增加消费者的并行度,提高消费者集群的处理能力。

定时获取消息

ZMQ不支持每条消息都定时被消费,但是可以针对消息设置延时级别,根据延时级别的不同会在延时一定时间后才能被消费。

对于想要消息在指定时间被消费,应用可以将将消息规划为不同的topic然后应用设置定时任务在指定时间消费相应topic的消息。

认证授权功能使用

使用ZMQ收发消息时想使用认证授权功能首先需要ZMQ启动认证授权功能,具体可以通过命令行或是zmqmanager进行操作,命令的具体使用和zmqmanager的使用可以参考命令行手册或是ZMQ安全校验功能使用手册,基于集群创建用户名和密码,具体校验基于生产者或消费者和topic。

事务消息使用

ZMQ事务消息使用T(try)C(commit)C(cancle)模型实现,发送事务消息分为两阶段,第一步发送try消息,第二步根据本地业务处理决定是提交(commit)或是回滚消息(rollback),如果一分钟内没有提交也没有回滚的情况下ZMQ会触发回查机制对事务消息本地执行状态进行回查并根据回查后的状态对消息进行提交或是回滚。流程图如下:

注意使用producer发送完事务消息不要马上将producer停掉。

results matching ""

    No results matching ""