Azure Cosmos DB实现多主机读写

众多的应用实现全球化部署的时候,希望实现本地应用对本地数据进行访问,那么数据库就需要全球同步,并且多主机写入。传统模型里面实现这点是比较困难的。在同步和性能方面都会受到各种局限,如网络、存储、数据的一致性问题。当cosmos db诞生后,这些问题都可以得到解决。假设一个信息发布,有全球数以百万、千万的用户,数以几十亿的文章。那么这时候这套系统肯定最外面的结局就是全球分布式部署,并且就近访问。下面以Demo方式实现此构架。模型如下。

假设在东南亚、北欧、美西 创建3个站点,承接全球的业务。东南亚的用户写入和读取都在东南亚的数据库进行写入和读取。北欧的同样在北欧读取和写入,美西也在美西进行读取和写入。

Cosmos DB设计

分别在东南亚、北欧、美西创建一个cosmos DB设置为如下配置

创建模型

本文模拟文章发布页面,先创建文章的模型,应用程序如何建立,请参考:https://docs.microsoft.com/zh-cn/azure/cosmos-db/tutorial-develop-sql-api-dotnet

publicclassArticle

{

// Unique ID for Article

[JsonProperty(PropertyName="id")]

publicstringId{get;set; }

publicstringPartitionKey

{

get

{

returnthis.Id;

}

}

// Author of the article

publicstringAuthor{get;set; }

// Category/genre of the article

publicstringCategory{get;set; }

// Tags associated with the article

publicstringTitle{get;set; }

}

controller,这里只模拟了展现和创建,删除写了方法。没有使用

publicclassArticleController:Controller

{

// GET: Article

[ActionName("Index")]

publicasyncTaskIndexAsync()

{

varitems=awaitDocumentDBRepository.GetListAsync();

returnView(items);

}

[ActionName("Create")]

publicasyncTaskCreateAsync()

{

returnView();

}

#pragmawarningrestore1998

[HttpPost]

[ActionName("Create")]

[ValidateAntiForgeryToken]

publicasyncTaskCreateAsync([Bind(Include="Id,PartitionKey,Category ,Title, Author ")]Articleitem)

{

if(ModelState.IsValid)

{

awaitDocumentDBRepository.CreateAsync(item);

returnRedirectToAction("Index");

}

returnView(item);

}

[HttpPost]

[ActionName("Delete")]

[ValidateAntiForgeryToken]

publicasyncTaskDeleteConfirmedAsync([Bind(Include="Id")]stringid)

{

awaitDocumentDBRepository.DeleteAsync(id);

returnRedirectToAction("Index");

}

}

数据访问层

创建一个类DocumentDBRepository.cs

publicstaticclassDocumentDBRepositorywhereT:class

{

privatestaticreadonlystringDatabaseId=ConfigurationManager.AppSettings["database"];

//private static readonly string CollectionIdArticle = "Article";

// private static DocumentClient client;

privatestaticDocumentClientwriteClient;

privatestaticDocumentClientreadClient1;

privatestaticDocumentClientreadClient2;

publicstaticvoidInitialize()

{

// client = new DocumentClient(new Uri(ConfigurationManager.AppSettings["endpoint"]), ConfigurationManager.AppSettings["authKey"]);

ConnectionPolicyClientPolicyAsia=newConnectionPolicy{ConnectionMode=ConnectionMode.Direct,ConnectionProtocol=Protocol.Tcp};

ClientPolicyAsia.PreferredLocations.Add(LocationNames.SoutheastAsia);

ClientPolicyAsia.PreferredLocations.Add(LocationNames.WestUS);

ClientPolicyAsia.PreferredLocations.Add(LocationNames.NorthEurope);

ConnectionPolicyClientPolicyNorthEurope=newConnectionPolicy{ConnectionMode=ConnectionMode.Direct,ConnectionProtocol=Protocol.Tcp};

ClientPolicyNorthEurope.PreferredLocations.Add(LocationNames.NorthEurope);

ClientPolicyNorthEurope.PreferredLocations.Add(LocationNames.WestUS);

ClientPolicyNorthEurope.PreferredLocations.Add(LocationNames.SoutheastAsia);

ConnectionPolicyClientPolicyUS=newConnectionPolicy{ConnectionMode=ConnectionMode.Direct,ConnectionProtocol=Protocol.Tcp};

ClientPolicyUS.PreferredLocations.Add(LocationNames.WestUS);

ClientPolicyUS.PreferredLocations.Add(LocationNames.SoutheastAsia);

ClientPolicyUS.PreferredLocations.Add(LocationNames.NorthEurope);

readClient2=newDocumentClient(

newUri("https://contentdatabase-usa.documents.azure.com:443/"),

"key",

ClientPolicyUS);

writeClient=newDocumentClient(

newUri("https://contentdatabase-asia.documents.azure.com:443"),

"key",

ClientPolicyAsia);

readClient1=newDocumentClient(

newUri("https://contentdatabase-europe.documents.azure.com:443/"),

"key",

ClientPolicyNorthEurope);

CreateDatabaseIfNotExistsAsync().Wait();

CreateCollectionIfNotExistsAsync("Article").Wait();

// CreateCollectionIfNotExistsAsync("Review").Wait();

}

privatestaticasyncTaskCreateDatabaseIfNotExistsAsync()

{

try

{

awaitwriteClient.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseId));

}

