Msmq设计文档(赋源代码)

Msmq设计文档

文件状态: [√] 草稿 [  ] 正式发布 [  ] 正在修改

文件标识:

ECI-MSMQ v01

当前版本:

0.5

作    者:

阿新

完成日期:

2005-8-18

1.0文档说明:

1.1文档目的

介绍了MSMQ的基本编程(如存储和接收消息)和基本的管理功能(如创建和删除队列)。虽然使用.Net API来是非常方便和简单的,但是在实际的MSMQ项目中,需要了解消息队列作为架构的概念。通过使用MSMQ,系统会更加松散耦合,因此更加自治(autonomous)。需要注意的是:消息仅仅是消息,而不是内部的业务对象。因此,在设计新的分布式应用程序时,建议遵守面向服务架构(Service-Oriented Architecture)的基本思想:通过显式定义边界、创建自治服务,让MSMQ来负责交互部分。

1.2文档范围

涉及居于MSMQ基础的传输信息的交互开发

1.3读者对象

系统设计人员,开发人员,测试人员

1.4参考文献

Msdn Cnblogs

1.5专业术语

1、“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列(MSMQ)”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

2、队列类型(Queue Type)

有两种主要的队列类型:由您或网络中的其他用户创建的队列和系统队列。用户创建的队列可能是以下任何一种队列:

“公共队列”在整个“消息队列”网络中复制,并且有可能由网络连接的所有站点访问。

“专用队列”不在整个网络中发布。相反,它们仅在所驻留的本地计算机上可用。专用队列只能由知道队列的完整路径名或标签的应用程序访问。

“管理队列”包含确认在给定“消息队列”网络中发送的消息回执的消息。指定希望 MessageQueue 组件使用的管理队列(如果有的话)。

“响应队列”包含目标应用程序接收到消息时返回给发送应用程序的响应消息。指定希望 MessageQueue 组件使用的响应队列(如果有的话)。

说明:我们这里用到专用队列;

3、同步和异步通信(Synchronous VS. Asynchronous Communication)

队列通信天生就是异步的,因为将消息发送到队列和从队列中接收消息是在不同的进程中完成的。另外,可以异步执行接收操作,因为要接收消息的人可以对任何给定的队列调用 BeginReceive 方法,然后立即继续其他任务而不用等待答复。这与人们所了解的“同步通信”截然不同。 在同步通信中,请求的发送方在执行其他任务前,必须等待来自预定接收方的响应。发送方等待的时间完全取决于接收方处理请求和发送响应所用的时间

4、同消息队列交互(Interacting with Message Queues)

消息处理和消息为基于服务器的应用程序组件之间的进程间通信提供了强大灵活的机制。同组件间的直接调用相比,它们具有若干优点,其中包括:

稳定性 — 组件失败对消息的影响程度远远小于组件间的直接调用,因为消息存储在队列中并一直留在那里,直到被适当地处理。消息处理同事务处理相似,因为消息处理是有保证的。

消息优先级 — 更紧急或更重要的消息可在相对不重要的消息之前接收,因此可以为关键的应用程序保证足够的响应时间。

脱机能力 — 发送消息时,它们可被发送到临时队列中并一直留在那里,直到被成功地传递。当因任何原因对所需队列的访问不可用时,用户可以继续执行操作。同时,其他操作可以继续进行,如同消息已经得到了处理一样,这是因为网络连接恢复时消息传递是有保证的。

事务性消息处理 — 将多个相关消息耦合为单个事务,确保消息按顺序传递、只传递一次并且可以从它们的目标队列中被成功地检索。如果出现任何错误,将取消整个事务。

安全性 — MessageQueue 组件基于的消息队列技术使用 Windows 安全来保护访问控制,提供审核,并对组件发送和接收的消息进行加密和验证

2类库功能说明

完成将报文以string,stream,xmldocument,dataset的形式发送到指定的消息队列,并接收这些消息还原成报文格式;支持多线程异步方式接收消息;

2.1类库结构

类名称

方法/类型

参数

说明

ECI.MSMQLib

FormatterType

枚举类型

Xml,binary,stream

定义消息序列化类型

MQProfile

Path,transactional…

定义消息队列的基本参数

MsgType

枚举类型

String,stream,dataset,docment

定义消息的类型

MyConvert

ToXmlDoc

dataset

Dataset转化成xmldocuemnt

ToDataSet

xmldocuemnt

xmldocuemnt转化成Dataset

ToStream

Dataset,xmldocument

把DataSet,xmldocuemnt转化成stream

ToString

Stream

将stream转成String

ToString

XmlDocument

将xmldocment转成String

ToString

dataset

将xmldocment转成String

ToBig

Big5

繁体转简体

ToBig5

Big

简体转繁体

MySteam

Read

读取stream中的消息

WriteLog

Save

当msmq发送错误将记录错误信息同时备份下消息内容,发送Mail通知处理人员

MQReceiveDelegate

委托

在采用异步接受时会用到

Sender

Sender

MQProfile

构造函数,初始化消息队列

Send

根据MQProflie的定义发送消息

Lable,Context

发送String;定义消息标签和内容

DataSet,Lable

发送DataSet

Xmldocuemnt,Lable

发送XmlDocuemnt

Stream,Lable

发送Stream

Receiver

Receiver

MQProfile

构造函数,初始化消息队列

Receive

同步接收String

ReceiveXmlDoc

同步接收xmlDcoument

ReceiveDataset

同步接收DataSet

AsynReceive

异步接收消息通过MQReceiveDelegate Receiving获取消息体和标签

AsynReceiveCallBak

异步回调接收消息通过MQReceiveDelegate Receiving获取消息和标签

2.2消息队列的创建

///MQPath = FormatName:DIRECT=TCP:172.20.30.36\Private$\PathName 通过ip方式调用队列

