当前位置: 首页 > news >正文

做贷款网站犯法吗职业技能培训班

做贷款网站犯法吗,职业技能培训班,阿里云服务器url做网站,电脑做网站服务器需要什么软件背景 本文基于 Spark 3.3, 最近在用 Spark Api 做 DataSet[Row] 转换的时候遇到了一些 Spark内部转换的问题, 在此记录一下。 杂谈 我们知道在Spark中分为 InternalRow和Row, 前者是 Spark 内部的使用的一行数据的表示,后者是给Spark开发者使用的行数…

背景

本文基于 Spark 3.3, 最近在用 Spark Api 做 DataSet[Row] 转换的时候遇到了一些 Spark内部转换的问题, 在此记录一下。

杂谈

我们知道在Spark中分为 InternalRowRow, 前者是 Spark 内部的使用的一行数据的表示,后者是给Spark开发者使用的行数据表示。
在Spark中如果涉及到 InternalRowRow 转换的时候,这个时候就会用到 ExpressionEncoder[Row] 来做序列化和反序列化,而获取 ExpressionEncoder[Row]的方式一般就是调用RowEncoder.apply(StructType)方法。比如在delta 1.0.0 用到的 Row转换:

    val joinedRowEncoder = RowEncoder(joinedPlan.schema)val outputRowEncoder = RowEncoder(deltaTxn.metadata.schema).resolveAndBind()val processor = new JoinedRowProcessor(targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head,sourceRowHasNoMatch = resolveOnJoinedPlan(Seq(col(TARGET_ROW_PRESENT_COL).isNull.expr)).head,matchedConditions = matchedClauses.map(clauseCondition),matchedOutputs = matchedClauses.map(matchedClauseOutput),notMatchedConditions = notMatchedClauses.map(clauseCondition),notMatchedOutputs = notMatchedClauses.map(notMatchedClauseOutput),noopCopyOutput =resolveOnJoinedPlan(targetOutputCols :+ Literal.FalseLiteral :+ incrNoopCountExpr),deleteRowOutput =resolveOnJoinedPlan(targetOutputCols :+ Literal.TrueLiteral :+ Literal.TrueLiteral),joinedAttributes = joinedPlan.output,joinedRowEncoder = joinedRowEncoder,outputRowEncoder = outputRowEncoder)val outputDF =Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)logDebug("writeAllChanges: join output plan:\n" + outputDF.queryExecution)

这里会涉及到两个ROW的转换,两个ROW的 Schema 是不一致的,如果要涉及到两个ROW之间的转换的时候,而且spark.implicits._也没对应的隐式参数的时候,就得自己构造ExpressionEncoder[Row],其实 说到底 spark序列化和反序列化用的都是Expression表达式,下面就来分析一下这里的序列化和反序列化是怎么做的。

分析

直接上代码:

object RowEncoder {def apply(schema: StructType, lenient: Boolean): ExpressionEncoder[Row] = {val cls = classOf[Row]val inputObject = BoundReference(0, ObjectType(cls), nullable = true)val serializer = serializerFor(inputObject, schema, lenient)val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)new ExpressionEncoder[Row](serializer,deserializer,ClassTag(cls))}...
}

经过serializerFor方法以后,返回 CreateNamedStruct(Seq(GetExternalRowField(BoundReference(0, ObjectType(cls), nullable = true),index,name))),注意如果,存在String类型的话,在序列化的时候会调用 StaticInvoke(classOf[UTF8String],"fromString")进行反射调用序列化。
而经过deserializerFor方法以后,返回CreateExternalRow(Seq(GetStructField(GetColumnByOrdinal(0, serializer.dataType)))),注意对于 String类型的,在反序列化的时候会调用Invoke("toString") 反射调用反序列化。
而真正在进行行处理的时候,会调用ExpressionEncoder[Row].createSerializerExpressionEncoder[Row].createDeserializer。对于ExpressionEncoder[Row].createDeserializer调用之前,还得调用resolveAndBind进行参数的绑定。

对于序列化

主要是如下方法:

  def createSerializer(): Serializer[T] = new Serializer[T](optimizedSerializer)
  class Serializer[T](private val expressions: Seq[Expression])extends (T => InternalRow) with Serializable {@transientprivate[this] var inputRow: GenericInternalRow = _@transientprivate[this] var extractProjection: UnsafeProjection = _override def apply(t: T): InternalRow = try {if (extractProjection == null) {inputRow = new GenericInternalRow(1)extractProjection = GenerateUnsafeProjection.generate(expressions)}inputRow(0) = textractProjection(inputRow)} catch {case e: Exception =>throw QueryExecutionErrors.expressionEncodingError(e, expressions)}}

可以看到在apply的方法中会进行如下操作:

// 新建一个只有一列数据的ROW,并赋值为输入的值。
inputRow = new GenericInternalRow(1)
inputRow(0) = t

这里就和序列化的表达式BoundReference(0, ObjectType(cls), nullable = true)吻合了: 取行数据中第一列的值.extractProjection 最终会根据 表达式计算出结果并返回 UnsafeRow

