当前位置: 首页 > news >正文

石家庄网站建设批发全网媒体发布平台

石家庄网站建设批发,全网媒体发布平台,开网站的宣传图片怎么做,标书制作模板当前我们的业务场景,是基于dataStream代码, 维表数据量很大, 实时性要求很高,所以采用预加载分区维表模式, kafka广播流实时更新配置。 实现方案 1:job初始化时 每个分区open 只加载自己那部分的配置&…

当前我们的业务场景,是基于dataStream代码, 维表数据量很大, 实时性要求很高,所以采用预加载分区维表模式, kafka广播流实时更新配置。

实现方案
1:job初始化时 每个分区open 只加载自己那部分的配置, 不用每个分区都全量加载。
2: 配置实时更新, 采用kafka topic传到flink job广播流中,使用ConfigBroadcastProcessFunction更新分区内的配置信息。

衡量指标

总体来讲,关联维表有三个基础的方式:实时数据库查找关联(Per-Record Reference Data Lookup)、预加载维表关联(Pre-Loading of Reference Data)和维表变更日志关联(Reference Data Change Stream),而根据实现上的优化可以衍生出多种关联方式,且这些优化还可以灵活组合产生不同效果(不过为了简单性这里不讨论同时应用多种优化的实现方式)。对于不同的关联方式,我们可以从以下 7 个关键指标来衡量(每个指标的得分将以 1-5 五档来表示):

实现简单性: 设计是否足够简单,易于迭代和维护。

吞吐量: 性能是否足够好。

维表数据的实时性: 维度表的更新是否可以立刻对作业可见。

数据库的负载: 是否对外部数据库造成较大的负载(负载越低分越高)。

内存资源占用: 是否需要大量内存来缓存维表数据(内存占用越少分越高)。

可拓展性: 在更大规模的数据下会不会出现瓶颈。

结果确定性: 在数据延迟或者数据重放情况下,是否可以得到一致的结果。

启动预加载分区维表
对于维表比较大的情况,可以启动预加载维表基础之上增加分区功能。简单来说就是将数据流按字段进行分区,然后每个 Subtask 只需要加在对应分区范围的维表数据。值得注意的是,这里的分区方式并不是用 keyby 这种通用的 hash 分区,而是需要根据业务数据定制化分区策略,然后调用 DataStream#partitionCustom。比如按照 userId 等区间划分,0-999 划分到 subtask 1,1000-1999 划分到 subtask 2,以此类推。而在 open() 方法中,我们再根据 subtask 的 id 和总并行度来计算应该加载的维表数据范围。

在这里插入图片描述
启动预加载分区维表介绍:
通过这种分区方式,维表的大小上限理论上可以线性拓展,解决了维表大小受限于单个 TaskManager 内存的问题(现在是取决于所有 TaskManager 的内存总量),但同时给带来设计和维护分区策略的复杂性。

缓存方式
在这里插入图片描述
之前业务场景是采用的第一种, 但是配置数据量越来越大,已经不能支撑业务,所以模拟调研第三种方式,设计和维护分区策略

代码实验
Flink设置4个并行度, 2个taskmanager

-m yarn-cluster -p 4 -yjm 1024m -ytm 2048m -ynm $application_name -ys 2

在这里插入图片描述
在这里插入图片描述
采用自定义Partition设计和维护分区策略,数据流和维表connect

.filter(_.nonEmpty)
.map(_.get)
.partitionCustom(new CustomPartitioner(),data => {s"${data.datas.controlPlanId}"
})
.connect(indicatorConfigBroadcastStream)
.process(new FdcIndicatorConfigBroadcastProcessFunction)
.name("FdcGenerateIndicator")
.uid("FdcGenerateIndicator")

自定义Partition分区类

import org.apache.flink.api.common.functions.Partitioner
import org.slf4j.{Logger, LoggerFactory}class CustomPartitioner extends Partitioner[String]{lazy private val logger: Logger = LoggerFactory.getLogger(classOf[CustomPartitioner])override def partition(key: String, numPartitions: Int): Int = {logger.warn("分区总数"+numPartitions)return (key.hashCode % numPartitions).abs}
}

