<menu id="ycqsw"></menu><nav id="ycqsw"><code id="ycqsw"></code></nav>
<dd id="ycqsw"><menu id="ycqsw"></menu></dd>
  • <nav id="ycqsw"></nav>
    <menu id="ycqsw"><strong id="ycqsw"></strong></menu>
    <xmp id="ycqsw"><nav id="ycqsw"></nav>
  • java停止線程的方法(創建線程的三種方法)


    一.引言

    “操作系統的線程狀態和java的線程狀態有什么關系?”這是校招時被問到的一個問題。當時只顧著看博文、面經等零散的資料,沒有形成系統的知識體系,一時語塞,答的不是很對。在網上也沒找到足夠細致地講解博文,于是整理出了這篇內容。

    Java鎖與線程的那些“不可描述”的事兒

    Java的線程狀態牽扯到了同步語義,要探討Java的線程狀態的,必不可免要回顧其鎖機制。因此本文的主要分為兩大塊:一是Synchronized源碼粗析,分析了各類鎖的進入、釋放、升級過程,并大致說明了monitor原理;二是介紹了線程的實現方式和Java線程狀態轉換的部分細節。

    二. Synchronized鎖

    Java采用synchronized關鍵字、以互斥同步的方式的解決線程安全問題,那么什么是線程安全呢?這里引用《Java并發編程實戰》作者Brian Goetz給出的定義:

    當多個線程同時訪問一個對象時,如果不用考慮這些線程在運行時環境下的調度和交替執行,也不需要進行額外的同步,或者在調用方進行任何其他的協調操作,調用這個對象的行為都可以獲得正確的結果,那就稱這個對象是線程安全的?!?Brian Goetz

    2.1 Synchronized的使用

    先寫過個demo,大致過一下synchronized的使用,包含同步代碼塊、實例方法和靜態方法。

    public synchronized void test1(){
      }
    
      public void test2(){
        synchronized(new Test()){
        }
      }
    
      public static synchronized void test3(){
      }

    反編譯可查看字節碼:

    public synchronized void test1();
        descriptor: ()V
        flags: ACC_PUBLIC, ACC_SYNCHRONIZED    // here
    
      public void test2();
        descriptor: ()V
        flags: ACC_PUBLIC
        Code:
          stack=2, locals=3, args_size=1
             0: new           #2                  // class com/easy/helloworld/Test
             3: dup
             4: invokespecial #3                  // Method "<init>":()V
             7: dup
             8: astore_1
             9: monitorenter                   // here
            10: aload_1
            11: monitorexit                    // here
            12: goto          20
            15: astore_2
            16: aload_1
            17: monitorexit                    // here
            18: aload_2
            19: athrow
            20: return
    
      public static synchronized void test3();
        descriptor: ()V
        flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED   // here

    可以觀察到:

    • 同步代碼:通過moniterenter、moniterexit 關聯到到一個monitor對象,進入時設置Owner為當前線程,計數+1、退出-1。除了正常出口的 monitorexit,還在異常處理代碼里插入了 monitorexit。
    • 實例方法:隱式調用moniterenter、moniterexit
    • 靜態方法:隱式調用moniterenter、moniterexit

    2.2 Moniterenter、Moniterexit

    monitorenter和monitorexit這兩個jvm指令,主要是基于 Mark Word 和 Object monitor 來實現的。

    在 JVM 中,對象在內存中分為三塊區域:

    • 對象頭:由 Mark Word 和 Klass Point 構成。
      • 1. Mark Word(標記字段): 用于存儲對象自身的運行時數據,例如存儲對象的HashCode,分代年齡、鎖標志位等信息,是synchronized實現輕量級鎖和偏向鎖的關鍵。64位JVM的Mark Word組成如下:
      • 2. Klass Point(類型指針): 對象指向它的類元數據的指針,虛擬機通過這個指針來確定這個對象是哪個類的實例。
    • 實例數據:這部分主要是存放類的數據信息,父類的信息。
    • 字節對齊:為了內存的IO性能,JVM要求對象起始地址必須是8字節的整數倍。對于不對齊的對象,需要填充數據進行對齊。

    在JDK 1.6之前, synchronized 只有傳統的鎖機制,直接關聯到 monitor 對象,存在性能上的瓶頸。在JDK 1.6后,為了提高鎖的獲取與釋放效率,JVM引入了兩種鎖機制:偏向鎖和輕量級鎖。它們的引入是為了解決在沒有多線程競爭或基本沒有競爭的場景下因使用傳統鎖機制帶來的性能開銷問題。這幾種鎖的實現和轉換正是依靠對象頭中的 Mark Word 。

    本章內容近萬字,時間不充裕的同學可以直接看 本章小節 。

    2.3 偏向鎖

    引入偏向鎖的目的:在沒有多線程競爭的情況下,盡量減少不必要的輕量級鎖的執行。輕量級鎖的獲取及釋放依賴多次CAS原子指令,而偏向鎖只依賴一次CAS原子指令。但在多線程競爭時,需要進行偏向鎖撤銷步驟,因此其撤銷的開銷必須小于節省下來的CAS開銷,否則偏向鎖并不能帶來收益。JDK 1.6中默認開啟偏向鎖,可以通過-XX:-UseBiasedLocking來禁用偏向鎖。

    2.3.1 進入偏向鎖

    關于HotSpot虛擬機中獲取鎖的入口,網上主要有兩種看法:一為interpreterRuntime.cpp#monitorenter#1608;二為bytecodeInterpreter.cpp#1816。在HotSpot的中,有兩處地方對 monitorenter 指令進行解析:一個是bytecodeInterpreter.cpp#1816 ,另一個在templateTable_x86_64.cpp#3667。其中, bytecodeInterpreter 是JVM中的字節碼解釋器, templateInterpreter 為模板解釋器。HotSpot對運行效率有著極其執著的追求,顯然會傾向于用模板解釋器來實現。R大的讀書筆記中有說明,HotSpot中只用到了模板解釋器,并沒有用到字節碼解釋器。因此,本文認為 montorenter 的解析入口為templateTable_x86_64.cpp#3667。

    但模板解釋器 templateInterpreter 都是匯編代碼,不易讀,且實現邏輯與字節碼解釋器 bytecodeInterpreter 大體一致。因此本文的源碼都以 bytecodeInterpreter 來說明,借此窺探 synchronized 的實現原理。在看代碼之前,先介紹幾個在偏向鎖中會被大量應用的概念,以便后續理解:

    prototype_header :JVM中的每個類有一個類似 mark word 的 prototype_header ,用來標記該class的 epoch 和偏向開關等信息。

    匿名偏向狀態 :鎖對象mark word標志位為101,且存儲的 Thread ID 為空時的狀態(即鎖對象為偏向鎖,且沒有線程偏向于這個鎖對象)。

    Atomic::cmpxchg_ptr :CAS函數。這個方法有三個參數,依次為 exchange_value 、 dest 、 compare_value 。如果dest的值為 compare_value 則更新為 exchange_value ,并返回 compare_value 。否則,不更新并返回 實際原值 。

    接下來開始源碼實現分析,HotSpot中偏向鎖的具體實現可參考bytecodeInterpreter.cpp#1816,代碼如下:

    CASE(_monitorenter): {
      //鎖對象
      oop lockee = STACK_OBJECT(-1);
      // derefing's lockee ought to provoke implicit null check
      CHECK_NULL(lockee);
      // 步驟1
      // 在棧中找到第一個空閑的Lock Record
      // 會找到棧中最高的
      BasicObjectLock* limit = istate->monitor_base();
      BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
      BasicObjectLock* entry = NULL;
      while (most_recent != limit ) {
        if (most_recent->obj() == NULL) entry = most_recent;
        else if (most_recent->obj() == lockee) break;
        most_recent++;
      }
      // entry不為null,代表還有空閑的Lock Record
      if (entry != NULL) {
        // 將Lock Record的obj指針指向鎖對象
        entry->set_obj(lockee);
        int success = false;
        uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
        // markoop即對象頭的mark word
        markOop mark = lockee->mark();
        intptr_t hash = (intptr_t) markOopDesc::no_hash;
        // 步驟2
        // implies UseBiasedLocking
        // 如果為偏向模式,即判斷標識位是否為101
        if (mark->has_bias_pattern()) {
          ...
          // 一頓操作
          anticipated_bias_locking_value =
            (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
            ~((uintptr_t) markOopDesc::age_mask_in_place);
          // 步驟3
          if  (anticipated_bias_locking_value == 0) {
            // already biased towards this thread, nothing to do
            // 偏向的是自己,啥都不做
            if (PrintBiasedLockingStatistics) {
              (* BiasedLocking::biased_lock_entry_count_addr())++;
            }
            success = true;
          }
          // class的prototype_header不是偏向模式
          else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
            // 嘗試撤銷偏向
                    ...
          }
          // epoch過期,重新偏向
          else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
            // try rebias
                    ...
            success = true;    
          }
          else {
            // try to bias towards thread in case object is anonymously biased
            // 嘗試偏向該線程,只有匿名偏向能成功
            // 構建了匿名偏向的mark word
            markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));
            if (hash != markOopDesc::no_hash) {
              header = header->copy_set_hash(hash);
            }
            // 用「或」操作設置thread ID
            markOop new_header = (markOop) ((uintptr_t) header | thread_ident);、
            // 只有匿名偏向才能成功
            if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
              // cas修改成功    
              if (PrintBiasedLockingStatistics)
                (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
            }
            else {
              // 失敗說明存在競爭,進入monitorenter
              CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
            }
            success = true;
          }
        }
        // 步驟4
        // traditional lightweight locking
        // false走輕量級鎖邏輯
        if (!success) {
          // 構造一個無鎖狀態的Displaced Mark Word,并將lock record指向它
          markOop displaced = lockee->mark()->set_unlocked();
          entry->lock()->set_displaced_header(displaced);
          bool call_vm = UseHeavyMonitors;
          if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
            // 如果CAS替換不成功,代表鎖對象不是無鎖狀態,這時候判斷下是不是鎖重入
            // Is it simple recursive case?
            if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
              // 如果是鎖重入,則直接將Displaced Mark Word設置為null
              // 輕量級鎖重入是使用lock record的數量來計入的
              entry->lock()->set_displaced_header(NULL);
            } else {
              CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
            }
          }
        }
        UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
      } else {
        // 沒拿到lock record,重新執行
        istate->set_msg(more_monitors);
        UPDATE_PC_AND_RETURN(0); // Re-execute
      }
    }

    偏向鎖流程:

    步驟 1 、從當前線程的棧中找到一個空閑的 Lock Record ,并指向當前鎖對象。

    步驟 2 、獲取對象的markOop數據mark,即對象頭的Mark Word;

    步驟 3 、判斷鎖對象的 mark word 是否是偏向模式,即第3位是否為101。若不是,進入步驟4。若是,計算
    anticipated_bias_locking_value ,判斷偏向狀態:

    步驟 3.1 、
    anticipated_bias_locking_value 若為0,代表 偏向的線程是當前線程 且 mark word 的epoch等于class的epoch,這種情況下直接執行同步代碼塊,什么都不用做。

    步驟 3.2 、判斷class的 prototype_header 是否為非偏向模式。若為非偏向模式,CAS嘗試將對象恢復為無鎖狀態。無論cas是否成功都會進入輕量級鎖邏輯。

    步驟 3.3 、如果epoch偏向 時間戳已過期 ,則需要重偏向。利用CAS指令將鎖對象的 mark word 替換為一個偏向當前線程且epoch為類的epoch的新的 mark word 。

    步驟 3.4 、CAS將偏向線程改為當前線程,如果當前是 匿名偏向 (即對象頭中的bit field存儲的Thread ID為空)且 無并發沖突 ,則能 修改成功 獲取偏向鎖,否則進入 鎖升級 的邏輯。

    步驟 4 、走到一步會進行輕量級鎖邏輯。構造一個無鎖狀態的 mark word ,然后存儲到 Lock Record 。設置為無鎖狀態的原因是:輕量級鎖解鎖時是將對象頭的 mark word cas替換為 Lock Record 中的 Displaced Mark Word ,所以設置為無鎖狀態。如果是鎖重入,則將 Lock Record 的 Displaced Mark Word 設置為null,放到棧幀中,起到計數作用。

    以上是偏向鎖加鎖的大致流程,如果當前鎖 已偏向其他線程 || epoch值過期 || class偏向模式關閉 || 獲取偏向鎖的過程中存在并發沖突 ,都會進入到
    InterpreterRuntime::monitorenter 方法, 在該方法中會進行偏向鎖撤銷和升級。流程如下圖所示:

    Java鎖與線程的那些“不可描述”的事兒

    Issue:有的同學可能會問了,對象一開始不是無鎖狀態嗎,為什么上述偏向鎖邏輯沒有判斷 無鎖狀態的鎖對象 (001)?

    只有匿名偏向的對象才能進入偏向鎖模式。偏向鎖是延時初始化的,默認是4000ms。初始化后會將所有加載的Klass的prototype header修改為匿名偏向樣式。當創建一個對象時,會通過Klass的prototype_header來初始化該對象的對象頭。簡單的說,偏向鎖初始化結束后,后續所有對象的對象頭都為 匿名偏向 樣式,在此之前創建的對象則為 無鎖狀態 。而對于無鎖狀態的鎖對象,如果有競爭,會直接進入到輕量級鎖。這也是為什么JVM啟動前4秒對象會直接進入到輕量級鎖的原因。

    為什么需要延遲初始化?

    JVM啟動時必不可免會有大量sync的操作,而偏向鎖并不是都有利。如果開啟了偏向鎖,會發生大量鎖撤銷和鎖升級操作,大大降低JVM啟動效率。

    因此,我們可以明確地說,只有鎖對象處于 匿名偏向 狀態,線程才能拿到到我們通常意義上的偏向鎖。而處于無鎖狀態的鎖對象,只能進入到輕量級鎖狀態。

    2.3.2 偏向鎖的撤銷

    偏向鎖的 撤銷 (revoke)是一個很特殊的操作,為了執行撤銷操作,需要等待 全局安全點 ,此時所有的工作線程都停止了執行。偏向鎖的撤銷操作并不是將對象恢復到無鎖可偏向的狀態,而是在偏向鎖的獲取過程中,發現競爭時,直接將一個被偏向的對象 升級到 被加了輕量級鎖的狀態。這個操作的具體完成方式如下:

    IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
      ...
      Handle h_obj(thread, elem->obj());
      assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
             "must be NULL or an object");
        // 開啟了偏向鎖
      if (UseBiasedLocking) {
        // Retry fast entry if bias is revoked to avoid unnecessary inflation
        ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
      } else {
        ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
      }
      ...

    如果開啟了JVM偏向鎖,則會進入到
    ObjectSynchronizer::fast_enter 方法中。

    void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
     //再次校驗
     if (UseBiasedLocking) {
        if (!SafepointSynchronize::is_at_safepoint()) {
          //不在安全點的執行
          BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
          if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
            return;
          }
        } else {
          assert(!attempt_rebias, "can not rebias toward VM thread");    
          //批量撤銷,底層調用bulk_revoke_or_rebias_at_safepoint
          BiasedLocking::revoke_at_safepoint(obj);
        }
        assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
     }
     slow_enter (obj, lock, THREAD) ;
    }

    主要看
    BiasedLocking::revoke_and_rebias 方法。這個方法的主要作用像它的方法名:撤銷或者重偏向。第一個參數封裝了鎖對象和當前線程,第二個參數代表是否允許重偏向,這里是true。

    BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
      assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
      markOop mark = obj->mark(); //獲取鎖對象的對象頭
      if (mark->is_biased_anonymously() && !attempt_rebias) {
        // 如果鎖對象為匿名偏向狀態且不允許重偏向下,進入該分支。在一個非全局安全點進行偏向鎖撤銷
        markOop biased_value       = mark;
        // 創建一個匿名偏向的markword
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        // 通過cas重新設置偏向鎖狀態
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {// 如果CAS成功,返回偏向鎖撤銷狀態
          return BIAS_REVOKED;
        }
      } else if (mark->has_bias_pattern()) {
        // 鎖為偏向模式(101)會走到這里 
        Klass* k = obj->klass(); 
        markOop prototype_header = k->prototype_header();
        // 如果對應class關閉了偏向模式
        if (!prototype_header->has_bias_pattern()) {
          markOop biased_value       = mark;
          // CAS更新對象頭markword為非偏向鎖
          markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
          assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
          return BIAS_REVOKED; // 返回偏向鎖撤銷狀態
        } else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
          // 如果epoch過期,則進入當前分支
          if (attempt_rebias) {
            // 如果允許重偏
            assert(THREAD->is_Java_thread(), "");
            markOop biased_value       = mark;
            markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
            // 通過CAS操作, 將本線程的 ThreadID 、時間戳、分代年齡嘗試寫入對象頭中
            markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
            if (res_mark == biased_value) { //CAS成功,則返回撤銷和重新偏向狀態
              return BIAS_REVOKED_AND_REBIASED;
            }
          } else {
            // 如果不允許嘗試獲取偏向鎖,進入該分支取消偏向
            // 通過CAS操作更新分代年齡
            markOop biased_value       = mark;
            markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
            markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
            if (res_mark == biased_value) { //如果CAS操作成功,返回偏向鎖撤銷狀態
              return BIAS_REVOKED;
            }
          }
        }
      }
      //執行到這里有以下兩種情況:
      //1.對象不是偏向模式
      //2.上面的cas操作失敗
      HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
      if (heuristics == HR_NOT_BIASED) {
        // 非偏向從這出去
        // 輕量級鎖、重量級鎖
        return NOT_BIASED;
      } else if (heuristics == HR_SINGLE_REVOKE) {
        // 撤銷單個線程
        // Mark,最常見的執行分支
        // Mark,最常見的執行分支
        // Mark,最常見的執行分支
        Klass *k = obj->klass();
        markOop prototype_header = k->prototype_header();
        if (mark->biased_locker() == THREAD &&
            prototype_header->bias_epoch() == mark->bias_epoch()) {
          // 偏向當前線程且不過期
          // 這里撤銷的是偏向當前線程的鎖,調用Object#hashcode方法時也會走到這一步
          // 因為只要遍歷當前線程的棧就能拿到lock record了,所以不需要等到safe point再撤銷。
          ResourceMark rm;
          if (TraceBiasedLocking) {
            tty->print_cr("Revoking bias by walking my own stack:");
          }
          BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
          ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
          assert(cond == BIAS_REVOKED, "why not?");
          return cond;
        } else {
          // 下面代碼最終會在safepoint調用revoke_bias方法撤銷偏向
          VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
          VMThread::execute(&revoke);
          return revoke.status_code();
        }
      }
      assert((heuristics == HR_BULK_REVOKE) ||
             (heuristics == HR_BULK_REBIAS), "?");
       //批量撤銷、批量重偏向的邏輯
      VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                    (heuristics == HR_BULK_REBIAS),
                                    attempt_rebias);
      VMThread::execute(&bulk_revoke);
      return bulk_revoke.status_code();
    }

    這塊代碼注釋寫的算是比較清楚,只簡單介紹下最常見的情況:鎖已經偏向線程A,此時線程B嘗試獲取鎖。這種情況下會走到Mark標記的分支。如果需要撤銷的是當前線程,只要遍歷當前線程的棧就能拿到lock record,可以直接調用 revoke_bias ,不需要等到safe point再撤銷。在調用Object#hashcode時,也會走到該分支將為偏向鎖的鎖對象直接恢復為無鎖狀態。若不是當前線程,會被push到VM Thread中等到 safepoint 的時候再執行。

    VMThread內部維護了一個VMOperationQueue類型的隊列,用于保存內部提交的VM線程操作VM_operation。GC、偏向鎖的撤銷等操作都是在這里被執行。

    撤銷調用的 revoke_bias 方法的代碼就不貼了。大致邏輯是:

    步驟 1 、查看偏向的線程是否存活,如果已經死亡,則直接撤銷偏向鎖。JVM維護了一個集合存放所有存活的線程,通過遍歷該集合判斷某個線程是否存活。

    步驟 2 、偏向的線程是否還在同步塊中,如果不在,則撤銷偏向鎖。如果在同步塊中,執行步驟3。這里 是否在同步塊的判斷 基于上文提到的偏向鎖的重入計數方式:在偏向鎖的獲取中,每次進入同步塊的時候都會在棧中找到第一個可用(即棧中最高的)的 Lock Record ,將其obj字段指向鎖對象。每次解鎖的時候都會把最低的 Lock Record 移除掉,所以可以通過遍歷線程棧中的 Lock Record 來判斷是否還在同步塊中。輕量級鎖的重入也是基于 Lock Record 的計數來判斷。

    步驟 3 、升級為輕量級鎖。將偏向線程所有相關 Lock Record 的 Displaced Mark Word 設置為null,再將最高位的 Lock Record 的 Displaced Mark Word 設置為無鎖狀態,然后將對象頭指向最高位的 Lock Record 。這里沒有用到CAS指令,因為是在 safepoint ,可以直接升級成輕量級鎖。

    2.3.3 偏向鎖的釋放

    偏向鎖的釋放可參考bytecodeInterpreter.cpp#1923,這里也不貼了。偏向鎖的釋放只要將對應 Lock Record 釋放就好了,但這里的釋放并不會將mark word里面的thread ID去掉,這樣做是為了下一次更方便的加鎖。而輕量級鎖則需要將 Displaced Mark Word 替換到對象頭的mark word中。如果CAS失敗或者是重量級鎖則進入到
    InterpreterRuntime::monitorexit 方法中。

    2.3.4 批量重偏向與撤銷

    從上節偏向鎖的加鎖解鎖過程中可以看出,當只有一個線程反復進入同步塊時,偏向鎖帶來的性能開銷基本可以忽略,但是當有其他線程嘗試獲得鎖時,就需要等到 safe point 時將偏向鎖撤銷為無鎖狀態或升級為輕量級/重量級鎖。因此,JVM中增加了一種批量重偏向/撤銷的機制以減少鎖撤銷的開銷,而mark word中的epoch也是在這里被大量應用,這里不展開說明。但無論怎么優化,偏向鎖的撤銷仍有一定不可避免的成本。如果業務場景存在大量多線程競爭,那偏向鎖的存在不僅不能提高性能,而且會導致性能下降( 偏向鎖并不都有利,jdk15默認不開啟 )。

    2.4 輕量級鎖

    引入輕量級鎖的目的:在多線程交替執行同步塊的情況下,盡量避免重量級鎖使用的操作系統互斥量帶來的開銷,但是如果多個線程在同一時刻進入臨界區,會導致輕量級鎖膨脹升級重量級鎖,所以輕量級鎖的出現并非是要替代重量級鎖。

    2.4.1 進入輕量級鎖

    輕量級鎖在上文或多或少已經涉及到,其獲取流程入口為bytecodeInterpreter.cpp#1816。前大半部分都是偏向鎖邏輯,還有一部分為輕量級鎖邏輯。在偏向鎖邏輯中,cas失敗會執行到
    InterpreterRuntime::monitorenter 。在輕量級鎖邏輯中,如果當前線程不是輕量級鎖的重入,也會執行到
    InterpreterRuntime::monitorenter 。我們再看看
    InterpreterRuntime::monitorenter 方法:

    IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
      ...
      Handle h_obj(thread, elem->obj());
      assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
             "must be NULL or an object");
      if (UseBiasedLocking) {
        // Retry fast entry if bias is revoked to avoid unnecessary inflation
        ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
      } else {
        ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
      }
      ...
    IRT_END

    fast_enter 的流程在偏向鎖的撤銷小節中已經分析過,主要邏輯為 revoke_and_rebias :如果當前是偏向模式且偏向的線程還在使用鎖,會將鎖的 mark word 改為輕量級鎖的狀態,并將偏向的線程棧中的 Lock Record 修改為輕量級鎖對應的形式(此時Lock Record是無鎖狀態),且返回值不是 BIAS_REVOKED_AND_REBIASED ,會繼續執行 slow_enter 。

    我們直接看 slow_enter 的流程:

    void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
      // 步驟1
      markOop mark = obj->mark();
      assert(!mark->has_bias_pattern(), "should not see bias pattern here");
      // 步驟2
      // 如果為無鎖狀態
      if (mark->is_neutral()) {
        // 步驟3
        // 設置mark word到棧 
        lock->set_displaced_header(mark);
        // CAS更新指向棧中Lock Record的指針
        if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
          TEVENT (slow_enter: release stacklock) ;
          return ;
        }
        // Fall through to inflate() ... cas失敗走下面鎖膨脹方法
      } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
        // 步驟4
        // 為輕量級鎖且owner為當前線程
        assert(lock != mark->locker(), "must not re-lock the same lock");
        assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
        // 設置Displaced Mark Word為null,重入計數用
        lock->set_displaced_header(NULL);
        return;
      }
      // 步驟5
      // 走到這一步說明已經是存在多個線程競爭鎖了,需要膨脹或已經是重量級鎖
      lock->set_displaced_header(markOopDesc::unused_mark());
      // 進入、膨脹到重量級鎖的入口
      // 膨脹后再調用monitor的enter方法競爭鎖
      ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
    }

    步驟 1 、 markOop mark = obj->mark() 方法獲取對象的markOop數據mark;

    步驟 2 、 mark->is_neutral() 方法判斷mark是否為無鎖狀態,標識位 001 ;

    步驟 3 、如果mark處于無鎖狀態,把mark保存到BasicLock對象(Lock Record的屬性)的displaced_header字段;

    步驟 3.1 、通過CAS嘗試將Mark Word更新為指向BasicLock對象的指針,如果更新成功,表示競爭到鎖,則執行同步代碼,否則執行步驟4;

    步驟 4 、如果是重入,則設置Displaced Mark Word為null。

    步驟 5 、到這說明有多個線程競爭輕量級鎖,輕量級鎖需要膨脹升級為重量級鎖;

    結合上文偏向鎖的流程,可以整理得到如下的流程圖:

    Java鎖與線程的那些“不可描述”的事兒

    2.4.2 輕量級鎖的釋放

    輕量級鎖釋放的入口在bytecodeInterpreter.cpp#1923。

    輕量級鎖釋放時需要將 Displaced Mark Word 替換回對象頭的 mark word 中。如果CAS失敗或者是重量級鎖則進入到
    InterpreterRuntime::monitorexit 方法中。 monitorexit 直接調用 slow_exit 方法釋放 Lock Record 。直接看 slow_exit :

    IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
      Handle h_obj(thread, elem->obj());
      assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
             "must be NULL or an object");
      if (elem == NULL || h_obj()->is_unlocked()) {
        THROW(vmSymbols::java_lang_IllegalMonitorStateException());
      }
      // 直接調用slow_exit
      ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
      // Free entry. This must be done here, since a pending exception might be installed on
      // exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
      elem->set_obj(NULL);
    IRT_END
    
    void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
      fast_exit (object, lock, THREAD) ;
    }
    
    void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
      ...
      // displaced header就是對象mark word的拷貝
      markOop dhw = lock->displaced_header();
      markOop mark ;
      if (dhw == NULL) {
         // 什么也不做
         // Recursive stack-lock. 遞歸堆棧鎖
         // Diagnostics -- Could be: stack-locked, inflating, inflated. 
         ...
         return ;
      }
      mark = object->mark() ;
      // 此處為輕量級鎖的釋放過程,使用CAS方式解鎖。
      // 如果對象被當前線程堆棧鎖定,嘗試將displaced header和鎖對象中的MarkWord替換回來。
      // If the object is stack-locked by the current thread, try to
      // swing the displaced header from the box back to the mark.
      if (mark == (markOop) lock) {
         assert (dhw->is_neutral(), "invariant") ;
         if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
            TEVENT (fast_exit: release stacklock) ;
            return;
         }
      }
      //走到這里說明已經是重量級鎖或者解鎖時發生了競爭,膨脹后再調用monitor的exit方法釋放
      ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
    }

    最后執行的是如果是fast_exit方法。如果是輕量級鎖,嘗試cas替換 mark word 。若解鎖時有競爭,會調用 inflate 方法進行重量級鎖膨脹,升級到到重量級鎖后再執行 exit 方法。

    2.5 重量級鎖

    2.5.1 重量級鎖的進入

    重量級鎖通過對象內部的監視器(monitor)實現,其依賴于底層操作系統的 Mutex Lock 實現,需要額外的用戶態到內核態切換的開銷。由上文分析, slow_enter 獲取輕量級鎖未成功時,會在 inflate 中完成鎖膨脹:

    ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
      ...
      for (;;) {
          const markOop mark = object->mark() ;
          assert (!mark->has_bias_pattern(), "invariant") ;  
          // mark是以下狀態中的一種:
          // *  Inflated(重量級鎖狀態)     - 直接返回
          // *  Stack-locked(輕量級鎖狀態) - 膨脹
          // *  INFLATING(膨脹中)    - 忙等待直到膨脹完成
          // *  Neutral(無鎖狀態)      - 膨脹
          // *  BIASED(偏向鎖)       - 非法狀態,在這里不會出現
    
          // CASE: inflated
          if (mark->has_monitor()) {
              // 已經是重量級鎖狀態了,直接返回
              ObjectMonitor * inf = mark->monitor() ;
              ...
              return inf ;
          }
          // CASE: inflation in progress
          if (mark == markOopDesc::INFLATING()) {
             // 正在膨脹中,說明另一個線程正在進行鎖膨脹,continue重試
             TEVENT (Inflate: spin while INFLATING) ;
             // 在該方法中會進行spin/yield/park等操作完成自旋動作 
             ReadStableMark(object) ;
             continue ;
          }
          // 當前是輕量級鎖,后面分析
          // CASE: stack-locked
              if (mark->has_locker()) {
            ...
          }
          // 無鎖狀態
          // CASE: neutral
          // 分配以及初始化ObjectMonitor對象
          ObjectMonitor * m = omAlloc (Self) ;
          // prepare m for installation - set monitor to initial state
          m->Recycle();
          m->set_header(mark);
          // owner為NULL
          m->set_owner(NULL);
          m->set_object(object);
          m->OwnerIsThread = 1 ;
          m->_recursions   = 0 ;
          m->_Responsible  = NULL ;
          m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;       // consider: keep metastats by type/class
            // 用CAS替換對象頭的mark word為重量級鎖狀態
          if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
              // 不成功說明有另外一個線程在執行inflate,釋放monitor對象
              m->set_object (NULL) ;
              m->set_owner  (NULL) ;
              m->OwnerIsThread = 0 ;
              m->Recycle() ;
              omRelease (Self, m, true) ;
              m = NULL ;
              continue ;
              // interference - the markword changed - just retry.
              // The state-transitions are one-way, so there's no chance of
              // live-lock -- "Inflated" is an absorbing state.
          }
    
          ...
          return m ;
    }

    inflate 其中是一個for循環,主要是為了處理多線程同時調用inflate的情況。然后會根據鎖對象的狀態進行不同的處理:

    1. 已經是重量級狀態,說明膨脹已經完成,返回并繼續執行ObjectMonitor::enter方法。
    2. 如果是輕量級鎖則需要進行膨脹操作。
    3. 如果是膨脹中狀態,則進行忙等待。
    4. 如果是無鎖狀態則需要進行膨脹操作。

    輕量級鎖膨脹流程如下:

    if (mark->has_locker()) {
      // 步驟1
      // 當前輕量級鎖狀態,先分配一個ObjectMonitor對象,并初始化值
      ObjectMonitor * m = omAlloc (Self) ;          
      m->Recycle();
      m->_Responsible  = NULL ;
      m->OwnerIsThread = 0 ;
      m->_recursions   = 0 ;
      m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;   // Consider: maintain by type/class
      // 步驟2
      // 將鎖對象的mark word設置為INFLATING (0)狀態 
      markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
      if (cmp != mark) {
        omRelease (Self, m, true) ;
        continue ;       // Interference -- just retry
      }
      // 步驟3
      // 棧中的displaced mark word
      markOop dmw = mark->displaced_mark_helper() ;
      assert (dmw->is_neutral(), "invariant") ;
      // 設置monitor的字段
      m->set_header(dmw) ;
      // owner為Lock Record
      m->set_owner(mark->locker());
      m->set_object(object);
      ...
      // 步驟4
      // 將鎖對象頭設置為重量級鎖狀態
      object->release_set_mark(markOopDesc::encode(m));
      ...
      return m ;
    }
    

    步驟 1 、調用 omAlloc 獲取一個可用的 ObjectMonitor 對象。在 omAlloc 方法中會先從 線程私有 的 monitor 集合 omFreeList 中分配對象,如果 omFreeList 中已經沒有 monitor 對象,則從 JVM全局 的 gFreeList 中分配一批 monitor 到 omFreeList 中;

    步驟 2 、通過CAS嘗試將Mark Word設置為markOopDesc:INFLATING,標識當前鎖正在膨脹中。如果CAS失敗,說明同一時刻其它線程已經將Mark Word設置為markOopDesc:INFLATING,當前線程進行自旋等待膨脹完成。

    步驟 3 、如果CAS成功,設置monitor的各個字段:設置 monitor 的header字段為 displaced mark word ,owner字段為 Lock Record ,obj字段為鎖對象等;

    步驟 4 、設置鎖對象頭的 mark word 為重量級鎖狀態,指向第一步分配的 monitor 對象;

    2.5.2 monitor競爭

    當鎖膨脹 inflate 執行完并返回對應的 ObjectMonitor 時,并不表示該線程競爭到了鎖,真正的鎖競爭發生在 ObjectMonitor::enter 方法中。

    void ATTR ObjectMonitor::enter(TRAPS) {
      Thread * const Self = THREAD ;
      void * cur ;
      // 步驟1
      // owner為null,如果能CAS設置成功,則當前線程直接獲得鎖
      cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
      if (cur == NULL) {
         ...
         return ;
      }
      // 如果是重入的情況
      if (cur == Self) {
         // TODO-FIXME: check for integer overflow!  BUGID 6557169.
         _recursions ++ ;
         return ;
      }
      // 步驟2
      // 如果當前線程是之前持有輕量級鎖的線程
      // 上節輕量級鎖膨脹將owner指向之前Lock Record的指針
      // 這里利用owner判斷是否第一次進入。
      if (Self->is_lock_owned ((address)cur)) {
        assert (_recursions == 0, "internal state error");
        // 重入計數重置為1
        _recursions = 1 ;
        // 設置owner字段為當前線程
        _owner = Self ;
        OwnerIsThread = 1 ;
        return ;
      }
      ...
      // 步驟3
      // 在調用系統的同步操作之前,先嘗試自旋獲得鎖
      if (Knob_SpinEarly && TrySpin (Self) > 0) {    
         ...
         //自旋的過程中獲得了鎖,則直接返回
         Self->_Stalled = 0 ;
         return ;
      }
      ...
      { 
        ...
        // 步驟4
        for (;;) {
          jt->set_suspend_equivalent();
          // 在該方法中調用系統同步操作
          EnterI (THREAD) ;
          ...
        }
        Self->set_current_pending_monitor(NULL); 
      }
      ...
    }

    步驟 1 、當前是無鎖、鎖重入,簡單操作后返回。

    步驟 2 、當前線程是之前持有輕量級鎖的線程,則為首次進入,設置recursions為1,owner為當前線程,該線程成功獲得鎖并返回。

    步驟 3 、先 自旋嘗試 獲得鎖,盡可能減少同步操作帶來的開銷。

    步驟 4 、調用EnterI方法。

    這里注意,輕量級鎖膨脹成功時,會把owner字段設置為 Lock Record 的指針,并在競爭時判斷。這么做的原因是,假設當前線程A持有鎖對象的鎖,線程B進入同步代碼塊,并把鎖對象升級為重量級鎖。但此時,線程A可能還在執行,并無法感知其持有鎖對象的變化。因此,需要線程B在執行 ObjectMonitor::enter 時,將自己放入到阻塞等列等待。并需要線程A第二次進入、或者退出的時候對monitor進行一些操作,以此保證代碼塊的同步。

    這里有個 自旋 操作,直接看 TrySpin 對應的方法:

    // TrySpin對應的方法
    int ObjectMonitor::TrySpin_VaryDuration (Thread * Self) {
        // Dumb, brutal spin.  Good for comparative measurements against adaptive spinning.
        int ctr = Knob_FixedSpin ;  // 固定自旋次數
        if (ctr != 0) {
            while (--ctr >= 0) {
                if (TryLock (Self) > 0) return 1 ;
                SpinPause () ;
            }
            return 0 ;
        }
        // 上一次自旋次數
        for (ctr = Knob_PreSpin + 1; --ctr >= 0 ; ) {
          if (TryLock(Self) > 0) {  // 嘗試獲取鎖
            // Increase _SpinDuration ...
            // Note that we don't clamp SpinDuration precisely at SpinLimit.
            // Raising _SpurDuration to the poverty line is key.
            int x = _SpinDuration ;
            if (x < Knob_SpinLimit) {
               if (x < Knob_Poverty) x = Knob_Poverty ;
               _SpinDuration = x + Knob_BonusB ;
            }
            return 1 ;
          }
          ...
          ...

    從方法名和注釋可以看出,這就是自適應自旋, 和網上說的輕量級鎖cas失敗會自旋的說法并不一致 。實際上,無論是輕量級鎖cas自旋還是重量級鎖cas自旋,都是在用戶態盡可能減少同步操作帶來的開銷,并沒有太多本質上的區別。到此為止,我們可以再結合上述的內容,整理出如下的狀態轉換圖:

    Java鎖與線程的那些“不可描述”的事兒

    2.5.3 monitor等待

    ObjectMonitor 競爭失敗的線程,通過自旋執行 ObjectMonitor::EnterI 方法等待鎖的釋放,EnterI方法的部分邏輯實現如下:

    void ATTR ObjectMonitor::EnterI (TRAPS) {
            // 嘗試自旋
        if (TrySpin (Self) > 0) {
            ...
            return ;
        }
        ...
        // 將線程封裝成node節點中
        ObjectWaiter node(Self) ;
        Self->_ParkEvent->reset() ;
        node._prev   = (ObjectWaiter *) 0xBAD ;
        node.TState  = ObjectWaiter::TS_CXQ ;
        // 將node節點插入到_cxq隊列的頭部,cxq是一個單向鏈表
        ObjectWaiter * nxt ;
        for (;;) {
            node._next = nxt = _cxq ;
            if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
            // CAS失敗的話 再嘗試獲得鎖,這樣可以降低插入到_cxq隊列的頻率
            if (TryLock (Self) > 0) {
                ...
                return ;
            }
        }
            ...
    }

    EnterI大致原理:一個 ObjectMonitor 對象包括兩個同步隊列( _cxq 和 _EntryList ) ,以及一個等待隊列 _WaitSet 。cxq、EntryList 、WaitSet都是由ObjectWaiter構成的鏈表結構。其中, _cxq 為單向鏈表, _EntryList 為雙向鏈表。

    Java鎖與線程的那些“不可描述”的事兒

    當一個線程嘗試獲得重量級鎖且沒有競爭到時,該線程會被封裝成一個 ObjectWaiter 對象插入到cxq的隊列的隊首,然后調用 park 函數掛起當前線程,進入BLOCKED狀態。當線程釋放鎖時,會根據喚醒策略,從cxq或EntryList中挑選一個線程 unpark 喚醒。如果線程獲得鎖后調用 Object#wait 方法,則會將線程加入到WaitSet中,進入WAITING或TIMED_WAITING狀態。當被 Object#notify 喚醒后,會將線程從WaitSet移動到cxq或EntryList中去,進入BLOCKED狀態。需要注意的是,當調用一個鎖對象的 wait 或 notify 方法時,若當前鎖的狀態是偏向鎖或輕量級鎖則會先膨脹成重量級鎖。

    2.5.4 monitor釋放

    當某個持有鎖的線程執行完同步代碼塊時,會進行鎖的釋放。在HotSpot中,通過退出monitor的方式實現鎖的釋放,并通知被阻塞的線程,具體實現位于 ObjectMonitor::exit 方法中。

    void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
       Thread * Self = THREAD ;
       // 如果_owner不是當前線程
       if (THREAD != _owner) {
         // 輕量級鎖膨脹上來,還沒調用過enter方法,_owner還指向之前輕量級鎖Lock Record的指針。
         if (THREAD->is_lock_owned((address) _owner)) {
           assert (_recursions == 0, "invariant") ;
           _owner = THREAD ;
           _recursions = 0 ;
           OwnerIsThread = 1 ;
         } else {
           // 異常情況:當前不是持有鎖的線程
           TEVENT (Exit - Throw IMSX) ;
           assert(false, "Non-balanced monitor enter/exit!");
           if (false) {
              THROW(vmSymbols::java_lang_IllegalMonitorStateException());
           }
           return;
         }
       }
       // 重入計數器還不為0,則計數器-1后返回
       if (_recursions != 0) {
         _recursions--;        // this is simple recursive enter
         TEVENT (Inflated exit - recursive) ;
         return ;
       }
       ...
       //這塊開始是喚醒操作
       for (;;) {
         ...
         ...
         ObjectWaiter * w = NULL ;
         // 根據QMode的不同會有不同的喚醒策略,默認為0
         int QMode = Knob_QMode ;
         if (QMode == 2 && _cxq != NULL) {
              ...
              ...

    步驟 1 、處理owner不是當前線程的狀況。這里特指之前持有輕量級鎖的線程,由于沒有調用過enter,owner指向仍為Lock Record的指針,以及其他異常情況。

    步驟 2 、重入計數器還不為0,則計數器-1后返回。

    步驟 3 、喚醒操作。根據不同的策略(由QMode指定),從cxq或EntryList中獲取頭節點,通過 ObjectMonitor::ExitEpilog 方法喚醒該節點封裝的線程,喚醒操作最終由unpark完成。

    根據QMode的不同(默認為0),有不同的處理方式:

    QMode = 0:暫時什么都不做;

    QMode = 2且cxq非空:取cxq隊列隊首的ObjectWaiter對象,調用ExitEpilog方法,該方法會喚醒ObjectWaiter對象的線程,然后立即返回,后面的代碼不會執行了;

    QMode = 3且cxq非空:把cxq隊列插入到EntryList的尾部;

    QMode = 4且cxq非空:把cxq隊列插入到EntryList的頭部;

    只 在QMode=2的時候會提前返回,等于0、3、4的時繼續執行:

    1. 如果EntryList的首元素非空,就取出來調用ExitEpilog方法,該方法會喚醒ObjectWaiter對象的線程,然后立即返回;
    2. 如果EntryList的首元素為空,就將cxq的所有元素放入到EntryList中,然后再從EntryList中取出來隊首元素執行ExitEpilog方法,然后立即返回;
    3. 被喚醒的線程,繼續競爭monitor。

    2.6 本章小節

    本章介紹了Synchronized的底層實現和鎖升級過程。對于鎖升級,再看看本文整理的圖,一圖勝千言:

    Java鎖與線程的那些“不可描述”的事兒

    這里有幾個點可以注意一下:

    1. HotSpot中,只用到了 模板解釋器 ,并沒有用到字節碼解釋器, monitorenter 的實際入口位于templateTable_x86_64.cpp#3667。本文的分析是基于字節碼解釋器的,因此部分結論不能作為實際執行情況。本章的內容只能作為Synchronized鎖升級原理、各類鎖的適用場景的一種 窺探 。
    2. 再次強調,無鎖狀態只能升級為輕量級鎖, 匿名偏向狀態 才能進入到偏向鎖。
    3. 偏向鎖 并不都有利, 其適用于 單個線程重入 的場景,原因為:偏向鎖的撤銷需要進入 safepoint ,開銷較大。需要進入 safepoint 是由于,偏向鎖的撤銷需要對鎖對象的 lock record 進行操作,而 lock record 要到每個線程的棧幀中遍歷尋找。在非safepoint,棧幀是動態的,會引入更多的問題。目前看來,偏向鎖存在的價值是為歷史遺留的Collection類如Vector和HashTable等做優化,遲早藥丸。Java 15中默認不開啟。
    4. 執行Object類的 hashcode 方法,偏向鎖撤銷并且鎖會膨脹為輕量級鎖或者重量鎖。執行Object類的 wait/notify/notifyall 方法,偏向鎖撤銷并膨脹成重量級鎖。
    5. 輕量級鎖適用于 兩個線程的交替執行 場景:線程A進入輕量級鎖,退出同步代碼塊并釋放鎖,會將鎖對象恢復為無鎖狀態;線程B再進入鎖,發現為無鎖狀態,會cas嘗試獲取該鎖對象的輕量級鎖。如果有競爭,則直接膨脹為重量級鎖,沒有自旋操作,詳情看10。
    6. 喚醒策略依賴于 QMode 。重量級鎖獲取失敗后,線程會加入cxq隊列。當線程釋放鎖時,會從cxq或EntryList中挑選一個線程喚醒。線程獲得鎖后調用 Object#wait 方法,則會將線程加入到WaitSet中。當被 Object#notify 喚醒后,會將線程從WaitSet移動到cxq或EntryList中去。
    7. 重量級鎖,會將線程放進等待隊列,等待操作系統調度。而偏向鎖和輕量級鎖,未交由操作系統調度,依然處于用戶態,只是采用CAS無鎖競爭的方式獲取鎖。CAS通過Unsafe類中compareAndSwap方法,jni調用C++方法,通過匯編指令鎖住cpu中的北橋信號。
    8. 許多文章聲稱一個對象關聯到一個monitor,這個說法不夠準確。如果對象已經是重量級鎖了,對象頭的確指向了一個 monitor 。但對于正在膨脹的鎖,會先從 線程私有 的 monitor 集合 omFreeList 中分配對象。如果 omFreeList 中已經沒有 monitor 對象,再從 JVM全局 的 gFreeList 中分配一批 monitor 到 omFreeList 中。
    9. 在編譯期間還有 鎖消除鎖粗化 這兩步鎖優化操作,本章沒做介紹。
    10. 字節碼實現中沒有體現輕量級鎖自旋邏輯。這可能是模板解釋器中的實現,或者是jvm在不同平臺、不同jvm版本的不同實現。但本文分析的字節碼鏈路中沒有發現該邏輯,倒是發現了 重量級鎖會自適應自旋競爭鎖 。因此個人對輕量級鎖自適應自旋的說法存疑,至少hotspot jdk8u字節碼實現中沒有這個邏輯。但兩者都是在用戶態進行自適應自旋,以盡可能減少同步操作帶來的開銷,沒有太多本質上的區別,并不需要特別關心。

    三、線程的實現與狀態轉換

    3.1 線程的實現

    (1)內核線程實現

    內核線程(Kernel-Level Thread,KLT):由 內核 來完成線程切換,內核通過 調度器 對線程進行調度,并負責將線程的任務 映射 到各個處理器上。程序一般不會直接去使用內核線程,而是使用內核線程的一種高級接口—— 輕量級進程 (Light Weight Process,LWP),也就是通常意義上的線程。

    優點:每個LWP都是獨立的調度單元。一個LWP被阻塞,不影響其他LWP。

    缺點:基于KLT,耗資源。線程的創建、析構、同步都需要進行系統調用,頻繁的用戶態、內核態切換。

    Java鎖與線程的那些“不可描述”的事兒

    (2) 用戶線程實現(User Thread,UT)

    廣義:非內核線程,都可認為是用戶線程。(包括LWT,雖然LWT的大多操作都要映射到KLT)

    狹義:完全建立在 用戶空間 的線程庫上,系統內核不能感知線程存在的實現。UT也只感知到掌管這些UT的進程P。

    優點:用戶線程的創建、同步、銷毀和調度完全在用戶態中完成,不需要內核的幫助。

    缺點:線程的創建、銷毀、切換和調度都是用戶必須考慮到問題?!白枞绾翁幚怼薄ⅰ岸嗵幚砥飨到y中如何將線程映射到其他處理器上”這類問題解決起來將會異常困難。

    Java鎖與線程的那些“不可描述”的事兒

    (3) 混合實現 混合模式下, 即存在用戶線程,也存在輕量級進程 。用戶線程的創建、切換、析構等操作依然廉價,可以支持大規模的用戶線程并發,且可以使用內核線程提供的線程調度功能及處理器映射。

    Java鎖與線程的那些“不可描述”的事兒

    線程的實現依賴操作系統支持的線程模型。在主流的操作系統上,hotspot、classic、art等虛擬機默認是 1:1的線程模型。在Solaris平臺上,hotspot支持1:1、N:M兩種線程模型。

    3.2 線程的轉換

    首先明確一點,當我們討論一個線程的狀態,指的是Thread 類中threadStatus的值。

    private volatile int threadStatus = 0;

    該值映射后對應的枚舉為:

    public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }

    也就是說,線程的具體狀態,看threadStatus就行了。

    NEW

    先要創建Thread 類的對象,才能談其狀態。

    Thread t = new Thread();

    這個時候,線程t就處于新建狀態。但他還不是“線程”。

    RUNNABLE

    然后調用start()方法。

    t.start();

    調用start()后,會執行一個native方法創建內核線程,以linux為例:

    private native void start0();
    
    // 最后走到這
    hotspot/src/os/linux/vm/os_linux.cpp
    pthread_create(...);

    這時候才有一個真正的線程創建出來,并即刻開始運行。這個內核線程與線程t進行1:1的映射。這時候t具備運行能力,進入RUNNABLE狀態。RUNNABLE可以細分為READY和RUNNING,兩者的區別只是是否等待到了資源并開始運行。處于RUNNABLE且未運行的線程,會進入一個就緒隊列中,等待操作系統的調度。處于就緒 隊列的線程都在等待資源,這個資源可以是cpu的時間片、也可以是系統的IO。JVM并不關系READY和RUNNING這兩種狀態,畢竟上述的枚舉類都不對RUNNABLE進行細分。

    TERMINATED

    當一個線程執行完畢(或者調用已經不建議的 stop 方法),線程的狀態就變為 TERMINATED。進入TERMINATED后,線程的狀態不可逆,無法再復活。

    關于BLOCKED、WAITING、TIMED_WAITING

    BLOCKED、WAITING、TIMED_WAITING都是帶有同步語義的狀態,我們先看一下 wait 和 notify 方法的底層實現。wait方法的底層實現:

    void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
        ...
        ...
      //獲得Object的monitor對象
      ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
      DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
      //調用monitor的wait方法
      monitor->wait(millis, true, THREAD);
        ...
    }
    
      inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
        ...
      if (_WaitSet == NULL) {
        //_WaitSet為null,就初始化_waitSet
        _WaitSet = node;
        node->_prev = node;
        node->_next = node;
      } else {
        //否則就尾插
        ObjectWaiter* head = _WaitSet ;
        ObjectWaiter* tail = head->_prev;
        assert(tail->_next == head, "invariant check");
        tail->_next = node;
        head->_prev = node;
        node->_next = head;
        node->_prev = tail;
      }
    }

    主要流程:通過object獲得objectMonitor,將Thread封裝成OjectWaiter對象,然后 addWaiter 將它插入 waitSet 中,進入waiting或timed_waiting狀態。最后釋放鎖,并通過底層的 park 方法掛起線程;

    notify方法的底層實現:

    void ObjectSynchronizer::notify(Handle obj, TRAPS) {
        ...
        ...
        ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
    }
        //通過inflate方法得到ObjectMonitor對象
        ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
        ...
         if (mark->has_monitor()) {
              ObjectMonitor * inf = mark->monitor() ;
              assert (inf->header()->is_neutral(), "invariant");
              assert (inf->object() == object, "invariant") ;
              assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
              return inf 
          }
        ...
          }
        //調用ObjectMonitor的notify方法
        void ObjectMonitor::notify(TRAPS) {
        ...
        //調用DequeueWaiter方法移出_waiterSet第一個結點
        ObjectWaiter * iterator = DequeueWaiter() ;
        //將上面DequeueWaiter尾插入_EntrySet或cxq等操作
        ...
        ...
      }

    通過object獲得objectMonitor,調用objectMonitor的 notify 方法。這個notify最后會走到
    ObjectMonitor::DequeueWaiter 方法,獲取waitSet列表中的第一個ObjectWaiter節點。并根據不同的策略,將取出來的ObjectWaiter節點,加入到 EntryList 或 cxq 中。 notifyAll 的實現類似于 notify ,主要差別在多了個for循環。

    由這里以及上一章2.5.4 monitor釋放小節中可以了解到, notify 和 notifyAll 并不會立即釋放所占有的ObjectMonitor對象,其真正釋放ObjectMonitor的時間點是在執行 monitorexit 指令。

    一旦釋放 ObjectMonitor 對象了, entryList 和 cxq 中的ObjectWaiter節點會依據 QMode 所配置的策略,通過ExitEpilog方法喚醒取出來的ObjectWaiter節點。被喚醒的線程,繼續參與monitor的競爭。若競爭失敗,重新進入BLOCKED狀態,再回顧一下monitor的核心結構。

    Java鎖與線程的那些“不可描述”的事兒

    既然聊到了 wait 和 notify ,那順便也看下 join 、 sleep 和 park 。

    打開 Thread.join() 的源碼:

    public final synchronized void join(long millis) throws InterruptedException {
        ...
      if (millis == 0) {
        while (isAlive()) {
          wait(0);
        }
      } else {
        while (isAlive()) {
          long delay = millis - now;
          if (delay <= 0) {
            break;
          }
          wait(delay);
          now = System.currentTimeMillis() - base;
        }
      }
    }

    join 的本質仍然是 wait() 方法。在使用 join 時,JVM會幫我們隱式調用 notify ,因此我們不需要主動notify喚醒主線程。而 sleep() 方法最終是調用 SleepEvent 對象的park方法:

    int os::sleep(Thread* thread, jlong millis, bool interruptible) {
      //獲取thread中的_SleepEvent對象
      ParkEvent * const slp = thread->_SleepEvent ;
      ...
      //如果是允許被打斷
      if (interruptible) {
        //記錄下當前時間戳,這是時間比較的基準
        jlong prevtime = javaTimeNanos();
        for (;;) {
          //檢查打斷標記,如果打斷標記為true,則直接返回
          if (os::is_interrupted(thread, true)) {
            return OS_INTRPT;
          }
          //線程被喚醒后的當前時間戳
          jlong newtime = javaTimeNanos();
          //睡眠毫秒數減去當前已經經過的毫秒數
          millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
          //如果小于0,那么說明已經睡眠了足夠多的時間,直接返回
          if (millis <= 0) {
            return OS_OK;
          }
          //更新基準時間
          prevtime = newtime;
          //調用_SleepEvent對象的park方法,阻塞線程
          slp->park(millis);
        }
      } else {
        //如果不能打斷,除了不再返回OS_INTRPT以外,邏輯是完全相同的
        for (;;) {
          ...
          slp->park(millis);
          ...
        }
        return OS_OK ;
      }
    }

    Thread.sleep 在jvm層面上是調用thread中 SleepEvent 對象的 park() 方法實現阻塞線程,在此過程中會通過判斷時間戳來決定線程的睡眠時間是否達到了指定的毫秒??吹竭@里,對于 sleep 和 wait 的區別應該會有更深入的理解。

    park 、 unpark 方法也與同步語義無關。每個線程都與一個許可(permit)關聯。 unpark 函數為線程提供permit,線程調用 park 函數則等待并消耗permit。park和unpark方法具體實現比較復雜,這里不展開。到此為止,我們可以整理出如下的線程狀態轉換圖。

    Java鎖與線程的那些“不可描述”的事兒

    3.3 本章小節

    Java 將OS經典五種狀態中的ready和running,統一為 RUNNABLE。將WAITING(即不可能得到 CPU 運行機會的狀態)細分為了 BLOCKED、WAITING、TIMED_WAITING。本章的內容較為簡短,因為部分的內容已囊括在第一章中。

    這里提個會使人困惑的問題:使用socket時,調用accept(),read() 等阻塞方法時,線程處于什么狀態?

    答 案是java線程處于RUNNABLE狀態,OS線程處于WAITING狀態。因為在jvm層面,等待cpu時間片和等待io資源是等價的。

    這里有幾個點可以注意一下:

    1. JVM線程狀態不代表內核線程狀態;
    2. BLOCKED的線程一定處于entryList或cxq中,而處于WAITING和TIMED WAITING的線程,可能是由于執行了sleep或park進入該狀態,不一定在waitSet中。也就是說,處于BLOCKED狀態的線程一定是與同步相關。由這可延伸出,調用 jdk 的 lock并獲取不到鎖的線程,進入的是 WAITING 或 TIMED_WAITING 狀態,而不是BLOCKED狀態。

    版權聲明:本文內容由互聯網用戶自發貢獻,該文觀點僅代表作者本人。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如發現本站有涉嫌抄襲侵權/違法違規的內容, 請發送郵件至 舉報,一經查實,本站將立刻刪除。

    發表評論

    登錄后才能評論
    国产精品区一区二区免费