JVM同步方法之偏向鎖

其實很早之前通過一些資料,就對偏向鎖稍微有些了解,周六準備看看Hotspot中關于偏向鎖的實現(xiàn),本以為應該暢通無阻,沒想到處處都是攔路虎,細節(jié)比較多,真是硬著頭皮看了一整天,才大概懂了點。筆者還在不斷學習,只是想把自己的筆記分享出來,理解能力有限,可能有不正確的地方,還望指正,別讓我誤導了他人??。

一:鎖的表示

Java里的鎖,主要都是對對象進行加鎖,如普通的synchronized非靜態(tài)方法,就是對當前執(zhí)行方法的對象進行加鎖。那么怎么對對象進行加鎖呢?對象的鎖其實主要就是通過對象頭的markOop進行表示的。markOop其實不是一個對象,只是一個字長的數(shù)據,在32為機器上,markOop為32個位,在64位上為64個位。markOop中不同的位區(qū)域存儲著不同的信息,但是需要注意的一點是,markOop每個位區(qū)域表示的信息不是一定的,在不同狀態(tài)下,markWord中存著不同的信息。接下來盜圖一張:

584866-20170420091115212-1624858175.jpg

由上圖可知在markWord在對象的不同狀態(tài)下,會有5種表示形式。

二:何為偏向鎖

很多情況下,一個鎖對象并不會發(fā)生被多個線程訪問得情況,更多是被同一個線程進行訪問,如果一個鎖對象每次都被同一個線程訪問,根本沒有發(fā)生并發(fā),但是每次都進行加鎖,那豈不是非常耗費性能。所以偏向鎖就被設計出來了。

偏向,也可以理解為偏心。當鎖對象第一次被某個線程訪問時,它會在其對象頭的markOop中記錄該線程ID,那么下次該線程再次訪問它時,就不需要進行加鎖了。但是這中間只要發(fā)生了其他線程訪問該鎖對象的情況,證明這個對象會發(fā)生并發(fā),就不能對這個對象再使用偏向鎖了,會進行鎖的升級,這是后話,我們這里還是主要討論下偏向鎖。

三:源碼探究

我們就以synchronized方法為入口吧。

之前在《JVM方法執(zhí)行的來龍去脈》中提到過,JVM執(zhí)行方法最后會以對應的entry_point例程作為入口。entry_point例程不僅會進行java方法棧幀的創(chuàng)建,如果是同步方法,還會進行加鎖:

address TemplateInterpreterGenerator::generate_normal_entry(bool synchronized) {
  ......
  if (synchronized) {
    // Allocate monitor and lock method
    lock_method();
  } else {
    ......
  }
  // 下面會開始執(zhí)行方法的字節(jié)碼
  ......

}

可見在執(zhí)行方法的字節(jié)碼之前,對于同步方法,entry_point例程插入了一道關卡:lock_method():

void TemplateInterpreterGenerator::lock_method() {

  .......
  // get synchronization object
  {
    Label done;
    __ movl(rax, access_flags);
    __ testl(rax, JVM_ACC_STATIC);
    // get receiver (assume this is frequent case)
    // 局部變量表中第一個變量,存放著即將鎖的對象指針,移動到rax中
    __ movptr(rax, Address(rlocals, Interpreter::local_offset_in_bytes(0)));
    __ jcc(Assembler::zero, done);
    __ load_mirror(rax, rbx);

    __ bind(done);
  }

  // add space for monitor & lock
  // 在當前棧幀中分配一個空間,用于分配一個BasicObjectLock對象
  __ subptr(rsp, entry_size); // add space for a monitor entry
  __ movptr(monitor_block_top, rsp);  // set new monitor block top
  // store object
  // 將要鎖的對象指針移動到分配的BasicObjectLock中的obj變量
  __ movptr(Address(rsp, BasicObjectLock::obj_offset_in_bytes()), rax);
  const Register lockreg = NOT_LP64(rdx) LP64_ONLY(c_rarg1);
  // 將分配的BasicObjectLock的指針移動到lockreg寄存器中
  __ movptr(lockreg, rsp); // object address
  // 加鎖
  __ lock_object(lockreg);
}

