Spring Boot集成Debezium快速入门demo

1.Debezium介绍

Debezium是一个用来捕获数据库数据变更的分布式服务,你的应用可以看到这些数据变更,以及处理它们。Debezium以更改事件流的形式记录每张表的行级变更。然后应用可以按事件产生的顺序读取这些变更记录。目前支持的Source Connectors有MySQL、MongoDB、PostgreSQL、Oracle、SQL Server、Db2、Cassandra、Vitess。

2.mysql 环境安装,开启bin-log

采用docker-compose来搭建测试环境

    
version: '3'
services:
  mysql:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7  # 原镜像`mysql:5.7`
    container_name: mysql_3306                                    # 容器名为'mysql_3306'
    restart: unless-stopped                                       # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:                                                      # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "./mysql/my.cnf:/etc/mysql/my.cnf"
      - "./mysql/init-file.sql:/etc/mysql/init-file.sql"
      - "./mysql/data:/var/lib/mysql"
#      - "./mysql/conf.d:/etc/mysql/conf.d"
      - "./mysql/log/mysql/error.log:/var/log/mysql/error.log"
      - "./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d" # 可执行初始化sql脚本的目录 -- tips:`/var/lib/mysql`目录下无数据的时候才会执行(即第一次启动的时候才会执行)
    environment:                        # 设置环境变量,相当于docker run命令中的-e
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      MYSQL_ROOT_PASSWORD: root         # 设置root用户密码
      MYSQL_DATABASE: demo              # 初始化的数据库名称
    ports:                              # 映射端口
      - "3306:3306"

启动

    
docker-compose -f docker-compose.yml -p mysql5.7 up -d

在demo数据库中创建表

    
create table user_info(    user_id     varchar(64)          not null        primary key,    username    varchar(100)         null comment '用户名',    age         int(3)               null comment '年龄',    gender      tinyint(1)           null comment '字典类型',    remark      varchar(255)         null comment '描述',    create_time datetime             null comment '创建时间',    create_id   varchar(64)          null comment '创建人ID',    update_time datetime             null comment '修改时间',    update_id   varchar(64)          null comment '修改人ID',    enabled     tinyint(1) default 1 null comment '删除状态(1-正常,0-删除)')    comment '字典表';

查看bin-log是否开启

    
show variables like 'log_%';

 

3.代码工程

实现利用debezium监听bin-log日志来捕获mysql变更数据

pom.xml

    
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>springboot-demo</artifactId>        <groupId>com.et</groupId>        <version>1.0-SNAPSHOT</version>    </parent>    <modelVersion>4.0.0</modelVersion>
<artifactId>debezium</artifactId>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <debezium.version>1.5.2.Final</debezium.version> </properties> <dependencies>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-api</artifactId> <version>${debezium.version}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-embedded</artifactId> <version>${debezium.version}</version> </dependency> <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-connector-mysql</artifactId> <version>${debezium.version}</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.78</version> </dependency>
</dependencies></project>

application.yaml

    
timely:
  # 是否开启
  switch: true
  # 偏移量文件
  offset-file-name: D:\IdeaProjects\ETFramework\debezium\docker\offsets.dat
  # 是否启动时清除偏移量文件
  offset-file-clean: true
  # 偏移量提交时间 单位ms
  offset-time: 1
  # 读取历史记录文件
  history-file-name: D:\IdeaProjects\ETFramework\debezium\docker\dbhistory.dat
  # 读取的数据库信息
  offline:
    ip: 127.0.0.1
    port: 3306
    username: root
    password: root
    # 保证每个数据库读取的 instance-name  logic-name 不能相同
    # 实例名
    instance-name: mysql-connector
    # 逻辑名
    logic-name: mysql-customer
    # 读取的表
    include-table: dbo.vehicle
    # 读取的库
    include-db: demo
    #mysql.cnf 配置的 server-id
    server-id: 1
server:
  port: 8088

