前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >.NET Core如何通过认证机制访问Kafka?

.NET Core如何通过认证机制访问Kafka?

作者头像
Edison Zhou
发布2022-12-22 14:09:45
1.5K0
发布2022-12-22 14:09:45
举报
文章被收录于专栏:EdisonTalkEdisonTalk

大家好,我是Edison。

最近有一个ASP.NET Core使用认证机制访问Kafka的需求,加之我们又使用了CAP这个开源项目使用的Kafka,于是网上寻找了一番发现对应资料太少,于是调查了一番,做了如下的笔记,希望对你有用。

背景

在实际场景中,开发环境的Kafka服务器一般没有要求通过认证即可发布和读取消息,并且还可以随意创建Topic和Consumer Group。但是,在生产环境中则一般有较强的安全需求,无法随意创建Topic和Consumer,还做了一些认证和权限约束。而在ASP.NET Core的解决方案中,我们经常使用到CAP这个开源项目作为事件总线,在CAP.Kafka项目中,只提供了最基础的Servers配置,文档示例中也只给出了这种只适合开发环境的配置示例,而对于对安全要求较高的生产环境,则需要我们研究一下Kafka官方的配置文档,在CAP.Kafka的MainConfig对象中进行主动配置Key/Value。

本文会首先介绍一下Kafka的认证机制,然后会给出基于CAP项目通过认证方式访问Kafka的示例。

Kafka认证机制

自 0.9.0.0 版本开始,Kafka 正式引入了认证机制,用于实现基础的安全用户认证,这是将 Kafka 上云或进行多租户管理的必要步骤。目前Kafka的版本,已支持基于 SSL 和 基于 SASL 的安全认证机制。

基于 SSL 的认证主要是指 Broker 和客户端的双路认证(2-way authentication)。通常来说,SSL 加密(Encryption)已经启用了单向认证,即客户端认证 Broker 的证书(Certificate)。如果要做 SSL 认证,那么我们要启用双路认证,也就是说 Broker 也要认证客户端的证书。

Note:Kafka 的源码中依然是使用 SSL 而不是 TLS 来表示这类东西的。不过,今天出现的所有 SSL 字眼,我们都可以认为它们是和 TLS 等价的。

Kafka 还支持通过 SASL 做客户端认证。SASL 是提供认证和数据安全服务的框架。Kafka 支持的 SASL 机制有 5 种:

  • GSSAPI:也就是 Kerberos 使用的安全接口,是在 0.9 版本中被引入的。
  • PLAIN:是使用简单的用户名 / 密码认证的机制,在 0.10 版本中被引入。
  • SCRAM:主要用于解决 PLAIN 机制安全问题的新机制,是在 0.10.2 版本中被引入的。
  • OAUTHBEARER:是基于 OAuth 2 认证框架的新机制,在 2.0 版本中被引进。
  • Delegation Token:补充现有 SASL 机制的轻量级认证机制,是在 1.1.0 版本被引入的。

在实际应用中,一般建议 使用 SSL 来做通信加密,使用 SASL 来做 Kafka 的认证实现。对于小型公司来说,SASL/PLAIN 的配置和运维成本相对较小,比较适合Kafka集群配置

下图将这些认证机制进行了汇总,源自极客时间胡夕《Kafka核心技术与实战》。

通过认证机制使用Kafka

这里假设我们已经搭建好了一个Kafka集群,并且配置了SASL/PLAIN方式,并且创建了一个账号“kafka_user”,密码为"kakfa_user_password@2022abcdlk!",约束客户端只能通过SSL方式带上CA证书加密访问。

假设我们已经有了一个ASP.NET Core应用,并且之前已经在开发环境通过CAP项目使用了Kafka,那么对于生产环境或安全要求较高的测试环境,我们应该如何修改呢?

通过查看CAP的文档,在CAP.Kafka中其实只提供了几个最基础的配置项:

而其他的配置项,我们只能通过CAP.Kafka提供的MainConfig这个Dictionary类进行手动添加,如下所示:

代码语言:javascript
复制
services.AddCap(capOptions => 
{
    capOptions.UseKafka(kafkaOption=>
    {
        // kafka options.
        // kafkaOptions.MainConfig.Add("", "");
    });
});

那么,我们应该添加哪些配置呢?它们的key和可选的value又是哪些呢?CAP给出了一个参考链接:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md,它是librdkafka项目的配置参数的文档。

通过研究配置项文档,我们大概需要以下一些参数,将其添加到MainConfig字典中,这些参数不仅适配Producer也适配Consumer。

代码语言:javascript
复制
namespace Microsoft.Extensions.DependencyInjection
{
    public static class ApplicationServiceCollectionExtensions
    {
        public static IServiceCollection AddEventBus(this IServiceCollection services, IConfigurationSection configuration)
        {
            services.AddCap(option =>
            {
                option.UseInMemoryStorage();
                option.UseKafka(kfkOption =>
                {
                    kfkOption.Servers = configuration["KafkaBootstrapServers"];
                    if (Convert.ToBoolean(configuration["EnableAuthorization"]))
                    {
                        kfkOption.MainConfig.Add("security.protocol", "sasl_ssl");
                        kfkOption.MainConfig.Add("sasl.mechanism", "PLAIN");
                        kfkOption.MainConfig.Add("sasl.username", configuration["SaslUserName"]);                        
                        kfkOption.MainConfig.Add("sasl.password", configuration["SaslPassword"]);                        
                        kfkOption.MainConfig.Add("ssl.ca.location", configuration["SslCertificatePath"]);                        
                        kfkOption.MainConfig.Add("enable.ssl.certificate.verification", configuration["EnableSslCertificateVerification"]);                        
                    }
                });
                option.SucceedMessageExpiredAfter = 3600 * 24 * Convert.ToInt32(configuration["SuccessMsgExpireDays"]);
            });
            return services;
        }
    }
}

