flink cdc oceanbase 數(shù)據(jù)自定義格式序列化

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Map;
import java.util.Objects;


/**
 * @author majiajue
 * @Title:
 * @Description:
 * @date 2023/8/915:16
 */
public class OceanBaseDeserializer implements OceanBaseDeserializationSchema<String> {

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    @Override
    public void deserialize(OceanBaseRecord oceanBaseRecord, Collector<String> collector) throws Exception {
        JSONObject jsonObject = new JSONObject();

        //提取數(shù)據(jù)庫(kù)名

        String database = oceanBaseRecord.getSourceInfo().getDatabase();;
        //提取表名
        String tableName = oceanBaseRecord.getSourceInfo().getTable();

        //獲取after數(shù)據(jù)
//        Struct afterStruct = oceanBaseRecord.getLogMessageFieldsAfter();
        JSONObject afterJson = new JSONObject();
        //判斷是否有after
        if (oceanBaseRecord.getLogMessageFieldsAfter() != null) {
            //遍歷oceanBaseRecord.getLogMessageFieldsAfter()
            oceanBaseRecord.getLogMessageFieldsAfter().forEach((k,v)->{
                afterJson.put(k,v);
            });

        }
        JSONObject beforeJson = new JSONObject();
        if(oceanBaseRecord.getLogMessageFieldsBefore() != null){
            oceanBaseRecord.getLogMessageFieldsBefore().forEach((k,v)->{
                beforeJson.put(k,v);
            });
        }
        //獲取before數(shù)據(jù)
//        Struct beforeStruct = value.getStruct("before");
//        JSONObject beforeJson = new JSONObject();
//        //判斷是否有before
//        if (beforeStruct != null) {
//            for (Field field : beforeStruct.schema().fields()) {
//                beforeJson.put(field.name(), beforeStruct.get(field));
//            }
//        }

        //獲得操作類(lèi)型 DELETE UPDATE CREATE

        String type ="" ;
        if(Objects.isNull(oceanBaseRecord.getOpt())){
            //適配initial模式
            type = "insert".toUpperCase();
            if(oceanBaseRecord.getJdbcFields()!=null&oceanBaseRecord.getLogMessageFieldsAfter()==null){
                oceanBaseRecord.getJdbcFields().forEach((k,v)->{
                    afterJson.put(k,v);
                });
            }
        }else{
            type = oceanBaseRecord.getOpt().name();
        }
        // if ("create".toUpperCase().equals(type)) {
        //     type = "insert".toUpperCase();
        // }

        //封裝數(shù)據(jù)到JSONObject
        jsonObject.put("database", database);
        jsonObject.put("tableName", tableName);
        jsonObject.put("after", afterJson);
        jsonObject.put("before", beforeJson);
        jsonObject.put("type", type);
        jsonObject.put("ts",oceanBaseRecord.getSourceInfo().getTimestampS());

        collector.collect(jsonObject.toJSONString());

    }
}

在.deserializer(new OceanBaseDeserializer()) 這樣使用,這樣就會(huì)輸出json方便后續(xù)邏輯處理

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容