config

    
package com.et.debezium.config;
import cn.hutool.core.io.FileUtil;import com.et.debezium.handler.ChangeEventHandler;import com.google.common.util.concurrent.ThreadFactoryBuilder;import io.debezium.connector.mysql.MySqlConnector;import io.debezium.embedded.Connect;import io.debezium.engine.DebeziumEngine;import io.debezium.engine.RecordChangeEvent;import io.debezium.engine.format.ChangeEventFormat;import lombok.Data;import lombok.SneakyThrows;import lombok.extern.log4j.Log4j2;import org.apache.kafka.connect.source.SourceRecord;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.SmartLifecycle;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.util.Assert;
import java.util.concurrent.*;
/** * @author lei * @create 2021-06-22 15:36 * @desc sql server 实时同步 **/@Configuration@Log4j2public class ChangeEventConfig { private final ChangeEventHandler changeEventHandler;
@Value("${timely.offset-file-name}") private String offsetFileName; @Value("${timely.offset-file-clean:true}") private Boolean offsetFileDelete; @Value("${timely.offset-time}") private String offsetTime; @Value("${timely.history-file-name}") private String historyFileName; @Value("${timely.offline.instance-name}") private String instanceName; @Value("${timely.offline.logic-name}") private String logicName; @Value("${timely.offline.ip}") private String ip; @Value("${timely.offline.port}") private String port; @Value("${timely.offline.username}") private String username; @Value("${timely.offline.password}") private String password; @Value("${timely.offline.include-table}") private String includeTable; @Value("${timely.offline.include-db}") private String includeDb; @Value("${timely.offline.server-id}") private String serverId;
@Autowired public ChangeEventConfig(ChangeEventHandler changeEventHandler) { this.changeEventHandler = changeEventHandler; }
@Bean public void cleanFile() { if (offsetFileDelete && FileUtil.exist(offsetFileName)) { FileUtil.del(offsetFileName); } }
/** * Debezium 配置. * * @return configuration */ @Bean io.debezium.config.Configuration debeziumConfig() { return io.debezium.config.Configuration.create() // 连接器的Java类名称 .with("connector.class", MySqlConnector.class.getName()) // 偏移量持久化,用来容错 默认值 .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") // 偏移量持久化文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更 // 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。 .with("offset.storage.file.filename", offsetFileName) // 捕获偏移量的周期 .with("offset.flush.interval.ms", offsetTime) // 连接器的唯一名称 .with("name", instanceName) // 数据库的hostname .with("database.hostname", ip) // 端口 .with("database.port", port) // 用户名 .with("database.user", username) // 密码 .with("database.password", password) // 包含的数据库列表 .with("database.include.list", includeDb) // 是否包含数据库表结构层面的变更,建议使用默认值true .with("include.schema.changes", "false") // mysql.cnf 配置的 server-id .with("database.server.id", serverId) // MySQL 服务器或集群的逻辑名称 .with("database.server.name", logicName) // 历史变更记录 .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") // 历史变更记录存储位置,存储DDL .with("database.history.file.filename", historyFileName) .build(); } /* @Bean io.debezium.config.Configuration debeziumConfig() { return io.debezium.config.Configuration.create() // 连接器的Java类名称 .with("connector.class", SqlServerConnector.class.getName()) // 偏移量持久化,用来容错 默认值 .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") // 要存储偏移量的文件路径 默认/tmp/offsets.dat 如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更 // 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。 .with("offset.storage.file.filename", offsetFileName) // 尝试提交偏移量的时间间隔。默认值为 1分钟 .with("offset.flush.interval.ms", offsetTime) // 监听连接器实例的 唯一名称 .with("name", instanceName) // SQL Server 实例的地址 .with("database.hostname", ip) // SQL Server 实例的端口号 .with("database.port", port) // SQL Server 用户的名称 .with("database.user", username) // SQL Server 用户的密码 .with("database.password", password) // 要从中捕获更改的数据库的名称 
.with("database.dbname", includeDb) // 是否包含数据库表结构层面的变更 默认值true .with("include.schema.changes", "false") // Debezium 应捕获其更改的所有表的列表 .with("table.include.list", includeTable) // SQL Server 实例/集群的逻辑名称,形成命名空间,用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 架构名称以及 Avro 转换器时对应的 Avro 架构的命名空间用来 .with("database.server.name", logicName) // 负责数据库历史变更记录持久化Java 类的名称 .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") // 历史变更记录存储位置,存储DDL .with("database.history.file.filename", historyFileName) .build(); }*/
/** * 实例化sql server 实时同步服务类,执行任务 * * @param configuration * @return */ @Bean SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) { SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor(); DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine .create(ChangeEventFormat.of(Connect.class)) .using(configuration.asProperties()) .notifying(changeEventHandler::handlePayload) .build(); sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine); return sqlServerTimelyExecutor; }
/** * @author lei * @version 1.0 * @date 2021-06-22 15:39 * @desc 同步执行服务类 */ @Data @Log4j2 public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle { private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance(); private DebeziumEngine<?> debeziumEngine;
@Override public void start() { log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!"); executor.execute(debeziumEngine); }
@SneakyThrows @Override public void stop() { log.warn("debeziumEngine 监听实例关闭!"); debeziumEngine.close(); Thread.sleep(2000); log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池关闭!"); executor.shutdown(); }
@Override public boolean isRunning() { return false; }
@Override public void afterPropertiesSet() { Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!"); }
public enum ThreadPoolEnum { /** * 实例 */ INSTANCE;
public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool"; /** * 线程池单例 */ private final ExecutorService es;
/** * 枚举 (构造器默认为私有) */ ThreadPoolEnum() { final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(SQL_SERVER_LISTENER_POOL + "-%d").build(); es = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(256), threadFactory, new ThreadPoolExecutor.DiscardPolicy()); }
/** * 公有方法 * * @return ExecutorService */ public ExecutorService getInstance() { return es; } } }
}

handler

    
package com.et.debezium.handler;

import cn.hutool.core.bean.BeanUtil;import com.alibaba.fastjson.JSON;import com.et.debezium.model.ChangeListenerModel;import io.debezium.data.Envelope;import io.debezium.engine.DebeziumEngine;import io.debezium.engine.RecordChangeEvent;import lombok.Getter;import lombok.extern.log4j.Log4j2;import org.apache.commons.lang3.tuple.Pair;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import org.springframework.stereotype.Service;import org.springframework.util.CollectionUtils;
import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Objects;import java.util.stream.Collectors;import java.util.stream.Stream;
import static io.debezium.data.Envelope.FieldName.*;import static java.util.stream.Collectors.toMap;
/** * @author lei * @create 2021-06-22 16:11 * @desc 变更数据处理 **/@Service@Log4j2public class ChangeEventHandler {
public static final String DATA = "data"; public static final String BEFORE_DATA = "beforeData"; public static final String EVENT_TYPE = "eventType"; public static final String SOURCE = "source"; public static final String TABLE = "table";
private enum FilterJsonFieldEnum { /** * 表 */ table, /** * 库 */ db, /** * 操作时间 */ ts_ms, ;
public static Boolean filterJsonField(String fieldName) { return Stream.of(values()).map(Enum::name).collect(Collectors.toSet()).contains(fieldName); } }
/** * @author lei * @create 2021-06-24 16:04 * @desc 变更类型枚举 **/ public enum EventTypeEnum { /** * 增 */ CREATE(1), /** * 删 */ UPDATE(2), /** * 改 */ DELETE(3), ; @Getter private final int type;
EventTypeEnum(int type) { this.type = type; } }
public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) { for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) { SourceRecord sourceRecord = r.record(); Struct sourceRecordChangeValue = (Struct) sourceRecord.value(); if (sourceRecordChangeValue == null) { continue; } // 获取变更表数据 Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue); if (CollectionUtils.isEmpty(changeMap)) { continue; } ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap); if (changeListenerModel == null) { continue; } String jsonString = JSON.toJSONString(changeListenerModel); log.info("发送变更数据:{}", jsonString); } try { recordCommitter.markBatchFinished(); } catch (InterruptedException e) { e.printStackTrace(); } }
private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) { // 操作类型过滤,只处理增删改 Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION)); if (operation != Envelope.Operation.READ) { Integer eventType = null; Map<String, Object> result = new HashMap<>(4); if (operation == Envelope.Operation.CREATE) { eventType = EventTypeEnum.CREATE.getType(); result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER)); result.put(BEFORE_DATA, null); } // 修改需要特殊处理,拿到前后的数据 if (operation == Envelope.Operation.UPDATE) { if (!changeMap.containsKey(TABLE)) { return null; } eventType = EventTypeEnum.UPDATE.getType(); String currentTableName = String.valueOf(changeMap.get(TABLE).toString()); // 忽略非重要属性变更 Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName); if (CollectionUtils.isEmpty(resultMap)) { return null; } result.put(DATA, resultMap.get(AFTER)); result.put(BEFORE_DATA, resultMap.get(BEFORE)); } if (operation == Envelope.Operation.DELETE) { eventType = EventTypeEnum.DELETE.getType(); result.put(DATA, getChangeData(sourceRecordChangeValue, BEFORE)); result.put(BEFORE_DATA, getChangeData(sourceRecordChangeValue, BEFORE)); } result.put(EVENT_TYPE, eventType); result.putAll(changeMap); return BeanUtil.copyProperties(result,ChangeListenerModel.class); } return null; }
/** * 过滤非重要变更数据 * * @param sourceRecordChangeValue * @param currentTableName * @return */ private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) { Map<String, String> resultMap = new HashMap<>(4); Map<String, Object> afterMap = getChangeDataMap(sourceRecordChangeValue, AFTER); Map<String, Object> beforeMap = getChangeDataMap(sourceRecordChangeValue, BEFORE); //todo 根据表过滤字段 resultMap.put(AFTER, JSON.toJSONString(afterMap)); resultMap.put(BEFORE, JSON.toJSONString(beforeMap)); return resultMap; }
/** * 校验是否仅仅是非重要字段属性变更 * @param currentTableName * @param afterMap * @param beforeMap * @param filterColumnList * @return */ private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap, Map<String, Object> beforeMap, List<String> filterColumnList) { Map<String, Boolean> filterMap = new HashMap<>(16); for (String key : afterMap.keySet()) { Object afterValue = afterMap.get(key); Object beforeValue = beforeMap.get(key); filterMap.put(key, !Objects.equals(beforeValue, afterValue)); } filterColumnList.parallelStream().forEach(filterMap::remove); if (filterMap.values().stream().noneMatch(x -> x)) { log.info("表:{}无核心资料变更,忽略此次操作!", currentTableName); return true; } return false; }

public String getChangeData(Struct sourceRecordChangeValue, String record) { Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record); if (CollectionUtils.isEmpty(changeDataMap)) { return null; } return JSON.toJSONString(changeDataMap); }
public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) { Struct struct = (Struct) sourceRecordChangeValue.get(record); // 将变更的行封装为Map Map<String, Object> changeData = struct.schema().fields().stream() .map(Field::name) .filter(fieldName -> struct.get(fieldName) != null) .map(fieldName -> Pair.of(fieldName, struct.get(fieldName))) .collect(toMap(Pair::getKey, Pair::getValue)); if (CollectionUtils.isEmpty(changeData)) { return null; } return changeData; }
private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) { Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE); Map<String, Object> map = struct.schema().fields().stream() .map(Field::name) .filter(fieldName -> struct.get(fieldName) != null && FilterJsonFieldEnum.filterJsonField(fieldName)) .map(fieldName -> Pair.of(fieldName, struct.get(fieldName))) .collect(toMap(Pair::getKey, Pair::getValue)); if (map.containsKey(FilterJsonFieldEnum.ts_ms.name())) { map.put("changeTime", map.get(FilterJsonFieldEnum.ts_ms.name())); map.remove(FilterJsonFieldEnum.ts_ms.name()); } return map; }
}

