前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KestrelServer详解[3]: 自定义一个迷你版的KestrelServer

KestrelServer详解[3]: 自定义一个迷你版的KestrelServer

作者头像
蒋金楠
发布2022-05-09 13:09:25
4400
发布2022-05-09 13:09:25
举报
文章被收录于专栏:大内老A大内老A

和所有的服务器一样,KestrelServer最终需要解决的是网络传输的问题。在《网络连接的创建》,我们介绍了KestrelServer如何利用连接接听器的建立网络连接,并再次基础上演示了如何直接利用建立的连接接收请求和回复响应。本篇更进一步,我们根据其总体设计,定义了迷你版的KestrelServer让读者看看这个重要的服务器大体是如何实现的。本文提供的示例演示已经同步到《ASP.NET Core 6框架揭秘-实例演示版》)

一、ConnectionDelegate 二、IConnectionBuilder 三、HTTP 1.x/HTTP 2.x V.S. HTTP 3 四、MiniKestrelServer

一、ConnectionDelegate

ASP.NET CORE在“应用”层将针对请求的处理抽象成由中间件构建的管道,实际上KestrelServer面向“传输”层的连接也采用了这样的设计。当代表连接的ConnectionContext上下文创建出来之后,后续的处理将交给由连接中间件构建的管道进行处理。我们可以根据需要注册任意的中间件来处理连接,比如可以将并发连结的控制实现在专门的连接中间件中。ASP.NET CORE管道利用RequestDelegate委托来表示请求处理器,连接管道同样定义了如下这个ConnectionDelegate委托。

代码语言:javascript
复制
public delegate Task ConnectionDelegate(ConnectionContext connection);

二、IConnectionBuilder

ASP.NET CORE管道中的中间件体现为一个Func<RequestDelegate, RequestDelegate>委托,连接管道的中间件同样可以利用Func<ConnectionDelegate, ConnectionDelegate>委托来表示。ASP.NET CORE管道中的中间件注册到IApplicationBuilder对象上并利用它将管道构建出来。连接管道依然具有如下这个IConnectionBuilder接口,ConnectionBuilder实现了该接口。

代码语言:javascript
复制
public interface IConnectionBuilder
{
    IServiceProvider ApplicationServices { get; }
    IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware);
    ConnectionDelegate Build();
}

public class ConnectionBuilder : IConnectionBuilder
{
    public IServiceProvider ApplicationServices { get; }
    public ConnectionDelegate Build();
    public IConnectionBuilder Use(Func<ConnectionDelegate, ConnectionDelegate> middleware);
}

IConnectionBuilder接口还定义了如下三个扩展方法来注册连接中间件。第一个Use方法使用Func<ConnectionContext, Func<Task>, Task>委托来表示中间件。其余两个方法用来注册管道末端的中间件,这样的中间件本质上就是一个ConnectionDelegate委托,我们可以将其定义成一个派生于ConnectionHandler的类型。

代码语言:javascript
复制
public static class ConnectionBuilderExtensions
{
    public static IConnectionBuilder Use(this IConnectionBuilder connectionBuilder,Func<ConnectionContext, Func<Task>, Task> middleware);
    public static IConnectionBuilder Run(this IConnectionBuilder connectionBuilder,Func<ConnectionContext, Task> middleware);
    public static IConnectionBuilder UseConnectionHandler<TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler;
}

public abstract class ConnectionHandler
{
    public abstract Task OnConnectedAsync(ConnectionContext connection);
}

三、HTTP 1.x/HTTP 2.x V.S. HTTP 3

KestrelServer针对HTTP 1.X/2和HTTP 3的设计和实现基本上独立的,这一点从监听器的定义就可以看出来。就连接管道来说,基于HTTP 3的多路复用连接通过MultiplexedConnectionContext表示,它也具有“配套”的MultiplexedConnectionDelegate委托和IMultiplexedConnectionBuilder接口。ListenOptions类型同时实现了IConnectionBuilder和IMultiplexedConnectionBuilder接口,意味着我们在注册终结点的时候还可以注册任意中间件。

