Mysql和Elasticsearch的數(shù)據(jù)同步

Elasticsearch的數(shù)據(jù)來(lái)自Mysql數(shù)據(jù)庫(kù)中,所以當(dāng)我們的MySQL發(fā)生改變時(shí),Elasticsearch也要跟著改變,這時(shí)候我們的es的數(shù)據(jù)就要和mysql同步了

同步實(shí)現(xiàn)思路

常見(jiàn)的數(shù)據(jù)同步方案有三種:

  • 同步調(diào)用
  • 異步通知
  • 監(jiān)聽(tīng)binlog

方案一:

  • hotel-demo對(duì)外提供接口,用來(lái)修改elasticsearch中的數(shù)據(jù)
  • 酒店管理服務(wù)在完成數(shù)據(jù)庫(kù)操作后,直接調(diào)用hotel-demo提供的接口,

也就是說(shuō)MySQL修改完去修改es的數(shù)據(jù)

  • 優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單,粗暴
  • 缺點(diǎn):業(yè)務(wù)耦合度高

方案二

  • hotel-admin對(duì)mysql數(shù)據(jù)庫(kù)數(shù)據(jù)完成增、刪、改后,發(fā)送MQ消息

  • hotel-demo監(jiān)聽(tīng)MQ,接收到消息后完成elasticsearch數(shù)據(jù)修改

  • 優(yōu)點(diǎn):低耦合,實(shí)現(xiàn)難度一般

  • 缺點(diǎn):依賴mq的可靠性

  • 這個(gè)實(shí)現(xiàn)方式也就是使用mq進(jìn)行操縱,當(dāng)我們修改MySQL的服務(wù)器修改完以后會(huì)將信息發(fā)送給MQ,然后修改ES的會(huì)進(jìn)行監(jiān)聽(tīng),當(dāng)監(jiān)聽(tīng)到了以后就進(jìn)行修改es的操作

方式三:

  • 給mysql開(kāi)啟binlog功能
  • mysql完成增、刪、改操作都會(huì)記錄在binlog中
  • hotel-demo基于canal監(jiān)聽(tīng)binlog變化,實(shí)時(shí)更新elasticsearch中的內(nèi)容
  • 也就是監(jiān)聽(tīng)mysql,如果MySQL的數(shù)據(jù)有變化那么就直接去改變es的數(shù)據(jù)
  • 優(yōu)點(diǎn):完全解除服務(wù)間耦合
  • 缺點(diǎn):開(kāi)啟binlog增加數(shù)據(jù)庫(kù)負(fù)擔(dān)、實(shí)現(xiàn)復(fù)雜度高

在這里使用的是第二種實(shí)現(xiàn)方案:使用MQ來(lái)寫(xiě)

同步案例代碼

用來(lái)操控ES的代碼(負(fù)責(zé)監(jiān)聽(tīng)MQ隊(duì)列)

/**     * 監(jiān)聽(tīng)增加和修改的隊(duì)列     * 因?yàn)槲覀兊腅S中可以進(jìn)行全量修改,當(dāng)有這個(gè)id的數(shù)據(jù)的時(shí)候那么就先刪除再新增,沒(méi)有這個(gè)數(shù)據(jù)那么就直接新增     * 所以隊(duì)列過(guò)來(lái)的id不管是新增還是修改es都可以判斷如果有這個(gè)數(shù)據(jù)id那么就先刪除再新增,如果沒(méi)有這個(gè)數(shù)據(jù)就直接新增,所以新增和修改他倆用一個(gè)方法就行了     *     * @param id 隊(duì)列中需要進(jìn)行操作的id     */    @RabbitListener(bindings = @QueueBinding(            value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),            key = MqConstants.HOTEL_INSERT_KEY    ))    public void insertAndUpdate(Long id) {        if (id == null) {            return;        }        log.info("入?yún)ⅲ簕}", id);        //監(jiān)聽(tīng)到以后拿到id去數(shù)據(jù)庫(kù)查詢整個(gè)數(shù)據(jù)        Hotel hotel = iHotelService.getById(id);        //因?yàn)椴榈膍ysql數(shù)據(jù)和es的數(shù)據(jù)有些不一樣所以需要做轉(zhuǎn)換        HotelDoc hotelDoc = new HotelDoc(hotel);        //轉(zhuǎn)換為json        String hotelDocJson = JSON.toJSONString(hotelDoc);        System.out.println("hotelDocJson = " + hotelDocJson);        //發(fā)送到ES中,因?yàn)槲覀兊腅S中可以進(jìn)行全量修改,當(dāng)有這個(gè)id的數(shù)據(jù)的時(shí)候那么就先刪除再新增,沒(méi)有這個(gè)數(shù)據(jù)那么就直接新增        //創(chuàng)建請(qǐng)求語(yǔ)義對(duì)象 添加文檔數(shù)據(jù)        IndexRequest request = new IndexRequest("hotel");        //這個(gè)新增就是PUT在es中        request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);        //發(fā)送請(qǐng)求        try {            IndexResponse response = client.index(request, RequestOptions.DEFAULT);            RestStatus status = response.status();            log.info("響應(yīng)結(jié)果為:{}", status);        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * 監(jiān)聽(tīng)刪除隊(duì)列     *     * @param id 隊(duì)列中需要進(jìn)行操作的id     */    @RabbitListener(bindings = @QueueBinding(            value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),            key = MqConstants.HOTEL_DELETE_KEY    ))    public void deleteByMqId(Long id) {        if (id == null) {            return;        }        log.info("入?yún)ⅲ簕}", id);        //先創(chuàng)建語(yǔ)義對(duì)象,直接就可以給里面寫(xiě)id的字段        DeleteRequest request = new DeleteRequest("hotel", id.toString());        //發(fā)送請(qǐng)求        try {            DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);            RestStatus status = response.status();            log.info("響應(yīng)結(jié)果為:{}", status);        } catch (IOException e) {            e.printStackTrace();        }

用來(lái)操作MySQL的代碼:

@RestController@RequestMapping("hotel")public class HotelController {    //注入和RabbitMQ鏈接    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private IHotelService hotelService;    @GetMapping("/{id}")    public Hotel queryById(@PathVariable("id") Long id) {        return hotelService.getById(id);    }    @GetMapping("/list")    public PageResult hotelList(            @RequestParam(value = "page", defaultValue = "1") Integer page,            @RequestParam(value = "size", defaultValue = "1") Integer size    ) {        Page<Hotel> result = hotelService.page(new Page<>(page, size));        return new PageResult(result.getTotal(), result.getRecords());    }    @PostMapping    public void saveHotel(@RequestBody Hotel hotel) {        hotelService.save(hotel);        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());    }    @PutMapping()    public void updateById(@RequestBody Hotel hotel) {        if (hotel.getId() == null) {            throw new InvalidParameterException("id不能為空");        }        hotelService.updateById(hotel);        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());    }    @DeleteMapping("/{id}")    public void deleteById(@PathVariable("id") Long id) {        hotelService.removeById(id);        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);    }}

當(dāng)然也是可以不使用注解來(lái)寫(xiě),直接在配置文件中寫(xiě)隊(duì)列綁定的交換機(jī)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容