在上面的lock_method()中,會在當前方法棧幀中分配一段空間,用于分配一個BasicObjectLock對象,這個對象主要干兩件事,一是記錄將要鎖的對象指針,而是用一個字長的空間,復制鎖對象的markOop?,F(xiàn)在我們可能不知道這么做是為什么,但是后面就會清楚了。主要上面最后一步,調用了lock_object()進行加鎖:

void InterpreterMacroAssembler::lock_object(Register lock_reg) {
  ......
  // 如果使用重量級鎖,直接進入InterpreterRuntime::monitorenter()執(zhí)行
  if (UseHeavyMonitors) {
    call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
  } else {
    Label done;
    // cmpxchg其實就是CAS操作,必須使用rax寄存器作為老數(shù)據的存儲。
    const Register swap_reg = rax; // Must use rax for cmpxchg instruction
    const Register tmp_reg = rbx; // Will be passed to biased_locking_enter to avoid a problematic case where tmp_reg = no_reg.
    const Register obj_reg = LP64_ONLY(c_rarg3) NOT_LP64(rcx); // Will contain the oop

    ......
    Label slow_case;

    // Load object pointer into obj_reg
    movptr(obj_reg, Address(lock_reg, obj_offset));
    //如果虛擬機參數(shù)允許使用偏向鎖,那么進入biased_locking_enter()中
    if (UseBiasedLocking) {
      // lock_reg :存儲的是分配的BasicObjectLock的指針
      // obj_reg :存儲的是鎖對象的指針
      // slow_case :即InterpreterRuntime::monitorenter();
      // done :標志著獲取鎖成功。
      // slow_case 和 done 也被傳入,這樣在biased_locking_enter()中,就可以根據情況跳到這兩處了。
      biased_locking_enter(lock_reg, obj_reg, swap_reg, tmp_reg, false, done, &slow_case);
    }
    ......
    ......

    // 直接跳到這,需要進入InterpreterRuntime::monitorenter()中去獲取鎖。
    bind(slow_case);
    // Call the runtime routine for slow case
    call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
    // 直接跳到這表明獲取鎖成功,接下來就會返回到entry_point例程進行字節(jié)碼的執(zhí)行了。
    bind(done);
  }
}

上面可知,如果虛擬機參數(shù)允許使用偏向鎖,那么會進入biased_locking_enter()中,biased_locking_enter()這個方法涉及到了很多細節(jié),說實話在不了解這些細節(jié)的情況下直接看代碼,簡直是一頭霧水。接下來還是一邊看代碼一邊去講解細節(jié)吧。

四:偏向鎖的獲取

biased_locking_enter()也比較長,就不直接貼方法塊了,一步步分析比較好。

1:判斷鎖對象是否為偏向鎖狀態(tài)

  // mark_addr:鎖對象頭中的markOop指針。
  Address mark_addr      (obj_reg, oopDesc::mark_offset_in_bytes());
  NOT_LP64( Address saved_mark_addr(lock_reg, 0); )

  if (PrintBiasedLockingStatistics && counters == NULL) {
    counters = BiasedLocking::counters();
  }

  Label cas_label;
  int null_check_offset = -1;
  // 如果swap_reg中沒存mark_addr,那么就先將mark_addr存入swap_reg中。
  if (!swap_reg_contains_mark) {
    null_check_offset = offset();
    movptr(swap_reg, mark_addr);
  }
  // 將對象的mark_addr,即markOop指針移入tmp_reg中
  movptr(tmp_reg, swap_reg);
  // 將tmp_reg和biased_lock_mask_in_place進行與操作,biased_lock_mask_in_place為111,和它進行與就可以取出markOop中后三位,即(是否偏向鎖+鎖標志位)
  andptr(tmp_reg, markOopDesc::biased_lock_mask_in_place);
  // 將上面結果,即(是否偏向鎖+鎖標志位)和biased_lock_pattern再次比較(biased_lock_pattern為5,即101),如果不相等,則表明不為偏向鎖狀態(tài),需要進行CAS操作,跳往cas_label;否則即為偏向鎖狀態(tài),接著往下走。
  cmpptr(tmp_reg, markOopDesc::biased_lock_pattern);
  jcc(Assembler::notEqual, cas_label);

