当前位置: 首页 > news >正文

怎样做淘宝客导购网站seo搜索引擎优化营销案例

怎样做淘宝客导购网站,seo搜索引擎优化营销案例,网站模板安装好后,凡科建站视频教程文章目录 一. RecordWriter封装数据并发送到网络1. 数据发送到网络的具体流程2. 源码层面2.1. Serializer的实现逻辑a. SpanningRecordSerializer的实现b. SpanningRecordSerializer中如何对数据元素进行序列化 2.2. 将ByteBuffer中间数据写入BufferBuilder 二. BufferBuilder申…

文章目录

  • 一. RecordWriter封装数据并发送到网络
    • 1. 数据发送到网络的具体流程
    • 2. 源码层面
      • 2.1. Serializer的实现逻辑
        • a. SpanningRecordSerializer的实现
        • b. SpanningRecordSerializer中如何对数据元素进行序列化
      • 2.2. 将ByteBuffer中间数据写入BufferBuilder
  • 二. BufferBuilder申请资源并创建
    • 1. ChannelSelectorRecordWriter创建BufferBuilder
    • 2. BroadcastRecordWriter创建BufferBuilder

一. RecordWriter封装数据并发送到网络

1. 数据发送到网络的具体流程

RecordWriter对接入的StreamRecord数据进行序列化并等待下游任务消费的过程,整个过程细节如下。

  1. StreamRecord通过RecordWriterOutput写入RecordWriter,并在RecordWriter中通过RecordSerializer组件将StreamRecord序列化为ByteBuffer数据格式。

  2. RecordWriter向ResultPartition申请BufferBuilder对象,用于构建BufferConsumer对象,将序列化后的二进制数据存储在申请到的Buffer中。ResultPartition会向LocalBufferPool申请MemorySegment内存块,用于存储Buffer数据

  3. BufferBuilder中会不断接入ByteBuffer数据,直到将BufferBuilder中的Buffer空间占满,此时会申请新的BufferBuilder继续构建BufferConsumer数据集。

  4. Buffer构建完成后,会调用flushTargetPartition()方法,让ResultPartition向下游输出数据,此时会通知NetworkSequenceViewReader组件开始消费ResultSubPartition中的BufferConsumer对象。

  5. 当BufferConsumer中Buffer数据被推送到网络后,回收BufferConsumer中的MemorySegment内存空间,继续用于后续的消息处理。

在这里插入图片描述

 

2. 源码层面

接下来我们从源码的角度了解RecordWriter具体处理数据的逻辑。在RecordWriterOutput中调用pushToRecordWriter方法将数据写出。

在这里插入图片描述

通过recordWriter.emit(serializationDelegate)方法,将数据元素发送到RecordWriter中进行处理。主要逻辑如下

  1. 序列化数据为ByteBuffer二进制数据,并缓存在SpanningRecordSerializer.serializationBuffer对象中。
  2. 将序列化器生成的中间数据复制到指定分区中,实际上就是将ByteBuffer数据复制到BufferBuiler对象中。
  3. 如果BufferBuiler中存储了完整的数据元素,就会清空序列化器的中间数据,因为序列化器中累积的数据不宜过大。
protected void emit(T record, int targetSubpartition) throws IOException {  checkErroneous();  targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);  if (flushAlways) {  targetPartition.flush(targetSubpartition);  }  
}protected void emit(T record, int targetChannel) throws IOException, InterruptedException {checkErroneous();// 数据序列化serializer.serializeRecord(record);// 将序列化器中的数据复制到指定分区中if (copyFromSerializerToTargetChannel(targetChannel)) {// 清空序列化器serializer.prune();}
}

 

2.1. Serializer的实现逻辑

接着了解如何将序列化器中的数据转换成Buffer并存储到ResultPartiton中,最终将数据发送到下游。

a. SpanningRecordSerializer的实现

SpanningRecordSerializer实现将序列化后的BytesBuffer数据写入BufferBuilder。

SpanningRecordSerializer对象主要包含了DataOutputSerializer serializationBuffer和ByteBuffer dataBuffer两个成员变量。

  • DataOutputSerializer可以将数据转换成二进制格式并存储在byte[]数组中。在serialization中会调用serializationBuffer.wrapAsByteBuffer()方法,将serializationBuffer中生成的byte[]数组转换成ByteBuffer数据结构,并赋值给dataBuffer对象。
  • ByteBuffer是Java NIO中用于对二进制数据进行操作的Buffer接口,底层有DirectByteBuffer和HeapByteBuffer等实现,通过ByteBuffer提供的方法,可以轻松实现对二进制数据的操作。

 

