首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >IBMMQ应用实例

IBMMQ应用实例

原创
作者头像
软件架构师Michael
发布2025-07-22 10:37:20
发布2025-07-22 10:37:20
930
举报

IBMMQ作为一种高端的收费MQ产品,主要用于一些对消息时效性和安全性都很高的企业。

MQ的基本原理都基本类似,具体到各个MQ的用法又有所不同。

今天分享下之前用过的IBMMQ的实际案例。由于涉及企业业务保密,本小节不涉及具体企业数据。

定义一个MQ类,用于描述它的具体属性。

代码语言:csharp
复制
public class IBMMQClass
{

    public void CreateQueueManage(string qmName)
    {

        throw new Exception("不会创建!");

    }

    /// <summary>

    /// 创建本地队列

    /// </summary>

    /// <param name="qmName">队列管理器名称</param>

    /// <param name="queueName">本地队列名称</param>

    public void CreateQueue(string qmName, string queueName)
    {

        PCFMessageAgent agent = new PCFMessageAgent(qmName);

        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_Q);

        request.AddParameter(MQC.MQCA_Q_NAME, queueName);

        request.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_LOCAL);

        PCFMessage[] response = agent.Send(request);

        agent.Disconnect();

    }

    /// <summary>

    /// 创建远程队列

    /// </summary>

    /// <param name="qmName">队列管理器</param>

    /// <param name="queueName">队列名称</param>

    /// <param name="ycqmName">远程队列管理器名称</param>

    /// <param name="ycqueueName">远程队列名称</param>

    /// <param name="csqueueName">传输队列名称</param>

    public void CreateYuanchengQueue(string qmName, string queueName, string ycqmName, string ycqueueName, string csqueueName)
    {

        PCFMessageAgent agent = new PCFMessageAgent(qmName);

        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_Q);

        request.AddParameter(MQC.MQCA_Q_NAME, queueName);

        request.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_REMOTE);

        request.AddParameter(MQC.MQCA_REMOTE_Q_MGR_NAME, ycqmName);

        request.AddParameter(MQC.MQCA_REMOTE_Q_NAME, ycqueueName);

        request.AddParameter(MQC.MQCA_XMIT_Q_NAME, csqueueName);

        PCFMessage[] response = agent.Send(request);

        agent.Disconnect();

    }

    /// <summary>

    /// 创建通道

    /// </summary>

    /// <param name="isSend">True为发送方,false为接收方</param>

    public void CreateChannel(bool isSend)
    {

        PCFMessageAgent agent = new PCFMessageAgent("qm");

        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_CHANNEL);

        request.AddParameter(CMQCFC.MQCACH_CHANNEL_NAME, "mama");

        if (!isSend)
        {

            request.AddParameter(CMQCFC.MQIACH_CHANNEL_TYPE, MQC.MQCHT_RECEIVER);

        }

        else
        {

            request.AddParameter(CMQCFC.MQIACH_CHANNEL_TYPE, MQC.MQCHT_SENDER);

            request.AddParameter(CMQCFC.MQCACH_CONNECTION_NAME, "192.168.3.232");

            request.AddParameter(CMQCFC.MQCACH_XMIT_Q_NAME, "bb");

        }

        PCFMessage[] response = agent.Send(request);

        agent.Disconnect();

    }

    /// <summary>

    /// 删除队列

    /// </summary>

    /// <param name="qmName">队列管理器名称</param>

    /// <param name="queueName">队列名称</param>

    public void DeleteQueue(string qmName, string queueName)
    {

        PCFMessageAgent agent = new PCFMessageAgent(qmName);

        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_DELETE_Q);

        request.AddParameter(MQC.MQCA_Q_NAME, queueName);

        PCFMessage[] response = agent.Send(request);

        agent.Disconnect();

    }

    /// <summary>

    /// 发送消息

    /// </summary>

    /// <param name="qmName">队列管理器</param>

    /// <param name="queueName">队列名称</param>

    /// <param name="body">消息内容</param>

    public void PutMessage(string qmName, string queueName, string body)
    {

        MQQueueManager qMgr = new MQQueueManager(qmName);

        MQQueue queue = qMgr.AccessQueue(queueName, MQC.MQOO_OUTPUT|MQC.MQOO_FAIL_IF_QUIESCING);

        try
        {

            MQMessage message = new MQMessage();

            message.WriteString(body);

            message.Format = MQC.MQFMT_STRING;

            queue.Put(message);

        }

        catch { }

        finally
        {

            queue.Close();

        }

    }

    /// <summary>

    /// 获取消息

    /// </summary>

    /// <param name="qmName">队列管理器名称</param>

    /// <param name="queueName">队列名称</param>

    /// <returns>消息内容</returns>

    #region 获取消息  已注释
    //public string GetMessage(string qmName, string queueName)
    //{

    //    MQQueueManager qMgr = new MQQueueManager(qmName);

    //    MQQueue queue = qMgr.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);

    //    try
    //    {

    //        MQMessage message = new MQMessage();

    //        queue.Get(message);

    //        return message.ReadString(message.MessageLength);

    //    }

    //    catch

    //    { }

    //    finally
    //    {

    //        queue.Close();

    //    }

    //} 
    #endregion

    /// <summary>

    /// 获取消息

    /// </summary>

    /// <param name="qmName">队列管理器名称</param>

    /// <param name="queueName">队列名称</param>

    /// <param name="timeInterval">等待时间(毫秒)</param>

    /// <param name="qMgr">队列管理器实体类</param>

    /// <returns>消息内容</returns>

    public string GetMessage(string qmName, string queueName, int timeInterval)
    {

        MQQueueManager qMgr = new MQQueueManager(qmName);

        MQQueue queue = qMgr.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);

        MQGetMessageOptions gmo = new MQGetMessageOptions();

        gmo.Options = MQC.MQGMO_WAIT;

        gmo.WaitInterval = timeInterval;

        gmo.MatchOptions = MQC.MQMO_NONE;

        MQMessage message = new MQMessage();

        queue.Get(message, gmo);

        return message.ReadString(message.MessageLength);

    }

}

