IBMMQ作为一种高端的收费MQ产品,主要用于一些对消息时效性和安全性都很高的企业。
MQ的基本原理都基本类似,具体到各个MQ的用法又有所不同。
今天分享下之前用过的IBMMQ的实际案例。由于涉及企业业务保密,本小节不涉及具体企业数据。
定义一个MQ类,用于描述它的具体属性。
public class IBMMQClass
{
public void CreateQueueManage(string qmName)
{
throw new Exception("不会创建!");
}
/// <summary>
/// 创建本地队列
/// </summary>
/// <param name="qmName">队列管理器名称</param>
/// <param name="queueName">本地队列名称</param>
public void CreateQueue(string qmName, string queueName)
{
PCFMessageAgent agent = new PCFMessageAgent(qmName);
PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_Q);
request.AddParameter(MQC.MQCA_Q_NAME, queueName);
request.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_LOCAL);
PCFMessage[] response = agent.Send(request);
agent.Disconnect();
}
/// <summary>
/// 创建远程队列
/// </summary>
/// <param name="qmName">队列管理器</param>
/// <param name="queueName">队列名称</param>
/// <param name="ycqmName">远程队列管理器名称</param>
/// <param name="ycqueueName">远程队列名称</param>
/// <param name="csqueueName">传输队列名称</param>
public void CreateYuanchengQueue(string qmName, string queueName, string ycqmName, string ycqueueName, string csqueueName)
{
PCFMessageAgent agent = new PCFMessageAgent(qmName);
PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_Q);
request.AddParameter(MQC.MQCA_Q_NAME, queueName);
request.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_REMOTE);
request.AddParameter(MQC.MQCA_REMOTE_Q_MGR_NAME, ycqmName);
request.AddParameter(MQC.MQCA_REMOTE_Q_NAME, ycqueueName);
request.AddParameter(MQC.MQCA_XMIT_Q_NAME, csqueueName);
PCFMessage[] response = agent.Send(request);
agent.Disconnect();
}
/// <summary>
/// 创建通道
/// </summary>
/// <param name="isSend">True为发送方,false为接收方</param>
public void CreateChannel(bool isSend)
{
PCFMessageAgent agent = new PCFMessageAgent("qm");
PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_CHANNEL);
request.AddParameter(CMQCFC.MQCACH_CHANNEL_NAME, "mama");
if (!isSend)
{
request.AddParameter(CMQCFC.MQIACH_CHANNEL_TYPE, MQC.MQCHT_RECEIVER);
}
else
{
request.AddParameter(CMQCFC.MQIACH_CHANNEL_TYPE, MQC.MQCHT_SENDER);
request.AddParameter(CMQCFC.MQCACH_CONNECTION_NAME, "192.168.3.232");
request.AddParameter(CMQCFC.MQCACH_XMIT_Q_NAME, "bb");
}
PCFMessage[] response = agent.Send(request);
agent.Disconnect();
}
/// <summary>
/// 删除队列
/// </summary>
/// <param name="qmName">队列管理器名称</param>
/// <param name="queueName">队列名称</param>
public void DeleteQueue(string qmName, string queueName)
{
PCFMessageAgent agent = new PCFMessageAgent(qmName);
PCFMessage request = new PCFMessage(CMQCFC.MQCMD_DELETE_Q);
request.AddParameter(MQC.MQCA_Q_NAME, queueName);
PCFMessage[] response = agent.Send(request);
agent.Disconnect();
}
/// <summary>
/// 发送消息
/// </summary>
/// <param name="qmName">队列管理器</param>
/// <param name="queueName">队列名称</param>
/// <param name="body">消息内容</param>
public void PutMessage(string qmName, string queueName, string body)
{
MQQueueManager qMgr = new MQQueueManager(qmName);
MQQueue queue = qMgr.AccessQueue(queueName, MQC.MQOO_OUTPUT|MQC.MQOO_FAIL_IF_QUIESCING);
try
{
MQMessage message = new MQMessage();
message.WriteString(body);
message.Format = MQC.MQFMT_STRING;
queue.Put(message);
}
catch { }
finally
{
queue.Close();
}
}
/// <summary>
/// 获取消息
/// </summary>
/// <param name="qmName">队列管理器名称</param>
/// <param name="queueName">队列名称</param>
/// <returns>消息内容</returns>
#region 获取消息 已注释
//public string GetMessage(string qmName, string queueName)
//{
// MQQueueManager qMgr = new MQQueueManager(qmName);
// MQQueue queue = qMgr.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);
// try
// {
// MQMessage message = new MQMessage();
// queue.Get(message);
// return message.ReadString(message.MessageLength);
// }
// catch
// { }
// finally
// {
// queue.Close();
// }
//}
#endregion
/// <summary>
/// 获取消息
/// </summary>
/// <param name="qmName">队列管理器名称</param>
/// <param name="queueName">队列名称</param>
/// <param name="timeInterval">等待时间(毫秒)</param>
/// <param name="qMgr">队列管理器实体类</param>
/// <returns>消息内容</returns>
public string GetMessage(string qmName, string queueName, int timeInterval)
{
MQQueueManager qMgr = new MQQueueManager(qmName);
MQQueue queue = qMgr.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING);
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options = MQC.MQGMO_WAIT;
gmo.WaitInterval = timeInterval;
gmo.MatchOptions = MQC.MQMO_NONE;
MQMessage message = new MQMessage();
queue.Get(message, gmo);
return message.ReadString(message.MessageLength);
}
}
使用IBMMQ之前,创建一个MQ管理类是必需的。
class Management
{
string queueName;
MQQueueManager qMgr;
MQMessage mqMsg;
MQQueue queue;
MQPutMessageOptions putOptions;
#region 连接队列管理器
public Management()
{
}
string linkStatus;
public string LinkToQueueManager()
{
string QueueName = "QL_Node";
queueName = QueueName;
Environment.SetEnvironmentVariable("MQCCSID", "1381");
if (MQEnvironment.properties.Count <= 0)
{
MQEnvironment.properties.Add(MQC.CCSID_PROPERTY, 1381);
}
MQEnvironment.Port = 1416;
MQEnvironment.Channel = "SYSTEM.DEF.SVRCONN";
MQEnvironment.Hostname = "192.168.10.8";
MQEnvironment.UserId = "MUSR_MQADMIN";
//MQEnvironment.Password = "Aie_1234";
string qmName = "QM_NODE";
try
{
if (qMgr == null || !qMgr.IsConnected)
{
qMgr = new MQQueueManager(qmName);
}
linkStatus = "连接队列管理器:" + "成功!";
}
catch (MQException e)
{
linkStatus = "连接队列管理器错误: 结束码:" + e.CompletionCode + " 错误原因代码:" + e.ReasonCode;
}
catch (Exception e)
{
linkStatus = "连接队列管理器错误: 结束码:" + e;
}
return linkStatus;
}
#endregion
#region 发送消息
public void SendMsg(string message)
{
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;
try
{
queue = qMgr.AccessQueue(queueName, openOptions); //尝试打开队列
}
catch (MQException e)
{
Console.WriteLine("打开队列失败:" + e.Message);
}
mqMsg = new MQMessage();
mqMsg.WriteString(message);
putOptions = new MQPutMessageOptions();
try
{
queue.Put(mqMsg, putOptions); //将消息放入消息队列
}
catch (MQException mqe)
{
Console.WriteLine("发送异常终止:" + mqe.Message);
}
finally
{
try
{
qMgr.Disconnect();
}
catch (MQException e)
{
}
}
}
#endregion
#region 接收消息
public DataSet receiveMsg()
{
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE;
try
{
queue = qMgr.AccessQueue(queueName, openOptions); //尝试打开队列
}
catch (MQException e)
{
Console.WriteLine("打开队列失败:" + e.Message);
}
//从队列管理器中获得消息
MQGetMessageOptions mqGetMsgOpts;
mqMsg = new MQMessage();
mqGetMsgOpts = new MQGetMessageOptions();
mqGetMsgOpts.WaitInterval = 15000;
mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
try
{
int queryDep = queue.CurrentDepth;
if (queryDep > 0)
{
queue.Get(mqMsg, mqGetMsgOpts); //获得消息
var ds = new DataSet();
var table = new DataTable("T_School");
table.Columns.Add("ID", typeof(string));
table.Columns.Add("SchoolName", typeof(string));
table.Columns.Add("BuildDate", typeof(string));
table.Columns.Add("Address", typeof(string));
ds.Tables.Add(table);
string message = mqMsg.ReadString(mqMsg.MessageLength);
mqMsg.Format = MQC.MQFMT_XMIT_Q_HEADER;
var reader = new StringReader(message);
ds.ReadXml(reader, XmlReadMode.Fragment);
return ds;
}
else
{
return null;
}
}
catch (MQException ex)
{
Console.WriteLine("访问队列停止" + ex.InnerException);
return null;
}
finally
{
try
{
qMgr.Disconnect();
}
catch (MQException e)
{
}
}
}
#endregion
}
【小结】
任何MQ的基本操作,都包括MQ的创建、连接、消息发送、消息接收。
我们学习任何MQ,都只要抓住这几点就够了。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。