2:走到這,表明鎖對象已經為偏向鎖態(tài),需要判斷鎖對象之前是否已經偏向當前線程。

  // 將鎖對象所屬類的prototype_header移動至tmp_reg中,prototype_header中存儲的也是markOop。
  // prototype_header是專門為偏向鎖打造的,初始時類的prototype_header為偏向鎖態(tài),即后三位為101,一旦發(fā)生了bulk_revoke,那么就會設為無鎖態(tài),即001。
  // bulk_revoke為批量撤銷,每次類發(fā)生bulk_rebais時(類的所有對象重設偏向鎖),類prototype_header中的epoch就會+1,當epoch達到一個閾值時,就會發(fā)生bulk_revoke,撤銷該類每個對象的偏向鎖,這樣該類的所有對象以后都不能使用偏向鎖了,其實也就是虛擬機認為該對象不適合偏向鎖。
  load_prototype_header(tmp_reg, obj_reg);

  // 將當前線程id和類的prototype_header相或,這樣得到的markOop為(當前線程id + prototype_header中的(epoch + 分代年齡 + 偏向鎖標志 + 鎖標志位)),注意prototype_header的分代年齡那4個字節(jié)為0
  orptr(tmp_reg, r15_thread);
  // 將上面計算得到的結果與鎖對象的markOop進行異或,tmp_reg中相等的位全部被置為0,只剩下不相等的位。
  xorptr(tmp_reg, swap_reg);
  Register header_reg = tmp_reg;
  // 對((int) markOopDesc::age_mask_in_place)進行按位取反,age_mask_in_place為...0001111000,取反后,變成了...1110000111,除了分代年齡那4位,其他位全為1;
  // 將取反后的結果再與header_reg相與,這樣就把header_reg中除了分代年齡之外的其他位取了出來,即將上面異或得到的結果中分代年齡給忽略掉。
  andptr(header_reg, ~((int) markOopDesc::age_mask_in_place));
  // 如果除了分代年齡,對象的markOop和(當前線程id+其他位)相等,那么上面與操作的結果應該為0,表明對象之前已經偏向當前線程,即markOop中存放有當前線程id,那么跳到done處,直接進入方法執(zhí)行即可;否則表明當前線程還不是偏向鎖的持有者,會接著往下走。
  jcc(Assembler::equal, done);

3:走到這,表明鎖對象并沒有偏向當前線程,接下來判斷是否需要撤銷鎖對象的偏向。

  // 將header_reg和111相與,如果結果不為0,則表明header_reg后三位存在不為0的位,證明之前進行異或時,類的prototype_header后面三位與對象markOop的后三位不相等,但是能走到這,表明對象markword后三位為101,即偏向模式。現(xiàn)在類的prototype_header和對象markOop后三位不相等,即對象所屬類不再支持偏向,發(fā)生了bulk_revoke,所以需要對當前對象進行偏向鎖的撤銷;否則表明目前該類還支持偏向鎖,接著往下走。
  testptr(header_reg, markOopDesc::biased_lock_mask_in_place);
  jccb(Assembler::notZero, try_revoke_bias);

4:走到這,表明鎖對象還支持偏向鎖,需要判斷當前對象的epoch是否合法,如果不合法,需要取進行重偏向。合法的話接著往下走。

  // 測試對象所屬類的prototype_header中epoch是否為0,不為0的話則表明之前異或時,類的prototype_header中epoch和對象markOop的epoch不相等,表明類在對象分配后發(fā)生過bulk_rebais()(前面提到過,每次發(fā)生bulk_rebaise,類的prototype header中的epoch都會+1),所以之前對象的偏向就無效了,需要進行重偏向。否則接著往下走。
  testptr(header_reg, markOopDesc::epoch_mask_in_place);
  jccb(Assembler::notZero, try_rebias);

