
Syncing MySQL Data to ClickHouse

Project repository: https://gitee.com/zhongxianliang/mysql2ck

Project Structure

├─src
│  ├─main
│  │  ├─java
│  │  │  └─com
│  │  │      └─example
│  │  │          └─mysql2ck
│  │  │              │  Mysql2ckApplication.java
│  │  │              │  
│  │  │              ├─buffer
│  │  │              │      BatchBuffer.java
│  │  │              │      BatchBufferManager.java
│  │  │              │      FlushStrategy.java
│  │  │              │      
│  │  │              ├─common
│  │  │              │      Constants.java
│  │  │              │      Exception.java
│  │  │              │      
│  │  │              ├─config
│  │  │              │      ClickHouseConfig.java
│  │  │              │      KafkaConfig.java
│  │  │              │      ThreadPoolConfig.java
│  │  │              │      
│  │  │              ├─consumer
│  │  │              │      KafkaBinlogConsumer.java
│  │  │              │      
│  │  │              ├─manager
│  │  │              │      RetryManager.java
│  │  │              │      
│  │  │              ├─model
│  │  │              │      MaxwellMessage.java
│  │  │              │      SyncConfig.java
│  │  │              │      TableFilter.java
│  │  │              │      
│  │  │              ├─processor
│  │  │              │      MessageProcessor.java
│  │  │              │      
│  │  │              └─writer
│  │  │                      ClickHouseWriter.java
│  │  │                      SQLBuilder.java
│  │  │                      
│  │  └─resources
│  │          application.yml

The overall pipeline is MySQL → Maxwell → Kafka → this service → ClickHouse.
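Each record on the Kafka topic is one Maxwell JSON event. For reference, an insert event looks roughly like this (the values are illustrative):

{"database":"shop","table":"user","type":"insert","ts":1700000000,"xid":23396,"commit":true,"data":{"id":1,"name":"alice"}}

Update events additionally carry an "old" object holding the previous values of the changed columns.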

1. Configuration

The application.yml file:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: clickhouse-sync-group
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 500
      fetch-max-wait: 500
      fetch-min-size: 1
    listener:
      type: batch
      concurrency: 3
      ack-mode: manual

  clickhouse:
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password:
    driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
    hikari:
      maximum-pool-size: 10
      minimum-idle: 5
      connection-timeout: 30000

sync:
  batch:
    size: 1000          # batch size (rows per flush)
    timeout: 2000       # flush timeout (ms)
  tables:
    include: "user.*,order.*"   # tables to sync (regex supported)
    exclude: ".*\\.log_.*"      # tables to exclude (regex)
server:
  port: 8080
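The sync.* block is presumably bound by the SyncConfig class listed in the project tree. That class is not shown in the post; a minimal sketch of the binding, with field names inferred from the YAML (an assumption), could look like:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "sync")
public class SyncConfig {

    private Batch batch = new Batch();
    private Tables tables = new Tables();

    @Data
    public static class Batch {
        private int size = 1000;     // max rows buffered before a flush
        private long timeout = 2000; // max ms a buffer may wait before a flush
    }

    @Data
    public static class Tables {
        private String include; // comma-separated regexes, e.g. "user.*,order.*"
        private String exclude; // comma-separated regexes
    }
}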

The ClickHouse connection configuration class:

@Configuration
public class ClickHouseConfig {

    @Value("${spring.clickhouse.url}")
    private String url;

    @Value("${spring.clickhouse.username}")
    private String username;

    @Value("${spring.clickhouse.password}")
    private String password;

    @Bean
    public DataSource clickHouseDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setUsername(username);
        config.setPassword(password);
        config.setDriverClassName("com.clickhouse.jdbc.ClickHouseDriver");
        // Note: the pool is sized in code here; the spring.clickhouse.hikari
        // values in application.yml are not read by this class.
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);
        config.addDataSourceProperty("socket_timeout", "300000");
        config.addDataSourceProperty("keepAliveTimeout", "60");
        return new HikariDataSource(config);
    }

    @Bean
    public JdbcTemplate clickHouseJdbcTemplate() {
        return new JdbcTemplate(clickHouseDataSource());
    }
}

The Kafka configuration class; it mainly sets the offset commit mode and the number of records pulled per poll.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Container factory for batch consumption
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);  // deliver records to the listener as a List
        factory.setConcurrency(3);       // three concurrent consumer threads

        // batch container settings
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        containerProperties.setPollTimeout(3000);

        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // max records per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // minimum fetch size: 1 MB
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024);
        // manual offset commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

The thread pool configuration class, responsible for dispatching messages into the buffers. Given ClickHouse's write pressure, overall resource limits, and the expected sync volume, it uses 5 core threads, a maximum of 10 threads, and a queue capacity of 1000; CallerRunsPolicy makes the submitting thread run the task itself when the queue is full, which provides natural backpressure.

@Configuration
@EnableAsync
@EnableScheduling
public class ThreadPoolConfig {
    @Bean("processExecutor")
    public ThreadPoolTaskExecutor processExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("process-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
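The include/exclude patterns from the sync.tables config are applied by TableFilter (listed in the project tree but not shown; its needSync method appears in the processor below). A hedged sketch, assuming comma-separated regexes matched against database.table keys with exclude taking precedence:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component;

@Component
public class TableFilter {

    private final List<Pattern> includes;
    private final List<Pattern> excludes;

    public TableFilter(SyncConfig config) {
        this.includes = compile(config.getTables().getInclude());
        this.excludes = compile(config.getTables().getExclude());
    }

    /** Returns true when the table should be synced to ClickHouse. */
    public boolean needSync(String tableKey) {
        if (excludes.stream().anyMatch(p -> p.matcher(tableKey).matches())) {
            return false;
        }
        return includes.isEmpty()
                || includes.stream().anyMatch(p -> p.matcher(tableKey).matches());
    }

    private static List<Pattern> compile(String csv) {
        if (csv == null || csv.trim().isEmpty()) {
            return List.of();
        }
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }
}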

2. Key Components

The listener method

The polled records are first parsed into Maxwell message objects, then grouped by table name so that each table's batch is processed independently of the others:

@KafkaListener(
        topics = "${kafka.topic.binlog}",
        containerFactory = "batchContainerFactory",
        concurrency = "${kafka.consumer.concurrency:3}"
)
public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    if (records.isEmpty()) {
        ack.acknowledge();
        return;
    }
    // group by table so that tables do not interfere with each other
    Map<String, List<MaxwellMessage>> tableMessages = new HashMap<>();

    for (ConsumerRecord<String, String> record : records) {
        try {
            MaxwellMessage message = parseMessage(record.value());
            tableMessages
                    .computeIfAbsent(message.getTableKey(), k -> new ArrayList<>())
                    .add(message);
        } catch (Exception e) {
            log.error("Parse message failed, offset: {}", record.offset(), e);
        }
    }

    // dispatch each table's batch for parallel processing
    tableMessages.forEach((tableKey, messages) ->
            messageProcessor.processBatch(tableKey, messages));

    // ack mode is MANUAL_IMMEDIATE, so offsets must be committed explicitly;
    // failures after this point are re-driven by RetryManager, not Kafka replay
    ack.acknowledge();
}
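Neither MaxwellMessage nor parseMessage is shown in the post. A minimal sketch of both, following Maxwell's JSON field names and assuming Jackson as the JSON library:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import lombok.Data;

// Model mirroring Maxwell's JSON output (see the sample event earlier)
@Data
public class MaxwellMessage {
    private String database;
    private String table;
    private String type;               // insert / update / delete
    private long ts;                   // binlog timestamp (seconds)
    private Map<String, Object> data;  // row values after the change
    private Map<String, Object> old;   // previous values of changed columns (updates only)

    public String getTableKey() {
        return database + "." + table;
    }
}

// In KafkaBinlogConsumer: ignore fields the model does not map (xid, commit, ...)
private static final ObjectMapper MAPPER = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private MaxwellMessage parseMessage(String json) throws Exception {
    return MAPPER.readValue(json, MaxwellMessage.class);
}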

Multi-threaded processing

A pool thread validates and normalizes each message, adds it to the custom buffer, and records failed messages for retry:

@Async("processExecutor")
public void processBatch(String tableKey, List<MaxwellMessage> messages) {
    if (messages.isEmpty()) {
        return;
    }

    if (!tableFilter.needSync(tableKey)) {
        log.debug("Table {} is filtered, skip {} messages",
                tableKey, messages.size());
        return;
    }

    for (MaxwellMessage message : messages) {
        try {

            validateMessage(message);
            normalizeData(message);
            bufferManager.addMessage(tableKey, message);

        } catch (Exception e) {
            log.error("Process message failed: {}", message, e);
            // 记录失败消息
            retryManager.retryImmediately(message, e);
        }
    }
}
	@Scheduled(fixedRate = 1000)
    public void flushTimeoutBuffers() {
        long now = System.currentTimeMillis();

        buffers.entrySet().removeIf(entry -> {
            BatchBuffer buffer = entry.getValue();
            if (buffer.isTimeout(now) && !buffer.isEmpty()) {
                flushBuffer(entry.getKey(), buffer);
                return true; // 移除并创建新的
            }
            return false;
        });
    }
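bufferManager.addMessage and flushBuffer live in BatchBufferManager (listed in the tree but not shown). A sketch of the write path, assuming the batch size and timeout come from SyncConfig and that ClickHouseWriter exposes a write(tableKey, batch) method (both assumptions):

private final Map<String, BatchBuffer> buffers = new ConcurrentHashMap<>();

public void addMessage(String tableKey, MaxwellMessage message) {
    // one buffer per database.table key
    BatchBuffer buffer = buffers.computeIfAbsent(tableKey,
            k -> new BatchBuffer(syncConfig.getBatch().getSize(),
                                 syncConfig.getBatch().getTimeout()));
    buffer.add(message);
    // size-based flush; the @Scheduled task above handles the timeout case
    if (buffer.shouldFlush()) {
        flushBuffer(tableKey, buffer);
    }
}

private void flushBuffer(String tableKey, BatchBuffer buffer) {
    List<MaxwellMessage> batch = buffer.getAndClear();
    if (!batch.isEmpty()) {
        clickHouseWriter.write(tableKey, batch); // hypothetical writer API
    }
}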

The custom buffer

Both a maximum size and a flush interval are configured; the buffer flushes when either threshold is reached:

@Data
public class BatchBuffer {
    private final List<MaxwellMessage> messages = new ArrayList<>();
    private final int maxSize;
    private final long timeout;
    private long lastFlushTime;

    public BatchBuffer(int maxSize, long timeout) {
        this.maxSize = maxSize;
        this.timeout = timeout;
        this.lastFlushTime = System.currentTimeMillis();
    }

    public synchronized void add(MaxwellMessage message) {
        messages.add(message);
    }

    public synchronized boolean shouldFlush() {
        return messages.size() >= maxSize || isTimeout(System.currentTimeMillis());
    }

    public synchronized boolean isTimeout(long currentTime) {
        return currentTime - lastFlushTime > timeout;
    }

    public synchronized List<MaxwellMessage> getAndClear() {
        List<MaxwellMessage> copy = new ArrayList<>(messages);
        messages.clear();
        lastFlushTime = System.currentTimeMillis();
        return copy;
    }

    public synchronized boolean isEmpty() {
        return messages.isEmpty();
    }
}
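The flush target is the writer package (ClickHouseWriter and SQLBuilder in the tree, neither shown). A simplified sketch of a batched insert via JdbcTemplate that handles only insert events; mapping update/delete events onto ClickHouse (e.g. with a ReplacingMergeTree table) is a separate concern the real SQLBuilder would cover:

public void write(String tableKey, List<MaxwellMessage> batch) {
    // column order comes from the first row; assumes a homogeneous batch
    List<String> columns = new ArrayList<>(batch.get(0).getData().keySet());
    String sql = "INSERT INTO " + tableKey
            + " (" + String.join(", ", columns) + ") VALUES ("
            + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";

    clickHouseJdbcTemplate.batchUpdate(sql, batch, batch.size(),
            (ps, msg) -> {
                int i = 1;
                for (String col : columns) {
                    ps.setObject(i++, msg.getData().get(col));
                }
            });
}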

Retry strategy

public void retryImmediately(MaxwellMessage message, Exception error) {
    String key = getMessageKey(message);

    // give up once the retry budget is exhausted
    if (retryCountMap.getOrDefault(key, 0) >= MAX_RETRY_COUNT) {
        sendToDeadLetterQueue(message, error);
        return;
    }
    try {
        // brief pause before re-enqueueing into the buffer
        Thread.sleep(100);
        bufferManager.addMessage(getTableKey(message), message);
        retryCountMap.remove(key);
        log.info("Immediate retry success: {}", key);
    } catch (Exception e) {
        int count = retryCountMap.getOrDefault(key, 0) + 1;
        retryCountMap.put(key, count);
        if (count >= MAX_RETRY_COUNT) {
            sendToDeadLetterQueue(message, e);
        } else {
            // fall back to the delayed retry queue
            addToDelayRetryQueue(message, e);
        }
    }
}
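sendToDeadLetterQueue and addToDelayRetryQueue are referenced above but not shown. One plausible sketch; the topic name, the backoff policy, and the kafkaTemplate/toJson helpers are all assumptions:

private final ScheduledExecutorService retryScheduler =
        Executors.newSingleThreadScheduledExecutor();

private void addToDelayRetryQueue(MaxwellMessage message, Exception error) {
    // exponential backoff capped at 60s, driven by the current retry count
    int count = retryCountMap.getOrDefault(getMessageKey(message), 0);
    long delayMs = Math.min(60_000L, 1000L * (1L << count));
    retryScheduler.schedule(() -> retryImmediately(message, error),
            delayMs, TimeUnit.MILLISECONDS);
}

private void sendToDeadLetterQueue(MaxwellMessage message, Exception error) {
    log.error("Retries exhausted, dead-lettering {}", getMessageKey(message), error);
    kafkaTemplate.send("binlog-dlq", toJson(message)); // topic and toJson() are hypothetical
}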