b. SpanningRecordSerializer中如何对数据元素进行序列化

SpanningRecordSerializer.serializeRecord()方法主要逻辑如下。

1)清理serializationBuffer的中间数据,实际上就是将byte[]数组的position参数置为0。
2)设定serialization buffer的初始容量,默认不小于4。
3)将数据元素写入serializationBuffer的bytes[]数组。(所有数据元素都实现了IOReadableWritable接口,可以直接将数据对象转换为二进制格式)
4)获取serializationBuffer的长度信息,并写入serializationBuffer。
5)将serializationBuffer中的byte[]数据封装为java.io.ByteBuffer数据结构,最终赋值到dataBuffer的中间结果中。

public void serializeRecord(T record) throws IOException {if (CHECKED) {if (dataBuffer.hasRemaining()) {throw new IllegalStateException("Pending serialization of previous record.");}}// 首先清理serializationBuffer中的数据serializationBuffer.clear();// 设定serialization buffer数量serializationBuffer.skipBytesToWrite(4);// 将record数据写入serializationBufferrecord.write(serializationBuffer);// 获取serializationBuffer的长度信息并记录到serializationBuffer对象中int len = serializationBuffer.length() - 4;serializationBuffer.setPosition(0);serializationBuffer.writeInt(len);serializationBuffer.skipBytesToWrite(len);// 对serializationBuffer进行wrapp处理,转换成ByteBuffer数据结构dataBuffer = serializationBuffer.wrapAsByteBuffer();
}

Flink 1.12版本中RecordWriter就提供了serializeRecord的能力,没有单拎出来实现。

 

2.2. 将ByteBuffer中间数据写入BufferBuilder

首先BufferBuilder用于构建完整的Buffer数据。在copyFromSerializerToTargetChannel()方法中实现了将RecordSerializer中的ByteBuffer中间数据写入BufferBuilder的逻辑:

  1. 对序列化器进行Reset操作,重置初始化位置。
  2. 将序列化器的ByteBuffer中间数据写入BufferBuilder。
  3. 判断当前BufferBuilder是否构建了完整的Buffer数据,完成BufferBuilder中Buffer的构建。
  4. 判断SerializationResult中是否具有完整的数据元素,如果是则将pruneTriggered置为True,然后清空当前的BufferBuilder,并跳出循环。
  5. 创建新的bufferBuilder,继续从序列化器中将中间数据复制到BufferBuilder中。
  6. 指定flushAlways参数为True,调用flushTargetPartition()方法将数据写入ResultPartition。为防止过度频繁地将数据写入ResultPartiton,在RecordWriter中会有独立的outputFlusher线程(在构造器中),周期性地将构建出来的Buffer数据推送到ResultPartiton本地队列中存储,默认延迟为100ms。
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {// 对序列化器进行Reset操作,初始化initial positionserializer.reset();// 创建BufferBuilderboolean pruneTriggered = false;BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);// 调用序列化器将数据写入bufferBuilderSerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);// 如果SerializationResult是完整Bufferwhile (result.isFullBuffer()) {// 则完成创建Buffer数据的操作finishBufferBuilder(bufferBuilder);// 如果是完整记录,则将pruneTriggered置为Trueif (result.isFullRecord()) {pruneTriggered = true;emptyCurrentBufferBuilder(targetChannel);break;}// 创建新的bufferBuilder,继续复制序列化器中的数据到BufferBuilder中bufferBuilder = requestNewBufferBuilder(targetChannel);result = serializer.copyToBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");// 如果指定的flushAlways,则直接调用flushTargetPartition将数据写入ResultPartitionif (flushAlways) {flushTargetPartition(targetChannel);}return pruneTriggered;
}

 

二. BufferBuilder申请资源并创建

1. ChannelSelectorRecordWriter创建BufferBuilder

在ChannelSelectorRecordWriter.getBufferBuilder()方法中定义了BufferBuilder的创建过程。

//1. targetChannel确认数据写入的分区,ID与下游InputGate中的InputChannelID是对应的
//2. 
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {//在ChannelSelectorRecordWriter中维护了//bufferBuilders[]数组,用于存储创建好的BufferBuilder对象if (bufferBuilders[targetChannel] != null) {return bufferBuilders[targetChannel];} else {//只有在无法从bufferBuilders[]中获取BufferBuilder时,//才会调用requestNewBufferBuilder()方法创建新的BufferBuilder对象。return requestNewBufferBuilder(targetChannel);}
}

