[C#] Kafka 生产者和消费者实现
一、背景由于公司加强对员工操作记录的审查和追踪,程式需要对员工的进行存档记录。由于并发巨大,使用传统的直连DB进行存储的方式对DB造成巨大压力,也导致程序响应缓慢,降低了用户体验。结合DBA推动Kafka,开发Kafka调用组件,使各应用程式快速集成Kafka,实现高并发的消息队列处理方案。二、简介2.1 KafkaKafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Jav
目录
一、背景
由于公司加强对员工操作记录的审查和追踪,程式需要对员工的进行存档记录。由于并发巨大,使用传统的直连DB进行存储的方式对DB造成巨大压力,也导致程序响应缓慢,降低了用户体验。结合DBA推动Kafka,开发Kafka调用组件,使各应用程式快速集成Kafka,实现高并发的消息队列处理方案。
二、简介
2.1 Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
特性:
Kafka [1]
是一种高吞吐量 [2]
的分布式发布订阅消息系统,有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量 [2] :即使是非常普通的硬件Kafka也可以支持每秒数百万 [2] 的消息。
支持通过Kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。 [3]
来源:百度百科
三、实现
3.1 创建项目
先创建一个类库的项目,我们把它叫做KafkaHelper,
3.2 安装Package
使用Nuget安装以下Package
-----------------------------------------------------------------------------------------------------
Confluent.Kafka
Newtonsoft.Json ——用于消息序列化和反序列化
-----------------------------------------------------------------------------------------------------
以上包有依赖项,需要全部引入,引用完成后的packages.config文件如下:
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Confluent.Kafka" version="1.5.3" targetFramework="net45" />
<package id="librdkafka.redist" version="1.5.3" targetFramework="net45" />
<package id="Newtonsoft.Json" version="8.0.3" targetFramework="net45" />
<package id="System.Buffers" version="4.4.0" targetFramework="net45" />
<package id="System.Memory" version="4.5.0" targetFramework="net45" />
<package id="System.Runtime.CompilerServices.Unsafe" version="4.5.0" targetFramework="net45" />
</packages>
3.3 代码实现
代码中注释比较全面,我就不多说了,有问题可以评论或私信找我
3.3.1 实现消息序列化和反序列化的接口
注意:如果报错
CS0619 “ReadOnlySpan<byte>”已过时:“Types with embedded references are not supported in this version of your compiler.”
请使用VS2019+版本开发组件
using Confluent.Kafka;
using Newtonsoft.Json;
using System;
using System.Text;
namespace KafkaHelper
{
class KafkaConverter<T> : ISerializer<T>
{
/// <summary>
/// 序列化数据成字节
/// </summary>
/// <param name="data"></param>
/// <param name="context"></param>
/// <returns></returns>
public byte[] Serialize(T data, SerializationContext context)
{
var json = JsonConvert.SerializeObject(data);
return Encoding.UTF8.GetBytes(json);
}
}
class KafkaDConverter<T> : IDeserializer<T>
{
/// <summary>
/// 反序列化字节数据成实体数据
/// </summary>
/// <param name="data"></param>
/// <param name="isNull"></param>
/// <param name="context"></param>
/// <returns></returns>
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull) return default(T);
var json = Encoding.UTF8.GetString(data.ToArray());
try
{
return JsonConvert.DeserializeObject<T>(json);
}
catch
{
return default(T);
}
}
}
}
3.3.2 实现辅助类
辅助类主要用于实现一些常见公共的方法
using System.Reflection;
using System.Text.RegularExpressions;
namespace KafkaHelper
{
/// <summary>
/// 辅助类
/// </summary>
public class Helper
{
/// <summary>
/// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用)
/// </summary>
/// <returns></returns>
public static string GetApplicationName()
{
try
{
return Assembly.GetEntryAssembly().GetName().Name;
}
catch
{
return "Kafka_Demo";
}
}
/// <summary>
/// 获取服务器名称
/// </summary>
/// <returns></returns>
public static string GetServerName()
{
return System.Net.Dns.GetHostName();
}
/// <summary>
/// 获取服务器IP
/// </summary>
/// <returns></returns>
public static string GetServerIp()
{
System.Net.IPHostEntry ips = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName());
foreach (var ip in ips.AddressList)
{
if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$"))
{
return ip.ToString();
};
}
return "127.0.0.1";
}
/// <summary>
/// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级)
/// </summary>
/// <returns>long</returns>
public static long GetTimeStamp()
{
System.DateTime time = System.DateTime.Now;
long t = (time.Ticks - 621356256000000000) / 10000;
return t;
}
}
}
3.3.3 实现生产者
我这边生产者使用了 单例模式(静态变量实现单例)
using Confluent.Kafka;
using System;
namespace KafkaHelper
{
/// <summary>
/// 生产者(快速使用) Message.Key的数据类型为string、Message.Value的数据类型为LogModel
/// </summary>
public class Producer : Producer<string, LogModel>
{
private static readonly Producer producer = new Producer();
private Producer()
{
}
/// <summary>
/// 获取生产者实例
/// </summary>
/// <returns></returns>
public static new Producer GetProducer
{
get
{
return producer;
}
}
/// <summary>
/// 生产(使用配置的Topic,Key 默认为message)
/// </summary>
/// <param name="Value">Message.Value</param>
public void Produce(LogModel Value)
{
Produce("message", Value, Topic);
}
}
/// <summary>
/// 生产者(进阶)
/// </summary>
/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
public class Producer<TKey, TValue>
{
private static readonly Producer<TKey, TValue> producer = new Producer<TKey, TValue>();
/// <summary>
/// 构造生产者 (私有)
/// </summary>
protected Producer()
{
ProducerConfig = new ProducerConfig()
{
BootstrapServers = "127.0.0.1:9092"
};
}
/// <summary>
/// 获取生产者实例
/// </summary>
/// <returns></returns>
public static Producer<TKey, TValue> GetProducer
{
get
{
return producer;
}
}
/// <summary>
/// 生产者配置文件
/// </summary>
public ProducerConfig ProducerConfig;
/// <summary>
/// Kafka地址(包含端口号) 默认为 127.0.0.1:9092
/// </summary>
public string Servers
{
get
{
return ProducerConfig.BootstrapServers;
}
set
{
ProducerConfig.BootstrapServers = value;
}
}
/// <summary>
/// 主题 请使用Kafka_系统名称 的格式,例如 Kafka_Demo等. 默认为当前程式名称(Web应用 默认为 Kafka_Demo)
/// </summary>
public string Topic { get; set; } = Helper.GetApplicationName();
private IProducer<TKey, TValue> _producer;
/// <summary>
/// 启动
/// </summary>
/// <returns>是否成功</returns>
public bool Start()
{
return Start(out _);
}
/// <summary>
/// 启动
/// </summary>
/// <param name="Message">启动结果说明</param>
/// <returns>是否成功</returns>
public bool Start(out string Message)
{
try
{
var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());//设置序列化方式
_producer = producerBuilder.Build();
Message = "启动成功";
return true;
}
catch (Exception ex)
{
Message = ex.ToString();
return false;
}
}
/// <summary>
/// 生产(使用配置的Topic)
/// </summary>
/// <param name="Key">Message.Key</param>
/// <param name="Value">Message.Value</param>
public void Produce(TKey Key, TValue Value)
{
Produce(Key, Value, Topic);
}
/// <summary>
/// 生产(使用传入的Topic)
/// </summary>
/// <param name="Key">Message.Key</param>
/// <param name="Value">Message.Value</param>
/// <param name="Topic">主题</param>
public void Produce(TKey Key, TValue Value, string Topic)
{
_producer.Produce(Topic, new Message<TKey, TValue>() { Key = Key, Value = Value });
}
}
}
3.3.4 实现消费者
消费者实现三种消费方式: 1.订阅回调模式 2.批量消费模式 3.单笔消费模式
using Confluent.Kafka;
using System;
using System.Collections.Generic;
namespace KafkaHelper
{
/// <summary>
/// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为LogModel)
/// </summary>
public class Consumer : Consumer<string, LogModel> { }
/// <summary>
/// 消费者
/// </summary>
/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
public class Consumer<TKey, TValue>
{
/// <summary>
/// 构造消费者
/// </summary>
public Consumer() {
ConsumerConfig = new ConsumerConfig()
{
BootstrapServers = "127.0.0.1:9092",
GroupId = Helper.GetApplicationName(),
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};
}
private bool keepSubscribe = true;
/// <summary>
/// Kafka地址(包含端口号) 默认为 127.0.0.1:9092
/// </summary>
public string Servers
{
get
{
return ConsumerConfig.BootstrapServers;
}
set
{
ConsumerConfig.BootstrapServers = value;
}
}
/// <summary>
/// 消费者群组 默认为当前程式名称(Web应用 默认为 Kafka_Demo)
/// </summary>
public string GroupId
{
get
{
return ConsumerConfig.GroupId;
}
set
{
ConsumerConfig.GroupId = value;
}
}
/// <summary>
/// 自动提交 默认为 true
/// </summary>
public bool EnableAutoCommit
{
get
{
return ConsumerConfig.EnableAutoCommit ?? true;
}
set
{
ConsumerConfig.EnableAutoCommit = value;
}
}
private IConsumer<TKey, TValue> _consumer;
/// <summary>
/// 消费者配置文件
/// </summary>
public ConsumerConfig ConsumerConfig;
/// <summary>
/// 启动
/// </summary>
/// <returns>是否成功</returns>
public bool Start()
{
return Start(out _);
}
/// <summary>
/// 启动
/// </summary>
/// <param name="Message">启动结果说明</param>
/// <returns>是否成功</returns>
public bool Start(out string Message)
{
try
{
var Builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
Builder.SetValueDeserializer(new KafkaDConverter<TValue>());//设置反序列化方式
_consumer = Builder.Build();
keepSubscribe = true;
Message = "启动成功";
return true;
}
catch (Exception ex)
{
Message = ex.ToString();
return false;
}
}
/// <summary>
/// 消费(持续订阅)
/// </summary>
/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
/// <param name="Topic">主题</param>
public void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic)
{
_consumer.Subscribe(Topic);
while (keepSubscribe)
{
try
{
var result = _consumer.Consume();
if (Func(result))
{
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
_consumer.Commit(result);//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
}
}
}
catch
{
}
}
}
/// <summary>
/// 单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
/// <returns>待消费数据</returns>
public List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
{
_consumer.Subscribe(Topic);
List<ConsumeResult<TKey, TValue>> Res = new List<ConsumeResult<TKey, TValue>>();
while (keepSubscribe)
{
try
{
var result = _consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
if (result == null)
{
break;
}
else
{
Res.Add(result);
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
_consumer.Commit(result);//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
}
}
if (Res.Count > MaxRow)
{
break;
}
}
catch
{
}
}
return Res;
}
/// <summary>
/// 单行消费
/// </summary>
/// <param name="Topic">主题</param>
/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
/// <returns>待消费数据</returns>
public ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300)
{
_consumer.Subscribe(Topic);
try
{
if (keepSubscribe)
{
var result = _consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
if (result != null)
{
if (!(bool)ConsumerConfig.EnableAutoCommit)
{
_consumer.Commit(result);//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
}
}
return result;
}
return null;
}
catch
{
return null;
}
}
/// <summary>
/// 停止监控并释放资源
/// </summary>
public void Dispose()
{
keepSubscribe = false;
//_consumer.Dispose();
}
}
}
3.3.5 添加一个日志类
使用日志类,便于日志记录和标准统一
using System;
namespace KafkaHelper
{
/// <summary>
/// 默认日志类
/// </summary>
public class LogModel
{
/// <summary>
/// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,TimeStamp,ApplicationVersion)
/// </summary>
public LogModel()
{
ServerIp = Helper.GetServerIp();
ServerName = Helper.GetServerName();
TimeStamp = System.DateTime.Now;
ApplicationName = Helper.GetApplicationName();
ApplicationVersion = "V1.0.0";
}
/// <summary>
/// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka)
/// </summary>
public string ApplicationName { get; set; }
/// <summary>
/// 程式版本(默认为V1.0.0)
/// </summary>
public string ApplicationVersion { get; set; }
/// <summary>
/// 发生时间(默认为当前时间)
/// </summary>
public DateTime TimeStamp { get; set; }
/// <summary>
/// 开始时间
/// </summary>
public DateTime BeginDate { get; set; }
/// <summary>
/// 结束时间
/// </summary>
public DateTime EndDate { get; set; }
/// <summary>
/// 服务器IP(默认抓取当前服务器IP)
/// </summary>
public string ServerIp { get; set; }
/// <summary>
/// 服务器名称(默认抓取当前服务器名称)
/// </summary>
public string ServerName { get; set; }
/// <summary>
/// 客户端IP
/// </summary>
public string ClientIp { get; set; }
/// <summary>
/// 模块(页面路径)
/// </summary>
public string Module { get; set; }
/// <summary>
/// 操作人
/// </summary>
public string Operator { get; set; }
/// <summary>
/// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义
/// </summary>
public string OperationType { get; set; }
/// <summary>
/// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义
/// </summary>
public string Status { get; set; }
/// <summary>
/// 其他信息
/// </summary>
public string Message { get; set; }
/// <summary>
/// 来源路径
/// </summary>
public string Referrer { get; set; }
/// <summary>
/// 大模块名
/// </summary>
public string Module0 { get; set; }
/// <summary>
/// 中模块名
/// </summary>
public string Module1 { get; set; }
/// <summary>
/// 小模块名
/// </summary>
public string Module2 { get; set; }
/// <summary>
/// 响应时间(ms)
/// </summary>
public long TimeTaken { get; set; }
/// <summary>
/// 请求参数大小
/// </summary>
public long RequestBytes { get; set; }
/// <summary>
/// 返回参数大小
/// </summary>
public long ResponseBytes { get; set; }
}
}
到这里我们的Kafka辅助组件已经开发完毕了
3.3.6 打包发布Nuget
公司内部搭建了Nuget服务器,使用 NuGet Package Explorer 打包组件,并发布到服务器上
四、应用
4.1搭建验证程式
4.1.1 创建一个控制台应用程序
4.1.2 引用Kafka组件
直接是用Nuget安装刚刚发布的组件
我这里把测试项目直接写在了同一个专案中,所以就直接引用了
4.1.3 使用 生产者 和 消费者
using KafkaHelper;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
var msg = "hello world by Younger";
//开启新线程订阅消息
Thread rad = new Thread(new ThreadStart(Read));
rad.Start();
//获取生产者
var producer = KafkaHelper.Producer.GetProducer;
//设置主题
producer.Topic = "test1";
//开启生产者
producer.Start();
inputMsg:
Console.WriteLine($"[send] [{System.DateTime.Now.ToLongTimeString()}] value:{msg}");
//生产消息
producer.Produce("message", msg);
Thread.Sleep(1000);
Console.WriteLine("");
Console.Write("Please input message:");
msg = Console.ReadLine();
goto inputMsg;
}
/// <summary>
/// 订阅回调模式
/// </summary>
private static void Read()
{
//实例化消费者
KafkaHelper.Consumer consumer = new KafkaHelper.Consumer();
//开启消费者
consumer.Start();
//订阅消息
consumer.Consume(r =>
{
Console.WriteLine($"[recieve] [{System.DateTime.Now.ToLongTimeString()}] value:{r.Message.Value}");
return true;
}, "test1");
}
/// <summary>
/// 批量消费模式
/// </summary>
private static void ReadOnce()
{
//实例化消费者
KafkaHelper.Consumer consumer = new KafkaHelper.Consumer();
//开启消费者
consumer.Start();
while (true)
{
//设置指定时长消费一次数据
Thread.Sleep(5000);
//消费并返回当前数据集合
var Res = consumer.ConsumeOnce("test1");
Console.WriteLine();
foreach (var n in Res)
{
Console.WriteLine($"[recieve] [{System.DateTime.Now.ToLongTimeString()}] value:{n.Message.Value}");
}
Console.WriteLine();
}
}
/// <summary>
/// 单笔消费模式
/// </summary>
private static void ReadOneRow()
{
KafkaHelper.Consumer consumer = new KafkaHelper.Consumer();
consumer.Start();
var a = consumer.ConsumeOneRow("test1");
}
}
}
执行结果如下图所示
五、说明
生产者使用单例模式可以降低程式消耗,在使用时
建议Web项目使用Global.asax全局文件配置
示例:
using System;
namespace WebApplication3
{
public class Global : System.Web.HttpApplication
{
protected void Application_Start(object sender, EventArgs e)
{
var producer = KafkaHelper.Producer.GetProducer;
producer.Topic="test1";
producer.Start();
}
}
}
控制台和Windows应用程式直接配置在主函数中
更多推荐
所有评论(0)