Kafka Basics Tutorial (3): Using the Kafka Message Queue from C#


Following the Kafka installation in the previous article, my Kafka cluster addresses are 192.168.209.133:9092, 192.168.209.134:9092 and 192.168.209.135:9092, so that cluster is used for all the demos here.

First, create a project. The demos use a console application (.NET Core 3.1). Then install the Confluent.Kafka package via NuGet:

[Screenshot: installing the Confluent.Kafka package in the NuGet package manager]

The NuGet page also shows the Confluent.Kafka source repository; if you are interested, it lives at https://github.com/confluentinc/confluent-kafka-dotnet/

Message Publishing

Let's start with a demo:

    static void Main(string[] args)
    {
        ProducerConfig config = new ProducerConfig();
        config.BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        var builder = new ProducerBuilder<string, object>(config);
        builder.SetValueSerializer(new KafkaConverter()); // set the value serializer
        var producer = builder.Build();
        producer.Produce("test", new Message<string, object>() { Key = "Test", Value = "hello world" });
        Console.ReadKey();
    }

After running this code, you can see the message with the Kafka Tool utility mentioned in the previous article.

1. Publishing requires a producer object, which is built by the ProducerBuilder<,> class. It has two generic parameters: the first is the type of the routing key, the second is the type of the message value. In practice we mostly use ProducerBuilder<string, object> or ProducerBuilder<string, string>.
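As an aside (my own sketch, not part of the original demo): Confluent.Kafka ships built-in serializers for common types such as string, so a ProducerBuilder<string, string> can usually be built without calling SetValueSerializer at all:

    // Minimal sketch assuming the cluster from this article; with string values,
    // the library's built-in UTF-8 serializer is picked up automatically.
    var config = new ProducerConfig { BootstrapServers = "192.168.209.133:9092" };
    using (var producer = new ProducerBuilder<string, string>(config).Build())
    {
        producer.Produce("test", new Message<string, string> { Key = "Test", Value = "hello world" });
        producer.Flush(TimeSpan.FromSeconds(10)); // wait for delivery before disposing
    }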

2. ProducerBuilder<string, object> takes a configuration argument when instantiated. That configuration is a collection (IEnumerable<KeyValuePair<string, string>>), and ProducerConfig is simply a type that implements this collection interface. Older versions of Confluent.Kafka had no ProducerConfig type; back then a Dictionary<string, string> was used to build the ProducerBuilder<string, object>. The demo above, for example, could just as well be written as:

    static void Main(string[] args)
    {
        Dictionary<string, string> config = new Dictionary<string, string>();
        config["bootstrap.servers"] = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        var builder = new ProducerBuilder<string, object>(config);
        builder.SetValueSerializer(new KafkaConverter()); // set the value serializer
        var producer = builder.Build();
        producer.Produce("test", new Message<string, object>() { Key = "Test", Value = "hello world" });

        Console.ReadKey();
    }

The two approaches are equivalent: a ProducerConfig object ultimately just yields a KeyValuePair<string, string> collection, and every ProducerConfig property maps to a corresponding key. For example, the BootstrapServers property above maps to bootstrap.servers, the Kafka cluster address, with multiple addresses separated by commas.

Other configuration options are documented in the official configuration reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
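For illustration, a few other producer settings that are commonly tuned, written as ProducerConfig properties (a sketch; the property names are taken from current Confluent.Kafka versions, so verify them against the version you install):

    var config = new ProducerConfig
    {
        BootstrapServers = "192.168.209.133:9092",
        Acks = Acks.All,             // acks: wait for all in-sync replicas to acknowledge
        MessageTimeoutMs = 10000,    // message.timeout.ms: fail delivery after 10 seconds
        EnableIdempotence = true     // enable.idempotence: avoid duplicates on retry
    };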

3. Confluent.Kafka also requires a serializer type implementing the ISerializer<TValue> or IAsyncSerializer<TValue> interface, such as the KafkaConverter in the demo above:

    public class KafkaConverter : ISerializer<object>
    {
        /// <summary>
        /// Serialize data to bytes
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
    }

I serialize to JSON here, which requires installing Newtonsoft.Json via NuGet.
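As a side note (my own variation, not from the original demo): on .NET Core 3.1 the built-in System.Text.Json can do the same job without an extra package. A minimal sketch:

    using System.Text.Json;

    public class SystemTextJsonConverter : ISerializer<object>
    {
        // SerializeToUtf8Bytes writes straight to UTF-8 bytes, skipping the
        // intermediate string; passing data.GetType() serializes the runtime type.
        public byte[] Serialize(object data, SerializationContext context)
        {
            return data == null ? null : JsonSerializer.SerializeToUtf8Bytes(data, data.GetType());
        }
    }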

4. Messages are published with the Produce method, which has several overloads plus async variants. The first parameter is the topic; to target a specific partition, pass a TopicPartition object instead. The second parameter is the message, of type Message<TKey, TValue>: Key is the routing key and Value is the payload, which is run through the ISerializer<TValue> before being sent to Kafka. The third parameter is a delegate of type Action<DeliveryReport<TKey, TValue>>, invoked asynchronously; it is effectively a notification of the publish result.
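For illustration, a sketch of that callback using the demo's topic and message (DeliveryReport exposes an Error and the final TopicPartitionOffset):

    producer.Produce("test",
        new Message<string, object>() { Key = "Test", Value = "hello world" },
        report =>
        {
            // Runs asynchronously once the broker acknowledges (or rejects) the message
            if (report.Error.IsError)
                Console.WriteLine($"delivery failed: {report.Error.Reason}");
            else
                Console.WriteLine($"delivered to {report.TopicPartitionOffset}");
        });

    // The async variant returns a DeliveryResult<TKey, TValue> instead of using a callback:
    // var dr = await producer.ProduceAsync("test", new Message<string, object>() { Key = "Test", Value = "hello world" });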

Message Consumption

Again, let's start with a demo:

    static void Main(string[] args)
    {
        ConsumerConfig config = new ConsumerConfig();
        config.BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        config.GroupId = "group.1";
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.EnableAutoCommit = false;

        var builder = new ConsumerBuilder<string, object>(config);
        builder.SetValueDeserializer(new KafkaConverter()); // set the value deserializer
        var consumer = builder.Build();
        consumer.Subscribe("test"); // subscribe to a topic with Subscribe
        //consumer.Assign(new TopicPartition("test", new Partition(1))); // subscribe to a specific partition with Assign

        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"receive message: {result.Message.Value}");
            consumer.Commit(result); // manual commit; not needed if EnableAutoCommit = true
        }
    }

1. As with publishing, a consumer is built through a ConsumerBuilder<,> object, and there is likewise a ConsumerConfig configuration object. It also did not exist in older versions, where a Dictionary<string, string> was used instead; the example above is equivalent to:

    static void Main(string[] args)
    {
        Dictionary<string, string> config = new Dictionary<string, string>();
        config["bootstrap.servers"] = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        config["group.id"] = "group.1";
        config["auto.offset.reset"] = "earliest";
        config["enable.auto.commit"] = "false";

        var builder = new ConsumerBuilder<string, object>(config);
        builder.SetValueDeserializer(new KafkaConverter()); // set the value deserializer
        var consumer = builder.Build();
        consumer.Subscribe("test"); // subscribe to a topic with Subscribe
        //consumer.Assign(new TopicPartition("test", new Partition(1))); // subscribe to a specific partition with Assign

        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"receive message: {result.Message.Value}");
            consumer.Commit(result); // manual commit; not needed if EnableAutoCommit = true
        }
    }

Like ProducerConfig, it is effectively a KeyValuePair<string, string> collection, and each of its properties maps to a corresponding key. Other configuration options are documented in the official configuration reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

A quick note on the settings used in this example:

BootstrapServers: the Kafka cluster address; multiple addresses are separated by commas.
GroupId: the consumer's group. Note that Kafka consumes messages in groups: a message is consumed by only one consumer within a given group. Also, as a rule, consumers in the same group should implement the same logic.
EnableAutoCommit: whether to commit automatically. If true, a message counts as consumed as soon as the consumer receives it; we can set it to false and commit manually once our processing logic has finished.
AutoOffsetReset: how the offset is reset automatically, i.e. Kafka's read strategy. The default is Latest; the possible values are Latest, Earliest and Error. Personally I recommend Earliest.
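Beyond those four, ConsumerConfig exposes many more options from the reference above. A few that often matter, sketched here (property names worth verifying against the version you install):

    var config = new ConsumerConfig
    {
        BootstrapServers = "192.168.209.133:9092",
        GroupId = "group.1",
        SessionTimeoutMs = 10000,     // session.timeout.ms: how long before the broker considers the consumer dead
        MaxPollIntervalMs = 300000,   // max.poll.interval.ms: max allowed gap between Consume calls
        EnableAutoOffsetStore = true  // enable.auto.offset.store: track offsets in memory automatically
    };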

One more note on the AutoOffsetReset setting:

Latest: for partitions that have a committed offset, consume from that offset; for partitions without one, consume only data newly produced to the partition.
Earliest: for partitions that have a committed offset, consume from that offset; for partitions without one, consume from the beginning.
Error: if every partition of the topic has a committed offset, consume from those offsets; if even one partition has no committed offset, throw an exception.

If that sounds a bit abstract, here's an example.

When a consumer connects to Kafka, where should it start consuming from?

First, recall that Kafka consumers consume as groups, and Kafka records how far each group has read in each partition, i.e. the offset.

When a consumer connects, if its group has consumed before and committed an offset (an offset record exists), the consumer starts from that offset. In this case Latest, Earliest and Error all behave the same.

But if the consumer belongs to a brand-new group (no offset record exists), the three settings diverge:

Latest: consume only messages produced after the moment of connecting to Kafka; messages published earlier are not consumed. This is the default behavior.

Earliest: consume from the smallest offset (the very beginning, if all messages are still retained), which means messages published before connecting are consumed as well.

Error: simply and bluntly throw an exception.

2. Producing needs serialization, so consuming needs deserialization: we must provide a type implementing the IDeserializer<TValue> interface. The example above uses JSON:

    public class KafkaConverter : IDeserializer<object>
    {
        /// <summary>
        /// Deserialize bytes back into an object
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return null;

            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject(json);
            }
            catch
            {
                return json;
            }
        }
    }

3. Kafka is a publish/subscribe message queue, and Confluent.Kafka offers two subscription methods: Subscribe and Assign.

Subscribe: subscribe to messages from one or more topics.

Assign: subscribe to messages from specified partitions of one or more topics.

Confluent.Kafka also provides the corresponding unsubscribe methods, Unsubscribe and Unassign; both styles are sketched below.
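For illustration (topic names and partition numbers are placeholders, and consumer is the instance built above):

    // Subscribe: the group coordinator assigns partitions; several topics at once are fine.
    consumer.Subscribe(new[] { "test", "test2" });
    consumer.Unsubscribe();

    // Assign: pin explicit partitions, optionally with a starting offset.
    consumer.Assign(new[]
    {
        new TopicPartitionOffset("test", new Partition(0), Offset.Beginning),
        new TopicPartitionOffset("test", new Partition(1), Offset.End)
    });
    consumer.Unassign();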

4. Messages are fetched with the Consume method, which returns a ConsumeResult<TKey, TValue> object. The message we want is inside it, along with the offset and other metadata.

Also, Consume blocks the current thread until a message is available for consumption, or until it times out.
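Besides the parameterless form, Consume has overloads taking a timeout or a CancellationToken; a sketch:

    // Returns null if nothing arrives within the timeout.
    var result = consumer.Consume(TimeSpan.FromSeconds(5));
    if (result != null)
        Console.WriteLine(result.Message.Value);

    // Or block until a message arrives or the token is cancelled
    // (cancellation surfaces as an OperationCanceledException).
    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    var result2 = consumer.Consume(cts.Token);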

5. If we set EnableAutoCommit = false when creating the consumer, remember that we must call the Commit method manually.

A Complete Demo

As mentioned above, producing requires an object implementing the serialization interface, while consuming requires one implementing the deserialization interface. It is best to implement both on the same class, giving one complete implementation:

    public class KafkaConverter : ISerializer<object>, IDeserializer<object>
    {
        /// <summary>
        /// Serialize data to bytes
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
        /// <summary>
        /// Deserialize bytes back into an object
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return null;

            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject(json);
            }
            catch
            {
                return json;
            }
        }
    }

A complete demo then looks like this:

    static void Main(string[] args)
    {
        var bootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        var group1 = "group.1";
        var group2 = "group.2";
        var topic = "test";

        new Thread(() =>
        {
            ConsumerConfig config = new ConsumerConfig();
            config.BootstrapServers = bootstrapServers;
            config.GroupId = group1;
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.EnableAutoCommit = false;

            var builder = new ConsumerBuilder<string, object>(config);
            builder.SetValueDeserializer(new KafkaConverter()); // set the value deserializer
            var consumer = builder.Build();
            //consumer.Subscribe(topic); // subscribe to the whole topic with Subscribe
            consumer.Assign(new TopicPartition(topic, new Partition(0))); // subscribe to a specific partition with Assign

            while (true)
            {
                var result = consumer.Consume();
                Console.WriteLine($"{group1} receive message: {result.Message.Value}");
                consumer.Commit(result); // manual commit; not needed if EnableAutoCommit = true
            }
        }).Start();

        new Thread(() =>
        {
            ConsumerConfig config = new ConsumerConfig();
            config.BootstrapServers = bootstrapServers;
            config.GroupId = group2;
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.EnableAutoCommit = false;

            var builder = new ConsumerBuilder<string, object>(config);
            builder.SetValueDeserializer(new KafkaConverter()); // set the value deserializer
            var consumer = builder.Build();
            //consumer.Subscribe(topic); // subscribe to the whole topic with Subscribe
            consumer.Assign(new TopicPartition(topic, new Partition(1))); // subscribe to a specific partition with Assign

            while (true)
            {
                var result = consumer.Consume();
                Console.WriteLine($"{group2} receive message: {result.Message.Value}");
                consumer.Commit(result); // manual commit; not needed if EnableAutoCommit = true
            }
        }).Start();

        int index = 0;
        // Named producerConfig/producerBuilder to avoid clashing with the
        // config/builder locals declared inside the lambdas above (CS0136).
        ProducerConfig producerConfig = new ProducerConfig();
        producerConfig.BootstrapServers = bootstrapServers;
        var producerBuilder = new ProducerBuilder<string, object>(producerConfig);
        producerBuilder.SetValueSerializer(new KafkaConverter()); // set the value serializer
        var producer = producerBuilder.Build();
        while (true)
        {
            Console.Write("Enter a message: ");
            var line = Console.ReadLine();

            int partition = index % 3;
            var topicPartition = new TopicPartition(topic, new Partition(partition));
            producer.Produce(topicPartition, new Message<string, object>() { Key = "Test", Value = line });

            index++;
        }
    }

A Simple Wrapper

Here is a simple wrapper that exposes a few common settings for convenience. As before, install the Confluent.Kafka and Newtonsoft.Json packages via NuGet. The classes are as follows:

KafkaBaseOptions:
    public abstract class KafkaBaseOptions
    {
        /// <summary>
        /// Server addresses
        /// </summary>
        public string[] BootstrapServers { get; set; }
    }
KafkaConsumer (together with the RecieveResult and ListenResult classes it returns):
    public class KafkaConsumer : IDisposable
    {
        ConsumerBuilder<string, object> builder;
        List<IConsumer<string, object>> consumers;
        bool disposed = false;

        /// <summary>
        /// Kafka server nodes
        /// </summary>
        public string BootstrapServers { get; private set; }
        /// <summary>
        /// Consumer group
        /// </summary>
        public string GroupId { get; private set; }
        /// <summary>
        /// Whether to commit automatically (enable.auto.commit)
        /// </summary>
        public bool EnableAutoCommit { get; set; } = false;

        /// <summary>
        /// Error event
        /// </summary>
        public event Action<object, Exception> ErrorHandler;
        /// <summary>
        /// Statistics event
        /// </summary>
        public event Action<object, string> StatisticsHandler;
        /// <summary>
        /// Log event
        /// </summary>
        public event Action<object, KafkaLogMessage> LogHandler;

        public KafkaConsumer(string groupId, params string[] bootstrapServers)
        {
            if (bootstrapServers == null || bootstrapServers.Length == 0)
            {
                throw new Exception("at least one server must be assigned");
            }

            this.GroupId = groupId;
            this.BootstrapServers = string.Join(",", bootstrapServers);
            this.consumers = new List<IConsumer<string, object>>();
        }

        #region Private
        /// <summary>
        /// Create the consumer builder
        /// </summary>
        private void CreateConsumerBuilder()
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaConsumer));
            }

            if (builder == null)
            {
                lock (this)
                {
                    if (builder == null)
                    {
                        ConsumerConfig config = new ConsumerConfig();
                        config.BootstrapServers = BootstrapServers;
                        config.GroupId = GroupId;
                        config.AutoOffsetReset = AutoOffsetReset.Earliest;
                        config.EnableAutoCommit = EnableAutoCommit;
                        //config.EnableAutoOffsetStore = true;
                        //config.IsolationLevel = IsolationLevel.ReadCommitted;
                        //config.MaxPollIntervalMs = 10000;

                        // The same configuration in raw key/value form:
                        //List<KeyValuePair<string, string>> config = new List<KeyValuePair<string, string>>();
                        //config.Add(new KeyValuePair<string, string>("bootstrap.servers", BootstrapServers));
                        //config.Add(new KeyValuePair<string, string>("group.id", GroupId));
                        //config.Add(new KeyValuePair<string, string>("auto.offset.reset", "earliest"));
                        //config.Add(new KeyValuePair<string, string>("enable.auto.commit", EnableAutoCommit.ToString().ToLower()));
                        //config.Add(new KeyValuePair<string, string>("max.poll.interval.ms", "10000"));
                        //config.Add(new KeyValuePair<string, string>("session.timeout.ms", "10000"));
                        //config.Add(new KeyValuePair<string, string>("isolation.level", "read_uncommitted"));

                        builder = new ConsumerBuilder<string, object>(config);

                        // Swallow exceptions thrown by user-supplied event handlers
                        Action<Delegate, object> tryCatchWrap = (@delegate, arg) =>
                        {
                            try
                            {
                                @delegate?.DynamicInvoke(arg);
                            }
                            catch { }
                        };
                        builder.SetErrorHandler((p, e) => tryCatchWrap(ErrorHandler, new Exception(e.Reason)));
                        builder.SetStatisticsHandler((p, e) => tryCatchWrap(StatisticsHandler, e));
                        builder.SetLogHandler((p, e) => tryCatchWrap(LogHandler, new KafkaLogMessage(e)));
                        builder.SetValueDeserializer(new KafkaConverter());
                    }
                }
            }
        }
        /// <summary>
        /// Internal message handling
        /// </summary>
        /// <param name="consumer"></param>
        /// <param name="cancellationToken"></param>
        /// <param name="action"></param>
        private void InternalListen(IConsumer<string, object> consumer, CancellationToken cancellationToken, Action<RecieveResult> action)
        {
            try
            {
                var result = consumer.Consume(cancellationToken);
                if (!cancellationToken.IsCancellationRequested)
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                    if (!EnableAutoCommit && result != null)
                    {
                        // The commit runs when RecieveResult.Commit() cancels this token
                        cancellationTokenSource.Token.Register(() =>
                        {
                            consumer.Commit(result);
                        });
                    }
                    action?.Invoke(result == null ? null : new RecieveResult(result, cancellationTokenSource));
                }
            }
            catch { }
        }
        /// <summary>
        /// Validate the topics and partitions to consume
        /// </summary>
        /// <param name="subscribers"></param>
        private void CheckSubscribers(params KafkaSubscriber[] subscribers)
        {
            if (subscribers == null || subscribers.Length == 0)
            {
                throw new InvalidOperationException("subscriber can't be empty");
            }

            if (subscribers.Any(f => string.IsNullOrEmpty(f.Topic)))
            {
                throw new InvalidOperationException("topic can't be empty");
            }
        }
        /// <summary>
        /// Set the topics to listen to
        /// </summary>
        /// <param name="consumer"></param>
        /// <param name="subscribers"></param>
        private void SetSubscribers(IConsumer<string, object> consumer, params KafkaSubscriber[] subscribers)
        {
            var topics = subscribers.Where(f => f.Partition == null).Select(f => f.Topic).ToArray();
            var topicPartitions = subscribers.Where(f => f.Partition != null).Select(f => new TopicPartition(f.Topic, new Partition(f.Partition.Value))).ToArray();

            if (topics.Length > 0)
            {
                consumer.Subscribe(topics);
            }

            if (topicPartitions.Length > 0)
            {
                consumer.Assign(topicPartitions);
            }
        }
        /// <summary>
        /// Create a consumer
        /// </summary>
        /// <param name="listenResult"></param>
        /// <param name="subscribers"></param>
        /// <returns></returns>
        private IConsumer<string, object> CreateConsumer(ListenResult listenResult, params KafkaSubscriber[] subscribers)
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaConsumer));
            }

            CheckSubscribers(subscribers);

            CreateConsumerBuilder();

            var consumer = builder.Build();
            listenResult.Token.Register(() =>
            {
                consumer.Dispose();
            });

            SetSubscribers(consumer, subscribers);

            consumers.Add(consumer);

            return consumer;
        }
        #endregion

        #region Listen
        /// <summary>
        /// Listen once, blocking the current thread until a message arrives or listening is cancelled
        /// </summary>
        /// <param name="topics"></param>
        public RecieveResult ListenOnce(params string[] topics)
        {
            return ListenOnce(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray());
        }
        /// <summary>
        /// Listen once, blocking the current thread until a message arrives or listening is cancelled
        /// </summary>
        /// <param name="subscribers"></param>
        public RecieveResult ListenOnce(params KafkaSubscriber[] subscribers)
        {
            ListenResult listenResult = new ListenResult();
            var consumer = CreateConsumer(listenResult, subscribers);

            RecieveResult result = null;
            InternalListen(consumer, listenResult.Token, rr =>
            {
                result = rr;
            });
            return result;
        }
        /// <summary>
        /// Listen once, asynchronously
        /// </summary>
        /// <param name="topics"></param>
        /// <returns></returns>
        public async Task<RecieveResult> ListenOnceAsync(params string[] topics)
        {
            return await ListenOnceAsync(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray());
        }
        /// <summary>
        /// Listen once, asynchronously
        /// </summary>
        /// <param name="subscribers"></param>
        /// <returns></returns>
        public async Task<RecieveResult> ListenOnceAsync(params KafkaSubscriber[] subscribers)
        {
            return await Task.Run(() =>
            {
                return ListenOnce(subscribers);
            });
        }
        /// <summary>
        /// Listen
        /// </summary>
        /// <param name="topics"></param>
        /// <param name="action"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public void Listen(string[] topics, Action<RecieveResult> action = null, CancellationToken cancellationToken = default)
        {
            Listen(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray(), action, cancellationToken);
        }
        /// <summary>
        /// Listen
        /// </summary>
        /// <param name="subscribers"></param>
        /// <param name="action"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public void Listen(KafkaSubscriber[] subscribers, Action<RecieveResult> action = null, CancellationToken cancellationToken = default)
        {
            ListenResult result = new ListenResult();
            var consumer = CreateConsumer(result, subscribers);
            cancellationToken.Register(() =>
            {
                result.Stop();
            });
            while (!result.Stoped)
            {
                InternalListen(consumer, result.Token, action);
            }
        }
        /// <summary>
        /// Listen asynchronously
        /// </summary>
        /// <param name="topics"></param>
        /// <param name="action"></param>
        /// <returns></returns>
        public async Task<ListenResult> ListenAsync(string[] topics, Action<RecieveResult> action = null)
        {
            return await ListenAsync(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray(), action);
        }
        /// <summary>
        /// Listen asynchronously
        /// </summary>
        /// <param name="subscribers"></param>
        /// <param name="action"></param>
        /// <returns></returns>
        public async Task<ListenResult> ListenAsync(KafkaSubscriber[] subscribers, Action<RecieveResult> action = null)
        {
            ListenResult result = new ListenResult();
            new Task(() =>
            {
                var consumer = CreateConsumer(result, subscribers);
                while (!result.Stoped)
                {
                    InternalListen(consumer, result.Token, action);
                }
            }).Start();
            return await Task.FromResult(result);
        }
        #endregion
        /// <summary>
        /// Release resources
        /// </summary>
        public void Dispose()
        {
            disposed = true;
            builder = null;

            foreach (var consumer in consumers)
            {
                consumer?.Close();
                consumer?.Dispose();
            }

            GC.Collect();
        }

        public static KafkaConsumer Create(string groupId, KafkaBaseOptions kafkaBaseOptions)
        {
            return new KafkaConsumer(groupId, kafkaBaseOptions.BootstrapServers);
        }
        public override string ToString()
        {
            return BootstrapServers;
        }
    }

    public class RecieveResult
    {
        CancellationTokenSource cancellationTokenSource;

        internal RecieveResult(ConsumeResult<string, object> consumeResult, CancellationTokenSource cancellationTokenSource)
        {
            this.Topic = consumeResult.Topic;
            this.Message = consumeResult.Message.Value?.ToString();
            this.Offset = consumeResult.Offset.Value;
            this.Partition = consumeResult.Partition.Value;
            this.Key = consumeResult.Message.Key;

            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// Topic the Kafka message belongs to
        /// </summary>
        public string Topic { get; private set; }
        /// <summary>
        /// Key
        /// </summary>
        public string Key { get; private set; }
        /// <summary>
        /// The actual message content to process
        /// </summary>
        public string Message { get; private set; }
        /// <summary>
        /// Current read position in Kafka
        /// </summary>
        public long Offset { get; private set; }
        /// <summary>
        /// Physical partition the message lives in
        /// </summary>
        public int Partition { get; private set; }

        /// <summary>
        /// Commit
        /// </summary>
        public void Commit()
        {
            if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;

            cancellationTokenSource.Cancel();
            cancellationTokenSource.Dispose();
            cancellationTokenSource = null;
        }
    }
    public class ListenResult : IDisposable
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// CancellationToken
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }
        /// <summary>
        /// Whether listening has stopped
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// Stop listening
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }

        public void Dispose()
        {
            Stop();
        }
    }
KafkaConverter:
    public class KafkaConverter : ISerializer<object>, IDeserializer<object>
    {
        /// <summary>
        /// Serialize data to bytes
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
        /// <summary>
        /// Deserialize bytes back into an object
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return null;

            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject(json);
            }
            catch
            {
                return json;
            }
        }
    }
KafkaMessage:
    public class KafkaMessage
    {
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }
        /// <summary>
        /// Partition; leave null to let Kafka pick the partition
        /// </summary>
        public int? Partition { get; set; }
        /// <summary>
        /// Key
        /// </summary>
        public string Key { get; set; }
        /// <summary>
        /// Message body
        /// </summary>
        public object Message { get; set; }
    }
KafkaProducer (together with DeliveryResult, DeliveryResultStatus and KafkaLogMessage):
    public class KafkaProducer : IDisposable
    {
        /// <summary>
        /// Builds producers
        /// </summary>
        ProducerBuilder<string, object> builder;
        ConcurrentQueue<IProducer<string, object>> producers;
        bool disposed = false;

        /// <summary>
        /// Kafka server nodes
        /// </summary>
        public string BootstrapServers { get; private set; }
        /// <summary>
        /// Flush timeout (ms)
        /// </summary>
        public int FlushTimeOut { get; set; } = 10000;
        /// <summary>
        /// Number of producers kept in the pool
        /// </summary>
        public int InitializeCount { get; set; } = 5;
        /// <summary>
        /// Default message key
        /// </summary>
        public string DefaultKey { get; set; }
        /// <summary>
        /// Default topic
        /// </summary>
        public string DefaultTopic { get; set; }
        /// <summary>
        /// Error event
        /// </summary>
        public event Action<object, Exception> ErrorHandler;
        /// <summary>
        /// Statistics event
        /// </summary>
        public event Action<object, string> StatisticsHandler;
        /// <summary>
        /// Log event
        /// </summary>
        public event Action<object, KafkaLogMessage> LogHandler;

        public KafkaProducer(params string[] bootstrapServers)
        {
            if (bootstrapServers == null || bootstrapServers.Length == 0)
            {
                throw new Exception("at least one server must be assigned");
            }

            this.BootstrapServers = string.Join(",", bootstrapServers);
            producers = new ConcurrentQueue<IProducer<string, object>>();
        }

        #region Private
        /// <summary>
        /// Create the producer builder
        /// </summary>
        /// <returns></returns>
        private ProducerBuilder<string, object> CreateProducerBuilder()
        {
            if (builder == null)
            {
                lock (this)
                {
                    if (builder == null)
                    {
                        ProducerConfig config = new ProducerConfig();
                        config.BootstrapServers = BootstrapServers;

                        // The same configuration in raw key/value form:
                        //var config = new KeyValuePair<string, string>[] { new KeyValuePair<string, string>("bootstrap.servers", BootstrapServers) };

                        builder = new ProducerBuilder<string, object>(config);

                        // Swallow exceptions thrown by user-supplied event handlers
                        Action<Delegate, object> tryCatchWrap = (@delegate, arg) =>
                        {
                            try
                            {
                                @delegate?.DynamicInvoke(arg);
                            }
                            catch { }
                        };
                        builder.SetErrorHandler((p, e) => tryCatchWrap(ErrorHandler, new Exception(e.Reason)));
                        builder.SetStatisticsHandler((p, e) => tryCatchWrap(StatisticsHandler, e));
                        builder.SetLogHandler((p, e) => tryCatchWrap(LogHandler, new KafkaLogMessage(e)));
                        builder.SetValueSerializer(new KafkaConverter());
                    }
                }
            }

            return builder;
        }
        /// <summary>
        /// Rent a producer from the pool
        /// </summary>
        /// <returns></returns>
        private IProducer<string, object> RentProducer()
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaProducer));
            }

            IProducer<string, object> producer;
            lock (producers)
            {
                if (!producers.TryDequeue(out producer) || producer == null)
                {
                    CreateProducerBuilder();
                    producer = builder.Build();
                }
            }
            return producer;
        }
        /// <summary>
        /// Return a producer to the pool
        /// </summary>
        /// <param name="producer"></param>
        private void ReturnProducer(IProducer<string, object> producer)
        {
            if (disposed) return;

            lock (producers)
            {
                if (producers.Count < InitializeCount && producer != null)
                {
                    producers.Enqueue(producer);
                }
                else
                {
                    producer?.Dispose();
                }
            }
        }
        #endregion

        #region Publish
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void PublishWithKey(string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, null, key, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void PublishWithKey(int? partition, string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, partition, key, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void PublishWithKey(string topic, string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(topic, null, key, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, null, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(int? partition, object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, partition, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(string topic, object message, Action<DeliveryResult> callback = null)
        {
            Publish(topic, null, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(string topic, int? partition, object message, Action<DeliveryResult> callback = null)
        {
            Publish(topic, partition, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(string topic, int? partition, string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(new KafkaMessage() { Key = key, Message = message, Partition = partition, Topic = topic }, callback);
        }
        /// <summary>
        /// Publish a message
        /// </summary>
        /// <param name="kafkaMessage"></param>
        /// <param name="callback"></param>
        public void Publish(KafkaMessage kafkaMessage, Action<DeliveryResult> callback = null)
        {
            if (string.IsNullOrEmpty(kafkaMessage.Topic))
            {
                throw new ArgumentException("topic can not be empty", nameof(kafkaMessage.Topic));
            }
            if (string.IsNullOrEmpty(kafkaMessage.Key))
            {
                throw new ArgumentException("key can not be empty", nameof(kafkaMessage.Key));
            }

            var producer = RentProducer();
            if (kafkaMessage.Partition == null)
            {
                producer.Produce(kafkaMessage.Topic, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message }, dr => callback?.Invoke(new DeliveryResult(dr)));
            }
            else
            {
                var topicPartition = new TopicPartition(kafkaMessage.Topic, new Partition(kafkaMessage.Partition.Value));
                producer.Produce(topicPartition, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message }, dr => callback?.Invoke(new DeliveryResult(dr)));
            }

            producer.Flush(TimeSpan.FromMilliseconds(FlushTimeOut));

            ReturnProducer(producer);
        }
        #endregion

        #region PublishAsync
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="key"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishWithKeyAsync(string key, object message)
        {
            return await PublishAsync(DefaultTopic, null, key, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishWithKeyAsync(int? partition, string key, object message)
        {
            return await PublishAsync(DefaultTopic, partition, key, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishWithKeyAsync(string topic, string key, object message)
        {
            return await PublishAsync(topic, null, key, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(object message)
        {
            return await PublishAsync(DefaultTopic, null, DefaultKey, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(int? partition, object message)
        {
            return await PublishAsync(DefaultTopic, partition, DefaultKey, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(string topic, object message)
        {
            return await PublishAsync(topic, null, DefaultKey, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(string topic, int? partition, object message)
        {
            return await PublishAsync(topic, partition, DefaultKey, message);
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        public async Task<DeliveryResult> PublishAsync(string topic, int? partition, string key, object message)
        {
            return await PublishAsync(new KafkaMessage() { Key = key, Message = message, Partition = partition, Topic = topic });
        }
        /// <summary>
        /// Publish a message asynchronously
        /// </summary>
        /// <param name="kafkaMessage"></param>
        /// <returns></returns>
        public async Task<DeliveryResult> PublishAsync(KafkaMessage kafkaMessage)
        {
            if (string.IsNullOrEmpty(kafkaMessage.Topic))
            {
                throw new ArgumentException("topic can not be empty", nameof(kafkaMessage.Topic));
            }
            if (string.IsNullOrEmpty(kafkaMessage.Key))
            {
                throw new ArgumentException("key can not be empty", nameof(kafkaMessage.Key));
            }

            var producer = RentProducer();
            DeliveryResult<string, object> deliveryResult;
            if (kafkaMessage.Partition == null)
            {
                deliveryResult = await producer.ProduceAsync(kafkaMessage.Topic, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message });
            }
            else
            {
                var topicPartition = new TopicPartition(kafkaMessage.Topic, new Partition(kafkaMessage.Partition.Value));
                deliveryResult = await producer.ProduceAsync(topicPartition, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message });
            }

            producer.Flush(new TimeSpan(0, 0, 0, 0, FlushTimeOut));

            ReturnProducer(producer);

            return new DeliveryResult(deliveryResult);
        }

        #endregion

        /// <summary>
        /// Release resources
        /// </summary>
        public void Dispose()
        {
            disposed = true;
            while (producers.Count > 0)
            {
                IProducer<string, object> producer;
                producers.TryDequeue(out producer);
                producer?.Dispose();
            }
            GC.Collect();
        }

        public static KafkaProducer Create(KafkaBaseOptions kafkaBaseOptions)
        {
            return new KafkaProducer(kafkaBaseOptions.BootstrapServers);
        }
        public override string ToString()
        {
            return BootstrapServers;
        }
    }

    public class DeliveryResult
    {
        internal DeliveryResult(DeliveryResult<string, object> deliveryResult)
        {
            this.Topic = deliveryResult.Topic;
            this.Partition = deliveryResult.Partition.Value;
            this.Offset = deliveryResult.Offset.Value;
            switch (deliveryResult.Status)
            {
                case PersistenceStatus.NotPersisted: this.Status = DeliveryResultStatus.NotPersisted; break;
                case PersistenceStatus.Persisted: this.Status = DeliveryResultStatus.Persisted; break;
                case PersistenceStatus.PossiblyPersisted: this.Status = DeliveryResultStatus.PossiblyPersisted; break;
            }
            this.Key = deliveryResult.Key;
            this.Message = deliveryResult.Value;

            if (deliveryResult is DeliveryReport<string, object>)
            {
                var dr = deliveryResult as DeliveryReport<string, object>;
                this.IsError = dr.Error.IsError;
                this.Reason = dr.Error.Reason;
            }
        }

        /// <summary>
        /// Whether an error occurred
        /// </summary>
        public bool IsError { get; private set; }
        /// <summary>
        /// Error reason
        /// </summary>
        public string Reason { get; private set; }
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; private set; }
        /// <summary>
        /// Partition
        /// </summary>
        public int Partition { get; private set; }
        /// <summary>
        /// Offset
        /// </summary>
        public long Offset { get; private set; }
        /// <summary>
        /// Status
        /// </summary>
        public DeliveryResultStatus Status { get; private set; }
        /// <summary>
        /// Message key
        /// </summary>
        public string Key { get; private set; }
        /// <summary>
        /// Message
        /// </summary>
        public object Message { get; private set; }
    }
    public enum DeliveryResultStatus
    {
        /// <summary>
        /// The message was not persisted
        /// </summary>
        NotPersisted = 0,
        /// <summary>
        /// The message was sent, but whether it persisted is unknown
        /// </summary>
        PossiblyPersisted = 1,
        /// <summary>
        /// The message was persisted successfully
        /// </summary>
        Persisted = 2
    }
    public class KafkaLogMessage
    {
        internal KafkaLogMessage(LogMessage logMessage)
        {
            this.Name = logMessage.Name;
            this.Facility = logMessage.Facility;
            this.Message = logMessage.Message;

            switch (logMessage.Level)
            {
                case SyslogLevel.Emergency: this.Level = LogLevel.Emergency; break;
                case SyslogLevel.Alert: this.Level = LogLevel.Alert; break;
                case SyslogLevel.Critical: this.Level = LogLevel.Critical; break;
                case SyslogLevel.Error: this.Level = LogLevel.Error; break;
                case SyslogLevel.Warning: this.Level = LogLevel.Warning; break;
                case SyslogLevel.Notice: this.Level = LogLevel.Notice; break;
                case SyslogLevel.Info: this.Level = LogLevel.Info; break;
                case SyslogLevel.Debug: this.Level = LogLevel.Debug; break;
            }
        }
        /// <summary>
        /// Name
        /// </summary>
        public string Name { get; private set; }
        /// <summary>
        /// Level
        /// </summary>
        public LogLevel Level { get; private set; }
        /// <summary>
        /// Facility
        /// </summary>
        public string Facility { get; private set; }
        /// <summary>
        /// Message
        /// </summary>
        public string Message { get; private set; }

        public enum LogLevel
        {
            Emergency = 0,
            Alert = 1,
            Critical = 2,
            Error = 3,
            Warning = 4,
            Notice = 5,
            Info = 6,
            Debug = 7
        }
    }
KafkaSubscriber:
    public class KafkaSubscriber
    {
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }
        /// <summary>
        /// Partition
        /// </summary>
        public int? Partition { get; set; }
    }

Usage, mirroring the demo above:

    class Program
    {
        static void Main(string[] args)
        {
            var bootstrapServers = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };
            var group1 = "group.1";
            var group2 = "group.2";
            var topic = "test";

            {
                KafkaConsumer consumer = new KafkaConsumer(group1, bootstrapServers);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new KafkaSubscriber[] { new KafkaSubscriber() { Topic = topic, Partition = 0 } }, result =>
                {
                    Console.WriteLine($"{group1} receive message: {result.Message}");
                    result.Commit(); // manual commit; not needed if EnableAutoCommit = true
                }).Wait();
            }

            {
                KafkaConsumer consumer = new KafkaConsumer(group2, bootstrapServers);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new KafkaSubscriber[] { new KafkaSubscriber() { Topic = topic, Partition = 1 } }, result =>
                {
                    Console.WriteLine($"{group2} receive message: {result.Message}");
                    result.Commit(); // manual commit; not needed if EnableAutoCommit = true
                }).Wait();
            }

            KafkaProducer producer = new KafkaProducer(bootstrapServers);

            int index = 0;
            while (true)
            {
                Console.Write("Enter a message: ");
                var line = Console.ReadLine();

                int partition = index % 3;
                producer.Publish(topic, partition, "Test", line);
                index++;
            }
        }
    }