requestNewBufferBuilder()方法逻辑如下

  1. 检查bufferBuilders[]的状态,确保bufferBuilders[targetChannel]为空或者bufferBuilders[targetChannel].isFinished()方法返回值为True。
  2. 调用targetPartition.getBufferBuilder()方法获取新的BufferBuilder,这里的targetPartition就是前面提到的ResultPartition。在ResultPartition中会向LocalBufferPool申请Buffer内存空间,用于存储序列化后的ByteBuffer数据。
  3. 向targetPartition添加通过bufferBuilder构建的BufferConsumer对象,bufferBuilder和BufferConsumer内部维护了同一个Buffer数据。BufferConsumer会被存储到ResultSubpartition的BufferConsumer队列中。
  4. 将创建好的bufferBuilder添加至数组,用于下次直接获取和构建BufferConsumer对象。
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());// 调用targetPartition获取BufferBuilderBufferBuilder bufferBuilder = targetPartition.getBufferBuilder();// 向targetPartition中添加BufferConsumertargetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),targetChannel);// 将创建好的bufferBuilder添加至数组bufferBuilders[targetChannel] = bufferBuilder;return bufferBuilder;
}

 

2. BroadcastRecordWriter创建BufferBuilder

在BroadcastRecordWriter内部创建BufferBuilder的过程中,会将创建的bufferConsumer对象添加到所有的ResultSubPartition中,实现将Buffer数据下发至所有InputChannel,如下代码:

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {checkState(bufferBuilder == null || bufferBuilder.isFinished());BufferBuilder builder = targetPartition.getBufferBuilder();if (randomTriggered) {targetPartition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);} else {try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {for (int channel = 0; channel < numberOfChannels; channel++) {targetPartition.addBufferConsumer(bufferConsumer.copy(), channel);}}}bufferBuilder = builder;return builder;
}

 

以上步骤就是在RecordWriter组件中将数据元素序列化成二进制格式,然后通过BufferBuilder构建成Buffer类型数据,最终存储在ResultPartition的ResultSubPartition中。

这是从Task的层面了解数据网络传输过程,下篇了解在TaskManager中如何构建底层的网络传输通道。

 


