Elasticsearch的數(shù)據(jù)來(lái)自Mysql數(shù)據(jù)庫(kù)中,所以當(dāng)我們的MySQL發(fā)生改變時(shí),Elasticsearch也要跟著改變,這時(shí)候我們的es的數(shù)據(jù)就要和mysql同步了
同步實(shí)現(xiàn)思路
常見(jiàn)的數(shù)據(jù)同步方案有三種:
- 同步調(diào)用
- 異步通知
- 監(jiān)聽(tīng)binlog
方案一:

- hotel-demo對(duì)外提供接口,用來(lái)修改elasticsearch中的數(shù)據(jù)
- 酒店管理服務(wù)在完成數(shù)據(jù)庫(kù)操作后,直接調(diào)用hotel-demo提供的接口,
也就是說(shuō)MySQL修改完去修改es的數(shù)據(jù)
- 優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單,粗暴
- 缺點(diǎn):業(yè)務(wù)耦合度高
方案二

hotel-admin對(duì)mysql數(shù)據(jù)庫(kù)數(shù)據(jù)完成增、刪、改后,發(fā)送MQ消息
hotel-demo監(jiān)聽(tīng)MQ,接收到消息后完成elasticsearch數(shù)據(jù)修改
優(yōu)點(diǎn):低耦合,實(shí)現(xiàn)難度一般
缺點(diǎn):依賴mq的可靠性
這個(gè)實(shí)現(xiàn)方式也就是使用mq進(jìn)行操縱,當(dāng)我們修改MySQL的服務(wù)器修改完以后會(huì)將信息發(fā)送給MQ,然后修改ES的會(huì)進(jìn)行監(jiān)聽(tīng),當(dāng)監(jiān)聽(tīng)到了以后就進(jìn)行修改es的操作
方式三:

- 給mysql開(kāi)啟binlog功能
- mysql完成增、刪、改操作都會(huì)記錄在binlog中
- hotel-demo基于canal監(jiān)聽(tīng)binlog變化,實(shí)時(shí)更新elasticsearch中的內(nèi)容
- 也就是監(jiān)聽(tīng)mysql,如果MySQL的數(shù)據(jù)有變化那么就直接去改變es的數(shù)據(jù)
- 優(yōu)點(diǎn):完全解除服務(wù)間耦合
- 缺點(diǎn):開(kāi)啟binlog增加數(shù)據(jù)庫(kù)負(fù)擔(dān)、實(shí)現(xiàn)復(fù)雜度高
在這里使用的是第二種實(shí)現(xiàn)方案:使用MQ來(lái)寫(xiě)
同步案例代碼
用來(lái)操控ES的代碼(負(fù)責(zé)監(jiān)聽(tīng)MQ隊(duì)列)
/** * 監(jiān)聽(tīng)增加和修改的隊(duì)列 * 因?yàn)槲覀兊腅S中可以進(jìn)行全量修改,當(dāng)有這個(gè)id的數(shù)據(jù)的時(shí)候那么就先刪除再新增,沒(méi)有這個(gè)數(shù)據(jù)那么就直接新增 * 所以隊(duì)列過(guò)來(lái)的id不管是新增還是修改es都可以判斷如果有這個(gè)數(shù)據(jù)id那么就先刪除再新增,如果沒(méi)有這個(gè)數(shù)據(jù)就直接新增,所以新增和修改他倆用一個(gè)方法就行了 * * @param id 隊(duì)列中需要進(jìn)行操作的id */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE), exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT), key = MqConstants.HOTEL_INSERT_KEY )) public void insertAndUpdate(Long id) { if (id == null) { return; } log.info("入?yún)ⅲ簕}", id); //監(jiān)聽(tīng)到以后拿到id去數(shù)據(jù)庫(kù)查詢整個(gè)數(shù)據(jù) Hotel hotel = iHotelService.getById(id); //因?yàn)椴榈膍ysql數(shù)據(jù)和es的數(shù)據(jù)有些不一樣所以需要做轉(zhuǎn)換 HotelDoc hotelDoc = new HotelDoc(hotel); //轉(zhuǎn)換為json String hotelDocJson = JSON.toJSONString(hotelDoc); System.out.println("hotelDocJson = " + hotelDocJson); //發(fā)送到ES中,因?yàn)槲覀兊腅S中可以進(jìn)行全量修改,當(dāng)有這個(gè)id的數(shù)據(jù)的時(shí)候那么就先刪除再新增,沒(méi)有這個(gè)數(shù)據(jù)那么就直接新增 //創(chuàng)建請(qǐng)求語(yǔ)義對(duì)象 添加文檔數(shù)據(jù) IndexRequest request = new IndexRequest("hotel"); //這個(gè)新增就是PUT在es中 request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON); //發(fā)送請(qǐng)求 try { IndexResponse response = client.index(request, RequestOptions.DEFAULT); RestStatus status = response.status(); log.info("響應(yīng)結(jié)果為:{}", status); } catch (IOException e) { e.printStackTrace(); } } /** * 監(jiān)聽(tīng)刪除隊(duì)列 * * @param id 隊(duì)列中需要進(jìn)行操作的id */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE), exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT), key = MqConstants.HOTEL_DELETE_KEY )) public void deleteByMqId(Long id) { if (id == null) { return; } log.info("入?yún)ⅲ簕}", id); //先創(chuàng)建語(yǔ)義對(duì)象,直接就可以給里面寫(xiě)id的字段 DeleteRequest request = new DeleteRequest("hotel", id.toString()); //發(fā)送請(qǐng)求 try { DeleteResponse response = client.delete(request, RequestOptions.DEFAULT); RestStatus status = response.status(); log.info("響應(yīng)結(jié)果為:{}", status); } catch (IOException e) { e.printStackTrace(); }
用來(lái)操作MySQL的代碼:
@RestController@RequestMapping("hotel")public class HotelController { //注入和RabbitMQ鏈接 @Autowired private RabbitTemplate rabbitTemplate; @Autowired private IHotelService hotelService; @GetMapping("/{id}") public Hotel queryById(@PathVariable("id") Long id) { return hotelService.getById(id); } @GetMapping("/list") public PageResult hotelList( @RequestParam(value = "page", defaultValue = "1") Integer page, @RequestParam(value = "size", defaultValue = "1") Integer size ) { Page<Hotel> result = hotelService.page(new Page<>(page, size)); return new PageResult(result.getTotal(), result.getRecords()); } @PostMapping public void saveHotel(@RequestBody Hotel hotel) { hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId()); } @PutMapping() public void updateById(@RequestBody Hotel hotel) { if (hotel.getId() == null) { throw new InvalidParameterException("id不能為空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId()); } @DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id); }}
當(dāng)然也是可以不使用注解來(lái)寫(xiě),直接在配置文件中寫(xiě)隊(duì)列綁定的交換機(jī)