生产者
发送普通消息
Producer对象对于一个应用来说,可以只使用一个实例。在应用关闭时,一般以优雅的方式来关闭Producer,需要调用shutdown方法。
Producer producer = null;
try {
//创建生产者工厂
MQClientFactory factory = new MQClientFactoryImpl();
//生产者属性信息
Properties properties = new Properties();
properties.put(PropertyKey.Producer_Id, "PID_001");
properties.put(PropertyKey.Namesrv_Addr, "10.45.7.107:9876");
//创建生产者
producer = factory.createProducer(properties);
//启动生产者
producer.start();
//构造消息
MQMessage msg = new MQMessage("testTopic", "", "OrderId:12345", "new client api".getBytes());
//发送消息
ProduceResult result = producer.send(msg);
System.out.println(result);
} finally {
if (null != producer) {
//关闭生产者
producer.shutdown();
}
}
发送事务消息
TransactionProducer对象对于一个应用来说,需要按照业务来区分,因为事务消息涉及到事务补偿的需求,ZMQ需要回查业务的事务状态。回查的方式,就是通过Producer_Id来回查的,必须确保此TransactionProducer能够确认对应的Producer_Id发送的事务状态。另外,在应用关闭时,一般以优雅的方式来关闭Producer,需要调用shutdown方法。
TransactionProducer producer = null;
try {
MQClientFactory factory = new MQClientFactoryImpl();
Properties properties = new Properties();
properties.put(PropertyKey.Producer_Id, "PID_001");
properties.put(PropertyKey.Namesrv_Addr, "10.45.7.107:9876");
TransactionStateFeedback feedback = new TransactionStateFeedback() {
@Override
public TransactionState checkState(MQMessage message) {
// TODO check the transaction state by the message.
return TransactionState.Commit;
}
};
producer = factory.createTransactionProducer(properties, feedback);
producer.start();
MQMessage msg = new MQMessage("testTopic", "", "OrderId:12345", "new client api".getBytes());
TransactionProduceResult tryResult = producer.sendTry(msg);
// TODO do the business transaction.
// service.do();
producer.sendCommit(tryResult);
System.out.println(tryResult);
} finally {
if (null != producer) {
producer.shutdown();
}
}
主要分为4个阶段:
1.发送try消息
TransactionProduceResult tryResult = producer.sendTry(msg);
2.提交事务消息
producer.sendCommit(tryResult);
3.回滚事务消息
producer.sendRollback(tryResult, “sample error”);
4.事务状态回查
TransactionStateFeedback feedback = new TransactionStateFeedback() {
@Override
public TransactionState checkState(MQMessage message) {
// TODO check the transaction state by the message.
return TransactionState.Commit;
}
};
消费者
集群消费方式(默认消费方式)
MQClientFactory factory = new MQClientFactoryImpl();
//消费者的属性信息
Properties properties = new Properties();
properties.put(PropertyKey.Consumer_Id, "CID_001");
properties.put(PropertyKey.Consumer_Worker_Size, "10");
properties.put(PropertyKey.Namesrv_Addr, "10.45.7.107:9876");
//消息处理器
final MQMessageHandler handler = new MQMessageHandler() {
@Override
public ProcessStatus process(MQMessage message, MessageHandlerContext context) {
System.out.println(message.getTopic() + " " + new String(message.getBody(), Charset.forName("UTF-8")));
return ProcessStatus.Done;
}
};
Consumer consumer = factory.createConsumer(properties);
//订阅消息,可以调用多次,以订阅不同的topic
consumer.subscribe("testTopic", handler);
consumer.start();
客户端集成spring使用
客户端可以集成spring进行使用,方便使用spring框架进行开发的项目,具体有普通消息生产者与spring集成、顺序消息生产者与spring集成、事务消息生产者与spring集成、普通消息消费者与spring集成、顺序消息消费者与spring集成,具体使用配置方法以及使用代码示例详见《ZMQ客户端集成Spring开发手册》。