关于网页设计的毕业论文台州关键词优化推荐
文章目录
- 初始化服务,获取nacosClient
- 获取配置
- 源码逻辑
- 获取json配置
- 发布配置
- 监听配置变化
- 源码逻辑
- listenConfigExecutor调度longPulling
- longPulling
- callListener
- 监听表 中 配置 的 结构
- 监听功能的调用逻辑
初始化服务,获取nacosClient
-
Nacos Client Config
- 是用于配置 Nacos 客户端的选项
- 它包含了客户端连接 Nacos 服务器所需的配置信息
- 连接的命名空间(NamespaceId)
- 连接超时时间(TimeoutMs)
- 是否在启动时不加载缓存(NotLoadCacheAtStart)
- 日志级别(LogLevel)等。
-
Nacos Server Config
- 是用于配置 Nacos 服务器的选项
- 它包含了 Nacos 服务器的配置信息
- 服务器的 IP 地址(IpAddr)
- 端口号(Port)
- 上下文路径(ContextPath)
- 协议(Scheme)等。
-
结构:
-
type NacosClient struct {client config_client.IConfigClient } // ClientConfigOptions 存储Nacos ClientConfig的部分配置项 type ClientConfigOptions struct {NamespaceId string `json:"namespaceId"`TimeoutMs uint64 `json:"timeoutMs"`NotLoadCacheAtStart bool `json:"notLoadCacheAtStart"`LogLevel string `json:"logLevel"`AppendToStdout bool `json:"appendToStdout"`LogDir string `json:"logDir"`CacheDir string `json:"cacheDir"` }type ServerConfigOptions struct {IpAddr string `json:"ipAddr"`Port uint64 `json:"port"`ContextPath string `json:"contextPath"`Scheme string `json:"scheme"` }
-
-
初始化
// Nacos Client Config
namespace := "3ac59d8c-8213-4619-859a-a00477496ae4"
ccOpts := nacos.ClientConfigOptions{NamespaceId: namespace,TimeoutMs: 100000000,NotLoadCacheAtStart: true,LogLevel: "debug",AppendToStdout: true,LogDir: "./config",CacheDir: "./config",
}
cfg.NacosClient = &ccOpts// Nacos Server Config
scOpt := nacos.ServerConfigOptions{IpAddr: "nacos.dev.surreal-ai.com",Port: 443,ContextPath: "/nacos",Scheme: "https",
}
cfg.NacosServer = &scOpt
- 创建
-
func NewNacosClient(ccOpts *ClientConfigOptions, scOpt *ServerConfigOptions) (*NacosClient, error) {//new 返回指针sc := []constant.ServerConfig{{IpAddr: scOpt.IpAddr,Port: scOpt.Port,ContextPath: scOpt.ContextPath,Scheme: scOpt.Scheme,},}cc := constant.ClientConfig{NamespaceId: ccOpts.NamespaceId,TimeoutMs: ccOpts.TimeoutMs,NotLoadCacheAtStart: ccOpts.NotLoadCacheAtStart,LogDir: ccOpts.LogDir,CacheDir: ccOpts.CacheDir,LogLevel: ccOpts.LogLevel,AppendToStdout: ccOpts.AppendToStdout,}// a more graceful way to create config clientclient, err := clients.NewConfigClient(vo.NacosClientParam{ClientConfig: &cc,ServerConfigs: sc,},)if err != nil {panic(err)}// 创建新的NacosClient实例并将config_client.IConfigClient包装在其中nacosClient := &NacosClient{client: client,}return nacosClient, nil }
-
获取配置
func (c *NacosClient) GetString(dataid string, group string) (string, error) {content, err := c.client.GetConfig(vo.ConfigParam{DataId: dataid,Group: group,})if err != nil {return content, err}return content, nil
}
源码逻辑
-
GetConfig
-
func (client *ConfigClient) GetConfig(param vo.ConfigParam) (content string, err error) {content, err = client.getConfigInner(param)if err != nil {return "", err}return client.decrypt(param.DataId, content) }
-
每次请求获取配置
-
-
getConfigInner
func (client *ConfigClient) getConfigInner(param vo.ConfigParam) (content string, err error) {if len(param.DataId) <= 0 {err = errors.New("[client.GetConfig] param.dataId can not be empty")return "", err}if len(param.Group) <= 0 {err = errors.New("[client.GetConfig] param.group can not be empty")return "", err}clientConfig, _ := client.GetClientConfig()cacheKey := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)content, err = client.configProxy.GetConfigProxy(param, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)if err != nil {logger.Errorf("get config from server error:%+v ", err)if _, ok := err.(*nacos_error.NacosError); ok {nacosErr := err.(*nacos_error.NacosError)if nacosErr.ErrorCode() == "404" {cache.WriteConfigToFile(cacheKey, client.configCacheDir, "")logger.Warnf("[client.GetConfig] config not found, dataId: %s, group: %s, namespaceId: %s.", param.DataId, param.Group, clientConfig.NamespaceId)return "", nil}if nacosErr.ErrorCode() == "403" {return "", errors.New("get config forbidden")}}content, err = cache.ReadConfigFromFile(cacheKey, client.configCacheDir)if err != nil {logger.Errorf("get config from cache error:%+v ", err)return "", errors.New("read config from both server and cache fail")}} else {cache.WriteConfigToFile(cacheKey, client.configCacheDir, content)}return content, nil
}
- 整体逻辑:
- 先尝试从配置服务器获取配置信息
- 如果获取失败则从缓存中获取,如果获取成功则将获取到的配置写入缓存。
- 先判断参数是否合法
- 获取客户端参数和 缓存key
- 向nacos获取
- 获取成功
- 写入对应的缓存文件返回
- 获取失败
- 首先在日志中记录获取配置失败的错误信息。然后,检查错误类型是否为
*nacos_error.NacosError
- 如果是,进一步判断错误码。
- 如果错误码是 “404”,表示配置不存在
- 则将空配置写入缓存,并在日志中记录相应的警告信息,然后返回空内容。
- 如果是,进一步判断错误码。
- 如果错误码是 “403”
- 表示无权限访问配置,函数会返回对应的错误信息。
- 如果不是 “404” 或 “403”
- 则表示从配置服务器获取配置信息失败
- 这时尝试从缓存文件中读取配置数据
- 使用
cache.ReadConfigFromFile()
方法从缓存中获取配置内容。 - 如果从缓存中获取失败,函数会返回对应的错误信息。
- 使用
- 首先在日志中记录获取配置失败的错误信息。然后,检查错误类型是否为
- 获取成功
- 如果从配置服务器获取配置信息成功(即
err
为nil
)- 则将获取到的配置内容写入缓存中,使用
cache.WriteConfigToFile()
方法。
- 则将获取到的配置内容写入缓存中,使用
获取json配置
func (c *NacosClient) GetObject(dataid string, group string, obj interface{}) error {val, err := c.client.GetConfig(vo.ConfigParam{DataId: dataid,Group: group,})if err != nil {return err}// 假设配置值是JSON格式的,可以使用json.Unmarshal将其解析到传入的obj中err = json.Unmarshal([]byte(val), obj)if err != nil {return err}return nil
}
发布配置
// PublishConfig is used to publish a configuration to Nacos.
func (c *NacosClient) PublishConfig(dataId, group, content string) error {_, err := c.client.PublishConfig(vo.ConfigParam{DataId: dataId,Group: group,Content: content,})if err != nil {return err}return nil
}
监听配置变化
func (c *NacosClient) Listen(dataid string, group string, callback func(namespace, group, dataId, data string)) error {err := c.client.ListenConfig(vo.ConfigParam{DataId: dataid,Group: group,OnChange: func(namespace, group, dataId, data string) {callback(namespace, group, dataId, data)},})if err != nil {return err}return nil
}
源码逻辑
func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {if len(param.DataId) <= 0 {err = errors.New("[client.ListenConfig] DataId can not be empty")return err}if len(param.Group) <= 0 {err = errors.New("[client.ListenConfig] Group can not be empty")return err}clientConfig, err := client.GetClientConfig()if err != nil {err = errors.New("[checkConfigInfo.GetClientConfig] failed")return err}key := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)var cData cacheDataif v, ok := client.cacheMap.Get(key); ok {cData = v.(cacheData)cData.isInitializing = true} else {var (content stringmd5Str string)if content, _ = cache.ReadConfigFromFile(key, client.configCacheDir); len(content) > 0 {md5Str = util.Md5(content)}listener := &cacheDataListener{listener: param.OnChange,lastMd5: md5Str,}cData = cacheData{isInitializing: true,dataId: param.DataId,group: param.Group,tenant: clientConfig.NamespaceId,content: content,md5: md5Str,cacheDataListener: listener,taskId: client.cacheMap.Count() / perTaskConfigSize,}}client.cacheMap.Set(key, cData)return
}
- 参数param为vo.ConfigParam类型
- 表示要监听的配置信息,包括DataId、Group和OnChange等。
- 如果DataId或Group为空,
- 则返回对应的错误信息。
- 首先获取客户端的配置信息,然后根据DataId、Group和NamespaceId构建一个缓存键key。
- 读取监听表 : cacheMap[key]
- 如果监听表中存在对应的配置信息 (cacheMap中存在这个key)
- 则将该配置信息的isInitializing字段设置为true
- 如果监听表中不存在对应的配置信息
- 则从缓存文件中读取内容并计算md5值,创建一个新的cacheData对象
- 并且设置isInitializing字段设置为true
- 如果监听表中存在对应的配置信息 (cacheMap中存在这个key)
- 将该对象加入cacheMap 中,由longPulling 进行监听
- 读取监听表
- 假如有的话,将设置为true
- 没有的话,从缓存文件中读取配置,创建新的 对象放入 监听表中
listenConfigExecutor调度longPulling
func (client *ConfigClient) listenConfigExecutor() func() error {return func() error {// 计算当前监听器的数量listenerSize := client.cacheMap.Count()// 计算总共需要的任务数量,每个任务处理的监听器数量为 perTaskConfigSizetaskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))// 获取当前正在执行的任务数量currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount))// 根据任务数量的比较,进行任务的启动和停止if taskCount > currentTaskCount {// 有新的监听器加入,需要启动新的任务来处理新的监听器for i := currentTaskCount; i < taskCount; i++ {// 设置任务状态为运行中client.schedulerMap.Set(strconv.Itoa(i), true)// 创建新的定时器,定时触发长轮询操作 client.longPulling(i)go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i))}// 更新当前任务数量为 taskCountatomic.StoreInt32(&client.currentTaskCount, int32(taskCount))} else if taskCount < currentTaskCount {// 有监听器停止监听,需要停止相应的任务for i := taskCount; i < currentTaskCount; i++ {// 检查相应任务是否在 schedulerMap 中,如果存在则将任务状态设置为停止if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok {client.schedulerMap.Set(strconv.Itoa(i), false)}}// 更新当前任务数量为 taskCountatomic.StoreInt32(&client.currentTaskCount, int32(taskCount))}return nil}
}
- 根据监听的个数 计算当前需要的 任务协程
longPulling
// longPulling 是一个长轮询监听配置变化的方法。
// 参数 taskId 表示监听任务的标识,用于区分不同的监听任务。
// 该方法返回一个函数,该函数用于执行长轮询操作,监听配置变化。
func (client *ConfigClient) longPulling(taskId int) func() error {return func() error {var listeningConfigs stringinitializationList := make([]cacheData, 0)// 遍历缓存中的所有配置信息,根据 taskId 筛选出当前监听任务的配置信息for _, key := range client.cacheMap.Keys() {if value, ok := client.cacheMap.Get(key); ok {cData := value.(cacheData)if cData.taskId == taskId {// 如果配置数据正在初始化中,则将其加入 initializationList 列表if cData.isInitializing {initializationList = append(initializationList, cData)}// 构建监听配置列表listeningConfigs,用于发送给配置服务器进行监听if len(cData.tenant) > 0 {listeningConfigs += cData.dataId + constant.SPLIT_CONFIG_INNER + cData.group + constant.SPLIT_CONFIG_INNER +cData.md5 + constant.SPLIT_CONFIG_INNER + cData.tenant + constant.SPLIT_CONFIG} else {listeningConfigs += cData.dataId + constant.SPLIT_CONFIG_INNER + cData.group + constant.SPLIT_CONFIG_INNER +cData.md5 + constant.SPLIT_CONFIG}}}}// 如果有要监听的配置信息,则继续进行长轮询if len(listeningConfigs) > 0 {clientConfig, err := client.GetClientConfig()if err != nil {logger.Errorf("[checkConfigInfo.GetClientConfig] 获取客户端配置失败 err: %+v", err)return err}// 构建监听配置请求参数params,用于发送给配置服务器进行监听params := make(map[string]string)params[constant.KEY_LISTEN_CONFIGS] = listeningConfigsvar changed string// 发送监听配置的请求,进行长轮询changedTmp, err := client.configProxy.ListenConfig(params, len(initializationList) > 0, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)if err == nil {changed = changedTmp} else {// 如果监听配置出现错误,尝试处理错误情况if _, ok := err.(*nacos_error.NacosError); ok {// 如果返回的错误是NacosError类型,则将监听结果设为变更信息(changedTmp)changed = changedTmp} else {// 否则,记录错误日志并返回错误logger.Errorf("[client.ListenConfig] 监听配置错误 err: %+v", err)}return err}// 对于初始化中的配置数据,将其isInitializing字段设为false,表示配置数据已经初始化完毕for _, v := range initializationList {v.isInitializing = falseclient.cacheMap.Set(util.GetConfigCacheKey(v.dataId, v.group, v.tenant), v)}// 根据监听结果changed,判断是否有配置发生了变更,如果有则通知相应的监听器进行处理if len(strings.ToLower(strings.Trim(changed, " "))) == 0 {logger.Info("[client.ListenConfig] 配置无变更")} else {logger.Info("[client.ListenConfig] 配置发生变更: " + changed)client.callListener(changed, clientConfig.NamespaceId)}}// 返回nil,表示长轮询监听成功完成return nil}
}
- 遍历监听表,拼接listeningConfigs
-
listeningConfigs += cData.dataId + constant.SPLIT_CONFIG_INNER + cData.group + constant.SPLIT_CONFIG_INNER +cData.md5 + constant.SPLIT_CONFIG_INNER + cData.tenant + constant.SPLIT_CONFIG
-
- 发送请求
- 全部的 isInitializing = false , 一次长轮训结束
- 根据返回信息处理
- 假如为空,则未变化
- 不为空
- client.callListener(changed, clientConfig.NamespaceId) 处理变化的值
callListener
// Execute the Listener callback func()
// 执行监听器的回调函数
// 当配置发生变更时,通过该方法通知相应的监听器进行处理。
func (client *ConfigClient) callListener(changed, tenant string) {// 解码配置变更字符串changedDecoded, _ := url.QueryUnescape(changed)// 使用分隔符 "\u0001" 拆分配置变更信息,得到一个 changedConfigs 切片,每个元素表示一个配置项的变更内容。changedConfigs := strings.Split(changedDecoded, "\u0001")// 遍历 changedConfigs,每个元素代表一个配置项的变更信息。for _, config := range changedConfigs {// 使用分隔符 "\u0002" 拆分配置变更项,得到一个 attrs 切片,包含了配置项的 DataId 和 Group 信息,以及其他可能的变更内容。attrs := strings.Split(config, "\u0002")// 如果 attrs 的长度大于等于 2,表示配置项的 DataId 和 Group 信息是有效的,可以根据这些信息从缓存中获取相应的配置数据。if len(attrs) >= 2 {// 从缓存中获取配置数据if value, ok := client.cacheMap.Get(util.GetConfigCacheKey(attrs[0], attrs[1], tenant)); ok {cData := value.(cacheData)// 获取配置内容,并计算新的 MD5 值content, err := client.getConfigInner(vo.ConfigParam{DataId: cData.dataId,Group: cData.group,})if err != nil {// 获取配置内容出错,记录错误日志并继续处理下一个配置变更项logger.Errorf("[client.getConfigInner] DataId:[%s] Group:[%s] Error:[%+v]", cData.dataId, cData.group, err)continue}// 更新配置数据cData.content = contentcData.md5 = util.Md5(content)// 如果 MD5 值与之前的不同,则表示配置发生了变更,调用监听器的回调函数处理配置变更。if cData.md5 != cData.cacheDataListener.lastMd5 {go cData.cacheDataListener.listener(tenant, attrs[1], attrs[0], cData.content)cData.cacheDataListener.lastMd5 = cData.md5client.cacheMap.Set(util.GetConfigCacheKey(cData.dataId, cData.group, tenant), cData)}}}}
}
- 解析出每个变化的配置项信息 ,分隔符"\u0001"
- 遍历
- 解析出具体信息 ,分隔符\u0002"
- 解析出的信息长度大于等于 2,表示配置项的 DataId 和 Group 信息是有效的
- 从监听表 中 拿 这个 配置数据
- getConfigInner 从远端 获取对应的最新数据
- 更新配置数据到
- 假如与之前的MD5 不一样
- 调用 自己设置的回调函数
- 更新监听表
监听表 中 配置 的 结构
type cacheData struct {isInitializing booldataId stringgroup stringcontent stringtenant stringcacheDataListener *cacheDataListener // 回调函数的封装md5 stringappName stringtaskId int
}
- cacheDataListener
-
type cacheDataListener struct {listener vo.Listener //type Listener func(namespace, group, dataId, data string)lastMd5 string }
-