生产者

发送普通消息

Producer对象对于一个应用来说,可以只使用一个实例。在应用关闭时,一般以优雅的方式来关闭Producer,需要调用shutdown方法。

Producer producer = null;
try {
    //创建生产者工厂
    MQClientFactory factory = new MQClientFactoryImpl();
    //生产者属性信息
    Properties properties = new Properties();
    properties.put(PropertyKey.Producer_Id, "PID_001");
    properties.put(PropertyKey.Namesrv_Addr, "10.45.7.107:9876");
    //创建生产者
    producer = factory.createProducer(properties);
    //启动生产者
    producer.start();
    //构造消息
    MQMessage msg = new MQMessage("testTopic", "", "OrderId:12345", "new client api".getBytes());
    //发送消息
ProduceResult result = producer.send(msg);
    System.out.println(result);
} finally {
    if (null != producer) {
        //关闭生产者
        producer.shutdown();
    }
}

发送事务消息

TransactionProducer对象对于一个应用来说,需要按照业务来区分,因为事务消息涉及到事务补偿的需求,ZMQ需要回查业务的事务状态。回查的方式,就是通过Producer_Id来回查的,必须确保此TransactionProducer能够确认对应的Producer_Id发送的事务状态。另外,在应用关闭时,一般以优雅的方式来关闭Producer,需要调用shutdown方法。

TransactionProducer producer = null;
try {
    MQClientFactory factory = new MQClientFactoryImpl();
    Properties properties = new Properties();
    properties.put(PropertyKey.Producer_Id, "PID_001");
    properties.put(PropertyKey.Namesrv_Addr, "10.45.7.107:9876");
    TransactionStateFeedback feedback = new TransactionStateFeedback() {
        @Override
        public TransactionState checkState(MQMessage message) {
            // TODO check the transaction state by the message.
            return TransactionState.Commit;
        }
    };
    producer = factory.createTransactionProducer(properties, feedback);
    producer.start();
    MQMessage msg = new MQMessage("testTopic", "", "OrderId:12345", "new client api".getBytes());
    TransactionProduceResult tryResult = producer.sendTry(msg);
        // TODO do the business transaction.
    // service.do();
    producer.sendCommit(tryResult);
    System.out.println(tryResult);
} finally {
    if (null != producer) {
        producer.shutdown();
    }
}

主要分为4个阶段:

1.发送try消息

TransactionProduceResult tryResult = producer.sendTry(msg);

2.提交事务消息

producer.sendCommit(tryResult);

3.回滚事务消息

producer.sendRollback(tryResult, “sample error”);

4.事务状态回查

TransactionStateFeedback feedback = new TransactionStateFeedback() {
    @Override
    public TransactionState checkState(MQMessage message) {
        // TODO check the transaction state by the message.
        return TransactionState.Commit;
    }
};

消费者

集群消费方式(默认消费方式)

MQClientFactory factory = new MQClientFactoryImpl();
//消费者的属性信息
Properties properties = new Properties();
properties.put(PropertyKey.Consumer_Id, "CID_001");
properties.put(PropertyKey.Consumer_Worker_Size, "10");
properties.put(PropertyKey.Namesrv_Addr, "10.45.7.107:9876");
//消息处理器
final MQMessageHandler handler = new MQMessageHandler() {
    @Override
    public ProcessStatus process(MQMessage message, MessageHandlerContext context) {
        System.out.println(message.getTopic() + " " + new String(message.getBody(), Charset.forName("UTF-8")));
        return ProcessStatus.Done;
    }
};
Consumer consumer = factory.createConsumer(properties);
//订阅消息,可以调用多次,以订阅不同的topic
consumer.subscribe("testTopic", handler);
consumer.start();

客户端集成spring使用

客户端可以集成spring进行使用,方便使用spring框架进行开发的项目,具体有普通消息生产者与spring集成、顺序消息生产者与spring集成、事务消息生产者与spring集成、普通消息消费者与spring集成、顺序消息消费者与spring集成,具体使用配置方法以及使用代码示例详见《ZMQ客户端集成Spring开发手册》。

results matching ""

    No results matching ""