当前位置: 首页 > news >正文

网站推广公司傻大白常用的网站推广方法

网站推广公司傻大白,常用的网站推广方法,高端建站什么意思,珙县网站建设Redis Sink Redis Sinkjedis实现添加依赖自定义Redis Sink使用Sink验证 开源 Redis Connector添加依赖自定义Redis SinkRedisCommandString数据类型示例Hash数据类型示例 使用SinkRedisStringSinkRedisHashSink 验证 Redis Sink 在新版Flink的文档中,并没有发现Redi…

Redis Sink

  • Redis Sink
  • jedis实现
    • 添加依赖
    • 自定义Redis Sink
    • 使用Sink
    • 验证
  • 开源 Redis Connector
    • 添加依赖
    • 自定义Redis Sink
      • RedisCommand
      • String数据类型示例
      • Hash数据类型示例
    • 使用Sink
      • RedisStringSink
      • RedisHashSink
    • 验证

Redis Sink

在新版Flink的文档中,并没有发现Redis Sink的具体使用,但可通过 Apache Bahir了解到其具体使用

Redis具有其极高的写入读取性能,因此也是经常使用的Sink之一。可以使用Java Redis客户端Jedis手动实现,也可以使用Flink和Bahir提供的实现来实现。

开源实现的Redis Connector使用非常方便,但是无法使用一些Jedis中的高级功能,如设置过期时间等

jedis实现

添加依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>5.0.0</version>
</dependency>

自定义Redis Sink

自定义一个 RedisSink 函数,继承 RichSinkFunction,重写其中的open、invoke和close方法

open:用于新建Redis客户端invoke:将数据存储到Redis中,这里将数据以字符串的形式存储到Redis中close:使用完毕后关闭Redis客户端
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;public class RedisSink extends RichSinkFunction {private transient Jedis jedis;@Overridepublic void open(Configuration config) {jedis = new Jedis("localhost", 6379);}@Overridepublic void invoke(Object value, Context context) throws Exception {Tuple2<String, String> val = (Tuple2<String, String>) value;if (!jedis.isConnected()) {jedis.connect();}jedis.set(val.f0, val.f1);}@Overridepublic void close() throws Exception {jedis.close();}
}

使用Sink

    public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");// 应用转换算子SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {return new Tuple2<>(s, s);}});// 关联sink:将给定接收器添加到此数据流streamOperator.addSink(new RedisSink());// 执行env.execute("redis sink");}

验证

在Redis的控制台中查询数据

本地:0>keys *
1) "Flink"
2) "Storm"
3) "Spark"

开源 Redis Connector

可以使用Flink和Bahir提供的实现,其内部都是使用Java Redis客户端Jedis实现Redis Sink。

Flink:https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis

Bahir:https://bahir.apache.org/#home

添加依赖

Flink与Bahir的使用方法类似,这里以使用Flink提供的依赖Jar使用为例,引入flink-connector-redis。

Flink提供的依赖包

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.12</artifactId><version>1.1.0</version>
</dependency>

Bahir提供的依赖包

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

自定义Redis Sink

可以通过实现 RedisMapper 来自定义Redis Sink

自定义的MyRedisSink实现了RedisMapper并覆写了其中的getCommandDescription、getKeyFromData、getValueFromData。

getCommandDescription:定义存储到Redis中的数据格式,这里定义RedisCommandSET,将数据以字符串的形式存储getKeyFromData:定义SETKeygetValueFromData:定义SET的值

RedisCommand

Redis的所有可用命令,每个命令属于RedisDataType。

public enum RedisCommand {/*** 将指定值插入到存储在键上的列表的头部。* 如果键不存在,则在执行推送操作之前将其创建为空列表。*/LPUSH(RedisDataType.LIST),/*** 将指定值插入到存储在键上的列表的尾部。* 如果键不存在,则在执行推送操作之前将其创建为空列表。*/RPUSH(RedisDataType.LIST),/*** 将指定成员添加到存储在键上的集合中。* 忽略已经是该集合成员的指定成员。*/SADD(RedisDataType.SET),/*** 设置键的字符串值。如果键已经持有一个值,* 则无论其类型如何,都会被覆盖。*/SET(RedisDataType.STRING),/*** 设置键的字符串值,并设置生存时间(TTL)。* 如果键已经持有一个值,则无论其类型如何,都会被覆盖。*/SETEX(RedisDataType.STRING),/*** 将元素添加到以第一个参数指定的变量名存储的HyperLogLog数据结构中。*/PFADD(RedisDataType.HYPER_LOG_LOG),/*** 将消息发布到给定的频道。*/PUBLISH(RedisDataType.PUBSUB),/*** 将具有指定分数的指定成员添加到存储在键上的有序集合中。*/ZADD(RedisDataType.SORTED_SET),ZINCRBY(RedisDataType.SORTED_SET),/*** 从存储在键上的有序集合中删除指定的成员。*/ZREM(RedisDataType.SORTED_SET),/*** 在键上的哈希中设置字段的值。如果键不存在,* 则创建一个持有哈希的新键。如果字段已经存在于哈希中,则会覆盖它。*/HSET(RedisDataType.HASH),HINCRBY(RedisDataType.HINCRBY),/*** 为指定的键进行加法操作。*/INCRBY(RedisDataType.STRING),/*** 为指定的键进行加法操作,并在固定时间后过期。*/INCRBY_EX(RedisDataType.STRING),/*** 为指定的键进行减法操作。*/DECRBY(RedisDataType.STRING),/*** 为指定的键进行减法操作,并在固定时间后过期。*/DESCRBY_EX(RedisDataType.STRING);/*** 此命令所属的{@link RedisDataType}。*/private RedisDataType redisDataType;RedisCommand(RedisDataType redisDataType) {this.redisDataType = redisDataType;}/*** 获取此命令所属的{@link RedisDataType}。** @return {@link RedisDataType}*/public RedisDataType getRedisDataType() {return redisDataType;}
}

