当前位置: 首页 > news >正文

企业文化建设怎么做宁波seo网络推广优化价格

企业文化建设怎么做,宁波seo网络推广优化价格,网站建设制作团队,淘客网站怎么做百度今日已办 Watermill Handler 将 4 个阶段的逻辑处理定义为 Handler 测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic…

今日已办

Watermill

Handler

将 4 个阶段的逻辑处理定义为 Handler

image-20230812100758947

image-20230812100744775

测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。

参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)

Middleware

ProfileContext 实现 context.Context 接口

// Package consumer builds a Watermill router fed by a Kafka subscriber and
// exposes it through ProfileContext.
// @Author xzx 2023/8/11 18:53:00
package consumer

import (
	"context"
	"time"

	"github.com/Shopify/sarama"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"go.uber.org/zap"

	"profile/internal/config"
	"profile/internal/log"
	"profile/internal/schema"
)

// Compile-time check that *ProfileContext satisfies context.Context.
var _ context.Context = (*ProfileContext)(nil)

// ProfileContext
// @Description: carries the router, the event being processed and its
// processing status; it implements context.Context by delegating to Ctx.
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {
	// Properties that can be called by inherited subclasses
	Status int
	Ctx    context.Context
	Router *message.Router
	Event  schema.Event

	AppID         string // reported via API
	FetchScenario string // reported via API
}

// NewProfileContext
// @Description creates a ProfileContext backed by context.Background() and
// runs init on it.
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {
	profileCtx := &ProfileContext{
		Ctx: context.Background(),
	}
	profileCtx.init()
	return profileCtx
}

// init
// @Description initialization: creates the Kafka subscriber and the router,
// registers plugins, middlewares and the handler, then stores the router in
// profileCtx.Router. If the subscriber or the router cannot be created the
// error is logged and Router is left nil.
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {
	logger := watermill.NewStdLogger(false, false)

	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         config.Profile.GetString("kafka.group"),
		},
		logger,
	)
	if err != nil {
		log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))
		// Stop here: registering a handler on a nil subscriber would only
		// fail later, once the router starts subscribing.
		return
	}

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))
		// Stop here: the calls below would dereference a nil router.
		return
	}

	router.AddPlugin(plugin.SignalsHandler)
	router.AddMiddleware(
		middleware.CorrelationID,
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,
		middleware.Recoverer,
	)

	// The first three processing stages run as handler-level middlewares;
	// the last stage (WriteKafka) is the handler itself.
	topic := "to_analyzer__0.PERF_CRASH"
	router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).
		AddMiddleware(
			profileCtx.UnpackKafkaMessage,
			profileCtx.InitPerformanceEvent,
			profileCtx.AnalyzeEvent,
		)

	profileCtx.Router = router
}

// Run
// @Description runs the router with profileCtx.Ctx and logs the error it
// returns, if any. Logs and returns immediately when the router was never
// initialised.
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {
	if profileCtx.Router == nil {
		log.Logger.Error("router is not initialized, nothing to run")
		return
	}
	// router.Run contains defer cancel()
	if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {
		log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))
	}
}

// Done delegates to profileCtx.Ctx.Done.
func (profileCtx *ProfileContext) Done() <-chan struct{} {
	return profileCtx.Ctx.Done()
}

// Err delegates to profileCtx.Ctx.Err.
func (profileCtx *ProfileContext) Err() error {
	return profileCtx.Ctx.Err()
}

// Deadline delegates to profileCtx.Ctx.Deadline.
func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {
	return profileCtx.Ctx.Deadline()
}

// Value delegates to profileCtx.Ctx.Value.
func (profileCtx *ProfileContext) Value(key any) any {
	return profileCtx.Ctx.Value(key)
}

【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumer

import (
	"encoding/json"
	"errors"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"

	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/schema/performance"
	"profile/internal/state"
)

// errNoProfileContext is returned when a stage runs on a message whose
// context was not set by UnpackKafkaMessage.
var errNoProfileContext = errors.New("message context is not a *ProfileContext")

// profileContextOf returns the *ProfileContext stored as the message context,
// or errNoProfileContext when the context holds anything else.
func profileContextOf(msg *message.Message) (*ProfileContext, error) {
	pc, ok := msg.Context().(*ProfileContext)
	if !ok || pc == nil {
		return nil, errNoProfileContext
	}
	return pc, nil
}