catch(DocumentClientExceptione)

{

if(e.StatusCode==System.Net.HttpStatusCode.NotFound)

{

awaitwriteClient.CreateDatabaseAsync(newDatabase{Id=DatabaseId});

}

else

{

throw;

}

}

try

{

awaitreadClient1.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseId));

}

catch(DocumentClientExceptione)

{

if(e.StatusCode==System.Net.HttpStatusCode.NotFound)

{

awaitreadClient1.CreateDatabaseAsync(newDatabase{Id=DatabaseId});

}

else

{

throw;

}

}

try

{

awaitreadClient2.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseId));

}

catch(DocumentClientExceptione)

{

if(e.StatusCode==System.Net.HttpStatusCode.NotFound)

{

awaitreadClient2.CreateDatabaseAsync(newDatabase{Id=DatabaseId});

}

else

{

throw;

}

}

}

privatestaticasyncTaskCreateCollectionIfNotExistsAsync(stringcollectionname)

{

//创建Article

try

{

awaitwriteClient.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId,collectionname));

}

catch(DocumentClientExceptione)

{

if(e.StatusCode==System.Net.HttpStatusCode.NotFound)

{

DocumentCollectionmyCollection=newDocumentCollection();

myCollection.Id=collectionname;

myCollection.PartitionKey.Paths.Add("/id");

awaitwriteClient.CreateDocumentCollectionAsync(

UriFactory.CreateDatabaseUri(DatabaseId),

myCollection,

newRequestOptions{OfferThroughput=1000});

}

else

{

throw;

}

}

try

{

awaitreadClient1.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId,collectionname));

}

catch(DocumentClientExceptione)

{

if(e.StatusCode==System.Net.HttpStatusCode.NotFound)

{

DocumentCollectionmyCollection=newDocumentCollection();

myCollection.Id=collectionname;

myCollection.PartitionKey.Paths.Add("/id");

awaitreadClient1.CreateDocumentCollectionAsync(

UriFactory.CreateDatabaseUri(DatabaseId),

myCollection,

newRequestOptions{OfferThroughput=1000});

}

else

{

throw;

}

}

try

{

awaitreadClient2.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId,collectionname));

}

catch(DocumentClientExceptione)

{

if(e.StatusCode==System.Net.HttpStatusCode.NotFound)

{

DocumentCollectionmyCollection=newDocumentCollection();

myCollection.Id=collectionname;

myCollection.PartitionKey.Paths.Add("/id");

awaitreadClient2.CreateDocumentCollectionAsync(

UriFactory.CreateDatabaseUri(DatabaseId),

myCollection,

newRequestOptions{OfferThroughput=1000});

}

else

{

throw;

}

}

}

publicstaticasyncTaskGetListAsync()

{

//合并三个区域的文章

//typeof(T).ToString();

// T.

IDocumentQuerywritequery=writeClient.CreateDocumentQuery(

UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]))

.AsDocumentQuery();

IDocumentQueryreadquery1=readClient1.CreateDocumentQuery(

UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]))

.AsDocumentQuery();

IDocumentQueryreadquery2=readClient2.CreateDocumentQuery(

UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]))

.AsDocumentQuery();

ListresultsList=newList();

while(writequery.HasMoreResultsreadquery1.HasMoreResultsreadquery2.HasMoreResults)

