前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >WCF实现长连接

WCF实现长连接

作者头像
少羽大怪兽
发布2019-09-11 19:05:27
1.5K0
发布2019-09-11 19:05:27
举报
文章被收录于专栏:架构技术架构技术

由于WCF的机制,连接池会在连接建立一定时间后超时,即使设置了超时时间非常长,也可能被服务端系统主动回收。之前做项目时碰到了这个问题,所以项目上考虑采用长连接,自动管理连接池,当连接超时后,自动重建,保持会话,这样在业务层就不需要再去处理连接超时的问题。具体的思路是,在程序启动时,先将需要使用长连接的连接放到长连接容器中,并设置连接的最大数量,在使用时,轮询使用连接,当使用时捕获到异常时,自动切换到下一个连接,并重建上一个连接。代码如下:

AliveConnection类功能是保持连接,具体执行WCF调用,在检测到连接不可用时进行重建。

代码语言:javascript
复制
class AliveConnection<T> where T : System.ServiceModel.ICommunicationObject, new()
    {
        private string _endpointConfigName = string.Empty;
        private T _instance = default(T);
        /// <summary>
        /// 正在执行其他过程时,设置为正忙。执行完成后闲置
        /// 连接出错后,正在重新连接创建时设置为正忙,解除正忙状态有俩种情况:
        /// 1.第一次重建连接成功后;
        /// 2.在线程中重试成功后;
        /// </summary>
        public bool IsBusy { get; set; }

        internal AliveConnection(string endpointConfigName)
        {
            if (string.IsNullOrEmpty(endpointConfigName.Trim())) throw new ArgumentException("终结点不能配置为空。");
            _endpointConfigName = endpointConfigName;
            //_instance = CreateConnection();
        }

        internal bool Execute(Action<T> expression)
        {
            try
            {
                Open();
                expression(_instance);
                return true;
            }
            catch (System.ServiceModel.CommunicationException e)
            {
                return false;
            }
        }

        internal bool Execute<TResult>(Func<T, TResult> expression,out TResult result)
        {
            result = default(TResult);
            try
            {
                Open();
                result = expression(_instance);
                return true;
            }
            catch (System.ServiceModel.CommunicationException e)
            {
                return false;
            }
        }

        private void Open()
        {
            if (_instance == null)//使用时才创建
            {
                _instance = CreateConnection();
                _instance.Faulted += Faulted;
            }
            if (_instance.State != System.ServiceModel.CommunicationState.Opened)
                _instance.Open();
        }

        private void Faulted(object sender, EventArgs e)
        {
            lock (_instance)
            {
                IsBusy = true;
                //失败后锁住并重新建立连接
                _instance = CreateConnection();
            }
        }

        private T CreateConnection()
        {
            try
            {
                var instance = (T)Activator.CreateInstance(typeof(T), _endpointConfigName);
                IsBusy = false;
                return instance;
            }
            catch (Exception e)
            {
                IsBusy = true;
                RetryWhenFailed();
                throw new Exception("创建连接失败,请检测终结点配置和服务。", e);
            }
        }
        //创建一个线程来不间断重试创建连接
        private void RetryWhenFailed()
        {
            int retryTimes = 0;
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    //如果抛出异常,表示创建失败,继续重试,如果睡眠时间大于60秒,则睡眠时间不再增加
                    try
                    {
                        retryTimes++;
                        _instance = (T)Activator.CreateInstance(typeof(T), _endpointConfigName);
                        IsBusy = false;
                        break;
                    }
                    catch
                    {
                        int sleepMillionSeconds = retryTimes * 5 * 1000;
                        sleepMillionSeconds = sleepMillionSeconds > 60 * 1000 ? 60 * 1000 : sleepMillionSeconds;
                        System.Threading.Thread.Sleep(sleepMillionSeconds);
                        continue;
                    }
                }
            });
        }
    }

另外我们需要一个类,来做连接的初始化,并做轮询,并且暴露执行的函数。

