中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

長(zhǎng)春怎么做網(wǎng)站建站流程新手搭建網(wǎng)站第一步

長(zhǎng)春怎么做網(wǎng)站,建站流程新手搭建網(wǎng)站第一步,企業(yè)網(wǎng)站建設(shè)公,冷門卻好聽(tīng)的公司名稱隨著越來(lái)越多用戶使用 DolphinDB,各式各樣的應(yīng)用場(chǎng)景對(duì) DolphinDB 的數(shù)據(jù)接入提出了不同的要求。部分用戶需要將 Oracle 11g 的數(shù)據(jù)實(shí)時(shí)同步到 DolphinDB 中來(lái),以滿足在 DolphinDB 中實(shí)時(shí)使用數(shù)據(jù)的需求。本篇教程將介紹使用 Debezium 來(lái)實(shí)時(shí)捕獲和發(fā)布 …

隨著越來(lái)越多用戶使用 DolphinDB,各式各樣的應(yīng)用場(chǎng)景對(duì) DolphinDB 的數(shù)據(jù)接入提出了不同的要求。部分用戶需要將 Oracle 11g 的數(shù)據(jù)實(shí)時(shí)同步到 DolphinDB 中來(lái),以滿足在 DolphinDB 中實(shí)時(shí)使用數(shù)據(jù)的需求。本篇教程將介紹使用 Debezium 來(lái)實(shí)時(shí)捕獲和發(fā)布 Oracle 11g 的數(shù)據(jù)庫(kù)更改事件,并完成 Oracle 到 DolphinDB 的實(shí)時(shí)數(shù)據(jù)同步的完整解決方案

1. Debezium 同步方案概述

Debezium 是一個(gè)開源的分布式平臺(tái),用于實(shí)時(shí)捕獲和發(fā)布數(shù)據(jù)庫(kù)更改事件。它可以將關(guān)系型數(shù)據(jù)庫(kù)(如MySQL、PostgreSQL、Oracle 等)的變更事件轉(zhuǎn)化為可觀察的流數(shù)據(jù),以供其他應(yīng)用程序?qū)崟r(shí)消費(fèi)和處理。本文中將采用 Debezium 與 Kafka 組合的方式來(lái)實(shí)現(xiàn)從 Oracle11g 到 DolphinDB 的數(shù)據(jù)同步。

Kafka +Debezium 的數(shù)據(jù)同步方案需要部署 4 個(gè)服務(wù),如下所示:

  • ZooKeeper:kafka 的依賴部署。
  • Kafka:數(shù)據(jù)存儲(chǔ)。
  • Kafka-Connect:用于接入數(shù)據(jù)插件 source-connetor, sink-connector 的框架,可以提供高可用,也可以部署單實(shí)例版本。
  • Schema-Registry?:提供實(shí)時(shí)同步的數(shù)據(jù)的元數(shù)據(jù)注冊(cè)功能 ,支持?jǐn)?shù)據(jù)序列化。

基于 Debezium 的數(shù)據(jù)架構(gòu)圖如下:

Debezium+Kafka 實(shí)時(shí)同步 Oracle 11g 數(shù)據(jù)到 DolphinDB 架構(gòu)圖

接下來(lái),本文將逐一介紹這些服務(wù)的下載、安裝,以及配置數(shù)據(jù)同步任務(wù)。

2. 部署 Kafka 單實(shí)例實(shí)時(shí)數(shù)據(jù)存儲(chǔ)

為方便用戶快速?gòu)?fù)現(xiàn)教程內(nèi)容,本文將采用 Kafka 單實(shí)例服務(wù)部署方案以介紹數(shù)據(jù)同步功能。部署環(huán)境我們采用 Linux 環(huán)境。

2.1 部署準(zhǔn)備

首先下載程序包,Zookeeper(開源), Kafka (開源), Confluent (社區(qū)版)建議到到官網(wǎng)下載最新穩(wěn)定版本。

并將下面 4 個(gè)軟件包放到?/opt?目錄下。(軟件、配置、數(shù)據(jù)路徑文件較多。注意:初次試用請(qǐng)盡量保持路徑一致。)

  • jdk-17.0.7_linux-x64_bin.tar.gz

https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz

  • apache-zookeeper-3.7.1-bin.tar.gz

Apache ZooKeeper

  • kafka_2.13-3.4.1.tgz (下載scala 2.13版本)

Apache Kafka

  • confluent-community-7.4.0.tar.gz

https://www.confluent.io/installation/

Confluent 是 Kafka 相關(guān)的商業(yè)公司。下載 Confluent 時(shí)選擇 self-managed,然后錄入信息,再點(diǎn)擊 start free 后方能下載。注意,下載 community 版本即可滿足需要,本教程只需要其中的 schema-registry 包。如需使用更多功能,也可下載正式版。正式版包括了 Zookeeper 、Kafka 以及管理、監(jiān)控 Kafka 的更多功能。

下載完以上 4 個(gè)程序包后,即可開始部署。

2.2 部署 Zookeeper

2.2.1 基礎(chǔ)準(zhǔn)備

第一步:創(chuàng)建部署用戶

創(chuàng)建用戶 kafka,授予 sudo 免密權(quán)限(需自行設(shè)置)。然后切換到 kafka 用戶來(lái)進(jìn)行操作(以下均為 kafka 用戶操作)。

useradd kafka
su kafka

第二步:安裝部署 Java 環(huán)境

由于整套架構(gòu)涉及的程序都是基于 java 虛擬機(jī)運(yùn)行的。所以必須安裝 java 運(yùn)行環(huán)境。

安裝 Java 到路徑?/opt/java17。

cd /opt
sudo mkdir -p /usr/local/java
sudo tar -xvf jdk-17.0.7_linux-x64_bin.tar.gz
sudo mv jdk-17.0.7 /usr/local/java/java17

設(shè)置 Java 環(huán)境變量 (kafka 用戶下執(zhí)行)。

vim ~/.bashrc
# 輸入下面代碼
JAVA_HOME=/usr/local/java/java17
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATHsource ~/.bashrc
java --version

2.2.2 安裝 Zookeeper

第一步:解壓并安裝 Zookeeper

由于 3.7.1 版本 Zookeeper 的用戶、組具有默認(rèn)值,故此處根據(jù)實(shí)際情況進(jìn)行調(diào)整。

cd /opt
sudo tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
sudo mv apache-zookeeper-3.7.1-bin zookeeper
sudo chown -R root:root zookeeper
sudo chmod -R 755 zookeeper

第二步: 準(zhǔn)備 Zookeeper 的配置文件和存儲(chǔ)文件

創(chuàng)建 Zookeeper 的配置文件、數(shù)據(jù)文件、日志文件的存儲(chǔ)路徑。建議保持路徑一致。相關(guān)的程序配置文件包請(qǐng)參見(jiàn)附錄

sudo mkdir -p /KFDATA/zookeeper/etc
sudo mkdir -p /KFDATA/zookeeper/data
sudo mkdir -p /KFDATA/zookeeper/datalog
sudo mkdir -p /KFDATA/zookeeper/logs
sudo chown -R kafka:kafka /KFDATA
chmod -R 700 /KFDATA/zookeeper

準(zhǔn)備 Zookeeper 的配置文件?zoo.cfg。先從 zookeeper 安裝路徑下復(fù)制?log4j.properties?過(guò)來(lái),然后進(jìn)行修改。

說(shuō)明:Zookeeper 的不同版本?log4j.properties?配置內(nèi)容會(huì)略有區(qū)別。如有不同,請(qǐng)按 log4j 的規(guī)則調(diào)整。

cd /KFDATA/zookeeper/etc
touch zoo.cfg
echo tickTime=2000 > zoo.cfg
echo initLimit=10 >>zoo.cfg
echo syncLimit=5 >>zoo.cfg
echo  dataDir=/KFDATA/zookeeper/data >>zoo.cfg
echo  dataLogDir=/KFDATA/zookeeper/datalog >>zoo.cfg
echo  clientPort=2181 >>zoo.cfg
sudo cp /opt/zookeeper/conf/log4j.properties ./
sudo chown kafka:kafka ./log4j.properties

修改?log4j.properties?中的?zookeeper.log.dir?參數(shù) :

第三步:創(chuàng)建 Zookeeper 的啟動(dòng)文件

創(chuàng)建一個(gè)?zk.env,配置 Zookeeper 啟動(dòng)所需環(huán)境變量,用于啟動(dòng) service 文件調(diào)用。

cd /KFDATA/zookeeper/etc/
touch zk.env
echo JAVA_HOME=/usr/local/java/java17 > zk.env
echo PATH="/usr/local/java/java17/bin:/opt/zookeeper/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >> zk.env
echo ZOO_LOG_DIR=/KFDATA/zookeeper/logs >> zk.env
echo ZOO_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/zookeeper/etc/log4j.properties\" >> zk.env

如果對(duì) Zookeeper 很熟練可以自行調(diào)用 Zookeeper 安裝目錄下的 bin 文件夾下的操作腳本來(lái)進(jìn)行操作或測(cè)試。

使用?vim?命令編輯一個(gè) service 文件。

sudo vim /usr/lib/systemd/system/zookeeper.service

錄入以下啟動(dòng)命令信息并保存。

