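HDFS Write Path: Writing Data (Part 2)

I. Write data flow diagram

In this flow the client starts writing data. The data is split into chunks, several chunks are grouped into a packet, and the packet is put into a queue, where it waits to be sent to the DataNodes for writing.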

[Figure: HDFS client write data flow]

II. Client side: how FSDataOutputStream's write method is called

1. FSDataOutputStream.write

It calls FSOutputSummer.write, which DFSOutputStream inherits from its parent class FSOutputSummer.

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      // step 2: out is a DFSOutputStream, which extends FSOutputSummer, so this calls FSOutputSummer.write
      out.write(b, off, len);
      position += len;                            // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }
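To make the delegation concrete, here is a minimal sketch of the same pattern using only java.io: a FilterOutputStream subclass that forwards each write to the wrapped stream and then advances a position counter, as the wrapper above does. The class name PositionTrackingStream and the bytesWritten counter (a stand-in for the FileSystem statistics object) are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for the position-tracking wrapper used by FSDataOutputStream:
// forward every write first, then advance the logical file position.
class PositionTrackingStream extends FilterOutputStream {
  private long position;     // bytes written through this wrapper so far
  private long bytesWritten; // stand-in for statistics.incrementBytesWritten

  PositionTrackingStream(OutputStream out) {
    super(out);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len); // delegate (in HDFS the target is a DFSOutputStream)
    position += len;        // then update the position, as in the excerpt
    bytesWritten += len;
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    position++;
    bytesWritten++;
  }

  long getPos() { return position; }
  long getBytesWritten() { return bytesWritten; }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    PositionTrackingStream s = new PositionTrackingStream(sink);
    s.write(new byte[1000], 0, 1000);
    s.write(new byte[10], 2, 5);
    System.out.println(s.getPos() + " " + sink.size()); // 1005 1005
  }
}
```

Overriding the three-argument write matters: FilterOutputStream's default version forwards byte by byte through write(int).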

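III. Client side: the DFSOutputStream write flow

1. FSOutputSummer.write

FSOutputSummer is the parent class of DFSOutputStream; the object the client actually calls is always a DFSOutputStream instance.

This method first checks that the stream is still open (checkClosed) and that off and len are valid, then calls write1 repeatedly until all len bytes have been consumed.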

 /**
   * Writes <code>len</code> bytes from the specified byte array 
   * starting at offset <code>off</code> and generate a checksum for
   * each data chunk.
   *
   * <p> This method stores bytes from the given array into this
   * stream's buffer before it gets checksumed. The buffer gets checksumed 
   * and flushed to the underlying output stream when all data 
   * in a checksum chunk are in the buffer.  If the buffer is empty and
   * requested length is at least as large as the size of next checksum chunk
   * size, this method will checksum and write the chunk directly 
   * to the underlying output stream.  Thus it avoids unnecessary data copy.
   *
   * @param      b     the data.
   * @param      off   the start offset in the data.
   * @param      len   the number of bytes to write.
   * @exception  IOException  if an I/O error occurs.
   */
  @Override
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    //3
    checkClosed();
    
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }
    // len is the number of bytes requested by the caller, not the block size
    for (int n=0; n<len; n += write1(b, off+n, len-n)) {
    }
  }
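The empty-bodied for loop is easy to misread: all of the work happens in the update clause n += write1(...). The sketch below, assuming a hypothetical write1 that simply consumes up to 4608 bytes per call, shows how a 10,000-byte write is split.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the FSOutputSummer.write() loop: keep calling a helper until every
// requested byte has been consumed. write1 here is a hypothetical stand-in that
// consumes at most BUF_LEN bytes per call.
class WriteLoopDemo {
  static final int BUF_LEN = 512 * 9; // 4608, matching the buffer size in the excerpt

  final List<Integer> calls = new ArrayList<>(); // bytes consumed by each write1 call

  void write(byte[] b, int off, int len) {
    // Same bounds check as the excerpt (written so that off + len cannot overflow).
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException();
    }
    // Empty body: progress happens entirely in the update clause.
    for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
    }
  }

  private int write1(byte[] b, int off, int len) {
    int consumed = Math.min(len, BUF_LEN);
    calls.add(consumed);
    return consumed;
  }

  public static void main(String[] args) {
    WriteLoopDemo d = new WriteLoopDemo();
    d.write(new byte[10000], 0, 10000);
    System.out.println(d.calls); // [4608, 4608, 784]
  }
}
```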

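2. FSOutputSummer.write1

Both paths in this method end up in writeChecksumChunks: the direct path calls it right away, and the buffered path reaches it through flushBuffer once the buffer is full. writeChecksumChunks is what eventually writes the chunk data into packets.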

/**
   * Write a portion of an array, flushing to the underlying
   * stream at most once if necessary.
   */
  private int write1(byte b[], int off, int len) throws IOException {
    //buf: internal buffer for storing data before it is checksumed
    // If buf is empty and the caller has at least buf.length bytes (9 checksum chunks = 4608 bytes),
    // checksum the data and write it straight through, skipping the copy into buf
    if(count==0 && len>=buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      final int length = buf.length;//4608=512*9=sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS
      // writes exactly buf.length bytes (9 chunks) per call; any remainder goes to the next write1 call
      writeChecksumChunks(b, off, length);
      return length;
    }
    // Otherwise copy the data into buf first; once buf is full, flushBuffer also calls writeChecksumChunks
    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    } 
    return bytesToCopy;
  }
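A simplified, self-contained sketch of write1's two paths. Assumptions: writeChecksumChunks is a stub that only logs the length it receives, and a full buffer is flushed completely with count reset to 0, which matches flushBuffer() for a full buffer but ignores the partial-chunk handling Hadoop has for hflush.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of FSOutputSummer.write1(): small writes are copied into a
// local buffer; a full buffer, or a large write into an empty buffer, is handed
// to a hypothetical writeChecksumChunks stub that only records the length.
class Write1Demo {
  final byte[] buf = new byte[512 * 9]; // 4608 bytes = 9 chunks of 512
  int count = 0;                        // bytes currently buffered
  final List<String> log = new ArrayList<>();

  int write1(byte[] b, int off, int len) {
    if (count == 0 && len >= buf.length) {
      // Fast path: checksum straight from the caller's array, no copy.
      writeChecksumChunks(b, off, buf.length, "direct");
      return buf.length;
    }
    // Slow path: copy as much as still fits into the local buffer.
    int bytesToCopy = Math.min(len, buf.length - count);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      writeChecksumChunks(buf, 0, count, "flush"); // buffer full
      count = 0;
    }
    return bytesToCopy;
  }

  void write(byte[] b, int off, int len) {
    for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
    }
  }

  private void writeChecksumChunks(byte[] b, int off, int len, String how) {
    log.add(how + ":" + len);
  }

  public static void main(String[] args) {
    Write1Demo d = new Write1Demo();
    d.write(new byte[10000], 0, 10000); // two direct writes, then 784 bytes buffered
    d.write(new byte[4000], 0, 4000);   // 3824 bytes fill the buffer and flush; 176 stay buffered
    System.out.println(d.log + " buffered=" + d.count);
  }
}
```

The direct path is what the Javadoc above means by avoiding an unnecessary data copy: large writes never touch buf.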

3. FSOutputSummer.writeChecksumChunks

Computes the checksums for the data being written, splits the data into chunks, and passes each chunk to writeChunk.

  /** Generate checksums for the given data chunks and output chunks & checksums
   * to the underlying output stream.
   */
  private void writeChecksumChunks(byte b[], int off, int len)
  throws IOException {
    // len=4608 here (the full 9-chunk buffer)
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    // one checksum chunk per iteration: len=4608, getBytesPerChecksum()=512, so 9 iterations
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
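      // chunkLen=512 (only the last chunk of a write can be shorter). Relation to the block size:
      // blockSize must be a multiple of bytesPerChecksum, which DFSOutputStream checks when the stream is created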
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  }
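The offset arithmetic is easier to see with real numbers. This sketch reproduces the two steps: fill a side array with one 4-byte checksum per 512-byte chunk, then walk the data with the same chunkLen/ckOffset formulas. Assumptions: java.util.zip.CRC32 stands in for Hadoop's DataChecksum (HDFS defaults to CRC32C), the checksum bytes are stored big-endian, and "emitting" a chunk only records its offsets instead of calling writeChunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Sketch of FSOutputSummer.writeChecksumChunks(): one checksum per chunk, then
// (dataOffset, chunkLen, checksumOffset) triples in the excerpt's loop shape.
class ChecksumChunksDemo {
  static final int BYTES_PER_CHECKSUM = 512;
  static final int CHECKSUM_SIZE = 4;

  static List<int[]> writeChecksumChunks(byte[] b, int off, int len, byte[] checksum) {
    // Step 1: like calculateChunkedSums, fill the checksum array chunk by chunk.
    for (int i = 0, ck = 0; i < len; i += BYTES_PER_CHECKSUM, ck += CHECKSUM_SIZE) {
      CRC32 crc = new CRC32();
      crc.update(b, off + i, Math.min(BYTES_PER_CHECKSUM, len - i));
      int v = (int) crc.getValue();
      checksum[ck] = (byte) (v >>> 24); // big-endian, 4 bytes per chunk
      checksum[ck + 1] = (byte) (v >>> 16);
      checksum[ck + 2] = (byte) (v >>> 8);
      checksum[ck + 3] = (byte) v;
    }
    // Step 2: same loop as the excerpt; the final chunk may be shorter than 512.
    List<int[]> chunks = new ArrayList<>();
    for (int i = 0; i < len; i += BYTES_PER_CHECKSUM) {
      int chunkLen = Math.min(BYTES_PER_CHECKSUM, len - i);
      int ckOffset = i / BYTES_PER_CHECKSUM * CHECKSUM_SIZE;
      chunks.add(new int[] {off + i, chunkLen, ckOffset});
    }
    return chunks;
  }

  public static void main(String[] args) {
    byte[] data = new byte[4608 + 100];             // 9 full chunks plus 100 bytes
    byte[] checksum = new byte[10 * CHECKSUM_SIZE]; // room for 10 checksums
    List<int[]> chunks = writeChecksumChunks(data, 0, data.length, checksum);
    int[] last = chunks.get(chunks.size() - 1);
    System.out.println(chunks.size() + " chunks, last = " + java.util.Arrays.toString(last));
    // prints: 10 chunks, last = [4608, 100, 36]
  }
}
```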

4. DFSOutputStream.writeChunk

Delegates to writeChunkImpl, wrapped in a trace scope.

  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    TraceScope scope =
        dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
    try {
      //len=512
      writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
    } finally {
      scope.close();
    }
  }

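5. DFSOutputStream.writeChunkImpl

This method writes the data and its checksum into the current packet. When the packet holds its maximum number of chunks, or the block size has been reached, the whole packet is put into dataQueue, where it waits for the DataStreamer thread to send it. When a block has been filled completely, an additional empty packet is queued to tell the DataStreamer that the whole block has been sent.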

  private synchronized void writeChunkImpl(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    // In writeChecksumChunks: int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    // so here len == chunkLen
    // The data written must not exceed one checksum chunk: len=512, bytesPerChecksum=512
    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    // Reject a checksum whose length differs from the configured checksum size
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }
    // If there is no current packet, create a new one
    if (currentPacket == null) {
      // DFSPacket maxChunks = chunksPerPacket; with blockSize=65536 it is 126 for the first packet of a block and 1 afterwards
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++, false);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.getSeqno() +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }
    // write the checksum
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // write the actual data
    currentPacket.writeData(b, offset, len);
    // one more chunk in this packet
    currentPacket.incNumChunks();
    // advance the byte count of the current block by len (512)
    bytesCurBlock += len;
    // If packet is full, enqueue it for transmission
    // i.e. queue the packet once it holds its maximum number of chunks, or once the block is full.
    // Can numChunks skip past maxChunks? No: incNumChunks adds one at a time, so numChunks always reaches maxChunks exactly.
    // blockSize=65536 in this example
    // Could bytesCurBlock ever exceed blockSize? It does not seem to happen in practice.
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.getSeqno() +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      // put currentPacket into dataQueue, where it waits for the DataStreamer thread to send it
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      // appendChunk defaults to false; it is true when the reopened file's last chunk was not full (see the DataStreamer constructor)
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        // writePacketSize = dfs.client-write-packet-size, default 65536
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        System.out.println("psize="+psize); // debug print added while tracing, not part of Hadoop
        // recompute chunksPerPacket and packetSize from psize; computePacketChunkSize does:
        /*
        final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
        final int chunkSize = csize + getChecksumSize();
        chunksPerPacket = Math.max(bodySize/chunkSize, 1);
        packetSize = chunkSize*chunksPerPacket;
        */

        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      // For the first packet, getNumChunks() == getMaxChunks() when bytesCurBlock=64512 (126 * 512)
      // dataQueue contents observed in the debugger:
      //0 = {DFSPacket@7262} "packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512"
      //1 = {DFSPacket@7263} "packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 65024"
      //2 = {DFSPacket@7188} "packet seqno: 2 offsetInBlock: 65024 lastPacketInBlock: false lastByteOffsetInBlock: 65536"
      // When the block is exactly full, send an empty packet
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
        currentPacket.setSyncBlock(shouldSyncBlock);
        waitAndQueueCurrentPacket();
        // reset the per-block counters
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }
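The packet boundaries in the debugger dump can be reproduced with a small simulation of the logic above. Assumed values: blockSize 65536, 512-byte chunks with 4-byte checksums, writePacketSize 65536, and a 33-byte PKT_MAX_HEADER_LEN (the value that yields the 126 chunks per packet seen in the comments). No data is copied; only offsets are tracked, and the string format of each queue entry is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation of writeChunkImpl's packet boundaries for one full block.
class PacketSplitDemo {
  static final int BLOCK_SIZE = 65536;
  static final int BYTES_PER_CHECKSUM = 512;
  static final int CHECKSUM_SIZE = 4;
  static final int WRITE_PACKET_SIZE = 65536;
  static final int PKT_MAX_HEADER_LEN = 33; // assumed header size

  // Mirrors the commented-out body of computePacketChunkSize in the excerpt.
  static int chunksPerPacket(int psize) {
    int bodySize = psize - PKT_MAX_HEADER_LEN;
    int chunkSize = BYTES_PER_CHECKSUM + CHECKSUM_SIZE;
    return Math.max(bodySize / chunkSize, 1);
  }

  // One entry per queued packet: "seqno:offsetInBlock-lastByteOffset", plus ":last" for the empty packet.
  static List<String> writeOneBlock() {
    List<String> queue = new ArrayList<>();
    long bytesCurBlock = 0;
    int seqno = 0;
    int maxChunks = chunksPerPacket(WRITE_PACKET_SIZE);
    int numChunks = 0;
    long packetStart = 0;
    while (true) {
      numChunks++;                         // one full chunk per writeChunkImpl call
      bytesCurBlock += BYTES_PER_CHECKSUM;
      if (numChunks == maxChunks || bytesCurBlock == BLOCK_SIZE) {
        queue.add(seqno++ + ":" + packetStart + "-" + bytesCurBlock);
        // Resize the next packet so it cannot cross the end of the block.
        int psize = (int) Math.min(BLOCK_SIZE - bytesCurBlock, WRITE_PACKET_SIZE);
        maxChunks = chunksPerPacket(psize);
        numChunks = 0;
        packetStart = bytesCurBlock;
        if (bytesCurBlock == BLOCK_SIZE) {
          // Empty packet marking the end of the block.
          queue.add(seqno++ + ":" + bytesCurBlock + "-" + bytesCurBlock + ":last");
          return queue;
        }
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(chunksPerPacket(WRITE_PACKET_SIZE)); // 126
    System.out.println(writeOneBlock());
    // [0:0-64512, 1:64512-65024, 2:65024-65536, 3:65536-65536:last]
  }
}
```

After the first packet only 1024 bytes remain in the block, so (1024 - 33) / 516 rounds down to 1 chunk per packet, which is why packets 1 and 2 each carry a single 512-byte chunk.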

6. DFSOutputStream.waitAndQueueCurrentPacket

Checks whether the queue is full. If it is, the method waits; otherwise it calls queueCurrentPacket to add the packet to the queue.

private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
      try {
      // If queue is full, then wait till we have enough space
        boolean firstWait = true;
        try {
          // If dataQueue.size() + ackQueue.size() exceeds dfs.client.write.max-packets-in-flight (default 80), the queue counts as full, so wait
          while (!isClosed() && dataQueue.size() + ackQueue.size() >
              dfsClient.getConf().writeMaxPackets) {
            if (firstWait) {
              Span span = Trace.currentSpan();
              if (span != null) {
                span.addTimelineAnnotation("dataQueue.wait");
              }
              firstWait = false;
            }
            try {
              // wait for the DataStreamer thread to call notifyAll
              dataQueue.wait();
            } catch (InterruptedException e) {
              // If we get interrupted while waiting to queue data, we still need to get rid
              // of the current packet. This is because we have an invariant that if
              // currentPacket gets full, it will get queued before the next writeChunk.
              //
              // Rather than wait around for space in the queue, we should instead try to
              // return to the caller as soon as possible, even though we slightly overrun
              // the MAX_PACKETS length.
              Thread.currentThread().interrupt();
              break;
            }
          }
        } finally {
          Span span = Trace.currentSpan();
          if ((span != null) && (!firstWait)) {
            span.addTimelineAnnotation("end.wait");
          }
        }
        checkClosed();
        // add the packet to the queue
        queueCurrentPacket();
      } catch (ClosedChannelException e) {
      }
    }
  }
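The wait/notifyAll back-pressure above can be sketched with plain Java monitors. Assumptions: packets are plain Integers, MAX_IN_FLIGHT is a hypothetical small limit of 2, ackQueue stays empty, and a single consumer thread stands in for DataStreamer. Because the condition uses a strict ">", at most MAX_IN_FLIGHT + 1 packets can be queued at once.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Sketch of the back-pressure in waitAndQueueCurrentPacket(): the writer blocks on
// the dataQueue monitor while too many packets are in flight.
class BackPressureDemo {
  static final int MAX_IN_FLIGHT = 2; // hypothetical small limit; HDFS defaults to 80

  final LinkedList<Integer> dataQueue = new LinkedList<>();
  final LinkedList<Integer> ackQueue = new LinkedList<>(); // kept empty in this sketch
  final List<Integer> received = new ArrayList<>();       // filled by the consumer thread
  int maxObservedInFlight = 0;

  // Writer side, modeled on waitAndQueueCurrentPacket + queueCurrentPacket.
  void waitAndQueue(int packet) throws InterruptedException {
    synchronized (dataQueue) {
      // Same condition shape as the excerpt: wait while strictly above the limit.
      while (dataQueue.size() + ackQueue.size() > MAX_IN_FLIGHT) {
        dataQueue.wait(); // releases the monitor until someone calls notifyAll()
      }
      dataQueue.addLast(packet);
      maxObservedInFlight = Math.max(maxObservedInFlight, dataQueue.size() + ackQueue.size());
      dataQueue.notifyAll(); // wake a consumer waiting for data
    }
  }

  // Consumer side, a stand-in for the DataStreamer thread.
  int take() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait();
      }
      int p = dataQueue.removeFirst();
      dataQueue.notifyAll(); // wake a writer waiting for space
      return p;
    }
  }

  static BackPressureDemo run(int packets) throws InterruptedException {
    BackPressureDemo d = new BackPressureDemo();
    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < packets; i++) {
          d.received.add(d.take());
          Thread.sleep(1); // slow consumer, so the writer has to wait
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    for (int i = 0; i < packets; i++) {
      d.waitAndQueue(i);
    }
    consumer.join(); // join() makes the consumer's writes to received visible here
    return d;
  }

  public static void main(String[] args) throws InterruptedException {
    BackPressureDemo d = run(10);
    System.out.println(d.received + ", max in flight = " + d.maxObservedInFlight);
  }
}
```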

7. DFSOutputStream.queueCurrentPacket

Appends the packet to dataQueue (a LinkedList), records its seqno, clears currentPacket, and wakes up waiting threads.

  private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      currentPacket.addTraceParent(Trace.currentSpan());
      // append to the queue
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.getSeqno();
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null;
      dataQueue.notifyAll();
    }
  }
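The hand-off semantics of queueCurrentPacket can be isolated in a few lines: the null check makes a second call a no-op, and clearing currentPacket forces the next writeChunk to allocate a fresh packet. The Packet class below is a hypothetical stand-in for DFSPacket. lastQueuedSeqno is presumably what the client later waits on for acknowledgements (for example during hflush), but this sketch only records it.

```java
import java.util.LinkedList;

// Sketch of queueCurrentPacket(): hand the current packet to the queue, remember
// its seqno, clear the reference, and wake waiting threads.
class QueueHandOffDemo {
  static final class Packet { // hypothetical stand-in for DFSPacket
    final long seqno;
    Packet(long seqno) { this.seqno = seqno; }
  }

  final LinkedList<Packet> dataQueue = new LinkedList<>();
  Packet currentPacket;
  long currentSeqno = 0;
  long lastQueuedSeqno = -1;

  void newPacket() {
    currentPacket = new Packet(currentSeqno++);
  }

  void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return; // nothing to hand off, so a repeated call does nothing
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.seqno;
      currentPacket = null;  // the next chunk must go into a freshly created packet
      dataQueue.notifyAll(); // wake the sender thread waiting for data
    }
  }

  public static void main(String[] args) {
    QueueHandOffDemo d = new QueueHandOffDemo();
    d.newPacket();
    d.queueCurrentPacket();
    d.queueCurrentPacket(); // ignored: currentPacket is already null
    d.newPacket();
    d.queueCurrentPacket();
    System.out.println(d.dataQueue.size() + " queued, lastQueuedSeqno=" + d.lastQueuedSeqno);
    // prints: 2 queued, lastQueuedSeqno=1
  }
}
```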