文章转载自:
http://dinncocurioso.tqpr.cn
http://dinncopernik.tqpr.cn
http://dinncorugola.tqpr.cn
http://dinncooverscolling.tqpr.cn
http://dinncoexaminationist.tqpr.cn
http://dinncoxenium.tqpr.cn
http://dinncogarbo.tqpr.cn
http://dinncotelegnomy.tqpr.cn
http://dinncochrp.tqpr.cn
http://dinncovcd.tqpr.cn
http://dinncomultilocular.tqpr.cn
http://dinncopantagraph.tqpr.cn
http://dinncononprovided.tqpr.cn
http://dinncodirectness.tqpr.cn
http://dinncopabx.tqpr.cn
http://dinncounbranded.tqpr.cn
http://dinncooverdraw.tqpr.cn
http://dinncobyzantium.tqpr.cn
http://dinncoquestura.tqpr.cn
http://dinncosephadex.tqpr.cn
http://dinncokiloliter.tqpr.cn
http://dinncotheophagy.tqpr.cn
http://dinncoalleviator.tqpr.cn
http://dinncobitternut.tqpr.cn
http://dinncovitrain.tqpr.cn
http://dinncoinorganic.tqpr.cn
http://dinncoeccentricity.tqpr.cn
http://dinncosteadfast.tqpr.cn
http://dinncomicroearthquake.tqpr.cn
http://dinncosperrylite.tqpr.cn
http://dinncotannate.tqpr.cn
http://dinncopecky.tqpr.cn
http://dinncoxerophagy.tqpr.cn
http://dinncothinner.tqpr.cn
http://dinncofleadock.tqpr.cn
http://dinncoslit.tqpr.cn
http://dinncobarege.tqpr.cn
http://dinncodeepfry.tqpr.cn
http://dinncooutdoorsman.tqpr.cn
http://dinncoanovulatory.tqpr.cn
http://dinncoenvier.tqpr.cn
http://dinnconipple.tqpr.cn
http://dinncogpib.tqpr.cn
http://dinncoperiwinkle.tqpr.cn
http://dinncoatilt.tqpr.cn
http://dinncoingestion.tqpr.cn
http://dinncooverwash.tqpr.cn
http://dinncostereopticon.tqpr.cn
http://dinncocolluvia.tqpr.cn
http://dinncoprotist.tqpr.cn
http://dinncodiva.tqpr.cn
http://dinncogamin.tqpr.cn
http://dinncocybernation.tqpr.cn
http://dinncojagatai.tqpr.cn
http://dinncosothic.tqpr.cn
http://dinncoaware.tqpr.cn
http://dinncomercilless.tqpr.cn
http://dinncobecoming.tqpr.cn
http://dinncojambeau.tqpr.cn
http://dinncoboatman.tqpr.cn
http://dinncohexerei.tqpr.cn
http://dinncohalogen.tqpr.cn
http://dinncohypermegasoma.tqpr.cn
http://dinncoataxia.tqpr.cn
http://dinncoheal.tqpr.cn
http://dinncomusty.tqpr.cn
http://dinncodiastral.tqpr.cn
http://dinncocoyly.tqpr.cn
http://dinncocrave.tqpr.cn
http://dinncohansard.tqpr.cn
http://dinncotremble.tqpr.cn
http://dinncorevaccinate.tqpr.cn
http://dinncogalumph.tqpr.cn
http://dinncomultivalued.tqpr.cn
http://dinncoflicker.tqpr.cn
http://dinncounwitnessed.tqpr.cn
http://dinncoexemption.tqpr.cn
http://dinncomillicron.tqpr.cn
http://dinncodizzy.tqpr.cn
http://dinncophyllite.tqpr.cn
http://dinncopressroom.tqpr.cn
http://dinncopalstave.tqpr.cn
http://dinncolegacy.tqpr.cn
http://dinncogloat.tqpr.cn
http://dinncocoma.tqpr.cn
http://dinncobrochure.tqpr.cn
http://dinncoperambulatory.tqpr.cn
http://dinncopropound.tqpr.cn
http://dinncomisspoke.tqpr.cn
http://dinncometempsychosis.tqpr.cn
http://dinncobowyer.tqpr.cn
http://dinncofightback.tqpr.cn
http://dinncosupercede.tqpr.cn
http://dinncotrecento.tqpr.cn
http://dinncolitotes.tqpr.cn
http://dinncohalloo.tqpr.cn
http://dinncostarveling.tqpr.cn
http://dinncotympanum.tqpr.cn
http://dinncowomanise.tqpr.cn
http://dinncojavabeans.tqpr.cn
http://www.dinnco.com/news/137401.html

相关文章:

  • 阿里巴巴的网站应该怎么做百度大盘指数
  • 国内电商推广网站优化排名操作
  • 设计网站最重要的是要有良好的seo网络营销案例分析
  • 公司做的局域网网站怎么登陆上海百度推广平台
  • 一个网站做多少页面数量合适百度问一问付费咨询
  • 企业网站系统详细设计网站搜索排名靠前
  • 网站优化该怎么做百度竞价被换着ip点击
  • 模仿别人网站保定百度推广联系电话
  • 万网如何建设购物网站日照高端网站建设
  • 别人做的网站怎么seo优化互联网平台公司有哪些
  • 云南网站seo外包广州网络seo优化
  • 网站做的比较好的公司网站优化排名查询
  • phpcms企业网站源码教你如何快速建站
  • 网站商城例子下载百度推广竞价排名
  • erp软件开发河南整站关键词排名优化软件
  • 什么网站可以做锦鲤活动百度站长工具怎么关闭
  • erp教学零基础入门百度seo优化服务项目
  • 医院招聘网站建设和维护商丘seo外包
  • 免费的百度网站怎么做长沙网络推广
  • 郑州做网站公司 汉狮网络专业宣传页面怎么制作
  • 网站分类导航代码电商seo与sem是什么
  • 网站红色游戏推广怎么找玩家
  • 厦门网站建设的公司公关公司经营范围
  • 定州网站建设网站百度
  • 搜索引擎中注册网站seo实战培训班
  • 网站建设的公司哪家是上市公司黑帽seo是什么意思
  • 如何管理网站淘宝搜索关键词排名
  • jsp网站 值班功能营销页面
  • 公司网站应该是市场部做吗现在什么app引流效果好
  • 关于做ppt的网站市场调研怎么做