代码语言:javascript
复制
public delegate Task MultiplexedConnectionDelegate(MultiplexedConnectionContext connection);

public interface IMultiplexedConnectionBuilder
{
    IServiceProvider ApplicationServices { get; }
    IMultiplexedConnectionBuilder Use(Func<MultiplexedConnectionDelegate, MultiplexedConnectionDelegate> middleware);
    MultiplexedConnectionDelegate Build();
}

public class MultiplexedConnectionBuilder : IMultiplexedConnectionBuilder
{
    public IServiceProvider ApplicationServices { get; }
    public IMultiplexedConnectionBuilder Use(Func<MultiplexedConnectionDelegate, MultiplexedConnectionDelegate> middleware);
    public MultiplexedConnectionDelegate Build();
}

public class ListenOptions : IConnectionBuilder, IMultiplexedConnectionBuilder

四、MiniKestrelServer

在了解了KestrelServer的连接管道后,我们来简单模拟一下这种服务器类型的实现,为此我们定义了一个名为MiniKestrelServer的服务器类型。简单起见,MiniKestrelServer只提供针对HTTP 1.1的支持。对于任何一个服务来说,它需要将请求交付给一个IHttpApplication<TContext>对象进行处理,MiniKestrelServer将这项工作实现在如下这个HostedApplication<TContext>类型中。

代码语言:javascript
复制
public class HostedApplication<TContext> : ConnectionHandler where TContext : notnull
{
    private readonly IHttpApplication<TContext> _application;
    public HostedApplication(IHttpApplication<TContext> application) => _application = application;

    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        var reader = connection!.Transport.Input;
        while (true)
        {
            var result = await reader.ReadAsync();
            using (var body = new MemoryStream())
            {
                var (features, request, response) = CreateFeatures(result, body);
                var closeConnection = request.Headers.TryGetValue("Connection", out var vallue) && vallue == "Close";
                reader.AdvanceTo(result.Buffer.End);

                var context = _application.CreateContext(features);
                Exception? exception = null;
                try
                {
                    await _application.ProcessRequestAsync(context);
                    await ApplyResponseAsync(connection, response, body);
                }
                catch (Exception ex)
                {
                    exception = ex;
                }
                finally
                {
                    _application.DisposeContext(context, exception);
                }
                if (closeConnection)
                {
                    await connection.DisposeAsync();
                    return;
                }
            }
            if (result.IsCompleted)
            {
                break;
            }
        }

        static (IFeatureCollection, IHttpRequestFeature, IHttpResponseFeature) CreateFeatures(ReadResult result, Stream body)
        {
            var handler = new HttpParserHandler();
            var parserHandler = new HttpParser(handler);
            var length = (int)result.Buffer.Length;
            var array = ArrayPool<byte>.Shared.Rent(length);
            try
            {
                result.Buffer.CopyTo(array);
                parserHandler.Execute(new ArraySegment<byte>(array, 0, length));
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(array);
            }
            var bodyFeature = new StreamBodyFeature(body);

            var features = new FeatureCollection();
            var responseFeature = new HttpResponseFeature();
            features.Set<IHttpRequestFeature>(handler.Request);
            features.Set<IHttpResponseFeature>(responseFeature);
            features.Set<IHttpResponseBodyFeature>(bodyFeature);

            return (features, handler.Request, responseFeature);
        }

        static async Task ApplyResponseAsync(ConnectionContext connection, IHttpResponseFeature response, Stream body)
        {
            var builder = new StringBuilder();
            builder.AppendLine($"HTTP/1.1 {response.StatusCode} {response.ReasonPhrase}");
            foreach (var kv in response.Headers)
            {
                builder.AppendLine($"{kv.Key}: {kv.Value}");
            }
            builder.AppendLine($"Content-Length: {body.Length}");
            builder.AppendLine();
            var bytes = Encoding.UTF8.GetBytes(builder.ToString());

            var writer = connection.Transport.Output;
            await writer.WriteAsync(bytes);
            body.Position = 0;
            await body.CopyToAsync(writer);
        }
    }
}

