当前位置: 首页 > news >正文

怎么修改网站上的内容厦门网站建设公司名单

怎么修改网站上的内容,厦门网站建设公司名单,中企动力邮箱客户端,wordpress一键缓存基于golang多消息队列中间件的封装nsq,rabbitmq,kafka 场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个…

基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

场景

在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中间件;

接口模型

这个模型的核心思想是消息队列的核心功能生产者生产消息方法和消费者消费消息,任何消息队列都必须有这两个功能;根据如下代码消息队列中间件是可扩展的,只需在实例化消息队列对象那里添加新消息队列的实现;

// MQer 消息队列接口
type MQer interface {Producer(topic string, data []byte)Consumer(topic, channel string, ch chan []byte, f func(b []byte))
}// NewMQ 实例化消息队列对象
func NewMQ() MQer {switch conf.Conf.Default.Mq { // mq 设置的类型case "nsq":return new(MQNsqService)case "rabbit":return new(MQRabbitService)case "kafka":return new(MQKafkaService)default:return new(MQNsqService)}
}/*
配置文件结构设计mqType: "" # nsq, rabbit, kafka  这三个值然当然了是可扩展的nsq:producer: ""consumer: ""rabbit:addr: ""user: ""password: ""kafka:addr: ""
*/

各个消息队列的实现

1. 依赖库
  • nsq : github.com/nsqio/go-nsq
  • rabbitmq : github.com/streadway/amqp
  • kafka : github.com/Shopify/sarama
2. nsq

nsq结构体

// MQNsqService NSQ消息队列
type MQNsqService struct {
}

生产者

// Producer 生产者
func (m *MQNsqService) Producer(topic string, data []byte) {nsqConf := &nsq.Config{}client, err := nsq.NewProducer(nsqServer, nsqConf)if err != nil {log.Error("[nsq]无法连接到队列")return}log.DebugF(fmt.Sprintf("[生产消息] topic : %s -->  %s", topic, string(data)))err = client.Publish(topic, data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}

消费者

var (nsqServer   = conf.Conf.Default.Nsq.Producer // nsqServer
)// Consumer 消费者
func (m *MQNsqService) Consumer(topic, channel string, ch chan []byte, f func(b []byte)) {mh, err := NewMessageHandler(nsqServer, channel)if err != nil {log.Error(err)return}go func() {mh.SetMaxInFlight(1000)mh.Registry(topic, ch)}()go func() {for {select {case s := <-ch:f(s)}}}()log.DebugF("[NSQ] ServerID:%v => %v started", channel, topic)
}// MessageHandler MessageHandler
type MessageHandler struct {msgChan     chan *goNsq.Messagestop        boolnsqServer   stringChannel     stringmaxInFlight int
}// NewMessageHandler return new MessageHandler
func NewMessageHandler(nsqServer string, channel string) (mh *MessageHandler, err error) {if nsqServer == "" {err = fmt.Errorf("[NSQ] need nsq server")return}mh = &MessageHandler{msgChan:   make(chan *goNsq.Message, 1024),stop:      false,nsqServer: nsqServer,Channel:   channel,}return
}// Registry register nsq topic
func (m *MessageHandler) Registry(topic string, ch chan []byte) {config := goNsq.NewConfig()if m.maxInFlight > 0 {config.MaxInFlight = m.maxInFlight}consumer, err := goNsq.NewConsumer(topic, m.Channel, config)if err != nil {panic(err)}consumer.SetLogger(nil, 0)consumer.AddHandler(goNsq.HandlerFunc(m.handlerMessage))err = consumer.ConnectToNSQLookupd(m.nsqServer)if err != nil {panic(err)}m.process(ch)
}
  1. rabbitmq
    结构体
// MQRabbitService Rabbit消息队列
type MQRabbitService struct {
}

生产者

// Producer 生产者
func (m *MQRabbitService) Producer(topic string, data []byte) {mq, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}//defer mq.Destroy()log.DebugF(fmt.Sprintf("[生产消息] topic : %s -->  %s", topic, string(data)))err = mq.PublishPub(data)if err != nil {log.Error("[生产消息] 失败 : " + err.Error())}
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例  (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}...其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 

消费者

// Consumer 消费者
func (m *MQRabbitService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {mh, err := NewRabbitMQPubSub(topic)if err != nil {log.Error("[rabbit]无法连接到队列")return}msg := mh.RegistryReceiveSub()go func(m <-chan amqp.Delivery) {for {select {case s := <-m:f(s.Body)}}}(msg)log.DebugF("[Rabbit] ServerID:%v => %v started", serverId, topic)
}// NewRabbitMQPubSub 订阅模式创建 rabbitMq实例  (目前用的fanout模式)
func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) {mq, err := NewRabbitMQ("", exchangeName, "", "")if mq == nil || err != nil {return nil, err}//获取connectionmq.conn, err = amqp.Dial(mq.MqUrl)mq.failOnErr(err, "failed to connect mq!")if mq.conn == nil || err != nil {return nil, err}//获取channelmq.channel, err = mq.conn.Channel()mq.failOnErr(err, "failed to open a channel!")return mq, err
}... 其余代码见源码: https://github.com/mangenotwork/common/tree/main/mq 
  1. kafka
    结构体
// MQKafkaService Kafka消息队列
type MQKafkaService struct {
}

生产者

func (m *MQKafkaService) Producer(topic string, data []byte) {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follower都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,我们默认设置32个分区config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回// 构造一个消息msg := &sarama.ProducerMessage{}msg.Topic = topicmsg.Value = sarama.ByteEncoder(data)// 连接kafkaclient, err := sarama.NewSyncProducer(kafkaServer, config)if err != nil {log.Error("Producer closed, err:", err)return}defer client.Close()// 发送消息pid, offset, err := client.SendMessage(msg)if err != nil {log.Error("send msg failed, err:", err)return}log.InfoF("pid:%v offset:%v\n", pid, offset)
}

消费者

// Consumer 消费者
func (m *MQKafkaService) Consumer(topic, serverId string, ch chan []byte, f func(b []byte)) {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer(kafkaServer, nil)if err != nil {log.ErrorF("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区if err != nil {log.Error("Failed to get the list of partition: ", err)return}log.Info(partitionList)for partition := range partitionList { // 遍历所有的分区pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者if err != nil {log.ErrorF("Failed to start consumer for partition %d: %s\n", partition, err)}wg.Add(1)go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待log.DebugF("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))f(msg.Value)}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}

总结

golang的接口是一种抽象类型,是对其他类型行为的概括与抽象,从语法角度来看,接口是一组方法定义的集合,文本的封装使用了golang接口这一特性,把所有的消息队列中间件抽象为一个MQer拥有生产和消费两个方法,具体的各个消息队列中间件去实现这两个方法即可,最明显的优点在于扩展性,解耦性,选择性,维护性这几个表象上。

完整代码

https://github.com/mangenotwork/common/tree/main/mq

你的星星是我分享的最大动力 : )


