当前位置: 首页 > news >正文

低价车网站建设北京计算机培训机构前十名

低价车网站建设,北京计算机培训机构前十名,国外做网站被动收入,dede 网站地图生成目录 1:生产者(同步、异步、单向) 1.1:同步发送消息(每发送一条等待mq返回值) 1.2:异步发送消息 1.3:单向发送消息(不管成功失败,只管发送消息&#xff09…

目录

1:生产者(同步、异步、单向)

1.1:同步发送消息(每发送一条等待mq返回值)

1.2:异步发送消息

1.3:单向发送消息(不管成功失败,只管发送消息)

1.4:顺序发送消息

1.5:批量发送消息

1.6:定时发送消息

2:消费者

2.1:push消费

2.2:pull消费


1:生产者(同步、异步、单向)

在了解生产者之前,首先再次查看这个图片。生产者发送消息,围绕生产者的概念和怎么发送消息来解析MQ。生产者有重要的的message元素

Message:包含以下属性

字段名默认值必要性说明
Topicnull必填消息所属 topic 的名称
Bodynull必填消息体
Tagsnull选填消息标签,方便服务器过滤使用。目前只支持每个消息设置一个
Keysnull选填代表这条消息的业务关键词,唯一ID
Flag0选填完全由应用来设置,RocketMQ 不做干预
DelayTimeLevel0选填消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
WaitStoreMsgOKtrue选填表示消息是否在服务器落盘后才返回应答。mq接受到消息,存入磁盘,然后返回成功或者失败

队列:为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。保证消息的发送和消费的并发速度。在生产者将消息发送到MQ的broker的时候,这个时候broker的内部维护了队列,保证先进先出。默认一个topic里边有4个读4个写的队列

我们怎么发送消息,生产者发送消息包含同步,异步,单向这三个方面。当然按照功能又扩展出来顺序消息、批量消息、定时消息、事务消息等模式。

1.1:同步发送消息(每发送一条等待mq返回值)

同步发送:每次发送一条,等待mq的返回值成功,然后发送下一条,适合可靠的消息传递,适用范围最广泛。如重要的通知消息、短消息通知等

代码如下:

public class 普通消息发送_同步 {/*** 同步消息发送,发送之后等待服务端返回结果* 消息发送到broker 等待响应,成功后接着发送下一条数据* 保证了消息发送的可靠性** 使用场景:大部分可靠性要求高的场景*/public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//1:初始化一个生产者,并且设置group是商品组System.out.println("同步发送消息模式");String producerGroup="shop_Group";DefaultMQProducer producer=new DefaultMQProducer(producerGroup);//2:设置nameServerproducer.setNamesrvAddr("localhost:9876");//3:启动producerproducer.start();//4:发送100条消息for (int i = 0; i < 10; i++) {//5:定义消息体Message msg=new Message();//设置消息主题 必填String topic="huyijuTopic";msg.setTopic(topic);//设置消息体 必填String body="同步:"+i;msg.setBody(body.getBytes(StandardCharsets.UTF_8));//设置落盘策略 默认落盘成功返回true 选填msg.setWaitStoreMsgOK(true);//设置消息keys 消息唯一标识 选填msg.setKeys("shop"+i);//设置消息标签 选填msg.setTags("同步");//6:发送数据SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//7:发送完消息,关闭生产者producer.shutdown();}}

1.2:异步发送消息

异步发送消息:发送完后不用等待响应,就可以发送第二条消息,通过回调接口,来接受响应成功或者失败。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

代码如下:

public class 普通消息发送_异步 {/*** 异步消息发送模式 发送数据之后不等响应接着发送,需要回调接口,回调接口告知失败 或者成功** 适用场景:适用于发送文件视频等大的文件  节省时间**/public static void main(String[] args) throws Exception {System.out.println("异步发送普通消息");// 初始化一个producer并设置Producer group nameDefaultMQProducer producer = new DefaultMQProducer("shop_Group");// 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 启动producerproducer.setRetryTimesWhenSendAsyncFailed(0);//重试次数producer.start();for (int i = 0; i < 5; i++) {// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤String topic="huyijuTopic";Message msg = new Message(topic,"异步",("异步"+i).getBytes(StandardCharsets.UTF_8));// 异步发送消息, 发送结果通过callback返回给客户端producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("数据发送成功:"+sendResult);}@Overridepublic void onException(Throwable e) {System.out.println("数据发送失败:"+e);e.printStackTrace();}});}// 一旦producer不再使用,关闭producerproducer.shutdown();}}

1.3:单向发送消息(不管成功失败,只管发送消息)

单向发送:生产者向mq发送消息,不等待mq的返回值,是否消息接收成功或者失败。生产者只管发送,适用于日志等可靠性不高的场景。发送速度很快,微秒级的速度

public class 普通消息发送_单向 {/*** 单向发送消息模式* 服务方只发送消息 不等服务端响应 也不管回调 发送速度很快 就是只管发送消息  不管成功失败** 适用场景:适用于发送日志,对于数据可靠性要求不高*/public static void main(String[] args) throws Exception {System.out.println("单向发送普通消息");// 初始化一个producer并设置Producer group nameDefaultMQProducer producer = new DefaultMQProducer("shop_Group");// 设置NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");// 启动producerproducer.start();for (int i = 0; i < 10; i++) {final int index = i;// 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤String body="单向"+i;Message msg = new Message("huyijuTopic","单向",body.getBytes(StandardCharsets.UTF_8));// 不管不顾,直接发送,没有返回值producer.sendOneway(msg);}// 一旦producer不再使用,关闭producerproducer.shutdown();}}

1.4:顺序发送消息

我们知道,我们发送的消息,存储到了mq的topic的队列里边,默认的topic是4个队列

根据消息的key将消息轮训的插入队列中,队列的消息能保证FIFO,但是我们并不知道实际具体那条消息在那个队列,无法保证比如订单号是01的所有操作在同一个队列。如下图

在这里插入图片描述

 消费者再消费的时候,无法保证业务的一致性。所有才有了顺序发送,我们传入指定的订单号,只要订单号一直,就一定会存到相同的队列。

在这里插入图片描述

 代码如下:

/*** 顺序发送:SendResult send(Message msg, MessageQueueSelector selector, Object arg)* 根据同一个arg的值存入,相同的队列  队列一共四个,很多数据的时候,根据arg%4 存入指定的队列,先进先出** 适用场景:下单,支付,物流等场景,我们使用orderId作为分区id 会发送到同一个队列 保证顺序** 注意事项:只能有一个生产者,因为分布式环境,多个生产者发送相同的同一个orderId,无法判定先后顺序* 必须是单一的生产者** 如果一个Broker掉线,那么此时队列总数是否会发化?** 如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。* 如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,* 如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息:** sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876*/
public class 顺序消息发送 {public static void main(String[] args) {System.out.println("顺序消息发送");DefaultMQProducer producer = new DefaultMQProducer("shop_Group");try {// 设置NameServer地址producer.setNamesrvAddr("127.0.0.1:9876");int a=producer.getDefaultTopicQueueNums();System.out.println("默认的队列大小:"+a);producer.start();for (int i = 0; i < 5; i++) {
//                String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
//                Message msg1 =
//                        new Message("TopicTest", tags[i % tags.length], "KEY" + i,
//                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));Message msg=new Message();//设置主题String topic="huyijuTopic";msg.setTopic(topic);//设置内容String body="顺序发送"+i;msg.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));//设置keysmsg.setKeys("key"+i);//设置tagsmsg.setTags("顺序发送");//订单id 根据不同的id将消息发送到不同的队列(队列总共4个 取模放入队列) 遵循FIFO
//                int orderId = i%a;
//                System.out.println("订单id:"+orderId);SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
//                        System.out.println("参数arg:"+arg);//这里参数的send方法的参数一致
//                        System.out.println("参数list:"+list.size());//默认队列是4
//                        System.out.println(message);//根据订单id取模,存入指定的队列 然后返回该队列Integer id = (Integer) arg;int index = id % list.size();MessageQueue messageQueue = list.get(index);return messageQueue;}}, 5);//这里的5就是实际上我们的订单号,根据这个参数将消息存到相同的队列System.out.printf("%s%n", sendResult);}} catch (MQClientException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}finally {producer.shutdown();}}
}

1.5:批量发送消息

批量发送消息:将消息批量发送到mq来节省时间

代码如下:

/*** 批量投送消息,增加吞吐率 减少网络调用次数** 需要注意的是批量消息的大小不能超过 1MB**/
public class 批量消息发送 {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {System.out.println("批量消息投送");DefaultMQProducer producer = new DefaultMQProducer("shop_Group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String body1="批量任务1";String body2="批量任务2";String body3="批量任务3";Message message1=new Message("huyijuTopic","批量任务",body1.getBytes(StandardCharsets.UTF_8));Message message2=new Message("huyijuTopic","批量任务",body2.getBytes(StandardCharsets.UTF_8));Message message3=new Message("huyijuTopic","批量任务",body3.getBytes(StandardCharsets.UTF_8));List<Message> list=new ArrayList<>();list.add(message1);list.add(message2);list.add(message3);SendResult send = producer.send(list);System.out.println("批量消息投送结束"+send);producer.shutdown();}
}

1.6:定时发送消息

定时消息发送:将消息发送到mq,mq根据定时将消息发送给消费者。切记不要搞反了,消息是发送到mq之后,定时发送个消费者。

代码如下:

/*** 延时消息发送,数据发送到mq之后 指定的时间之后才能消费** 适用场景:定时任务、超时精准投送** 缺点:大量的定时任务 容易造成消息积压  时间一到 消费者亚历山大**/
public class 延时消息发送 {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("shop_Group");// 设置NameServer地址
//        1	1s	10	6min
//        2	5s	11	7min
//        3	10s	12	8min
//        4	30s	13	9min
//        5	1min	14	10minproducer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 10; i++) {String body= "定时任务"+i;Message message=new Message("huyijuTopic","定时任务",body.getBytes(StandardCharsets.UTF_8));message.setDelayTimeLevel(3);//设置的定时任务级别SendResult send = producer.send(message);System.out.println("定时消息返回值:"+send);}producer.shutdown();}
}

2:消费者

消息的消费者很简单,只两种模式

第一种(推送模式):订阅mq服务的topic,mq收到消息把消息推送给消费者,适用范围广

第二种(拉取模式):订阅mq服务的topic,mq收到消息,消费者定时去mq拉取消息

2.1:push消费

普通消息的推送消费


//适用于普通的消息推送,不适合用于顺序消息的消费
public class 消息接收_推送1 {public static void main(String[] args) throws MQClientException {String group = "Shop_Group_push";//1:初始化消息接收组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);//2:设置NameServer地址consumer.setNamesrvAddr("localhost:9876");//3:订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息String topic="huyijuTopic";consumer.subscribe(topic,"*");consumer.setMessageModel(MessageModel.CLUSTERING);//默认是集群模式//consumer.setMessageModel(MessageModel.BROADCASTING);//这里是广播模式//4.1:注册回调接口来处理从Broker中收到的消息 单个对列保证先进先出//但是多个队列 被消费者并发消费   不能保证消费的顺序性consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(list);for (int i = 0; i < list.size(); i++) {byte[] body = list.get(i).getBody();String resault= null;try {resault = new String(body,"utf-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println("接受huyijuTopic的第"+i+"条消息:"+resault);}// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();
//        consumer.shutdown();System.out.println("推送消息接收启动1");}
}

顺序消息的推送消费 

public class 消息接收_顺序消费1 {public static void main(String[] args) throws MQClientException {System.out.println("顺序消费1");String group = "Shop_Group_push";//1:初始化消息接收组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);//2:设置NameServer地址consumer.setNamesrvAddr("localhost:9876");//3:订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息String topic="huyijuTopic";consumer.subscribe(topic,"*");consumer.setMessageModel(MessageModel.CLUSTERING);//默认是集群模式//consumer.setMessageModel(MessageModel.BROADCASTING);//这里是广播模式//4.2:注册回调接口来处理从Broker中收到的消息 单个对列保证先进先出//但是多个队列 被消费者并发消费,不能保证消费的顺序性 这里使用MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {//AtomicLong consumeTimes = new AtomicLong(0);consumeTimes.incrementAndGet();for (int i = 0; i < list.size(); i++) {byte[] body = list.get(i).getBody();String resault= null;try {resault = new String(body,"utf-8");} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println("接受huyijuTopic的第"+i+"条消息:"+resault);}//返回消费状态return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();
//        consumer.shutdown();System.out.println("推送消息接收启动1");}
}

2.2:pull消费

消费者一直循环去mq拉取消息

public class 消息接收_拉取消息2 {public static void main(String[] args) throws MQClientException {System.out.println("开始消息拉取");DefaultLitePullConsumer defaultLitePullConsumer = new DefaultLitePullConsumer();defaultLitePullConsumer.setConsumerGroup("Shop_Pull_Group");//订阅主题  拉取消息String topic="huyijuTopic";defaultLitePullConsumer.subscribe(topic, "*");defaultLitePullConsumer.setPullBatchSize(1);defaultLitePullConsumer.setNamesrvAddr("localhost:9876");defaultLitePullConsumer.start();try {while (true) {List<MessageExt> messageExts = defaultLitePullConsumer.poll();System.out.printf("%s%n", messageExts);System.out.println("拉取数据长度:"+messageExts.size());for (int i = 0; i < messageExts.size(); i++) {byte[] body = messageExts.get(i).getBody();String resault= new String(body,"utf-8");System.out.println("拉取消息:"+resault);}}} catch (UnsupportedEncodingException e) {e.printStackTrace();} finally {defaultLitePullConsumer.shutdown();}}
}

以上就是生产者和消费者的消息模型。 


文章转载自:
http://dinncopiave.zfyr.cn
http://dinncocrop.zfyr.cn
http://dinncoblackheart.zfyr.cn
http://dinncopsychologue.zfyr.cn
http://dinncomensurability.zfyr.cn
http://dinncocabdriver.zfyr.cn
http://dinncogenette.zfyr.cn
http://dinncomodernise.zfyr.cn
http://dinncocicely.zfyr.cn
http://dinncoboorish.zfyr.cn
http://dinncogruffly.zfyr.cn
http://dinncomisattribution.zfyr.cn
http://dinncolachrymatory.zfyr.cn
http://dinncopaba.zfyr.cn
http://dinncomillennialist.zfyr.cn
http://dinncointernauts.zfyr.cn
http://dinncoisoagglutination.zfyr.cn
http://dinnconightwork.zfyr.cn
http://dinncosubteenager.zfyr.cn
http://dinncodebutant.zfyr.cn
http://dinncoachromaticity.zfyr.cn
http://dinncoconsignment.zfyr.cn
http://dinncointerfluent.zfyr.cn
http://dinncocesura.zfyr.cn
http://dinncopvm.zfyr.cn
http://dinncorevalve.zfyr.cn
http://dinncosyrphian.zfyr.cn
http://dinncohog.zfyr.cn
http://dinncospeculator.zfyr.cn
http://dinncoautotype.zfyr.cn
http://dinncospug.zfyr.cn
http://dinncoabsolvable.zfyr.cn
http://dinncolaverbread.zfyr.cn
http://dinncocaisson.zfyr.cn
http://dinnconasa.zfyr.cn
http://dinncoresolutive.zfyr.cn
http://dinncoaegean.zfyr.cn
http://dinncocursing.zfyr.cn
http://dinncohippophile.zfyr.cn
http://dinncogemmation.zfyr.cn
http://dinncouri.zfyr.cn
http://dinncofestival.zfyr.cn
http://dinncovolsci.zfyr.cn
http://dinncoindecomposable.zfyr.cn
http://dinncorigmarole.zfyr.cn
http://dinncolexicographer.zfyr.cn
http://dinncoparageusia.zfyr.cn
http://dinncorestive.zfyr.cn
http://dinncocountertide.zfyr.cn
http://dinncodhole.zfyr.cn
http://dinncospiroid.zfyr.cn
http://dinncosheryl.zfyr.cn
http://dinncopolemically.zfyr.cn
http://dinncolapsable.zfyr.cn
http://dinncoinvalidity.zfyr.cn
http://dinncophotogene.zfyr.cn
http://dinncoallodium.zfyr.cn
http://dinncoarena.zfyr.cn
http://dinncofreshener.zfyr.cn
http://dinncosurgery.zfyr.cn
http://dinncogetparms.zfyr.cn
http://dinncoinapplication.zfyr.cn
http://dinncoquadrominium.zfyr.cn
http://dinncosimulacra.zfyr.cn
http://dinncosantero.zfyr.cn
http://dinncowidowerhood.zfyr.cn
http://dinncomussulman.zfyr.cn
http://dinncopromotional.zfyr.cn
http://dinncoindeclinable.zfyr.cn
http://dinncoinotropic.zfyr.cn
http://dinncoindisputably.zfyr.cn
http://dinncoflaked.zfyr.cn
http://dinncosupersaturate.zfyr.cn
http://dinncoinaudibly.zfyr.cn
http://dinncoclimacterical.zfyr.cn
http://dinncoxenomorphic.zfyr.cn
http://dinncokurdish.zfyr.cn
http://dinncoempleomania.zfyr.cn
http://dinncolibidinous.zfyr.cn
http://dinncosaltchucker.zfyr.cn
http://dinncokolinsky.zfyr.cn
http://dinncomotel.zfyr.cn
http://dinncodisembosom.zfyr.cn
http://dinncoichthyophagy.zfyr.cn
http://dinncospermophile.zfyr.cn
http://dinncochardin.zfyr.cn
http://dinncohuebnerite.zfyr.cn
http://dinncobright.zfyr.cn
http://dinncozurich.zfyr.cn
http://dinncocrackling.zfyr.cn
http://dinncostakeout.zfyr.cn
http://dinncooverhead.zfyr.cn
http://dinncotetrabranchiate.zfyr.cn
http://dinncomiri.zfyr.cn
http://dinncoiago.zfyr.cn
http://dinncoknub.zfyr.cn
http://dinncoaubergiste.zfyr.cn
http://dinncocommutator.zfyr.cn
http://dinncolongboat.zfyr.cn
http://dinncoeducationally.zfyr.cn
http://www.dinnco.com/news/118532.html

相关文章:

  • 流行的动态网站开发语言介绍seo技术教程博客
  • html网站架设郑州网站建设七彩科技
  • 欧美网站设计欣赏线上营销公司
  • 白云优化网站建设河北百度推广seo
  • 地方网站运营方案短视频运营
  • 建一个购物网站优秀企业网站模板
  • 肥西县建设官方局网站互联网网站
  • 设计师设计网电商seo是什么
  • 淘宝优惠券网站怎么做 知乎长沙整站优化
  • 网站怎么做微信接口新软件推广平台
  • 免费咨询承诺书aso优化报价
  • 上海网站建设广丰网站seo
  • 免费做手机网站建设本溪seo优化
  • 外贸公司网站开发长尾关键词网站
  • 编程猫少儿编程网站怎么做百度推广
  • asp.net 手机网站模板百度知道怎么赚钱
  • 做网站花费五合一网站建设
  • 网页版word在线编辑如何对一个网站进行seo
  • 网站设计培训学校网页设计框架图
  • 德州网站开发湖北seo网站推广
  • 廊坊做网站的电话广告公司网上接单平台
  • cpa广告联盟网站建设南通网络推广
  • 太湖手机网站建设绍兴百度seo排名
  • 设计一个个人网站的基本步骤百度竞价运营
  • 资阳网站设计必应搜索推广
  • 古玩网站建设意义百度 营销推广靠谱吗
  • 河北省建设厅政府网站网站推广要点
  • 做阿里云网站的公司英文外链seo兼职
  • 义乌网站备案国家大事新闻近三天
  • 经典网站欣赏怎么在百度上注册店铺