HostedApplication<TContext>是对一个IHttpApplication<TContext>对象的封装。它派生于抽象类ConnectionHandler,重写的OnConnectedAsync方法将针对请求的读取和处理置于一个无限循环中。为了将读取的请求转交给IHostedApplication<TContext>对象进行处理,它需要根据特性集合将TContext上下文创建出来。这里提供的特性集合只包含三种核心的特性,一个是描述请求的HttpRequestFeature特性,它是利用HttpParser解析请求荷载内容得到的。另一个是描述响应的HttpResponseFeature特性,至于提供响应主体的特性由如下所示的StreamBodyFeature对象来表示。这三个特性的创建实现在CreateFeatures方法中。

代码语言:javascript
复制
public class StreamBodyFeature : IHttpResponseBodyFeature
{
    public Stream 	Stream { get; }
    public PipeWriter 	Writer { get; }

    public StreamBodyFeature(Stream stream)
    {
        Stream = stream;
        Writer = PipeWriter.Create(Stream);
    }

    public Task CompleteAsync() => Task.CompletedTask;
    public void DisableBuffering() { }
    public Task SendFileAsync(string path, long offset, long? count,
    CancellationToken cancellationToken = default)=> throw new NotImplementedException();
    public Task StartAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}

包含三大特性的集合随后作为参数调用了IHostedApplication<TContext>对象的CreateContext方法将TContext上下文创建出来,此上下文作为参数传入了同一对象的ProcessRequestAsync方法,此时中间件管道接管请求。待中间件管道完成处理后, ApplyResponseAsync方法被调用以完成最终的响应工作。ApplyResponseAsync方法将响应状态从HttpResponseFeature特性中提取并生成首行响应内容(“HTTP/1.1 {StatusCode} {ReasonPhrase}”),然后再从这个特性中将响应报头提取出来并生成相应的文本。响应报文的首行内容和报头文本按照UTF-8编码生成二进制数组后利用ConnectionContext上下文的Transport属性返回的IDuplexPipe对象发送出去后,它再将StreamBodyFeature特性收集到的响应主体输出流“拷贝”到这个IDuplexPipe对象中,进而完成了针对响应主体内容的输出。

如下所示的是MiniKestrelServer类型的完整定义。该类型的构造函数中注入了用于提供配置选项的IOptions<KestrelServerOptions>特性和IConnectionListenerFactory工厂,并且创建了一个ServerAddressesFeature对象并注册到Features属性返回的特性集合中。

代码语言:javascript
复制
public class MiniKestrelServer : IServer
{
    private readonly KestrelServerOptions _options;
    private readonly IConnectionListenerFactory _factory;
    private readonly List<IConnectionListener> _listeners = new();

    public IFeatureCollection Features { get; } = new FeatureCollection();

    public MiniKestrelServer(IOptions<KestrelServerOptions> optionsAccessor, IConnectionListenerFactory factory)
    {
        _factory = factory;
        _options = optionsAccessor.Value;
        Features.Set<IServerAddressesFeature>(new ServerAddressesFeature());
    }