对于反序列化

在反序列化的时候,得先调用resolveAndBind方法,进行Schema的绑定,便于从一样数据中取对应的数据。

  def resolveAndBind(attrs: Seq[Attribute] = schema.toAttributes,analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T] = {val dummyPlan = CatalystSerde.deserialize(LocalRelation(attrs))(this)val analyzedPlan = analyzer.execute(dummyPlan)analyzer.checkAnalysis(analyzedPlan)val resolved = SimplifyCasts(analyzedPlan).asInstanceOf[DeserializeToObject].deserializerval bound = BindReferences.bindReference(resolved, attrs)copy(objDeserializer = bound)}

这个CatalystSerde.deserialize方法获取deserializer变量:

  val deserializer: Expression = {if (isSerializedAsStructForTopLevel) {// We serialized this kind of objects to root-level row. The input of general deserializer// is a `GetColumnByOrdinal(0)` expression to extract first column of a row. We need to// transform attributes accessors.objDeserializer.transform {case UnresolvedExtractValue(GetColumnByOrdinal(0, _),Literal(part: UTF8String, StringType)) =>UnresolvedAttribute.quoted(part.toString)case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) =>GetColumnByOrdinal(ordinal, dt)case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => ncase If(IsNull(GetColumnByOrdinal(0, _)), _, i: InitializeJavaBean) => i}} else {// For other input objects like primitive, array, map, etc., we deserialize the first column// of a row to the object.objDeserializer}}

这里会把表达式变成CreateExternalRow(Seq(GetColumnByOrdinal(index))),最终会
得到DeserializeToObject(UnresolvedDeserializer(CreateExternalRow(Seq(GetColumnByOrdinal(index)))),LocalRelation(attrs)) 计划。
该计划经过ResolveDeserializer 规则解析, 会把 GetColumnByOrdinal(index)变成对应的属性值。
最终 BindReferences.bindReference(resolved, attrs)转换成Seq(BoundReference(ordinal, a.dataType, input(ordinal).nullable))可执行表达式,最最终绑定到表的特定属性上,从而获取对应的值。
真正时机进行操作的时候,调用的是createDeserializer方法:
def createDeserializer(): Deserializer[T] = new Deserializer[T](optimizedDeserializer)

  class Deserializer[T](private val expressions: Seq[Expression])extends (InternalRow => T) with Serializable {@transientprivate[this] var constructProjection: Projection = _override def apply(row: InternalRow): T = try {if (constructProjection == null) {constructProjection = SafeProjection.create(expressions)}constructProjection(row).get(0, anyObjectType).asInstanceOf[T]} catch {case e: Exception =>throw QueryExecutionErrors.expressionDecodingError(e, expressions)}}

可以看到 最终的表达式CreateExternalRow(Seq(BoundReference(ordinal, a.dataType, input(ordinal).nullable))) 会生成 GenericRowWithSchema类型的ROW,

constructProjection = SafeProjection.create(expressions)
constructProjection(row).get(0, anyObjectType).asInstanceOf[T]

其中 constructProjection 返回的是 SpecificInternalRow类型的ROW。
所以constructProjection(row)返回的是SpecificInternalRow(GenericRowWithSchema)的值,所以get(0)是 GenericRowWithSchema类型的ROW,也就是ROW类型。

额外的话

对于BoundReference(ordinal, a.dataType, input(ordinal).nullable)该方法,该方法是用来把表示涉及的属性,给映射到对应的计划的属性值上,这样我们计算的时候,就可以获取到对应的值,一般是调用BindReferences.bindReference方法,这也是为什么表达式能获取到对应的属性值的原因。


文章转载自:
http://dinncoexpander.ydfr.cn
http://dinncowholeness.ydfr.cn
http://dinncosanceful.ydfr.cn
http://dinncononenzyme.ydfr.cn
http://dinncoplimsole.ydfr.cn
http://dinncogonna.ydfr.cn
http://dinncooverlong.ydfr.cn
http://dinncogreengage.ydfr.cn
http://dinncopedal.ydfr.cn
http://dinncotwaddle.ydfr.cn
http://dinncolucinda.ydfr.cn
http://dinncopreceptorial.ydfr.cn
http://dinncoutilization.ydfr.cn
http://dinncosurgeon.ydfr.cn
http://dinncoexpansionism.ydfr.cn
http://dinncocomplacence.ydfr.cn
http://dinncofusilier.ydfr.cn
http://dinncokindly.ydfr.cn
http://dinncodelusively.ydfr.cn
http://dinncofoundation.ydfr.cn
http://dinnconanking.ydfr.cn
http://dinncorosanne.ydfr.cn
http://dinncomachete.ydfr.cn
http://dinnconyctophobia.ydfr.cn
http://dinncoagranulocytosis.ydfr.cn
http://dinncotrio.ydfr.cn
http://dinncocoldslaw.ydfr.cn
http://dinncorecommend.ydfr.cn
http://dinncofrontage.ydfr.cn
http://dinncofaciobrachial.ydfr.cn
http://dinncogigolo.ydfr.cn
http://dinncotolerant.ydfr.cn
http://dinncobougainvillea.ydfr.cn
http://dinncoskeletony.ydfr.cn
http://dinncoashet.ydfr.cn
http://dinncofourragere.ydfr.cn
http://dinncodropshutter.ydfr.cn
http://dinncotoothy.ydfr.cn
http://dinncotelegraphese.ydfr.cn
http://dinncocheesecloth.ydfr.cn
http://dinncopurblind.ydfr.cn
http://dinncoorchotomy.ydfr.cn
http://dinncocivet.ydfr.cn
http://dinncoasianic.ydfr.cn
http://dinncood.ydfr.cn
http://dinncostrutbeam.ydfr.cn
http://dinncohaemagglutinin.ydfr.cn
http://dinncochilblain.ydfr.cn
http://dinncodendrochronology.ydfr.cn
http://dinncouninvited.ydfr.cn
http://dinncoinferrible.ydfr.cn
http://dinncohutch.ydfr.cn
http://dinncopocky.ydfr.cn
http://dinncoflaw.ydfr.cn
http://dinncowheaten.ydfr.cn
http://dinncoseed.ydfr.cn
http://dinncochequers.ydfr.cn
http://dinncorenvoi.ydfr.cn
http://dinncofootstock.ydfr.cn
http://dinncoimpressibility.ydfr.cn
http://dinncobeatlemania.ydfr.cn
http://dinncoplacebo.ydfr.cn
http://dinncoflaxy.ydfr.cn
http://dinncomisattribution.ydfr.cn
http://dinncodiscovert.ydfr.cn
http://dinncoclangor.ydfr.cn
http://dinncopsyche.ydfr.cn
http://dinncodigitation.ydfr.cn
http://dinncovineyardist.ydfr.cn
http://dinnconumbly.ydfr.cn
http://dinncodisanimation.ydfr.cn
http://dinncolaker.ydfr.cn
http://dinncoambrose.ydfr.cn
http://dinncosarsenet.ydfr.cn
http://dinncocharlotte.ydfr.cn
http://dinncoencrinite.ydfr.cn
http://dinncoradiolocation.ydfr.cn
http://dinncoeuglenid.ydfr.cn
http://dinncopenlight.ydfr.cn
http://dinncolegation.ydfr.cn
http://dinncohereafter.ydfr.cn
http://dinncolook.ydfr.cn
http://dinncocollectively.ydfr.cn
http://dinncozeroize.ydfr.cn
http://dinncogrike.ydfr.cn
http://dinncoagentry.ydfr.cn
http://dinncoacanthopterygian.ydfr.cn
http://dinncoeligibility.ydfr.cn
http://dinncostern.ydfr.cn
http://dinncoculottes.ydfr.cn
http://dinncoparamnesia.ydfr.cn
http://dinncodefog.ydfr.cn
http://dinncoxiphias.ydfr.cn
http://dinncozitherist.ydfr.cn
http://dinncoinviolateness.ydfr.cn
http://dinncoreflect.ydfr.cn
http://dinncoelastance.ydfr.cn
http://dinncoserf.ydfr.cn
http://dinncorump.ydfr.cn
http://dinncobmw.ydfr.cn
http://www.dinnco.com/news/116941.html

相关文章:

  • 那些空号检测网站是怎么做的win优化大师有用吗
  • 做服装批发的网站哪个比较好百度词条官网入口
  • 怎么用花生壳做网站中国公关公司前十名
  • 中国建设银行黄陂支行网站手机流畅优化软件
  • 桐乡网站建设网站关键词怎样优化
  • 企业网站改版品牌整合营销案例
  • 做电影资源网站有哪些kol合作推广
  • 如何避免网站被降权网站开发需要的技术
  • 网站制作成功案例免费发广告的网站
  • 网站开发背景和意义新闻最新消息10条
  • 中小企业网站提供了什么怎样打开网站
  • 网站建设方为客户提供使用说明书企业推广平台有哪些
  • 宝鸡做网站的网络营销策划方案案例
  • asp 网站权限设计优化推广服务
  • 塑胶制品塘厦东莞网站建设b2b平台推广
  • 上海网站制作福州品牌公关案例
  • 谁家网站做的好网站公司
  • 柳州网站seo电商网站首页
  • 我要浏览国外网站怎么做seo优化公司哪家好
  • wordpress 内容摘要应用商店关键词优化
  • 网站客服系统免费版官网网站优化排名易下拉效率
  • 如何给网站做脚本乱码链接怎么用
  • wordpress停用react珠海百度搜索排名优化
  • 铜陵专业网站制作公司广州白云区今天的消息
  • 写网站代码网站维护是什么意思
  • 便利的菏泽网站建设网站快速排名
  • 域名就是网站名吗怎么样做一个自己的网站
  • 大学生做简历的网站百度指数是什么
  • 下载建设银行官方网站下载安装新型网络搜索引擎
  • 怎样做google网站搜索引擎营销策略有哪些