I. Write data flow diagram
In this flow the client starts writing data, splits the data into chunks, groups several chunks into one packet, and puts the packet on a queue, where it waits to be written to the DataNodes.
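
The chunk, packet, and queue relationship can be sketched in a few lines of Java. This is a toy model only: `WritePipelineSketch`, `toChunks`, and `toPacketQueue` are made-up names, and the chunk size (512) and chunks-per-packet (2) values are chosen just to keep the output small.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of the client-side pipeline: bytes -> chunks -> packets -> queue.
// All names here are illustrative; they are not Hadoop classes.
class WritePipelineSketch {

    /** Splits data into chunks of at most chunkSize bytes. */
    static List<byte[]> toChunks(byte[] data, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < data.length; i += chunkSize) {
            int end = Math.min(i + chunkSize, data.length);
            chunks.add(Arrays.copyOfRange(data, i, end));
        }
        return chunks;
    }

    /** Groups chunks into packets of at most chunksPerPacket chunks and enqueues them. */
    static Deque<List<byte[]>> toPacketQueue(List<byte[]> chunks, int chunksPerPacket) {
        Deque<List<byte[]>> queue = new ArrayDeque<>();
        List<byte[]> packet = new ArrayList<>();
        for (byte[] chunk : chunks) {
            packet.add(chunk);
            if (packet.size() == chunksPerPacket) {
                queue.addLast(packet);      // packet is full, hand it to the queue
                packet = new ArrayList<>();
            }
        }
        if (!packet.isEmpty()) {
            queue.addLast(packet);          // trailing, partially filled packet
        }
        return queue;
    }

    public static void main(String[] args) {
        byte[] data = new byte[1300];
        List<byte[]> chunks = toChunks(data, 512);
        Deque<List<byte[]>> queue = toPacketQueue(chunks, 2);
        System.out.println(chunks.size() + " chunks, " + queue.size() + " packets");
    }
}
```

With 1300 bytes of input this yields three chunks (512, 512, and 276 bytes) grouped into two packets.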

II. Client-side FSDataOutputStream write call
1. FSDataOutputStream.write
This calls FSOutputSummer.write, which DFSOutputStream inherits from its parent class FSOutputSummer.
@Override
public void write(byte b[], int off, int len) throws IOException {
// Step 2: out is a DFSOutputStream, so this resolves to the write method inherited from FSOutputSummer
out.write(b, off, len);
position += len; // update position
if (statistics != null) {
statistics.incrementBytesWritten(len);
}
}
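
The wrapper pattern in this method (delegate the write, then advance a position counter) can be reproduced with plain `java.io` classes. The sketch below is a hypothetical stand-in, not the Hadoop class: `PositionTrackingStream` and `getPos` are names chosen for illustration, and the statistics update is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for the wrapper shown above; not the Hadoop class itself.
class PositionTrackingStream extends FilterOutputStream {
    private long position;

    PositionTrackingStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        position++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate to the wrapped stream
        position += len;        // then record how far we have written
    }

    long getPos() {
        return position;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        PositionTrackingStream stream = new PositionTrackingStream(sink);
        stream.write(new byte[100], 0, 100);
        stream.write(new byte[50], 10, 20);
        System.out.println("position=" + stream.getPos() + ", sink=" + sink.size());
    }
}
```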
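III. Client-side DFSOutputStream write flow
1. FSOutputSummer.write
FSOutputSummer is the parent class of DFSOutputStream; every client call goes through a DFSOutputStream instance.
The method first checks that the client-side stream is still usable (not closed), validates the arguments, and then hands the data to write1 piece by piece.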
/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> and generate a checksum for
* each data chunk.
*
* <p> This method stores bytes from the given array into this
* stream's buffer before it gets checksumed. The buffer gets checksumed
* and flushed to the underlying output stream when all data
* in a checksum chunk are in the buffer. If the buffer is empty and
* requested length is at least as large as the size of next checksum chunk
* size, this method will checksum and write the chunk directly
* to the underlying output stream. Thus it avoids uneccessary data copy.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*/
@Override
public synchronized void write(byte b[], int off, int len)
throws IOException {
// Step 3: fail fast if the stream has already been closed
checkClosed();
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
// len is the number of bytes the caller asked to write; write1 returns how many bytes it consumed on each pass
for (int n=0; n<len; n += write1(b, off+n, len-n)) {
}
}
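
The bounds check above is written as `off > b.length - len` rather than the more obvious `off + len > b.length`. A small sketch of why that matters follows; `BoundsCheckSketch` and its two methods are illustrative names only.

```java
// Compares two ways of validating (off, len) against an array length.
class BoundsCheckSketch {

    /** Naive form: off + len can overflow int and wrap to a negative number. */
    static boolean naiveOutOfBounds(int arrayLength, int off, int len) {
        return off < 0 || len < 0 || off + len > arrayLength;
    }

    /** Form used in write(): arrayLength - len cannot overflow once len >= 0. */
    static boolean safeOutOfBounds(int arrayLength, int off, int len) {
        return off < 0 || len < 0 || off > arrayLength - len;
    }

    public static void main(String[] args) {
        int arrayLength = 16;
        int off = 8;
        int len = Integer.MAX_VALUE; // far larger than the array
        System.out.println("naive rejects: " + naiveOutOfBounds(arrayLength, off, len));
        System.out.println("safe rejects:  " + safeOutOfBounds(arrayLength, off, len));
    }
}
```

With these inputs the naive check wraps around and lets the request through, while the subtraction form rejects it.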
2. FSOutputSummer.write1
This method ultimately calls writeChecksumChunks; flushBuffer also ends up calling writeChecksumChunks. The chunk data is then written into a packet.
/**
* Write a portion of an array, flushing to the underlying
* stream at most once if necessary.
*/
private int write1(byte b[], int off, int len) throws IOException {
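// buf: internal buffer for storing data before it is checksummed
// If the buffer is empty and the data to write is at least as long as the buffer (buf.length, i.e. several checksum chunks), checksum the data and write it straight to the underlying stream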
if(count==0 && len>=buf.length) {
// local buffer is empty and user buffer size >= local buffer size, so
// simply checksum the user buffer and send it directly to the underlying
// stream
final int length = buf.length;//4608=512*9=sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS
// each call writes exactly one buffer's worth of data (buf.length bytes)
writeChecksumChunks(b, off, length);
return length;
}
// When the data is smaller than the local buffer, copy it into buf first; once buf is full, flushBuffer also runs writeChecksumChunks
// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
// local buffer is full
flushBuffer();
}
return bytesToCopy;
}
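
The two paths in write1 (bypass the buffer when it is empty and the input is large enough, otherwise copy and flush when full) can be traced with a toy class. This is a simplified sketch: `BufferedChunkWriter` and `flushedSizes` are hypothetical names, and the "downstream write" only records sizes instead of checksumming anything.

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of the buffer-or-bypass logic in write1; names are illustrative.
class BufferedChunkWriter {
    private final byte[] buf;
    private int count;                                    // bytes currently held in buf
    final List<Integer> flushedSizes = new ArrayList<>(); // size of each downstream write

    BufferedChunkWriter(int bufferSize) {
        this.buf = new byte[bufferSize];
    }

    void write(byte[] b, int off, int len) {
        // write1 may consume fewer than the remaining bytes, so loop until done
        for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
        }
    }

    private int write1(byte[] b, int off, int len) {
        if (count == 0 && len >= buf.length) {
            // empty buffer and enough input: skip the copy, send one buffer's worth
            flushedSizes.add(buf.length);
            return buf.length;
        }
        int bytesToCopy = Math.min(len, buf.length - count);
        System.arraycopy(b, off, buf, count, bytesToCopy);
        count += bytesToCopy;
        if (count == buf.length) {
            flush(); // local buffer is full
        }
        return bytesToCopy;
    }

    void flush() {
        if (count > 0) {
            flushedSizes.add(count);
            count = 0;
        }
    }

    public static void main(String[] args) {
        BufferedChunkWriter writer = new BufferedChunkWriter(4608);
        writer.write(new byte[10000], 0, 10000); // two direct writes, 784 bytes buffered
        writer.write(new byte[100], 0, 100);     // 884 bytes buffered
        writer.flush();
        System.out.println(writer.flushedSizes); // [4608, 4608, 884]
    }
}
```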
3. FSOutputSummer.writeChecksumChunks
The data is split into chunks according to its size, and each chunk is written by writeChunk.
/** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream.
*/
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
//len=4608
sum.calculateChunkedSums(b, off, len, checksum, 0);
// each iteration writes exactly one checksum chunk; len=4608, getBytesPerChecksum()=512
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
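// chunkLen=512. How does it relate to the block size? The block size must be a multiple of bytesPerChecksum, so a block always holds a whole number of chunks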
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
}
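
To make the offset arithmetic in the loop concrete, here is a rough sketch that computes one checksum per chunk and shows where each chunk's checksum lands in the checksum array. It assumes plain `java.util.zip.CRC32` and a 4-byte checksum for illustration; the checksum type actually used by a cluster may differ, and `ChunkChecksumSketch` is a made-up name.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of per-chunk checksumming; CRC32 and the 4-byte checksum size are
// assumptions for illustration, not necessarily what a given cluster uses.
class ChunkChecksumSketch {
    static final int CHECKSUM_SIZE = 4;

    /** Returns one 4-byte CRC32 per chunk, packed back to back. */
    static byte[] calculateChunkedSums(byte[] data, int off, int len, int bytesPerChecksum) {
        int numChunks = (len + bytesPerChecksum - 1) / bytesPerChecksum;
        ByteBuffer sums = ByteBuffer.allocate(numChunks * CHECKSUM_SIZE);
        for (int i = 0; i < len; i += bytesPerChecksum) {
            int chunkLen = Math.min(bytesPerChecksum, len - i); // last chunk may be short
            CRC32 crc = new CRC32();
            crc.update(data, off + i, chunkLen);
            sums.putInt((int) crc.getValue());
        }
        return sums.array();
    }

    /** Offset of the checksum that belongs to the chunk starting at byte i. */
    static int checksumOffset(int i, int bytesPerChecksum) {
        return i / bytesPerChecksum * CHECKSUM_SIZE;
    }

    public static void main(String[] args) {
        byte[] data = new byte[4608];
        byte[] sums = calculateChunkedSums(data, 0, data.length, 512);
        System.out.println("checksum bytes: " + sums.length);
        for (int i = 0; i < data.length; i += 512) {
            System.out.println("chunk at " + i + " -> checksum offset " + checksumOffset(i, 512));
        }
    }
}
```

For a 4608-byte buffer and 512-byte chunks this gives nine chunks, 36 checksum bytes, and checksum offsets 0, 4, 8, up to 32.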
4. DFSOutputStream.writeChunk
Delegates the work to writeChunkImpl.
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
try {
//len=512
writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
} finally {
scope.close();
}
}
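5. DFSOutputStream.writeChunkImpl
This method writes the data and its checksum into a packet. When the packet is full of chunks, or the block size is reached, the whole packet is placed on dataQueue to wait for the DataStreamer thread to send it. When the block is exactly full, an empty packet is sent last to tell the DataStreamer that the block is complete.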
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();
// int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
// chunkLen = len
// the data written must not be larger than one checksum chunk; len=512, bytesPerChecksum=512
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
// the supplied checksum length does not match the expected checksum size
if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
getChecksumSize() + " but found to be " + cklen);
}
// if there is no current packet, create a new one
if (currentPacket == null) {
// DFSPacket maxChunks = chunksPerPacket; in this run chunksPerPacket=126 for the first packet and 1 for the packets after it
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + bytesCurBlock);
}
}
// write the checksum
currentPacket.writeChecksum(checksum, ckoff, cklen);
// write the actual data
currentPacket.writeData(b, offset, len);
// increment the packet's chunk count
currentPacket.incNumChunks();
// add len (512) to the bytes written so far in the current block
bytesCurBlock += len;
// If packet is full, enqueue it for transmission
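// if the packet has reached its maximum number of chunks, or the block is full, put the packet on the queue
// Can the equality check be missed? No: incNumChunks adds exactly one each time, so currentPacket.getNumChunks() == currentPacket.getMaxChunks() is always reached
// blockSize=65536
// What if bytesCurBlock became larger than blockSize? That case does not seem possible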
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
bytesCurBlock == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", bytesCurBlock=" + bytesCurBlock +
", blockSize=" + blockSize +
", appendChunk=" + appendChunk);
}
// put currentPacket on dataQueue to wait for the DataStreamer thread to send it
waitAndQueueCurrentPacket();
// If the reopened file did not end at chunk boundary and the above
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
// appendChunk defaults to false; it is set to true when the last chunk was not completely filled, see the DataStreamer constructor
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumBufSize();
}
if (!appendChunk) {
// writePacketSize = dfs.client-write-packet-size, default 65536
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
System.out.println("psize="+psize);
// recompute chunksPerPacket and packetSize from psize
/*
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
packetSize = chunkSize*chunksPerPacket;
*/
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
// when getNumChunks() == currentPacket.getMaxChunks() for the first packet, bytesCurBlock=64512
//dataQueue:
//0 = {DFSPacket@7262} "packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512"
//1 = {DFSPacket@7263} "packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 65024"
//2 = {DFSPacket@7188} "packet seqno: 2 offsetInBlock: 65024 lastPacketInBlock: false lastByteOffsetInBlock: 65536"
// when the block is exactly full, send an empty packet
if (bytesCurBlock == blockSize) {
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
currentPacket.setSyncBlock(shouldSyncBlock);
waitAndQueueCurrentPacket();
// reset the per-block counters
bytesCurBlock = 0;
lastFlushOffset = 0;
}
}
}
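
The packet-sizing arithmetic quoted in the commented-out block can be replayed on its own to see where the 126-chunk first packet and the 1-chunk follow-up packets come from. The sketch below is a simplification: it assumes a 33-byte maximum packet header (standing in for PacketHeader.PKT_MAX_HEADER_LEN) and a 4-byte checksum, recomputes the size before every packet, and uses the made-up name `PacketSizeSketch`.

```java
// Reproduces the packet-sizing arithmetic quoted in the comment above.
// HEADER_LEN = 33 is an assumption standing in for PacketHeader.PKT_MAX_HEADER_LEN.
class PacketSizeSketch {
    static final int HEADER_LEN = 33;   // assumed value
    static final int CHECKSUM_SIZE = 4; // assumed 4-byte checksum per chunk

    static int chunksPerPacket(int psize, int bytesPerChecksum) {
        int bodySize = psize - HEADER_LEN;
        int chunkSize = bytesPerChecksum + CHECKSUM_SIZE;
        return Math.max(bodySize / chunkSize, 1);
    }

    public static void main(String[] args) {
        final int blockSize = 65536;
        final int writePacketSize = 65536;
        final int bytesPerChecksum = 512;

        long bytesCurBlock = 0;
        int seqno = 0;
        while (bytesCurBlock < blockSize) {
            // never size a packet beyond what is left in the block
            int psize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);
            int chunks = chunksPerPacket(psize, bytesPerChecksum);
            long end = Math.min(bytesCurBlock + (long) chunks * bytesPerChecksum, blockSize);
            System.out.println("seqno " + seqno++ + ": chunks=" + chunks
                + " offsetInBlock=" + bytesCurBlock + " lastByteOffsetInBlock=" + end);
            bytesCurBlock = end;
        }
    }
}
```

Under these assumptions the loop prints three packets ending at 64512, 65024, and 65536, which lines up with the dataQueue dump shown in the listing.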
6. DFSOutputStream.waitAndQueueCurrentPacket
Checks whether the queue is full. If it is, the caller waits; otherwise queueCurrentPacket is called to add the packet to the queue.
private void waitAndQueueCurrentPacket() throws IOException {
synchronized (dataQueue) {
try {
// If queue is full, then wait till we have enough space
boolean firstWait = true;
try {
// if dataQueue.size() + ackQueue.size() exceeds dfs.client.write.max-packets-in-flight (80), the queue is considered full and the caller waits
while (!isClosed() && dataQueue.size() + ackQueue.size() >
dfsClient.getConf().writeMaxPackets) {
if (firstWait) {
Span span = Trace.currentSpan();
if (span != null) {
span.addTimelineAnnotation("dataQueue.wait");
}
firstWait = false;
}
try {
// wait here until the DataStreamer calls notifyAll
dataQueue.wait();
} catch (InterruptedException e) {
// If we get interrupted while waiting to queue data, we still need to get rid
// of the current packet. This is because we have an invariant that if
// currentPacket gets full, it will get queued before the next writeChunk.
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
// the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
}
}
} finally {
Span span = Trace.currentSpan();
if ((span != null) && (!firstWait)) {
span.addTimelineAnnotation("end.wait");
}
}
checkClosed();
// add the packet to the queue
queueCurrentPacket();
} catch (ClosedChannelException e) {
}
}
}
7. DFSOutputStream.queueCurrentPacket
Appends the packet to the LinkedList-backed queue.
private void queueCurrentPacket() {
synchronized (dataQueue) {
if (currentPacket == null) return;
currentPacket.addTraceParent(Trace.currentSpan());
// append to the queue
dataQueue.addLast(currentPacket);
lastQueuedSeqno = currentPacket.getSeqno();
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
}
currentPacket = null;
dataQueue.notifyAll();
}
}
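
The hand-off between the writing thread and the sending thread in sections 6 and 7 is a bounded producer/consumer exchange built on `wait` and `notifyAll`. A minimal sketch of that pattern follows. It is not the Hadoop code: `BoundedPacketQueue` is a hypothetical class, packets are reduced to their sequence numbers, only one queue is modelled (the real check also counts ackQueue), and the "full" test uses `>=` for simplicity.

```java
import java.util.LinkedList;

// Minimal bounded hand-off queue using wait/notifyAll on the queue's monitor.
// It models only dataQueue; the real code also counts ackQueue entries.
class BoundedPacketQueue {
    private final LinkedList<Integer> dataQueue = new LinkedList<>();
    private final int maxPackets;

    BoundedPacketQueue(int maxPackets) {
        this.maxPackets = maxPackets;
    }

    /** Producer side: block while the queue is full, then append and wake the consumer. */
    void waitAndQueue(int seqno) throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.size() >= maxPackets) {
                dataQueue.wait();
            }
            dataQueue.addLast(seqno);
            dataQueue.notifyAll();
        }
    }

    /** Consumer side: block while the queue is empty, then remove and wake the producer. */
    int take() throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait();
            }
            int seqno = dataQueue.removeFirst();
            dataQueue.notifyAll();
            return seqno;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedPacketQueue queue = new BoundedPacketQueue(2);
        Thread streamer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    System.out.println("sent packet " + queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        streamer.start();
        for (int seqno = 0; seqno < 5; seqno++) {
            queue.waitAndQueue(seqno); // blocks whenever 2 packets are pending
        }
        streamer.join();
    }
}
```

Both sides wait in a `while` loop rather than an `if`, so a thread that is woken re-checks the condition before it proceeds.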