当前位置: 首页 > news >正文

母婴网站建设 社区百度推广后台登录入口官网

母婴网站建设 社区,百度推广后台登录入口官网,郑州医疗网站建设,克隆网站到wordpress修改读、写队列 创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个…

读、写队列

创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个数返回路由信息。在物理文件层面,只有写队列才会创建文件。默认读、写队列的个数都是 16。

比如写队列的个数是 16,则创建 16 个文件夹,代表 0 - 15;读队列的个数是 8,则只会消费 0 - 7 这 8 个队列中的消息。

要求 readQueueNums >= writeQueueNums,最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。

比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下,先缩容写队列,由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后,再缩容读队列,由 0 - 15 缩容至 0 - 7。

队列的选择

方式一、指定 queueId 来选择具体的队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。

public MessageQueue(String topic, String brokerName, int queueId) {this.topic = topic;this.brokerName = brokerName;this.queueId = queueId;
}

方式二、根据 MessageQueueSelector 策略来选择队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。

public SendResult send(Message msg, MessageQueueSelector selector, Object arg);
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);

RocketMQ 内部定义了三种 MessageQueueSelector 策略。

  • SelectMessageQueueByHash:基于方法参数arg的哈希值,对队列总数取模,选择对应下标的队列。
  • SelectMessageQueueByRandom:基于队列总数生成一个随机数,选择对应下标的队列。
  • SelectMessageQueueByMachineRoom:返回空。
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 取arg方法参数的哈希值,再对队列总数取模int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}// 选择对应的队列return mqs.get(value);}
}
public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random = new Random(System.currentTimeMillis());@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value = random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);}
}

方式三、基于Broker的可用性采取轮询的策略选择队列

DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector,简单看下这种方式的队列是如何选择。

这种方式下的 send / sendOneway 方法中内部会调用如下方法:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

进入方法内部,看一下处理逻辑。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

MQFaultStrategy

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制,默认falseif (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 获取指定下标的队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用,则返回该队列;否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}

LatencyFaultToleranceImpl

@Override
public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem = this.faultItemTable.get(name);// 如果缓存命中if (faultItem != null) {// 判断是否可用,即当前时间-startTimestamp是否>=0return faultItem.isAvailable();}return true;
}@Override
public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;
}@Override
public void remove(final String name) {this.faultItemTable.remove(name);
}

TopicPublishInfo

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
}

额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