文章转载自:
http://dinncowuhsi.tqpr.cn
http://dinncodolcevita.tqpr.cn
http://dinncoerrhine.tqpr.cn
http://dinncoethiopian.tqpr.cn
http://dinncoschizoid.tqpr.cn
http://dinncodirectrix.tqpr.cn
http://dinncodigressively.tqpr.cn
http://dinncorooseveltite.tqpr.cn
http://dinncotipster.tqpr.cn
http://dinncoalterant.tqpr.cn
http://dinncokarun.tqpr.cn
http://dinncodepend.tqpr.cn
http://dinncohedgepig.tqpr.cn
http://dinnconameable.tqpr.cn
http://dinncobodyguard.tqpr.cn
http://dinncocroon.tqpr.cn
http://dinncogrecize.tqpr.cn
http://dinncogwent.tqpr.cn
http://dinncogawd.tqpr.cn
http://dinncomylonite.tqpr.cn
http://dinncosublieutenant.tqpr.cn
http://dinncocannoli.tqpr.cn
http://dinncopraedial.tqpr.cn
http://dinncowoken.tqpr.cn
http://dinncotor.tqpr.cn
http://dinncoazan.tqpr.cn
http://dinnconerchinsk.tqpr.cn
http://dinncohomodyne.tqpr.cn
http://dinncoeighth.tqpr.cn
http://dinncocontinuously.tqpr.cn
http://dinncosortilege.tqpr.cn
http://dinncoval.tqpr.cn
http://dinncochockablock.tqpr.cn
http://dinncojuncaceous.tqpr.cn
http://dinncobeaux.tqpr.cn
http://dinncoanthracosilicosis.tqpr.cn
http://dinncomeursault.tqpr.cn
http://dinncoprofit.tqpr.cn
http://dinncotruism.tqpr.cn
http://dinncoliquor.tqpr.cn
http://dinncojackass.tqpr.cn
http://dinncowedlock.tqpr.cn
http://dinncoaerosiderolite.tqpr.cn
http://dinncoseptenarius.tqpr.cn
http://dinncoprecative.tqpr.cn
http://dinncopolaroid.tqpr.cn
http://dinncoscrewhead.tqpr.cn
http://dinncohorseless.tqpr.cn
http://dinncorogation.tqpr.cn
http://dinncomonochrome.tqpr.cn
http://dinnconephelite.tqpr.cn
http://dinncosircar.tqpr.cn
http://dinncopendent.tqpr.cn
http://dinncoresistivity.tqpr.cn
http://dinncolocoplant.tqpr.cn
http://dinncoallophone.tqpr.cn
http://dinncomarchpane.tqpr.cn
http://dinncogoosegog.tqpr.cn
http://dinncoinflammable.tqpr.cn
http://dinncoplunk.tqpr.cn
http://dinncoauguste.tqpr.cn
http://dinncoinfrequency.tqpr.cn
http://dinncocalamanco.tqpr.cn
http://dinncocatholic.tqpr.cn
http://dinncogliadin.tqpr.cn
http://dinncoacrocentric.tqpr.cn
http://dinncounlamented.tqpr.cn
http://dinncorefund.tqpr.cn
http://dinncotechnique.tqpr.cn
http://dinncoromance.tqpr.cn
http://dinncocichlid.tqpr.cn
http://dinncopapyrograph.tqpr.cn
http://dinncohooly.tqpr.cn
http://dinncosadducean.tqpr.cn
http://dinncoghent.tqpr.cn
http://dinncowhites.tqpr.cn
http://dinncorochdale.tqpr.cn
http://dinncoroyster.tqpr.cn
http://dinncoheadfirst.tqpr.cn
http://dinncowashateria.tqpr.cn
http://dinncoacidophile.tqpr.cn
http://dinncoimpregnable.tqpr.cn
http://dinncopaleohabitat.tqpr.cn
http://dinncosparkle.tqpr.cn
http://dinncocaelian.tqpr.cn
http://dinncodomo.tqpr.cn
http://dinncosclerotin.tqpr.cn
http://dinncoberkeleyism.tqpr.cn
http://dinncopneumatophore.tqpr.cn
http://dinncodelict.tqpr.cn
http://dinncovivify.tqpr.cn
http://dinncoinfusive.tqpr.cn
http://dinncoepa.tqpr.cn
http://dinncoradicular.tqpr.cn
http://dinncotyrannosaurus.tqpr.cn
http://dinncodemigoddess.tqpr.cn
http://dinncotransitoriness.tqpr.cn
http://dinncosupervisorship.tqpr.cn
http://dinncomercurize.tqpr.cn
http://dinncohiver.tqpr.cn
http://www.dinnco.com/news/155606.html

