Biztalk Server zip unzip pipeline component Development
最近有个B2B的项目涉及和其他合作伙伴(partner)作数据传输,我们这边使用的开发平台(platform)是Biztalk Server 2006,数据传输管道(channel)采用Windows Server 2003的MSMQ。但是MSMQ本身存在单个消息不能超过4MB的限制。虽然Biztalk Server 2006自带的MSMQ Adapter已经提供了对大消息(Large Data)的支持,即[Support segmentation]选项(if true, messages larger than 4095KB (approximately 4MB) will be segmented):当发送的单个报文实例(Instance)超过4MB时,可以在发送至MSMQ队列的时候把它分割成几个小的消息发送,这些被分割的消息之间通过Message.CorrelationId进行关联,具体的做法可以参考《MSMQ消息大于4MB限制的解决办法》;但是采用对消息进行分割的做法,接收方需要对收到的消息作特定的判断,相比之下,对报文进行压缩来得比较简单。
下面介绍一下如何通过对Biztalk Pipeline的二次开发实现对报文的压缩/解压;
功能描述:
1/将Biztalk 流程(Orchestration)出来的消息在发送端口通过加载pipeline组件实现将消息以zip的方式进行压缩(zip可以对Xml,txt文档的压缩比达到10:1)
2/将接收的zip文件(支持包含多个文件的批处理(batch))进行解压后进入Biztalk流程(Orchestration)处理;
具体实现:
要实现对Biztalk Pipeline的开发,需要对如下接口[Microsoft.BizTalk.Component.Interop.IComponent, IBaseComponent, IPersistPropertyBag, IComponentUI]做实现。好在现在网上提供了pipeline component wizard:http://www.gotdotnet.com/Workspaces/Workspace.aspx?id=1d4f7d6b-7d27-4f05-a8ee-48cfcd5abf4a 可以下载到pipeline开发向导
实现对文件进行压缩/解压需要的[ICSharpCode.SharpZipLib.dll]目前版本0.85相当稳定;下载地址:http://www.icsharpcode.net/OpenSource/SharpZipLib/ 具体方法请查看版本
通过安装pipeline component wizard之后就可以在vs.net中创建你的pipeline component组件了。CategoryTypes表示该组件可以加载到pipeline的什么位置,CategoryTypes.CATID_Any表示任何位置都可以放;
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[System.Runtime.InteropServices.Guid("62656b9b-7d69-407d-b71f-d3c0415c82af")]
[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
public class UnzipDisassemblerComponent : Microsoft.BizTalk.Component.Interop.IDisassemblerComponent, IBaseComponent, IPersistPropertyBag, IComponentUI
{
private System.Resources.ResourceManager resourceManager = new System.Resources.ResourceManager("Execution.BizTalk.Common.Pipelines.UnzipDisassemblerPipeline", Assembly.GetExecutingAssembly());
#region IBaseComponent members
下面是对Biztalk消息进行解压/压缩的代码实现。
/// <summary>
/// Unzips the body of the inbound message. Called by the messaging engine when a new
/// message arrives. Every file entry found in the zip archive is read fully into memory
/// and queued in <c>_msgs</c> as a separate output message that carries a copy of the
/// inbound message context.
/// </summary>
/// <param name="pc">the pipeline context</param>
/// <param name="inmsg">the actual message</param>
public void Disassemble(Microsoft.BizTalk.Component.Interop.IPipelineContext pc, Microsoft.BizTalk.Message.Interop.IBaseMessage inmsg)
{
    IBaseMessagePart msgPart = inmsg.BodyPart;
    if (msgPart == null)
    {
        // Without a body part there is no archive to unpack, so nothing is queued.
        return;
    }

    Stream strmZipFile = msgPart.GetOriginalDataStream();
    if (strmZipFile == null)
    {
        return;
    }

    ZipInputStream oZipStream = new ZipInputStream(strmZipFile);
    if (!string.IsNullOrEmpty(mPassword))
    {
        oZipStream.Password = mPassword;
    }

    // The message type does not depend on the entry, so it is built once up front.
    string messageType = null;
    if (this.Namespace != null && this.RootElementName != null)
    {
        messageType = string.Format("{0}#{1}", this.Namespace, this.RootElementName);
    }

    // NOTE(review): the buffer size was missing from the source text; 4096 is assumed - confirm.
    byte[] buffer = new byte[4096];

    try
    {
        ZipEntry sEntry = oZipStream.GetNextEntry();
        while (sEntry != null)
        {
            // Directory entries carry no data; only file entries become messages.
            if (!sEntry.IsDirectory)
            {
                // Read the current entry completely into its own in-memory stream.
                MemoryStream strmMem = new MemoryStream();
                int count;
                while ((count = oZipStream.Read(buffer, 0, buffer.Length)) != 0)
                {
                    strmMem.Write(buffer, 0, count);
                }
                strmMem.Seek(0, SeekOrigin.Begin);

                // The inbound body part is deliberately left untouched: assigning the entry
                // stream to it as well would make the inbound message and the queued
                // message share a single stream.
                IBaseMessage outMsg = pc.GetMessageFactory().CreateMessage();
                outMsg.AddPart("Body", pc.GetMessageFactory().CreateMessagePart(), true);
                outMsg.BodyPart.Data = strmMem;
                pc.ResourceTracker.AddResource(strmMem);

                // Copy every context property of the inbound message to the new message.
                for (int iProp = 0; iProp < inmsg.Context.CountProperties; iProp++)
                {
                    string strName;
                    string strNSpace;
                    object val = inmsg.Context.ReadAt(iProp, out strName, out strNSpace);
                    // If the property has been promoted, respect the settings
                    if (inmsg.Context.IsPromoted(strName, strNSpace))
                    {
                        outMsg.Context.Promote(strName, strNSpace, val);
                    }
                    else
                    {
                        outMsg.Context.Write(strName, strNSpace, val);
                    }
                }

                if (messageType != null)
                {
                    outMsg.Context.Promote("MessageType", "http://schemas.microsoft.com/BizTalk/2003/system-properties", messageType);
                }

                _msgs.Enqueue(outMsg);
            }

            sEntry = oZipStream.GetNextEntry();
        }
    }
    catch (Exception ex)
    {
        Debug.Write(ex.Message + Environment.NewLine + ex.StackTrace);
        // Rethrow without resetting the stack trace.
        throw;
    }
}
// Compress the message
/// <summary>
/// Implements IComponent.Execute method. Replaces the body of the message with a zip
/// archive that contains the original body as a single entry.
/// </summary>
/// <param name="pc">Pipeline context</param>
/// <param name="inmsg">Input message</param>
/// <returns>The input message, whose body part data now holds the zip archive
/// (returned unchanged when it has no body part or no body stream)</returns>
/// <remarks>
/// IComponent.Execute method is used to initiate
/// the processing of the message in this pipeline component.
/// The entry is named after the "ReceivedFileName" context property with any directory
/// part removed; when that property is absent a GUID is used instead.
/// </remarks>
public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(
    Microsoft.BizTalk.Component.Interop.IPipelineContext pc,
    Microsoft.BizTalk.Message.Interop.IBaseMessage inmsg)
{
    IBaseMessagePart msgPart = inmsg.BodyPart;
    if (msgPart == null)
    {
        // Nothing to compress.
        return inmsg;
    }

    Stream strmOriginFile = msgPart.GetOriginalDataStream();
    if (strmOriginFile == null)
    {
        return inmsg;
    }

    // NOTE(review): the buffer size and compression level were missing from the source
    // text; 4096 and 9 are assumed - confirm.
    byte[] buffer = new byte[4096];
    Stream strmZipFile = new MemoryStream();
    Stream strmZip;

    try
    {
        using (ZipOutputStream s = new ZipOutputStream(strmZipFile))
        {
            s.SetLevel(9); // 0 - store only to 9 - means best compression

            // NOTE(review): messageContext is not declared in this method; presumably a
            // member defined elsewhere in the class - confirm. The invariant culture is
            // used so the namespace does not depend on the thread's culture.
            string propertyNamespace = "http://schemas.microsoft.com/BizTalk/2003/"
                + messageContext.InboundTransportType.ToLowerInvariant() + "-properties";

            object receivedName = inmsg.Context.Read("ReceivedFileName", propertyNamespace);
            string fileName = receivedName == null ? string.Empty : receivedName.ToString();

            // Keep only the file name so the entry is stored without a directory part.
            int lastSeparator = fileName.LastIndexOfAny(new char[] { '\\', '/' });
            if (lastSeparator >= 0)
            {
                fileName = fileName.Substring(lastSeparator + 1);
            }
            if (fileName.Length == 0)
            {
                // The property is missing or empty: fall back to a unique entry name.
                fileName = Guid.NewGuid().ToString();
            }

            if (!string.IsNullOrEmpty(this.password))
            {
                s.Password = this.password;
            }

            ZipEntry entry = new ZipEntry(fileName);
            s.PutNextEntry(entry);
            StreamUtils.Copy(strmOriginFile, s, buffer);
            s.Finish();

            // Copy the finished archive into a second stream while the zip stream is
            // still open, so the result stays usable after the using block disposes it.
            strmZip = new MemoryStream();
            strmZipFile.Seek(0, SeekOrigin.Begin);
            StreamUtils.Copy(strmZipFile, strmZip, buffer);
            strmZip.Seek(0, SeekOrigin.Begin);
        }

        msgPart.Data = strmZip;
        pc.ResourceTracker.AddResource(strmZip);
        // this way, it's a passthrough pipeline component
        return inmsg;
    }
    catch (Exception e)
    {
        Debug.WriteLine("[zipAssember] " + e.Message);
        // Rethrow without resetting the stack trace.
        throw;
    }
}