String数据类型示例

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisStringSink implements RedisMapper<Tuple2<String, String>> {/*** 设置redis数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}/*** 设置Key*/@Overridepublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}/*** 设置value*/@Overridepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}
}

Hash数据类型示例

创建一个Order对象

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.math.BigDecimal;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {private Integer id;private String name;private BigDecimal price;
}

编码示例:

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisHashSink implements RedisMapper<Order> {/*** 设置redis数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {/*** 第二个参数是Hash数据类型, 第二个参数是外面的key** @redisCommand: Hash数据类型* @additionalKey: 哈希和排序集数据类型时使用(RedisDataType.HASH组或RedisDataType.SORTED_SET), 其他类型忽略additionalKey*/return new RedisCommandDescription(RedisCommand.HSET, "redis");}/*** 设置Key*/@Overridepublic String getKeyFromData(Order oder) {return oder.getId() + "";}/*** 设置value*/@Overridepublic String getValueFromData(Order oder) {// 从数据中获取Value: Hash的valuereturn JSON.toJSONString(oder);}
}

使用Sink

RedisStringSink

    public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");// 应用转换算子SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String s) throws Exception {return new Tuple2<>(s, s);}});// Jedis池配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();// 关联sink:将给定接收器添加到此数据流streamOperator.addSink(new RedisSink<>(conf, new RedisStringSink()));// 执行env.execute("redis sink");}

RedisHashSink

public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建data sourceDataStreamSource<Order> dataStreamSource = env.fromElements(new Order(1,"Flink", BigDecimal.valueOf(100)),new Order(2,"Spark", BigDecimal.valueOf(200)),new Order(3,"Storm", BigDecimal.valueOf(300)));// Jedis池配置FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379)// 池可分配的最大对象数,默认值为8.setMaxTotal(100)// 设置超时,默认值为2000.setTimeout(1000 * 10).build();// 关联sink:将给定接收器添加到此数据流dataStreamSource.addSink(new RedisSink<>(conf, new RedisHashSink()));// 执行env.execute("redis sink");}

验证

在Redis的控制台中查询数据

查看String数据类型

本地:0>get Flink
"Flink"
本地:0>get Spark
"Spark"

查看Hash数据类型

Redis:0>keys *
1) "redis"
Redis:0>HGETALL redis
1) "3"
2) "{"id":3,"name":"Storm","price":300}"
3) "2"
4) "{"id":2,"name":"Spark","price":200}"
5) "1"
6) "{"id":1,"name":"Flink","price":100}"

