一不小心成了知名开源项目的贡献者?!

每天早上七点三十,准时推送干货

真是一个意外之喜,之前写的 18 张图手把手教你使用 Canal Adapter 同步 MySQL 数据到 ES8,建议收藏!写了怎么使用 Canal Adapter 进行 MySQL 数据同步到 ES8

在这个文章中我提到了这么一个内容,官方自带的 ES8 Adapter 同步类不支持 ES8TLS 认证,所以导致我们在部署 ES8 集群的时候需要关闭这个安全功能。

但是作为技术人员就是见不得功能被阉割,所以就拉取了源码,在原有的基础上进行改造支持了 TLS 认证。

改完过后本地重新打包实现了功能,本着独乐乐不如众乐乐,就顺手提了一个 PR,结果万万没想到,最近发现这个 PR 被合并到主干了!!!

就这样一不小心成了一个几万星的知名开源项目贡献者,大佬还在 PR 下面回复了一个 tks,突然发现自己和大佬也可以靠的这么近。

问题描述

在没有修复的时候,启动了 canal 适配器过后,在进行 MySQL 数据同步到 ES8 的时候,出现下面的错误,这个错误的原因是因为 ES8 默认开启了安全认证,并且自带了签名证书。Canal Adapter 在适配 ES8 的时候并没有支持这个功能,因此报错了。

2024-04-13 20:55:39.368 [pool-3-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
java.lang.RuntimeException: ElasticsearchException[java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: ExecutionException[javax.net.ssl.SSLHandshakeException: General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: SSLHandshakeException[General SSLEngine problem]; nested: ValidatorException[PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target]; nested: SunCertPathBuilderException[unable to find valid certification path to requested target];
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:112)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:60)
 at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:104)
 at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:83)
 at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97)
 at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
 at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchException: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2695)
 at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
 at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2154)
 at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2118)
 at org.elasticsearch.client.IndicesClient.getMapping(IndicesClient.java:538)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection.getMapping(ESConnection.java:132)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getEsType(ES8xTemplate.java:392)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getValFromData(ES8xTemplate.java:269)
 at com.alibaba.otter.canal.client.adapter.es8x.support.ES8xTemplate.getESDataFromDmlData(ES8xTemplate.java:324)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.singleTableSimpleFiledUpdate(ESSyncService.java:814)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.update(ESSyncService.java:208)
 at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.sync(ESSyncService.java:97)
 ... 12 common frames omitted
Caused by: java.util.concurrent.ExecutionException: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.getValue(BaseFuture.java:257)
 at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:244)
 at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:75)
 at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2692)
 ... 23 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
 at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
 at sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
 at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
 at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doWrap(SSLIOSession.java:270)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:316)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.isAppInputReady(SSLIOSession.java:537)
 at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:120)
 at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
 at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
 at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
 at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
 ... 1 common frames omitted
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
 at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
 at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
 at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1509)
 at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
 at sun.security.ssl.Handshaker.processLoop(Handshaker.java:979)
 at sun.security.ssl.Handshaker$1.run(Handshaker.java:919)
 at sun.security.ssl.Handshaker$1.run(Handshaker.java:916)
 at java.security.AccessController.doPrivileged(Native Method)
 at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1369)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doRunTask(SSLIOSession.java:288)
 at org.apache.http.nio.reactor.ssl.SSLIOSession.doHandshake(SSLIOSession.java:356)
 ... 9 common frames omitted
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
 at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
 at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
 at sun.security.validator.Validator.validate(Validator.java:260)
 at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
 at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281)
 at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
 at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1496)
 ... 17 common frames omitted
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
 at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
 at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
 at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
 at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:382)
 ... 23 common frames omitted
2024-04-13 20:55:39.370 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed!  Error sync and rollback, execute times: 13

解决方案

解决方案有两个:

  1. 部署搭建 ES 集群的时候,关闭这个安全证书的功能,对应 ES 的配置是在 elasticsearch.yml 里面的 xpack.security.enabled falsedocker 部署的 ES 需要进入的容器里面去进行修改,或者在容器启动的时候就配置。
  2. 修改 canal adapter 的源码,兼容证书;

这里主要讲一下方案 2,因为对于方案 1 需要取消 ES8 的安全功能,不推荐。

修改源码,兼容 ES8 安全配置

拷贝证书

在使用 docker 安装和部署 ES8 的时候,默认已经创建好了一个证书,我们需要将证书从容器中拷贝出来,命令如下

docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .

这里的 es01 是容器名称,根据自己的进行替换即可,拷贝出来的路径可以自行替换,记住在哪就行,后面会用到。

修改代码

canal adapter 的源码中,找到下面这类,com.alibaba.otter.canal.client.adapter.es8x.support.ESConnection#ESConnection

将其中的构造方法改成下面这段

public ESConnection(String[] hosts, Map<String, String> properties) throws UnknownHostException {
    String caPath = properties.get("security.ca.path");
    if (StringUtils.isNotEmpty(caPath)) {
        connectEsWithCa(hosts, properties, caPath);
    } else {
        connectEsWithoutCa(hosts, properties);
    }
}
private void connectEsWithCa(String[] hosts, Map<String, String> properties, String caPath) {
    Path caCertificatePath = Paths.get(caPath);
    try (InputStream is = Files.newInputStream(caCertificatePath)) {
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        Certificate trustedCa = factory.generateCertificate(is);
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(nullnull);
        trustStore.setCertificateEntry("ca", trustedCa);
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
        .loadTrustMaterial(trustStore, null);
        final SSLContext sslContext = sslContextBuilder.build();

        HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
        String nameAndPwd = properties.get("security.auth");
        if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
            String[] nameAndPwdArr = nameAndPwd.split(":");
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                                               new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                return httpClientBuilder.setSSLContext(sslContext);
                });
            }
            restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true).build();
        } catch (Exception e) {
            throw new RuntimeException(e);
    }
}