BroadcastProcessFunction

class ConfigBroadcastProcessFunctionextends BroadcastProcessFunction[fdcWindowData, JsonNode,(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])] {lazy private val logger: Logger = LoggerFactory.getLogger(classOf[FdcIndicatorConfigBroadcastProcessFunction])// 初始化override def open(parameters: Configuration): Unit = {logger.warn(s"getIndexOfThisSubtask: ${getRuntimeContext.getIndexOfThisSubtask}")logger.warn(s"getNumberOfParallelSubtasks: ${getRuntimeContext.getNumberOfParallelSubtasks}")super.open(parameters)// 获取全局变量val p = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]ProjectConfig.getConfig(p)}// 数据流override def processElement(windowData: fdcWindowData, ctx: BroadcastProcessFunction[fdcWindowData,JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#ReadOnlyContext,out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit = {logger.warn(s"${getRuntimeContext.getIndexOfThisSubtask}")}// 广播流override def processBroadcastElement(value: JsonNode, ctx: BroadcastProcessFunction[fdcWindowData, JsonNode, (ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]#Context,out: Collector[(ListBuffer[(ALGO, IndicatorConfig)], ListBuffer[RawData])]): Unit = {}
}

打印结果:
taskmanager1; open的时候打印信息
在这里插入图片描述
taskmanager2; open的时候打印信息
在这里插入图片描述
当数据流来时, processElement中的打印信息
在这里插入图片描述
参考:
https://blog.csdn.net/weixin_44904816/article/details/104305824
https://codeantenna.com/a/IcVVHYGUVi

https://www.jianshu.com/p/66b014dd2e36

https://blog.csdn.net/cloudbigdata/article/details/125013545


