转载

Hadoop CombineFileInputFormat原理及源码分析

引言

引用《Hadoop权威指南》原文如下:

Hadoop works better with a small number of large files than a large number of small files. One reason for this is that FileInputFormat generates splits in such a way that each split is all or part of a single file. If the file is very small (“small” means significantly smaller than an HDFS block) and there are a lot of them, each map task will process very little input, and there will be a lot of them (one per file), each of which imposes extra bookkeeping overhead. Compare a 1 GB file broken into sixteen 64 MB blocks and 10,000 or so 100 KB files. The 10,000 files use one map each, and the job time can be tens or hundreds of times slower than the equivalent one with a single input file and 16 map tasks.

The situation is alleviated somewhat by CombineFileInputFormat, which was designed to work well with small files. Where FileInputFormat creates a split per file, CombineFileInputFormat packs many files into each split so that each mapper has more to process. Crucially, CombineFileInputFormat takes node and rack locality into account when deciding which blocks to place in the same split, so it does not compromise the speed at which it can process the input in a typical MapReduce job.

可以看出,执行MR任务时如果“小文件”太多,对于Hadoop是很不友好的,而CombineFileInputFormat就是用来缓解这个问题的。CombineFileInputFormat通过将多个“小文件”合并为一个“切片”(再形成切片的过程中也考虑同一节点、同一机架的数据本地性),让每一个Mapper任务可以处理更多的数据,从而提高MR任务的执行速度。

思路

 

CombineFileInputFormat涉及到三个重要的属性:

mapred.max.split.size:同一节点或同一机架的数据块形成切片时,切片大小的最大值;

mapred.min.split.size.per.node:同一节点的数据块形成切片时,切片大小的最小值;

mapred.min.split.size.per.rack:同一机架的数据块形成切片时,切片大小的最小值。

切片形成过程:

(1)逐个节点(数据块)形成切片;

a.遍历并累加这个节点上的数据块,如果累加数据块大小大于或等于mapred.max.split.size,则将这些数据块形成一个切片,继承该过程,直到剩余数据块累加大小小于mapred.max.split.size,则进行下一步;

b.如果剩余数据块累加大小大于或等于mapred.min.split.size.per.node,则将这些剩余数据块形成一个切片,如果剩余数据块累加大小小于mapred.min.split.size.per.node,则这些数据块留待后续处理。

(2)逐个机架(数据块)形成切片;

a.遍历并累加这个机架上的数据块(这些数据块即为上一步遗留下来的数据块),如果累加数据块大小大于或等于mapred.max.split.size,则将这些数据块形成一个切片,继承该过程,直到剩余数据块累加大小小于mapred.max.split.size,则进行下一步;

b.如果剩余数据块累加大小大于或等于mapred.min.split.size.per.rack,则将这些剩余数据块形成一个切片,如果剩余数据块累加大小小于mapred.min.split.size.per.rack,则这些数据块留待后续处理。

(3)遍历并累加剩余数据块,如果数据块大小大于或等于mapred.max.split.size,则将这些数据块形成一个切片,继承该过程,直到剩余数据块累加大小小于mapred.max.split.size,则进行下一步;

(4)剩余数据块形成一个切片。

核心实现

 
// mapping from a rack name to the list of blocks it has HashMap<String, List<OneBlockInfo>> rackToBlocks =   new HashMap<String, List<OneBlockInfo>>(); // mapping from a block to the nodes on which it has replicas HashMap<OneBlockInfo, String[]> blockToNodes =   new HashMap<OneBlockInfo, String[]>(); // mapping from a node to the list of blocks that it contains HashMap<String, List<OneBlockInfo>> nodeToBlocks =   new HashMap<String, List<OneBlockInfo>>(); 

开始形成切片之前,需要初始化三个重要的映射关系:

rackToBlocks:机架和数据块的对应关系,即某一个机架上有哪些数据块;

blockToNodes:数据块与节点的对应关系,即一块数据块的“拷贝”位于哪些节点;

nodeToBlocks:节点和数据块的对应关系,即某一个节点上有哪些数据块;

初始化过程如下代码所示,其中每一个Path代表的文件被形成一个OneFileInfo对象,映射关系也在形成OneFileInfo的过程中被维护。

// populate all the blocks for all files long totLength = 0; for (int i = 0; i < paths.length; i++) {   files[i] = new OneFileInfo(paths[i], job,                               rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);   totLength += files[i].getLength(); }

(1)逐个节点(数据块)形成切片,代码如下:

