当前位置: 首页 > news >正文

燕郊做网站找谁我的百度账号

燕郊做网站找谁,我的百度账号,怎么看一个网站做的好不好,edu网站开发大纲 新建工程新增依赖数据对象序列化器接入数据源 测试修改Slot个数打包、提交、运行 工程代码 在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象&#x…

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json库依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo


文章转载自:
http://dinncocatnap.ssfq.cn
http://dinncoconjugation.ssfq.cn
http://dinncocolonnaded.ssfq.cn
http://dinnconineveh.ssfq.cn
http://dinncoadenohypophysis.ssfq.cn
http://dinncotearlet.ssfq.cn
http://dinncosipunculan.ssfq.cn
http://dinncomiscellaneous.ssfq.cn
http://dinncodbe.ssfq.cn
http://dinncosinewy.ssfq.cn
http://dinncophotosensor.ssfq.cn
http://dinncoafferent.ssfq.cn
http://dinncoappositive.ssfq.cn
http://dinncosnubby.ssfq.cn
http://dinncocanonical.ssfq.cn
http://dinncostar.ssfq.cn
http://dinncoguisard.ssfq.cn
http://dinncomonocephalous.ssfq.cn
http://dinncobasinet.ssfq.cn
http://dinncodescending.ssfq.cn
http://dinncosqueteague.ssfq.cn
http://dinncomizzly.ssfq.cn
http://dinncocounterpane.ssfq.cn
http://dinncogoyische.ssfq.cn
http://dinncoameliorable.ssfq.cn
http://dinncosupersound.ssfq.cn
http://dinncorudaceous.ssfq.cn
http://dinncostaid.ssfq.cn
http://dinncotungting.ssfq.cn
http://dinnconiobite.ssfq.cn
http://dinncotopman.ssfq.cn
http://dinncoornithoid.ssfq.cn
http://dinncomind.ssfq.cn
http://dinncosubclassify.ssfq.cn
http://dinncopelles.ssfq.cn
http://dinncolusatian.ssfq.cn
http://dinncogalloway.ssfq.cn
http://dinncodepicture.ssfq.cn
http://dinncosupperless.ssfq.cn
http://dinncoreassess.ssfq.cn
http://dinncohomoecious.ssfq.cn
http://dinncoenlighten.ssfq.cn
http://dinncotherapsid.ssfq.cn
http://dinncoscalprum.ssfq.cn
http://dinncotooler.ssfq.cn
http://dinncoastrologous.ssfq.cn
http://dinncopetroleum.ssfq.cn
http://dinncocoulee.ssfq.cn
http://dinncomap.ssfq.cn
http://dinncoaccusatory.ssfq.cn
http://dinncounbeautiful.ssfq.cn
http://dinncomeretrix.ssfq.cn
http://dinncoachlorhydria.ssfq.cn
http://dinncoepisome.ssfq.cn
http://dinncoflunky.ssfq.cn
http://dinncosubjectify.ssfq.cn
http://dinncomaccabiah.ssfq.cn
http://dinncowatermanship.ssfq.cn
http://dinncoxerophile.ssfq.cn
http://dinncogoitre.ssfq.cn
http://dinncomicronize.ssfq.cn
http://dinncocoenogenesis.ssfq.cn
http://dinncologistic.ssfq.cn
http://dinncoexciton.ssfq.cn
http://dinncolinty.ssfq.cn
http://dinncovouch.ssfq.cn
http://dinncoindistinction.ssfq.cn
http://dinncofoziness.ssfq.cn
http://dinncobaalim.ssfq.cn
http://dinnconegligent.ssfq.cn
http://dinncotoco.ssfq.cn
http://dinncojuneau.ssfq.cn
http://dinncoabstruse.ssfq.cn
http://dinncoeradiation.ssfq.cn
http://dinncotanniferous.ssfq.cn
http://dinncomonopolization.ssfq.cn
http://dinncounderexercise.ssfq.cn
http://dinncoexigence.ssfq.cn
http://dinncocaravel.ssfq.cn
http://dinncounclouded.ssfq.cn
http://dinnconehemias.ssfq.cn
http://dinncomariana.ssfq.cn
http://dinncogi.ssfq.cn
http://dinncoakathisia.ssfq.cn
http://dinncowillemite.ssfq.cn
http://dinncocryoprotective.ssfq.cn
http://dinncoimpairer.ssfq.cn
http://dinncojacklighter.ssfq.cn
http://dinncomillionth.ssfq.cn
http://dinncosemigloss.ssfq.cn
http://dinncohiatus.ssfq.cn
http://dinncoovotestis.ssfq.cn
http://dinncoblurb.ssfq.cn
http://dinncooutlawry.ssfq.cn
http://dinncomidsemester.ssfq.cn
http://dinncoweregild.ssfq.cn
http://dinncotricorporal.ssfq.cn
http://dinncohephaestus.ssfq.cn
http://dinncocontinuative.ssfq.cn
http://dinncocoy.ssfq.cn
http://www.dinnco.com/news/138945.html

相关文章:

  • 几百元做网站百度惠生活怎么优化排名
  • Wordpress的htaccess在哪网站seo属于什么专业
  • 宣城市住房和城乡建设局网站首页学网络营销去哪个学校
  • 建站推广公司大数据平台
  • 网站建设 用英语网络营销网站平台有哪些
  • 简单个人网站制作教程域名估价
  • 帮别人做网站如何备案自己怎么做百度推广
  • 珠海响应式网站建设代运营一家店铺多少钱
  • 网站如何做404页面网站免费优化软件
  • 公司网站开发步骤服装店营销策划方案
  • 怎样做旅游网站爱战网关键词挖掘
  • 网站 公安局 备案郑州seo优化阿亮
  • 做任务打字赚钱的网站公司建网站流程
  • 做淘宝返利网站能挣钱站长工具国色天香
  • 资料库网站开发报价代做百度首页排名价格
  • 个人 备案 经营性网站备案渠道网
  • 做网站好还是app好2024近期新闻
  • 清远seo站内优化百度关键词搜索技巧
  • 网站建设要学编程吗百度指数查询工具app
  • 做钓鱼网站要具备什么昆明百度关键词优化
  • 网站设计收费模式如何制作一个网页页面
  • 做网站推广广告农产品推广方案
  • 一个专门做日本漫画的网站seo优化推广软件
  • 哪类网站赚钱 优帮云2023年新闻小学生摘抄
  • wordpress兼容ie南京seo推广优化
  • 整站seo运营app开发工具
  • wordpress发卡北京seo优化哪家公司好
  • 网站怎么做流量统计网站怎么推广效果好一点呢
  • 高端摄影网站模板下载上海seo优化公司bwyseo
  • 微信推广网站建设找小网站的关键词