3. EC HDFS Write: Debugging

Regular (replicated) files are written through DFSOutputStream; EC files are written through DFSStripedOutputStream.

public class DFSStripedOutputStream extends DFSOutputStream
    implements StreamCapabilities {...}

Setting a breakpoint in the FSOutputSummer constructor gives the following call stack:

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.hadoop.fs.FSOutputSummer.<init>(FSOutputSummer.java:53)
	  at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:191)
	  at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:247)
	  at org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)
	  at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:310)
	  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216)
	  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1195)
	  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1133)
	  at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:530)
	  at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:527)
	  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:541)
	  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:468)
	  at org.apache.hadoop.fs.FilterFileSystem.create(FilterFileSystem.java:193)
	  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1194)
	  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1174)
	  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1063)
	  at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.create(CommandWithDestination.java:509)
	  at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:484)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:407)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:342)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:277)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:262)
	  at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:367)
	  at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
	  at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:304)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:257)
	  at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:286)
	  at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:270)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:228)
	  at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:295)
	  at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:122)
	  at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
	  at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
	  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
	  at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:132)

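As the stack shows, DFSClient.create calls DFSOutputStream.newStreamForCreate, and the stream type is decided while the output stream is being created.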

public DFSOutputStream create(String src, FsPermission permission,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, Progressable progress, int buffersize,
      ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
      String ecPolicyName) throws IOException {
    checkOpen();
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName);
    beginFileLease(result.getFileId(), result);
    return result;
  }

Continuing into newStreamForCreate:

      final DFSOutputStream out;
      if(stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
      } else {
        out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      }
      out.start();

1.org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)

For creating a file and appending to a file, the constructors (compared with the three-replica case) are as follows:

  // Create a new file
  /** Construct a new output stream for creating a file. */
  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                         EnumSet<CreateFlag> flag, Progressable progress,
                         DataChecksum checksum, String[] favoredNodes)
                         throws IOException {
    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating DFSStripedOutputStream for " + src);
    }
    // Compared with the three-replica stream, a stream for a new EC file sets up several extra fields.
    ecPolicy = stat.getErasureCodingPolicy();
    final int numParityBlocks = ecPolicy.getNumParityUnits();
    cellSize = ecPolicy.getCellSize();
    numDataBlocks = ecPolicy.getNumDataUnits();
    numAllBlocks = numDataBlocks + numParityBlocks;
    this.favoredNodes = favoredNodes;
    failedStreamers = new ArrayList<>();
    corruptBlockCountMap = new LinkedHashMap<>();
    flushAllExecutor = Executors.newFixedThreadPool(numAllBlocks);
    flushAllExecutorCompletionService = new
        ExecutorCompletionService<>(flushAllExecutor);

    ErasureCoderOptions coderOptions = new ErasureCoderOptions(
        numDataBlocks, numParityBlocks);
    encoder = CodecUtil.createRawEncoder(dfsClient.getConfiguration(),
        ecPolicy.getCodecName(), coderOptions);

    coordinator = new Coordinator(numAllBlocks);
    cellBuffers = new CellBuffers(numParityBlocks);

    streamers = new ArrayList<>(numAllBlocks);
    for (short i = 0; i < numAllBlocks; i++) {
      StripedDataStreamer streamer = new StripedDataStreamer(stat,
          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
          favoredNodes, i, coordinator, getAddBlockFlags());
      streamers.add(streamer);
    }
    currentPackets = new DFSPacket[streamers.size()];
    setCurrentStreamer(0);
  }

  // Append
  /** Construct a new output stream for appending to a file. */
  DFSStripedOutputStream(DFSClient dfsClient, String src,
      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
      throws IOException {
    this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
    initialFileSize = stat.getLen(); // length of file when opened
    prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
  }

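Let's look at what these fields do:
1. ecPolicy: ErasureCodingPolicy ecPolicy = null; the default is null, which means EC is not used and the file is stored with three replicas.
2. cellSize: the size of each cell; it must be a positive multiple of 1024.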

Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
    Preconditions.checkArgument(cellSize % 1024 == 0,
        "cellSize must be 1024 aligned");

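3. encoder: the raw erasure encoder used to compute the parity cells.
4. streamers: in EC mode, streamers is a list holding multiple streamers, one per internal block (numAllBlocks, i.e. data blocks plus parity blocks).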

    streamers = new ArrayList<>(numAllBlocks);
    for (short i = 0; i < numAllBlocks; i++) {
      StripedDataStreamer streamer = new StripedDataStreamer(stat,
          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
          favoredNodes, i, coordinator, getAddBlockFlags());
      streamers.add(streamer);
    }

5. currentPackets: currentPackets = new DFSPacket[streamers.size()]; each streamer has its own current packet.
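To make the relationship between cellSize, numDataBlocks and the streamer list more concrete, here is a minimal sketch of the round-robin striping layout inside one block group. It is not taken from the Hadoop source: the class and method names are hypothetical, and the 1 MiB cell size and RS(6,3) layout are assumed values chosen only for illustration.

```java
public class StripeLayout {
    /** Index of the data streamer that receives the byte at this offset within the block group. */
    static int streamerIndex(long offsetInGroup, int cellSize, int numDataBlocks) {
        long cellIndex = offsetInGroup / cellSize;   // which cell the byte falls into
        return (int) (cellIndex % numDataBlocks);    // cells are dealt out round-robin
    }

    /** Offset of that byte inside the streamer's own internal block. */
    static long offsetInInternalBlock(long offsetInGroup, int cellSize, int numDataBlocks) {
        long cellIndex = offsetInGroup / cellSize;
        long stripeIndex = cellIndex / numDataBlocks; // full stripes written before this cell
        return stripeIndex * cellSize + offsetInGroup % cellSize;
    }

    public static void main(String[] args) {
        int cellSize = 1024 * 1024; // assumption: 1 MiB cells
        int numDataBlocks = 6;      // assumption: RS(6,3)
        long[] offsets = {0L, cellSize, 6L * cellSize, 6L * cellSize + 512};
        for (long off : offsets) {
            System.out.printf("offset=%d -> streamer #%d, offsetInBlock=%d%n",
                off,
                streamerIndex(off, cellSize, numDataBlocks),
                offsetInInternalBlock(off, cellSize, numDataBlocks));
        }
    }
}
```

Parity streamers (indices numDataBlocks and above) are left out here because they receive encoded cells rather than user bytes.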


The three inner classes play the following roles:
[Figure: roles of the three inner classes]

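  • Data blocks are denoted: Data
  • Parity blocks are denoted: Parity
Reference: HDFS EC:将纠删码技术融入HDFS (HDFS EC: Integrating Erasure Coding into HDFS)
  • For a regular HDFS file, the basic unit is the block.
  • For a file in EC mode, the basic unit is the block group.
  • Since the block group is the basic unit of a file in EC mode, the first question is how the NameNode stores each file's block group information. A straightforward approach is to assign every block group a block ID and keep a Map from that ID to the block group information, which in turn contains the information of every internal block. For small files this increases memory consumption in the NameNode. Take RS(6,3) as an example: if a file is slightly smaller than 6 blocks, the NameNode must maintain 10 IDs for it (1 block group ID, 6 data block IDs and 3 parity block IDs). When small files dominate, NameNode memory usage comes under pressure.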
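The ID count in the RS(6,3) example can be checked with a few lines of arithmetic. The sketch below only models the straightforward scheme described above (one ID per block group plus one per internal block); the class and method names are hypothetical, and the 128 MiB block size is an assumption.

```java
public class BlockGroupIdCount {
    /** IDs tracked for one block group in the straightforward scheme: group ID + data IDs + parity IDs. */
    static int idsForOneBlockGroup(int numDataBlocks, int numParityBlocks) {
        return 1 + numDataBlocks + numParityBlocks;
    }

    /** Block IDs tracked for the same file stored as a regular replicated file (one ID per block). */
    static long idsForReplicatedFile(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize; // ceiling division
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // assumption: 128 MiB blocks
        long fileSize = 6 * blockSize - 1;    // a file slightly smaller than 6 blocks
        System.out.println("EC RS(6,3) IDs: " + idsForOneBlockGroup(6, 3));
        System.out.println("Replicated IDs: " + idsForReplicatedFile(fileSize, blockSize));
    }
}
```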

Call stack:

java.lang.Thread.State: RUNNABLE
	  at org.apache.hadoop.hdfs.DFSStripedOutputStream.allocateNewBlock(DFSStripedOutputStream.java:474)
	  at org.apache.hadoop.hdfs.DFSStripedOutputStream.writeChunk(DFSStripedOutputStream.java:541)
	  - locked <0xa87> (a org.apache.hadoop.hdfs.DFSStripedOutputStream)
	  at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:217)
	  at org.apache.hadoop.fs.FSOutputSummer.write1(FSOutputSummer.java:125)
	  at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:111)
	  at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
	  at java.io.DataOutputStream.write(DataOutputStream.java:107)
	  - locked <0xa89> (a org.apache.hadoop.hdfs.client.HdfsDataOutputStream)
	  at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:96)
	  at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:68)
	  at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:129)
	  at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:485)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:407)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:342)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:277)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:262)
	  at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:367)
	  at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
	  at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:304)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:257)
	  at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:286)
	  at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:270)
	  at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:228)
	  at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:295)
	  at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:122)
	  at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
	  at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
	  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
	  at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:133)

The stack shows that writeChunk is implemented by DFSStripedOutputStream.
Here is the block allocation code:

// DFSStripedOutputStream.java
private void allocateNewBlock() throws IOException {
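    // 1. On the first chunk write the block group is null, so this branch is skipped; otherwise wait for every streamer to end its block (waitEndBlocks)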
    if (currentBlockGroup != null) {
      for (int i = 0; i < numAllBlocks; i++) {
        // sync all the healthy streamers before writing to the new block
        waitEndBlocks(i);
      }
    }
    failedStreamers.clear();
    DatanodeInfo[] excludedNodes = getExcludedNodes();
    LOG.debug("Excluding DataNodes when allocating new block: "
        + Arrays.asList(excludedNodes));

    // replace failed streamers
    // 2. The current block group becomes the previous one; on the first allocation it is null
    ExtendedBlock prevBlockGroup = currentBlockGroup;
    if (prevBlockGroup4Append != null) {
      prevBlockGroup = prevBlockGroup4Append;
      prevBlockGroup4Append = null;
    }
    replaceFailedStreamers();

    LOG.debug("Allocating new block group. The previous block group: "
        + prevBlockGroup);
    final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
         prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
    assert lb.isStriped();
    // assign the new block to the current block group
    currentBlockGroup = lb.getBlock();
    blockGroupIndex++;

    // Parse the block group into an array of internal blocks
    final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
        (LocatedStripedBlock) lb, cellSize, numDataBlocks,
        numAllBlocks - numDataBlocks);
    for (int i = 0; i < blocks.length; i++) {
      StripedDataStreamer si = getStripedDataStreamer(i);
      assert si.isHealthy();
      if (blocks[i] == null) {
        // allocBlock() should guarantee that all data blocks are successfully
        // allocated.
        assert i >= numDataBlocks;
        // Set exception and close streamer as there is no block locations
        // found for the parity block.
        LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
            "Not enough datanodes? Exclude nodes={}", i,  ecPolicy.getName(),
            excludedNodes);
        si.getLastException().set(
            new IOException("Failed to get parity block, index=" + i));
        si.getErrorState().setInternalError();
        si.close(true);
      } else {
        // Hand the block to the coordinator as the next block for streamer i
        coordinator.getFollowingBlocks().offer(i, blocks[i]);
      }
    }
  }
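The hand-off at the end of allocateNewBlock, coordinator.getFollowingBlocks().offer(i, blocks[i]), can be pictured as one small queue per streamer: the writer thread offers a block for index i and streamer i later picks it up. The sketch below is a simplified model built only on the JDK; PerStreamerQueues and its methods are hypothetical names, not the Coordinator class from Hadoop, and the strings stand in for LocatedBlock objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FollowingBlocksSketch {
    /** One single-slot queue per streamer index. */
    static final class PerStreamerQueues<T> {
        private final List<BlockingQueue<T>> queues = new ArrayList<>();

        PerStreamerQueues(int numQueues) {
            for (int i = 0; i < numQueues; i++) {
                queues.add(new LinkedBlockingQueue<>(1));
            }
        }

        /** Called by the writer thread; returns false if slot i is still occupied. */
        boolean offer(int i, T item) {
            return queues.get(i).offer(item);
        }

        /** Called by streamer i; blocks until its next item is available. */
        T take(int i) throws InterruptedException {
            return queues.get(i).take();
        }

        /** Non-blocking variant; returns null if nothing is queued for streamer i. */
        T poll(int i) {
            return queues.get(i).poll();
        }
    }

    public static void main(String[] args) throws Exception {
        int numAllBlocks = 9; // assumption: RS(6,3)
        PerStreamerQueues<String> following = new PerStreamerQueues<>(numAllBlocks);

        // Writer side: publish one internal block per streamer.
        for (int i = 0; i < numAllBlocks; i++) {
            following.offer(i, "internal-block-" + i);
        }

        // Streamer side: streamer #0 picks up its own block on its own thread.
        Thread streamer0 = new Thread(() -> {
            try {
                System.out.println("streamer #0 got " + following.take(0));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        streamer0.start();
        streamer0.join();
    }
}
```

Keeping a separate queue per index means a slow or failed streamer never blocks the others from receiving their blocks.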

[Figure: block group information]
[Figure: block information parsed from the block group]


Let's continue with the rest of the writeChunk logic:

  @Override
  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    final int index = getCurrentIndex();
    final int pos = cellBuffers.addTo(index, bytes, offset, len);
    final boolean cellFull = pos == cellSize;

    if (currentBlockGroup == null || shouldEndBlockGroup()) {
      // the incoming data should belong to a new block. Allocate a new block.
      allocateNewBlock(); // allocate a block group; see the code above
    }

    // len = 512 bytes here, i.e. one chunk
    currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
    // note: the current streamer can be refreshed after allocating a new block
    final StripedDataStreamer current = getCurrentStreamer();
    if (current.isHealthy()) {
      try {
        super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
      } catch(Exception e) {
        handleCurrentStreamerFailure("offset=" + offset + ", length=" + len, e);
      }
    }

    // Two extra steps are needed when a striping cell is full:
    // 1. Forward the current index pointer
    // 2. Generate parity packets if a full stripe of data cells are present
    if (cellFull) {
      int next = index + 1;
      //When all data cells in a stripe are ready, we need to encode
      //them and generate some parity cells. These cells will be
      //converted to packets and put to their DataStreamer's queue.
      if (next == numDataBlocks) {
        cellBuffers.flipDataBuffers();
        writeParityCells();
        next = 0;

        // if this is the end of the block group, end each internal block
        if (shouldEndBlockGroup()) {
          flushAllInternals();
          checkStreamerFailures();
          for (int i = 0; i < numAllBlocks; i++) {
            final StripedDataStreamer s = setCurrentStreamer(i);
            if (s.isHealthy()) {
              try {
                endBlock();
              } catch (IOException ignored) {}
            }
          }
        } else {
          // check failure state for all the streamers. Bump GS if necessary
          checkStreamerFailures();
        }
      }
      setCurrentStreamer(next);
    }
  }
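The index handling at the end of writeChunk is easier to follow as a small simulation. The sketch below keeps only the counters: bytes buffered in the current cell, the current streamer index, and the number of full stripes that would trigger writeParityCells(). It is a simplified model with hypothetical names, it assumes cellSize is a multiple of the chunk size, and it ignores block group boundaries and streamer failures.

```java
public class WriteChunkRotation {
    /**
     * Simulates numChunks calls to writeChunk.
     * Returns {current streamer index, number of full stripes encoded}.
     */
    static int[] simulate(int numChunks, int chunkSize, int cellSize, int numDataBlocks) {
        int index = 0;          // current data streamer
        int posInCell = 0;      // bytes buffered in the current cell
        int stripesEncoded = 0; // times parity cells would be generated
        for (int c = 0; c < numChunks; c++) {
            posInCell += chunkSize;
            if (posInCell == cellSize) {      // cellFull
                posInCell = 0;
                int next = index + 1;
                if (next == numDataBlocks) {  // all data cells of the stripe are ready
                    stripesEncoded++;
                    next = 0;
                }
                index = next;                 // setCurrentStreamer(next)
            }
        }
        return new int[] {index, stripesEncoded};
    }

    public static void main(String[] args) {
        // Small numbers for readability: 1024-byte cells, 512-byte chunks, 6 data blocks.
        int[] result = simulate(14, 512, 1024, 6);
        System.out.println("streamer index = " + result[0] + ", stripes encoded = " + result[1]);
    }
}
```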

[Figure: debugger state at currentBlockGroup.setNumBytes]
[Figure: the current streamer returned by getCurrentStreamer()]
[Figure: contents of the coordinator]
While the cell is not full, writeChunk keeps being called for the same cell and keeps appending chunks to the packet; on each call the block group's byte count grows by 512 bytes.
The debug log is as follows:

2019-12-24 10:32:16,368 DEBUG hdfs.DFSOutputStream: Excluding DataNodes when allocating new block: []
2019-12-24 10:32:16,368 DEBUG hdfs.DFSOutputStream: Allocating new block group. The previous block group: null
2019-12-24 10:32:16,374 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop sending #5 org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock
2019-12-24 10:32:16,383 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop got value #5
2019-12-24 10:32:16,383 DEBUG ipc.ProtobufRpcEngine: Call: addBlock took 10ms
2019-12-24 10:32:16,400 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=0, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0, DFSStripedOutputStream:#0: block==null
2019-12-24 10:32:16,403 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=64512, blockSize=134217728, appendChunk=false, #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: Queued packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512, #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/ec/write/hadoop3.tar.gz._COPYING_, chunkSize=516, chunksPerPacket=126, packetSize=65016
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: stage=PIPELINE_SETUP_CREATE, #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=1, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512, DFSStripedOutputStream:#0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: Allocating new block: #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: pipeline = [DatanodeInfoWithStorage[10.179.17.22:9866,DS-0a5062e9-e555-4d80-9b51-9f64963787d7,DISK]], #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: Connecting to datanode 10.179.17.22:9866
2019-12-24 10:32:16,405 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 129024, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=129024, blockSize=134217728, appendChunk=false, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,405 DEBUG hdfs.DataStreamer: Queued packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 129024, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,405 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/ec/write/hadoop3.tar.gz._COPYING_, chunkSize=516, chunksPerPacket=126, packetSize=65016
2019-12-24 10:32:16,405 DEBUG hdfs.DataStreamer: Send buf size 43520
2019-12-24 10:32:16,406 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=2, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=129024, DFSStripedOutputStream:#0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,406 DEBUG sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-12-24 10:32:16,406 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop sending #6 org.apache.hadoop.hdfs.protocol.ClientProtocol.getServerDefaults
2019-12-24 10:32:16,407 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 2 offsetInBlock: 129024 lastPacketInBlock: false lastByteOffsetInBlock: 193536, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=193536, blockSize=134217728, appendChunk=false, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,407 DEBUG hdfs.DataStreamer: Queued packet seqno: 2 offsetInBlock: 129024 lastPacketInBlock: false lastByteOffsetInBlock: 193536, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,407 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/ec/write/hadoop3.tar.gz._COPYING_, chunkSize=516, chunksPerPacket=126, packetSize=65016
2019-12-24 10:32:16,407 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=3, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=193536, DFSStripedOutputStream:#0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,407 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop got value #6
2019-12-24 10:32:16,407 DEBUG ipc.ProtobufRpcEngine: Call: getServerDefaults took 1ms
2019-12-24 10:32:16,408 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 3 offsetInBlock: 193536 lastPacketInBlock: false lastByteOffsetInBlock: 258048, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=258048, blockSize=134217728, appendChunk=false, #0: blk_-9223372036854768096_3947


  synchronized void enqueueCurrentPacketFull() throws IOException {
    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
            + " appendChunk={}, {}", currentPacket, src, getStreamer()
            .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
        getStreamer());
    enqueueCurrentPacket();
    adjustChunkBoundary();
    endBlock();
  }

When a packet is full, it is put on the queue. If the block size has been reached, endBlock() then runs the last-packet logic.
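The packet numbers in the log (chunkSize=516, chunksPerPacket=126, packetSize=65016, and offsets advancing by 64512) can be reproduced with simple arithmetic. The sketch below is only a worked calculation: the 64 KiB client write packet size and the 33-byte maximum packet header length are assumptions not stated in the log, and the class and method names are hypothetical.

```java
public class PacketSizing {
    /** One chunk on the wire: the data bytes plus their checksum bytes. */
    static int chunkSize(int bytesPerChecksum, int checksumSize) {
        return bytesPerChecksum + checksumSize;
    }

    /** How many whole chunks fit in the packet body once the header is reserved. */
    static int chunksPerPacket(int writePacketSize, int maxHeaderLen, int chunkSize) {
        int bodySize = writePacketSize - maxHeaderLen;
        return Math.max(bodySize / chunkSize, 1);
    }

    public static void main(String[] args) {
        int bytesPerChecksum = 512;    // data bytes per chunk, as seen in the debugger
        int checksumSize = 4;          // 516 - 512, from chunkSize=516 in the log
        int writePacketSize = 65536;   // assumption: 64 KiB client write packet size
        int maxHeaderLen = 33;         // assumption: maximum packet header length

        int chunk = chunkSize(bytesPerChecksum, checksumSize);
        int perPacket = chunksPerPacket(writePacketSize, maxHeaderLen, chunk);
        int packetSize = chunk * perPacket;
        int dataBytesPerPacket = bytesPerChecksum * perPacket;

        System.out.println("chunkSize=" + chunk
            + ", chunksPerPacket=" + perPacket
            + ", packetSize=" + packetSize
            + ", dataBytesPerPacket=" + dataBytesPerPacket);
    }
}
```

Under these assumptions the result is 126 chunks per packet, a 65016-byte packet, and 64512 data bytes per packet, which matches the step between consecutive offsetInBlock values in the log.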

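Q: Why does the log show block==null when a new block is being allocated?

Allocating new block: #0: block==null

In the log above, this line is immediately followed by a pipeline line that already carries blk_-9223372036854768096_3947, so the message appears to be printed before the streamer has been assigned its internal block.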

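When does cellFull become true?

In writeChunk above, cellFull is computed as pos == cellSize, where pos is the value returned by cellBuffers.addTo(...). Each write adds 512 bytes.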
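Since each call adds 512 bytes and cellFull is pos == cellSize, the number of writeChunk calls needed to fill one cell is simply cellSize / 512. The sketch below counts them explicitly; the names are hypothetical and the 1 MiB cell size is an assumption (the policy's actual cell size is not shown in the text).

```java
public class CellFullSketch {
    /** Counts how many chunk writes it takes until the buffer position equals cellSize. */
    static int chunksUntilCellFull(int cellSize, int chunkSize) {
        if (chunkSize <= 0 || cellSize <= 0 || cellSize % chunkSize != 0) {
            throw new IllegalArgumentException("cellSize must be a positive multiple of chunkSize");
        }
        int pos = 0;     // position in the cell buffer after each write
        int chunks = 0;
        while (pos != cellSize) { // same condition as cellFull = (pos == cellSize)
            pos += chunkSize;
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        int cellSize = 1024 * 1024; // assumption: 1 MiB cells
        System.out.println("writes per cell: " + chunksUntilCellFull(cellSize, 512));
    }
}
```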