// 保存当前切片所包含的数据块     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();     // 保存当前切片中的数据块属于哪些节点     ArrayList<String> nodes = new ArrayList<String>();     // 保存当前切片的大小     long curSplitSize = 0;       // process all nodes and create splits that are local to a node.      // 依次处理每个节点上的数据块     for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); iter.hasNext();) {       Map.Entry<String, List<OneBlockInfo>> one = iter.next();       nodes.add(one.getKey());       List<OneBlockInfo> blocksInNode = one.getValue();         // for each block, copy it into validBlocks. Delete it from blockToNodes so that the same block does not appear in        // two different splits.       // 依次处理每个数据块,注意blockToNodes变量的作用,它保证了同一数据块不会出现在两个切片中       for (OneBlockInfo oneblock : blocksInNode) {         if (blockToNodes.containsKey(oneblock)) {           validBlocks.add(oneblock);           blockToNodes.remove(oneblock);           curSplitSize += oneblock.length;             // if the accumulated split size exceeds the maximum, then create this split.           // 如果数据块累积大小大于或等于maxSize,则形成一个切片           if (maxSize != 0 && curSplitSize >= maxSize) {             // create an input split and add it to the splits array             addCreatedSplit(job, splits, nodes, validBlocks);             curSplitSize = 0;             validBlocks.clear();           }         }       }       // if there were any blocks left over and their combined size is       // larger than minSplitNode, then combine them into one split.       // Otherwise add them back to the unprocessed pool. It is likely        // that they will be combined with other blocks from the same rack later on.       // 如果剩余数据块大小大于或等于minSizeNode,则将这些数据块构成一个切片;       // 如果剩余数据块大小小于minSizeNode,则将这些数据块归还给blockToNodes,交由后期“同一机架”过程处理       if (minSizeNode != 0 && curSplitSize >= minSizeNode) {         // create an input split and add it to the splits array         addCreatedSplit(job, splits, nodes, validBlocks);       } else {         for (OneBlockInfo oneblock : validBlocks) {           blockToNodes.put(oneblock, oneblock.hosts);         }       }       validBlocks.clear();       nodes.clear();       curSplitSize = 0;     }

(2)逐个机架(数据块)形成切片,代码如下:

// if blocks in a rack are below the specified minimum size, then keep them     // in 'overflow'. After the processing of all racks is complete, these overflow     // blocks will be combined into splits.     // overflowBlocks用于保存“同一机架”过程处理之后剩余的数据块     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();     ArrayList<String> racks = new ArrayList<String>();       // Process all racks over and over again until there is no more work to do.     while (blockToNodes.size() > 0) {       // Create one split for this rack before moving over to the next rack.        // Come back to this rack after creating a single split for each of the        // remaining racks.       // Process one rack location at a time, Combine all possible blocks that       // reside on this rack as one split. (constrained by minimum and maximum       // split size).         // iterate over all racks        // 依次处理每个机架       for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =             rackToBlocks.entrySet().iterator(); iter.hasNext();) {         Map.Entry<String, List<OneBlockInfo>> one = iter.next();         racks.add(one.getKey());         List<OneBlockInfo> blocks = one.getValue();           // for each block, copy it into validBlocks. Delete it from          // blockToNodes so that the same block does not appear in          // two different splits.         boolean createdSplit = false;         // 依次处理该机架的每个数据块         for (OneBlockInfo oneblock : blocks) {           if (blockToNodes.containsKey(oneblock)) {             validBlocks.add(oneblock);             blockToNodes.remove(oneblock);             curSplitSize += oneblock.length;                    // if the accumulated split size exceeds the maximum, then create this split.             // 如果数据块累积大小大于或等于maxSize,则形成一个切片             if (maxSize != 0 && curSplitSize >= maxSize) {               // create an input split and add it to the splits array               addCreatedSplit(job, splits, getHosts(racks), validBlocks);               createdSplit = true;               break;             }           }         }           // if we created a split, then just go to the next rack         if (createdSplit) {           curSplitSize = 0;           validBlocks.clear();           racks.clear();           continue;         }           if (!validBlocks.isEmpty()) {           // 如果剩余数据块大小大于或等于minSizeRack,则将这些数据块构成一个切片           if (minSizeRack != 0 && curSplitSize >= minSizeRack) {             // if there is a mimimum size specified, then create a single split             // otherwise, store these blocks into overflow data structure             addCreatedSplit(job, splits, getHosts(racks), validBlocks);           } else {             // There were a few blocks in this rack that remained to be processed.             // Keep them in 'overflow' block list. These will be combined later.             // 如果剩余数据块大小小于minSizeRack,则将这些数据块加入overflowBlocks             overflowBlocks.addAll(validBlocks);           }         }         curSplitSize = 0;         validBlocks.clear();         racks.clear();       }     }

(3)遍历并累加剩余数据块,代码如下:

// Process all overflow blocks     for (OneBlockInfo oneblock : overflowBlocks) {   validBlocks.add(oneblock);   curSplitSize += oneblock.length;   // This might cause an exiting rack location to be re-added,   // but it should be ok.   for (int i = 0; i < oneblock.racks.length; i++) {     racks.add(oneblock.racks[i]);   }   // if the accumulated split size exceeds the maximum, then    // create this split.   // 如果剩余数据块大小大于或等于maxSize,则将这些数据块构成一个切片   if (maxSize != 0 && curSplitSize >= maxSize) {     // create an input split and add it to the splits array     addCreatedSplit(job, splits, getHosts(racks), validBlocks);     curSplitSize = 0;     validBlocks.clear();     racks.clear();   }     } 

(4)剩余数据块形成一个切片,代码如下:

// Process any remaining blocks, if any.     if (!validBlocks.isEmpty()) {       addCreatedSplit(job, splits, getHosts(racks), validBlocks);     }

总结

CombineFileInputFormat形成切片过程中考虑数据本地性(同一节点、同一机架),首先处理同一节点的数据块,然后处理同一机架的数据块,最后处理剩余的数据块,可见本地性是逐步减弱的。另外CombineFileInputFormat是抽象的,具体使用时需要自己实现getRecordReader方法。

正文到此结束
Loading...