首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

基于NetMQ技术实现的数据对象传输方案

一.总体框架

二.代码示例

2.1 传输消息封装相关对象

2.2 消息处理

2.3客户端示例代码

数据传输系统主要是将大量井场数据通过一定的传输协议安全的传输到基地,并存储到数据库。

井场业务数据需要安全、快捷的传到基地,井场一体化数据采集平台采集井场业务数据存储到井场本地数据库中(MySql),数据传输系统根据用户定义策略,经过数据拆分后传输到基地的服务器上,并存储到Oracle数据库中。因为涉及到多个客户端,数据在接收时要识别是哪个客户端传来的数据。通过该系统工作人员可以对井场数据传输进行监控。

数据传输系统的主要功能是:将各个井场采集到的数据经过加密、压缩后传输到服务器,之后再进行解压解密,整理后将数据存储到数据库中。

一.总体框架

图1-1产品框图

基于开源NetMQ请求回应模型(Request-Reply)实现轻量级对象传输解决方案。

NetMQ:ZeroMQ的.Net版本,ZeroMQ简单来说就是局域网内的消息中间件(与MSMQ类似),包括了进程间通讯、点对点通讯、订阅模式通讯等等,底层用更“完美”的Socket实现,ZeroMQ实现了多语言、跨平台、高效率等诸多优势。

具体实现步骤如下:

把业务对象分别转化为数据传输(DTO)对象、领域对象、数据访问对象(DAO),并提供某种机制方便的实现对象之间的相互转换。业务对象与数据库脱离,通过职责库与数据持久层进行交互。DTO对象只包括简单的属性用来进行数据传输,包括跨逻辑层传输和跨进程和网络的远程传输。Domain对象用于实现业务逻辑,包含属性和方案,DAO对象拥有与数据库交互。

传输客户端从客户端依照特定的策略收集需要更新的业务对象,转化成方便传输使用的DTO对象,传输消息管理组件把对象转换成JSON格式的字符串,对字符串形式数据对象进行数据验证、压缩、加密后,按配置进行消息封装、拆分、和打包,发现送给NETMQ Client对象。

NETMQ Client通过设定的网络协议和端口把基于消息形式的数据对象传输到服务器上。

传输服务器上把接收到的消息进行解压、解密、数据完整性验证等,把消息存储到队列中。

服务器上的消息处理服务和数据存储服务对接收到的数据进行处理,把字符串还原成DTO对象。

DTO对象使用服务器上配置的职责库服务(转换DTO对象为DAO对象),调用服务器上配置的数据持久层(IBatis.Net)进行入库操作。

二.代码示例

2.1 传输消息封装相关对象

传输信封对象:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace ServerApp.Common

{

public class MessageEnvelope

{

public string MetaDataClass { get; set; }

public string RepositoryMetaDataClass { get; set; }

public string Version { get; set; }

public MessageHeader Header { get; set; }

public MessageBody Body { get; set; }

public string CreatedTime { get;set;}

public string UniqueKey { get; set; }

public string MD5 { get; set; }

///

/// 0,add;1,update;2,delete

///

public string UpLoadType { get; set; }

public MessageEnvelope()

{

this.CreatedTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");

this.UniqueKey = System.Guid.NewGuid().ToString("N");

this.Body = new MessageBody();

this.Header = new MessageHeader();

}

}

}

传输标题对象

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

namespace ServerApp.Common

{

public class MessageHeader

{

public string EncodingStyle { get; set; }

public string Actor { get; set; }

public string NextActor { get; set; }

public string TockenKey { get; set; }

}

}

传输体对象

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

namespace ServerApp.Common

{

public class MessageBody

{

public List Items { get; set; }

}

}

传输失败对象

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace ServerApp.Common

{

public class MessageFault

{

public string FaultCode

public string FaultString

public string FaultActor

}

}

传输客户端发送对象

using NetMQ.Sockets;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using NetMQ;

using System.IO;

using System.Runtime.Serialization.Formatters.Binary;

using Newtonsoft.Json;

namespace ServerApp.Common