    public void Dispose() => StopAsync(CancellationToken.None).GetAwaiter().GetResult();
    public Task StartAsync<TContext>(IHttpApplication<TContext> application, CancellationToken cancellationToken) where TContext : notnull
    {
        var feature = Features.Get<IServerAddressesFeature>()!;
        IEnumerable<ListenOptions> listenOptions;
        if (feature.PreferHostingUrls)
        {
            listenOptions = BuildListenOptions(feature);
        }
        else
        {
            listenOptions = _options.GetListenOptions();
            if (!listenOptions.Any())
            {
                listenOptions = BuildListenOptions(feature);
            }
        }

        foreach (var options in listenOptions)
        {
            _ = StartAsync(options);
        }
        return Task.CompletedTask;

        async Task StartAsync(ListenOptions litenOptions)
        {
            var listener = await _factory.BindAsync(litenOptions.EndPoint,cancellationToken);
            _listeners.Add(listener!);
            var hostedApplication = new HostedApplication<TContext>(application);
            var pipeline = litenOptions.Use(next => context => hostedApplication.OnConnectedAsync(context)).Build();
            while (true)
            {
                var connection = await listener.AcceptAsync();
                if (connection != null)
                {
                    _ = pipeline(connection);
                }
            }
        }

        IEnumerable<ListenOptions> BuildListenOptions(IServerAddressesFeature feature)
        {
            var options = new KestrelServerOptions();
            foreach (var address in feature.Addresses)
            {
                var url = new Uri(address);
                if (string.Compare("localhost", url.Host, true) == 0)
                {
                    options.ListenLocalhost(url.Port);
                }
                else
                {
                    options.Listen(IPAddress.Parse(url.Host), url.Port);
                }

            }
            return options.GetListenOptions();
        }
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.WhenAll(_listeners.Select(it => it.DisposeAsync().AsTask()));
}

实现的StartAsync<TContext>方法先将IServerAddressesFeature特性提取出来,并利用其PreferHostingUrls属性决定应该使用直接注册到KestrelOptions配置选项上的终结点还是使用注册在该特定上的监听地址。如果使用后者,注册的监听地址会利用BuildListenOptions方法转换成对应的ListenOptions列表,否则直接从KestrelOptions对象的ListenOptions属性提取所有的ListenOptions列表,由于这是一个内部属性,不得不利用如下这个扩展方法以反射的方式获取这个列表。

代码语言:javascript
复制
public static class KestrelServerOptionsExtensions
{
    public static IEnumerable<ListenOptions> GetListenOptions(this KestrelServerOptions options)
    {
        var property = typeof(KestrelServerOptions).GetProperty("ListenOptions",BindingFlags.NonPublic | BindingFlags.Instance);
        return (IEnumerable<ListenOptions>)property!.GetValue(options)!;
    }
}

对于每一个表示注册终结点的ListenOptions配置选项,StartAsync<TContext>方法利用IConnectionListenerFactory工厂将对应的IConnectionListener监听器创建出来,并绑定到指定的终结点上监听连接请求。表示连接的ConnectionContext上下文一旦被创建出来后,该方法便会利用构建的连接管道对它进行处理。在调用ListenOptions配置选项的Build方法构建连接管道前,StartAsync<TContext>方法将HostedApplication<TContext>对象创建出来并作为中间件进行了注册。所以针对连接的处理将被这个HostedApplication<TContext>对象接管。

代码语言:javascript
复制
using App;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.Extensions.DependencyInjection.Extensions;

var builder = WebApplication.CreateBuilder();
builder.WebHost.UseKestrel(kestrel => kestrel.ListenLocalhost(5000));
builder.Services.Replace(ServiceDescriptor.Singleton<IServer, MiniKestrelServer>());
var app = builder.Build();
app.Run(context => context.Response.WriteAsync("Hello World!"));
app.Run();

如上所示的演示程序将替换了针对IServer的服务注册,意味着默认的KestrelServer将被替换成自定义的MiniKestrelServer。启动该程序后,由浏览器发送的HTTP请求(不支持HTTPS)同样会被正常处理,并得到如图18-6所示的响应内容。需要强调一下,MiniKestrelServer仅仅用来模拟KestrelServer的实现原理,不要觉得真实的实现会如此简单。

图1 由MiniKestrelServer回复的响应内容

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-03-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、ConnectionDelegate
  • 二、IConnectionBuilder
  • 三、HTTP 1.x/HTTP 2.x V.S. HTTP 3
  • 四、MiniKestrelServer
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档