使用IBMMQ之前,创建一个MQ管理类是必需的。

代码语言:csharp
复制
 class Management
 {
     string queueName;
     MQQueueManager qMgr;
     MQMessage mqMsg;
     MQQueue queue;
     MQPutMessageOptions putOptions;

     #region   连接队列管理器

     public Management()
     {

     }
     string linkStatus;
     public string LinkToQueueManager()
     {
         string QueueName = "QL_Node";
         queueName = QueueName;

         Environment.SetEnvironmentVariable("MQCCSID", "1381");
         if (MQEnvironment.properties.Count <= 0)
         {
             MQEnvironment.properties.Add(MQC.CCSID_PROPERTY, 1381);
         }
         MQEnvironment.Port = 1416;
         MQEnvironment.Channel = "SYSTEM.DEF.SVRCONN";
         MQEnvironment.Hostname = "192.168.10.8";
         MQEnvironment.UserId = "MUSR_MQADMIN";
         //MQEnvironment.Password = "Aie_1234";
         string qmName = "QM_NODE";
         try
         {
             if (qMgr == null || !qMgr.IsConnected)
             {
                 qMgr = new MQQueueManager(qmName);
             }

             linkStatus = "连接队列管理器:" + "成功!";
         }
         catch (MQException e)
         {

             linkStatus = "连接队列管理器错误: 结束码:" + e.CompletionCode + " 错误原因代码:" + e.ReasonCode;
         }
         catch (Exception e)
         {

             linkStatus = "连接队列管理器错误: 结束码:" + e;
         }
         return linkStatus;
     }
     #endregion

     #region   发送消息

     public void SendMsg(string message)
     {
         int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;
         try
         {
             queue = qMgr.AccessQueue(queueName, openOptions);   //尝试打开队列
         }
         catch (MQException e)
         {
             Console.WriteLine("打开队列失败:" + e.Message);
         }
         mqMsg = new MQMessage();
         mqMsg.WriteString(message);
         putOptions = new MQPutMessageOptions();
         try
         {
             queue.Put(mqMsg, putOptions);        //将消息放入消息队列
         }
         catch (MQException mqe)
         {
             Console.WriteLine("发送异常终止:" + mqe.Message);
         }
         finally
         {
             try
             {
                 qMgr.Disconnect();

             }
             catch (MQException e)
             {

             }
         }
     }


     #endregion

     #region   接收消息

     public DataSet receiveMsg()
     {
         int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;
         try
         {
             queue = qMgr.AccessQueue(queueName, openOptions);   //尝试打开队列
         }
         catch (MQException e)
         {
             Console.WriteLine("打开队列失败:" + e.Message);
         }
         //从队列管理器中获得消息
         MQGetMessageOptions mqGetMsgOpts;
         mqMsg = new MQMessage();
         mqGetMsgOpts = new MQGetMessageOptions();
         mqGetMsgOpts.WaitInterval = 15000;
         mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
         try
         {
             int queryDep = queue.CurrentDepth;
             if (queryDep > 0)
             {
                 queue.Get(mqMsg, mqGetMsgOpts);          //获得消息
                 var ds = new DataSet();
                 var table = new DataTable("T_School");
                 table.Columns.Add("ID", typeof(string));
                 table.Columns.Add("SchoolName", typeof(string));
                 table.Columns.Add("BuildDate", typeof(string));
                 table.Columns.Add("Address", typeof(string));
                 ds.Tables.Add(table);
                 string message = mqMsg.ReadString(mqMsg.MessageLength);
                 mqMsg.Format = MQC.MQFMT_XMIT_Q_HEADER;
                 var reader = new StringReader(message);
                 ds.ReadXml(reader, XmlReadMode.Fragment);
                 return ds;
             }

             else
             {
                 return null;
             }
         }
         catch (MQException ex)
         {
             Console.WriteLine("访问队列停止" + ex.InnerException);
             return null;
         }
         finally
         {
             try
             {
                 qMgr.Disconnect();

             }
             catch (MQException e)
             {

             }
         }
     }
     #endregion
 }

【小结】

任何MQ的基本操作,都包括MQ的创建、连接、消息发送、消息接收。

我们学习任何MQ,都只要抓住这几点就够了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档