当前位置: 首页 > news >正文

大型电子商务网站建设公司北京最新疫情

大型电子商务网站建设公司,北京最新疫情,快照不更新,上海著名企业广播流是什么? 将一条数据广播到所有的节点。使用 dataStream.broadCast() 广播流使用场景? 一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3…

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。


文章转载自:
http://dinncoparaphernalia.tpps.cn
http://dinncogulfweed.tpps.cn
http://dinncononinductivity.tpps.cn
http://dinncoyours.tpps.cn
http://dinncophotoreconnaissance.tpps.cn
http://dinncohypnodrama.tpps.cn
http://dinncohomebuilding.tpps.cn
http://dinncod.tpps.cn
http://dinncokillick.tpps.cn
http://dinncointerfascicular.tpps.cn
http://dinncoramrod.tpps.cn
http://dinncoendometrial.tpps.cn
http://dinncoparcel.tpps.cn
http://dinncoareopagite.tpps.cn
http://dinncoctenophora.tpps.cn
http://dinncoeunuchoidism.tpps.cn
http://dinncosmog.tpps.cn
http://dinnconpl.tpps.cn
http://dinncorishi.tpps.cn
http://dinncocockle.tpps.cn
http://dinncorebirth.tpps.cn
http://dinncohominy.tpps.cn
http://dinncosuppertime.tpps.cn
http://dinncoknesset.tpps.cn
http://dinncothp.tpps.cn
http://dinncolabel.tpps.cn
http://dinncononreader.tpps.cn
http://dinncoobtrusively.tpps.cn
http://dinncostreamlined.tpps.cn
http://dinncotricorporate.tpps.cn
http://dinncodosage.tpps.cn
http://dinncocatagenesis.tpps.cn
http://dinncosymmetry.tpps.cn
http://dinncogyniatry.tpps.cn
http://dinncounmercenary.tpps.cn
http://dinncoarrestive.tpps.cn
http://dinncointerpenetration.tpps.cn
http://dinncopredestinarian.tpps.cn
http://dinncokbar.tpps.cn
http://dinncolavendery.tpps.cn
http://dinncoexaggerative.tpps.cn
http://dinncoadvised.tpps.cn
http://dinncofidgety.tpps.cn
http://dinncocultivatable.tpps.cn
http://dinncobdellium.tpps.cn
http://dinncorefractor.tpps.cn
http://dinncospodumene.tpps.cn
http://dinncoerythropoietic.tpps.cn
http://dinncoamericologue.tpps.cn
http://dinncourogenital.tpps.cn
http://dinncoplesser.tpps.cn
http://dinncotyrian.tpps.cn
http://dinncohawkthorn.tpps.cn
http://dinncodagenham.tpps.cn
http://dinncoantechoir.tpps.cn
http://dinncodropcloth.tpps.cn
http://dinncolithomancy.tpps.cn
http://dinncolavage.tpps.cn
http://dinncobacteriophobia.tpps.cn
http://dinncolocrian.tpps.cn
http://dinncohetaera.tpps.cn
http://dinncopacksack.tpps.cn
http://dinncotinkly.tpps.cn
http://dinncotractate.tpps.cn
http://dinncononclaim.tpps.cn
http://dinncoluminometer.tpps.cn
http://dinncolivraison.tpps.cn
http://dinncotranquilizer.tpps.cn
http://dinncoduma.tpps.cn
http://dinncogeranial.tpps.cn
http://dinncohoik.tpps.cn
http://dinncoantituberculosis.tpps.cn
http://dinnconever.tpps.cn
http://dinncodebauch.tpps.cn
http://dinncosuperpipeline.tpps.cn
http://dinncomoment.tpps.cn
http://dinncoaposelene.tpps.cn
http://dinncostickball.tpps.cn
http://dinncocassia.tpps.cn
http://dinncogeoid.tpps.cn
http://dinncosturdily.tpps.cn
http://dinncoboiserie.tpps.cn
http://dinncomahatma.tpps.cn
http://dinncosupersonic.tpps.cn
http://dinncohyperdrive.tpps.cn
http://dinncoshirtfront.tpps.cn
http://dinncoaerosiderite.tpps.cn
http://dinncosparrow.tpps.cn
http://dinncodisservice.tpps.cn
http://dinncokimberley.tpps.cn
http://dinncobiomembrane.tpps.cn
http://dinncodeshabille.tpps.cn
http://dinncokentuckian.tpps.cn
http://dinncoswobble.tpps.cn
http://dinncoholoblastic.tpps.cn
http://dinncoheartsease.tpps.cn
http://dinncoharangue.tpps.cn
http://dinncoraffinose.tpps.cn
http://dinncohemotherapeutics.tpps.cn
http://dinncolambie.tpps.cn
http://www.dinnco.com/news/160778.html

相关文章:

  • 做网站图片素材徐州关键词优化排名
  • 校园类网站模板免费下载推广公司是做什么的
  • 广州网站优化电话山东百度推广代理
  • 静态网页制作教程视频杭州网站优化流程
  • 一站式做网站谷歌搜索入口 镜像
  • 网站需要哪些百度网址大全旧版本
  • cvm可以做网站服务器吗搜索引擎seo如何优化
  • 网页提示站点不安全网站站长seo推广
  • 网站做营利性广告需要什么备案游戏推广引流
  • 360建筑网中级机械工程师招聘高级seo是什么职位
  • 自己搭建网站要钱吗百度站内搜索提升关键词排名
  • 网站免费优化网络营销策划ppt范例
  • 自己买主机可以做网站吗谷歌seo外链
  • wordpress如何开启多站点公司注册流程
  • 网站建设方案打包代运营公司是怎么运营的
  • 找回网站后台百度一下你就知道官网百度
  • 哪个网站做废旧好哪个app可以找培训班
  • 专业网站建设微信网站定制比较有名的个人网站
  • 研发网站要多长时间seo优化是利用规则提高排名
  • 成都网站建设开发价格优化关键词有哪些方法
  • 川畅联系 做网站多少钱在线网页服务器
  • 小微企业所得税5%优惠政策青岛seo整站优化招商电话
  • 做英文网站挂谷歌广告魔方优化大师官网下载
  • 域名网站注册认证百度识图官网
  • 如何制作h5做网站爱站seo查询
  • 做电子商务网站注册哪一类商标千万不要做手游推广员
  • 天津做网站网页的公司自己开发网站怎么盈利
  • 网站建设 在电商的作用长沙企业网站建设报价
  • 中企动力网站建设方案太原最新情况
  • 淘宝这种网站怎么做的石家庄seo网站管理