[Unit]
Description=Apache Kafka - ZooKeeper
After=network.target[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/zookeeper/etc/zk.env
ExecStart=/opt/zookeeper/bin/zkServer.sh start /KFDATA/zookeeper/etc/zoo.cfg
ExecStop=/opt/zookeeper/bin/zkServer.sh stop /KFDATA/zookeeper/etc/zoo.cfg
TimeoutStopSec=180
Restart=no[Install]
WantedBy=multi-user.target

重新加載 service 啟動(dòng)服務(wù)。

sudo systemctl daemon-reload

第四步:創(chuàng)建測(cè)試腳本

(1)創(chuàng)建連接 Zookeeper 測(cè)試文件?zkCon.sh。

mkdir -p /KFDATA/bin
cd /KFDATA/bin
touch zkCon.sh
echo export JAVA_HOME=/usr/local/java/java17 >zkCon.sh
echo export PATH="{$JAVE_HOME}/bin:/opt/zookeeper/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >>zkCon.sh
echo export ZOO_LOG_DIR=/KFDATA/zookeeper/logs >>zkCon.sh
echo export ZOO_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/zookeeper/etc/log4j.properties\" >>zkCon.sh
echo  '/opt/zookeeper/bin/zkCli.sh -server localhost:2181 -Dzookeeper.config.path=/KFDATA/zookeeper/zoo.cfg' >>zkCon.sh

對(duì)腳本授予執(zhí)行權(quán)限。

chmod +x  zkCon.sh 

2.2.3 部署啟動(dòng) Zookeeper

第一步:通過(guò) systemctl 工具啟動(dòng) Zookeeper 服務(wù)

sudo systemctl start zookeeper.service

第二步:查看 Zookeeper 啟動(dòng)情況

可以通過(guò)?jps?命令查看 Java 進(jìn)程, QuorumPeerMain 進(jìn)程是 Zookeeper 的啟動(dòng)進(jìn)程。

也可以通過(guò)?systemctl命令查看,如圖即是正常啟動(dòng)。

 sudo systemctl status zookeeper

第三步:通過(guò)客戶端連接 Zookeeper ,并進(jìn)行查看

cd /KFDATA/bin/
./zkCon.sh
# 等待zookeeper 命令行窗口
ls /
ls /zookeeper 

如果返回如下顯示,表示 Zookeeper 啟動(dòng)成功,可以在 Zookeeper 中觀察到自身的基礎(chǔ)信息。

ctrl +c?可以退出 Zookeeper 客戶端連接。

2.3 部署 Kafka

2.3.1 安裝 Kafka

第一步:解壓安裝 Kafka 文件

執(zhí)行以下命令,修改一下 Kafka 的安裝文件名。

cd /opt
sudo tar -xvf kafka_2.13-3.4.1.tgz
sudo mv kafka_2.13-3.4.1 kafka

第二步:準(zhǔn)備 Kafka 的配置文件和存儲(chǔ)文件

創(chuàng)建 Kafka 的配置文件、數(shù)據(jù)文件、日志文件的存儲(chǔ)路徑。

mkdir -p /KFDATA/kafka/etc
mkdir -p /KFDATA/kafka/data
mkdir -p /KFDATA/kafka/logs

準(zhǔn)備 Kafka 相關(guān)配置文件,創(chuàng)建啟動(dòng)配置文件,和日志配置文件。

cd /KFDATA/kafka/etc
touch kafka-server.properties
cp /opt/kafka/config/log4j.properties ./
cp /opt/kafka/config/tools-log4j.properties ./

修改 kafka-server.properties 文件中的配置,修改內(nèi)容如下。

############################# Server Basics #############################
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.189.130:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/KFDATA/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
log.retention.hours=-1
log.retention.bytes=21474836480 
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
auto.create.topics.enable=true
############################# Zookeeper #############################
zookeeper.connect=192.168.189.130:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=12000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
############################# message Settings #############################
message.max.byte=5242880

其中以下兩項(xiàng)需要視具體環(huán)境修改,advertise.listeners是對(duì)外監(jiān)聽(tīng)端口。

advertised.listeners=PLAINTEXT://192.168.189.130:9092
zookeeper.connect=192.168.189.130:2181

第三步:準(zhǔn)備 Kafka 的啟動(dòng)文件

創(chuàng)建 Kafka 啟動(dòng)的環(huán)境變量文件, 這里配置了開啟 JMX 監(jiān)控端口,如果不需要,可以忽略后兩項(xiàng)配置。

JMX 端口的作用是可以通過(guò)此端口連接,獲取一些監(jiān)控指標(biāo)。

cd /KFDATA/kafka/etc
touch kf-server.envecho PATH="/usr/local/java/java17/bin:/opt/zookeeper/bin:/opt/kafka:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >>kf-server.env
echo LOG_DIR="/KFDATA/kafka/logs/" >>kf-server.env
echo KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/kafka/etc/log4j.properties\" >>kf-server.env
echo KAFKA_JMX_OPTS=\"-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.189.130 -Djava.net.preferIPv4Stack=true\" >>kf-server.env
echo JMX_PORT=29999 >>kf-server.env

創(chuàng)建 Kafka 的 systemd service 文件,通過(guò)?vim?命令打開一個(gè)文件。

sudo vim /usr/lib/systemd/system/kafka-server.service

錄入以下內(nèi)容并保存。

[Unit]
Description=Apache Kafka - broker
After=network.target confluent-zookeeper.target[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/kafka/etc/kf-server.env
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /KFDATA/kafka/etc/kafka-server.properties
ExecStop=/KFDATA/kafka/bin/kafka-server-stop.sh
LimitNOFILE=1000000
TimeoutStopSec=180
Restart=no[Install]
WantedBy=multi-user.target

重新加載 service 啟動(dòng) 服務(wù)。

sudo systemctl daemon-reload

2.3.2 部署啟動(dòng) Kafka

第一步: 通過(guò) systemctl 工具啟動(dòng) Kafka 服務(wù)

執(zhí)行下述命令啟動(dòng) Kafka 服務(wù):

sudo systemctl start kafka-server.service

第二步: 查看 Kafka 啟動(dòng)情況

檢查 Kafka 啟動(dòng)情況,可以連接 Zookeeper 客戶端 。查看 zookeeper 中的數(shù)據(jù)。

cd /KFDATA/bin
./zkCon.sh
ls /

可以看到 Zookeeper 中已經(jīng)多了一些 kafka 注冊(cè)信息,如 brokers ,cluseter, config, controller 等。

此時(shí),可以測(cè)試創(chuàng)建一個(gè) topic 進(jìn)行測(cè)試:

cd  /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --create --topic test110

執(zhí)行下述代碼,查看當(dāng)前 Kafka 中 topic 列表:

./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

如果返回上述圖片顯示內(nèi)容,說(shuō)明 Kafka 已經(jīng)啟動(dòng)成功。

2.4 部署 Schema-Registry

Schema-Registry 是用于注冊(cè)傳輸數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)的。并記錄數(shù)據(jù)結(jié)構(gòu)改變的每一個(gè)版本。數(shù)據(jù)寫入 Kafka 和從 Kafka 中讀出都需要 schema-registry 中記錄的數(shù)據(jù)結(jié)構(gòu)來(lái)進(jìn)行序列化和反序列化。通過(guò)使用 schema-registry 來(lái)注冊(cè)數(shù)據(jù)結(jié)構(gòu)。Kafka 中只需保存序列化后的數(shù)據(jù)即可??梢詼p少數(shù)據(jù)的空間占用。

2.4.1 安裝 Schema-Registry

第一步:解壓安裝 Schema-Registry 文件

Schema-Registry 程序是 confluent 程序包中一部分。故這里我們要安裝 conluent 程序包 , 部署社區(qū)版本即可。解壓縮 confluent-community-7.4.0.tar.gz,并修改文件名,設(shè)置隸屬組。

cd /opt
sudo tar -xvf confluent-community-7.4.0.tar.gz
sudo mv confluent-7.4.0 confluent
sudo chown -R root:root confluent
sudo chmod -R 755 confluent

第二步:準(zhǔn)備 Schema-Registry 的配置文件和存儲(chǔ)文件

創(chuàng)建 schema-registry 的配置、日志文件存儲(chǔ)路徑。

mkdir -p /KFDATA/schema-registry/etc
mkdir -p /KFDATA/schema-registry/logs

準(zhǔn)備 schema-registry 的配置文件。

cd /KFDATA/schema-registry/etc
cp /opt/confluent/etc/schema-registry/schema-registry.properties ./
cp /opt/confluent/etc/schema-registry/log4j.properties ./

修改?schema-registry.properties文件 , 修改連接的 Kafka Server 地址。

第三步:準(zhǔn)備 Schema-Registry的啟動(dòng)文件

創(chuàng)建 Schema-Registry 啟動(dòng)環(huán)境變量文件,用于 Schema-Registry 啟動(dòng)時(shí)使用。

touch schema-registry.env
echo PATH="/usr/local/java/java17/bin:/opt/confluent/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >schema-registry.env
echo LOG_DIR="/KFDATA/schema-registry/logs" >>schema-registry.env
echo LOG4J_DIR="/KFDATA/schema-registry/etc/log4j.properties" >>schema-registry.env
echo SCHEMA_REGISTRY_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/schema-registry/etc/log4j.properties\" >>schema-registry.env

創(chuàng)建 Schema-Registry 的 systemd service 啟動(dòng)文件。

sudo vim /usr/lib/systemd/system/schema-registry.service

錄入以下內(nèi)容并保存。

[Unit]
Description=RESTful Avro schema registry for Apache Kafka
After=network.target[Service]
Type=forking
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/schema-registry/etc/schema-registry.env
ExecStart=/opt/confluent/bin/schema-registry-start -daemon /KFDATA/schema-registry/etc/schema-registry.properties
TimeoutStopSec=180
Restart=no[Install]
WantedBy=multi-user.target

重新加載 service 啟動(dòng)服務(wù)。

sudo systemctl daemon-reload

2.4.2 部署啟動(dòng) Schema-Registry

第一步: 通過(guò) systemctl 工具啟動(dòng) Schema-Registry 服務(wù)

執(zhí)行以下命令。

sudo systemctl start schema-registry

第二步:查看 Schema-Registry 啟動(dòng)情況

通過(guò) systemctl 工具查看啟動(dòng)狀態(tài)。

sudo systemctl status schema-registry

查看 Kafka 中的 topic。

cd /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

可以看到 kafka 中已經(jīng)創(chuàng)建出了 schema-registry 需要使用的 topic。

schema-registry 啟動(dòng)成功。

2.5 部署 Kafka-Connect

Kafka-Connect 是 Kafka 提供的 HA 框架,實(shí)現(xiàn)了 Kafka-Connect 接口的 connector (連接器) ,只需處理自己需要進(jìn)行讀取、寫入數(shù)據(jù)任務(wù)。高可用部分由 Kafka-Connect 框架負(fù)責(zé)。

Kafka-Connect 可用通過(guò) REST API 進(jìn)行訪問(wèn)。

2.5.1 安裝 Kafka-Connect

第一步: Kafka-Connect 安裝

Kafka-Connect 由 Kafka 提供,啟動(dòng)程序在 Kafka 的安裝路徑下,已經(jīng)存在。數(shù)據(jù)元數(shù)據(jù)注冊(cè)由 schema-registry 處理。相應(yīng)的序列化包在已安裝 Confluent 路徑下。故無(wú)需再安裝程序包。

第二步:準(zhǔn)備 Kafka-Connect 的配置文件和存儲(chǔ)文件

創(chuàng)建 Kafka-Connect 的配置、日志文件存儲(chǔ)路徑。

mkdir -p /KFDATA/kafka-connect/etc
mkdir -p /KFDATA/kafka-connect/logs

創(chuàng)建 Kafka-Connect 的配置文件。

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

錄入以下內(nèi)容并保存。 IP 地址部分,需要根據(jù)前環(huán)境修改。

bootstrap.servers=192.168.189.130:9092
group.id=connect-clusterkey.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.189.130:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.189.130:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=falseconfig.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1plugin.path=/opt/confluent/share/java/plugin
rest.host.name=192.168.189.130
rest.port=8083
rest.advertised.host.name=192.168.189.130
rest.advertised.port=8083offset.flush.timeout.ms=50000
offset.flush.interval.ms=10000
send.buffer.bytes=13107200
consumer.max.poll.records=10000
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

創(chuàng)建 Kafka-Connect 的?log4j?配置文件。

cd /KFDATA/kafka-connect/etc
cp /opt/kafka/config/connect-log4j.properties ./log4j.properties

修改文件中的以下參數(shù)配置。

vim ./log4j.properties
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log

將其修改為:

log4j.appender.connectAppender.File=/KFDATA/kafka-connect/logs/connect.log

第三步: 準(zhǔn)備 Kafka-Connect 的啟動(dòng)文件

創(chuàng)建 Kafka-Connect 啟動(dòng)環(huán)境變量文件。

cd /KFDATA/kafka-connect/etc
touch kafka-connect.envecho PATH="/usr/local/java/java17/bin:/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin" >kafka-connect.env
echo LOG_DIR="/KFDATA/kafka-connect/logs/" >>kafka-connect.env
echo LOG4J_DIR="/KFDATA/kafka-connect/etc/log4j.properties" >>kafka-connect.env
echo KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:/KFDATA/kafka-connect/etc/log4j.properties\" >>kafka-connect.env
echo CLASSPATH=/opt/confluent/share/java/schema-registry/*:/opt/confluent/share/java/kafka-serde-tools/*:/opt/confluent/share/java/confluent-common/* >>kafka-connect.env
echo JMX_PORT=29998 >>kafka-connect.env

創(chuàng)建 Kafka-Connect 的 systemd service 文件。

sudo vim /usr/lib/systemd/system/kafka-connect.service

錄入以下內(nèi)容,并保存。

[Unit]
Description=Apache Kafka Connect - distributed
After=network.target[Service]
Type=simple
User=kafka
Group=kafka
EnvironmentFile=/KFDATA/kafka-connect/etc/kafka-connect.env
ExecStart=/opt/kafka/bin/connect-distributed.sh /KFDATA/kafka-connect/etc/kafka-connect.properties
TimeoutStopSec=180
Restart=no[Install]
WantedBy=multi-user.target

重新加載 service 啟動(dòng)服務(wù)。

sudo systemctl daemon-reload

2.5.2 部署啟動(dòng) Kafka-Connect

第一步: 通過(guò) systemctl 工具啟動(dòng) Kafka-Connect 服務(wù)

執(zhí)行以下命令。

sudo systemctl start kafka-connect.service

第二步: 查看 Kafka-Connect 啟動(dòng)情況

通過(guò)?jps?命令查看啟動(dòng)情況。

jps -mlvV |grep connect

查看 Kafka 中的 topic 情況,Kafka-Connect 會(huì)在 Kafka 中創(chuàng)建 connect-configs , connect-offsets, connect-statuses 三個(gè) topic。

cd  /opt/kafka/bin
./kafka-topics.sh --bootstrap-server 192.168.189.130:9092 --list

使用?curl?命令訪問(wèn) kafka-connect ,可以看到當(dāng)前還沒(méi)有配置 connector 任務(wù)。

 curl -H "Accept:application/json" 192.168.189.130:8083/connectors/

3. 從 Oracle 到 Kafka 的數(shù)據(jù)同步:部署程序與配置任務(wù)

3.1 配置 Oracle 數(shù)據(jù)庫(kù)

對(duì)于 Source 數(shù)據(jù)庫(kù) Oracle ,本教程采用 Oracle 提供的 LogMiner 工具來(lái)捕獲數(shù)據(jù)的數(shù)據(jù)變化 ,這里需要對(duì) Oracle 數(shù)據(jù)庫(kù)做一些設(shè)置。主要配置如下:

  • Oracle 需開啟歸檔模式
  • 要進(jìn)行實(shí)時(shí)同步的 Oracle 表需要開啟表級(jí)補(bǔ)充日志
  • 創(chuàng)建具有一定權(quán)限的 Oracle 用戶

第一步: 開啟 Oracle 歸檔模式

創(chuàng)建備份文件路徑:

mkdir -p /data/oradata/recovery_area

sqlplus 以 dba 權(quán)限登錄:

sqlplus / as sysdba

執(zhí)行以下命令開啟 Oracle 歸檔模式:

-- 設(shè)置數(shù)據(jù)庫(kù)歸檔空間大小為20GB,該值為建議最小提供值
alter system set db_recovery_file_dest_size = 20G;
-- 設(shè)置數(shù)據(jù)庫(kù)歸檔路徑
alter system set db_recovery_file_dest = '/data/oradata/recovery_area' scope=spfile;
-- 立即關(guān)閉數(shù)據(jù)庫(kù)
shutdown immediate
-- mount模式啟動(dòng)
startup mount
-- 開啟歸檔模式
alter database archivelog;
-- 開啟數(shù)據(jù)庫(kù)
alter database open;

查看是否開啟歸檔日志,如圖顯示 Archive Mode 則表示歸檔已開啟:

archive log list

歸檔日志空間大小需根據(jù)實(shí)際需求設(shè)置。當(dāng)歸檔日志滿時(shí),Oracle將不可用,需自行設(shè)置腳本來(lái)定時(shí)清理歸檔日志。

第二步: 開啟補(bǔ)充日志

開啟最小補(bǔ)充日志:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; -- 開啟最小補(bǔ)充日志

除了最小補(bǔ)充日志,還需要開啟全字段的補(bǔ)充日志。可以開啟數(shù)據(jù)庫(kù)級(jí)全字段補(bǔ)充日志,也可以根據(jù)需要同步的表開啟表級(jí)別全字段補(bǔ)充日志,只開啟表級(jí)別可以減少歸檔日志的產(chǎn)生量。

開啟數(shù)據(jù)庫(kù)級(jí)全字段補(bǔ)充日志:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 開啟
-- ALTER DATABASE DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 關(guān)閉

或開啟表級(jí)別全字段補(bǔ)充日志,SCHEMA 和 TABLENAME 需替換成具體的同步表(推薦采用此種方式):

ALTER TABLE SCHEMA.TABLENAME ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 開啟
-- ALTER TABLE SCHEMA.TABLENAME DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- 關(guān)閉

第三步:?創(chuàng)建數(shù)據(jù)同步用的 debezium 用戶

創(chuàng)建 logminer_tbs 表空間。

CREATE TABLESPACE logminer_tbs DATAFILE '/data/oradata/dbname/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

創(chuàng)建專門用于日志挖掘的用戶,命名為 debezium。

CREATE USER debezium IDENTIFIED BY password
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs;

對(duì) debezium 用戶授權(quán)(以 Oracle 11g 版本的設(shè)置為示例,其他 Oracle 版本可以參考 Debezium 文檔說(shuō)明)。

GRANT SELECT ANY TABLE TO debezium;
GRANT FLASHBACK ANY TABLE TO debezium;GRANT CREATE SESSION TO debezium;
GRANT SELECT ON V_$DATABASE to debezium;
GRANT SELECT_CATALOG_ROLE TO debezium;
GRANT EXECUTE_CATALOG_ROLE TO debezium;
GRANT SELECT ANY TRANSACTION TO debezium;
GRANT SELECT ANY DICTIONARY to debezium;GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE TO debezium;GRANT EXECUTE ON DBMS_LOGMNR TO debezium;
GRANT EXECUTE ON DBMS_LOGMNR_D TO debezium;GRANT SELECT ON V_$LOG TO debezium;
GRANT SELECT ON V_$LOG_HISTORY TO debezium;
GRANT SELECT ON V_$LOGMNR_LOGS TO debezium;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO debezium;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO debezium;
GRANT SELECT ON V_$LOGFILE TO debezium;
GRANT SELECT ON V_$ARCHIVED_LOG TO debezium;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO debezium;

其中SELECT ANY TABLEFLASHBACK ANY TABLE可以縮減權(quán)限為需實(shí)時(shí)同步的表:

GRANT SELECT ON SCHEMA.TABLENAME TO debezium;
GRANT FLASHBACK ON SCHEMA.TABLENAME TO debezium;

3.2 安裝 Debezium-Oracle 連接器插件

Oracle 的 Debezium 數(shù)據(jù)同步插件需要安裝在 Kafka_Connect 程序部署文件路徑下。

配置啟動(dòng) Debezium-Oracle 連接器,需要以下兩步:

  • 下載 Debezium-Oracle-Connector 插件,將插件解壓并放到 Kafka Connect 的插件路徑下
  • 重新啟動(dòng) Kafka Connect 程序,以加載插件

第一步: 下載安裝 Debezium-Oracle 插件和 ojdbc 包

官方網(wǎng)站?Debezium,選擇 2.5.1.Final 版本進(jìn)行下載,程序名為debezium-connector-oracle-2.5.1.Final-plugin.tar.gz

在 confluent 的安裝路徑下創(chuàng)建插件路徑,在此路徑下解壓 Debezium 的 Oracle 插件包,請(qǐng)確保 kafka 用戶對(duì)此路徑的讀權(quán)限。

sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo tar -xvf ./debezium-connector-oracle-2.5.1.Final-plugin.tar.gz
sudo rm ./debezium-connector-oracle-2.5.1.Final-plugin.tar.gz

連接 Oracle 數(shù)據(jù)庫(kù)還需 Oracle 的 JDBC 包,可以到 Oracle 官網(wǎng)?JDBC Drivers Archive?下載對(duì)應(yīng) Oracle 版本的 JDBC 包,目前 Oracle 官網(wǎng)已下架 Oracle 11g 版本的ojdbc包,可以在此下載ojdbc8.jar

在當(dāng)前的debezium-connector-oracle目錄下再放入 ojdbc 包?ojdbc8.jar

cp ~/ojdbc8.jar /opt/confluent/share/java/plugin/debezium-connector-oracle/

第二步: 配置 Kafka-Connect 加載插件

修改 Kafka Connect 的配置文件,添加插件路徑配置。若已配置插件路徑,則跳過(guò)該步驟。

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

添加或修改參數(shù) plugin.path 如下。

plugin.path=/opt/confluent/share/java/plugin

重新啟動(dòng) Kafka Connect。

sudo systemctl restart kafka-connect

查看 kafka connect 的日志輸出,能查詢到信息則說(shuō)明插件加載成功。

cat /KFDATA/kafka-connect/logs/connect.log | grep OracleConnector

3.3 配置 Oracle 數(shù)據(jù)同步連接任務(wù)

配置同步任務(wù)的及檢查的很多命令都要帶上?url?等參數(shù)。為了操作快捷,本教程封裝了一些加載配置文件的操作腳本在?kafka-tools.tar?包中,詳情參見(jiàn)附錄。下載當(dāng)前包,解壓縮到 /KFDATA 目錄下。后續(xù)的很多操作,包括檢查 Kafka 的 topic、查看數(shù)據(jù)和配置同步任務(wù)等都會(huì)使用 kafka-tools 包中的腳本。包中的腳本在無(wú)參數(shù)運(yùn)行時(shí)會(huì)輸出 help。

cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar

修改?/KFDATA/kafka-tools/config/config.properties?配置參數(shù)。

按照本機(jī)的路徑、IP 等對(duì)應(yīng)修改 Kafka 和 Kafka_Connect 的啟動(dòng) IP 地址,以及安裝目錄。

示例如下:

#kafka parameters
kafka_home=/opt/kafka
confluent_home=/opt/confluent
bootstrap_server=192.168.189.130:9092#kafka-connect parameters
connect_rest_url=192.168.1.178:8083
#rest_pd  means restful request  password,This is not necessary
#rest_pd=appsdba:passwd
schema_ip=192.168.189.130
schema_port=8081

第一步:準(zhǔn)備 Oracle 數(shù)據(jù)庫(kù)表

在 debezium 用戶下創(chuàng)建index_example示例表。

create table debezium.index_example (trade_date date,stock_code varchar2(20),effDate date,indexShortName varchar2(20),indexCode varchar2(20),secShortName varchar2(50),exchangeCD varchar2(10),weight number,tm_stamp date,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);

index_example表中插入4條數(shù)據(jù)。

insert into debezium.index_example
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000905', '中百集團(tuán)', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into debezium.index_example
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000906', '中百集團(tuán)', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into debezium.index_example
values(to_date('2006-11-30', 'YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上證180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into debezium.index_example
values(to_date('2006-11-30', 'YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'滬深300', '000300', '三一重工', 'XSHG', 0.0029, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);commit;

在 debezium 用戶下創(chuàng)建stock_example示例表。

create table debezium.stock_example (id number,ts_code varchar2(20),symbol_id varchar2(20),name varchar2(20),area varchar2(20),industry varchar2(20),list_date date,primary key (id, ts_code)
);

stock_example表中插入3條數(shù)據(jù)并提交。

insert into debezium.stock_example
values (1, '000001.SZ', '000001', '平安銀行', '深圳', '銀行', to_date('1991-04-03', 'YYYY-MM-DD'));insert into debezium.stock_example
values (2, '000002.SZ', '000002', '萬(wàn)科A', '深圳', '地產(chǎn)', to_date('1991-01-29', 'YYYY-MM-DD'));insert into debezium.stock_example
values (3, '000004.SZ', '000004', 'ST國(guó)華', '深圳', '軟件服務(wù)', to_date('1991-01-14', 'YYYY-MM-DD'));

查看 Oracle 表中數(shù)據(jù)。

若在3.1節(jié)的 step2 中未開啟數(shù)據(jù)庫(kù)級(jí)全字段增量日志,在此步還需開啟表級(jí)全字段增量日志。

ALTER TABLE debezium.index_example ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE debezium.stock_example ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

第二步:準(zhǔn)備連接器配置文件,并啟動(dòng)連接任務(wù)

創(chuàng)建連接 Oracle 的 source 連接器配置文件。

mkdir -p /KFDATA/datasyn-config
vim /KFDATA/datasyn-config/source-oracle.json

錄入以下配置,hostname 和 kafka 啟動(dòng)地址需對(duì)應(yīng)修改。

{"name": "oracle-connector","config": {"connector.class": "io.debezium.connector.oracle.OracleConnector","tasks.max": "1","topic.prefix": "oracle_service","database.hostname": "183.134.101.144","database.port": 1521,"database.user": "debezium","database.password": "123456","database.dbname": "testdb","database.connection.adapter": "logminer","schema.include.list": "debezium","table.include.list": "debezium.index_example,debezium.stock_example","schema.history.internal.kafka.bootstrap.servers": "183.134.101.144:9092","schema.history.internal.kafka.topic": "schema-changes.test","event.processing.failure.handling.mode": "skip","decimal.handling.mode": "string","snapshot.mode": "initial","log.mining.strategy": "online_catalog","log.mining.query.filter.mode": "in","log.mining.continuous.mine": true,"heartbeat.interval.ms": "20000","schema.history.internal.store.only.captured.tables.ddl": true,"schema.history.internal.store.only.captured.databases.ddl": true,}
}

參數(shù)說(shuō)明:

參數(shù)名稱默認(rèn)值參數(shù)說(shuō)明
connector.class無(wú)連接器的 Java 類的名稱。這里是 oracle 的連接器類名。
tasks.max1當(dāng)前 connector 的最大并行任務(wù)數(shù)。oracle 的 source 連接器任務(wù)數(shù)只能是 1 。
topic.prefix無(wú)當(dāng)前 connector 同步寫入任務(wù)的命名空間。會(huì)被用于添加到同步表對(duì)應(yīng) topic 名稱前等。
database.hostname無(wú)Oracle 數(shù)據(jù)庫(kù)服務(wù)器的 IP 地址或主機(jī)名。
database.port3306Oracle 數(shù)據(jù)庫(kù)服務(wù)器的整數(shù)端口號(hào)。
database.user無(wú)Oracle 數(shù)據(jù)庫(kù)服務(wù)器連接用戶。
database.password無(wú)Oracle 數(shù)據(jù)庫(kù)服務(wù)器連接用戶密碼。
database.dbname無(wú)Oracle 數(shù)據(jù)庫(kù)實(shí)例名稱。
database.connection.adapterlogminer使用的適配器名稱。選項(xiàng)為 logminer、olr 或者 xstream 。分別對(duì)應(yīng) Oracle LogMiner API、OpenLogReplicator和Oracle XStream API。
schema.include.list無(wú)匹配的模式名??梢远鄠€(gè),用逗號(hào)分割即可。
table.include.list無(wú)匹配的表名。可以多個(gè),用逗號(hào)分割即可。
schema.history.internal.kafka.bootstrap.servers無(wú)數(shù)據(jù)同步記錄 Oracle 的表結(jié)構(gòu)信息的 kafka 連接。
schema.history.internal.kafka.topic無(wú)數(shù)據(jù)同步記錄 Oracle 表結(jié)構(gòu)的 topic 名稱。
event.processing.failure.handling.modefail處理事件時(shí)碰到異常如何處理??蛇x fail 、warn 和 skip。
decimal.handling.modeprecise針對(duì) Oracle 的 NUMBER、DECIMAL 和 NUMERIC 類型的處理模式?!眕recise” 表示轉(zhuǎn)換成 java.math.BigDecimal ,”double” 表示轉(zhuǎn)換成 double 值,”string” 表示轉(zhuǎn)換成字符串。
snapshot.modeinitial指定 connector 用來(lái)捕獲表快照的形式。常用的是 ”initial”和 ”schema_only” ?!癷nitial“ 在 connector 第一次啟動(dòng)時(shí)會(huì)獲取表結(jié)構(gòu)和表的快照數(shù)據(jù),并繼續(xù)獲取新增的變更數(shù)據(jù)?!眘chema_only” 表示只獲取表結(jié)構(gòu),并只會(huì)獲取新增的變更數(shù)據(jù)。其他形式請(qǐng)參考 debezium 官方文檔。
log.mining.strategyredo_log_catalog指定挖掘策略。挖掘策略決定了Oracle LogMiner 如何構(gòu)建數(shù)據(jù)字典來(lái)解析日志文件。有 redo_log_catalog 和 online_catalog 兩種模式。推薦使用 online_catalog,這種模式的優(yōu)點(diǎn)是速度快、不產(chǎn)生多余日志。缺點(diǎn)是如果DDL變化時(shí)如果還有舊的表結(jié)構(gòu)對(duì)應(yīng)的數(shù)據(jù)變更沒(méi)有挖掘完成,無(wú)法根據(jù)舊的表結(jié)構(gòu)解析日志,不適用于表結(jié)構(gòu)頻繁變更的情況。
log.mining.query.filter.modenoneOracle LogMiner 查詢構(gòu)建的方式。查詢過(guò)濾方式,推薦使用 ”in” 的方式。
log.mining.continuous.minefalse連續(xù)挖掘選項(xiàng)。Oracle 12c 及之后該選項(xiàng)已棄用。
heartbeat.interval.ms0connector 向 topic 發(fā)送心跳信息的頻率,單位為毫秒。當(dāng)需捕獲的表在較長(zhǎng)時(shí)間沒(méi)有數(shù)據(jù)更改時(shí),設(shè)置此參數(shù)可以及時(shí)刷新 SCN 號(hào),設(shè)置為 0 時(shí) connector 不會(huì)發(fā)送任何心跳信息,可能會(huì)導(dǎo)致 connector 記錄的SCN號(hào)和數(shù)據(jù)庫(kù)最新 SCN 號(hào)之間的間隔較大。
schema.history.internal.store.only.captured.tables.ddlfalse指定 connector 記錄一個(gè) schema 或者一個(gè)數(shù)據(jù)庫(kù)中所有非系統(tǒng)表的表結(jié)構(gòu),還是只記錄需要捕獲表的表結(jié)構(gòu)。
schema.history.internal.store.only.captured.databases.ddlfalse指定 connector 是否記錄數(shù)據(jù)庫(kù)實(shí)例中所有邏輯數(shù)據(jù)庫(kù)的表結(jié)構(gòu)。

更多詳細(xì)參數(shù)說(shuō)明可以參看(Debezium 2.5),不同Debezium版本的參數(shù)配置不同,若使用其他版本的Debezium,需找到對(duì)應(yīng)文檔做修改。Debezium Connector for Oracle

第三步: 啟動(dòng) Oracle 的數(shù)據(jù)同步任務(wù)

通過(guò) REST API 啟動(dòng) Oracle 的 source 連接器

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://183.134.101.144:8083/connectors/ -d @/KFDATA/datasyn-config/source-oracle.json

也可以通過(guò) kafka-tools 中的腳本啟動(dòng)

cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/source-oracle.json

第四步:查看 Oracle 數(shù)據(jù)同步任務(wù)狀態(tài)

查看同步任務(wù)列表。list?參數(shù)展示任務(wù)名列表,showall 參數(shù)會(huì)顯示全部同步任務(wù)狀態(tài)。

cd /KFDATA/kafka-tools/bin
./rest.sh showall

查看?topic列表:

./kafka.sh tplist

查看 topic oracle_service.DEBEZIUM.INDEX_EXAMPLE和 oracle_service.DEBEZIUM.STOCK_EXAMPLE 中的數(shù)據(jù)條數(shù):

./kafka.sh get_offsets oracle_service.DEBEZIUM.INDEX_EXAMPLE
./kafka.sh get_offsets oracle_service.DEBEZIUM.STOCK_EXAMPLE

結(jié)果如圖所示

4. 從Kafka 到 DolphinDB 的數(shù)據(jù)同步:部署程序與配置任務(wù)

4.1 安裝 Kafka-DolphinDB 數(shù)據(jù)同步連接器插件

配置啟動(dòng) Kafka-DolphinDB 連接器,需要以下兩步:

  • 下載 Kafka-DolphinDB-Connector 插件,將插件解壓并放到 Kafka Connect 的插件路徑下。
  • 重新啟動(dòng) Kafka Connect 程序,以加載插件。

第一步:下載 Kafka-DolphinDB 插件

  • jdbc-2.00.11.1-ddbcdc-beta1.jar :該 DolphinDB JDBC 包為數(shù)據(jù)同步做了一些專門修改,為特殊版本,后續(xù)會(huì)進(jìn)行版本統(tǒng)一支持。參見(jiàn)附錄。
  • kafka-connect-jdbc-10.7.4-ddb1.10.jar :是基于 kafka-connect-jdbc-10.7.4 開發(fā)的 DolphinDB 連接器,后續(xù)會(huì)進(jìn)行代碼開源。參見(jiàn)附錄。

創(chuàng)建插件路徑,在此路徑下放置 Kafka-DolphinDB 插件包,將上述兩個(gè) jar 包放在此目錄下。請(qǐng)確保 kafka 用戶包含對(duì)這兩個(gè)文件的讀權(quán)限。

sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc
sudo cp ~/jdbc-2.00.11.1-ddbcdc-beta1.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
sudo cp ~/kafka-connect-jdbc-10.7.4-ddb1.10.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/

如果上面的操作碰到權(quán)限問(wèn)題,則可以使用以下命令賦予權(quán)限。

sudo chmod o+rx /opt/confluent/share/java/plugin/kafka-connect-jdbc/*

第二步: 重啟 kafka-connect

sudo systemctl restart kafka-connect

查看kafka-connect路徑的日志輸出

cat /KFDATA/kafka-connect/logs/connect.log | grep JdbcSinkConnector

出現(xiàn)下圖中所示信息時(shí),說(shuō)明插件加載成功:

4.2 配置 DolphinDB 的數(shù)據(jù)同步連接任務(wù)

第一步:創(chuàng)建同步的 DolphinDB 庫(kù)、表

根據(jù) Oracle 表結(jié)構(gòu),創(chuàng)建與 Oracle 表結(jié)構(gòu)一致的表,Oracle 數(shù)據(jù)類型轉(zhuǎn)換為 DolphinDB 數(shù)據(jù)類型對(duì)照表可以參考4.2節(jié)。

創(chuàng)建?index_example?表:

dbName = "dfs://index_data"
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, engine="TSDB", atomic="CHUNK")
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date, sortColumns=`stock_code`indexCode`flag`trade_date, keepDuplicates=LAST, softDelete=true)

創(chuàng)建?stock_example?表:

dbName = "dfs://stock_data"
tbName = "stock_example"
colNames = `id`ts_code`symbol_id`name`area`industry`list_date
colTypes = `LONG`SYMBOL`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DATE
t = table(1:0, colNames, colTypes)
db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 1], engine="TSDB", atomic="CHUNK")
db.createPartitionedTable(t, tbName, partitionColumns=`id, sortColumns=`ts_code`id, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,100}], softDelete=true)

注:建表時(shí)的軟刪除功能,即softDelete選項(xiàng)需要DolphinDB 2.00.11及以上的版本。舊版本DolphinDB建表時(shí)可以去除該選項(xiàng)。

第二步: 配置同步配置表

在DolphinDB 中創(chuàng)建一張配置表,記錄 kafka topic 和 DolphinDB 庫(kù)表之間的映射關(guān)系。配置表的庫(kù)表名可以自行調(diào)整,并在 DolphinDB 的同步任務(wù)中設(shè)置相應(yīng)的庫(kù)表名稱。配置表中字段名是固定的,需和示例保持一致。

數(shù)據(jù)庫(kù)名:dfs://ddb_sync_config

表名:sync_config

db = database("dfs://ddb_sync_config", HASH, [SYMBOL, 2])
t = table(1:0, `connector_name`topic_name`target_db`target_tab`add_sortcol_flag,
[SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL])
db.createTable(t, "sync_config")

kafka topic 名可以通過(guò)之前介紹的./kafka.sh tplist?的命令查看。debezium.index_example 和 debezium.stock_example 表對(duì)應(yīng)的 kafka topic 分別是 oracle_service.DEBEZIUM.INDEX_EXAMPLE 和 oracle_service.DEBEZIUM.STOCK_EXAMPLE。

插入配置信息,將 kafka topic 和 DolphinDB 庫(kù)表名稱一一對(duì)應(yīng)。

def addSyncConfig(connector_name, topic_name, dbname, tbname, add_sortcol_flag="0") {loadTable("dfs://ddb_sync_config", "sync_config").append!(table([connector_name] as col1, [topic_name] as col2, [dbname] as col3,[tbname] as col4, [add_sortcol_flag] as col5))
}addSyncConfig("ddb-sink", "oracle_service.DEBEZIUM.INDEX_EXAMPLE", "dfs://index_data", "index_example", "0")
addSyncConfig("ddb-sink", "oracle_service.DEBEZIUM.STOCK_EXAMPLE", "dfs://stock_data", "stock_example", "0")

以下是配置表的各個(gè)字段說(shuō)明:

字段名類型字段作用
connector_nameSYMBOL配置的 DolphinDB Sink 同步任務(wù)名
topic_nameSYMBOL要同步的 kafka topic 名稱
target_dbSYMBOL對(duì)應(yīng)的 DolphinDB 分布式庫(kù)名
target_tabSYMBOL對(duì)應(yīng)的 DolphinDB 分布式表名
add_sortcol_flagSYMBOL是否需要添加 dummySortKey__ 列,需要?jiǎng)t設(shè)置為 ”1”,否則設(shè)置為 ”0”。具體原因詳見(jiàn) 4.1 節(jié)的DolphinDB 同步須知。

第三步: 準(zhǔn)備連接器配置文件,并啟動(dòng)連接任務(wù)

創(chuàng)建 DolphinDB 數(shù)據(jù)同步任務(wù)配置文件。

cd /KFDATA/datasyn-config
vim ddb-sink.json

配置如下

{"name": "ddb-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "2","topics": "oracle_service.DEBEZIUM.INDEX_EXAMPLE,oracle_service.DEBEZIUM.STOCK_EXAMPLE","connection.url": "jdbc:dolphindb://183.134.101.144:8848?user=admin&password=123456","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.evolve": "false","insert.mode": "insert","delete.enabled": "true","batch.size":"10000","pk.mode": "record_key","ddbsync.config.table":"dfs://ddb_sync_config,sync_config","ddbsync.addSortColFlag": "true"}
}
參數(shù)名稱默認(rèn)值參數(shù)說(shuō)明
name無(wú)同步任務(wù)名稱,不可重復(fù)。
connector.class無(wú)連接器的 Java 類的名稱。這里是 JdbcSink 的通用連接器類名。
tasks.max1當(dāng)前 connector 的最大并行任務(wù)數(shù)??梢哉{(diào)節(jié)增大,會(huì)創(chuàng)建多 consumer 并行消費(fèi)讀取 Kafka 中數(shù)據(jù)。一般的數(shù)據(jù)同步場(chǎng)景設(shè)置到 10 基本可以滿足同步速度上的需求。
topics無(wú)配置要同步的 Kafka 中的 topic 名稱,配置多個(gè) topic 時(shí)用逗號(hào)分割。
connection.url無(wú)DolphinDB 數(shù)據(jù)庫(kù)服務(wù)器的IP地址或主機(jī)名。
transforms無(wú)聲明數(shù)據(jù)轉(zhuǎn)換操作,請(qǐng)?jiān)O(shè)置為 unwrap。
transforms.unwrap.type無(wú)聲明數(shù)據(jù)轉(zhuǎn)換器類別。請(qǐng)?jiān)O(shè)置為 false。
transforms.unwrap.drop.tombstonesfalse聲明是否刪除 Kafka 中的墓碑?dāng)?shù)據(jù)。請(qǐng)?jiān)O(shè)置為 false。
auto.evolvetrue當(dāng) DolphinDB 中缺少列時(shí),是否自動(dòng)增加列。當(dāng)前不支持自動(dòng)增加列,必須配置為 false。
insert.modeinsert數(shù)據(jù)插入模式。當(dāng)前只支持 insert 模式。
pk.modenone主鍵模式。必須設(shè)置為 record_key。
delete.enabledfalse在主鍵模式為 record_key 情況下。對(duì)于 null 值 record 是否按照 delete 進(jìn)行操作。
batch.size3000設(shè)置在數(shù)據(jù)量足夠大時(shí)。以每批最大多少條來(lái)寫入到目標(biāo)數(shù)據(jù)庫(kù)。注意:當(dāng)該值大于 Connect worker 中設(shè)置的 consumer.max.pol.records 時(shí),每次提交數(shù)量會(huì)受 consumer.max.pol.records 的值限制。
ddbsync.config.tabledfs://ddb_sync_config,sync_configKafka 中的 topic 對(duì)應(yīng) DolphinDB 表的配置表名稱??梢宰孕卸x庫(kù)、表名稱。但表中的字段要保持一致。表結(jié)構(gòu)見(jiàn) 3.2 節(jié)。
ddbsync.addSortColFlagfalse是否開啟補(bǔ)充列。若部分表需要通過(guò)補(bǔ)充列來(lái)設(shè)置sortColumns,則需要開啟。具體原因見(jiàn) 4.1 節(jié)。

參數(shù)說(shuō)明:以上參數(shù)項(xiàng)為同步 DolphinDB 所需參數(shù)。如果對(duì) Confluent 的JDBC Sink Connect 有經(jīng)驗(yàn)可適當(dāng)調(diào)節(jié)。

通過(guò) REST API 啟動(dòng) source 連接器:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://183.134.101.144:8083/connectors/ -d @ddb-sink.json

也可以通過(guò) kafka-tools 中的腳本啟動(dòng):

cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/ddb-sink.json

查看同步任務(wù)狀態(tài), ddb-sink 是 DolphinDB 的數(shù)據(jù)同步任務(wù):

./rest.sh showall

第四步: 查看表初始數(shù)據(jù)同步進(jìn)度

在設(shè)置 Oracle 同步任務(wù)時(shí),將snapshot.mode選項(xiàng)值設(shè)置為 ”initial” ,該選項(xiàng)意味著 Oracle 會(huì)同步表的初始數(shù)據(jù)到 Kafka 中,設(shè)置完下游的 DolphinDB 任務(wù)后,可以檢查初始數(shù)據(jù)的同步情況。

通過(guò) kafka.sh 腳本查看消費(fèi)者列表:

./kafka.sh cm_list

查看 DolphinDB 同步任務(wù)對(duì)應(yīng)的 Kafka 消費(fèi)組中的每一個(gè) consumer 的消費(fèi)進(jìn)度,通過(guò)此命令可以查看同步程序中每一張的表同步進(jìn)度。 Lag 為 0 則表示 Kafka 中 topic 當(dāng)前沒(méi)有未消費(fèi)的數(shù)據(jù),即 Kafka 中的數(shù)據(jù)與對(duì)應(yīng)表的數(shù)據(jù)是一致的。

./kafka.sh cm_detail connector-ddb-sink

如上圖顯示,數(shù)據(jù)已被 DolphinDB 同步任務(wù)消費(fèi)完畢,此時(shí)在 DolphinDB 的 web 界面查看表中數(shù)據(jù),表數(shù)據(jù)和 Oracle 表中數(shù)據(jù)是一致的。

4.3 實(shí)時(shí)同步驗(yàn)證

第一步:插入數(shù)據(jù)

向 Oracle 表插入兩條新數(shù)據(jù):

insert into debezium.index_example
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600051', 
to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH:MI:SS'), '上證180', '000010', '三一重工', 
'XXXB', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into debezium.index_example
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600052', 
to_date('2018-06-30 06:48:02', 'YYYY-MM-DD HH:MI:SS'), '滬深300', '000300', '三一重工', 
'XSHG', 0.0029, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

查看 DolphinDB 對(duì)應(yīng)的表數(shù)據(jù):

select * from loadTable("dfs://index_data", "index_example")

可以看到新數(shù)據(jù)已寫入:

第二步:更新數(shù)據(jù)

Oracle 更改 code 值為600031數(shù)據(jù)的 tm_stamp 字段值,更改為 2024.02.28 16:00:00:

update debezium.index_example 
set tm_stamp = to_date('2024-02-28 16:00:00', 'YYYY-MM-DD HH24:MI:SS')
where stock_code = '600031';

再查看 DolphinDB 中數(shù)據(jù),數(shù)據(jù)已被修改:

第三步:刪除數(shù)據(jù)

Oracle 刪除一條數(shù)據(jù):

delete from debezium.index_example where stock_code = '600052';

再查看 DolphinDB 中數(shù)據(jù),數(shù)據(jù)已被刪除:

5. 部署注意事項(xiàng)

5.1 實(shí)時(shí)同步須知

DolphinDB 是一款支持海量數(shù)據(jù)的分布式時(shí)序數(shù)據(jù)庫(kù)。針對(duì)不同的數(shù)據(jù)處理需求,在底層架構(gòu)上天然上與通常的關(guān)系型數(shù)據(jù)庫(kù)不同,所以需要有以下限制:

  • DolphinDB 的表沒(méi)有主鍵設(shè)計(jì),必須使用 TSDB 引擎,將主鍵設(shè)置為 sortColumn 字段,并設(shè)置?keepDuplicates=LAST?來(lái)進(jìn)行去重,以確保數(shù)據(jù)唯一性。TSDB 引擎的 sortColumn 是分區(qū)內(nèi)去重,如果使用的是分區(qū)表,需要至少將其中一個(gè)主鍵列設(shè)置為分區(qū)列。
  • Oracle 表的主鍵可能不滿足 TSDB 引擎的 sortColumn 設(shè)置規(guī)則,有以下三種情況:
    • Oracle 表中有兩個(gè)及以上的主鍵,其中一個(gè)主鍵為整數(shù)類型或時(shí)間類型,但末尾列不是整數(shù)類型或時(shí)間類型:
      • 該情況需要調(diào)整 sortColumn 設(shè)置的順序,將整數(shù)類型或時(shí)間類型的主鍵移動(dòng)到末尾。
    • Oracle 表中只有一個(gè)主鍵,或者 Oracle 表中的主鍵的數(shù)據(jù)類型均不包含整數(shù)類型或時(shí)間類型:
      • 該情況需要建表時(shí)在末尾補(bǔ)充一個(gè) dummySortKey__ 列,值均設(shè)置為0,對(duì)應(yīng)同步程序的配置表中需要將add_sortcol_flag 列的值設(shè)置為“1”,若使用 DataX 進(jìn)行同步,則需要做數(shù)據(jù)轉(zhuǎn)換。
    • Oracle 表中的主鍵類型包含 DolphinDB 不支持的類型。
      • DolphinDB TSDB 引擎的 sortColumns 支持整數(shù)、日期或時(shí)間、字符串類型,暫時(shí)不支持小數(shù)類型,但預(yù)計(jì)會(huì)在后續(xù)的版本里支持,請(qǐng)關(guān)注版本更新。

DDL 語(yǔ)句相關(guān):

  • 當(dāng)前不支持 DDL 語(yǔ)句同步。
  • 若表結(jié)構(gòu)發(fā)生更改,需進(jìn)行單表修復(fù),具體操作后續(xù)會(huì)在實(shí)時(shí)同步的運(yùn)維手冊(cè)文檔中給出。

其他:

  • 表字段命名時(shí),請(qǐng)盡量規(guī)避一些簡(jiǎn)單的名字,比如 code, timestamp 等,這種命名與 DolphinDB 內(nèi)關(guān)鍵字重復(fù),可能會(huì)導(dǎo)致無(wú)法正確同步。

5.2 Oracle-DolphinDB 數(shù)據(jù)類型對(duì)應(yīng)表

以下的類型對(duì)應(yīng)表為推薦設(shè)置的 DolphinDB 類型,注意兩者數(shù)據(jù)類型表示的精度范圍,確保 DolphinDB 數(shù)據(jù)類型的精度可以覆蓋原 Oracle 類型。

Oracle類型DolphinDB類型
NUMBER / NUMBER(P, S > 0)DOUBLE / DECIMAL
FLOAT / DOUBLE PRICISIONDOUBLE
INTEGER / NUMBER(P, 0)LONG
VARCHAR(M) / CHAR(M)SYMBOL / STRING
DATEDATE(僅日期)或TIMESTAMP(毫秒級(jí)時(shí)間戳)
TIMESTAMP(0 - 3)TIMESTAMP(毫秒級(jí)時(shí)間戳)
TIMESTAMP, TIMESTAMP(4 - 6)NANOTIMESTAMP(納秒級(jí)時(shí)間戳)
TIMESTAMP(7 - 9)NANOTIMESTAMP

在浮點(diǎn)數(shù)數(shù)據(jù)處理上,Oracle 的 NUMBER 類型是精確值,如果轉(zhuǎn)換成 DolphinDB 的 DOUBLE 類型,會(huì)存在浮點(diǎn)數(shù)精度丟失問(wèn)題。DolphinDB 的 DOUBLE 類型的精度為15-16位有效數(shù)字。因此推薦用戶轉(zhuǎn)換成 DolphinDB 的 DECIMAL 類型,確保浮點(diǎn)數(shù)精度。

在時(shí)間類型轉(zhuǎn)換上,請(qǐng)參照表中的類型映射,以保證 DolphinDB 中的時(shí)間類型字段在精度上可以覆蓋 Oracle 中時(shí)間類型字段的精度。

6. 同步性能測(cè)試

6.1 性能測(cè)試配置

參數(shù)配置調(diào)優(yōu)

本次性能測(cè)試修改了 kafka 的配置參數(shù)?message.max.bytes?為 10,485,760,當(dāng)更新數(shù)據(jù)量比較大時(shí),kafka 每批處理的消息變多了,會(huì)有一定的性能提升。

此外,也可以調(diào)整 Oracle 同步任務(wù)的?log.mining.batch.size.max?配置參數(shù)來(lái)增大 Oracle LogMiner 每批處理的數(shù)據(jù)量。該調(diào)整會(huì)增加 Oracle 的內(nèi)存使用,需按實(shí)際情況評(píng)估調(diào)整。

建表語(yǔ)句

Oracle 建表,并生成測(cè)試數(shù)據(jù)代碼:

DROP TABLE debezium.performance_test1;
CREATE TABLE debezium.performance_test1 (dt date,id varchar2(20),str1 char(10),val number,qty varchar2(20),tm date
);
-- 生成100w行數(shù)據(jù),每天1000行
INSERT INTO debezium.performance_test1 
SELECT to_date('2020-01-01', 'YYYY-MM-DD') + trunc(level/1000), 
to_char(mod(level,1000)+1), 'aa', 1.234, 1000, 
to_date('2024-01-01 15:00:00', 'YYYY-MM-DD HH24:MI:SS') FROM dual
connect by level <= 1000000;
-- 添加主鍵
ALTER TABLE debezium.performance_test1
ADD CONSTRAINT pk_performance_test1 PRIMARY KEY (id, dt);DROP TABLE debezium.performance_test2;
CREATE TABLE debezium.performance_test2 (dt date,id varchar2(20),str1 char(10),val number,qty varchar2(20),tm date
);
-- 生成1億行數(shù)據(jù),每天100000行
INSERT INTO debezium.performance_test2
SELECT to_date('2020-01-01', 'YYYY-MM-DD') + trunc(((lvl1-1)*10000+lvl2)/100000),
to_char(mod(((lvl1-1)*10000+lvl2), 100000)+1), 'aa', 1.234, 1000, 
to_date('2024-01-01 15:00:00', 'YYYY-MM-DD HH24:MI:SS')
FROM (select level as lvl1 from dual connect by Level <= 10000)
CROSS JOIN (select level as lvl2 from dual connect by Level <= 10000);
-- 添加主鍵
ALTER TABLE debezium.performance_test2
ADD CONSTRAINT pk_performance_test2 PRIMARY KEY (id, dt);

DolphinDB 建表代碼:

dbName = "dfs://performance_test1"
tbName = "performance_test1"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
db = database(dbName, HASH, [SYMBOL, 2], , 'TSDB', 'CHUNK')
db.createTable(t, tbName, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)dbName = "dfs://performance_test2"
tbName = "performance_test2"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
partitionCols = `dt`id
db1 = database(, RANGE, date(datetimeAdd(1990.01M, 0..100*12, 'M')))
db2 = database(, HASH, [SYMBOL, 50])
db = database(dbName, COMPO, [db1, db2], , `TSDB, `CHUNK)
db.createPartitionedTable(t, tbName, partitionColumns=partitionCols, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)

6.2 性能測(cè)試結(jié)果

性能測(cè)試結(jié)果如下表所示,其中總耗時(shí)等于 DolphinDB 更新完成時(shí)間減去 Oracle 更新完成時(shí)間,因此總耗時(shí)包含了以下數(shù)據(jù)同步的完整鏈路:

  • Debezium 挖掘 Oracle 日志到 Kafka
  • Kafka 推送數(shù)據(jù)給相應(yīng) topic 的消費(fèi)者
  • 下游的 DolphinDB Connector 消費(fèi) Kafka 中數(shù)據(jù),解析為相應(yīng)的 DolphinDB 更新語(yǔ)句,并執(zhí)行寫入 DolphinDB 完成

Kafka 每次推送的變更數(shù)據(jù)在3000-4000條,具體條數(shù)和 Kafka 的日志大小配置相關(guān)。對(duì)于 insert 和 update 類型的操作,DolphinDB 的處理效率很高。對(duì)于 delete 類型操作,由于 delete 操作涉及數(shù)據(jù)查找, DolphinDB 的處理效率和具體表的數(shù)據(jù)行數(shù)、分區(qū)方式相關(guān)。

7. 常見(jiàn)問(wèn)題解答(FAQ)

7.1 創(chuàng)建同步任務(wù)時(shí)報(bào)錯(cuò)

  • json 文件格式錯(cuò)誤

造成上述問(wèn)題的原因可能是多了逗號(hào)、少了逗號(hào)或者括號(hào)不正確,需要檢查并修訂 json 文件。

  • Failed to resolve Oracle database version

該報(bào)錯(cuò)提示意味著 Oracle 數(shù)據(jù)庫(kù)無(wú)法正常連接。造成上述問(wèn)題的可能原因:

  • 未將?ojdbc8.jar?包到插件目錄下
  • Kafka 用戶對(duì)?ojdbc8.jar?文件沒(méi)有讀權(quán)限

根據(jù)原因調(diào)整文件目錄或讀權(quán)限后,如果還是出現(xiàn)該錯(cuò)誤,或者是其他無(wú)法連接 Oracle 的提示,可以嘗試更換連接方式的寫法,在配置文件中加入如下代碼:

"database.url" : "jdbc:oracle:thin:@192.168.189.130:1521:testdb",
  • Can’t find JdbcSinkConnector

查看日志提示沒(méi)有 JdbcSinkConnector 包的加載。JdbcSinkConnector 是包含在 kafka-connect-jdbc-10.7.4-ddb1.10.jar 包內(nèi),需要確認(rèn)該 jar 包是否防止在 kafka connect 的插件路徑下,確認(rèn) kafka 對(duì)該文件的讀權(quán)限。再通過(guò)?java --version?查看 Java 版本是否是17,Java 版本較低時(shí),可能無(wú)法正確加載插件。目前已知使用 Java 8 時(shí)無(wú)法正確加載該插件。

7.2 數(shù)據(jù)未同步或者未正確同步

當(dāng)數(shù)據(jù)未同步或者未正確同步時(shí),請(qǐng)先按以下兩步進(jìn)行檢查。然后對(duì)照后面的提供的錯(cuò)誤列表進(jìn)行參考調(diào)整。

step1 查看同步任務(wù)狀態(tài)

先查看同步任務(wù)是否報(bào)錯(cuò):

cd /KFDATA/kafka-tools/bin
./rest.sh showall

再看 kafka connect 的日志中是否出現(xiàn) ERROR:

cd /KFDATA/kafka-connect/logs
cat connect.log | grep ERROR

如果有出現(xiàn) ERROR,看 ERROR 顯示的日志是 Oracle 報(bào)錯(cuò)還是 ddb-sink 報(bào)錯(cuò),查看具體的報(bào)錯(cuò)信息。如果同步任務(wù)未報(bào)錯(cuò),也沒(méi)有 ERROR,再通過(guò)以下方式排查。

step2 查看 Oracle 數(shù)據(jù)是否同步到 Kafka

查看 Kafka 所有的 topic:

cd /KFDATA/kafka-tools/bin
./kafka.sh tplist

再查看該 topic 對(duì)應(yīng)的數(shù)據(jù)條數(shù):

./kafka.sh get_offsets oracle_service.DEBEZIUM.INDEX_EXAMPLE
  • 一張表出現(xiàn)兩個(gè) topic 名字

這說(shuō)明 Oracle source 任務(wù)的 topic.prefix 或 DolphinDB sink 任務(wù)的 topics 配置項(xiàng)拼寫有誤,請(qǐng)檢查這兩項(xiàng)。DolphinDB sink 任務(wù)的 topics 必須為 {topic.prefix}.{SCHEMA}.{TABLENAME} 的格式。創(chuàng)建 sink 任務(wù)時(shí),如果 topic 不存在,則會(huì)自動(dòng)創(chuàng)建 topic,因此拼寫錯(cuò)誤會(huì)導(dǎo)致出現(xiàn)兩個(gè) topic 。

  • 沒(méi)有表對(duì)應(yīng)的 topic / 有對(duì)應(yīng)的 topic,但數(shù)據(jù)條數(shù)為0

這說(shuō)明 Oracle 數(shù)據(jù)未正常同步到 Kafka 中,請(qǐng)?jiān)谕饺蝿?wù)的 table.include.list 中檢查 Oracle 表名的拼寫,或者在同步任務(wù)中未配置參數(shù) "log.mining.strategy":"online_catalog" 導(dǎo)致數(shù)據(jù)延遲大。

  • 有對(duì)應(yīng)的 topic,有數(shù)據(jù)條數(shù),但DolphinDB未同步

檢查 DolphinDB Sink 任務(wù)中 topics 配置項(xiàng)中的拼寫,檢查同步任務(wù)配置表中是否有相同的條數(shù)。

查看 Kafka 中數(shù)據(jù)是否與 Oracle 變更數(shù)據(jù)一致:

./tpconsumer.sh --op=2 --topic=oracle_service.DEBEZIUM.INDEX_EXAMPLE --offset=0 --max-messages=20

在顯示的結(jié)果中,初始數(shù)據(jù)同步的消息數(shù)據(jù) op = r,新插入數(shù)據(jù) op = c,更新數(shù)據(jù) op = u

  • 插入數(shù)據(jù)正常,但更新或刪除數(shù)據(jù)時(shí)未同步/報(bào)錯(cuò)

Oracle 未開啟全字段補(bǔ)充日志

ALTER TABLE debezium.index_example ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE debezium.stock_example ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

7.3 同步任務(wù)運(yùn)行報(bào)錯(cuò)

  • Java.lang.OutOfMemoryError

Kafka Connect 的默認(rèn)內(nèi)存為1GB,當(dāng)數(shù)據(jù)更新量較大時(shí)會(huì)出現(xiàn) JVM 內(nèi)存不足,需要調(diào)整 JVM 大小。根據(jù)之前配置的安裝路徑,修改 kafka connect 的配置文件:

vim /KFDATA/kafka-connect/etc/kafka-connect.env

在末尾加入 JVM 選項(xiàng),內(nèi)存大小根據(jù)實(shí)際需要調(diào)整:

KAFKA_HEAP_OPTS="-Xms10G -Xmx10G"
  • Supplemental logging not properly configured.

未開啟最小補(bǔ)充日志時(shí),會(huì)報(bào)此錯(cuò)誤

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

除了最小補(bǔ)充日志,還需開啟全字段補(bǔ)充日志

ALTER TABLE debezium.index_example ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE debezium.stock_example ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

8. 附錄

  • DolphinDB 的 Kafka-Connect 插件包
  • DolphinDB 的 JDBC 包
  • 運(yùn)維腳本包 kafka-tools
http://www.risenshineclean.com/news/40821.html

相關(guān)文章:

  • 商城網(wǎng)站開發(fā)教程視頻北京網(wǎng)絡(luò)營(yíng)銷推廣
  • 歐美設(shè)計(jì)網(wǎng)站推薦app拉新平臺(tái)哪個(gè)好傭金高
  • 密云做網(wǎng)站的產(chǎn)品關(guān)鍵詞大全
  • 建站行業(yè)的利潤(rùn)百度指數(shù)里的資訊指數(shù)是什么
  • 不銹鋼公司網(wǎng)站源碼 網(wǎng)站建設(shè) 產(chǎn)品3級(jí)分類asp源碼域名查詢注冊(cè)商
  • 360度全景街景地圖陽(yáng)江seo
  • 北京營(yíng)銷型網(wǎng)站建設(shè)培訓(xùn)百度怎么優(yōu)化網(wǎng)站關(guān)鍵詞
  • 科學(xué)城做網(wǎng)站公司網(wǎng)絡(luò)營(yíng)銷的四種模式
  • 桂平網(wǎng)站制作seo查詢seo優(yōu)化
  • 網(wǎng)站不備案可以做微信小程序么幫人推廣的平臺(tái)
  • 獨(dú)立商城網(wǎng)站建設(shè)網(wǎng)絡(luò)營(yíng)銷八大目標(biāo)是什么
  • 手機(jī)屏幕網(wǎng)站佛山外貿(mào)seo
  • 新校區(qū)建設(shè)專題網(wǎng)站免費(fèi)瀏覽網(wǎng)站推廣
  • 做網(wǎng)站最重要的是什么seo方法圖片
  • 網(wǎng)站建設(shè)制作設(shè)計(jì)seo優(yōu)化珠海百度人工電話
  • 網(wǎng)站建設(shè) 上海網(wǎng)站百度seo排名規(guī)則
  • 鮮花網(wǎng)站建設(shè)圖片百度知道客服電話人工服務(wù)
  • 蕪湖哪里有做網(wǎng)站的網(wǎng)絡(luò)營(yíng)銷的重要性與意義
  • 新疆做網(wǎng)站首選seo站長(zhǎng)工具推廣平臺(tái)
  • 第五次普查數(shù)據(jù)自網(wǎng)站怎么做搭建網(wǎng)站需要哪些步驟
  • 網(wǎng)站建設(shè)流程分為哪幾個(gè)階段知識(shí)付費(fèi)網(wǎng)站搭建
  • 萊蕪網(wǎng)站seo重慶seo網(wǎng)站收錄優(yōu)化
  • 在那個(gè)網(wǎng)站做任務(wù)賺怎么制作網(wǎng)站教程
  • 網(wǎng)站在線客服源碼南寧網(wǎng)絡(luò)推廣服務(wù)商
  • 做asp動(dòng)態(tài)網(wǎng)站制作流程十大免費(fèi)引流平臺(tái)
  • 網(wǎng)站模板信息不存在網(wǎng)站推廣和優(yōu)化系統(tǒng)
  • 網(wǎng)站素材 下載最近的新聞?wù)?/a>
  • 萊蕪網(wǎng)站制作杭州產(chǎn)品推廣服務(wù)公司
  • 怎樣做網(wǎng)站外鏈最近新聞大事
  • 上海由多少家網(wǎng)站建設(shè)公司佛山seo