# Flink CDC Doris
Flink Doris Connector / Flink CDC
## Docker
- `doris.yml`: Doris at http://localhost:8030/ (user `root` or `admin`, empty password)
- `flink.yml`: Flink at http://localhost:8081/
## Flink Doris Connector
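Flink Doris Connector reads data from and writes data to a Doris cluster through Flink. It also integrates Flink CDC, which makes it easy to synchronize an entire upstream database, such as MySQL, into Doris.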
```shell
# On the host: download the jars (skip if they are already present)
wget https://repository.apache.org/content/repositories/releases/org/apache/doris/flink-doris-connector-1.20/24.0.1/flink-doris-connector-1.20-24.0.1.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar

# Copy them into the Flink lib directory of both containers
docker cp flink-doris-connector-1.20-24.0.1.jar jobmanager:/opt/flink/lib
docker cp flink-sql-connector-mysql-cdc-3.5.0.jar jobmanager:/opt/flink/lib
docker cp mysql-connector-java-8.0.27.jar jobmanager:/opt/flink/lib
docker cp flink-doris-connector-1.20-24.0.1.jar taskmanager:/opt/flink/lib
docker cp flink-sql-connector-mysql-cdc-3.5.0.jar taskmanager:/opt/flink/lib
docker cp mysql-connector-java-8.0.27.jar taskmanager:/opt/flink/lib

# Restart the containers so that Flink loads the new jars from /opt/flink/lib
docker restart jobmanager taskmanager
```
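To confirm that the copies landed, it can help to list which expected jars are missing from a lib directory. This is a minimal sketch built around a hypothetical helper, `check_jars`, written for this note and not part of Flink or Doris. Run it inside each container, for example after `docker exec -it jobmanager bash`.

```shell
# Hypothetical helper (not part of Flink or Doris).
# Usage: check_jars LIB_DIR JAR_NAME...
# Prints "missing: <jar>" for each jar not found in LIB_DIR; returns 1 if any are missing.
check_jars() {
  local lib_dir="$1"
  shift
  local missing=0
  local jar
  for jar in "$@"; do
    if [ ! -f "$lib_dir/$jar" ]; then
      echo "missing: $jar"
      missing=1
    fi
  done
  return "$missing"
}
```

For example: `check_jars /opt/flink/lib flink-doris-connector-1.20-24.0.1.jar flink-sql-connector-mysql-cdc-3.5.0.jar mysql-connector-java-8.0.27.jar && echo "all jars present"`.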
### MySQL full-database sync
```shell
# Open a shell in the jobmanager container
docker exec -it jobmanager bash

# Inside the container: submit the full-database sync job.
# Replace 127.0.0.1 with MySQL and Doris FE addresses reachable from the container.
# The passwords are examples; the Doris image above uses an empty root password.
bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.20-24.0.1.jar \
    mysql-sync-database \
    --database test_db \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=mysql_db \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1
```
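After submission, Flink's REST API reports job states at `/jobs/overview` on the same port as the web UI (8081). This is a minimal sketch that assumes `curl` is available on the host. `has_running_job` is a hypothetical helper, and its grep is a rough check rather than a JSON parser.

```shell
# Hypothetical helper: reads the JSON returned by GET /jobs/overview on stdin
# and succeeds if at least one job reports "state":"RUNNING".
# Rough grep-based check, not a JSON parser (use jq for anything more robust).
has_running_job() {
  grep -Eq '"state"[[:space:]]*:[[:space:]]*"RUNNING"'
}
```

For example: `curl -s http://localhost:8081/jobs/overview | has_running_job && echo "a job is RUNNING"`.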
### Oracle full-database sync
```shell
# Download the Oracle CDC connector and the Oracle JDBC driver
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-oracle-cdc/3.5.0/flink-sql-connector-oracle-cdc-3.5.0.jar
wget https://repo1.maven.org/maven2/com/oracle/ojdbc/ojdbc8/19.3.0.0/ojdbc8-19.3.0.0.jar

# Copy both jars into /opt/flink/lib of the jobmanager and taskmanager containers,
# as for the MySQL jars, then run the job from inside the jobmanager container.
# Note the space before the trailing backslash in "password= \": without it,
# the next line is glued onto the empty password value.
bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    ./lib/flink-doris-connector-1.20-24.0.1.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|tbl2" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password= \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1
```
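`--including-tables` takes a regular expression in which `|` separates alternatives, as in `"tbl1|test.*"` and `"tbl1|tbl2"` above. When the table list comes from a script, a small hypothetical helper like the one below can build the value. It is a sketch written for this note, not an option of the connector.

```shell
# Hypothetical helper: joins its arguments with "|" (regex alternation)
# to form a value for --including-tables, e.g. "tbl1|tbl2".
including_tables() {
  local IFS='|'
  printf '%s\n' "$*"
}
```

For example: `--including-tables "$(including_tables tbl1 tbl2)"`.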
### How it works

## Flink CDC
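Flink CDC is a stream-based data integration tool that aims to give users a more comprehensive set of programming interfaces (APIs). It lets users define their ETL (Extract, Transform, Load) pipelines concisely in YAML configuration files. It then automatically generates the corresponding Flink operators and submits the Flink job.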
### Streaming ELT from MySQL to Doris
#### Submitting a job with the Flink CDC CLI
Flink CDC makes it quick to build a Streaming ELT job from MySQL to Doris. The job covers full-database sync, schema-change sync, and sync of sharded databases and tables.
```shell
# On the host: copy the Flink CDC distribution into the jobmanager container
docker cp flink-cdc-3.5.0-bin.tar.gz jobmanager:/tmp
docker exec -it jobmanager bash

# Inside the container: unpack it into /opt/flink-cdc
mkdir -p /opt/flink-cdc
tar -xzvf /tmp/flink-cdc-3.5.0-bin.tar.gz -C /tmp/ && \
mv /tmp/flink-cdc-3.5.0/* /opt/flink-cdc/ && \
mv /opt/flink-cdc/lib/flink-cdc-dist-3.5.0.jar /opt/flink-cdc/lib/flink-cdc-dist.jar && \
rm -rf /tmp/flink-cdc-3.5.0 /tmp/flink-cdc-3.5.0-bin.tar.gz
exit

# Back on the host: add the pipeline connectors and the MySQL JDBC driver
docker cp flink-cdc-pipeline-connector-doris-3.5.0.jar jobmanager:/opt/flink-cdc/lib
docker cp flink-cdc-pipeline-connector-mysql-3.5.0.jar jobmanager:/opt/flink-cdc/lib
docker cp mysql-connector-java-8.0.27.jar jobmanager:/opt/flink/lib
```
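```shell
# Open a shell in the jobmanager container and create the pipeline file in the Flink CDC home
docker exec -it jobmanager bash
cd /opt/flink-cdc
vi mysql-to-doris.yaml
```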
```yaml
# Hosts are as seen from inside the jobmanager container; adjust them if MySQL or Doris run elsewhere.
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 1
```
Here, `tables: app_db.\.*` in `source` uses a regular expression to sync every table in `app_db`. The `table.create.properties.replication_num` option is added to `sink` because the Docker image runs only one Doris BE node.
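In the `tables` option, an unescaped `.` separates the database from the table, and `\.` stands for the regex `.` (any character). So `app_db.\.*` means database `app_db` with table regex `.*`. The sketch below approximates this as one extended regex over `db.table` names. The translation to `^app_db\..*$` and the table names are our own illustrations, not Flink CDC code.

```shell
# Illustration only: approximates `tables: app_db.\.*` as an ERE over "db.table" names.
# "app_db" is the database part, "\." here is a literal separator dot, ".*" is the table regex.
matches_app_db() {
  grep -E '^app_db\..*$'
}

# Hypothetical table names: prints app_db.orders and app_db.products, not other_db.orders
printf '%s\n' app_db.orders app_db.products other_db.orders | matches_app_db
```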
```shell
# Inside the jobmanager container, from /opt/flink-cdc
bash bin/flink-cdc.sh mysql-to-doris.yaml
```
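Sharded-table sync can be expressed with a Flink CDC `route` block in the same YAML file. This sketch assumes that the sharded tables in `app_db` have names starting with `order` and should land in a single Doris table `ods_db.ods_orders`. Both names are hypothetical. The sketch appends to the `mysql-to-doris.yaml` created above, after which the pipeline is submitted again with the same `flink-cdc.sh` command.

```shell
# Sketch: append a route block to mysql-to-doris.yaml (table names are hypothetical).
# "order\.*" is the table regex "order.*": every app_db table whose name starts with "order".
# The quoted 'EOF' keeps the backslash literal; the leading blank line guards the file end.
cat >> mysql-to-doris.yaml <<'EOF'

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders
    description: merge sharded order tables into one Doris table
EOF
```

If the earlier pipeline is still running, cancel it first (for example from the Flink web UI) so that two jobs do not write to the same tables.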
#### Flink lib
Note: flink-doris-connector-1.20-24.0.1.jar and flink-sql-connector-mysql-cdc-3.5.0.jar are not in this directory.
Contents of `/opt/flink/lib`:

```text
flink-cep-1.20.2.jar
flink-connector-files-1.20.2.jar
flink-csv-1.20.2.jar
flink-dist-1.20.2.jar
flink-json-1.20.2.jar
flink-scala_2.12-1.20.2.jar
flink-table-api-java-uber-1.20.2.jar
flink-table-planner-loader-1.20.2.jar
flink-table-runtime-1.20.2.jar
log4j-1.2-api-2.24.3.jar
log4j-api-2.24.3.jar
log4j-core-2.24.3.jar
log4j-slf4j-impl-2.24.3.jar
mysql-connector-java-8.0.27.jar
```
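The note above lists two jars that are absent from `/opt/flink/lib` in this setup. The source does not say why. A likely reason, which is our assumption, is to keep the SQL connector jars from clashing with the pipeline connector jars in `/opt/flink-cdc/lib`. The following sketch checks for them using a hypothetical helper, `check_no_conflicts`.

```shell
# Hypothetical helper: reports (and returns 1 for) any flink-doris-connector-*.jar or
# flink-sql-connector-mysql-cdc-*.jar found in LIB_DIR. An unmatched glob stays literal,
# so the -e test is false and nothing is reported for it.
check_no_conflicts() {
  local lib_dir="$1"
  local found=0
  local jar
  for jar in "$lib_dir"/flink-doris-connector-*.jar "$lib_dir"/flink-sql-connector-mysql-cdc-*.jar; do
    if [ -e "$jar" ]; then
      echo "remove: $jar"
      found=1
    fi
  done
  return "$found"
}
```

For example, inside each container: `check_no_conflicts /opt/flink/lib && echo "no conflicting jars"`.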
#### Flink CDC lib
Contents of `/opt/flink-cdc/lib`:

```text
flink-cdc-dist.jar
flink-cdc-pipeline-connector-doris-3.5.0.jar
flink-cdc-pipeline-connector-mysql-3.5.0.jar
```
## Runtime Environment
## Screenshots