{

IListresults=newList();

if(writequery.HasMoreResults)

{

results.Add(writequery.ExecuteNextAsync());

}

if(readquery1.HasMoreResults)

{

results.Add(readquery1.ExecuteNextAsync());

}

if(readquery2.HasMoreResults)

{

results.Add(readquery2.ExecuteNextAsync());

}

IListFeedResult=awaitTask.WhenAll(results);

foreach(FeedResponsefeedinFeedResult)

{

resultsList.AddRange(feed);

}

}

returnresultsList;

}

publicstaticasyncTaskCreateAsync(Titem)

{

returnawaitwriteClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]),item);

}

publicstaticasyncTaskDeleteAsync(stringid)

{

awaitwriteClient.DeleteDocumentAsync(UriFactory.CreateDocumentUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1],id));

}

}

可以看到上面的代码中,初始化了3个DocumentClient,分布是指向3个区域的cosmos db

创建站点

在对应区域创建相应的站点美西站点美西的初始化的DocumentClient 为

writeClient=newDocumentClient(

newUri("https://contentdatabase-usa.documents.azure.com:443/"),

"key",

ClientPolicyUS);

readClient1=newDocumentClient(

newUri("https://contentdatabase-asia.documents.azure.com:443"),

"key",

ClientPolicyAsia);

readClient2=newDocumentClient(

newUri("https://contentdatabase-europe.documents.azure.com:443/"),

"key",

ClientPolicyNorthEurope);

北欧站点北欧初始化的DocumentClient 为

readClient2=newDocumentClient(

newUri("https://contentdatabase-usa.documents.azure.com:443/"),

"key",

ClientPolicyUS);

readClient1=newDocumentClient(

newUri("https://contentdatabase-asia.documents.azure.com:443"),

"key",

ClientPolicyAsia);

writeClient=newDocumentClient(

newUri("https://contentdatabase-europe.documents.azure.com:443/"),

"key",

ClientPolicyNorthEurope);

东亚站点:东亚初始化的DocumentClient 为

readClient1=newDocumentClient(

newUri("https://contentdatabase-usa.documents.azure.com:443/"),

"key",

ClientPolicyUS);

writeClient=newDocumentClient(

newUri("https://contentdatabase-asia.documents.azure.com:443"),

"key",

ClientPolicyAsia);

readClient2=newDocumentClient(

newUri("https://contentdatabase-europe.documents.azure.com:443/"),

"key",

ClientPolicyNorthEurope);

数据合并

由于是三个cosmos db因此在数据列表中,需要对数据列表进行合并,合并代码是在数据访问层中的代码片段:

publicstaticasyncTaskGetListAsync()

{

//合并三个区域的文章

//typeof(T).ToString();

// T.

IDocumentQuerywritequery=writeClient.CreateDocumentQuery(

UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]))

.AsDocumentQuery();

IDocumentQueryreadquery1=readClient1.CreateDocumentQuery(

UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]))

.AsDocumentQuery();

IDocumentQueryreadquery2=readClient2.CreateDocumentQuery(

UriFactory.CreateDocumentCollectionUri(DatabaseId,typeof(T).ToString().Split('.')[typeof(T).ToString().Split('.').Length-1]))

.AsDocumentQuery();

ListresultsList=newList();

while(writequery.HasMoreResultsreadquery1.HasMoreResultsreadquery2.HasMoreResults)

{

IListresults=newList();

if(writequery.HasMoreResults)

{

results.Add(writequery.ExecuteNextAsync());

}

if(readquery1.HasMoreResults)

{

results.Add(readquery1.ExecuteNextAsync());

}

if(readquery2.HasMoreResults)

{

results.Add(readquery2.ExecuteNextAsync());

}

IListFeedResult=awaitTask.WhenAll(results);

foreach(FeedResponsefeedinFeedResult)

{

resultsList.AddRange(feed);

}

}

returnresultsList;

}

实现效果

访问东南亚站点看到数据为一共有4条数据

在cosmos db中的数据为3条,还有一条在美西的cosmos db中

我们访问欧洲站点,看到结果一样,欧洲的cosmos db中并无数据

当一个站点写入一条数据,其余站点会在毫秒级既可以展示。这样通过选择合适的分区键和静态的基于帐户的分区,可以使用 Azure Cosmos DB 实现多区域本地写入和读取。也即是,在不同区域建立访问的web站点,web站点读取本地的数据和写入到本地。其他区域也是如此,然后建立cosmos db的帐户进行静态的分区,通过cosmos db实现毫秒级的分发,应用层进行数据的合并。这样实现多主机读写、跨区域、全球分布的应用模型。

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20180107G0MU4F00?refer=cp_1026

相关快讯

扫码关注云+社区