Redis高并發(fā)架構(gòu)實戰(zhàn)

(1)先來一個小案例作為切入點

/*
這里記為代碼一
*/
@RestController
public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;    //組件spring-boot-starter-data-redis

    @RequestMapping("/deduct_stock")
    public String deductStock(){
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }

        return "end";
    }
}

然后在redis中搞一個庫存為200


現(xiàn)在很明顯,代碼一 存在線程安全問題,會有可能讀到都是200,然后都減1后設(shè)置為199,就不對了。
很多同學(xué)都會想到加一把鎖

(2)synchronized

/*
代碼二
*/
public String deductStock(){
    synchronized (this){
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }

    return "end";
}

這樣的確是只能有一個線程執(zhí)行操作,確實是線程安全了。但是它只能在單機環(huán)境下運行,只能鎖住一個tomcat,分布式的時候就不行了。


(3)分布式鎖

這時,應(yīng)該考慮分布式鎖。SETNX(SET if Not eXists)。和set的區(qū)別是:
set tuling A
set tuling B
結(jié)果會是B
setnx tuling A
setnx tuling B
結(jié)果會是A

/*
代碼三
*/
public String deductStock(){

    String lockKey = "product_101";
    //如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
    if(!result){
        return "error_code";    //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
    }

    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
    if(stock > 0){
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
        System.out.println("扣減成功,剩余庫存:" + realStock);
    }else{
        System.out.println("扣減失敗,庫存不足");
    }

    stringRedisTemplate.delete(lockKey);

    return "end";
}

redis那邊是單線程操作的,會排隊,只有排隊頭的可以設(shè)置成功,后面的設(shè)置不成功,這樣入門級的分布式鎖設(shè)計完了。大家想想還有沒有問題?
這個時候還是存在問題,當(dāng)獲取到鎖的線程有異常,導(dǎo)致沒法刪除key,就會導(dǎo)致其他線程獲取不到鎖,就算能捕獲異常,但如果是系統(tǒng)掛了呢,運維重啟呢

/*
代碼四
*/
public String deductStock(){

    String lockKey = "product_101";
    //如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
    if(!result){
        return "error_code";    //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
    }

    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

示例 代碼四 還是存在問題,大家先想想解決方法。

(4)鎖超時

這樣的話,可以加一個超時時間來解決,給key一個超時時間,即使系統(tǒng)掛了,一段時間之后,其他機器還是能正常訪問

/*
代碼五
*/
public String deductStock(){

    String lockKey = "product_101";
    //如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
    stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    if(!result){
        return "error_code";    //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
    }

    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

大家想想 代碼五 還有問題嗎?

(5)加鎖操作原子性

假設(shè)設(shè)置了key之后,正準(zhǔn)備設(shè)置超時時間,但系統(tǒng)掛了,那還是回到之前的問題了,得保證原子性。應(yīng)該使用setIfAbsent的其他重載方法,有一個是可以同時設(shè)置超時時間的

/*
代碼六
*/
public String deductStock(){

    String lockKey = "product_101";
    //如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
//        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling", 10, TimeUnit.SECONDS);

    if(!result){
        return "error_code";    //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
    }

    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        stringRedisTemplate.delete(lockKey);
    }

    return "end";
}

大家思考一下,代碼六 還有沒有問題?
遇到高并發(fā)的時候,通常執(zhí)行會比較慢,慢執(zhí)行啊,中間sql語句執(zhí)行很慢這樣,假設(shè)執(zhí)行完這個方法需要15秒,當(dāng)線程執(zhí)行了10秒的時候,由于設(shè)置了超時時間是10秒,并且是高并發(fā)場景,這個時候key就刪除了,另外的線程就獲取了鎖

這樣就相當(dāng)于鎖永久失效。雖然把過期時間放大是可以避免,但還是無法徹底解決問題。
本質(zhì)是自己加的鎖被別人解掉了,所以解決就是鎖只能自己解鎖

/*
代碼七
*/
public String deductStock(){

    String lockKey = "product_101";
    //如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
//        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    String clientId = UUID.randomUUID().toString();

    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);

    if(!result){
        return "error_code";    //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
    }

    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
            stringRedisTemplate.delete(lockKey);
        }
    }

    return "end";
}