在Program.cs中调用AddEventBus方法:

代码语言:javascript
复制
builder.Services.AddEventBus(builder.Configuration.GetSection("EventBusConfigs"));

在appsettings中的配置如下:

代码语言:javascript
复制
{
  "EventBusConfigs": {
    "KafkaBootstrapServers": "prd.kafka01.com:9093, prd.kafka02.com:9093, prd.kafka03.com:9093",
    "SuccessMsgExpireDays": 7,
    "EnableAuthorization": true,
    "SaslUserName": "kafka_user",
    "SaslPassword": "kakfa_user_password@2022abcdlk!",
    "SslCertificatePath": "resources/certificates/intranet_server_ca.cer",
    "EnableSslCertificateVerification": true
  }
}

既然是通过证书访问,那么我们得告诉ASP.NET Core这个证书放在什么位置,本文示例是放在这个ASP.NET Core应用目录下的,在实际中建议由运维管理员统一放在一个中心服务器位置,挂载到容器内部可以访问,从而保证证书的安全。如果是通过K8s部署,那么将其添加为一个Secret存放是更好的方式。

CAP中的异构系统集成

顺带说一下,在CAP这个项目中,如果你的项目都是基于它来做事件总线,那么CAP可以正常的Publish和Subscribe消息,但是如果在你使用它之前已经有了许多的Topic Messages,它需要和一些第三方系统进行消息传输,这就会涉及到异构系统的集成。如果我们不做一些配置,CAP是无法正常Subscribe和Consume消息的。

因此,在CAP中,我们需要主动对Message做一些改造,添加传递一些额外信息以便于CAP能够在收到消息时提取到关键特征从而正常运作。否则,你会在启动时收到这样一个错误:The given key "cap-msg-id" is not existed........。

我们只需要在注册CAP组件时添加自定义Headers,确保"cap-msg-id"和"cap-msg-name"两个Header值能够被解析到:

代码语言:javascript
复制
namespace Microsoft.Extensions.DependencyInjection
{
    public static class ApplicationServiceCollectionExtensions
    {
        public static IServiceCollection AddEventBus(this IServiceCollection services, IConfigurationSection configuration)
        {
            services.AddCap(option =>
            {
                option.UseInMemoryStorage();
                option.UseKafka(kfkOption =>
                {
                    kfkOption.Servers = configuration["KafkaBootstrapServers"];
                    if (Convert.ToBoolean(configuration["EnableAuthorization"]))
                    {
                        kfkOption.MainConfig.Add("security.protocol", "sasl_ssl");
                        kfkOption.MainConfig.Add("sasl.mechanism", "PLAIN");
                        kfkOption.MainConfig.Add("sasl.username", configuration["SaslUserName"]);                        
                        kfkOption.MainConfig.Add("sasl.password", configuration["SaslPassword"]);                        
                        kfkOption.MainConfig.Add("ssl.ca.location", configuration["SslCertificatePath"]);                        
                        kfkOption.MainConfig.Add("enable.ssl.certificate.verification", configuration["EnableSslCertificateVerification"]);                        
                    }
                    // 以下为新增自定义Headers配置
                    option.CustomHeaders = e => new List<KeyValuePair<string, string>>
                    {
                        new KeyValuePair<string, string>(DotNetCore.CAP.Messages.Headers.MessageId, SnowflakeId.Default().NextId().ToString()),
                        new KeyValuePair<string, string>(DotNetCore.CAP.Messages.Headers.MessageName, e.Topic)
                    };
                });
                option.SucceedMessageExpiredAfter = 3600 * 24 * Convert.ToInt32(configuration["SuccessMsgExpireDays"]);
            });
            return services;
        }
    }
}

小结

本文介绍了在ASP.NET Core中使用CAP项目通过认证机制安全地使用kafka消息中间件,希望能够对你有所帮助!

参考资料

CAP文档Kafka部分:https://cap.dotnetcore.xyz/user-guide/en/transport/kafka

librdkafka配置项文档:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

CAP文档Messaging部分:https://cap.dotnetcore.xyz/user-guide/en/cap/messaging

胡夕《Kafka核心技术与实战》之Kafka认证机制用哪家:https://time.geekbang.org/column/article/118347

作者:周旭龙

出处:https://edisonchou.cnblogs.com

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • Kafka认证机制
  • 通过认证机制使用Kafka
  • CAP中的异构系统集成
  • 小结
  • 参考资料
相关产品与服务
SSL 证书
腾讯云 SSL 证书(SSL Certificates)为您提供 SSL 证书的申请、管理、部署等服务,为您提供一站式 HTTPS 解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档