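Background

The previous article, "A First Look at Apache Hudi (2) (integration with Flink): how Flink writes to Hudi (the commit operation on the JobManager side)", noted that writing to Hudi involves both writing the actual Hudi data and writing the Hudi metadata. This article walks through how each of these is implemented.

Writing the actual Hudi data

This work happens in the HoodieFlinkWriteClient.upsert method: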

public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
  table.validateUpsertSchema();
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
  final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
      instantTime, table, records.listIterator());
  HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(result, instantTime, table);
}

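  • initTable
    Initializes the HoodieFlinkTable.
  • preWrite
    Does almost nothing in this path.
  • getOrCreateWriteHandle
    Creates a handle for writing files (assume here that it creates a FlinkMergeAndReplaceHandle). The handle records the path of the existing file, which FlinkMergeHelper.runMerge later reads data from.
    Note the init method called in the constructor: it creates an ExternalSpillableMap to hold the records that are about to be written, which the upsert step below relies on.
  • HoodieFlinkTable.upsert
    This is where the actual upsert happens. It calls FlinkUpsertDeltaCommitActionExecutor.execute, which ends up in BaseFlinkCommitActionExecutor.execute, which in turn calls FlinkMergeHelper.newInstance().runMerge: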
      public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,..) {
        final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
        HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
        if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
          readSchema = baseFileReader.getSchema();
          gWriter = new GenericDatumWriter<>(readSchema);
          gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
        } else {
          gReader = null;
          gWriter = null;
          readSchema = mergeHandle.getWriterSchemaWithMetaFields();
        }
        wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
            Option.of(new UpdateHandler(mergeHandle)), record -> {
          if (!externalSchemaTransformation) {
            return record;
          }
          return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
        });
        wrapper.execute();
        ...
        mergeHandle.close();
      }
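    • externalSchemaTransformation
      Controlled by the hoodie.avro.schema.external.transformation setting (default false). When enabled, data written under the previous schema is converted to the new schema.
    • wrapper.execute()
      This eventually calls upsertHandle.write(record); it is where UpdateHandler.consumeOneRecord gets invoked.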
       public void write(GenericRecord oldRecord) {
         ...
         if (keyToNewRecords.containsKey(key)) {
           if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
             copyOldRecord = true;
           } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
             copyOldRecord = false;
           }
           writtenRecordKeys.add(key);
         }
       }
      
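      If keyToNewRecords contains the key of the old record, meaning the record is being updated, the new data is written in its place.
      writeUpdateRecord performs the update, and writtenRecordKeys keeps track of the keys written this way.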
    • mergeHandle.close()
       public List<WriteStatus> close() {
         writeIncomingRecords();
         ...
       }
       ...
       protected void writeIncomingRecords() throws IOException {
         // write out any pending records (this can happen when inserts are turned into updates)
         Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
             ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
         while (newRecordsItr.hasNext()) {
           HoodieRecord<T> hoodieRecord = newRecordsItr.next();
           if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
             writeInsertRecord(hoodieRecord);
           }
         }
       }
      
      writeIncomingRecords checks each incoming record: if writtenRecordKeys does not contain its key, the record is written as a plain insert rather than as an update.
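
The way wrapper.execute() drives the merge can be pictured as a bounded producer/consumer pipeline: one side reads records from the existing base file and applies the optional schema transform, the other side hands each record to the handle's write method. The following is only a minimal sketch of that pattern, not Hudi's BoundedInMemoryExecutor. The class name BoundedExecutorSketch and its execute signature are hypothetical, and the queue here is bounded by element count as a simplifying assumption, whereas the original code passes a byte limit (getWriteBufferLimitBytes).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a bounded producer/consumer pipeline; not Hudi's real executor.
final class BoundedExecutorSketch {
    // Sentinel object marking the end of the input.
    private static final Object END = new Object();

    // Error propagation is omitted: a transform that throws or returns null
    // would leave the consumer waiting. A real executor has to handle that.
    static <I, O> void execute(Iterator<I> source, Function<I, O> transform,
                               Consumer<O> consumer, int capacity) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
        // Producer: reads the source, applies the transform, blocks while the queue is full.
        Thread producer = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    queue.put(transform.apply(source.next()));
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Consumer: runs on the calling thread and handles one record at a time.
            for (Object item = queue.take(); item != END; item = queue.take()) {
                @SuppressWarnings("unchecked")
                O record = (O) item;
                consumer.accept(record);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming records", e);
        }
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        BoundedExecutorSketch.<Integer, String>execute(
            Arrays.asList(1, 2, 3).iterator(), i -> "record-" + i, written::add, 2);
        System.out.println(written); // [record-1, record-2, record-3]
    }
}
```

In this picture the consumer callback plays the role of UpdateHandler.consumeOneRecord, and the identity transform corresponds to the case where externalSchemaTransformation is false.
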

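To summarize the key points of upsert:

  • mergeHandle.close() is where the pure inserts are actually written. When the handle is initialized (in the init method of HoodieMergeHandle), the incoming records are loaded into keyToNewRecords.
  • mergeHandle.write() is called for each existing record as the data is written; when it finds a matching new record, it writes the new data instead (update) and adds the key to writtenRecordKeys.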
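
The interplay between keyToNewRecords, writtenRecordKeys, write and close can be sketched with plain collections. This is an illustrative model only: the class MergeHandleSketch is hypothetical, records are reduced to string key/value pairs, and the branch that copies an unmatched old record unchanged is an assumption based on the copyOldRecord flag, since that part of write is elided in the excerpt above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a merge handle; not Hudi's real class.
final class MergeHandleSketch {
    // Incoming records by record key (plays the role of keyToNewRecords).
    private final Map<String, String> keyToNewRecords = new LinkedHashMap<>();
    // Keys already written through the update path (plays the role of writtenRecordKeys).
    private final Set<String> writtenRecordKeys = new HashSet<>();
    // Stand-in for the new base file being written.
    private final List<String> output = new ArrayList<>();

    MergeHandleSketch(Map<String, String> incoming) {
        keyToNewRecords.putAll(incoming);
    }

    // Called once per record read from the existing base file.
    void write(String key, String oldValue) {
        if (keyToNewRecords.containsKey(key)) {
            // Update: the incoming value replaces the old one.
            output.add(key + "=" + keyToNewRecords.get(key));
            writtenRecordKeys.add(key);
        } else {
            // Assumption: an old record with no incoming match is copied unchanged.
            output.add(key + "=" + oldValue);
        }
    }

    // Flushes incoming records that matched no existing record (pure inserts).
    List<String> close() {
        for (Map.Entry<String, String> entry : keyToNewRecords.entrySet()) {
            if (!writtenRecordKeys.contains(entry.getKey())) {
                output.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("k2", "new2"); // matches an existing key: update
        incoming.put("k3", "new3"); // matches nothing: insert on close
        MergeHandleSketch handle = new MergeHandleSketch(incoming);
        handle.write("k1", "old1");
        handle.write("k2", "old2");
        System.out.println(handle.close()); // [k1=old1, k2=new2, k3=new3]
    }
}
```

The example shows why close has to run after every old record has passed through write: only then is it known which incoming keys were never matched.
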

Writing the Hudi metadata

This work happens in the StreamWriteOperatorCoordinator.notifyCheckpointComplete method:

public void notifyCheckpointComplete(long checkpointId) {
  ...
  final boolean committed = commitInstant(this.instant, checkpointId);
  ...
}
...
private boolean commitInstant(String instant, long checkpointId){
  ...
  doCommit(instant, writeResults);
  ...
}
...
private void doCommit(String instant, List<WriteStatus> writeResults) {
  // commit or rollback
  long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
  long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
  boolean hasErrors = totalErrorRecords > 0;
  if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
    if (hasErrors) {
      LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
          + totalErrorRecords + "/" + totalRecords);
    }
    final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
        : Collections.emptyMap();
    boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
        tableState.commitAction, partitionToReplacedFileIds);
    if (success) {
      reset();
      this.ckpMetadata.commitInstant(instant);
      LOG.info("Commit instant [{}] success!", instant);
    } else {
      throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
    }
  } else {
    LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
    LOG.error("The first 100 error messages");
    writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
      LOG.error("Global error for partition path {} and fileID {}: {}",
          ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
      if (ws.getErrors().size() > 0) {
        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
      }
    });
    // Rolls back instant
    writeClient.rollback(instant);
    throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
  }
}

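The key call inside commitInstant is doCommit(instant, writeResults).
If no errors occurred, it proceeds to the next step, the commit.
The commit process is the same as in Spark; for details see "A First Look at Apache Hudi (5) (integration with Spark)".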
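
The commit-or-rollback rule in doCommit reduces to a small decision over the summed error counts. The sketch below isolates just that rule under stated assumptions: CommitDecisionSketch, Status and Decision are hypothetical names, Status keeps only the error counter of a WriteStatus, and the boolean ignoreFailed stands in for the FlinkOptions.IGNORE_FAILED setting.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the commit-or-rollback decision; not Hudi's real classes.
final class CommitDecisionSketch {
    // Stand-in for the error counter carried by a WriteStatus.
    static final class Status {
        final long totalErrorRecords;

        Status(long totalErrorRecords) {
            this.totalErrorRecords = totalErrorRecords;
        }
    }

    enum Decision { COMMIT, ROLLBACK }

    // ignoreFailed plays the role of the FlinkOptions.IGNORE_FAILED setting.
    static Decision decide(List<Status> writeResults, boolean ignoreFailed) {
        long totalErrorRecords = writeResults.stream().mapToLong(s -> s.totalErrorRecords).sum();
        boolean hasErrors = totalErrorRecords > 0;
        // Commit when nothing failed, or when failures are explicitly tolerated.
        return (!hasErrors || ignoreFailed) ? Decision.COMMIT : Decision.ROLLBACK;
    }

    public static void main(String[] args) {
        List<Status> clean = Arrays.asList(new Status(0), new Status(0));
        List<Status> dirty = Arrays.asList(new Status(0), new Status(3));
        System.out.println(decide(clean, false)); // COMMIT
        System.out.println(decide(dirty, false)); // ROLLBACK
        System.out.println(decide(dirty, true));  // COMMIT
    }
}
```
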

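Other notes

Where a newly written file is assigned its fileId in Flink and in Spark:

// In Flink
In BucketAssignFunction, processRecord calls getNewRecordLocation, which assigns the new fileId
// In Spark
In BaseSparkCommitActionExecutor, the execute method calls handleUpsertPartition, which uses the getBucketInfo method of UpsertPartitioner
The assignInserts method called in the UpsertPartitioner constructor is what assigns the new fileId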