代码语言:javascript
复制
 /// <summary>
    /// WCF长连接容器
    /// </summary>
    /// <typeparam name="T">待创建的WCF服务类型</typeparam>
    public class AliveConnectionContainer<T>
        where T : System.ServiceModel.ICommunicationObject, new()
    {
        #region fields
        private List<AliveConnection<T>> _connections = null;//所有连接
        private int _currentConnectionIndex = 0;//当前使用的连接的索引
        #endregion

        #region Octor
        /// <summary>
        /// 通过终结点配置名称,创建长连接。如果终结点数不等于连接数,则轮询跟节点配置列表,最终创建达到连接数的连接。
        /// </summary>
        /// <param name="maxConnection">需要创建的长连接数</param>
        /// <param name="endpointConfigNames">所有的终结点配置名称,对应配置节点里bind的name</param>
        /// <exception>如果终结点配置为空,则抛出异常,如果创建失败,也会抛出异常</exception>
        public AliveConnectionContainer(int maxConnection, params string[] endpointConfigNames)
        {
            _connections = new List<AliveConnection<T>>(maxConnection);

            int tmpIndex = 0;
            for (int index = 0; index < maxConnection; index++)
            {
                if (tmpIndex >= endpointConfigNames.Count()) tmpIndex = 0;
                _connections.Add(new AliveConnection<T>(endpointConfigNames[tmpIndex]));
            }
        }
        #endregion

        #region public method
        /// <summary>
        /// 执行服务调用,会一直轮询执行直到执行成功
        /// </summary>
        /// <param name="expression">需要调用的处理方法</param>
        public void Execute(Action<T> expression)
        {
            Func<bool> executeExpression = () => Instance.Execute(expression);
            Execute(executeExpression);
        }    
        /// <summary>
        /// 执行服务调用,会一直轮询执行直到执行成功
        /// </summary>
        /// <param name="expression">需要调用的处理方法</param>
        public TResult Execute<TResult>(Func<T, TResult> expression)
        {
            TResult result = default(TResult);
            Func<bool> executeExpression = () => Instance.Execute(expression,out result);
            Execute(executeExpression);
            return result;
        }

        private void Execute(Func<bool> expression)
        {
            bool success = false;
            int failedCount = 0;
            try
            {
                while (true)
                {
                    success = expression();
                    if (!success) failedCount++;
                    else break;

                    if (failedCount >= _connections.Count) throw new Exception("没有可用的服务,请检测服务运行状态。");
                }
            }
            catch (Exception e)
            {
                throw new Exception("执行WCF服务调用失败。", e);
            }
        }

        #endregion

        #region private method
        private AliveConnection<T> Instance
        {
            get
            {
                if (_connections == null || _connections.Count == 0) throw new Exception("没有可用的连接,请先设置连接。");
                AliveConnection<T> result;
                while (!(result = GetInstance()).IsBusy) break;//轮询直到找到空闲的连接
                return result;
            }
        }

        private AliveConnection<T> GetInstance()
        {
            if (_currentConnectionIndex >= _connections.Count) _currentConnectionIndex = 0;
            return _connections[_currentConnectionIndex++];
        }
        #endregion
    }

使用静态类,做全局的WCF连接注册和检索使用

代码语言:javascript
复制
/// <summary>
    /// 长连接服务的管理类
    /// </summary>
    /// <example>
    /// AliveConnectionManager.Register<Service>(endpoints,10);
    /// var client = AliveConnectionManager.Resolve<Service>();
    /// List<Movie> movies;
    /// client.Execute(service => movies = service.GetMovies());
    /// </example>
    public static class AliveConnectionManager
    {
        private static Dictionary<Type, object> _container = new Dictionary<Type, object>();
        /// <summary>
        /// 根据输入的终结点列表,在app.config文件中查找对应的终结点,并建立连接
        /// </summary>
        /// <typeparam name="T">要注册的WCF的服务类型</typeparam>
        /// <param name="maxConnection">连接数</param>
        /// <param name="endpointConfigNames">配置的终结点列表</param>
        public static void Register<T>(int maxConnection, params string[] endpointConfigNames)
            where T : System.ServiceModel.ICommunicationObject, new()
        {
            if (_container.ContainsKey(typeof(T))) throw new ArgumentException(string.Format("容器中已经添加过{0}的长连接服务。", typeof(T)));
            _container.Add(typeof(T), new AliveConnectionContainer<T>(maxConnection, endpointConfigNames));
        }

        /// <summary>
        /// 获取类型为T的长连接服务
        /// </summary>
        /// <typeparam name="T">待获取的WCF的服务类型</typeparam>
        /// <returns>对应类型为T的服务</returns>
        public static AliveConnectionContainer<T> Resolve<T>() where T : System.ServiceModel.ICommunicationObject, new()
        {
            Type type = typeof(T);
            if (!_container.ContainsKey(type)) throw new ArgumentException(string.Format("没有找到类型为{0}的长连接服务。", type));
            return _container[type] as AliveConnectionContainer<T>;
        }
    }

服务注册代码

代码语言:javascript
复制
 AliveConnectionManager.Register<Service>(endpoints,10);

调用服务

代码语言:javascript
复制
var client = AliveConnectionManager.Resolve<Service>();
 List<Movie> movies;
client.Execute(service => movies = service.GetMovies());

Service即我们在客户端引入的WCF服务

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-12-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档