当前位置: 首页 > news >正文

北京做网站要多少钱百度快照怎么看

北京做网站要多少钱,百度快照怎么看,鞍山高新区网站,专业建设网站应该怎么做informer中的DeltaFIFO机制的实现分析与源码解读 DeltaFIFO作为informer中重要组件,本文从源码层面了解是如何DelatFIFO是实现的。 DeltaFIFO的定义 找到delta_fifo.go的源码,位于client-go/tools/cache/delta_fifo.go 代码结构大致如下: store定义…

informer中的DeltaFIFO机制的实现分析与源码解读

DeltaFIFO作为informer中重要组件,本文从源码层面了解是如何DelatFIFO是实现的。

DeltaFIFO的定义

找到delta_fifo.go的源码,位于client-go/tools/cache/delta_fifo.go

代码结构大致如下:
在这里插入图片描述

store定义了一个通用的存储接口

type Store interface {Add(obj interface{}) errorUpdate(obj interface{}) errorDelete(obj interface{}) errorList() []interface{}ListKeys() []stringGet(obj interface{}) (item interface{}, exists bool, err error)GetByKey(key string) (item interface{}, exists bool, err error)// Replace will delete the contents of the store, using instead the// given list. Store takes ownership of the list, you should not reference// it after calling this function.Replace([]interface{}, string) errorResync() error
}

Queue接口继承了store,但添加了Pop()重要方法,实现了队列的能力

// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {Store// Pop blocks until it has something to process.// It returns the object that was process and the result of processing.// The PopProcessFunc may return an ErrRequeue{...} to indicate the item// should be requeued before releasing the lock on the queue.Pop(PopProcessFunc) (interface{}, error)// AddIfNotPresent adds a value previously// returned by Pop back into the queue as long// as nothing else (presumably more recent)// has since been added.AddIfNotPresent(interface{}) error// HasSynced returns true if the first batch of items has been poppedHasSynced() bool// Close queueClose()
}

FIFO类型实现了Queue接口

type FIFO struct {lock sync.RWMutexcond sync.Cond// We depend on the property that items in the set are in the queue and vice versa.items map[string]interface{}queue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first.populated bool// initialPopulationCount is the number of items inserted by the first call of Replace()initialPopulationCount int// keyFunc is used to make the key used for queued item insertion and retrieval, and// should be deterministic.keyFunc KeyFunc// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations.closed     boolclosedLock sync.Mutex
}var (_ = Queue(&FIFO{}) // FIFO is a Queue
)

DeltaFIFO类型也实现了Queue接口,与FIFO主要的区别是有两种特殊的方式:replaced和sync。replaced一般发生在资源版本更新时,而sync由resync定时发起。

type DeltaFIFO struct {// lock/cond protects access to 'items' and 'queue'.lock sync.RWMutexcond sync.Cond// We depend on the property that items in the set are in// the queue and vice versa, and that all Deltas in this// map have at least one Delta.items map[string]Deltasqueue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first.populated bool// initialPopulationCount is the number of items inserted by the first call of Replace()initialPopulationCount int// keyFunc is used to make the key used for queued item// insertion and retrieval, and should be deterministic.keyFunc KeyFunc// knownObjects list keys that are "known", for the// purpose of figuring out which items have been deleted// when Replace() or Delete() is called.knownObjects KeyListerGetter// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations.closed     boolclosedLock sync.Mutex
}var (_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

DeltaFIFO的构造函数

// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {return NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction:  keyFunc,KnownObjects: knownObjects,})
}func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {if opts.KeyFunction == nil {opts.KeyFunction = MetaNamespaceKeyFunc // 如果不指定keyFunc,默认就是MetaNamespaceKeyFunc}f := &DeltaFIFO{items:        map[string]Deltas{}, 	// 存放[]Delta的数组,queue:        []string{},						// 存储obj的key,key通常是ns/name格式的字符串keyFunc:      opts.KeyFunction,			// 由obj生成key的函数knownObjects: opts.KnownObjects,emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,transformer:           opts.Transformer,}f.cond.L = &f.lockreturn f
}var (_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)

Delta的定义

根据Delta数据结构的定义,delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据,大部分场景下只会有变更的部分信息。

// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {Type   DeltaType		// 表示对obj的操作类型"Added/Updated/Deleted/Replaced/Sync"Object interface{}	// 表示某个资源对象,比如命名为"one"的pod
}// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string// Change type definition
const (Added   DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"// Replaced is emitted when we encountered watch errors and had to do a// relist. We don't know if the replaced object has changed.//// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events// as well. Hence, Replaced is only emitted when the option// EmitDeltaTypeReplaced is true.Replaced DeltaType = "Replaced"// Sync is for synthetic events during a periodic resync.Sync DeltaType = "Sync"
)