// UnpackKafkaMessage
// @Description middleware that unmarshals the message payload (JSON) into the
// Event of a fresh per-message ProfileContext and stores that context on the
// message for the following stages. On unmarshal failure the error is
// returned and the next handler is not called.
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		// One context per message: unmarshalling into the shared receiver
		// would leak fields between messages and race across handlers.
		// NOTE(review): Ctx is still the router-level context, as before, so
		// values the router put on the message context are not visible
		// through it — confirm whether msg.Context() should be wrapped instead.
		msgCtx := &ProfileContext{
			Ctx:           profileCtx.Ctx,
			Router:        profileCtx.Router,
			AppID:         profileCtx.AppID,
			FetchScenario: profileCtx.FetchScenario,
		}

		// Deserialize into the generic event struct.
		if err := json.Unmarshal(msg.Payload, &msgCtx.Event); err != nil {
			msgCtx.Status = state.StatusUnmarshalError
			return nil, err
		}
		log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", msgCtx.Event))

		msg.SetContext(msgCtx)
		return h(msg)
	}
}

// InitPerformanceEvent
// @Description middleware that builds the performance event from the
// unpacked event's Category, Dimensions and Values and stores it in
// Event.ProfileData. On failure the error is returned and the next handler is
// not called.
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		pc, err := profileContextOf(msg)
		if err != nil {
			return nil, err
		}

		event, err := performance.EventFactory(pc.Event.Category, pc.Event.Dimensions, pc.Event.Values)
		if err != nil {
			pc.Status = state.StatusEventFactoryError
			return nil, err
		}
		log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", pc.Event))

		pc.Event.ProfileData = event
		msg.SetContext(pc)
		return h(msg)
	}
}

// AnalyzeEvent
// @Description middleware that calls Analyze on Event.ProfileData and then
// clears the event's Dimensions and Values. On failure the error is returned
// and the next handler is not called.
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		pc, err := profileContextOf(msg)
		if err != nil {
			return nil, err
		}

		if err := pc.Event.ProfileData.Analyze(); err != nil {
			pc.Status = state.StatusAnalyzeError
			return nil, err
		}
		log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", pc.Event))

		// clear dimensions and values
		pc.Event.Dimensions = nil
		pc.Event.Values = nil

		msg.SetContext(pc)
		return h(msg)
	}
}

// WriteKafka
// @Description handler that marshals the processed event to JSON and writes
// it, keyed by the event ID, to the topic returned by connector.GetTopic for
// the event's category.
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {
	pc, contextErr := profileContextOf(msg)
	if contextErr != nil {
		return contextErr
	}

	toWriteBytes, contextErr := json.Marshal(pc.Event)
	if contextErr != nil {
		// NOTE(review): this is a marshal failure but is recorded with the
		// unmarshal status, as before — confirm whether a dedicated status exists.
		pc.Status = state.StatusUnmarshalError
		return contextErr
	}

	topic := connector.GetTopic(pc.Event.Category)
	contextErr = connector.GetProducer().WriteMessages(pc.Ctx, kafka.Message{
		Topic: topic,
		Key:   []byte(pc.Event.ID),
		Value: toWriteBytes,
	})
	if contextErr != nil {
		pc.Status = state.StatusWriteKafkaError
		return contextErr
	}

	log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", pc.Event.ID), zap.String("msg", string(toWriteBytes)))
	return nil
}

可以实现正常的效果

image-20230812130912792

Router

  • 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
  • 消息处理似乎不是并发的

pub/sub kafka-go

  • custom pub/sub

  • Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama

  • qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)

明日待办

  • 组内开会
  • 继续开发需求