5:走到這,表明鎖對象的偏向態(tài)合法,可以嘗試去獲取鎖,使對象偏向當前線程。

  // 取出對象markOop中除線程id之外的其他位
  andptr(swap_reg,
         markOopDesc::biased_lock_mask_in_place | markOopDesc::age_mask_in_place | markOopDesc::epoch_mask_in_place);
  // 將其他位移動至 tmp_reg。
  movptr(tmp_reg, swap_reg);
  // 將其他位和當前線程id進行或,構造成一個新的完整的32位markOop,存入tmp_reg中。新的markOop因為保存了當前線程id,所以會偏向當前線程。
  orptr(tmp_reg, r15_thread);
  // 嘗試利用CAS操作將新構成的markOop存入對象頭的mark_addr處,如果設置成功,則獲取偏向鎖成功。
  // 這里說明下,cmpxchgptr操作會強制將rax寄存器(swap_reg)中內容作為老數(shù)據,與第二個參數(shù),在這里即mark_addr處的內容進行比較,如果相等,則將第一個參數(shù)的內容,即tmp_reg中的新數(shù)據,存入mark_addr。
  cmpxchgptr(tmp_reg, mark_addr); // compare tmp_reg and swap_reg
  // 上面CAS操作失敗的情況下,表明對象頭中的markOop數(shù)據已經被篡改,即有其他線程已經獲取到偏向鎖,因為偏向鎖不容許多個線程訪問同一個鎖對象,所以需要跳到slow_case處,去撤銷該對象的偏向鎖,并進行鎖升級。
  if (slow_case != NULL) {
    jcc(Assembler::notZero, *slow_case);
  }
  // 上面CAS成功的情況下,直接就跳往done處,回去執(zhí)行方法的字節(jié)碼了。
  jmp(done);

6:其實到這里,biased_locking_enter()已經結束了,不過上面多處提到了try_rebais和try_revoke,這兩個其實就是匯編里的標號,它們對應的代碼也定義在biased_locking_enter中。

  bind(try_rebias);
  // 將鎖對象所屬類的prototype_header送入tmp_reg。
  load_prototype_header(tmp_reg, obj_reg);
  // 嘗試用CAS操作,使對象的markOop重置為無線程id的偏向鎖態(tài),即不偏向任何線程。
  cmpxchgptr(tmp_reg, mark_addr); 
  // 和第5步一樣,如果CAS失敗,則表明對象頭的markOop數(shù)據已經被其他線程更改,需要跳往slow_case進行撤銷偏向鎖,否則跳往done處,執(zhí)行字節(jié)碼。
  if (slow_case != NULL) {
    jcc(Assembler::notZero, *slow_case);
  }
  jmp(done);

  bind(try_revoke_bias);
  // 走到這,表明這個類的prototype_header中已經沒有偏向鎖的位了,即這個類的所有對象都不再支持偏向鎖了,但是當前對象仍為偏向鎖狀態(tài),所以我們需要重置下當前對象的markOop為無鎖態(tài)。
  // 將鎖對象所屬類的prototype_header送入tmp_reg。
  load_prototype_header(tmp_reg, obj_reg);
  // 嘗試用CAS操作,使對象的markOop重置為無鎖態(tài)。這里是否失敗無所謂,即使失敗了,也表明其他線程已經移除了對象的偏向鎖標志。
  cmpxchgptr(tmp_reg, mark_addr); 
  //接下來會回到lock_object()方法中,繼續(xù)輕量級鎖的獲取。

五:總結

上面根據同步方法講了一下偏向鎖,筆者在這上面也啃了差不多整個周六,原理看似很簡單,但是在很多細節(jié)不清楚的情況下去看源碼,尤其是這種全是匯編代碼時,往往是一臉懵逼。而且HotSpot用一個并不是對象的markOop去表示鎖,涉及到計算時更讓人糊涂。如果大家只是想稍微了解下原理,建議還是不要太深入源碼細節(jié)。。。。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容