///MQPath = FormatName:Direct=http://localhost/msmq/Private$/PathName 通过http方式调用队列优点可以穿越防火墙的限制;

///MQPath = MachineName\Private$\PathName 通过主机名方式调用队列

///创建队列

System.Messaging.MessageQueue mq = new System.Messaging.MessageQueue(this.MQPath);

///创建消息

System.Messaging.Message msg=new System.Messaging.Message();

//msg.Recoverable=true;

/*

Recoverable 属性指示是否保证消息的传递,即使计算机在消息传递到目标队列的途中崩溃。 如果保证消息的传递,则在途中的每一步都将本地存储消息,直到消息被成功地转发到下一台计算机。将 Recoverable 属性设置为 true 可能会影响吞吐量。 如果消息是事务性的,消息队列会自动将消息视为可恢复的,而与 Recoverable 属性的值无关。

*/

///定义消息的序列化类型

mq.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(string)}) ;

msg.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(string)}) ;

mq.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(DataSet)}) ;

msg.Formatter=new System.Messaging.XmlMessageFormatter(new Type[] {typeof(DataSet)}) ;

///创建一个事务用于对事务性队列的控制

MessageQueueTransaction mqt=new MessageQueueTransaction();

///对消息标签赋值 string

msg.Label=MQlable;

///对消息体赋值 object

msg.Body=Context;

///对以流形式的消息体赋值 stream

msg.BodyStream =stream

try

{

///开始一个发送消息事务

mqt.Begin();

///发送消

mq.Send(msg,mqt);

///成功发送

mqt.Commit();

}

catch(MessageQueueException e)

{

//Log.Save(e.Message);

///回滚整个事物

mqt.Abort();

///将错误信息写入日至

//WriteLog.Save(this.MQPath,"Sender.SendStream(System.IO.Stream stream,string Lable).Transaction",e.Message,MyReadStream.Read(stream));

throw;

}

2.3 读/显示消息

当消息接受后,消息将从队列中删除。可以通过使用MessageQueue.Peek方法来检索消息队列中的第一个消息的复制,保留消息在队列中。不过,这样只能获取的相同的消息。更好的办法是通过foreach来读消息队列中的消息,但不删除队列中的消息。

foreach(System.Messaging.Message message in queue)

{

txtResults.Text += message.Label + Environment.NewLine;

}

2.4 异步接受消息

public void AsynReceiveCallBak()

{

try

{

//this.MessageType=msgType;

MessageQueueTransaction myTransaction = new MessageQueueTransaction();

anycMQ = new MessageQueue(MQpath);

AsyncCallback cb = new AsyncCallback (callback);

IAsyncResult asyncResult=anycMQ.BeginReceive(new TimeSpan(0,0,5,0),DateTime.Now,cb);

//return MQ_OutBox;

}

catch(Exception e)

{

WriteLog.Save(this.MQPath,"Receiver.AsynReceiveCallBak()",e.Message);

throw;

}

}

private void callback(IAsyncResult handle)

{

Message msg=anycMQ.EndReceive(handle);

try

{

MQcontext=msg.BodyStream;

MQlable=msg.Label;

this.Receiving(this.MQLable,this.MQContext);

this.AsynReceiveCallBak();

}catch

{

……..

}

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏进击的程序猿

php异常处理 之 BooBoo库介绍

本文介绍php开源库BooBoo,是一个处理php异常和错误的开源库,通过简单的分析代码,我们知道了实际项目中怎么正确的设置错误和异常。

10620
来自专栏芋道源码1024

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎...

38160
来自专栏有趣的django

redis的使用 一、简介二、对redis的操作三、RDB和AOF的两种数据持久化机制四、设置redis的连接密码五、python操作redis

     redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)...

14030
来自专栏数据之美

线上服务 CPU 100%?一键定位 so easy!

背景 经常做后端服务开发的同学,或多或少都遇到过 CPU 负载特别高的问题。尤其是在周末或大半夜,突然群里有人反馈线上机器负载特别高,不熟悉定位流程和思路的同学...

51980
来自专栏ascii0x03的安全笔记

Linux下ls命令显示符号链接权限为777的探索

Linux下ls命令显示符号链接权限为777的探索                                                ——深入ls、...

1.1K50
来自专栏安恒网络空间安全讲武堂

Python编写渗透工具学习笔记一 | 0x07 Python实现键盘记录器

0x07 Python实现键盘记录器 这份代码比较经典,里面的注释也写的很详细,我也就直接放出来给大家一起学习一下。 简单说一说 我们定义了pyhook的hoo...

539100
来自专栏13blog.site

Spring+SpringMVC+MyBatis+easyUI整合基础篇(八)mysql中文查询bug修复

前言   在测试搜索时出现的问题,mysql通过中文查询条件搜索不出数据,但是英文和数字可以搜索到记录,中文无返回记录。本文就是写一下发现问题的过程及解决方法...

34350
来自专栏风中追风

java类的加载过程和类加载器的分析

我们知道,我们写的java代码保存的格式是 .java, java文件被编译后会转换为字节码,字节码可以在任何平台通过java虚拟机来运行,这也是java能够跨...

55680
来自专栏大内老A

WCF技术剖析之二十三:服务实例(Service Instance)生命周期如何控制[下篇]

在[第2篇]中,我们深入剖析了单调(PerCall)模式下WCF对服务实例生命周期的控制,现在我们来讨轮另一种极端的服务实例上下文模式:单例(Single)模式...

21590
来自专栏JavaEdge

操作系统之文件管理

将文件属性从外存拷到内存中打开文件表的一表目中 将其编号返回给用户。 系统可利用该编号到打开文件表中去查找。

376100

扫码关注云+社区

领取腾讯云代金券