报错信息

Commit cannot be completed since the group has already rebalanced and assigned the partitions

如何理解

这里是说提交commit失败, 因为这个组已经重新分配了

产生原因

正常情况下, kafka会有一个配置用于设置一条消息的过期时间, 在规定时间内, 如果消费者提交了消费完成的信息, 那么就可以正常的分配下一条记录给消费者, 并且将当前记录的状态记为"已消费"状态, 对消息队列做一个标识, 避免重复消费

如何解决

kafka中配置的规定返回消息时间, 默认是300s, 也就是5分钟, 但是有一些业务逻辑处理起来比较复杂, 数据量又比较庞大, 那么5分钟是肯定处理不完的, 比如导入一个5G的文件, 然后逐条插入数据库, 这就需要消耗很长时间, 所以需要设置一下kafka的最大间隔时间
在application-dev.yml文件中配置如下

也就是配置

spring:
	kafka:
		consumer:
			properties:
				max.poll.interval.ms: 86400000

86400000是一天的毫秒数, 我这个业务需求有一天一夜足矣

至此, 问题完美修复!

其它参考方案

  1. 调大max.poll.interval.ms(两次poll方法最大时间间隔),默认时间为300000ms
  2. 调小max.poll.records(一次最多处理的记录数量),默认500
  3. 启动多个线程并行处理数据,但要注意处理完一批消息后才能提交offset,然后进行下次的poll(会用到CountDownLatch)

修改配置参数,调大间隔,调小一次处理的最大任务数量

props.put("max.poll.records", 8);
props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000");

使用多线程并行处理

@Scheduled(fixedRate = 5000)
public void processing()
{
    //如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
    //如果队列中有消息,立即消费消息,每次消费的消息的多少
    //可以通过max.poll.records配置
    ConsumerRecords<String, String> records = consumer.poll(3000);
    if (records.count() == 0)
    {
        return;
    }
    Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
    CountDownLatch countDownLatch = new CountDownLatch(records.count());
    ConsumerRecord array[] = new ConsumerRecord[records.count()];
    int i;
    for (i = 0; i < records.count(); ++i)
    {
        array[i] = iterator.next();
    }
    for (i = 0; i < records.count(); ++i){
        final int id = i;
        if (id < records.count() - 1)
        {
            new Thread(()-> {
                disposeOneRecord(array[id],false);
                countDownLatch.countDown();
            }).start();
        }
        else
        {
            new Thread(()-> {
                disposeOneRecord(array[id],true);
                countDownLatch.countDown();
            }).start();
        }
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    consumer.commitAsync();
    logger.info(String.format("Successfully processing %d records", records.count()));
}

private void disposeOneRecord(ConsumerRecord<String, String> record, boolean saveInRedis)
{
    String[] split;
    DCSPoint point;
    String rowKey, qualifier, value;
    List<Put> putList = new ArrayList<>();

    Map<String,Object> tagAndValue = JSONObject.parseObject(record.value()).getInnerMap();
    for (String tag : tagAndValue.keySet()) {
        split = tag.split("_");
        if (split.length != 2)
        {
            continue;
        }
        try {
            point = DCSPoint.valueOf(split[1].toUpperCase());
        }catch (IllegalArgumentException e){
            continue;
        }
        if (point.getSection() == Section.UNKNOWN || point.getDataType() != DataType.REAL)
        {
            continue;
        }
        value = tagAndValue.get(tag).toString();
        if (saveInRedis)
        {
            RedisConfig.masterRedis.set(tag, value);
        }
        rowKey = split[0] + "_" + record.key();
        qualifier = split[1];
        putList.add(HBaseDaoUtil.cellPut(rowKey, HBaseConfig.FAMILY,qualifier,value));
    }
    hBaseDao.adds(HBaseConfig.TABLE_NAME, putList);
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