代碼七 按上面的例子,鎖是自己過期的,這代碼只是能保證線程1無法刪除線程2的鎖,但線程1和線程2還是同時在跑啊。這個時間還有問題,但是先不管,先放放,因為不是想要引申的內(nèi)容,要繼續(xù)思考這個代碼還有除時間外的什么其他問題?
就是finally中的兩行代碼非原子,寫并發(fā)代碼和寫高并發(fā)代碼時的區(qū)別,應(yīng)該要習(xí)慣性的在代碼之間空幾行,表明這里執(zhí)行有時間差,非原子。
假設(shè)執(zhí)行判斷完clientId確實是等于當(dāng)前線程的value,假設(shè)這時剛好是9.9秒,突然發(fā)生卡頓,但這個if判斷已經(jīng)是true了,正準(zhǔn)備delete的時候,卡頓了,這時已經(jīng)過了10秒,線程2已經(jīng)獲取了鎖,然后線程1執(zhí)行delete,又出問題了,仍然是線程1刪除了線程2的鎖。
怎么處理?

(6)鎖續(xù)命

鎖續(xù)命:通常是這樣處理的,有一個分線程定時任務(wù),用來監(jiān)測線程還是否持有鎖,還持有的就延長鎖的過期時間,例如鎖超時是30秒,那么分線程每10秒判斷一下,線程還是否持有鎖,還持有就更新過期時間,不能說是延長,是按當(dāng)前時間又重新設(shè)置30秒過期,當(dāng)不持有了,定時任務(wù)就結(jié)束,分線程也結(jié)束。
redisson:操作redis的客戶端,有很多分布式功能,其中就有分布式鎖。想起了吧?代碼一 中就已經(jīng)引入了redisson

/*
代碼八
*/
public String deductStock(){

    String lockKey = "product_101";
    //如果返回false,說明redis中有這個key了,不做任何操作。如果返回true說明執(zhí)行這個命令之前沒有這個key,并設(shè)置成功了
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
//        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

    /*String clientId = UUID.randomUUID().toString();
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
    if(!result){
        return "error_code";    //給前端錯誤碼,當(dāng)前系統(tǒng)繁忙,請稍后再試
    }*/

    RLock redissonLock = redisson.getLock(lockKey);

    try{
        //加鎖
        redissonLock.lock();    //理解為執(zhí)行了setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS)
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));   //可以理解為jedis.get("stock")
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("Stock", realStock + "");     //可以理解為jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        redissonLock.unlock();
        /*if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
            stringRedisTemplate.delete(lockKey);
        }*/
    }

    return "end";
}

redisson加鎖核心lua腳本


KEYS[1]:product_101
ARGV[2]:getLockName(threadId)
ARGV[1]:internalLockLeaseTime(初始化是30秒)
可以看到第250行和251行,就相當(dāng)于 代碼五 中的

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "tuling");     //就理解為jedis.setnx(key,value)
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

而這兩行代碼是不具有原子性的,線程不安全。Lua腳本可以保證原子性

鎖續(xù)命:

/*
https://github.com/redisson/redisson/blob/redisson-3.6.5/redisson/src/main/java/org/redisson/RedissonLock.java
redisson-3.6.5 RedissonLock.java,其他版本會不太不一樣,但原理應(yīng)該不變吧
*/
    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {

                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }

                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

它延遲 internalLockLeaseTime / 3 秒執(zhí)行run方法,為它重新設(shè)置expire為 internalLockLeaseTime。
commandExecutor.evalWriteAsync返回了一個future,然后future又添加監(jiān)聽器,最后執(zhí)行當(dāng)前方法scheduleExpirationRenewal(threadId);,就是一直重復(fù)續(xù)命,又再延遲調(diào)用,相當(dāng)于定時任務(wù)。

到目前為止,基本就沒有什么坑了,redisson已經(jīng)是填了很多坑,可以放心使用 代碼八 進行實現(xiàn)。
但是,還有點問題,假設(shè)有多個請求在執(zhí)行redissonLock.lock()加鎖,只能有一個線程在處理,其他都得等著,系統(tǒng)就會很慢,存在性能問題,該怎么優(yōu)化能做到雙十一能用的級別?

(7)zookeeper

redis一般都是有主從架構(gòu)的,基本不會是單機使用