Deletas的ADD()入队分析

watch机制监控到事件后,会把事件入队操作。

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {f.lock.Lock()defer f.lock.Unlock()f.populated = truereturn f.queueActionLocked(Added, obj)	// 实际调用的是函数queueActionLocked()
}

queueActionLocked的逻辑主要包括从obj生产key(代码中是id),再有actionType和Obj构建一个新的Delta, 再把Delta加入Deltas切片中,之后,把Deltas放入items哈希表,key放入Queue队列中去。要注意Delta加入Deltas时需要进行出重。

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)									// 先用keyFunc,通过obj获取到对应的keyif err != nil {return KeyError{obj, err}}newDeltas := append(f.items[id], Delta{actionType, obj})	// 用actionType和Obj构建一个新的Delta,再把Delta追加到(f.items[id]返回的)Deltas切片newDeltas = dedupDeltas(newDeltas)												// 对新的Deltas切去去重if len(newDeltas) > 0 {																		// 如果newDeltas切片中存在Deltaif _, exists := f.items[id]; !exists {									f.queue = append(f.queue, id)													// 将key放到queue中}f.items[id] = newDeltas																	// 把新的Deltes切片,放到items哈希表f.cond.Broadcast()}return nil
}

Delta去重

Delta进行Add()操作时,会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效:当delta数组中倒数第一个和第二个delta都是delete类型时,将会去掉其中一个

// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {d := append(Deltas{}, deltas[:n-2]...)return append(d, *out)}return deltas
}// If a & b represent the same event, returns the delta that ought to be kept.
// Otherwise, returns nil.
// TODO: is there anything other than deletions that need deduping?
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}// TODO: Detect other duplicate situations? Are there any?return nil
}// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {					// 仅处理a,b都是"Deleted"类型的事件;return nil}// Do more sophisticated checks, or is this sufficient?if _, ok := b.Object.(DeletedFinalStateUnknown); ok { // 如果a,b都是"Deleted",就只返回一个Deltareturn a}return b
}

在这里插入图片描述

Deltas的pop出队

deltaFIFO出队的操作和普通的队列出队类似,从队头取出一个资源对象key,并删除items中key对应的deltas数组。

pop出队时,会调用传参PopProcessFunc对出队元素进行处理。

// Pop blocks until an item is added to the queue, and then returns it.  If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {// 如果队列为空,就f.cond.Wait阻塞等待for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}f.cond.Wait()					}// 从f.queue中去重第一个元素id := f.queue[0]f.queue = f.queue[1:]if f.initialPopulationCount > 0 {f.initialPopulationCount--}// 从items哈希表中根据id,取出Deltasitem, ok := f.items[id]// 如果itmes哈希表中差不到id对应的Deltas,就结束进入下次循环if !ok {// Item may have been deleted subsequently.continue}// 从items哈希表中删除id对应的Deltasdelete(f.items, id)// process()函数来处理从items哈希表中取出的Deltaserr := process(item)// 如果出现错误,就把id加回queue,同时把Deltas加回itemsif e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err}

接着我们看看process()函数具体是什么。

如果对informer启动比较熟悉的话,可以知道在创建informer时,newInformer()函数需要指定ProcessFunc。这个处理函数包括数据同步到存储,以及调用注册的用户函数两个操作。

