一種實時數(shù)據(jù)庫
數(shù)據(jù)同步方案及實現(xiàn)
神州信息
董志
1.
概述
變化數(shù)據(jù)捕獲簡稱CDC(Change Data Capture),可以識別提取從上次提取之后發(fā)生變化的數(shù)據(jù),在廣義的概念上,只要能捕獲數(shù)據(jù)變更的技術,我們都可以稱為CDC。通常我們說的 CDC技術主要面向數(shù)據(jù)庫的變更,是一種用于捕獲數(shù)據(jù)庫中數(shù)據(jù)變更的技術。CDC的兩種模式:
(1)同步:同步CDC主要是采用觸發(fā)器記錄新增數(shù)據(jù),基本能夠做到實時增量提取。
(2)異步:異步CDC通過分析已經(jīng)提交的日志記錄來得到增量數(shù)據(jù)信息,有一定的延時,是本文采用的模式。
1.1. 應用場景
(1)數(shù)據(jù)同步,用于備份、容災。
(2)數(shù)據(jù)分發(fā),一個數(shù)據(jù)源分發(fā)給多個下游。
(3)數(shù)據(jù)采集,面向數(shù)據(jù)倉庫/數(shù)據(jù)湖的ETL數(shù)據(jù)集成。
1.2. 主流的實現(xiàn)機制
(1)基于查詢的CDC
a)離線調(diào)度查詢作業(yè),批處理。
b)無法保障數(shù)據(jù)一致性。
c)不保障實時性。
(2)基于日志的CDC
a)實時消費日志,流處理。
b)保障數(shù)據(jù)一致性。
c)提供實時數(shù)據(jù)。
2.
方案對比
主流開源CDC方案對比如下圖所示,主要通過監(jiān)控各數(shù)據(jù)庫的事務日志達到監(jiān)控數(shù)據(jù)變化的目的,根據(jù)對比采用Flink CDC 方案。
圖:多種CDC技術對比
(1) DataX 不支持增量同步,Canal 不支持全量同步。雖然兩者都是非常流行的數(shù)據(jù)同步工具,但在場景支持上仍不完善。
(2) 在全量+增量一體化同步方面,只有Flink CDC、Debezium、Oracle Goldengate 支持較好。
(3) 在架構方面,Apache Flink 是一個非常優(yōu)秀的分布式流處理框架,因此Flink CDC 作為Apache Flink 的一個組件具有非常靈活的水平擴展能力。而DataX 和Canal 是個單機架構,在大數(shù)據(jù)場景下容易面臨性能瓶頸的問題。
(4) 在數(shù)據(jù)加工的能力上,CDC 工具是否能夠方便地對數(shù)據(jù)做一些清洗、過濾、聚合,甚至關聯(lián)操作?Flink CDC 依托強大的Flink SQL 流式計算能力,可以非常方便地對數(shù)據(jù)進行加工。而 Debezium 等則需要通過復雜的 Java 代碼才能完成,使用門檻比較高。
(5) 另外,在生態(tài)方面,這里指的是上下游存儲的支持。Flink CDC 上下游非常豐富,支持對接MySQL、PostgreSQL 等數(shù)據(jù)源,還支持寫入到TiDB、HBase、Kafka、Hudi 等各種存儲系統(tǒng)中,也支持靈活的自定義connector。
因此,不論從性能還是適用范圍上,F(xiàn)link CDC 都可以作為最佳選擇。Flink CDC Connectors是Apache Flink的一組source連接器,使用變更數(shù)據(jù)捕獲 (CDC) 從不同的數(shù)據(jù)庫中獲取變更,F(xiàn)link CDC連接器集成了Debezium作為引擎來捕獲數(shù)據(jù)變化,所以它可以充分發(fā)揮Debezium的能力。目前連接器支持的數(shù)據(jù)庫有:MySQL(5.6+)、PostgreSQL(9.6+)、MongoDB(3.6+)、Oracle(11+)、TiDB(5.1.x+)、SQL Server(2012+)和Oceanbase(3.1.x+)。
3.
數(shù)據(jù)庫事務日志
目前支持的關系型數(shù)據(jù)庫包括:MySQL、Oracle、PostgreSQL、SQL Server,主要采用基于WAL日志方式進行數(shù)據(jù)變化監(jiān)聽。下面介紹各關系型數(shù)據(jù)庫的日志類型:
1. MySQL
(1)Error log錯誤日志記錄了MySQL Server運行過程中所有較為嚴重的警告和錯誤信息,以及MySQL Server每次啟動和關閉的詳細信息。
(2)Binary log二進制日志,記錄著數(shù)據(jù)庫發(fā)生的各種事務信息。
(3)Update log更新日志是MySQL在較老版本上使用的,其功能跟Bin log類似,只不過不是以二進制格式記錄,而是以簡單文本格式記錄內(nèi)容。
(4)Query log查詢?nèi)罩居涗汳ySQL中所有的query。
(5)Slow query log慢查詢?nèi)罩居涗浀木褪菆?zhí)行時間較長的query。
(6)InnoDB redo log,InnoDB是一個事務安全的存儲引擎,其事務安全性主要就是通過在線redo日志和記錄在表空間中的undo信息來保證的。
2. Oracle
(1)系統(tǒng)報警日志alert.log。
(2)跟蹤日志(用戶和進程) trace.log。
(3)重做日志。
a. 在線重做日志:又稱聯(lián)機重做日志,指Oracle以SQL腳本的形式實時記錄數(shù)據(jù)庫的數(shù)據(jù)更新,換句話說,實時保存已執(zhí)行的SQL腳本到在線日志文件中(按特定的格式)。
b. 歸檔重做日志:指當條件滿足時,Oracle將在線重做日志以文件形式保存到硬盤(持久化)。
3. PostgreSQL
(1)pg_log文件夾中的日志一般用來記錄服務器與DB的狀態(tài),如各種Error信息,定位慢查詢SQL,數(shù)據(jù)庫的啟動關閉信息,發(fā)生checkpoint過于頻繁等的告警信息等。
(2)pg_xlog文件夾中的日志是記錄的PostgreSQL的WAL信息,也就是一些事務日志信息(transaction log),記錄著數(shù)據(jù)庫發(fā)生的各種事務信息。
(3)pg_clog文件夾存儲的也是事務日志文件,但與pg_xlog不同的是它記錄的是事務的元數(shù)據(jù)(metadata),這個日志告訴我們哪些事務完成了,哪些沒有完成。
4. SQL Server
(1)交易日志(Transaction logs),是針對數(shù)據(jù)庫改變所做的記錄,它可以記錄針對數(shù)據(jù)庫的任何操作,并將記錄結果保存在獨立的文件中。對于任何每一個交易過程,交易日志都有非常全面的記錄,根據(jù)這些記錄可以將數(shù)據(jù)文件恢復成交易前的狀態(tài)。
4.
功能實現(xiàn)
1. 整體架構
整體架構如下圖所示,首先各源端數(shù)據(jù)庫需要開啟相應的事務日志,F(xiàn)link CDC 任務會監(jiān)聽各數(shù)據(jù)庫的事務變化日志,然后對日志數(shù)據(jù)進行處理,最后將數(shù)據(jù)進行傳輸:
(1)通過訂閱發(fā)布方式將消息發(fā)送到Redis 的Channel 中,通知消費者數(shù)據(jù)庫中的數(shù)據(jù)發(fā)生了變化。
(2)以流的方式存儲到Kafka 的 Topic中,供下游程序進行消費。
(3)抽取到其他關系型數(shù)據(jù)庫中,實現(xiàn) ETL 功能。
圖:整體架構圖
2. 數(shù)據(jù)格式
由于 Flink CDC 內(nèi)部集成了 Debezium 組件,通過 Debezium 進行數(shù)據(jù)采集,所以數(shù)據(jù)格式同 Debezium,監(jiān)聽到的數(shù)據(jù)格式如下圖所示,after 代表變化后的數(shù)據(jù);source 代表源端的數(shù)據(jù)庫相關信息,包括 Debezium 版本號、連接器類型、數(shù)據(jù)庫名、表名等;op 代表操作的類型,此處為讀操作。
圖:數(shù)據(jù)格式
3. 事務日志開啟
(1) MySQL 開啟Bin Log 日志
在 my.cnf 里面加上如下配置,重啟服務。
查看是否開啟Bin Log日志 show variables like 'log_%';
(2) Oracle 開啟歸檔日志
啟用歸檔日志:
檢查歸檔日志是否啟用:
啟動補充日志記錄:
4. 具體代碼實現(xiàn)
DataStream方式監(jiān)聽 MySQL 數(shù)據(jù)庫實現(xiàn):
DataStream方式監(jiān)聽 Oracle 數(shù)據(jù)庫實現(xiàn):
Flink SQL方式監(jiān)聽 MySQL 數(shù)據(jù)庫實現(xiàn):
自定義反序列化器:
自定義Redis Sink: