【README】

0,为啥要看  DefaultKafkaProducerFactory? 最近在基于 springboot 开发kafka模块,发现 kafakTemplate构造器传入了 DefaultKafkaProducerFactory实例, kafkaTemplate内部使用了 很多 DefaultKafkaProducerFactory的方法; 所以把 DefaultKafkaProducerFactory的重点方法分析处理出来,以便于查看 KafkaTemplate的内部逻辑; 

1, 本文涉及的 kafka操作,不涉及事务和消费者,所以本文忽略了有关kafka事务,消费者的描述; kafka事务, refer2  转:Kafka事务使用和编程示例/实例_PacosonSWJTU的博客-CSDN博客Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败。注意:kafka事务和DB事务。在理解消息的事务时,一直处于一个错误理解是,把操作db的业务逻辑跟操作消息当成是一个事务,如下所示:void kakfa_in_tranction(){ // 1.kafa的操作:读取消息或生产消息 kafkaOperation(); /https://blog.csdn.net/PacosonSWJTU/article/details/1213058842,本文结合了 api doc 对 DefaultKafkaProducerFactory-默认kafka生产者工厂的重点方法进行介绍;  

3,DefaultKafkaProducerFactory 类代码结构包括(小结):

  1. 创建原生kafka生产者并包装到 CloseSafeProducer类中;
  2. 创建带有事务的 kafka 生产者;
  3. 为每个线程都创建一个kafka 生产者;
  4. 内部类 CloseSafeProducer-关闭安全生产者;
    1. 发送消息;
    2. 刷新kafka缓存到服务器;
    3. kafka事务操作(本文不涉及),包括开启或提交或中断事务,提交偏移量等; 
    4. 关闭生产者; 

【1】 类描述

类描述:

单例共享 Producer 实例的 ProducerFactory 实现。

此实现将为每次 createProducer() 调用时提供的 Map 配置和可选的 Serializer 实现返回相同的 Producer 实例(如果未启用事务)。

如果您使用的序列化器没有参数构造函数并且不需要设置,那么最简单的方法是在传递给 DefaultKafkaProducerFactory 构造函数的配置中针对 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG 和 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG 键指定序列化器类。

如果这是不可能的,但您确定以下至少一项是正确的:

  • 1 只有一个生产者会使用序列化程序。
  • 2 您正在使用可以在 Producer 实例之间共享的序列化程序(特别是它们的 close() 方法是无操作的)。
  • 3 您确定没有任何单个 Producer 被关闭的风险,而其他具有相同序列化程序的 Producer 实例正在使用中。

然后您可以为键和值序列化程序之一或两者传入 Serializer 实例。

如果以上都不是真的,那么您可以为一个或两个序列化程序提供一个供应商函数,每次工厂创建生产者时,该函数将用于获取序列化程序。

Producer 被包装,并且在调用 Producer.close() 时实际上并未关闭底层的 KafkaProducer 实例。当调用 DisposableBean.destroy() 或应用程序上下文发布 ContextStoppedEvent 时,KafkaProducer 物理关闭。你也可以调用reset()。

设置 setTransactionIdPrefix(String) 启用事务;在这种情况下,会维护生产者的缓存;关闭生产者将其返回到缓存。当工厂被销毁、应用程序上下文停止或调用 reset() 方法时,生产者将关闭并清除缓存。

public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
		implements ProducerFactory<K, V>, ApplicationContextAware,
			BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {

 【2】构造器

使用提供的配置和序列化程序供应商构建工厂。

如果提供,还可以把 transactionIdPrefix 配置为ProducerConfig.TRANSACTIONAL_ID_CONFIG 的值。

此配置将被目标 Producer 实例的后缀覆盖。

public DefaultKafkaProducerFactory(Map<String, Object> configs,
			@Nullable Supplier<Serializer<K>> keySerializerSupplier,
			@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {

	this.configs = new ConcurrentHashMap<>(configs);
	this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
	this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;

	// clientId 表示kafka生产者id
	if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
		this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
	}
			// 是否开启事务
	String txId = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
	if (StringUtils.hasText(txId)) {
		setTransactionIdPrefix(txId);
		this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
	}
	this.configs.put("internal.auto.downgrade.txn.commit", true);
}

【特别注意】我们可以不传入 key序列化器,value序列化器 对象到 DefaultKafkaProducerFactory构造器(即设置为null) , 而把序列化器全限定类名设置在 configs 属性里面,因为原生kafka生产者的构造器可以读取配置中的序列化器类,如下:

new DefaultKafkaProducerFactory<>(0
(Map) PPKafkaClusterManager.INSTANCE.getKafkaClusterProps(kafkaClusterName));

// 不传入key value的序列化器,默认为null 

 kafka 原生构造器如下:

// 通过反射创建序列化器对象 
public <T> T getConfiguredInstance(String key, Class<T> t) {
        Class<?> c = getClass(key);
        if (c == null)
            return null;
        Object o = Utils.newInstance(c);
        if (!t.isInstance(o))
            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
        if (o instanceof Configurable)
            ((Configurable) o).configure(originals());
        return t.cast(o);
    }

【3】方法介绍

【3.1】设置物理关闭生产者超时时间

public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
    this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
}

通过工厂物理关闭生产者而不是producer自身关闭的等待时间(当 {@link #reset()}、{@link #destroy() #closeProducerFor(String)} 或 {@link #closeThreadBoundProducer( )} 被调用)。 以秒为单位; 默认{@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}。


【3.2】设置事务id 前缀(开启事务)

public final void setTransactionIdPrefix(String transactionIdPrefix) {
    Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
    this.transactionIdPrefix = transactionIdPrefix;
    enableIdempotentBehaviour();
}
// 事务可用? 
@Override
public boolean transactionCapable() {
    return this.transactionIdPrefix != null;
}

为 ProducerConfig.TRANSACTIONAL_ID_CONFIG 配置设置前缀。 默认情况下,来自配置的 ProducerConfig.TRANSACTIONAL_ID_CONFIG 值用作目标生产者配置中的前缀。

【3.3】 设置是否每个线程产生一个生产者

public void setProducerPerThread(boolean producerPerThread) {
    this.producerPerThread = producerPerThread;
}

设置为 true 为每个线程创建一个生产者,而不是由所有客户端共享的单例。 当不再需要生产者时,客户端必须调用 closeThreadBoundProducer() 来物理关闭生产者。 这些生产者不会被 destroy() 或 reset() 关闭。 


【3.4】创建kafka生产者方法(重点)*

3个外观方法

@Override
public Producer<K, V> createProducer() {
	return createProducer(this.transactionIdPrefix);
}

@Override
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
	String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
	return doCreateProducer(txIdPrefix);
}

@Override
public Producer<K, V> createNonTransactionalProducer() {
	return doCreateProducer(null);
}

底层方法

private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
	if (txIdPrefix != null) { // 使用kafka事务
		if (this.producerPerConsumerPartition) {
			return createTransactionalProducerForPartition(txIdPrefix);
		}
		else {
			return createTransactionalProducer(txIdPrefix);
		}
	}
	if (this.producerPerThread) { // 每个线程一个生产者 
		return getOrCreateThreadBoundProducer();
	}
	synchronized (this) { // 我们走这里,synchronized同步代码块 
		if (this.producer != null && expire(this.producer)) {
			this.producer = null;
		}
		if (this.producer == null) {
			this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
					this.physicalCloseTimeout, this.beanName, this.epoch.get());
			this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
		}
		return this.producer;
	}
}

【代码解说】 以上方法有4个分支,包括创建分区带事务的生产者, 带事务的生产者,每个线程一个生产者,普通生产者;

【3.4.1】 创建分区带事务的生产者 createTransactionalProducerForPartition(txIdPrefix)

因为方法过于复杂,放在文末说明

【3.4.2】 带事务的生产者 createTransactionalProducer(txIdPrefix)

因为方法过于复杂,放在文末说明

【3.4.3】 每个线程一个生产者 getOrCreateThreadBoundProducer

该方法就是把 生产者放入了线程级变量 ThreadLocal; 仅此而已

private Producer<K, V> getOrCreateThreadBoundProducer() {
        // 从线程级变量获取
	CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
	if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
		closeThreadBoundProducer();
		tlProducer = null;
	}
	if (tlProducer == null) {
		tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
				this.physicalCloseTimeout, this.beanName, this.epoch.get());
		for (Listener<K, V> listener : this.listeners) {
			listener.producerAdded(tlProducer.clientId, tlProducer);
		}
		this.threadBoundProducers.set(tlProducer); // 放入线程级遍历 
	}
	return tlProducer;
}

threadBoundProducers 就是线程级遍历


【3.4.4】 普通生产者*(本文所关注的方法)

synchronized块中 可以保证所有客户端线程复用同一个 kafka生产者,只要这个kafka没有过期;即便过期,它会重新创建一个 kafka生产者;

synchronized中调用了expire(this.producer)判断是否过期

private boolean expire(CloseSafeProducer<K, V> producer) {
	boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
	if (expired) {
		producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
	}
	return expired;
}

接着 创建了 CloseSafeProducer -关闭安全的生产者, 这是一个内部类

protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

在 调用CloseSafeProducer 构造器时传入了 kafka生产者, 由 createKafkaProducer() 获取; createKafkaProducer() 方法如下:

protected Producer<K, V> createKafkaProducer() {
	Map<String, Object> newConfigs;
	if (this.clientIdPrefix == null) {// 是否有生产者客户端id ,这个值可以为空,
		newConfigs = new HashMap<>(this.configs);
	}
	else {
		newConfigs = new HashMap<>(this.configs);
		newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
				this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
	}
	checkBootstrap(newConfigs);
	return createRawProducer(newConfigs);
}

// 创建原生kafka生产者 
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
	Producer<K, V> kafkaProducer =
			new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
	for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
		kafkaProducer = pp.apply(kafkaProducer);
	}
	return kafkaProducer;
}

 子类必须返回一个原生生产者(kafka生产者),该生产者将被包装在 DefaultKafkaProducerFactory.CloseSafeProducer 中。


【4】内部类-保证关闭安全的kafka生产者-CloseSafeProducer

1, 该类实现了 接口  Producer;

【4.1】构造器

上文调用  CloseSafeProducer构造器创建 生产者对象,

 其中 removeProducer 使用了 java8语法的方法引用,如下:

构造器有1个外观

CloseSafeProducer(Producer<K, V> delegate,
    BiPredicate<CloseSafeProducer<K, V>, Duration> removeConsumerProducer, Duration closeTimeout,
    String factoryName, int epoch) {

    this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
}

底层构造器

CloseSafeProducer(Producer<K, V> delegate,
	BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
	Duration closeTimeout, String factoryName, int epoch) {

	Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
	this.delegate = delegate; // 原生kafka生产者 
	this.removeProducer = removeProducer; // 移除生产者的方法
	this.txIdPrefix = txIdPrefix; // 事务id前缀 
	this.closeTimeout = closeTimeout; // 关闭超时时间 
	Map<MetricName, ? extends Metric> metrics = delegate.metrics(); // 指标
	Iterator<MetricName> metricIterator = metrics.keySet().iterator();// 指标迭代器
	String id;
	if (metricIterator.hasNext()) {
		id = metricIterator.next().tags().get("client-id");
	}
	else {
		id = "unknown";
	}
	this.clientId = factoryName + "." + id; // 客户端id
	this.created = System.currentTimeMillis(); // 创建时间为当前时间(毫秒)
	this.epoch = epoch; // 副本时代版本号
	LOGGER.debug(() -> "Created new Producer: " + this);
}

【代码解说】

delegate,表示代理,这里指 原生kafka生产者;

很显然, 构造器没有啥复杂逻辑,就是赋值而已;


【4.2】发送消息

发送消息有2个重载方法 

方法1, 直接发送 ProducerRecord

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    LOGGER.trace(() -> toString() + " send(" + record + ")");
    return this.delegate.send(record);
}

方法2,带回调的发送方法,消息发送完成后的回调(成功或失败),回调方法中可以获取发送消息所在的偏移量和分区等信息(metadata) ;

这个方法先执行了 原生kafka发送消息分发的回调, 回调执行完成后,在执行 外层传入的回调;

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    LOGGER.trace(() -> toString() + " send(" + record + ")");
    return this.delegate.send(record, new Callback() {

	@Override
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if (exception instanceof OutOfOrderSequenceException) {
			CloseSafeProducer.this.producerFailed = exception;
			close(CloseSafeProducer.this.closeTimeout);
		}
		callback.onCompletion(metadata, exception);
	}
    });
}

【4.3】flush 刷新缓冲区

直接调用 原生kakfa生产者delegate的flush 方法; 

public void flush() {
    LOGGER.trace(() -> toString() + " flush()");
    this.delegate.flush();
}

【4.4】kafka事务操作方法

初始化事务, initTransactions;

开启事务, beginTransaction ;

发送偏移量到事务, sendOffsetsToTransaction ;

提交事务, commitTransaction ;

中断事务, abortTransaction ;


【4.5】关闭生产者方法(非常重要*)

有2个方法关闭生产者;

【4.5.1】 close(Duration timeout )

public void close(@Nullable Duration timeout) {
    LOGGER.trace(() -> toString() + " close(" + (timeout == null ? "null" : timeout) + ")");
    if (!this.closed) {
	if (this.producerFailed != null) {
		LOGGER.warn(() -> "Error during some operation; producer removed from cache: " + this);
		this.closed = true;
		this.removeProducer.test(this, this.producerFailed instanceof TimeoutException
				? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
				: timeout);
	}
	else {
		this.closed = this.removeProducer.test(this, timeout);
	}
    }
}

调用 remoteProducer() 移除生产者

// 删除单个共享生产者和线程绑定实例(如果存在)。
protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> producerToRemove
, Duration timeout) {
	if (producerToRemove.closed) {
		if (producerToRemove.equals(this.producer)) {
			this.producer = null;
			producerToRemove.closeDelegate(timeout, this.listeners);
		}
		this.threadBoundProducers.remove();
		return true;
	}
	else {
		return false;
	}
}

 

也就是说,不是每一次发送消息完成,都关闭kafka生产者

  1. 那什么时候才会关闭 ?
  2. 当 producerToRemove.closed 为true时,才会执行关闭;那 producerToRemove.closed 即代理.closed 何时才会设置为true ?
  3. 当 this.profucerFailed 不为空的时候; 那 this.profucerFailed  什么情况下不为空 ?
  4. 原来在 发送消息的回调方法里,若发送消息抛出异常,则设置 profucerFailed为异常对象,即不为空;如下图所示

【总结】 何时关闭kafka生产者 ?(干货——非常重要) *

当发送消息抛出异常时,关闭kafka生产者; 

上述代码还是调用了  producerToRemove.closeDelegate(timeout, this.listeners);

其中 producerToRemove 就是 producer(CloseSafeProducer )的 closeDelegate(),下文所述;


 【4.5.2】 CloseSafeProducer.closeDelegate

调用了 原生kafka生产者的close 方法;

把生产者从监听器中移除;

void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {
	this.delegate.close(timeout == null ? this.closeTimeout : timeout);
	listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));
	this.closed = true;
}


【补充】创建生产者

【代码解说】 以上方法有4个分支,包括创建分区带事务的生产者, 带事务的生产者,每个线程一个生产者,普通生产者;

【3.4.1】 创建分区带事务的生产者 createTransactionalProducerForPartition(txIdPrefix)

protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {
	String suffix = TransactionSupport.getTransactionIdSuffix();
	if (suffix == null) {
		return createTransactionalProducer(txIdPrefix);
	}
	else {
		synchronized (this.consumerProducers) {
			CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);
			if (consumerProducer == null || expire(consumerProducer)) {
				CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,
						this::removeConsumerProducer);
				this.consumerProducers.put(suffix, newProducer);
				return newProducer;
			}
			else {
				return consumerProducer;
			}
		}
	}
}
private final Map<String, CloseSafeProducer<K, V>> consumerProducers 
    = new HashMap<>();
  • 1) 如果后缀为空,调用  createTransactionalProducer 创建生产者 ; 先从缓存中的阻塞队列中获取,若获取不到,则 调用 doCreateTxProducer  创建;

获取缓存 getCache() 方法如下:
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
    if (txIdPrefix == null) {
	return null;
    }
    return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
}

// cache缓存定义
private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache
     = new ConcurrentHashMap<>();
  •  2,不为空, 调用 doCreateTxProducer 创建生产者; 它也调用了  createRawProducer 根据原生 kafka生产者 创建  CloseSafeProducer

【3.4.2】 带事务的生产者 createTransactionalProducer(txIdPrefix)

refer2【3.4.1】分支1。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