


The basic flow of reading Parquet files through Spark

SQL
==> Spark parses the SQL and produces a logical plan tree
LogicalPlan
==> Spark creates the logical plan node for scanning the table / reading the data
DataSourceV2ScanRelation
==> Spark optimizes the logical plan tree and produces a physical plan tree
SparkPlan
==> Based on the plan's properties, Spark converts the logical plan node DataSourceV2ScanRelation into the physical plan node BatchScanExec
BatchScanExec
==> The lazily evaluated BatchScanExec::inputRDD property creates a DataSourceRDD instance
DataSourceRDD
==> The DataSourceRDD::compute method creates a PartitionReader instance
PartitionReader
==> BatchDataReader is Iceberg's implementation of Spark's PartitionReader interface
BatchDataReader
==> The BatchDataReader::open method creates an iterator over the Parquet file (data traversal in Spark is iterator based)
VectorizedParquetReader
==> The VectorizedParquetReader::next method reads the contents of the Parquet file and wraps them into Spark ColumnarBatch objects
ColumnarBatch
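For orientation, here is a minimal sketch of a query that would go down this path, assuming an existing Iceberg table named db.events backed by Parquet data files and a Spark session whose Iceberg catalog is already configured (both are assumptions made for illustration, not part of the walkthrough):

// Minimal sketch: a SQL read that exercises the vectorized Parquet path described above.
// The table name "db.events" and the catalog setup are assumptions.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VectorizedReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-vectorized-read")
        .getOrCreate();

    // The SQL is parsed into a LogicalPlan containing a DataSourceV2ScanRelation,
    // which is planned into a BatchScanExec; its inputRDD is a DataSourceRDD whose
    // compute() ends up opening BatchDataReader / VectorizedParquetReader on each Parquet file.
    Dataset<Row> df = spark.sql("SELECT id, name FROM db.events WHERE id > 100");
    df.show();

    spark.stop();
  }
}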

The two implementation classes of BaseBatchReader

BaseBatchReader supports reading the underlying files in a batched, vectorized fashion.

ColumnarBatchReader

A reader created by the static method VectorizedSparkParquetReaders::buildReader(); its key characteristics are:

  1. Supports reading delete files
  2. Reads the Parquet file directly in Arrow format
  3. The final dataset returned is of type ColumnarBatch, Spark's implementation class
public static ColumnarBatchReader buildReader(
    Schema expectedSchema,
    MessageType fileSchema,
    Map<Integer, ?> idToConstant,
    DeleteFilter<InternalRow> deleteFilter) {
  return (ColumnarBatchReader)
      TypeWithSchemaVisitor.visit(
          expectedSchema.asStruct(),
          fileSchema,
          new ReaderBuilder(
              expectedSchema,
              fileSchema,
              NullCheckingForGet.NULL_CHECKING_ENABLED,
              idToConstant,
              ColumnarBatchReader::new,
              deleteFilter));
}

ArrowBatchReader

A reader created by the static method ArrowReader::buildReader(); its key characteristics are:

  1. Does not support reading delete files
  2. Reads the Parquet file directly in Arrow format
  3. The final result returned is of type ColumnarBatch, Iceberg's built-in implementation class

In Iceberg 1.2.x this reader is only used in test cases, so it is not discussed further here; its implementation is simpler than that of ColumnarBatchReader.

Creating the ColumnarBatchReader

The DataSourceRDD::compute method creates a PartitionReader instance

// While computing the RDD's data, an instance of a concrete class implementing the
// PartitionReader interface is created via the call below. Here partitionReaderFactory
// is of type SparkColumnarReaderFactory, an Iceberg implementation that overrides
// createColumnarReader(InputPartition) to return a PartitionReader<ColumnarBatch> instance.
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)

The PartitionReaderFactory::createColumnarReader method creates a BatchDataReader instance

class SparkColumnarReaderFactory implements PartitionReaderFactory {
  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputPartition) {
    SparkInputPartition partition = (SparkInputPartition) inputPartition;

    if (partition.allTasksOfType(FileScanTask.class)) {
      return new BatchDataReader(partition, batchSize);
    } else {
      throw new UnsupportedOperationException(
          "Unsupported task group for columnar reads: " + partition.taskGroup());
    }
  }
}

The BatchDataReader::open method creates the VectorizedParquetReader iterator

BatchDataReader::open

class BatchDataReader extends BaseBatchReader<FileScanTask>
    implements PartitionReader<ColumnarBatch> {
  @Override
  protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
    // Get the path of the data file
    String filePath = task.file().path().toString();
    LOG.debug("Opening data file {}", filePath);

    // update the current file for Spark's filename() function
    InputFileBlockHolder.set(filePath, task.start(), task.length());

    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());

    // Get a handle on the underlying file
    InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");

    // Get the delete files associated with this data file
    SparkDeleteFilter deleteFilter =
        task.deletes().isEmpty()
            ? null
            : new SparkDeleteFilter(filePath, task.deletes(), counter());

    // Return an iterator over the data file
    return newBatchIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            task.residual(),
            idToConstant,
            deleteFilter)
        .iterator();
  }
}

The BaseBatchReader::newBatchIterable method creates a VectorizedParquetReader instance

VectorizedParquetReader is the topmost class; it provides the entry point for iterating over the file's contents.

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
  protected CloseableIterable<ColumnarBatch> newBatchIterable(
      InputFile inputFile,
      FileFormat format,
      long start,
      long length,
      Expression residual,
      Map<Integer, ?> idToConstant,
      SparkDeleteFilter deleteFilter) {
    switch (format) {
      case PARQUET:
        // If the file format is PARQUET, create an iterator over the Parquet file
        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);

      case ORC:
        // Ignored, not discussed here
        return newOrcIterable(inputFile, start, length, residual, idToConstant);

      default:
        throw new UnsupportedOperationException(
            "Format: " + format + " not supported for batched reads");
    }
  }

  private CloseableIterable<ColumnarBatch> newParquetIterable(
      InputFile inputFile,
      long start,
      long length,
      Expression residual,
      Map<Integer, ?> idToConstant,
      SparkDeleteFilter deleteFilter) {
    // get required schema if there are deletes
    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();

    return Parquet.read(inputFile)
        .project(requiredSchema)
        .split(start, length)
        // Specify the factory method that creates an instance of a BaseBatchReader implementation
        .createBatchedReaderFunc(
            fileSchema ->
                VectorizedSparkParquetReaders.buildReader(
                    requiredSchema, fileSchema, idToConstant, deleteFilter))
        .recordsPerBatch(batchSize)
        .filter(residual)
        .caseSensitive(caseSensitive())
        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
        // without worrying about subsequent reads clobbering over each other. This improves
        // read performance as every batch read doesn't have to pay the cost of allocating memory.
        .reuseContainers()
        .withNameMapping(nameMapping())
        .build();
  }
}

The ColumnarBatchReader constructor (ColumnarBatchReader::new) creates the ColumnarBatchReader instance

The VectorizedSparkParquetReaders.buildReader() method was outlined in the first section above.

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
  private final boolean hasIsDeletedColumn;
  private DeleteFilter<InternalRow> deletes = null;
  private long rowStartPosInBatch = 0;

  // The only constructor. `readers` holds one reader per column (field) of the file;
  // each is an instance of VectorizedArrowReader<T>, which implements the
  // VectorizedReader<T> interface.
  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
    super(readers);
    // Inspect the reader type of every field to see whether the file exposes the built-in
    // _deleted metadata column, which marks whether the current row has been deleted.
    this.hasIsDeletedColumn =
        readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
  }
}

Reading the Parquet file

From the preceding analysis we know that the interface visible to the upper layer (the Spark RDD) is provided by VectorizedParquetReader, which internally wraps the operations on ColumnarBatchReader and exposes the file's contents through an iterator.

The VectorizedParquetReader::iterator method returns an iterator over the Parquet file

public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
  @Override
  public CloseableIterator<T> iterator() {
    FileIterator<T> iter = new FileIterator<>(init());
    addCloseable(iter);
    return iter;
  }
}
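Viewed in isolation, an iterable built this way can be consumed like any other CloseableIterable. A minimal sketch, assuming we already hold the CloseableIterable<ColumnarBatch> returned by newParquetIterable() above (the countRows helper and the variable name "batches" are hypothetical):

// Consumes the batched Parquet iterable and counts the rows it yields.
import java.io.IOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class BatchConsumer {
  static long countRows(CloseableIterable<ColumnarBatch> batches) throws IOException {
    long rows = 0;
    // iterator() returns the FileIterator: each next() advances across row groups as needed
    // and returns one ColumnarBatch of at most recordsPerBatch rows.
    try (CloseableIterator<ColumnarBatch> it = batches.iterator()) {
      while (it.hasNext()) {
        rows += it.next().numRows();
      }
    }
    return rows;
  }
}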

The FileIterator::next method reads the data

Since FileIterator implements the Java Iterator interface, the file's contents can be pulled through this iterator while the Spark RDD is being computed; each call to next() returns a ColumnarBatch object.

/**
 * Here T is ColumnarBatch.
 */
private static class FileIterator<T> implements CloseableIterator<T> {
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }

    if (valuesRead >= nextRowGroupStart) {
      // On the first call, valuesRead == nextRowGroupStart, meaning a new RowGroup is about
      // to be read. After advance() is called, nextRowGroupStart points just past the newly
      // bound RowGroup; the RowGroup itself has not been read yet, the actual read is
      // deferred to the code below.
      advance();
    }

    // batchSize is an integer, so casting to integer is safe
    // Read data from the current RowGroup, where:
    //   nextRowGroupStart is the start position of the next RowGroup, and
    //   valuesRead is the total number of rows read so far.
    // Since nextRowGroupStart >= valuesRead always holds, their difference is the number of
    // rows of the current RowGroup that have not yet been read.
    int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);

    // Read the requested number of rows; `model` is the ColumnarBatchReader instance
    // mentioned earlier.
    if (reuseContainers) {
      this.last = model.read(last, numValuesToRead);
    } else {
      this.last = model.read(null, numValuesToRead);
    }

    // Accumulate the number of rows read
    valuesRead += numValuesToRead;

    return last;
  }

  /**
   * Advance the read position to the start of the next RowGroup.
   */
  private void advance() {
    while (shouldSkip[nextRowGroup]) {
      nextRowGroup += 1;
      reader.skipNextRowGroup();
    }

    PageReadStore pages;
    try {
      pages = reader.readNextRowGroup();
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }

    // Compute the starting row position of the next RowGroup from the bound RowGroup metadata
    long rowPosition = rowGroupsStartRowPos[nextRowGroup];
    model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);
    nextRowGroupStart += pages.getRowCount();
    nextRowGroup += 1;
  }
}

ColumnarBatchReader::read

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
  protected final VectorHolder[] vectorHolders;

  @Override
  public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
    if (reuse == null) {
      // If the current VectorHolders are not being reused to hold the data, close them
      closeVectors();
    }

    // The inner class ColumnBatchLoader performs the actual read on our behalf.
    ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
    rowStartPosInBatch += numRowsToRead;
    return columnarBatch;
  }
}

ColumnBatchLoader::loadDataToColumnBatch reads the data and wraps it into a ColumnarBatch object

private class ColumnBatchLoader {
  // Total number of data records to read
  private final int numRowsToRead;
  // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
  // there is no deletes
  private int[] rowIdMapping;
  // the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
  // metadata column
  private boolean[] isDeleted;

  ColumnBatchLoader(int numRowsToRead) {
    Preconditions.checkArgument(
        numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
    this.numRowsToRead = numRowsToRead;
    if (hasIsDeletedColumn) {
      isDeleted = new boolean[numRowsToRead];
    }
  }

  ColumnarBatch loadDataToColumnBatch() {
    // Filter the rows that were read and obtain the number of rows that are not deleted
    int numRowsUndeleted = initRowIdMapping();
    // Read every column in Arrow format, represented as Spark ColumnVector instances
    ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
    // Create a ColumnarBatch instance holding all surviving rows
    ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
    newColumnarBatch.setNumRows(numRowsUndeleted);

    if (hasEqDeletes()) {
      // If equality delete files exist, rows also have to be filtered by value.
      // Filtering against equality delete files requires the actual values of each row, so which
      // rows are deleted can only be determined after the data has been read into memory.
      applyEqDelete(newColumnarBatch);
    }

    if (hasIsDeletedColumn && rowIdMapping != null) {
      // If there are deleted rows, row ids are reassigned from 0 upward.
      // reset the row id mapping array, so that it doesn't filter out the deleted rows
      for (int i = 0; i < numRowsToRead; i++) {
        rowIdMapping[i] = i;
      }
      newColumnarBatch.setNumRows(numRowsToRead);
    }
    // Return the batch
    return newColumnarBatch;
  }

  ColumnVector[] readDataToColumnVectors() {
    ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];

    ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
    for (int i = 0; i < readers.length; i += 1) {
      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
      int numRowsInVector = vectorHolders[i].numValues();
      Preconditions.checkState(
          numRowsInVector == numRowsToRead,
          "Number of rows in the vector %s didn't match expected %s ",
          numRowsInVector,
          numRowsToRead);

      arrowColumnVectors[i] =
          columnVectorBuilder
              .withDeletedRows(rowIdMapping, isDeleted)
              .build(vectorHolders[i], numRowsInVector);
    }
    return arrowColumnVectors;
  }

  boolean hasEqDeletes() {
    return deletes != null && deletes.hasEqDeletes();
  }

  int initRowIdMapping() {
    Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
    if (posDeleteRowIdMapping != null) {
      rowIdMapping = posDeleteRowIdMapping.first();
      return posDeleteRowIdMapping.second();
    } else {
      rowIdMapping = initEqDeleteRowIdMapping();
      return numRowsToRead;
    }
  }

  /**
   * If the current file has associated position delete files, an index structure has to be built.
   */
  Pair<int[], Integer> posDelRowIdMapping() {
    if (deletes != null && deletes.hasPosDeletes()) {
      return buildPosDelRowIdMapping(deletes.deletedRowPositions());
    } else {
      return null;
    }
  }

  /**
   * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
   * delete 2 rows in a batch with 8 rows in total.
   *
   * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
   * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
   * Position delete 2, 6
   * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
   * [F,F,T,F,F,F,T,F] -- After applying position deletes
   *
   * @param deletedRowPositions a set of deleted row positions
   * @return the mapping array and the new num of rows in a batch, null if no row is deleted
   */
  Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
    if (deletedRowPositions == null) {
      return null;
    }

    // Create an array for the newly read rows that stores the row ids of all rows that were not
    // deleted, starting from 0.
    // Basic algorithm: two pointers; all surviving rows are packed, in order, at one end of the array.
    int[] posDelRowIdMapping = new int[numRowsToRead];
    int originalRowId = 0; // index of the row currently being examined
    int currentRowId = 0;  // index of the next slot for a surviving row
    while (originalRowId < numRowsToRead) {
      if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
        // If the current row has not been deleted, store it at the slot pointed to by currentRowId
        posDelRowIdMapping[currentRowId] = originalRowId;
        // currentRowId then points to the next slot to fill
        currentRowId++;
      } else {
        if (hasIsDeletedColumn) {
          isDeleted[originalRowId] = true;
        }

        deletes.incrementDeleteCount();
      }
      originalRowId++;
    }

    if (currentRowId == numRowsToRead) {
      // there is no delete in this batch
      return null;
    } else {
      return Pair.of(posDelRowIdMapping, currentRowId);
    }
  }

  int[] initEqDeleteRowIdMapping() {
    int[] eqDeleteRowIdMapping = null;
    if (hasEqDeletes()) {
      eqDeleteRowIdMapping = new int[numRowsToRead];
      for (int i = 0; i < numRowsToRead; i++) {
        eqDeleteRowIdMapping[i] = i;
      }
    }
    return eqDeleteRowIdMapping;
  }

  /**
   * Filter out the equality deleted rows. Here is an example:
   *
   * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
   * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
   * Position delete 2, 6
   * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
   * [F,F,T,F,F,F,T,F] -- After applying position deletes
   * Equality delete 1 <= x <= 3
   * [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
   * [F,T,T,T,F,F,T,F] -- After applying equality deletes
   *
   * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
   */
  void applyEqDelete(ColumnarBatch columnarBatch) {
    // Apply equality (by-value) deletes to the rows that survived the position deletes
    Iterator<InternalRow> it = columnarBatch.rowIterator();
    int rowId = 0;
    int currentRowId = 0;
    while (it.hasNext()) { // row-wise traversal
      InternalRow row = it.next();
      if (deletes.eqDeletedRowFilter().test(row)) {
        // the row is NOT deleted
        // skip deleted rows by pointing to the next undeleted row Id
        // update the rowIdMapping member
        rowIdMapping[currentRowId] = rowIdMapping[rowId];
        currentRowId++;
      } else {
        if (hasIsDeletedColumn) {
          isDeleted[rowIdMapping[rowId]] = true;
        }

        deletes.incrementDeleteCount();
      }

      rowId++;
    }

    // Update the number of surviving rows
    columnarBatch.setNumRows(currentRowId);
  }
}
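To make the two-pointer idea behind buildPosDelRowIdMapping() concrete, here is a self-contained sketch that reproduces the Javadoc example above, with a plain java.util.Set standing in for PositionDeleteIndex (a simplification; the class and method names are hypothetical):

// Packs the row ids of surviving rows at the front of the mapping array, in order.
import java.util.Arrays;
import java.util.Set;

class RowIdMappingSketch {
  // Returns the mapping of surviving rows, trimmed to the live row count.
  static int[] buildMapping(int numRows, long batchStartPos, Set<Long> deletedPositions) {
    int[] mapping = new int[numRows];
    int live = 0;                       // next slot for a surviving row
    for (int rowId = 0; rowId < numRows; rowId++) {
      if (!deletedPositions.contains(batchStartPos + rowId)) {
        mapping[live++] = rowId;        // keep the original row id of the live row
      }
    }
    return Arrays.copyOf(mapping, live);
  }

  public static void main(String[] args) {
    // Same scenario as the Javadoc example: 8 rows, positions 2 and 6 deleted.
    int[] mapping = buildMapping(8, 0L, Set.of(2L, 6L));
    System.out.println(Arrays.toString(mapping)); // prints [0, 1, 3, 4, 5, 7]
  }
}

The length of the returned array (6 here) is the surviving row count that buildPosDelRowIdMapping() reports back via Pair.second().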
