
写chunk的入口:
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
- locked <0xaea> (a org.apache.hadoop.hdfs.DFSOutputStream)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
- locked <0xaec> (a org.apache.hadoop.hdfs.client.HdfsDataOutputStream)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:87)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:59)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:119)
at org.apache.hadoop.fs.shell.CommandWithDestination$TargetFileSystem.writeStreamToFile(CommandWithDestination.java:466)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyStreamToTarget(CommandWithDestination.java:391)
at org.apache.hadoop.fs.shell.CommandWithDestination.copyFileToTarget(CommandWithDestination.java:328)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:263)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPath(CommandWithDestination.java:248)
at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:319)
at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:291)
at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:243)
at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:273)
at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:257)
at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:220)
at org.apache.hadoop.fs.shell.CopyCommands$Put.processArguments(CopyCommands.java:267)
at org.apache.hadoop.fs.shell.Command.processRawArguments(Command.java:203)
at org.apache.hadoop.fs.shell.Command.run(Command.java:167)
at org.apache.hadoop.fs.FsShell.run(FsShell.java:287)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at cn.whbing.hadoop.ReadWriteTest.testWrite(ReadWriteTest.java:132)
FSOutputSummer的write(b,off,len)实质上就一行:
for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
}
循环调用write1写入数据:write1返回本次实际写入的字节数,n按该返回值递增,直到len个字节全部写完。
(1) write:即上面FSOutputSummer的write(b,off,len),循环调用write1。
(2) writeChunkImpl:把一个chunk写入当前Packet的具体实现,代码如下:
/**
 * Appends one chunk (data bytes plus its checksum bytes) to the current
 * packet, and queues the packet for transmission once it holds the maximum
 * number of chunks or the current block is completely written.
 *
 * <p>When the block boundary is reached, an additional empty packet flagged
 * as the last packet of the block is queued and the per-block counters
 * ({@code bytesCurBlock}, {@code lastFlushOffset}) are reset to zero.
 *
 * @param b        buffer holding the chunk data
 * @param offset   index in {@code b} at which the chunk data starts
 * @param len      number of data bytes; must not exceed {@code bytesPerChecksum}
 * @param checksum buffer holding the checksum for this chunk
 * @param ckoff    index in {@code checksum} at which the checksum starts
 * @param cklen    number of checksum bytes; must be 0 or {@code getChecksumSize()}
 * @throws IOException if {@code len} is larger than {@code bytesPerChecksum},
 *         if {@code cklen} is neither 0 nor {@code getChecksumSize()}, or if
 *         one of the called helpers fails (presumably {@code checkOpen()} /
 *         {@code checkClosed()} reject a closed client or stream -- confirm
 *         against their definitions)
 */
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();

  // Validate the chunk and checksum sizes before touching any packet state.
  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len +
        " is larger than supported bytesPerChecksum " +
        bytesPerChecksum);
  }
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be " +
        getChecksumSize() + " but found to be " + cklen);
  }

  // No packet in progress: create one.
  if (currentPacket == null) {
    currentPacket = createPacket(packetSize, chunksPerPacket,
        bytesCurBlock, currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
          currentPacket.getSeqno() +
          ", src=" + src +
          ", packetSize=" + packetSize +
          ", chunksPerPacket=" + chunksPerPacket +
          ", bytesCurBlock=" + bytesCurBlock);
    }
  }

  // Write the chunk (checksum first, then data) into the packet.
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.incNumChunks();
  bytesCurBlock += len;

  // If packet is full, enqueue it for transmission
  //
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
          currentPacket.getSeqno() +
          ", src=" + src +
          ", bytesCurBlock=" + bytesCurBlock +
          ", blockSize=" + blockSize +
          ", appendChunk=" + appendChunk);
    }
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    if (!appendChunk) {
      // Take the minimum in the wide type and narrow afterwards. Casting
      // (blockSize - bytesCurBlock) to int first wraps around when more than
      // Integer.MAX_VALUE bytes remain in the block, which would feed a
      // bogus (possibly negative) packet size to computePacketChunkSize.
      int psize = (int) Math.min(blockSize - bytesCurBlock,
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }

    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    if (bytesCurBlock == blockSize) {
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
      currentPacket.setSyncBlock(shouldSyncBlock);
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
写chunk的实现:
- 当前数据包Packet为空,则创建一个Packet
- 写入chunk至Packet
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks();
bytesCurBlock += len;
- write1方法会检查写入的数据是否已满一个chunk(正常情况下为512字节);如果满了,则为这个chunk计算一个4字节的checksum,然后将这个chunk和对应的checksum写入当前Packet中(DFSOutputStream的writeChunk方法),格式就是上面那个图中的格式。当Packet满了(即塞入的chunk个数达到了预先计算的值)或当前block已写满时,就将这个packet放入dataQueue,后台会有一个DataStreamer线程专门从这个dataQueue中逐个取出packet发送出去。