A regular (replicated) file is written through DFSOutputStream, while an EC file is written through DFSStripedOutputStream:
public class DFSStripedOutputStream extends DFSOutputStream
implements StreamCapabilities {...}
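For context, here is a minimal client-side sketch showing which stream type you end up with (the demo class and paths are made up for illustration; it assumes a running cluster where /ec already has an erasure coding policy set, e.g. via "hdfs ec -setPolicy -path /ec -policy RS-6-3-1024k", while /replicated does not):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EcStreamTypeDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      // Creating a file under an EC-enabled directory yields a striped stream.
      try (FSDataOutputStream out = fs.create(new Path("/ec/demo.bin"))) {
        // Expected: org.apache.hadoop.hdfs.DFSStripedOutputStream
        System.out.println(out.getWrappedStream().getClass().getName());
      }
      // A directory without an EC policy falls back to the replicated stream.
      try (FSDataOutputStream out = fs.create(new Path("/replicated/demo.bin"))) {
        // Expected: org.apache.hadoop.hdfs.DFSOutputStream
        System.out.println(out.getWrappedStream().getClass().getName());
      }
    }
  }
}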
Setting a breakpoint in the FSOutputSummer constructor gives the following call stack:
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.hadoop.fs.FSOutputSummer.<init>(FSOutputSummer.java:53)
at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:191)
at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:247)
at org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:310)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1195)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1133)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:530)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:527)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:541)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:468)
at org.apache.hadoop.fs.FilterFileSystem.create(FilterFileSystem.java:193)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1194)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1174)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1063)
at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.create(CommandWithDestination.java:509)
at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:484)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:407)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:342)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:277)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:262)
at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:367)
at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:304)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:257)
at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:286)
at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:270)
at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:228)
at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:295)
at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:122)
at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:132)
The stack shows that the output stream is built in DFSClient.create, which delegates to DFSOutputStream.newStreamForCreate, where the stream type is decided:
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException {
checkOpen();
final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName);
beginFileLease(result.getFileId(), result);
return result;
}
Continuing into newStreamForCreate, which chooses the stream type based on whether the file has an erasure coding policy:
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
}
out.start();
1. org.apache.hadoop.hdfs.DFSStripedOutputStream.<init>(DFSStripedOutputStream.java:291)
For createFile and appendFile, the constructors (compared with the three-replica stream) look like this:
// Creating a new file
/** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes)
throws IOException {
super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating DFSStripedOutputStream for " + src);
}
// Compared with the three-replica stream, several EC-specific fields are initialized here.
ecPolicy = stat.getErasureCodingPolicy();
final int numParityBlocks = ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
numDataBlocks = ecPolicy.getNumDataUnits();
numAllBlocks = numDataBlocks + numParityBlocks;
this.favoredNodes = favoredNodes;
failedStreamers = new ArrayList<>();
corruptBlockCountMap = new LinkedHashMap<>();
flushAllExecutor = Executors.newFixedThreadPool(numAllBlocks);
flushAllExecutorCompletionService = new
ExecutorCompletionService<>(flushAllExecutor);
ErasureCoderOptions coderOptions = new ErasureCoderOptions(
numDataBlocks, numParityBlocks);
encoder = CodecUtil.createRawEncoder(dfsClient.getConfiguration(),
ecPolicy.getCodecName(), coderOptions);
coordinator = new Coordinator(numAllBlocks);
cellBuffers = new CellBuffers(numParityBlocks);
streamers = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.add(streamer);
}
currentPackets = new DFSPacket[streamers.size()];
setCurrentStreamer(0);
}
// Appending to an existing file
/** Construct a new output stream for appending to a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
throws IOException {
this(dfsClient, src, stat, flags, progress, checksum, favoredNodes);
initialFileSize = stat.getLen(); // length of file when opened
prevBlockGroup4Append = lastBlock != null ? lastBlock.getBlock() : null;
}
Let's look at what these fields do:
1. ecPolicy: ErasureCodingPolicy ecPolicy = null; it defaults to null, meaning no EC, i.e. the ordinary three-replica path.
2. cellSize: the size of each cell; it must be positive and a multiple of 1024:
Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
Preconditions.checkArgument(cellSize % 1024 == 0,
"cellSize must be 1024 aligned");
3. encoder: the raw erasure encoder used to compute parity cells.
4. streamers: in EC mode, streamers is a list of streamers, one per internal block (data and parity alike), numAllBlocks in total:
streamers = new ArrayList<>(numAllBlocks);
for (short i = 0; i < numAllBlocks; i++) {
StripedDataStreamer streamer = new StripedDataStreamer(stat,
dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
favoredNodes, i, coordinator, getAddBlockFlags());
streamers.add(streamer);
}
5. currentPackets: currentPackets = new DFSPacket[streamers.size()]; — each streamer has its own current packet. A worked example of the stripe geometry these fields describe follows this list.
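As a rough illustration of that geometry (assuming the RS-6-3-1024k policy, i.e. 6 data units, 3 parity units and 1 MB cells, together with the 128 MB block size that appears in the debug log later on):

public class StripeGeometry {
  public static void main(String[] args) {
    final long cellSize = 1024 * 1024;          // 1 MB cells (the "1024k" in RS-6-3-1024k)
    final int numDataBlocks = 6;                // data units
    final int numParityBlocks = 3;              // parity units
    final int numAllBlocks = numDataBlocks + numParityBlocks; // 9 streamers
    final long blockSize = 134217728L;          // 128 MB, as reported in the debug log

    long stripeSize = cellSize * numDataBlocks;       // user bytes per full stripe
    long cellsPerBlock = blockSize / cellSize;        // cells written into each internal block
    long groupUserBytes = blockSize * numDataBlocks;  // user data held by a full block group

    System.out.printf("streamers=%d, stripe=%d bytes, cells/block=%d, user bytes/group=%d%n",
        numAllBlocks, stripeSize, cellsPerBlock, groupUserBytes);
  }
}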
The three inner classes play the following roles:
- Data blocks are denoted Data
- Parity blocks are denoted Parity
Link: HDFS EC:将纠删码技术融入HDFS
- For an ordinary HDFS file, the basic building unit is the block.
- For a file in EC mode, the basic building unit is the block group.
- Since the basic unit of an EC file is the block group, the first question is how to store each file's block group information in the NameNode. A straightforward approach is to assign each block group an ID and keep a Map from that ID to the block group information, which in turn contains the information of every internal block. For small files this increases NameNode memory consumption. Taking RS(6,3) as an example, if a file is slightly smaller than 6 blocks, the NameNode would have to maintain 10 IDs for it (1 block group ID, 6 data block IDs and 3 parity block IDs). With a workload dominated by small files, NameNode memory usage would come under pressure.
The call stack when data is written (the top frame is allocateNewBlock):
java.lang.Thread.State: RUNNABLE
at org.apache.hadoop.hdfs.DFSStripedOutputStream.allocateNewBlock(DFSStripedOutputStream.java:474)
at org.apache.hadoop.hdfs.DFSStripedOutputStream.writeChunk(DFSStripedOutputStream.java:541)
- locked <0xa87> (a org.apache.hadoop.hdfs.DFSStripedOutputStream)
at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:217)
at org.apache.hadoop.fs.FSOutputSummer.write1(FSOutputSummer.java:125)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:111)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
- locked <0xa89> (a org.apache.hadoop.hdfs.client.HdfsDataOutputStream)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:96)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:68)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:129)
at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:485)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:407)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:342)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:277)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:262)
at org.apache.hadoop.fs.shell.Command.processPathInternal(Command.java:367)
at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:331)
at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:304)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:257)
at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:286)
at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:270)
at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:228)
at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:295)
at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:122)
at org.apache.hadoop.fs.shell.Command.run(Command.java:177)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:327)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:133)
As this stack shows, writeChunk is handled by DFSStripedOutputStream. Let's first look at the block allocation code:
// DFSStripedOutputStream.java
private void allocateNewBlock() throws IOException {
// 1. On the first chunk currentBlockGroup is still null and this is skipped;
//    otherwise wait for all healthy streamers to end the previous block group.
if (currentBlockGroup != null) {
for (int i = 0; i < numAllBlocks; i++) {
// sync all the healthy streamers before writing to the new block
waitEndBlocks(i);
}
}
failedStreamers.clear();
DatanodeInfo[] excludedNodes = getExcludedNodes();
LOG.debug("Excluding DataNodes when allocating new block: "
+ Arrays.asList(excludedNodes));
// replace failed streamers
// 2. The current block group becomes the previous one (null on the very first allocation).
ExtendedBlock prevBlockGroup = currentBlockGroup;
if (prevBlockGroup4Append != null) {
prevBlockGroup = prevBlockGroup4Append;
prevBlockGroup4Append = null;
}
replaceFailedStreamers();
LOG.debug("Allocating new block group. The previous block group: "
+ prevBlockGroup);
final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
prevBlockGroup, fileId, favoredNodes, getAddBlockFlags());
assert lb.isStriped();
// assign the new block to the current block group
currentBlockGroup = lb.getBlock();
blockGroupIndex++;
// Parse the block group into the array of internal blocks
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock) lb, cellSize, numDataBlocks,
numAllBlocks - numDataBlocks);
for (int i = 0; i < blocks.length; i++) {
StripedDataStreamer si = getStripedDataStreamer(i);
assert si.isHealthy();
if (blocks[i] == null) {
// allocBlock() should guarantee that all data blocks are successfully
// allocated.
assert i >= numDataBlocks;
// Set exception and close streamer as there is no block locations
// found for the parity block.
LOG.warn("Cannot allocate parity block(index={}, policy={}). " +
"Not enough datanodes? Exclude nodes={}", i, ecPolicy.getName(),
excludedNodes);
si.getLastException().set(
new IOException("Failed to get parity block, index=" + i));
si.getErrorState().setInternalError();
si.close(true);
} else {
// The coordinator queues the following block for streamer i
coordinator.getFollowingBlocks().offer(i, blocks[i]);
}
}
}
Block group information: the block information obtained by parsing the block group looks like this:
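A conceptual sketch of what that parsed information amounts to (the ID arithmetic, i.e. internal block i using blockGroupId + i, and the 6+3 layout are assumptions based on the RS(6,3) example above and the block IDs seen in the debug log; the real LocatedBlock[] additionally carries datanode locations, access tokens and lengths):

public class BlockGroupSketch {
  public static void main(String[] args) {
    long blockGroupId = -9223372036854768096L; // group ID / streamer #0's block ID from the log
    int numDataBlocks = 6;
    int numAllBlocks = 9;                      // assumed RS(6,3): 6 data + 3 parity

    for (int i = 0; i < numAllBlocks; i++) {
      long internalId = blockGroupId + i;      // internal blocks share the group's ID space
      String role = i < numDataBlocks ? "data" : "parity";
      System.out.printf("streamer #%d -> blk_%d (%s)%n", i, internalId, role);
    }
  }
}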
Let's continue with the rest of the writeChunk logic:
@Override
protected synchronized void writeChunk(byte[] bytes, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
final int index = getCurrentIndex();
final int pos = cellBuffers.addTo(index, bytes, offset, len);
final boolean cellFull = pos == cellSize;
if (currentBlockGroup == null || shouldEndBlockGroup()) {
// the incoming data should belong to a new block. Allocate a new block.
allocateNewBlock(); // allocate a new block group, see the code above
}
// len = 512 bytes, see the state captured below
currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
// note: the current streamer can be refreshed after allocating a new block
final StripedDataStreamer current = getCurrentStreamer();
if (current.isHealthy()) {
try {
super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
} catch(Exception e) {
handleCurrentStreamerFailure("offset=" + offset + ", length=" + len, e);
}
}
// Two extra steps are needed when a striping cell is full:
// 1. Forward the current index pointer
// 2. Generate parity packets if a full stripe of data cells are present
if (cellFull) {
int next = index + 1;
//When all data cells in a stripe are ready, we need to encode
//them and generate some parity cells. These cells will be
//converted to packets and put to their DataStreamer's queue.
if (next == numDataBlocks) {
cellBuffers.flipDataBuffers();
writeParityCells();
next = 0;
// if this is the end of the block group, end each internal block
if (shouldEndBlockGroup()) {
flushAllInternals();
checkStreamerFailures();
for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) {
try {
endBlock();
} catch (IOException ignored) {}
}
}
} else {
// check failure state for all the streamers. Bump GS if necessary
checkStreamerFailures();
}
}
setCurrentStreamer(next);
}
}
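Once a full stripe of data cells is buffered, writeParityCells runs the encoder created in the constructor to produce the parity cells, which are then packetized and handed to the parity streamers. A self-contained sketch of just the encoding step (the 6+3 layout and the small cell size are chosen only for illustration; the real code works on the CellBuffers rather than freshly allocated buffers):

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;

public class ParityEncodeSketch {
  public static void main(String[] args) throws Exception {
    final int numData = 6, numParity = 3;   // assumed RS-6-3 layout
    final int cellSize = 64 * 1024;         // small cell just for this sketch

    // Same construction as in the DFSStripedOutputStream constructor above.
    ErasureCoderOptions options = new ErasureCoderOptions(numData, numParity);
    RawErasureEncoder encoder = CodecUtil.createRawEncoder(
        new Configuration(), ErasureCodeConstants.RS_CODEC_NAME, options);

    ByteBuffer[] dataCells = new ByteBuffer[numData];
    ByteBuffer[] parityCells = new ByteBuffer[numParity];
    for (int i = 0; i < numData; i++) {
      dataCells[i] = ByteBuffer.allocateDirect(cellSize);   // one full data cell each
    }
    for (int i = 0; i < numParity; i++) {
      parityCells[i] = ByteBuffer.allocateDirect(cellSize); // filled by the encoder
    }

    // One stripe of data cells in, the corresponding parity cells out.
    encoder.encode(dataCells, parityCells);
    System.out.println("encoded " + numParity + " parity cells of " + cellSize + " bytes");
  }
}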
State when currentBlockGroup.setNumBytes is called:
The current streamer obtained at current = getCurrentStreamer():
The contents of its coordinator:
While the current cell is not yet full, the loop keeps writing into it, appending chunk after chunk to the current packet; each call adds another 512 bytes to the block group's byte count. A toy simulation of this rotation follows.
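A toy simulation (not HDFS code; the 1 MB cell and 6 data blocks are assumed figures) of how the streamer index rotates as cells fill:

public class StreamerRotationSim {
  public static void main(String[] args) {
    final int chunkSize = 512;          // bytes per writeChunk call
    final int cellSize = 1024 * 1024;   // assumed 1 MB cells
    final int numDataBlocks = 6;        // assumed RS-6-3 layout

    int index = 0;                      // current data streamer
    int bytesInCell = 0;                // fill level of the current cell
    long stripes = 0;                   // completed stripes

    long chunksToWrite = 3L * numDataBlocks * cellSize / chunkSize; // three full stripes
    for (long c = 0; c < chunksToWrite; c++) {
      bytesInCell += chunkSize;                       // super.writeChunk(...) for streamer `index`
      boolean cellFull = bytesInCell == cellSize;
      if (cellFull) {
        bytesInCell = 0;
        int next = index + 1;
        if (next == numDataBlocks) {                  // a full stripe of data cells is ready
          stripes++;                                  // writeParityCells() would run here
          next = 0;
        }
        index = next;
      }
    }
    System.out.println("stripes completed = " + stripes + ", final streamer index = " + index);
  }
}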
The debug log looks like this:
2019-12-24 10:32:16,368 DEBUG hdfs.DFSOutputStream: Excluding DataNodes when allocating new block: []
2019-12-24 10:32:16,368 DEBUG hdfs.DFSOutputStream: Allocating new block group. The previous block group: null
2019-12-24 10:32:16,374 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop sending #5 org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock
2019-12-24 10:32:16,383 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop got value #5
2019-12-24 10:32:16,383 DEBUG ipc.ProtobufRpcEngine: Call: addBlock took 10ms
2019-12-24 10:32:16,400 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=0, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0, DFSStripedOutputStream:#0: block==null
2019-12-24 10:32:16,403 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=64512, blockSize=134217728, appendChunk=false, #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: Queued packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512, #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/ec/write/hadoop3.tar.gz._COPYING_, chunkSize=516, chunksPerPacket=126, packetSize=65016
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: stage=PIPELINE_SETUP_CREATE, #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=1, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512, DFSStripedOutputStream:#0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: Allocating new block: #0: block==null
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: pipeline = [DatanodeInfoWithStorage[10.179.17.22:9866,DS-0a5062e9-e555-4d80-9b51-9f64963787d7,DISK]], #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,404 DEBUG hdfs.DataStreamer: Connecting to datanode 10.179.17.22:9866
2019-12-24 10:32:16,405 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 129024, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=129024, blockSize=134217728, appendChunk=false, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,405 DEBUG hdfs.DataStreamer: Queued packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 129024, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,405 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/ec/write/hadoop3.tar.gz._COPYING_, chunkSize=516, chunksPerPacket=126, packetSize=65016
2019-12-24 10:32:16,405 DEBUG hdfs.DataStreamer: Send buf size 43520
2019-12-24 10:32:16,406 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=2, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=129024, DFSStripedOutputStream:#0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,406 DEBUG sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-12-24 10:32:16,406 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop sending #6 org.apache.hadoop.hdfs.protocol.ClientProtocol.getServerDefaults
2019-12-24 10:32:16,407 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 2 offsetInBlock: 129024 lastPacketInBlock: false lastByteOffsetInBlock: 193536, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=193536, blockSize=134217728, appendChunk=false, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,407 DEBUG hdfs.DataStreamer: Queued packet seqno: 2 offsetInBlock: 129024 lastPacketInBlock: false lastByteOffsetInBlock: 193536, #0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,407 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/ec/write/hadoop3.tar.gz._COPYING_, chunkSize=516, chunksPerPacket=126, packetSize=65016
2019-12-24 10:32:16,407 DEBUG hdfs.DFSClient: WriteChunk allocating new packet seqno=3, src=/ec/write/hadoop3.tar.gz._COPYING_, packetSize=65016, chunksPerPacket=126, bytesCurBlock=193536, DFSStripedOutputStream:#0: blk_-9223372036854768096_3947
2019-12-24 10:32:16,407 DEBUG ipc.Client: IPC Client (1388278453) connection to hadoop1/10.179.25.59:9000 from hadoop got value #6
2019-12-24 10:32:16,407 DEBUG ipc.ProtobufRpcEngine: Call: getServerDefaults took 1ms
2019-12-24 10:32:16,408 DEBUG hdfs.DFSOutputStream: enqueue full packet seqno: 3 offsetInBlock: 193536 lastPacketInBlock: false lastByteOffsetInBlock: 258048, src=/ec/write/hadoop3.tar.gz._COPYING_, bytesCurBlock=258048, blockSize=134217728, appendChunk=false, #0: blk_-9223372036854768096_3947
synchronized void enqueueCurrentPacketFull() throws IOException {
LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ " appendChunk={}, {}", currentPacket, src, getStreamer()
.getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
getStreamer());
enqueueCurrentPacket();
adjustChunkBoundary();
endBlock();
}
When a packet is full, it is enqueued to the current streamer; when the bytes written reach the block size, the last-packet / end-of-block logic runs. The sizes in the log fit together as shown below.
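A quick check of those numbers, using only the values the log itself reports rather than DFSClient's computePacketChunkSize logic:

public class PacketArithmetic {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;  // one chunk of user data
    int checksumLen = 4;         // 4-byte checksum per chunk
    int chunkSize = bytesPerChecksum + checksumLen;              // 516, as in the log
    int chunksPerPacket = 126;                                   // as in the log

    int packetSize = chunkSize * chunksPerPacket;                // 126 * 516 = 65016
    int userBytesPerPacket = bytesPerChecksum * chunksPerPacket; // 126 * 512 = 64512

    System.out.println("packetSize = " + packetSize);            // matches packetSize=65016
    System.out.println("userBytesPerPacket = " + userBytesPerPacket);
    // 64512 matches lastByteOffsetInBlock of seqno 0 and the 64512-byte steps
    // in bytesCurBlock between consecutive packets in the log.
  }
}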
Q: Why does a freshly allocated block print as block==null?
Allocating new block: #0: block==null
The "#0: block==null" suffix is the streamer's own toString(). This message appears to be logged before streamer #0 has taken its internal block from the coordinator (the block is assigned when the streamer sets up its pipeline), so its block field is still null at that point.
When does cellFull become true? Each writeChunk call writes 512 bytes into the current cell, and cellFull is simply pos == cellSize, so a cell fills once cellSize bytes have accumulated (2048 chunks for a 1 MB cell).