樂觀鎖和悲觀鎖

參考:
https://www.cnblogs.com/qjjazry/p/6581568.html
https://www.cnblogs.com/xuyuanjia/p/6027414.html

悲觀鎖(Pessimistic Locking):
總是假設(shè)最壞的情況發(fā)生,因此每次在取數(shù)據(jù)的時候就會加鎖,操作完成后才釋放鎖。

樂觀鎖(Optimistic Locking):
假設(shè)大數(shù)據(jù)情況下不會發(fā)生數(shù)據(jù)沖突,因此在取數(shù)據(jù)的時候不加鎖,只有在更新的時候判斷該數(shù)據(jù)是否在此期間被修改過了。

1. Java中的悲觀鎖和樂觀鎖

  • 悲觀鎖
    Java中典型的悲觀鎖就是synchronized。

  • 樂觀鎖
    java.util.concurrent.atomic包下面的原子變量類就使用了樂觀鎖實現(xiàn)。
    Compare and Swap(CAS)是一種樂觀鎖的實現(xiàn)方式。
    CAS基本原理:CAS有三個要素:需要讀寫的內(nèi)存位置(V)、進行比較的預(yù)期原值(A)和擬寫入的新值(B),如果發(fā)現(xiàn)位置V的值和預(yù)期原值A(chǔ)相同,則將新值B更新到V處,否則不處理。

以AtomicInteger為例,看看CAS原理。

public class AtomicInteger extends Number implements java.io.Serializable {
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
}

private static final Unsafe unsafe = Unsafe.getUnsafe()
AtomicInteger類使用sun.misc.Unsafe類的compareAndSwapInt()方法執(zhí)行更新操作。Unsafe是JDK的內(nèi)部工具類,通過調(diào)用Native方法(C/C++)可以直接讀寫內(nèi)存、獲得地址偏移值、鎖定或釋放線程等。

private volatile int value
將value聲明為volatile,可保證線程間的數(shù)據(jù)可見性,但不能保證原子性。

private static final long valueOffset
valueOffset表示value字段相對于對象起始地址的偏移量,利用valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"))方法獲得,該方法首先通過字段名value獲取到該字段的Field,然后調(diào)用Unsafe的objectFieldOffset方法獲取value字段相對于對象起始地址的偏移量。

public final int getAndIncrement()
該法的功能是獲取值并+1,它調(diào)用unsafe.getAndAddInt(this, valueOffset, 1):

public final int getAndAddInt(Object arg0, long arg1, int arg3) {
    int arg4;
    do {
        arg4 = this.getIntVolatile(arg0, arg1);
    } while (!this.compareAndSwapInt(arg0, arg1, arg4, arg4 + arg3));
    return arg4;
}

public native int getIntVolatile(Object arg0, long arg1);

public final native boolean compareAndSwapInt(Object arg0, long arg1, int arg3, int arg4);

compareAndSwapInt()就是樂觀鎖的一種實現(xiàn)(CAS)。
期望值:getIntVolatile()方法獲取主內(nèi)存中的value值;
讀寫的位置:工作內(nèi)存中對象+偏移量;
擬寫入的新值:期望值+1
如果期望值和讀寫位置上值相同,則在讀寫位置上寫入新值,返回true,否則不寫入新值,返回false。

getAndAddInt()方法利用循環(huán),不停的CAS直到寫入成功為止。

CAS的缺陷:ABA問題
線程1從位置V上取出A,線程2從位置V上也取出A;
線程2將A改成了B,然后又改成了A;
線程1根據(jù)CAS操作時,發(fā)現(xiàn)期望值是A,和原值相同,則執(zhí)行成功。
但這個過程存在問題,線程1感知不到A-B-A的過程。

解決:
從Java1.5開始JDK的atomic包里提供了一個類AtomicStampedReference來解決ABA問題。
該類本質(zhì)上是在原來的CAS基礎(chǔ)上加入了一個int類型的stamp(版本號),每次更新的時候檢查當前Reference和期望的Reference是否相同,當前stamp和期望的stamp是否相同,如果相同則更新并返回true,否則啥也不做返回false。

2. 數(shù)據(jù)庫中的悲觀鎖和樂觀鎖

建表(test)并插入一條數(shù)據(jù):

id count
1 0
  • 無鎖

封裝Task:每次查出count值并+1更新

/**
 * 封裝Task(無鎖)
 */
public class TaskWithOutLock implements Runnable {

