Flink1.7.2 Dataset 文件切片计算方式和切片数据读取源码分析

  • 时间:
  • 浏览:1
  • 来源:大发5分排列5_极速5分排列3

从缓存区readBuffer一键复制当前行数据到 wrapBuffer

实际计算时,当计算最后有一一有1个切片时,可能剩下的数据大小小于 切片大小的1.1倍,就中放有一一有1个切片中,什么都没有切分了,直接把剩下的数据中放最后有一一有1个切片中,可能可能切之后,由于最后一切片数据量很小,浪费资源

计算实际切片的大小,blockSize 此处为文件大小,maxSplitSize 一般都小于blockSize,统统最后取的是切片的最大长度maxSplitSize

调置当前分片

当前切片信息

对当前切片进行处置 ,调用 DelimitedInputFormat.open(),//open还没让你之后开始真正的读数据,假使 定位,把第有一一有1个换行符,分到前有一一有1个分片,此人 从第六个换行符让你之后开始读取数据





Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也假使 source读到的数据,都还要经过链上的算子操作

调用FileInputFormat.createInputSplits(并行度)再实际处置

把JobVertex 转化为ExecutionJobVertex,调用new ExecutionJobVertex(),ExecutionJobVertex中存了inputSplits,统统会根据并行并来计算inputSplits的个数

调置当前分片的让你之后开始位置

当前切片信息

对当前切片进行处置 ,调用 DelimitedInputFormat.open(),//open还没让你之后开始真正的读数据,假使 定位,把第有一一有1个换行符,分到前有一一有1个分片,此人 从第六个换行符让你之后开始读取数据

-转志Integer

读取一行数据,也假使 读到第有一一有1个换行符

流定位到让你之后开始位置

可能有换行符,还要删除换行符,在readBuffer

调置当前分片的让你之后开始位置

end

long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);

把jobGraph是由JobVertex组成,调用executionGraph.attachJobGraph(sortedTopology) 把JobGraph转成ExecutionGraph,ExecutionGraph由ExecutionJobVertex组成,即把JobVertex转成ExecutionJobVertex

调置当前分片的长度

调置当前分片的长度

随机读到有一一有1个切片,给当前DataSourceTask使用,可能在Source读取数据时是不按key分区,也就不分谁处置,有任务来处置,就给有一一有1个切片处置就行,每给出有一一有1个从总的切片中移除

切片拆分的计算法律依据 ,初使值 bytesUnassigned = len(文件总数据长度),每分一次bytesUnassigned会减去当前切片的大小,也假使 bytesUnassigned每次都有还剩下总的数据大小,当bytesUnassigned > maxBytesForLastSplit 就突然 循环拆分切片,切片的长度为splitSize(切片大小) = 5, 让你之后开始位置从0让你之后开始,让你每个切片让你之后开始位置都还要加带让你所有切片大小 position += splitSize ;

当前切片默认值设置

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也假使 source读到的数据,都还要经过链上的算子操作

首先遍历路径是文件或目录,计算出所有文件中放List files = new ArrayList<>()中存储,计算出所有文件总大小totalLength,计算文件切片,当然是所有文件总大小来计算

第一次,startPos =0 ,count = 0,没读到数据

从缓存区readBuffer一键复制当前行数据到 wrapBuffer

流定位到让你之后开始位置

每个切片最大长度计算,totalLength = 9 为文件总长度,minNumSplits = 2 为并行度,也假使 9不到整除并行度2,说明有余数,可能把余数的数据单独在分配有一一有1个切片,有可能这有一一有1个切片的数据量很少,就浪费资源了,这里的做法是,余数的最大值,也假使 每个切片+1,就把这里多的余数分配到前面的每个切片中,也假使 每个切片的最大值为 9 / 2 + 1 = 5

本示例拆分的结果

当前切片默认值设置

可能while循环拆分切片是有条件的,bytesUnassigned > maxBytesForLastSplit,那可能bytesUnassigned <= maxBytesForLastSplit,就还要把剩下的数据,都中放最后有一一有1个切片中

随机读到有一一有1个切片,给当前DataSourceTask使用,可能在Source读取数据时是不按key分区,也就不分谁处置,有任务来处置,就给有一一有1个切片处置就行,每给出有一一有1个从总的切片中移除

调置当前分片

可能有换行符,还要删除换行符,在readBuffer