ES和kibana得安装
https://www.cnblogs.com/yuersan/p/15353882.html
版本
elasticsearch-8.16.1-windows-x86_64
kibana-8.16.1-windows-x86_64
canal.adapter-1.1.7
canal.deployer-1.1.7
ES的配置
elasticsearch-8.16.1\config\elasticsearch.yml
cluster.name: nfturbo-cluster
network.host: 127.0.0.1
http.cors.enabled: true
http.cors.allow-origin: "*"
xpack.security.enabled: false
kibana没有配置
canal.deployer
修改canal.properties,增加
canal.ip = 127.0.0.1
修改example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=admin
canal.instance.dbPassword=admin
canal.adapter
application.properties
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/xxxx?useUnicode=true
username: admin
password: admin
注意hosts需要带https,由于这出现过问题
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es8
hosts: http://127.0.0.1:9200
properties:
mode: transport # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
完整的配置
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/xxx?useUnicode=true
username: admin
password: admin
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es8
hosts: http://127.0.0.1:9200
properties:
mode: transport # or rest
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
因为在outerAdapters下面,我们配置的 name 是 es8conf/es8/mytest_user.yml文件:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: nfturbo_users
_id: _id
# upsert: true
# pk: id
sql: "select t.id as _id, t.nick_name as nick_name, t.state as state,t.telephone as telephone from users as t"
# objFields:
# _labels: array:;
#etlCondition: "where a.c_time>={}"
commitBatch: 3000
创建索引
PUT nfturbo_users
{
"mappings": {
"properties": {
"nickname": {
"type": "text"
},
"telephone": {
"type": "text"
},
"state": {
"type": "text"
}
}
}
}
从 ES 查询
GET nfturbo_users/_search
{"_source": ["nick_name","telephone","state"],
"query": {
"match": {
"nick_name": "test11111"
}
}
}