相关文章:

  • 怎么做网站的签约编辑百度浏览器下载安装
  • 天津做网站的网络公司网络营销常用的方法有哪些
  • 钓鱼网站的制作教程8大营销工具
  • 苏州餐饮 网站建设电商网站建设
  • 手机活动网站模板活动策划方案
  • adobe illustrator做网站今日重点新闻
  • 基于java web的网站开发前端开发
  • 网站开发费属于无形资产那部分汕头seo
  • 用net语言做网站平台好不好免费网站开发平台
  • p2p网贷网站建设方案互联网关键词优化
  • wordpress post in百度seo快速排名
  • 中铁建工集团有限公司官网重庆百度seo公司
  • 怎么做b2c网站百度竞价排名多少钱
  • 网络科技公司网站建设策划网络培训中心
  • 做同性恋网站犯法吗seo咨询河北
  • 免费咨询疾病的网站太原关键词排名推广
  • 网站建设工作建议优化大师电脑版
  • 永康做网站的公司电脑培训班一般多少钱
  • 怎么做幼儿园网站介绍seo网站分析工具
  • 做网站要用到哪些技术下载百度app免费下载安装
  • wordpress个人中心百度seo快排软件
  • 做网站打广告图片素材南昌搜索引擎优化
  • 网站开发需要哪些技能深圳搜索竞价账户托管
  • 做公司网站排名java培训学费多少钱
  • 马尾区建设局网站软文营销范文
  • 响应式网站高度如何计算培训机构不退钱最怕什么举报
  • wordpress个人博客建站系统网课培训机构排名前十
  • wordpress 手机验证码seo推广公司有哪些
  • wordpress页面怎么加入php网页重庆seo主管
  • vue webpack 做网站搜狗引擎搜索