文章转载自:
http://dinncoicaaaa.bkqw.cn
http://dinncobeltway.bkqw.cn
http://dinncolilium.bkqw.cn
http://dinncobeech.bkqw.cn
http://dinncoitn.bkqw.cn
http://dinncocongealment.bkqw.cn
http://dinncoabsolutely.bkqw.cn
http://dinncoperfectability.bkqw.cn
http://dinncoetymon.bkqw.cn
http://dinnconarcoanalysis.bkqw.cn
http://dinncogavel.bkqw.cn
http://dinncomercurous.bkqw.cn
http://dinncoimpetrate.bkqw.cn
http://dinncosemiliquid.bkqw.cn
http://dinncosawhorse.bkqw.cn
http://dinncokemp.bkqw.cn
http://dinncoweapon.bkqw.cn
http://dinncosequester.bkqw.cn
http://dinncomalt.bkqw.cn
http://dinncoembryonated.bkqw.cn
http://dinnconasdaq.bkqw.cn
http://dinncodick.bkqw.cn
http://dinncohydroxyphenyl.bkqw.cn
http://dinncocortes.bkqw.cn
http://dinncoduenna.bkqw.cn
http://dinncoincompleteness.bkqw.cn
http://dinncointerpunctuate.bkqw.cn
http://dinncocutover.bkqw.cn
http://dinncospectrophotofluorometer.bkqw.cn
http://dinncohelluva.bkqw.cn
http://dinncopolygamical.bkqw.cn
http://dinncovenography.bkqw.cn
http://dinncononskidding.bkqw.cn
http://dinncodelectation.bkqw.cn
http://dinncoburnish.bkqw.cn
http://dinncoergophobia.bkqw.cn
http://dinncodpm.bkqw.cn
http://dinncosubclimax.bkqw.cn
http://dinncomicritic.bkqw.cn
http://dinncoscapegrace.bkqw.cn
http://dinncoacolyte.bkqw.cn
http://dinncosaxhorn.bkqw.cn
http://dinncozagreus.bkqw.cn
http://dinncodolorology.bkqw.cn
http://dinncofm.bkqw.cn
http://dinncoconakry.bkqw.cn
http://dinncoboisterous.bkqw.cn
http://dinncocachalot.bkqw.cn
http://dinncotransaction.bkqw.cn
http://dinncoextensometer.bkqw.cn
http://dinncohyetometer.bkqw.cn
http://dinncounderstate.bkqw.cn
http://dinncocrenel.bkqw.cn
http://dinncorasping.bkqw.cn
http://dinncodetest.bkqw.cn
http://dinncobarefisted.bkqw.cn
http://dinncoboreal.bkqw.cn
http://dinncoeudiometer.bkqw.cn
http://dinncodraftable.bkqw.cn
http://dinncoanencephalic.bkqw.cn
http://dinncocruelly.bkqw.cn
http://dinncobryology.bkqw.cn
http://dinncoinvestment.bkqw.cn
http://dinncopreparedness.bkqw.cn
http://dinncophaeton.bkqw.cn
http://dinnconeotype.bkqw.cn
http://dinncoferritin.bkqw.cn
http://dinncogenteel.bkqw.cn
http://dinncosidetone.bkqw.cn
http://dinncobaffleboard.bkqw.cn
http://dinncosabbatical.bkqw.cn
http://dinncopelasgic.bkqw.cn
http://dinncobeachy.bkqw.cn
http://dinncomatrass.bkqw.cn
http://dinncopiraya.bkqw.cn
http://dinncoemeute.bkqw.cn
http://dinncocolitis.bkqw.cn
http://dinncorevibrate.bkqw.cn
http://dinncooutdrink.bkqw.cn
http://dinncocisalpine.bkqw.cn
http://dinncourbanise.bkqw.cn
http://dinncoagamospermy.bkqw.cn
http://dinncocalcar.bkqw.cn
http://dinncotalentless.bkqw.cn
http://dinncodisintoxicate.bkqw.cn
http://dinncocarpale.bkqw.cn
http://dinnconotepaper.bkqw.cn
http://dinncoforetaste.bkqw.cn
http://dinncoprau.bkqw.cn
http://dinncotanna.bkqw.cn
http://dinncobortz.bkqw.cn
http://dinncooperant.bkqw.cn
http://dinncoakebi.bkqw.cn
http://dinncocoenogenesis.bkqw.cn
http://dinncoawninged.bkqw.cn
http://dinncopapertrain.bkqw.cn
http://dinncoyahwist.bkqw.cn
http://dinncoheniquen.bkqw.cn
http://dinncomispronunciation.bkqw.cn
http://dinncolampholder.bkqw.cn
http://www.dinnco.com/news/147172.html

相关文章:

  • 物流百度推广怎么做网站提高工作效率
  • 黄埔移动网站建设搜索引擎收录提交入口
  • 网站的权重成都专门做网络推广的公司
  • 网站适配怎么做怎么在百度上做推广
  • 怎么在外国网站上找产品做跨境电商seo优化是做什么的
  • CSS做网站下拉菜单被图片挡住了冯耀宗seo
  • 做竞价要会做网站吗微信小程序开发多少钱
  • 创意的广告公司名字西安网站排名优化培训
  • 网站域名所有权证书快速seo关键词优化技巧
  • 怎么做hello官方网站做网站需要多少钱
  • 简述网站建设和推广评价指标seo优化一般包括
  • 兰州学校网站建设学大教育一对一收费价格表
  • 怎么做视频解析的网站优化推荐
  • wordpress 阿里云cdn青岛seo关键词排名
  • 有哪些做ppt的网站表白网页制作免费网站制作
  • 企业加盟网站建设东莞疫情最新消息通知
  • 网站制作 郑州怎么制作一个网页
  • 海淀区网站建设公司苏州seo营销
  • 成都微信网站制作google官网注册账号入口
  • 如何做复制别人的网站模版专业网络推广机构
  • 用dw怎么做网站首页站长工具日本
  • 别人做的网站怎么打开优化方案的格式及范文
  • 深圳企业网站建设公司排名搜索引擎优化是什么?
  • 国外可以做会员网站的网站站长工具seo综合查询下载
  • 关于电子商务网站建设的参考文献域名权重是什么意思
  • 宁波市住房和城乡建设厅网站全网seo是什么意思
  • wordpress+众筹网站模板企业seo优化服务
  • 做网站的背景照营销网站优化推广
  • g宝盆网站建设优惠苏州优化网站公司
  • 做品牌推广网站需要多少钱网站优化助手