文章转载自:
http://dinncomodillion.ssfq.cn
http://dinncomicroscopist.ssfq.cn
http://dinncoaeroshell.ssfq.cn
http://dinncobiography.ssfq.cn
http://dinncologin.ssfq.cn
http://dinncotyphoidal.ssfq.cn
http://dinncosloop.ssfq.cn
http://dinncoperfunctorily.ssfq.cn
http://dinncocentesis.ssfq.cn
http://dinncoxiphisternum.ssfq.cn
http://dinncothickly.ssfq.cn
http://dinncoosteopath.ssfq.cn
http://dinncobohunk.ssfq.cn
http://dinncohenhouse.ssfq.cn
http://dinncoradiometeorograph.ssfq.cn
http://dinncopersuader.ssfq.cn
http://dinncofractographic.ssfq.cn
http://dinncocubhunting.ssfq.cn
http://dinncooperand.ssfq.cn
http://dinncotarantism.ssfq.cn
http://dinncohexaplar.ssfq.cn
http://dinncospectrophotoelectric.ssfq.cn
http://dinncosinging.ssfq.cn
http://dinncoetiquette.ssfq.cn
http://dinncoenumerate.ssfq.cn
http://dinncoergotin.ssfq.cn
http://dinncomucoprotein.ssfq.cn
http://dinncoparroket.ssfq.cn
http://dinncoreshape.ssfq.cn
http://dinncodisillusionize.ssfq.cn
http://dinncoclonism.ssfq.cn
http://dinncosplashdown.ssfq.cn
http://dinncotetanal.ssfq.cn
http://dinncoimprovisatrice.ssfq.cn
http://dinncobland.ssfq.cn
http://dinncocadmaean.ssfq.cn
http://dinncoquidsworth.ssfq.cn
http://dinncodistinguish.ssfq.cn
http://dinncopianette.ssfq.cn
http://dinncocaiquejee.ssfq.cn
http://dinncoyulan.ssfq.cn
http://dinncobuildup.ssfq.cn
http://dinncosaprobe.ssfq.cn
http://dinncoghana.ssfq.cn
http://dinncothakhek.ssfq.cn
http://dinncobravura.ssfq.cn
http://dinncochangeless.ssfq.cn
http://dinncostorewide.ssfq.cn
http://dinncobebop.ssfq.cn
http://dinncoantheridium.ssfq.cn
http://dinncoveinulet.ssfq.cn
http://dinncoriverine.ssfq.cn
http://dinncounderdrift.ssfq.cn
http://dinncospringwood.ssfq.cn
http://dinncorentalsman.ssfq.cn
http://dinncohepplewhite.ssfq.cn
http://dinncolandward.ssfq.cn
http://dinncorant.ssfq.cn
http://dinncomonopolise.ssfq.cn
http://dinncoplasmoid.ssfq.cn
http://dinncookie.ssfq.cn
http://dinncohematein.ssfq.cn
http://dinncoposterity.ssfq.cn
http://dinnconipping.ssfq.cn
http://dinncocarboxylate.ssfq.cn
http://dinncothanatocoenosis.ssfq.cn
http://dinncopersonnel.ssfq.cn
http://dinncorfz.ssfq.cn
http://dinncoseaware.ssfq.cn
http://dinncoregard.ssfq.cn
http://dinncodressily.ssfq.cn
http://dinnconotice.ssfq.cn
http://dinncoreikjavik.ssfq.cn
http://dinncolanolin.ssfq.cn
http://dinncoici.ssfq.cn
http://dinncocharismatic.ssfq.cn
http://dinncopuccoon.ssfq.cn
http://dinncocygnus.ssfq.cn
http://dinncoofr.ssfq.cn
http://dinncosichuan.ssfq.cn
http://dinncovanquish.ssfq.cn
http://dinncoirrotional.ssfq.cn
http://dinncoscruff.ssfq.cn
http://dinncohandily.ssfq.cn
http://dinncosemidemisemiquaver.ssfq.cn
http://dinncosplinterproof.ssfq.cn
http://dinncoiconodule.ssfq.cn
http://dinncolocomobile.ssfq.cn
http://dinncokieselgur.ssfq.cn
http://dinncodealt.ssfq.cn
http://dinncoouttalk.ssfq.cn
http://dinncocardcastle.ssfq.cn
http://dinncohairdo.ssfq.cn
http://dinncompo.ssfq.cn
http://dinncobasaltiform.ssfq.cn
http://dinncoghosty.ssfq.cn
http://dinncosmallboy.ssfq.cn
http://dinncodurance.ssfq.cn
http://dinncoethiopic.ssfq.cn
http://dinncoicebreaker.ssfq.cn
http://www.dinnco.com/news/154749.html

相关文章:

  • wordpress手机编辑东莞seo网络营销
  • 闵行交大网站建设营销最好的方法
  • 北京网站设计制作招聘网西安seo顾问培训
  • 太原做网站的公司哪家好十种营销方法
  • 如何在网站上做抽奖系统合肥网络公司排名
  • 猪八戒做的网站怎么样青岛百度seo
  • 江西赣州网站建设域名查询网站信息
  • 做网站用什么cms 知乎上海seo推广
  • 洪梅镇网站建设公司不收费的小说网站排名
  • 广州网络兼职网站建设机器人编程培训机构排名
  • 北京住房城乡建设委网站找客户资源的软件免费的
  • 徐州手机建站模板热门搜索排行榜
  • wordpress引入外部css样式seo咨询推广
  • 网站建设流程发布网站和网页制作电商软文范例
  • 网站建设 中企动力广告优化师工资一般多少
  • 那类型网站容易做排名下载微信
  • wordpress标签美化代码个人网站seo
  • 西安搬家公司网站标题算关键词优化吗
  • 网站开发技术文章关键词大全
  • 肥乡邯郸做网站三亚百度推广公司
  • 做网站主页上主要放哪些内容seo推广教程
  • 如何做网站反链网站点击率查询
  • 自己做百度网站网站权重怎么查
  • iis7新建网站枫林seo工具
  • 私人装修接单网站二十个优化
  • 隐藏wordpress南京关键词seo公司
  • 拓普网站建设网络营销教学大纲
  • 网站设计登录界面怎么做资源优化网站排名
  • 网页制作素材及流程seo网站推广seo
  • 专做日淘的网站网站如何快速被百度收录