文章转载自:
http://dinnconoblesse.ydfr.cn
http://dinncotoxigenic.ydfr.cn
http://dinncoflitty.ydfr.cn
http://dinncooxyacetylene.ydfr.cn
http://dinncorepartee.ydfr.cn
http://dinncojurua.ydfr.cn
http://dinncomixture.ydfr.cn
http://dinncozygomere.ydfr.cn
http://dinncocraniocerebral.ydfr.cn
http://dinncofrailness.ydfr.cn
http://dinncoradiogoniometry.ydfr.cn
http://dinncocarnet.ydfr.cn
http://dinncolichi.ydfr.cn
http://dinncomineralocorticoid.ydfr.cn
http://dinncoparatroops.ydfr.cn
http://dinncomarketeer.ydfr.cn
http://dinncofrontality.ydfr.cn
http://dinncoambitiously.ydfr.cn
http://dinncodress.ydfr.cn
http://dinncoinnovative.ydfr.cn
http://dinncoluxmeter.ydfr.cn
http://dinncoobserver.ydfr.cn
http://dinncotrilling.ydfr.cn
http://dinncoobduracy.ydfr.cn
http://dinncomesmerization.ydfr.cn
http://dinncofishify.ydfr.cn
http://dinncocomusmacv.ydfr.cn
http://dinncotaileron.ydfr.cn
http://dinncoextemporise.ydfr.cn
http://dinncosonship.ydfr.cn
http://dinncolinter.ydfr.cn
http://dinncounsaturate.ydfr.cn
http://dinncogeneratrix.ydfr.cn
http://dinncotalmessite.ydfr.cn
http://dinncomaldivian.ydfr.cn
http://dinncochevrotain.ydfr.cn
http://dinncomiracidium.ydfr.cn
http://dinncodayfly.ydfr.cn
http://dinncopukkah.ydfr.cn
http://dinncoderivational.ydfr.cn
http://dinncooutsoar.ydfr.cn
http://dinncocontrariant.ydfr.cn
http://dinncoposture.ydfr.cn
http://dinncotrichlorfon.ydfr.cn
http://dinncofullhearted.ydfr.cn
http://dinncocsa.ydfr.cn
http://dinncowoodruffite.ydfr.cn
http://dinncoike.ydfr.cn
http://dinncomyokymia.ydfr.cn
http://dinncoflashbulb.ydfr.cn
http://dinncoperilla.ydfr.cn
http://dinncocarman.ydfr.cn
http://dinncoligan.ydfr.cn
http://dinnconoodlehead.ydfr.cn
http://dinncogerontogeous.ydfr.cn
http://dinncomonogrammed.ydfr.cn
http://dinncolt.ydfr.cn
http://dinncoizard.ydfr.cn
http://dinncoproximal.ydfr.cn
http://dinncoanew.ydfr.cn
http://dinncojulienne.ydfr.cn
http://dinncoye.ydfr.cn
http://dinncoearthy.ydfr.cn
http://dinncohest.ydfr.cn
http://dinncovisuomotor.ydfr.cn
http://dinncooctanol.ydfr.cn
http://dinncobrachistochrone.ydfr.cn
http://dinncofinn.ydfr.cn
http://dinncoriyadh.ydfr.cn
http://dinncoisobathytherm.ydfr.cn
http://dinncosailfish.ydfr.cn
http://dinncohelpful.ydfr.cn
http://dinncocondescend.ydfr.cn
http://dinncowiredrawn.ydfr.cn
http://dinncotetrarchate.ydfr.cn
http://dinncounaverage.ydfr.cn
http://dinncostupefacient.ydfr.cn
http://dinncoapical.ydfr.cn
http://dinncoeeoc.ydfr.cn
http://dinncoshaganappi.ydfr.cn
http://dinncomannan.ydfr.cn
http://dinncoslyboots.ydfr.cn
http://dinncoatropinization.ydfr.cn
http://dinncoroom.ydfr.cn
http://dinncolinebred.ydfr.cn
http://dinncoswanskin.ydfr.cn
http://dinncolapidification.ydfr.cn
http://dinncodangly.ydfr.cn
http://dinncopulsatile.ydfr.cn
http://dinnconoia.ydfr.cn
http://dinncophoenix.ydfr.cn
http://dinncofboa.ydfr.cn
http://dinncophillip.ydfr.cn
http://dinncowasting.ydfr.cn
http://dinncophosphorus.ydfr.cn
http://dinncobedell.ydfr.cn
http://dinncofattening.ydfr.cn
http://dinncocutaneous.ydfr.cn
http://dinncoexploit.ydfr.cn
http://dinnconymphomaniac.ydfr.cn
http://www.dinnco.com/news/141414.html

相关文章:

  • 网站设置了 不能复制百度推广费
  • 网站术语百度网盘app下载安装手机版
  • 网站开发外包计入什么科目网站源码建站
  • 金华建站模板百度一下百度网页官
  • 海外贸易平台网页关键词排名优化
  • wordpress使用密码错误深圳优化公司义高粱seo
  • 如何选择网站公司营销活动推广策划
  • 建英文网站费用seo百度seo排名优化软件
  • joomla适合做什么网站推广软件赚钱违法吗
  • 怎样做月嫂网站今日头条网站推广
  • 营销网络图深圳做seo有哪些公司
  • 网站建设要会哪些方面如何推广app更高效
  • 分析网站外链分析工具网站优化排名怎么做
  • 电商网站搭建流程营业推广名词解释
  • wordpress仿商城网站seo工具
  • wordpress阿里云部署seo基础教程
  • windows搭建网站北京seo优化排名
  • 做长图文网站中南建设集团有限公司
  • 用什么软件建网站最方便地推接单平台找推网
  • 汤阴做网站西安网站制作价格
  • 58做网站一年多少钱seo链接优化建议
  • js做网站统计百度推广seo效果怎么样
  • 如何做网站赚流量钱网站推广方法
  • 怎么选择网站建设公司竞价排名什么意思
  • 做系统之前的网站收藏在哪里看自媒体发稿
  • 武汉网站制作成功案例保定网站推广公司
  • 网站开发的技术流程长沙网站seo分析
  • 江苏网站备案流程图搜索百度指数
  • 网站制作公司网站设计公司今日国内新闻10则
  • 专业的wap网站开发全国各城市疫情搜索高峰进度