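Preface
The previous article, "Elasticsearch Storage Design and MySQL Data Synchronization Schemes", described three common synchronization schemes.
Why build this framework? To be honest, at this stage it is better described as a demo.
On the one hand, after discussing it with the team and weighing our current situation, scheme two, synchronization by periodic SELECT, turned out to be the most suitable one to put into practice.
On the other hand, Binlog-based synchronization between Elasticsearch and MySQL already has an open-source implementation in canal, so there is no need to reinvent the wheel.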
Implementation scheme
Each MySQL table maintains a business-independent update-time column, and any operation that changes a row also updates this column;
A producer scheduled task scans the MySQL table at a fixed interval and pushes the identifiers (or primary keys) of the rows that changed in that time window to the MQ;
A consumer scheduled task consumes the contents of the MQ, assembles the data, and synchronizes it to Elasticsearch.
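The three steps above can be sketched in memory as follows. This is only an illustration under stated assumptions: `Row`, `produce`, and `consume` are hypothetical names, a `List` stands in for the MySQL table, a `Map` stands in for the MQ, and another `Map` stands in for the Elasticsearch index.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SelectSyncSketch {
    /** Hypothetical stand-in for one MySQL row: a key, an update time, and some payload. */
    static final class Row {
        final long userId;
        final long updateTime; // epoch seconds, maintained independently of business fields
        final String name;

        Row(long userId, long updateTime, String name) {
            this.userId = userId;
            this.updateTime = updateTime;
            this.name = name;
        }
    }

    /** Step 2: push the keys of rows changed in (start, end] to the queue. */
    static void produce(List<Row> table, long start, long end, Map<Long, Long> queue) {
        for (Row row : table) {
            if (row.updateTime > start && row.updateTime <= end) {
                queue.putIfAbsent(row.userId, row.updateTime); // a key is queued at most once
            }
        }
    }

    /** Step 3: drain the queue, reload each row by key, and write it to the index. */
    static void consume(List<Row> table, Map<Long, Long> queue, Map<Long, String> index) {
        for (Long userId : new ArrayList<>(queue.keySet())) {
            for (Row row : table) {
                if (row.userId == userId) {
                    index.put(userId, row.name);
                }
            }
            queue.remove(userId);
        }
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1L, 100L, "alice"));
        table.add(new Row(2L, 250L, "bob"));
        Map<Long, Long> queue = new LinkedHashMap<>();
        Map<Long, String> index = new HashMap<>();
        produce(table, 200L, 300L, queue); // only rows updated after t=200 are picked up
        consume(table, queue, index);
        System.out.println(index);
    }
}
```

Only the key travels through the queue; the consumer reloads the current row when it runs, so a row that changes several times before it is consumed is still written once with its latest state.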

MySQL stores the data for transactional business scenarios, while Elasticsearch serves the system's data retrieval and data export features.
Code repository
Elasticsearch storage design
MySQL initialization script: mysql-init.sql
Elasticsearch initialization script: elasticsearch-init.txt
Relational table design in MySQL: user, user_extend, user_operation_log
The MySQL tables are related as follows:
user : user_extend : user_operation_log = 1 : 1 : n
The Elasticsearch mapping is as follows:
{
  "properties": {
    "userId": {
      "type": "long"
    },
    "name": {
      "type": "keyword"
    },
    "age": {
      "type": "integer"
    },
    "email": {
      "type": "keyword"
    },
    "headPortrait": {
      "type": "text"
    },
    "imgs": {
      "type": "text"
    },
    "userOperationLogs": {
      "properties": {
        "id": {
          "type": "long"
        },
        "userId": {
          "type": "long"
        },
        "desc": {
          "type": "text"
        }
      }
    }
  }
}
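To make the 1 : 1 : n relationship concrete, here is a minimal sketch of how rows from the three tables might be folded into one document shaped like this mapping. `UserDocumentSketch` and `buildDocument` are hypothetical names, the log ids are made up, and which table each flat field comes from is an assumption, since the article does not show the table columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UserDocumentSketch {
    /** Builds one document: flat user fields plus an array of operation-log objects. */
    static Map<String, Object> buildDocument(long userId, String name, int age, String email,
                                             String headPortrait, String imgs,
                                             List<String> logDescs) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("userId", userId);
        doc.put("name", name);
        doc.put("age", age);
        doc.put("email", email);
        doc.put("headPortrait", headPortrait);
        doc.put("imgs", imgs);

        List<Map<String, Object>> logs = new ArrayList<>();
        long logId = 1L; // hypothetical ids; real ones would come from user_operation_log
        for (String desc : logDescs) {
            Map<String, Object> log = new LinkedHashMap<>();
            log.put("id", logId++);
            log.put("userId", userId);
            log.put("desc", desc);
            logs.add(log);
        }
        doc.put("userOperationLogs", logs); // the n side becomes an array inside the document
        return doc;
    }

    public static void main(String[] args) {
        System.out.println(buildDocument(7L, "alice", 30, "a@example.com",
                "head.png", "a.png,b.png", Arrays.asList("login", "logout")));
    }
}
```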
Key items explained (Java)
The system is implemented in Java. This article shows only the main code; pull the full implementation from GitHub.
Jar dependencies
| Framework component | Version |
|---|---|
| spring-boot | 2.5.2 |
| mybatis | 3.3.2 |
| elasticsearch-rest-high-level-client | 7.12.1<br />Choose this according to your Elasticsearch version, ideally matching it exactly. In particular, do not use a jar from a different major version! |
| Elasticsearch | 7.12.1<br />Same as above |
| jedis | 3.6.1 |
Main classes explained
- ElasticsearchConfig: configuration class that initializes the Elasticsearch client, RestHighLevelClient;
- BaseEsPo: base class of the ORM objects for an Elasticsearch type;
- BaseDao: base class that implements CRUD operations on an Elasticsearch type;
- BaseProducer: base class of the producer scheduled tasks; it periodically pulls the identifiers of updated rows from a given MySQL table and pushes them to the MQ;
- BaseConsumer: base class of the consumer scheduled tasks; it assembles data from the identifiers in the MQ and synchronizes it to Elasticsearch.
Main configuration items
# Producer configuration example
## [Common] Whether to log the produced content
es.producer.log.enable=0
## Whether the producer is enabled
UserInfoProducer.enable=1
## Initial update time
UserExtendProducer.default.updateTimeStart=0
## Maximum number of rows produced per batch
UserExtendProducer.maxSize=1
## Queue name
UserExtendProducer.queue.redis.key=UserInfoProducer:queue:9
## Redis key that stores the update time at which the previous run finished
UserExtendProducer.update.time.start.redis.key=UserInfoProducer:updateTimeStart:9
# Consumer configuration example
## [Common] Switch for the queue backlog alarm
es.consumer.queue.size.alarm.enable=0
## [Common] Queue backlog alarm threshold, as a multiple of the maximum consume size
es.consumer.queue.size.threshold.multiple=0
## Whether the consumer is enabled
UserInfoConsumer.enable=1
## Maximum number of items consumed per run
UserInfoConsumer.consumeSize=1
Code implementation (Java)
Note: this article lists only the main code; see GitHub for the details.
- Elasticsearch configuration, which initializes RestHighLevelClient
@Slf4j
@Configuration
public class ElasticsearchConfig {
    @Value("${elasticsearch.host}")
    private String host;
    @Value("${elasticsearch.port}")
    private String port;
    @Value("${elasticsearch.username:}")
    private String username;
    @Value("${elasticsearch.password:}")
    private String password;
    /**
     * Index name
     */
    public static final String ES_INDEX_NAME = "user";
    /**
     * Type name
     */
    public static final String ES_TYPE_NAME = "_doc";

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // Do not log the password; host, port and username are enough for diagnostics.
        log.info("restHighLevelClient init start, host = {}, port = {}, username = {}", host, port, username);
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return new RestHighLevelClient(RestClient.builder(new HttpHost(host, Integer.parseInt(port), "http"))
                .setHttpClientConfigCallback((HttpAsyncClientBuilder httpAsyncClientBuilder) -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
    }
}
- BaseEsPo
@Data
public abstract class BaseEsPo {
    private String esId;
    private Long userId;

    public void checkElseThrow() {
        if (userId == null || userId == 0) {
            throw new RuntimeException("userId must not be empty");
        }
    }
}
- UserInfo corresponds to the contents of the user table
@Getter
@Setter
@NoArgsConstructor
@ToString(callSuper = true)
public class UserInfo extends BaseEsPo {
    private Long id;
    private String name;
    private Integer age;
    private String email;
}
- BaseDao handles the CRUD operations on Elasticsearch objects
@Slf4j
@Getter
public abstract class BaseDao {
    @Resource
    protected RestHighLevelClient restHighLevelClient;
    protected static final Integer MAX_BULK_SIZE = 100;

    public <T extends BaseEsPo> Optional<T> findByUserId(Long userId, Class<T> clazz) {
        String className = this.getClass().getSimpleName();
        List<T> pos = new ArrayList<>();
        SearchRequest request = new SearchRequest();
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.matchPhraseQuery("userId", userId));
        request.source(searchSourceBuilder);
        request.indices(ElasticsearchConfig.ES_INDEX_NAME);
        request.types(ElasticsearchConfig.ES_TYPE_NAME);
        try {
            log.info("className = {}, restHighLevelClient.search, req = {}", className, JSON.toJSONString(request));
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            log.info("className = {}, restHighLevelClient.search, res = {}", className, JSON.toJSONString(response));
            for (SearchHit hit : response.getHits().getHits()) {
                // Carry the Elasticsearch document id along so a later update can target it.
                String id = hit.getId();
                T po = JSON.parseObject(hit.getSourceAsString(), clazz);
                po.setEsId(id);
                pos.add(po);
            }
        } catch (IOException e) {
            log.error(String.format("className = %s, Elasticsearch query failed, userId = %s", className, userId), e);
            throw new RuntimeException(e);
        }
        if (CollectionUtils.isEmpty(pos)) {
            return Optional.empty();
        }
        if (CollectionUtils.size(pos) > 1) {
            log.warn("es: multiple documents found, userId = {}", userId);
        }
        return Optional.of(pos.get(0));
    }

    public <T extends BaseEsPo> void insertOrUpdate(T po, Class<T> clazz) {
        String className = this.getClass().getSimpleName();
        log.info("className = {}, insertOrUpdate po = {}", className, JSON.toJSONString(po));
        po.checkElseThrow();
        Optional<T> optional = findByUserId(po.getUserId(), clazz);
        if (optional.isPresent()) {
            // A document for this userId already exists: update it by its Elasticsearch id.
            T poInEs = optional.get();
            log.info("className = {}, userId = {}, esId = {}", className, po.getUserId(), poInEs.getEsId());
            UpdateRequest request = new UpdateRequest(ElasticsearchConfig.ES_INDEX_NAME, ElasticsearchConfig.ES_TYPE_NAME, poInEs.getEsId());
            request.doc(JSON.toJSONString(po), XContentType.JSON);
            request.fetchSource(true);
            UpdateResponse response;
            try {
                log.info("className = {}, restHighLevelClient.update req = {}", className, JSON.toJSONString(request));
                response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
                log.info("className = {}, restHighLevelClient.update res = {}", className, JSON.toJSONString(response));
            } catch (ElasticsearchException | IOException e) {
                log.error(String.format("className = %s, sync to Elasticsearch failed, userId = %s", className, po.getUserId()), e);
                throw new RuntimeException(e);
            }
            if (response != null) {
                if (response.getResult() == DocWriteResponse.Result.CREATED) {
                    log.info("className = {}, document created, userId = {}", className, po.getUserId());
                } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                    log.info("className = {}, document updated, userId = {}", className, po.getUserId());
                }
            }
        } else {
            // No document yet: index a new one and let Elasticsearch generate the id.
            IndexRequest request = new IndexRequest(ElasticsearchConfig.ES_INDEX_NAME, ElasticsearchConfig.ES_TYPE_NAME);
            request.source(JSON.toJSONString(po), XContentType.JSON);
            IndexResponse response;
            try {
                log.info("className = {}, restHighLevelClient.index req = {}", className, JSON.toJSONString(request));
                response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
                log.info("className = {}, restHighLevelClient.index res = {}", className, JSON.toJSONString(response));
            } catch (ElasticsearchException | IOException e) {
                log.error(String.format("className = %s, sync to Elasticsearch failed, userId = %s", className, po.getUserId()), e);
                throw new RuntimeException(e);
            }
            if (response != null) {
                if (response.getResult() == DocWriteResponse.Result.CREATED) {
                    log.info("className = {}, document created, userId = {}", className, po.getUserId());
                } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                    log.info("className = {}, document updated, userId = {}", className, po.getUserId());
                }
            }
        }
    }
}
- UserInfoDao, UserInfoDaoImpl: implement the CRUD operations for UserInfo
public interface UserInfoDao {
    /**
     * Insert or update the basic user information
     *
     * @param userInfo basic user information
     */
    void insertOrUpdate(UserInfo userInfo);
}
@Slf4j
@Service
public class UserInfoDaoImpl extends BaseDao implements UserInfoDao {
    @Override
    public void insertOrUpdate(UserInfo userInfo) {
        insertOrUpdate(userInfo, UserInfo.class);
    }
}
- UserSyncService, BaseUserSyncService, UserInfoSyncServiceImpl: service-layer interface and implementations
public interface UserSyncService {
    /**
     * Synchronize the user profile by userId
     *
     * @param userId userId
     */
    void syncByUserId(Long userId);
}
public abstract class BaseUserSyncService implements UserSyncService {
    @Resource
    protected EsSyncMapper esSyncMapper;
    @Resource
    private RedisUtil redisUtil;
    private static final String REDIS_KEY_PREFIX = "UserSyncService:";

    /**
     * Synchronize by userId
     *
     * @param userId userId
     */
    @Override
    public void syncByUserId(Long userId) {
        String redisKey = REDIS_KEY_PREFIX + userId;
        try {
            redisUtil.repetitionRequestLockOrElseThrow(redisKey);
        } catch (Exception e) {
            throw new EsSyncConcurrentLockException(e);
        }
        try {
            BaseEsPo po = selectOneByUserId(userId);
            insertOrUpdate(po);
        } finally {
            // Release the lock even if the lookup or the write to Elasticsearch throws.
            redisUtil.remove(redisKey);
        }
    }

    /**
     * Query one record by userId
     *
     * @param userId userId
     * @return po
     */
    protected abstract BaseEsPo selectOneByUserId(Long userId);

    /**
     * Synchronize the data to Elasticsearch
     *
     * @param po the object to synchronize
     */
    protected abstract void insertOrUpdate(BaseEsPo po);
}
@Service
public class UserInfoSyncServiceImpl extends BaseUserSyncService {
    @Resource
    private UserInfoDao userInfoDao;

    @Override
    protected BaseEsPo selectOneByUserId(Long userId) {
        return esSyncMapper.selectUserInfoByUserId(userId);
    }

    @Override
    protected void insertOrUpdate(BaseEsPo po) {
        userInfoDao.insertOrUpdate((UserInfo) po);
    }
}
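The per-user lock taken in syncByUserId is only useful if it is always released. A minimal sketch of that pattern follows, with an in-memory set standing in for Redis; `PerKeyLockSketch`, `runExclusively`, and `isLocked` are hypothetical names and are not part of the project's RedisUtil.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PerKeyLockSketch {
    // In-memory stand-in for the Redis lock keys (an assumption for illustration).
    private static final Set<String> LOCKS = ConcurrentHashMap.newKeySet();

    /** Runs the task only if no one else holds the key; returns false when the key is busy. */
    static boolean runExclusively(String key, Runnable task) {
        if (!LOCKS.add(key)) {
            return false; // another sync for the same key is in progress
        }
        try {
            task.run();
            return true;
        } finally {
            LOCKS.remove(key); // released on success and on failure alike
        }
    }

    static boolean isLocked(String key) {
        return LOCKS.contains(key);
    }

    public static void main(String[] args) {
        try {
            runExclusively("UserSyncService:1", () -> {
                throw new IllegalStateException("simulated Elasticsearch failure");
            });
        } catch (IllegalStateException e) {
            System.out.println("still locked after failure: " + isLocked("UserSyncService:1"));
        }
    }
}
```

If the key were not released on failure, every later attempt for the same user would look like a concurrent sync and be postponed instead of retried.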
- Producer scheduled tasks: BaseProducer, UserInfoProducer
@Slf4j
public abstract class BaseProducer {
    @Resource
    private EsSyncMapper esSyncMapper;
    @Resource
    private RedisUtil redisUtil;
    @Value("${es.producer.log.enable:0}")
    protected Integer logEnable;

    public void produce(Integer enable, String producerQueueRedisKey, String updateTimeStartRedisKey,
                        String defaultUpdateTimeStart, Integer maxSize, String primaryKeyColumnName,
                        String uniqueColumnName, String tableName, String updateColumnName) {
        long startTime = System.currentTimeMillis();
        log.info("produce start");
        String taskName = this.getClass().getSimpleName();
        if (enable != null && enable == 0) {
            log.info("taskName = {}, produce disable end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
            return;
        }
        // Resume from where the previous run ended, or from the configured default on the first run.
        String updateTimeStart = redisUtil.get(updateTimeStartRedisKey);
        if (StringUtils.isBlank(updateTimeStart)) {
            updateTimeStart = defaultUpdateTimeStart;
        }
        String updateTimeEnd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Long latestMaxPrimaryKey = 0L;
        List<Long> updatedDataPrimaryKeys;
        do {
            log.info("taskName = {}, primaryKeyColumnName = {}, tableName = {}, updateColumnName = {}, updateTimeStart = {}, updateTimeEnd = {}, latestMaxPrimaryKey = {}, maxSize = {}",
                    taskName, primaryKeyColumnName, tableName, updateColumnName, updateTimeStart, updateTimeEnd, latestMaxPrimaryKey, maxSize);
            updatedDataPrimaryKeys = esSyncMapper.selectUpdatedDataPrimaryKey(primaryKeyColumnName, tableName, updateColumnName, updateTimeStart, updateTimeEnd, latestMaxPrimaryKey, maxSize);
            log.info("taskName = {}, updatedDataPrimaryKeys size = {}", taskName, updatedDataPrimaryKeys.size());
            if (CollectionUtils.isNotEmpty(updatedDataPrimaryKeys)) {
                if (DelStatus.DELETED.getCode() == logEnable) {
                    log.info("updatedDataPrimaryKeys = {}", JSON.toJSONString(updatedDataPrimaryKeys));
                }
                if (CollectionUtils.size(updatedDataPrimaryKeys) >= maxSize) {
                    log.info("number of updated rows reached the batch limit, maxSize = {}", maxSize);
                }
                Map<String, Double> scoreMembers = new HashMap<>(updatedDataPrimaryKeys.size());
                List<Map<String, Object>> updatedDatas = esSyncMapper.selectUniqueKeyByPrimaryKey(primaryKeyColumnName, tableName, uniqueColumnName, updateColumnName, updatedDataPrimaryKeys, maxSize);
                if (CollectionUtils.isNotEmpty(updatedDatas)) {
                    for (Map<String, Object> updatedData : updatedDatas) {
                        Object userIdObject = updatedData.get(uniqueColumnName);
                        if (userIdObject == null) {
                            // String.valueOf(null) would enqueue the literal "null", which the consumer cannot parse.
                            log.warn("unique key is null, skipped, updatedData = {}", updatedData);
                            continue;
                        }
                        String userIdString = String.valueOf(userIdObject);
                        Double score = redisUtil.zscore(producerQueueRedisKey, userIdString);
                        if (score != null) {
                            // Already queued: keep the existing entry and its score.
                            log.info("score != null, userIdString = {}", userIdString);
                        } else {
                            log.info("score == null, userIdString = {}", userIdString);
                            LocalDateTime updateTime = (LocalDateTime) updatedData.get(updateColumnName);
                            Double updateTimeDouble = (double) (updateTime == null ? 0L : updateTime.toEpochSecond(DateUtil.ZONE_OFFSET));
                            scoreMembers.put(userIdString, updateTimeDouble);
                        }
                    }
                    if (!scoreMembers.isEmpty()) {
                        if (DelStatus.DELETED.getCode() == logEnable) {
                            log.info("scoreMembers = {}", JSON.toJSONString(scoreMembers));
                        }
                        redisUtil.zadd(producerQueueRedisKey, scoreMembers);
                    }
                }
                // The next page starts after the last primary key of this page.
                latestMaxPrimaryKey = updatedDataPrimaryKeys.get(updatedDataPrimaryKeys.size() - 1);
            } else {
                latestMaxPrimaryKey = 0L;
            }
        } while (CollectionUtils.size(updatedDataPrimaryKeys) >= maxSize && latestMaxPrimaryKey > 0L);
        redisUtil.set(updateTimeStartRedisKey, updateTimeEnd, 60 * 60 * 24 * 30);
        log.info("taskName = {}, produce end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
    }
}
@Slf4j
@Configuration
@EnableScheduling
public class UserInfoProducer extends BaseProducer {
    @Value("${UserInfoProducer.enable:0}")
    private Integer enable;
    @Value("${UserInfoProducer.default.updateTimeStart:2021-07-09 14:00:00}")
    private String defaultUpdateTimeStart;
    @Value("${UserInfoProducer.maxSize:1}")
    private Integer maxSize;
    @Value("${UserInfoProducer.queue.redis.key:UserInfoProducer:queue:9}")
    private String producerQueueRedisKey;
    @Value("${UserInfoProducer.update.time.start.redis.key:UserInfoProducer:updateTimeStart:9}")
    private String producerUpdateTimeStartRedisKey;

    @Scheduled(initialDelay = 10000, fixedDelayString = "${UserInfoProducer.fixedDelayString:1000}")
    public void produce() {
        try {
            String primaryKeyColumnName = "id";
            String uniqueColumnName = "userId";
            String tableName = "user_extend";
            String updateColumnName = "update_time";
            produce(enable, producerQueueRedisKey, producerUpdateTimeStartRedisKey,
                    defaultUpdateTimeStart, maxSize, primaryKeyColumnName,
                    uniqueColumnName, tableName, updateColumnName);
        } catch (Throwable throwable) {
            // Catch everything so that one failed run does not stop the schedule.
            log.error(String.format("UserInfoProducer: unexpected exception, e = %s", throwable), throwable);
        }
    }
}
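The do/while loop in BaseProducer pages through the changed rows by remembering the last primary key of each batch. The sketch below shows that paging pattern on its own. It assumes that the mapper query returns keys greater than latestMaxPrimaryKey in ascending order, limited to maxSize; the mapper SQL is not shown in this article, so that is an assumption. `KeysetPagingSketch`, `selectPage`, and `scanAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class KeysetPagingSketch {
    /** Stand-in for the SQL query: up to pageSize ids greater than lastId, ascending. */
    static List<Long> selectPage(List<Long> sortedIds, long lastId, int pageSize) {
        List<Long> page = new ArrayList<>();
        for (Long id : sortedIds) {
            if (id > lastId && page.size() < pageSize) {
                page.add(id);
            }
        }
        return page;
    }

    /** Reads every id by fetching pages until a page comes back shorter than pageSize. */
    static List<Long> scanAll(List<Long> sortedIds, int pageSize) {
        List<Long> seen = new ArrayList<>();
        long lastId = 0L;
        List<Long> page;
        do {
            page = selectPage(sortedIds, lastId, pageSize);
            if (!page.isEmpty()) {
                seen.addAll(page);
                lastId = page.get(page.size() - 1); // the next page starts after this key
            }
        } while (page.size() >= pageSize);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(scanAll(Arrays.asList(3L, 5L, 8L, 13L, 21L), 2));
    }
}
```

Paging by key rather than by offset keeps each query bounded by maxSize and does not skip rows when the number of changed rows is larger than one batch.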
- Consumer scheduled tasks: BaseConsumer, UserInfoConsumer
@Slf4j
public abstract class BaseConsumer {
    @Resource
    protected RedisUtil redisUtil;
    @Value("${es.consumer.queue.size.alarm.enable:0}")
    protected Integer queueSizeAlarmEnable;
    @Value("${es.consumer.queue.size.threshold.multiple:10}")
    protected Integer queueSizeThresholdMultiple;
    private final UserSyncService userSyncService;

    protected BaseConsumer(UserSyncService userSyncService) {
        this.userSyncService = userSyncService;
    }

    public void consume(Integer enable, Integer consumeSize, String queueName) {
        long startTime = System.currentTimeMillis();
        String taskName = this.getClass().getSimpleName();
        log.info("taskName = {}, consume start, enable = {}, consumeSize = {}, queueName = {}", taskName, enable, consumeSize, queueName);
        if (enable != null && enable == 0) {
            log.info("taskName = {}, consume end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
            return;
        }
        // Use the same offset as the producer and the retry path so that scores stay comparable.
        long currentSecond = LocalDateTime.now().toEpochSecond(DateUtil.ZONE_OFFSET);
        // Only entries whose score (due time) is not in the future are fetched.
        Set<String> uniqueValueString = redisUtil.zrangeByScore(queueName, 0, currentSecond, 0, consumeSize);
        log.info("taskName = {}, userIdStrings size = {}, content = {}", taskName, uniqueValueString.size(), JSON.toJSONString(uniqueValueString));
        uniqueValueString.removeIf(Objects::isNull);
        uniqueValueString.removeIf(item -> Long.parseLong(item) == 0);
        for (String uniqueValue : uniqueValueString) {
            Long userId = null;
            try {
                userId = Long.valueOf(uniqueValue);
                userSyncService.syncByUserId(userId);
                redisUtil.zrem(queueName, uniqueValue);
            } catch (Throwable throwable) {
                if (throwable instanceof EsSyncConcurrentLockException) {
                    // Another sync holds the lock: push the entry 60 seconds into the future and retry later.
                    double updateTimeDouble = LocalDateTime.now().toEpochSecond(DateUtil.ZONE_OFFSET) + 60;
                    redisUtil.zadd(queueName, updateTimeDouble, uniqueValue);
                    log.info("concurrent sync to Elasticsearch, userId = {}", userId);
                } else {
                    throw throwable;
                }
            }
        }
        if (DelStatus.DELETED.getCode() == queueSizeAlarmEnable) {
            Long queueSize = redisUtil.zcard(queueName);
            if ((long) queueSizeThresholdMultiple * consumeSize < queueSize) {
                log.warn("taskName = {}, queue backlog, queueName = {}", taskName, queueName);
            }
        }
        log.info("taskName = {}, consume end, cost : {} ms", taskName, System.currentTimeMillis() - startTime);
    }
}
@Slf4j
@Configuration
@EnableScheduling
public class UserInfoConsumer extends BaseConsumer {
    @Value("${UserInfoConsumer.enable:0}")
    private Integer enable;
    @Value("${UserInfoConsumer.consumeSize:1}")
    private Integer consumeSize;
    @Value("${UserInfoProducer.queue.redis.key:UserInfoProducer:queue:9}")
    private String producerQueueRedisKey;

    protected UserInfoConsumer(UserInfoSyncServiceImpl userPortraitSyncService) {
        super(userPortraitSyncService);
    }

    @Scheduled(initialDelay = 10000, fixedDelayString = "${UserInfoConsumer.fixedDelayString:1000}")
    public void consume() {
        try {
            super.consume(enable, consumeSize, producerQueueRedisKey);
        } catch (Throwable throwable) {
            // Catch everything so that one failed run does not stop the schedule.
            log.error(String.format("UserInfoConsumer: unexpected exception, e = %s", throwable), throwable);
        }
    }
}
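The queue shared by the producer and the consumer is a Redis sorted set whose score is a time in epoch seconds. A minimal in-memory sketch of the three behaviours the code relies on follows: one entry per member, fetching only entries that are due, and postponing an entry by giving it a later score. `DelayQueueSketch` and its `due` method are hypothetical names, and the `HashMap` is only a stand-in for Redis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DelayQueueSketch {
    // member -> score; a member appears at most once, as in a Redis sorted set
    private final Map<String, Double> scores = new HashMap<>();

    /** Adds the member or overwrites its score, like ZADD. */
    void zadd(String member, double score) {
        scores.put(member, score);
    }

    /** Removes the member, like ZREM. */
    void zrem(String member) {
        scores.remove(member);
    }

    /** Members whose score is not later than now, lowest score first, at most limit entries. */
    List<String> due(double now, int limit) {
        List<Map.Entry<String, Double>> entries = new ArrayList<>(scores.entrySet());
        entries.sort(Map.Entry.comparingByValue());
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Double> entry : entries) {
            if (entry.getValue() <= now && result.size() < limit) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        DelayQueueSketch queue = new DelayQueueSketch();
        queue.zadd("1", 100);
        queue.zadd("2", 50);
        queue.zadd("2", 260); // postponed: same member, later score
        System.out.println(queue.due(200, 10));
    }
}
```

Because a postponed member simply gets a score in the future, the consumer needs no separate retry queue: the entry becomes visible again once the current time passes its score.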