func newInformer(lw ListerWatcher,objType runtime.Object,resyncPeriod time.Duration,h ResourceEventHandler,clientState Store,
) Controller {// This will hold incoming changes. Note how we pass clientState in as a// KeyLister, that way resync operations will result in the correct set// of update/delete deltas.fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects:          clientState,EmitDeltaTypeReplaced: true,})cfg := &Config{Queue:            fifo,ListerWatcher:    lw,ObjectType:       objType,FullResyncPeriod: resyncPeriod,RetryOnError:     false,// 指定处理从deltaFIFO队列pop处理的数据的处理函数ProcessFuncProcess: func(obj interface{}) error {// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Replaced, Added, Updated:// 同步存储数据,clientState是一个storeif old, exists, err := clientState.Get(d.Object); err == nil && exists {if err := clientState.Update(d.Object); err != nil {return err}// 回调用户定义的hander函数h.OnUpdate(old, d.Object)} else {// 同步存储数据if err := clientState.Add(d.Object); err != nil {return err}// 回调用户定义的hander函数h.OnAdd(d.Object)}case Deleted:// 同步存储数据if err := clientState.Delete(d.Object); err != nil {return err}// 回调用户定义的hander函数h.OnDelete(d.Object)}}return nil},}return New(cfg)
}

进一步探究一下,informer启动run()后,会调用controller.Run(),最后c.processLoop会循环处理pop出队处理,流程大致如下:

informer.run(stopCh) —> s.controller.Run(stopCh)—>c.processLoop—>c.config.Queue.Pop(PopProcessFunc(c.config.Process))