redis主節(jié)點馬上告訴客戶端加鎖成功,線程1就執(zhí)行業(yè)務(wù)代碼邏輯,然后redis準(zhǔn)備把key同步給從節(jié)點時候,結(jié)果主節(jié)點掛了,某個從節(jié)點選舉成為新的Master主節(jié)點,來了個線程3訪問新的主節(jié)點加鎖,線程3就發(fā)現(xiàn)沒有product_101這個key,又可以加鎖成功了,線程1業(yè)務(wù)邏輯還沒執(zhí)行完畢,線程3就開始執(zhí)行,就又出現(xiàn)了問題


主從架構(gòu)鎖失效的問題,可以用zookeeper來實現(xiàn)分布式鎖,和redis類似,是樹形結(jié)構(gòu)。redis更多的實現(xiàn)是AP架構(gòu),zookeeper更多的實現(xiàn)是CAP架構(gòu)。
zookeeper的話,當(dāng)要寫一個key,不是就立即返回成功的,會先把key同步給集群的其他節(jié)點,子節(jié)點會返回同步成功的信息,主節(jié)點會判斷是否已經(jīng)有超過半數(shù)的子節(jié)點都同步成功,這時才告訴客戶端成功了,是為了保證一致性,犧牲了及時響應(yīng),但它能保證那些已經(jīng)同步了子節(jié)點才能成功leader,redis就沒有這個機制,也就是線程3再來請求leader的時候,必然會有key,加鎖就不成功,解決了上述問題。
但如果不使用zookeeper,就是要使用redis來解決呢?(因為redis的并發(fā)比zookeeper高不少)如果要高并發(fā),就用redis,就有上述主從鎖問題,如果要保證健壯性就用zookeeper,但犧牲了并發(fā)數(shù)。

(8)Redlock

硬是要使用redis的話,看看Redlock


redis沒有主從關(guān)系,是對等的,往每個節(jié)點發(fā)送加鎖命令,只有超過半數(shù)的節(jié)點返回成功才認(rèn)為客戶端加鎖成功,和zookeeper原理類似。但這種方式不推薦,原來是一個redis節(jié)點,現(xiàn)在搞多個,要半數(shù)加鎖成功,對我們加鎖性能受一定影響,這樣的話,還不如用zookeeper,因為redlock還有不少問題。

@RequestMapping("/redlock")
public String redlock(){

    String lockKey = "product_101";
    RLock lock1 = redisson.getLock(lockKey);
    RLock lock2 = redisson.getLock(lockKey);
    RLock lock3 = redisson.getLock(lockKey);

    //根據(jù)多個RLock對象構(gòu)件RedissonRedLock
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

    try{
        /*
        * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個數(shù),則認(rèn)為獲取鎖失敗
        * leaseTime   鎖的持有時間,超過這個時間鎖會自動失?。ㄖ祽?yīng)設(shè)置為大于業(yè)務(wù)處理的時間,確保在鎖有效期內(nèi)業(yè)務(wù)能處理完)
        * */
        boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
        if(res){
            //成功獲取鎖,處理業(yè)務(wù)
        }
    }catch (Exception e) {
        throw  new RuntimeException("lock fail");
    }finally {
        //無論如何,最后都要解鎖
        redLock.unlock();
    }

    return "end";
}

回到 代碼八,redissonLock.lock();會導(dǎo)致其他線程等待,也就是分布式鎖把并行請求變串行化執(zhí)行了。那么如何提升分布式鎖性能?

(9)分段鎖

模仿ConcurrentHashMap,分段鎖。
假設(shè)product_101的數(shù)量是200,那么可以分十段,
product_101_1=20
product_101_2=20
product_101_3=20
......
product_101_10=20
200個庫存分10個key存到redis中去,讓每個線程去減不同的段位的庫存,如果不夠減的話就減一下個段位,實現(xiàn)的話有點難,但可以理解這個思想,就不再去擴展了。

(10)緩存數(shù)據(jù)庫雙寫不一致

接下來說redis作為緩存使用的時候,常見問題有:緩存無底洞、緩存穿透、緩存雪崩、緩存失效、熱點key傾斜、熱點key重建、緩存數(shù)據(jù)庫雙寫不一致。
這里針對緩存數(shù)據(jù)庫雙寫不一致的問題說一下。
什么是緩存數(shù)據(jù)庫雙寫不一致?

看上去線程1寫數(shù)據(jù)庫,然后更新緩存,線程2寫數(shù)據(jù)庫,然后更新緩存,沒有什么問題,但如果線程1操作較慢(小卡頓)呢?