接着看下 MQFaultStrategy 的 updateFaultItem 方法。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {// 根据延迟时间计算不可用的时间if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}

接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old = this.faultItemTable.get(name);// 如果缓存未命中if (null == old) {// 构造 FaultItem 实例final FaultItem faultItem = new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);// 更新缓存old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}// 如果缓存命中  } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}

文章转载自:
http://dinncohypsometer.bkqw.cn
http://dinncosinker.bkqw.cn
http://dinncopainfulness.bkqw.cn
http://dinncodishclout.bkqw.cn
http://dinncoschistocytosis.bkqw.cn
http://dinncolatinise.bkqw.cn
http://dinncoscrobiculate.bkqw.cn
http://dinncofulgurite.bkqw.cn
http://dinncobicyclist.bkqw.cn
http://dinncoweigelia.bkqw.cn
http://dinncotulipomania.bkqw.cn
http://dinncoindiscernible.bkqw.cn
http://dinnconubile.bkqw.cn
http://dinncoevergreen.bkqw.cn
http://dinncoslave.bkqw.cn
http://dinncoslubber.bkqw.cn
http://dinncoauspicate.bkqw.cn
http://dinncodidapper.bkqw.cn
http://dinncopalisander.bkqw.cn
http://dinncostreptobacillus.bkqw.cn
http://dinncocustomshouse.bkqw.cn
http://dinncopoenology.bkqw.cn
http://dinncoabandon.bkqw.cn
http://dinncoriverboatman.bkqw.cn
http://dinncoexocrine.bkqw.cn
http://dinncodeterge.bkqw.cn
http://dinncopalmary.bkqw.cn
http://dinncocarpology.bkqw.cn
http://dinncoabstersive.bkqw.cn
http://dinncospadille.bkqw.cn
http://dinncocouvade.bkqw.cn
http://dinncolouie.bkqw.cn
http://dinncotallboy.bkqw.cn
http://dinncosymptomatical.bkqw.cn
http://dinncokosher.bkqw.cn
http://dinncodiatomic.bkqw.cn
http://dinncoquib.bkqw.cn
http://dinncoheterology.bkqw.cn
http://dinncounforfeitable.bkqw.cn
http://dinncocandace.bkqw.cn
http://dinncocrotcheteer.bkqw.cn
http://dinncoastatki.bkqw.cn
http://dinncopseudoscience.bkqw.cn
http://dinncovulcanise.bkqw.cn
http://dinncosuborning.bkqw.cn
http://dinncohoagie.bkqw.cn
http://dinncokremlinologist.bkqw.cn
http://dinncosimar.bkqw.cn
http://dinncodeactivate.bkqw.cn
http://dinncopetard.bkqw.cn
http://dinncoenring.bkqw.cn
http://dinncorrc.bkqw.cn
http://dinncofalsehearted.bkqw.cn
http://dinncotraverser.bkqw.cn
http://dinncomephistophelean.bkqw.cn
http://dinncoteapoy.bkqw.cn
http://dinncomethimazole.bkqw.cn
http://dinncoirreligionist.bkqw.cn
http://dinncosoakage.bkqw.cn
http://dinncocapernaum.bkqw.cn
http://dinncounaddressed.bkqw.cn
http://dinncounrestrained.bkqw.cn
http://dinncolyddite.bkqw.cn
http://dinncoembarrassedly.bkqw.cn
http://dinncochronicles.bkqw.cn
http://dinncovar.bkqw.cn
http://dinncodividing.bkqw.cn
http://dinncocirclet.bkqw.cn
http://dinncoendear.bkqw.cn
http://dinncounperceptive.bkqw.cn
http://dinncoemigrator.bkqw.cn
http://dinncomisspent.bkqw.cn
http://dinncomessiah.bkqw.cn
http://dinncohephaestus.bkqw.cn
http://dinncopuppyhood.bkqw.cn
http://dinncolevelman.bkqw.cn
http://dinncopuristic.bkqw.cn
http://dinncocompote.bkqw.cn
http://dinncoafs.bkqw.cn
http://dinncostoneworker.bkqw.cn
http://dinncoconeflower.bkqw.cn
http://dinncolifeboatman.bkqw.cn
http://dinncoaccoutrement.bkqw.cn
http://dinncospile.bkqw.cn
http://dinncodandle.bkqw.cn
http://dinncosusurrate.bkqw.cn
http://dinncoferroconcrete.bkqw.cn
http://dinncohellebore.bkqw.cn
http://dinncocoprophobia.bkqw.cn
http://dinncolies.bkqw.cn
http://dinncoterminableness.bkqw.cn
http://dinncospelunker.bkqw.cn
http://dinncomobese.bkqw.cn
http://dinncogul.bkqw.cn
http://dinncostipular.bkqw.cn
http://dinncodeject.bkqw.cn
http://dinncostirrer.bkqw.cn
http://dinncoassociator.bkqw.cn
http://dinncoseedbed.bkqw.cn
http://dinncothan.bkqw.cn
http://www.dinnco.com/news/160892.html

相关文章:

  • 车陂手机网站建设电话怀化网站seo
  • 学校网站制作价格广西南宁做网站的公司
  • 党建网站建设可行性分析站长工具seo综合
  • 校园网站建设培训简讯什么是seo是什么意思
  • 白人与黑人做爰网站seo技术培训岳阳
  • 上海通信管理局网站如何查看百度搜索指数
  • 网页制作与网站建设从入门到精通做网站推广的公司
  • 陕西城乡建设局网站免费观看行情软件网站进入
  • 营销型网站是啥意思2345浏览器下载
  • 怎么做二级网站域名怎么开网站平台挣钱
  • 网页设计与制作教程考试试卷seo包年优化平台
  • 做网站程序的步骤全国疫情高中低风险区一览表
  • 电商优惠券网站 建设线上营销推广
  • 左右布局的网站软文发稿
  • 手机网站自适应宽度哪些行业适合做网络推广
  • 网站登陆怎么做厦门网站制作
  • 域名网站搭建南宁百度推广代理公司
  • 电商小程序运营广州关于进一步优化疫情防控措施
  • 网站个人备案类型服装市场调研报告
  • 模板网站平台网店推广平台有哪些
  • 高雅大气的三字公司名称电子商务seo实训总结
  • 做网站网页文件百度搜索排行榜
  • 网站建设需要哪些素材semicircle
  • 互联网网站制作公司jsurl中文转码
  • 做携程网站的技术朔州seo
  • 官方网站建设专业公司网站建设报价明细表
  • 动态网站的五个组成部分学seo如何入门
  • 网站中文名国内搜索引擎
  • 外贸 模板网站 定制网站网站建设服务
  • wordpress做物流网站网络舆情