model

package com.et.debezium.model;
import lombok.Data;
/**
 * Payload describing one captured data change.
 *
 * @author lei
 * @create 2021-06-23 09:58
 **/
@Data
public class ChangeListenerModel {

    /** Database the change belongs to. */
    private String db;

    /** Table the change belongs to. */
    private String table;

    /** Operation type: 1 = add, 2 = update, 3 = delete. */
    private Integer eventType;

    /** Time of the operation. */
    private Long changeTime;

    /** Current data. */
    private String data;

    /** Previous data. */
    private String beforeData;
}
DemoApplication.java

package com.et.debezium;
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the Debezium demo.
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

代码仓库

  • https://github.com/Harries/springboot-demo

4.测试

启动应用

    
2024-03-20 10:31:25.968 INFO 23260 --- [rce-coordinator] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client2024-03-20 10:31:25.971 INFO 23260 --- [-127.0.0.1:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client2024-03-20 10:31:25.983 INFO 23260 --- [-127.0.0.1:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to 127.0.0.1:3306 at mysql-bin.000001/154 (sid:1, cid:8)2024-03-20 10:31:25.983 INFO 23260 --- [-127.0.0.1:3306] i.d.c.m.MySqlStreamingChangeEventSource : Connected to MySQL binlog at 127.0.0.1:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000001, currentBinlogPosition=154, currentRowNumber=0, serverId=0, sourceTime=2024-03-20T02:31:25.942Z, threadId=-1, currentQuery=null, tableIds=[demo.user], databaseName=demo], partition={server=mysql-customer}, snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000001, restartBinlogPosition=154, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]2024-03-20 10:31:25.983 INFO 23260 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Waiting for keepalive thread to start2024-03-20 10:31:25.984 INFO 23260 --- [-127.0.0.1:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client2024-03-20 10:31:26.090 INFO 23260 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Keepalive thread is running2024-03-20 10:33:36.536 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Value' is not a valid Avro schema name, so replacing with 
'mysql_customer.demo.user_info.Value'2024-03-20 10:33:36.536 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Key' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Key'2024-03-20 10:33:36.537 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Envelope' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Envelope'2024-03-20 10:33:36.540 INFO 23260 --- [-127.0.0.1:3306] i.d.r.history.DatabaseHistoryMetrics : Already applied 7 database changes

在user_info插入一些数据

    
INSERT INTO demo.user_info(user_id, username, age, gender, remark, create_time, create_id, update_time, update_id, enabled)VALUES('1', '1', 1, 1, '1', NULL, '1', NULL, '1', 1);

观察控制台输出

    
2024-03-20 10:34:13.156 INFO 23260 --- [listener-pool-0] c.e.debezium.handler.ChangeEventHandler : 发送变更数据:{"changeTime":1710902052000,"data":"{\"update_id\":\"1\",\"create_id\":\"1\",\"gender\":1,\"user_id\":\"1\",\"remark\":\"1\",\"enabled\":1,\"age\":1,\"username\":\"1\"}","db":"demo","eventType":1,"table":"user_info"}

5.引用

  • http://www.liuhaihua.cn/archives/710327.html

  • https://github.com/leilei0220/springboot-learn/tree/master

相关推荐

  • 博士申请 | 香港大学陈怡老师课题组招收计算机安全方向全奖博士/博后
  • CVPR 2024 | 通过细粒度人类反馈对齐数据,提高多模态大模型可信度
  • TPAMI 2024 | 针对节点的融合全局-局部信息的图谱滤波方法
  • 多领域文献阅读超越GPT-4!深势科技发布科学文献多模态大模型Uni-SMART技术报告
  • 警惕发布前夕的“致命遗忘”
  • 邀你探索金融、教育、法律及医疗行业的大模型创新应用,AICon 2024 即将拉开帷幕
  • Java 22 正式发布
  • 微软开抢年收入上亿美元的 Redis 饭碗?开源性能遥遥领先的 Garnet:无需修改,Redis 客户端可直接接入
  • 风控也在用大模型了
  • 一次性支持 200 万字无损上下文!Kimi智能助手玩了个大的——月之暗面「登月」最新进展!
  • 今日arXiv最热NLP大模型论文:大模型RAG新宠!浙江大学发布自反馈检索增强方法
  • 无人驾驶飞机上架淘宝!?亿航智能首款国产「空中的士」网上开售,股价一度飞升 30%
  • 如何从头开始编写LoRA代码,这有一份教程
  • Stable Video 3D震撼登场:单图生成无死角3D视频、模型权重开放
  • 利物浦用DeepMind的AI制定战术已有三年了
  • 融资15亿美元却被挖走创始人,微软正在将这家创企生吞活剥?
  • 奥特曼回应一切:GPT-5、董事会宫斗、Ilya当时看到了什么
  • 谷歌发布“Vlogger”模型:单张图片生成10秒视频
  • AI足球教练上岗利物浦,射门机会提高13%!来自DeepMind,网友:这不公平
  • 旷视实战大模型:把多模态扎进行业