    @Override
    public void run() {
        int id = 1;
        int count = 0;
        Connection connection = DBUtil.getConnection();
        PreparedStatement psQuery = null;
        PreparedStatement psUpdate = null;

        for (int i = 0; i < 1000; i++) {
            try {
                // 查詢數(shù)據(jù)
                String querySql = "select count from test where id = ?";
                psQuery = connection.prepareStatement(querySql);
                psQuery.setInt(1, id);
                ResultSet rs = psQuery.executeQuery();
                if (rs.next()) {
                    AtomicInteger atomicInteger = new AtomicInteger(rs.getInt("count"));
                    atomicInteger.getAndIncrement();
                    count = atomicInteger.get();
                }

                // 更新數(shù)據(jù)
                String updateSql = "update test set count = ? where id = ?";
                psUpdate = connection.prepareStatement(updateSql);
                psUpdate.setInt(1, count);
                psUpdate.setInt(2, id);
                Integer res = psUpdate.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        try {
            psUpdate.close();
            psQuery.close();
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

注:由于count變量是各線程私有的,所以不用AtomicInteger也可以。

測試:多線程跑任務(wù)

    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 2;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        for (int i = 0; i < 2; i++) {
            TaskWithOutLock task = new TaskWithOutLock();
            pool.execute(task);
        }

        // 線程池不再接收新任務(wù),但線程池中的任務(wù)繼續(xù)執(zhí)行
        pool.shutdown();

        // 阻塞當前線程直到 線程池中所有任務(wù)完成 或 超時 或 當前線程被中斷
        pool.awaitTermination(5, TimeUnit.MINUTES);

        System.out.println("DONE!");
    }

結(jié)果:
數(shù)據(jù)庫中count變?yōu)?993,可見在不加鎖的情況下,計算結(jié)果與期望的2000不同。

  • 悲觀鎖
    封裝Task:
/**
 * 封裝Task(悲觀鎖)
 */
public class TaskPessimisticLock implements Runnable {

    @Override
    public void run() {
        int id = 1;
        int count = 0;
        Connection connection = DBUtil.getConnection();
        PreparedStatement psQuery = null;
        PreparedStatement psUpdate = null;

        for (int i = 0; i < 1000; i++) {
            try {
                // 開啟事務(wù)
                connection.setAutoCommit(false);

                // 查詢數(shù)據(jù),加悲觀鎖
                String querySql = "select count from test where id = ? for update";
                psQuery = connection.prepareStatement(querySql);
                psQuery.setInt(1, id);
                ResultSet rs = psQuery.executeQuery();
                if (rs.next()) {
                    AtomicInteger atomicInteger = new AtomicInteger(rs.getInt("count"));
                    atomicInteger.getAndIncrement();
                    count = atomicInteger.get();
                }

                // 更新數(shù)據(jù)
                String updateSql = "update test set count = ? where id = ?";
                psUpdate = connection.prepareStatement(updateSql);
                psUpdate.setInt(1, count);
                psUpdate.setInt(2, id);
                Integer res = psUpdate.executeUpdate();

                // 提交事務(wù)
                connection.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        try {
            psUpdate.close();
            psQuery.close();
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

(1)開啟事務(wù)
(2)select count from test where id = ? for update打開悲觀鎖
(3)提交事務(wù)
注:悲觀鎖必須在事務(wù)中間,否則不生效。

測試:

    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 2;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        for (int i = 0; i < 2; i++) {
            TaskPessimisticLock task = new TaskPessimisticLock();
            pool.execute(task);
        }

        // 線程池不再接收新任務(wù),但線程池中的任務(wù)繼續(xù)執(zhí)行
        pool.shutdown();

        // 阻塞當前線程直到 線程池中所有任務(wù)完成 或 超時 或 當前線程被中斷
        pool.awaitTermination(5, TimeUnit.MINUTES);

        System.out.println("DONE!");
    }

結(jié)果:
count值為2000,與預(yù)期值相同。

  • 樂觀鎖
    修改表結(jié)構(gòu),加入version字段:
id count version
1 0 0

封裝Task:

/**
 * 封裝Task(樂觀鎖)
 */
public class TaskOptimisticLock implements Runnable {

    @Override
    public void run() {
        int id = 1;
        int count = 0;
        int version = 0;
        Connection connection = DBUtil.getConnection();
        PreparedStatement psQuery = null;
        PreparedStatement psUpdate = null;

        for (int i = 0; i < 1000; i++) {
            try {
                // 循環(huán)直到更新完成
                for (;;) {
                    // 查詢數(shù)據(jù)
                    String querySql = "select count,version from test where id = ?";
                    psQuery = connection.prepareStatement(querySql);
                    psQuery.setInt(1, id);
                    ResultSet rs = psQuery.executeQuery();
                    if (rs.next()) {
                        AtomicInteger atomicInteger = new AtomicInteger(rs.getInt("count"));
                        atomicInteger.getAndIncrement();
                        count = atomicInteger.get();

                        AtomicInteger atomicVersion = new AtomicInteger(rs.getInt("version"));
                        version = atomicVersion.get();
                    }

                    // 更新數(shù)據(jù)
                    String updateSql = "update test set count = ?,version = ? where id = ? and version = ?";
                    int newVersion = version + 1;
                    psUpdate = connection.prepareStatement(updateSql);
                    psUpdate.setInt(1, count);
                    psUpdate.setInt(2, newVersion);
                    psUpdate.setInt(3, id);
                    psUpdate.setInt(4, version);
                    Integer res = psUpdate.executeUpdate();

                    if (res > 0) {
                        break;
                    }
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        try {
            psUpdate.close();
            psQuery.close();
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

在更新數(shù)據(jù)的時候判斷version是否正確,如果正確則更新數(shù)據(jù)和version;否則循環(huán)判斷。

測試:

    public static void main(String[] args) throws InterruptedException {
        int corePoolSize = 2;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize * 2, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
 
        for (int i = 0; i < 2; i++) {
            TaskOptimisticLock task = new TaskOptimisticLock();
            pool.execute(task);
        }

        // 線程池不再接收新任務(wù),但線程池中的任務(wù)繼續(xù)執(zhí)行
        pool.shutdown();

        // 阻塞當前線程直到 線程池中所有任務(wù)完成 或 超時 或 當前線程被中斷
        pool.awaitTermination(5, TimeUnit.MINUTES);

        System.out.println("DONE!");
    }

結(jié)果:
count = 2000,version = 2000。和預(yù)期一致。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容