当前位置: 首页 > news >正文

石家庄官网建设360优化大师下载官网

石家庄官网建设,360优化大师下载官网,做asp网站教程,南宁seo品牌费用是多少RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。 普通消息 普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息…

        RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。

普通消息

        普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

顺序消息

        当我开始对消息的时序性有要求的时候,普通消息就无法满足我们的需求了。当我们要求顺序消费的时候,我们的Topic就不能采用多对多的形式,存放在多个MessageQueue中,而是需要牺牲一部分性能,存放在一个MessageQueue中。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(0), null);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println("RocketMQ test msg " + i);System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 跟普通消费不同的地方,这里采用了顺序消费方法。consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();
}

 延时消息

        通过指定的通知时间间隔,让消息不会立刻被消费者收到。

// 生产者
public static void main(String[] args) throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rmq-group");producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());// 指定延时等级// DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java// private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";msg.setDelayTimeLevel(4);// 按毫秒延时msg.setDelayTimeMs(1L);// 按秒延时msg.setDelayTimeSec(1);SendResult sendResult = producer.send(msg);System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

30秒的延迟

事务消息

        事务消息是RocketMQ提供的一种高级的消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息的生命周期

        (1)初始化:半事务消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态。

        (2)事务待提交:半事务消息被发送到服务端,和普通消息不同,半事务消息并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

        (3)消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

        (4)提交事务待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见。

        (5)消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一段时间后没有收到消费者的响应,服务端会对消息进行重试处理。

        (6)消费完成:消费者完成了消费动作,并向服务端提交了消费结果,服务端标记当前消息已经被处理。

        (7)消息删除:服务端按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

// 生产者
public static void main(String[] args) throws MQClientException {TransactionMQProducer producer = new TransactionMQProducer("rmq-group");TransactionListener listener = new TransactionListenerImpl();producer.setNamesrvAddr("172.16.200.38:9876");producer.setInstanceName("producer");producer.setTransactionListener(listener);producer.start();try {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Message msg = new Message("Topic-test", "testTag", (new Date() + " RocketMQ test msg " + i).getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println(new Date());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getOffsetMsgId());System.out.println(sendResult.getQueueOffset());System.out.println();System.out.println("============================");}} catch (Exception e) {e.printStackTrace();}producer.shutdown();
}// 消费者
public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");consumer.setNamesrvAddr("172.16.200.38:9876");consumer.setInstanceName("consumer");consumer.subscribe("Topic-test", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();
}

 控制台打印

// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
93============================
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
94============================
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
95============================
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
96============================
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
97============================
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
98============================
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
99============================
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
100============================
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
101============================
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
102============================// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023 
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023 
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023 

这里消费者只收到了执行事务成功的5,6,9。

http://www.dinnco.com/news/76676.html

相关文章:

  • 案例展示网站护肤品推广软文
  • seo建站还有市场吗网站seo站长工具
  • 厦门模板网站建设建个网站需要多少钱
  • 电脑从做系统怎么找回以前登录的网站百度关键词优化的意思
  • 做黑网站吗百度seo推广首选帝搜软件
  • 灌南网页定制广州 关于进一步优化
  • wordpress制作网站模板优秀的网络搜索引擎营销案例
  • 重庆企业网站制作外包百度网站推广关键词怎么查
  • 兰州装修公司哪家口碑最好搜索引擎优化方法包括
  • 郑州网站建设批发今日时政新闻
  • 工作总结ppt模板免费下载 素材seo技术培训宁波
  • 北京网站改版公司江苏seo平台
  • 网站生成器下载nba最新交易信息
  • 访问自己做的网站吗优化教程网站推广排名
  • 想做个网站报价蔬菜价格怎么做黄冈seo
  • 临沂市建设局网站勘察设计哪里有做网络推广的
  • 做网站用的软件免费入驻的卖货平台
  • 如何建立公司网站多少钱开发小程序
  • 网站建设经费方案网络推广公司网站
  • 做网上水果网站的调查广东省最新疫情
  • 第一次装wordpress上海seo网站优化
  • 做自媒体的有哪些素材网站我想做电商
  • 抵押网站建设方案网页设计作品
  • 创业做招商加盟类网站赚钱网站如何推广出去
  • 网站开发付款方式和比例上海搜索排名优化公司
  • 专业的做网站的搜索引擎优化技术有哪些
  • 有服务器如何做网站企业培训权威机构
  • 呼伦贝尔旅游包车网站咋做网络推广工具有哪些
  • wordpress smtp配置搜索引擎优化培训
  • 三门峡市住房的城乡建设局网站win10必做的优化