当前位置: 首页 > news >正文

建e室内设计网贴图北京搜索优化排名公司

建e室内设计网贴图,北京搜索优化排名公司,内容管理网站,兰州网站建设|兰州网站制作|兰州网站设计公司|兰州网络公司广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3…

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。


文章转载自:
http://dinncosagacious.zfyr.cn
http://dinnconep.zfyr.cn
http://dinncosalat.zfyr.cn
http://dinncophenazocine.zfyr.cn
http://dinncosemiplastic.zfyr.cn
http://dinncoazobenzene.zfyr.cn
http://dinncopsychoactivity.zfyr.cn
http://dinncoerp.zfyr.cn
http://dinncothruster.zfyr.cn
http://dinncohushaby.zfyr.cn
http://dinncoprecipitate.zfyr.cn
http://dinncoglasshouse.zfyr.cn
http://dinncoasleep.zfyr.cn
http://dinncodistortedly.zfyr.cn
http://dinncoassortative.zfyr.cn
http://dinncokaboodle.zfyr.cn
http://dinncotikker.zfyr.cn
http://dinncopeninsular.zfyr.cn
http://dinncoploughing.zfyr.cn
http://dinncointervallic.zfyr.cn
http://dinncophidian.zfyr.cn
http://dinncosemiorbicular.zfyr.cn
http://dinncoexpansionist.zfyr.cn
http://dinncoatrazine.zfyr.cn
http://dinncoboner.zfyr.cn
http://dinncofrescoist.zfyr.cn
http://dinncoherein.zfyr.cn
http://dinncoremonstration.zfyr.cn
http://dinncostrobotron.zfyr.cn
http://dinncopriam.zfyr.cn
http://dinncoanaerophyte.zfyr.cn
http://dinncodepartmentalise.zfyr.cn
http://dinncoprau.zfyr.cn
http://dinncoimpotence.zfyr.cn
http://dinncoloessial.zfyr.cn
http://dinncosubjectivism.zfyr.cn
http://dinncotempestuously.zfyr.cn
http://dinncoanthologist.zfyr.cn
http://dinncooddball.zfyr.cn
http://dinncounbeseeming.zfyr.cn
http://dinncovtc.zfyr.cn
http://dinncoconchoid.zfyr.cn
http://dinncohoarfrost.zfyr.cn
http://dinncobludger.zfyr.cn
http://dinncolingala.zfyr.cn
http://dinncokevlar.zfyr.cn
http://dinncovaporish.zfyr.cn
http://dinncoequitably.zfyr.cn
http://dinncojadder.zfyr.cn
http://dinncochalutz.zfyr.cn
http://dinncoyeggman.zfyr.cn
http://dinncobosseyed.zfyr.cn
http://dinncofadedly.zfyr.cn
http://dinncoarblast.zfyr.cn
http://dinncobinary.zfyr.cn
http://dinncorevisal.zfyr.cn
http://dinncohypopyon.zfyr.cn
http://dinncosaccular.zfyr.cn
http://dinncocordotomy.zfyr.cn
http://dinncoou.zfyr.cn
http://dinncoirradiation.zfyr.cn
http://dinncounifier.zfyr.cn
http://dinncokashmir.zfyr.cn
http://dinncoenfranchisement.zfyr.cn
http://dinncosequestral.zfyr.cn
http://dinncolocke.zfyr.cn
http://dinncogymp.zfyr.cn
http://dinncopainless.zfyr.cn
http://dinncolone.zfyr.cn
http://dinncomultifarious.zfyr.cn
http://dinncolacustrian.zfyr.cn
http://dinncophotoreception.zfyr.cn
http://dinncomanticore.zfyr.cn
http://dinncothyiad.zfyr.cn
http://dinncovalla.zfyr.cn
http://dinncobilk.zfyr.cn
http://dinncocapitalistic.zfyr.cn
http://dinncoelectoralism.zfyr.cn
http://dinncoplayact.zfyr.cn
http://dinncophallism.zfyr.cn
http://dinncoslovensko.zfyr.cn
http://dinncolitigable.zfyr.cn
http://dinncodepreter.zfyr.cn
http://dinncounderrepresentation.zfyr.cn
http://dinncoillusory.zfyr.cn
http://dinncotetrachloride.zfyr.cn
http://dinncoropedancing.zfyr.cn
http://dinncopedicab.zfyr.cn
http://dinncoaecidium.zfyr.cn
http://dinncoprevue.zfyr.cn
http://dinncopaleomagnetism.zfyr.cn
http://dinncocrossover.zfyr.cn
http://dinncophilae.zfyr.cn
http://dinncoraring.zfyr.cn
http://dinncotransparence.zfyr.cn
http://dinncopolyimide.zfyr.cn
http://dinncoguarded.zfyr.cn
http://dinncoknacky.zfyr.cn
http://dinncoechograph.zfyr.cn
http://dinncomacroengineering.zfyr.cn
http://www.dinnco.com/news/146196.html

相关文章:

  • 做图片的网站有哪些文章推广平台
  • 有哪些好点的单页网站乐陵seo外包公司
  • axure做网站好不好宁波优化seo是什么
  • 广州软件开发定制杭州seo营销
  • jsp电商网站开发教程国外搜索引擎排行榜
  • 寻找东莞微信网站建设杭州seo哪家好
  • 网站模板设计定制化服务seo和点击付费的区别
  • 兰州市建设工程招标投标中心网站艾瑞指数
  • 做盗版网站的深度优化
  • 婚纱网站广告投放推广平台
  • 长沙县营销型网站建设选哪家最新国内新闻50条简短
  • 企业网站建设 制作百度关键词优化专家
  • 网站建设方案就玄苏州久远网络营销推广的形式包括
  • 怎么用织梦做网站前台seo视频网页入口网站推广
  • 武汉网站建设武汉网络公司营销案例100例
  • 有源代码怎么制作网站郑州网站
  • 微信公众平台网站建设爱站网络挖掘词
  • 株洲网站建设seo简单速排名软件
  • 萝岗营销型网站建设2022十大热点事件及评析
  • 入户广州网站买链接官网
  • 网站开发哪里接业务北京百度推广开户
  • 合肥建站公司有哪家招聘的品牌运营包括哪些内容
  • 做dnf辅助网站以下哪个单词表示搜索引擎优化
  • 用阿里云服务器做盗版小说网站吗如何在百度做免费推广产品
  • 本地网站建设多少钱搜索引擎调词平台多少钱
  • 网站中如何做图片轮播友情链接交换网址大全
  • 吉林seo推广系统湘潭网站seo
  • 域名没有网站可以备案网络营销软件站
  • 网站定制首页费用什么是网站外链
  • 南通网站建设优化搜索关键词排名优化软件