private void connectEsWithoutCa(String[] hosts, Map<String, String> properties) {
    HttpHost[] httpHosts = Arrays.stream(hosts).map(this::createHttpHost).toArray(HttpHost[]::new);
    RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
    String nameAndPwd = properties.get("security.auth");
    if (StringUtils.isNotEmpty(nameAndPwd) && nameAndPwd.contains(":")) {
        String[] nameAndPwdArr = nameAndPwd.split(":");
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(nameAndPwdArr[0], nameAndPwdArr[1]));
        restClientBuilder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }
    restHighLevelClient = new RestHighLevelClientBuilder(restClientBuilder.build()).setApiCompatibilityMode(true)
            .build();
}

简单说明

  1. 其中 connectEsWithoutCa 方法为原来的构造方法的实现;
  2. connectEsWithCa 方法为兼容了安全认证的方法构造方法实现;
  3. 这两个方法的使用根据是否配置了 security.ca.path 属性来判断;
  4. security.ca.path 这个配置是在启动器的 outerAdaptersES8properties 下,与 security.auth 同级;

代码修改到这里就结束了,下面看下如何使用

重新打包

修改好了代码过后,通过 maven 重新打包,打包出对应的 es8jar 包即可。

将编译打包后的 jar 重新复制到 canal 适配器的 plugin 目录下面,并且修改一下对应的名称跟下载下来的版本一致即可,比如我这边之前下载的 1.1.7 版本。

img

其中 client-adapter.es8x-1.1.7-jar-with-dependencies.jar.7 是原来下载下来携带的 jarclient-adapter.es8x-1.1.7-jar-with-dependencies.jar 是我重新打包编译后的 jar

修改启动器的配置

前面讲到兼容代码的时候,我们使用了一个叫 security.ca.path 的配置,所以我们需要将前面拷贝的 ca 证书路径,配置在这个属性上,即 security.ca.path: /opt/canal/http_ca.crt

完整的配置如下所示

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/database?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
        - name: es8
          key: es-key
          hosts: https://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
          properties:
            mode: rest # transport or rest
            security.auth: elastic:password
            security.ca.path: /opt/canal/http_ca.crt
            cluster.name: docker-cluster
        - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase

#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

配置好了证书路径过后,就可以正常启动和同步数据了,具体的实操也可以看对应的公众号文章 18 张图手把手教你使用 Canal Adapter 同步 MySQL 数据到 ES8,建议收藏!,这里就不重复演示了。

总结

以前一直想着要参与一下开源项目,没想到这次也算是实现了一个小小的目标,其实这次纯属是一个意外之喜,原本只是在自己学习和研究 canal 的数据同步,然后发现了这个问题,最后就修复了一下,顺手提了一个 PR,没想到还真的被合并了,想想还是很激动的。

这个事情告诉我们只要真正的去参与和使用并了解一个开源项目了过后,还是有机会贡献自己的代码的,哪怕只是一个很小的一部分,也算是为开源项目贡献了一份自己的绵薄之力。

另外最近也发现了另一个开源项目的一些小 bug,回头再提交一下 PR,向着开源的道路继续前行。


号外!号外!

Java 极客技术微信群中有很多优秀的小伙伴在讨论技术,偶尔还有不定期的资料分享和红包发放!如果你想提升自己,并且想和优秀的人一起进步,感兴趣的朋友,可以在下方公众号后台回复:666

喜欢就分享
认同就点赞

支持就在看

一键四连,你的offer也四连

相关推荐

  • 30%参数达到92%的表现,大模型稀疏化方法显神通
  • 从零开始手搓GPU,照着英伟达CUDA来,只用两个星期
  • Sam Altman 5月最新2万字谈AI未来:GPT-5与OpenAI的使命 (附全文+视频)
  • [开源]综合性在线网校平台,内容分享、付费、营销一站式解决方案
  • 偷偷爆料下银行信息科技岗(含各大银行薪资)
  • 与AI角色共筑虚拟人生是啥体验?
  • 进程之间的通信方式有哪些?我被问倒了。。
  • 被 Bug 折磨疯后,我写了本 Bug 修复手册。
  • 我是真讨厌验证码啊!赶紧淘汰掉吧
  • 上班摸鱼神器!标星 17k 的安卓手机投屏工具!
  • 首个ICLR时间检验奖出炉!3万被引论文奠定图像生成范式,DALL-E 3/SD背后都靠它
  • 文末送书!大语言模型应用指南:以ChatGPT为起点,从入门到精通的实践教程
  • 【毕业论文】求解最优的任意宝可梦颜色交换算法
  • 到底该不该上APS?
  • 1.6K Star专业小巧!只有10M的录屏软件,Mac可用
  • 绿联,你人还怪好嘞
  • 一文读懂多家厂商的大模型训练、推理、部署策略
  • 中国人有鸿蒙,凭什么要用美国的安卓?
  • 大模型产研2024年4月半月谈后的一些冷思考:思辨及从关键事件、RAG-KG结合到文档图表理解的一些跟进
  • 每日大模型&RAG&文档智能&知识图谱进展及月度线上交流:老刘说NLP技术社区对外纳新