
Background

The previous posts all assumed 'hoodie.datasource.write.operation' = 'bulk_insert'. In that mode there are no JSON files, and the table has already produced files like the following:

/dt=1/.hoodie_partition_metadata
/dt=1/2ffe3579-6ddb-4c5f-bf03-5c1b5dfce0a0-0_0-41263-0_20230528233336713.parquet
/dt=1/30b7d5b2-12e8-415a-8ec5-18206fe601c0-0_0-22102-0_20230528231643200.parquet
/dt=1/4abc1c6d-a8aa-4c15-affc-61a35171ce69-0_4-22106-0_20230528231643200.parquet
/dt=1/513dee80-2e8c-4db8-baee-a767b9dba41c-0_2-22104-0_20230528231643200.parquet
/dt=1/57076f86-0a62-4f52-8b50-31a5f769b26a-0_1-22103-0_20230528231643200.parquet
/dt=1/84553727-be9d-4273-bad9-0a38d9240815-0_0-59818-0_20230528233513387.parquet
/dt=1/fecd6a84-9a74-40b1-bfc1-13612a67a785-0_0-26640-0_20230528231723951.parquet

Because this is a "bulk insert" operation there is no need for deduplication, so the native Spark write path is used directly.
Below we discuss the non-native (Hudi-specific) write path.
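For context, here is a minimal sketch (in Scala, assuming a DataFrame df) of the kind of write that produces the layout above; the table name, base path and column names are assumptions for illustration, not taken from the source:

df.write.format("hudi")
  .option("hoodie.table.name", "test_tbl")                      // hypothetical table name
  .option("hoodie.datasource.write.operation", "bulk_insert")   // the mode discussed above
  .option("hoodie.datasource.write.recordkey.field", "id")      // hypothetical record key column
  .option("hoodie.datasource.write.partitionpath.field", "dt")  // yields the /dt=1/ directories
  .mode("append")
  .save("/tmp/hudi/test_tbl")                                   // hypothetical base path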

Discussion

Continuing from "A First Look at Apache Hudi (Part 2) (integration with Spark)", the remaining code is:

val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
  ...
  case _ => { // any other operation
    // register classes & schemas
    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
    sparkContext.getConf.registerKryoClasses(
      Array(classOf[org.apache.avro.generic.GenericData],
        classOf[org.apache.avro.Schema]))

    // TODO(HUDI-4472) revisit and simplify schema handling
    val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
    val latestTableSchema = getLatestTableSchema(sqlContext.sparkSession, tableMetaClient).getOrElse(sourceSchema)

    val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
    var internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)

    val writerSchema: Schema =
      if (reconcileSchema) {
        // In case we need to reconcile the schema and schema evolution is enabled,
        // we will force-apply schema evolution to the writer's schema
        if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
          internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
        }
        if (internalSchemaOpt.isDefined) {
          ...

    // Convert to RDD[HoodieRecord]
    val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
      org.apache.hudi.common.util.Option.of(writerSchema))

    val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
      operation.equals(WriteOperationType.UPSERT) ||
      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
        HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean

    val hoodieAllIncomingRecords = genericRecords.map(gr => {
      val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
      val hoodieRecord = if (shouldCombine) {
        val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false,
          parameters.getOrElse(
            DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
            DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
          .asInstanceOf[Comparable[_]]
        DataSourceUtils.createHoodieRecord(processedRecord,
          orderingVal,
          keyGenerator.getKey(gr),
          hoodieConfig.getString(PAYLOAD_CLASS_NAME))
      } else {
        DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
      }
      hoodieRecord
    }).toJavaRDD()

    val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema
    // Create a HoodieWriteClient & issue the write.
    val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
      tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
      .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

    if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
      asyncCompactionTriggerFn.get.apply(client)
    }

    if (isAsyncClusteringEnabled(client, parameters)) {
      asyncClusteringTriggerFn.get.apply(client)
    }

    val hoodieRecords =
      if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
        DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
      } else {
        hoodieAllIncomingRecords
      }

    client.startCommitWithTime(instantTime, commitActionType)
    val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
    (writeResult, client)
  }
  • If Schema Evolution is enabled, i.e. hoodie.datasource.write.reconcile.schema is true (the default is false), the schemas are merged:
    convertStructTypeToAvroSchema converts the DataFrame's schema into an Avro schema,
    and internalSchemaOpt is read from .hoodie/20230530073115535.deltacommit; the merge combines the schema about to be written with internalSchemaOpt.
    The result is assigned to writerSchema; this may additionally require hoodie.schema.on.read.enable (the default is false).

  • HoodieSparkUtils.createRdd creates the RDD:
    it converts df into an RDD[GenericRecord] and assigns it to genericRecords.

  • val hoodieAllIncomingRecords = genericRecords.map(gr => {

    • First, if hoodie.datasource.write.drop.partition.columns is true (the default is false), the field(s) named by hoodie.datasource.write.partitionpath.field are removed from the schema.
    • If hoodie.datasource.write.insert.drop.duplicates is true (the default is false), or hoodie.datasource.write.operation is upsert (the default is upsert), or hoodie.combine.before.insert is true (the default is false),
      an instance of HoodieAvroRecord<>(hKey, payload) is created, where the HoodieKey is composed of the record key and the partition path, and the payload is an OverwriteWithLatestAvroPayload instance (see the record-construction sketch after this list).
    • hoodieAllIncomingRecords thus becomes an RDD[HoodieAvroRecord].
  • writerDataSchema = ... through val client = ... : these lines create the SparkRDDWriteClient write client.

  • isAsyncCompactionEnabled
    By default asyncCompactionTriggerFn is not defined, so asynchronous compaction is not started; the same applies to isAsyncClusteringEnabled.

  • val hoodieRecords =
    If hoodie.datasource.write.insert.drop.duplicates is true (the default is false), deduplication is performed by calling DataSourceUtils.dropDuplicates (the write options involved are collected in the configuration sketch after this list):

    SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
    return client.tagLocation(incomingHoodieRecords)
        .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
    
    • SparkRDDReadClient client: when the client is created, the index is built via this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
      if hoodie.index.class is set, that class is instantiated; otherwise the index is chosen based on the value of hoodie.index.type (the default is HoodieSimpleIndex, which is suitable for testing).
    • client.tagLocation(incomingHoodieRecords)…
      filters the incoming records down to those that do not already exist in the index; ultimately this calls index.tagLocation.
    • If hoodie.datasource.write.insert.drop.duplicates is false, all records are kept.
  • client.startCommitWithTime starts the commit for the write; this involves rollback handling:

    • It first identifies failed writes that need to be rolled back; if hoodie.cleaner.policy.failed.writes is EAGER (the default is EAGER), those failed files are rolled back as part of this commit.
    • Then a file with the suffix deltacommit.requested is created; no data has actually been written at this point.
  • val writeResult = DataSourceUtils.doWriteOperation
    performs the actual write.
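To make the record construction described above concrete, here is a hedged sketch (Scala) of what the shouldCombine branch ends up building; gr and orderingVal are assumed inputs taken from the surrounding code, and this is an illustration rather than the exact body of DataSourceUtils.createHoodieRecord:

import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.model.{HoodieAvroRecord, HoodieKey, OverwriteWithLatestAvroPayload}

// Illustration only: build one HoodieAvroRecord the way the walkthrough describes.
// `gr` is the incoming Avro record and `orderingVal` the precombine value.
def buildRecord(gr: GenericRecord, orderingVal: Comparable[_],
                recordKey: String, partitionPath: String): HoodieAvroRecord[OverwriteWithLatestAvroPayload] = {
  val hKey = new HoodieKey(recordKey, partitionPath)                  // record key + partition path
  val payload = new OverwriteWithLatestAvroPayload(gr, orderingVal)   // payload carries the ordering value
  new HoodieAvroRecord(hKey, payload)
}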
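Finally, pulling together the configuration keys the walkthrough touches on, here is a hedged example of a non-bulk-insert (upsert) write; the table, path and column names are assumptions, and the values shown are only meant to map each option to the step discussed above, not to recommend settings:

df.write.format("hudi")
  .option("hoodie.table.name", "test_tbl")
  .option("hoodie.datasource.write.operation", "upsert")               // the default operation; makes shouldCombine true
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.write.precombine.field", "ts")            // read as orderingVal when combining
  .option("hoodie.datasource.write.insert.drop.duplicates", "false")   // true triggers DataSourceUtils.dropDuplicates
  .option("hoodie.datasource.write.drop.partition.columns", "false")   // true removes the partition columns from the schema
  .option("hoodie.datasource.write.reconcile.schema", "false")         // true enables the schema merge described above
  .option("hoodie.index.type", "SIMPLE")                               // HoodieSimpleIndex, as noted in the walkthrough
  .option("hoodie.cleaner.policy.failed.writes", "EAGER")              // failed writes rolled back at commit time
  .mode("append")
  .save("/tmp/hudi/test_tbl")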