源码位于:vender/k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// 构建一个Reflectorr := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = rc.reflectorMutex.Unlock()var wg wait.Groupwg.StartWithChannel(stopCh, r.Run)// 调用c.processLoop函数wait.Until(c.processLoop, time.Second, stopCh)wg.Wait()
}func (c *controller) processLoop() {for {// 循环的Pop出队,把出队的事件交给PopProcessFunc函数处理obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}

DeltaFIFO出队入队的流程图

在这里插入图片描述


文章转载自:
http://dinncoostitic.wbqt.cn
http://dinncovesa.wbqt.cn
http://dinncocarbine.wbqt.cn
http://dinncomocky.wbqt.cn
http://dinncoorthopaedist.wbqt.cn
http://dinncothymine.wbqt.cn
http://dinncomultiflash.wbqt.cn
http://dinncoobligor.wbqt.cn
http://dinncomhs.wbqt.cn
http://dinncogaoleress.wbqt.cn
http://dinncoremorse.wbqt.cn
http://dinncoinstantiate.wbqt.cn
http://dinncomandi.wbqt.cn
http://dinncoadmiralship.wbqt.cn
http://dinncobronchoconstriction.wbqt.cn
http://dinncoregulatory.wbqt.cn
http://dinncoalumnus.wbqt.cn
http://dinncoaetna.wbqt.cn
http://dinncodungy.wbqt.cn
http://dinncomisbehavior.wbqt.cn
http://dinncononsulfide.wbqt.cn
http://dinncodemographer.wbqt.cn
http://dinncoplutocratic.wbqt.cn
http://dinncohallucinate.wbqt.cn
http://dinncocineprojector.wbqt.cn
http://dinnconeighbourship.wbqt.cn
http://dinncofellowship.wbqt.cn
http://dinncorheochord.wbqt.cn
http://dinncopolytonal.wbqt.cn
http://dinncodiseasedness.wbqt.cn
http://dinncoionosphere.wbqt.cn
http://dinncomeemies.wbqt.cn
http://dinncolineup.wbqt.cn
http://dinncoscattershot.wbqt.cn
http://dinncoecru.wbqt.cn
http://dinncocode.wbqt.cn
http://dinncosol.wbqt.cn
http://dinncociting.wbqt.cn
http://dinncolabellum.wbqt.cn
http://dinncowithstand.wbqt.cn
http://dinncotabs.wbqt.cn
http://dinncoludic.wbqt.cn
http://dinncocomic.wbqt.cn
http://dinncotribunicial.wbqt.cn
http://dinncocornfed.wbqt.cn
http://dinncodewfall.wbqt.cn
http://dinncotrackway.wbqt.cn
http://dinncovindication.wbqt.cn
http://dinncoplanula.wbqt.cn
http://dinncoschematism.wbqt.cn
http://dinncodiminuendo.wbqt.cn
http://dinncogaekwar.wbqt.cn
http://dinncochoose.wbqt.cn
http://dinncomonomaniacal.wbqt.cn
http://dinncoeradication.wbqt.cn
http://dinncohistogenetically.wbqt.cn
http://dinncoeyrir.wbqt.cn
http://dinncoabolishable.wbqt.cn
http://dinncoganov.wbqt.cn
http://dinncopolitico.wbqt.cn
http://dinncorheogoniometer.wbqt.cn
http://dinncoimpute.wbqt.cn
http://dinncoinhabitable.wbqt.cn
http://dinncolambskin.wbqt.cn
http://dinncovoila.wbqt.cn
http://dinncostrophiole.wbqt.cn
http://dinncogrillroom.wbqt.cn
http://dinncodruggy.wbqt.cn
http://dinncoamphidromia.wbqt.cn
http://dinncoovertime.wbqt.cn
http://dinncosarcous.wbqt.cn
http://dinncoeditress.wbqt.cn
http://dinncoprovocator.wbqt.cn
http://dinncopermissively.wbqt.cn
http://dinncopostcranial.wbqt.cn
http://dinncogibli.wbqt.cn
http://dinncoprefigurative.wbqt.cn
http://dinncodramatize.wbqt.cn
http://dinncofaraday.wbqt.cn
http://dinncoairplay.wbqt.cn
http://dinncopreocular.wbqt.cn
http://dinncohyperaldosteronism.wbqt.cn
http://dinncogryphon.wbqt.cn
http://dinncocephalic.wbqt.cn
http://dinncojiff.wbqt.cn
http://dinncoantitone.wbqt.cn
http://dinncocallithump.wbqt.cn
http://dinncohastate.wbqt.cn
http://dinncounenviable.wbqt.cn
http://dinncoinspiratory.wbqt.cn
http://dinncomedibank.wbqt.cn
http://dinncoinexorably.wbqt.cn
http://dinncobaedeker.wbqt.cn
http://dinncohypomotility.wbqt.cn
http://dinncoprovisory.wbqt.cn
http://dinncoserviceman.wbqt.cn
http://dinncoamenity.wbqt.cn
http://dinncoependymary.wbqt.cn
http://dinncobilabiate.wbqt.cn
http://dinncomyasthenia.wbqt.cn
http://www.dinnco.com/news/125629.html

相关文章:

  • 贵州省城乡与建设厅网站什么是整合营销概念
  • 邯郸做网站优化艾滋病多久可以查出来
  • wordpress4.7下载北京官网seo
  • 建一个免费网站的流程衡水seo排名
  • 怎样用自己的服务器做网站谷歌seo排名公司
  • 找不同 网站开发英文站友情链接去哪里查
  • 专业做域名的网站软文推广什么意思
  • 不花钱做推广的网站上海自动seo
  • 微信网站开发合同泽成seo网站排名
  • 免费做请帖的网站北京专业网站优化
  • 做网站团队seo优化方案案例
  • 做家政服务类网站的要求微信运营工具
  • Wordpress 普通图片裁剪win10最强性能优化设置
  • 莆田网站建设公司渠道营销推广方案
  • wordpress 页面代码seo关键词优化经验技巧
  • 网站升级 html泉州seo报价
  • 网站系统性能定义成都网站优化公司
  • 政府网站群集约化建设郑州seo优化服务
  • 做网站企业湛江seo网站管理
  • 网站中图片中间是加号怎么做seo具体seo怎么优化
  • 怎样建一个个人网站抖音引流推广一个30元
  • 怎样做网商网站上海关键词优化报价
  • 政府网站集约化建设范围信息推广平台
  • 做视频网站审核编辑有假么云南网络推广服务
  • 电商网站与企业网站区别windows10优化软件
  • 分类网站建设方案东莞营销网站建设直播
  • 外贸网站建设 东莞软文写作公司
  • 莱芜论坛24小时主题帖seo优化包括
  • 网站收录说明游戏推广代理app
  • 做外链权重高的女性网站企业网络营销策略案例