{

public class ClientSender

{

private IList envelopeList;

private int envelopeMaxCounts = 100;

private MessageHeader messageHeader = new MessageHeader();

private MessageEnvelope messageEnvelope = new MessageEnvelope();

private string serverURL = "";

public event EventHandler NotifyMessage;

public void Notify(string message)

{

if(this.NotifyMessage != null)

{

NotifyMessage(message, EventArgs.Empty);

}

}

public ClientSender(string serverURL)

{

this.serverURL = serverURL;

messageHeader = new MessageHeader();

messageEnvelope = new MessageEnvelope();

envelopeList = new List();

}

public ClientSender(string serverURL,MessageHeader header = null,MessageEnvelope envelope = null)

: this(serverURL)

{

this.messageHeader = header;

this.messageEnvelope = envelope;

}

public void AddNewEnvelope(List addNewList)

{

List

> listGroup = SplitObjectList(addNewList);

foreach (var item in listGroup)

{

MessageEnvelope messageEnvelope = new MessageEnvelope();

messageEnvelope.Header = this.messageHeader;

messageEnvelope.UpLoadType = "0";

messageEnvelope.Body.Items = item;

envelopeList.Add(messageEnvelope);

}

}

public void AddUpdateEnvelope(List updateList)

{

List

> listGroup = SplitObjectList(updateList);

foreach (var item in listGroup)

{

MessageEnvelope messageEnvelope = new MessageEnvelope();

messageEnvelope.Header = this.messageHeader;

messageEnvelope.UpLoadType = "1";

messageEnvelope.Body.Items = item;

envelopeList.Add(messageEnvelope);

}

}

private static List

> SplitObjectList(List updateList)

{

List

> listGroup = new List

>();

int j = 10;

for (int i = 0; i

{

List cList = new List();

cList = updateList.Take(j).Skip(i).ToList();

j += 10;

listGroup.Add(cList);

}

return listGroup;

}

public void Run()

{

//Task.Run(() =>

{

using (var sender = new RequestSocket())

{

sender.Connect("tcp://127.0.0.1:5555");

this.Notify("connect to " + this.serverURL);

Console.WriteLine("Sending tasks to workers");

//RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

int taskNumber = 0;

foreach (var envelope in this.envelopeList)

{

taskNumber++;

Console.WriteLine("Workload : ", "dd");

var encoding = System.Text.Encoding.ASCII;

var message = GetEnvelopeContent(envelope);

var msg = new Msg();

byte[] byteArray = System.Text.Encoding.UTF8.GetBytes(message);

msg.InitGC(byteArray, byteArray.Length);

sender.Send(ref msg, false);

msg.Close();

var resultMessage = sender.ReceiveFrameString(System.Text.Encoding.UTF8);

this.Notify(string.Format("process : finished.", taskNumber, resultMessage));

}

}

// });

}

;

}

private string GetEnvelopeContent(MessageEnvelope envelope)

{

string output = JsonConvert.SerializeObject(envelope);

return output;

}

//private string GetContent(int taskNumber)

//{

// MessageEnvelope message = new MessageEnvelope();

// message.Body = new MessageBody();

// message.Header = new MessageHeader();

// AutUserDTO user;

// List users = new List();

// for (var i = 0; i

// {

// user = new AutUserDTO();

// user.Password = "1233";

// user.UserId = System.Guid.NewGuid().ToString("N");

// user.UserName = "newlj" + i.ToString();

// user.PersonName = "lj1";

// user.TelNumber = "1";

// user.UserStatus = "1";

// user.ValMethod = "1";

// user.PositionId = "1";

// user.UserType = "1";

// user.OrgId = "1";

// user.ModifyUserId = "1";

// user.ModifyDate = DateTime.Now;

// user.EmployeeId = "1";

// user.CreateUserId = "1";

// user.AdminFlg = "1";

// user.Desp = "1";

// user.DeleteFlg = "1";

// user.PsdErrCount = 0;

// user.Locked = "1";

// users.Add(user);

// }

// message.Body.Items = users;

// message.Version = "1.0";

// message.MetaDataClass = typeof(AutUserDTO).ToString();

// message.RepositoryMetaDataClass = typeof(IAutUserRepository).ToString();

// // message.CreatedTime = DateTime.Now.ToString();

// string output = JsonConvert.SerializeObject(message);

// //var message = "this is test" + taskNumber.ToString();

// return output;

//}

}

}

但是因为Web Service 的开放性和通用性,为了能够保护信息系统的安全,对Web Service 的安全性提出了很高的要求。Web Service 迫切需要一个完整的安全服务框架,来为上层应用开发提供全面的安全服务。构建Web Service 的安全框架的困难在于:web service 是非常分布式的,并且关键的安全实现和算法都是由不同提供商实现的。将各分散的业务部门和它们原先的异构的安全系统和架构统一集成到Web Service 安全和业务平台上,并且能够以一种信任关系在各部门应用之间共享用户信息、描述和权限是一个摆在面前的巨大挑战。

为什么需要安全的可信的Web Services

与过去十年中客户/服务器和基于Web 的应用一样,XML Web Services 给应用开发和信息系统的构建带来了革命性的影响。通过使用标准协议,如XML、SOAP、WSDL 和UDDI,应用能够更容易的相互通讯,并且更快、更便宜的进行应用集成,供应链集成,实现分布式的服务模型。

XML Web Service 接口是基于XML 和松耦合的。XML 和SOAP 允许任意系统间进行相互通讯,无论它是一个Office XP 桌面还是一个大型主机系统。随着自动化业务流程集成的越来越普及,越来越多各式各样的系统通过Web Service 加入到一个广泛的Web Service 集成环境中去,因此出现了以下一些问题:

非集中的架构

非集中的管理

用异构的技术实现

多个部门间相互连接

多个企业间相互连接

天然的对等的架构

有可能对Internet 开放

上面的每一个问题都是对系统安全的严峻挑战。如何跨越多个异构系统在整个环境中实施一个安全策略?如何为一个不了解安全系统的外部提供商提供安全服务?如何监视和审计跨越多个异构系统的安全活动事件? 要解决上述问题,仅依赖于传统的防火墙和入侵监测系统是不足够的,即使加上了SSL 和VPN 也只是解决了数据在网络中安全传输的问题,并没有解决跨系统的认证和访问授权问题,也没能解决面向Internet 的服务安全问题。要解决这些问题,需要提供一个完整的基于Web Service 的安全和企业应用集成架构。DCI 架构以及产品系列提供了对上述问题的完整解决方案(完整的架构说明请看另文)。

2.2 消息处理

传输消息处理对象接口:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace ServerApp.Interface

{

public interface IMessageProcessService

{

bool Proccess(string message);

void Run();

}

}

using Newtonsoft.Json;

using RichFit.Drilling.Infrastructure.Interface.DTO;

using ServerApp.Common;

using ServerApp.Interface;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Reflection;

using System.Text;

using System.Threading;

using System.Threading.Tasks;

using Microsoft.Practices.Unity;

using RichFit.Drilling.Infrastructure.Interface;

using RichFit.Drilling.RepositoryService.RepositoryImpl;

using RichFit.Foundation.Service.Repository;

namespace ServerApp.Service

{

public class MessageProcessService : IMessageProcessService

{

private readonly IMessageStorageService messageStorageService;

public MessageProcessService(IMessageStorageService messageStorageService)

{

this.messageStorageService = messageStorageService;

Run();

}

public void Run()

{

Task.Run(() =>

{

while (true)

{

string message = messageStorageService.TryDequeue();

if(message != null)

{

Task.Run(() =>

{

Proccess(message);

});

}

//Thread.Sleep(100);

}

});

}

public bool Proccess(string message)

{

MessageEnvelope deserializedMessage = JsonConvert.DeserializeObject(message);

var container = AppContainer.Container;

Type t = Assembly.GetAssembly(typeof(AutUserDTO)).GetType(deserializedMessage.MetaDataClass);

Type trep = Assembly.GetAssembly(typeof(AutUserDTO)).GetType(deserializedMessage.RepositoryMetaDataClass);

var userR = container.Resolve(trep, null); //container.Resolve();

var rep = userR as AutUserRepository;

var methodinfo = userR.GetType().GetMethod("Add");

(userR as RepositoryBase).UnitOfWork.Begin();

foreach (var item in deserializedMessage.Body.Items)

{

//Task.Run(() =>

//{

var user1 = JsonConvert.DeserializeObject(item.ToString(), t);

AutUserDTO u = user1 as AutUserDTO;

methodinfo.Invoke(userR, new object[] { u });

//rep.Add(u)

// });

}

(userR as RepositoryBase).UnitOfWork.Commit();

Console.WriteLine("processed------------------finished" + deserializedMessage.CreatedTime);

return true;

}

}

}

传输对象存储接口:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

namespace ServerApp.Interface

{

public interface IMessageStorageService

{

void Save(string message);

string Remove();

string TryDequeue();

List GetList();

}

}

using ServerApp.Interface;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Collections.Concurrent;

using ServerApp.Common;

namespace ServerApp.Service

{

public class MessageStorageService : IMessageStorageService

{

internal static ConcurrentQueue messageQueue = new ConcurrentQueue();

public void Save(string message)

{

messageQueue.Enqueue(message);

}

public string TryDequeue()

{

string result;

if (messageQueue.TryDequeue(out result))

{

return result;

}

return null;

}

public string Remove()

{

string result;

if(messageQueue.TryDequeue(out result))

{

return result;

}

return null;

}

public List GetList()

{

return messageQueue.ToList();

}

}

}

2.3客户端示例代码

using Newtonsoft.Json;

using RichFit.Drilling.Infrastructure.Interface.DTO;

using ServerApp.Common;

using System;

using System.Collections.Generic;

using System.Linq;

using System.Reflection;

using System.Text;

using System.Threading.Tasks;

namespace ConsoleApplication1

{

class Program

{

static void Main(string[] args)

{

MessageEnvelope message = new MessageEnvelope();

message.Body = new MessageBody();

message.Header = new MessageHeader();

AutUserDTO user;

List users = new List();

for(var i=0;i

{

user = new AutUserDTO();

user.PersonName = "lj1";

users.Add(user);

}

message.Body.Items = users;

message.Version = "1.0";

message.MetaDataClass = typeof(AutUserDTO).ToString();

string output = JsonConvert.SerializeObject(message);

MessageEnvelope deserializedProduct = JsonConvert.DeserializeObject(output);

Type t = Assembly.GetAssembly(typeof(AutUserDTO)).GetType(message.MetaDataClass);

var user1 = JsonConvert.DeserializeObject(deserializedProduct.Body.Items[0].ToString(), t);

AutUserDTO u = user1 as AutUserDTO;

Console.WriteLine("====== VENTILATOR ======");

ClientSender client1 = new ClientSender("");

List newl = new List();

newl.Add("dddd");

newl.Add("dddd");

newl.Add("dddd");

newl.Add("dddd");

newl.Add("dddd");

client1.AddNewEnvelope(newl);

client1.Run();

Console.ReadLine();

Console.ReadLine();

}

}

}

这就需要各个系统能够按照统一的系统管理标准进行远程管理并提供系统的安全状态信息

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180504G0YSEQ00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券