有些人就會說,通常不會直接更新緩存,而是把緩存刪掉,即更新就刪緩存,讀數(shù)據(jù)的時候再設(shè)置緩存,的確這樣是比較好,因為每次寫完就更新緩存的話,如果不讀緩存,相當(dāng)于白更新。


但這樣還是有問題


還有什么方法解決?

(11)解決雙寫不一致的方法

延遲雙刪:刪緩存刪兩次,刪除之后sleep(一段時間)后再刪一次
但這種方法只能說是減少,并不能解決問題,并且還讓所有的寫請求都得sleep一段時間

內(nèi)存隊列:用hash運算把操作路由到某個隊列中順序執(zhí)行。是可以解決,但復(fù)雜,寫不好很可能有性能問題或是bug

還有沒有其他解決方法?

問題的本質(zhì)就是操作過程中不是原子性,如果(寫數(shù)據(jù)庫-刪除緩存)是不可分割的操作,(查緩存-查數(shù)據(jù)庫-更新緩存)是不可分割的操作,即在操作前加分布式鎖,操作完后解鎖,所有線程的操作為隊列,把多個并發(fā)執(zhí)行的線程串行化


直接這樣上鎖,性能肯定是有問題的,怎么優(yōu)化?

(12)讀多寫少的情況

直接上分布式鎖會有問題,使用讀寫鎖
讀寫鎖:讀操作加讀鎖,寫操作加寫鎖,讀操作不互斥,寫鎖跟讀鎖、寫鎖跟寫鎖互斥。
由于很多系統(tǒng)都是讀多寫少的情況,所以可以提高性能

/*
代碼九
*/
@RequestMapping("/get_stock")
public String getStock(@RequestParam("clientId") Long clientId) throws InterruptedException{

    String lockKey = "product_101";

    RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
    RLock rLock = readWriteLock.readLock();

    rLock.lock();
    System.out.println("獲取讀鎖成功:client="+clientId);
    String stock = stringRedisTemplate.opsForValue().get("stock");
    if(StringUtils.isEmpty(stock)){
        System.out.println("查詢數(shù)據(jù)庫庫存為10。。。");
        Thread.sleep(5000);
        stringRedisTemplate.opsForValue().set("stock", 10);
    }
    rLock.unlock();
    System.out.println("釋放讀鎖成功:client="+clientId);

    return "end";
}

@RequestMapping("/update_stock")
public String updateStock(@RequestParam("clientId") Long clientId) throws InterruptedException{

    String lockKey = "product_101";

    RReadWriteLock readWriteLock = redisson.getReadWriteLock(lockKey);
    RLock writeLock = readWriteLock.writeLock();

    writeLock.lock();
    System.out.println("獲取寫鎖成功:client="+clientId);
    System.out.println("修改商品101的數(shù)據(jù)庫庫存為6。。。");
    stringRedisTemplate.delete("stock");
    Thread.sleep(5000);
    writeLock.unlock();
    System.out.println("釋放寫鎖成功:client="+clientId);

    return "end";
}

原理就是lua腳本為每個key設(shè)置一個mode的值來記錄是read還是write。
RedissonWriteLock.java


但如果讀多寫也多的情況呢,怎么處理?
不采用上面的方法,仍然是給緩存過期時間,然后操作的時候直接操作數(shù)據(jù)庫。例如在頁面上看到的庫存,其實很多時候都是和數(shù)據(jù)庫的值不一致的,就是為了實現(xiàn)高并發(fā),又要用數(shù)據(jù)庫又要用緩存,只能犧牲一致性,犧牲一致性其實關(guān)系并不大,想一想,假設(shè)一致的話,加入購物車、下訂單,中間是有時間差的,這個時候可能就沒有了庫存了,對用戶來說是不一致,但對程序來說,程序以及保證了一致,只是意義不大,所以犧牲一致性來提高性能。假設(shè)過期時間是一分鐘,那在這一分鐘內(nèi)可能是不一致,但如果一分鐘后庫存不變,又讀取更新了緩存,這個時候就變一致了,只需要確保在下單的時候是用db的數(shù)據(jù)即可。

(13)讀多寫多的情況

如果是讀多寫多,又要保證緩存數(shù)據(jù)庫一致性,怎么辦?
對讀多寫多的場景,就不應(yīng)該用緩存,直接操作數(shù)據(jù)庫就好了,對吧。
也有方法既使用緩存,又應(yīng)對讀多寫多的場景,中間件canal。后面就學(xué)不著了,需要報課。。。。以后再看看

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容