Akka(40): Http:Marshalling reviewed - 传输数据序列化重温

   上篇我们讨论了Akka-http的文件交换。由于文件内容编码和传输线上数据表达型式皆为bytes,所以可以直接把文件内容存进HttpEntity中进行传递。那么对于在内存里自定义的高级数据类型则应该需要首先进行byte转换后才能放入HttpEntity中了。高级数据类型与byte之间的相互转换就是marshalling和unmarshalling过程了。这个我们在前几篇讨论里提及过,在本篇再重温加强印象。因为我们的主要目的是实现数据库表行的交换,所以应该把焦点放在 T <-> MessageEntity这样的转换上。在Akka-http中T->MessageEntity转换是通过Marshaller[T,MessageEntity]实现的,Marshaller类型定义如下:

sealed abstract class Marshaller[-A, +B] {

  def apply(value: A)(implicit ec: ExecutionContext): Future[List[Marshalling[B]]]
...
}
object Marshaller
  extends GenericMarshallers
  with PredefinedToEntityMarshallers
  with PredefinedToResponseMarshallers
  with PredefinedToRequestMarshallers {

  /**
   * Creates a [[Marshaller]] from the given function.
   */
  def apply[A, B](f: ExecutionContext ⇒ A ⇒ Future[List[Marshalling[B]]]): Marshaller[A, B] =
    new Marshaller[A, B] {
      def apply(value: A)(implicit ec: ExecutionContext) =
        try f(ec)(value)
        catch { case NonFatal(e) ⇒ FastFuture.failed(e) }
    }
...

这个类型包嵌了个类型转换函数:A => Future[List[Marshalling[B]]],最终目的是A=>B的转换。增加了一层Marshalling类型是为了更方便对B类型目标进行筛选、修改操作。我们看看类型Marshal的转换函数to[???]就明白了:

class Marshal[A](val value: A) {
  /**
   * Marshals `value` using the first available [[Marshalling]] for `A` and `B` provided by the given [[Marshaller]].
   * If the marshalling is flexible with regard to the used charset `UTF-8` is chosen.
   */
  def to[B](implicit m: Marshaller[A, B], ec: ExecutionContext): Future[B] =
    m(value).fast.map {
      _.head match {
        case Marshalling.WithFixedContentType(_, marshal) ⇒ marshal()
        case Marshalling.WithOpenCharset(_, marshal)      ⇒ marshal(HttpCharsets.`UTF-8`)
        case Marshalling.Opaque(marshal)                  ⇒ marshal()
      }
    }

首先,在可视域内需要Marshaller[A,B]隐式实例存在,Marshalling提供筛选,最后Marshaller的包嵌函数marshal进行了具体的类型转换。Akka-http提供了基础数据类型到MessageEntity转换的隐式实例,如下:

trait PredefinedToEntityMarshallers extends MultipartMarshallers {

  implicit val ByteArrayMarshaller: ToEntityMarshaller[Array[Byte]] = byteArrayMarshaller(`application/octet-stream`)
  def byteArrayMarshaller(contentType: ContentType): ToEntityMarshaller[Array[Byte]] =
    Marshaller.withFixedContentType(contentType) { bytes ⇒ HttpEntity(contentType, bytes) }

  implicit val ByteStringMarshaller: ToEntityMarshaller[ByteString] = byteStringMarshaller(`application/octet-stream`)
  def byteStringMarshaller(contentType: ContentType): ToEntityMarshaller[ByteString] =
    Marshaller.withFixedContentType(contentType) { bytes ⇒ HttpEntity(contentType, bytes) }

  implicit val CharArrayMarshaller: ToEntityMarshaller[Array[Char]] = charArrayMarshaller(`text/plain`)
  def charArrayMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[Array[Char]] =
    Marshaller.withOpenCharset(mediaType) { (value, charset) ⇒ marshalCharArray(value, mediaType withCharset charset) }
  def charArrayMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[Array[Char]] =
    Marshaller.withFixedContentType(mediaType) { value ⇒ marshalCharArray(value, mediaType) }

  private def marshalCharArray(value: Array[Char], contentType: ContentType.NonBinary): HttpEntity.Strict =
    if (value.length > 0) {
      val charBuffer = CharBuffer.wrap(value)
      val byteBuffer = contentType.charset.nioCharset.encode(charBuffer)
      val array = new Array[Byte](byteBuffer.remaining())
      byteBuffer.get(array)
      HttpEntity(contentType, array)
    } else HttpEntity.Empty

  implicit val DoneMarshaller: ToEntityMarshaller[akka.Done] =
    Marshaller.withFixedContentType(`text/plain(UTF-8)`) { done ⇒
      HttpEntity(`text/plain(UTF-8)`, "")
    }

  implicit val StringMarshaller: ToEntityMarshaller[String] = stringMarshaller(`text/plain`)
  def stringMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[String] =
    Marshaller.withOpenCharset(mediaType) { (s, cs) ⇒ HttpEntity(mediaType withCharset cs, s) }
  def stringMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[String] =
    Marshaller.withFixedContentType(mediaType) { s ⇒ HttpEntity(mediaType, s) }

  implicit val FormDataMarshaller: ToEntityMarshaller[FormData] =
    Marshaller.withOpenCharset(`application/x-www-form-urlencoded`) { _ toEntity _ }

  implicit val MessageEntityMarshaller: ToEntityMarshaller[MessageEntity] =
    Marshaller strict { value ⇒ Marshalling.WithFixedContentType(value.contentType, () ⇒ value) }
}

object PredefinedToEntityMarshallers extends PredefinedToEntityMarshallers

注意:上面的这些转换函数类型都是ToEntityMarshaller,这是一个类型别称,实际上就是Marshaller[T,MessageEntity]:

  type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]

从源代码上看这些Marshaller的隐式实例都提供了转换函数 T=>HttpEntity。这样就可以在实际类型转换时只要能找到对应Marshaller的隐式实例就可以调用它的转换函数进行转换操作了。

现在,只要通过import把这些隐式实例导入可视域内就可以这样调用Marshal了:

import akka.http.scaladsl.marshalling.Marshal
  val aChars = Array[Char]('h','e','l','l','o')
  val aBytes = Array[Byte](0,1,2,3)
  val strHello = Marshal("Hello").to[MessageEntity]
  val chHello = Marshal(aChars).to[MessageEntity]
  val bt0123 = Marshal(aBytes).to[MessageEntity]

那么对于结构复杂的自定义类型又如何呢?如下:

  case class Person(id: Int, name: String)
  val john = Person(12,"John")
  val futP = Marshal(john).to[MessageEntity]

这个futP无法通过编译,报错如下:

Error:(17, 30) could not find implicit value for parameter m: akka.http.scaladsl.marshalling.Marshaller[MarshalDemo.Person,akka.http.scaladsl.model.MessageEntity]
  val futP = Marshal(john).to[MessageEntity]

这是因为编译器compiler无法找到Marshaller[Person,MessageEntity]这个类型的隐式实例。现在我只为Person自定义一个Marshaller隐式实例:

  implicit val PersonMarshaller: ToEntityMarshaller[Person] = personMarshaller(`text/plain`)
  def personMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[Person] =
    Marshaller.withOpenCharset(mediaType) { (p, ps) ⇒ HttpEntity(mediaType withCharset ps, ByteString(p.toString)) }
  def personMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[Person] =
    Marshaller.withFixedContentType(mediaType) { p ⇒ HttpEntity(mediaType, ByteString(p.toString)) }

这个Marshaller代表的转换过程是:Person -> Person.String -> ByteString。中间多了层用String函数来描述Person类型。这只是我个人的转换方式,所以反向转换Unmarshalling也必须按照我的方式把String转回Person。实际上这种转换的开放标准之一就是Json,大家共同按照标准要求的表达形式进行转换操作就能达到同样的目的了。

Akka-http自带的Json解决方案用的是Spray-Json,下面我们就用Spray-Json来实现转换:

import akka.http.scaladsl.marshallers.sprayjson._
import spray.json._

trait Formats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends Formats {
  case class Person(id: Int, name: String)
  implicit val userFormat = jsonFormat2(Person.apply)
}
...
  import Converters._
  val john = Person(12,"John")
  val futP = Marshal(john).to[MessageEntity]

现在的转换流程变成了:Person -> Json -> ByteString。Akka-http是通过RootJasonFormat[T]来提供转换隐式实例的:

/**
 * A special JsonFormat signaling that the format produces a legal JSON root object, i.e. either a JSON array
 * or a JSON object.
 */
trait RootJsonFormat[T] extends JsonFormat[T] with RootJsonReader[T] with RootJsonWriter[T]

RootJsonFormat[T]代表T类型实例的Json转换。RootJsonFormat[T]的继承父辈包括:
/**
  * Provides the JSON deserialization and serialization for type T.
 */
trait JsonFormat[T] extends JsonReader[T] with JsonWriter[T]

/**
 * A special JsonReader capable of reading a legal JSON root object, i.e. either a JSON array or a JSON object.
 */
@implicitNotFound(msg = "Cannot find RootJsonReader or RootJsonFormat type class for ${T}")
trait RootJsonReader[T] extends JsonReader[T]

/**
 * A special JsonWriter capable of writing a legal JSON root object, i.e. either a JSON array or a JSON object.
 */
@implicitNotFound(msg = "Cannot find RootJsonWriter or RootJsonFormat type class for ${T}")
trait RootJsonWriter[T] extends JsonWriter[T]

在我们的例子里Person的Marshaller隐式实例是通过jsonFormat2函数获取的:

  def jsonFormat2[P1 :JF, P2 :JF, T <: Product :ClassManifest](construct: (P1, P2) => T): RootJsonFormat[T] = {
    val Array(p1, p2) = extractFieldNames(classManifest[T])
    jsonFormat(construct, p1, p2)
  }
  def jsonFormat[P1 :JF, P2 :JF, T <: Product](construct: (P1, P2) => T, fieldName1: String, fieldName2: String): RootJsonFormat[T] = new RootJsonFormat[T]{
    def write(p: T) = {
      val fields = new collection.mutable.ListBuffer[(String, JsValue)]
      fields.sizeHint(2 * 3)
      fields ++= productElement2Field[P1](fieldName1, p, 0)
      fields ++= productElement2Field[P2](fieldName2, p, 1)
      JsObject(fields: _*)
    }
    def read(value: JsValue) = {
      val p1V = fromField[P1](value, fieldName1)
      val p2V = fromField[P2](value, fieldName2)
      construct(p1V, p2V)
    }
  }

就是这个函数返回了RootJsonFormat[T]。可以看到,功能的具体实现在jsonFormat函数里,在这里实现了对json数据结构的读写。jsonFormat2是在ProductFormatsInstances trait里的,也就是ProductFormats: 

trait ProductFormats extends ProductFormatsInstances {
  this: StandardFormats =>

我们上面例子里的Formats trait继承了DefaultJsonProtocal,这里面包括了所有json转换实例构建方法:

/**
  * Provides all the predefined JsonFormats.
 */
trait DefaultJsonProtocol
        extends BasicFormats
        with StandardFormats
        with CollectionFormats
        with ProductFormats
        with AdditionalFormats

object DefaultJsonProtocol extends DefaultJsonProtocol

再看看RootJasonFormat及相关继承情况:

/**
 * A special JsonFormat signaling that the format produces a legal JSON root object, i.e. either a JSON array
 * or a JSON object.
 */
trait RootJsonFormat[T] extends JsonFormat[T] with RootJsonReader[T] with RootJsonWriter[T]

/**
  * Provides the JSON deserialization and serialization for type T.
 */
trait JsonFormat[T] extends JsonReader[T] with JsonWriter[T]

/**
 * A special JsonReader capable of reading a legal JSON root object, i.e. either a JSON array or a JSON object.
 */
@implicitNotFound(msg = "Cannot find RootJsonReader or RootJsonFormat type class for ${T}")
trait RootJsonReader[T] extends JsonReader[T]

/**
 * A special JsonWriter capable of writing a legal JSON root object, i.e. either a JSON array or a JSON object.
 */
@implicitNotFound(msg = "Cannot find RootJsonWriter or RootJsonFormat type class for ${T}")
trait RootJsonWriter[T] extends JsonWriter[T]

下面是Spray-Json的具体实现:

package json {

  case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)
  class SerializationException(msg: String) extends RuntimeException(msg)

  private[json] class PimpedAny[T](any: T) {
    def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)
  }

  private[json] class PimpedString(string: String) {
    @deprecated("deprecated in favor of parseJson", "1.2.6")
    def asJson: JsValue = parseJson
    def parseJson: JsValue = JsonParser(string)
  }
}

toJson,asJason分别需要JsonWriter,JsonReader的隐式实例。

从上面的讨论中我们对任意结构类型的一个实例进行序列化转换有了一定了解。这个类型的实例可以被是作为数据库的一条记录,通过上面讨论的方式在服务端和客户端进行交换。这是因为SprayJsonSupport可以提供任意类T的Marshaller[T,MessageEntity]隐式实例。

因为我们的主要目的是实现数据库表多行的交换,所以必须要实现以表行为元素数据流的数据交换,也就是说最起码能要在可视域内提供Marshall[Source[T],_],MessageEnity]及Unmarshaller[MessageEntity,Source[T,_]]的隐式实例才行。在服务端我们尝试过用complete(Source[T,NotUsed])来完成HttpResponse的构建。但单独用Marshal(source).to[Source[T,NotUsed]]则编译出错。这是因为Akka-http提供的是ToResponseMarshaller[Source[T,M]]的隐式实例:

implicit def fromEntityStreamingSupportAndByteStringMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToByteStringMarshaller[T]): ToResponseMarshaller[Source[T, M]] = {
    Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒
      FastFuture successful {
        Marshalling.WithFixedContentType(s.contentType, () ⇒ {
          val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) }

          // TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?)
          // TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer
          val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒
            // pick the Marshalling that matches our EntityStreamingSupport
            (s.contentType match {
              case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset | _: ContentType.WithMissingCharset) ⇒
                marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal }

              case best @ ContentType.WithCharset(bestMT, bestCS) ⇒
                marshallings collectFirst {
                  case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal
                  case Marshalling.WithOpenCharset(`bestMT`, marshal)    ⇒ () ⇒ marshal(bestCS)
                }
            }).toList
          }
          val marshalledElements: Source[ByteString, M] =
            bestMarshallingPerElement.map(_.apply()) // marshal!
              .via(s.framingRenderer)

          HttpResponse(entity = HttpEntity(s.contentType, marshalledElements))
        }) :: Nil
      }
    }
  }

这个complete(m => ToResponseMarshallable)是个magnet-pattern函数,巧妙在ToResponseMarshallable参数类型:

/** Something that can later be marshalled into a response */
trait ToResponseMarshallable {
  type T
  def value: T
  implicit def marshaller: ToResponseMarshaller[T]

  def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] =
    Marshal(value).toResponseFor(request)
}

object ToResponseMarshallable {
  implicit def apply[A](_value: A)(implicit _marshaller: ToResponseMarshaller[A]): ToResponseMarshallable =
    new ToResponseMarshallable {
      type T = A
      def value: T = _value
      def marshaller: ToResponseMarshaller[T] = _marshaller
    }

  implicit val marshaller: ToResponseMarshaller[ToResponseMarshallable] =
    Marshaller { implicit ec ⇒ marshallable ⇒ marshallable.marshaller(marshallable.value) }
}

magnet-pattern我们就不多谈了。但它的伴生对象中包含了对任何类型ToResponseMarshallable的隐式实例,所以complete能够通过编译。SprayJsonSupport中倒是提供了Unmarshaller[MessageEntity,T]的隐式实例: 

  // support for as[Source[T, NotUsed]]
  implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =
    Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒
      if (support.supported.matches(e.contentType)) {
        val frames = e.dataBytes.via(support.framingDecoder)
        val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)
        val unmarshallingFlow =
          if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)
          else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)
        val elements = frames.viaMat(unmarshallingFlow)(Keep.right)
        FastFuture.successful(elements)
      } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
    }

看来如果需要实现stream的双向交换,我们还必须提供Marshaller[Source[T,NotUsed],MessageEntity]以及Unmarshaller[MessageEntity,Source[T,NotUsed]]才行。篇幅所限,具体实现方法移到下篇讨论。

下面是本次讨论的示范源代码:

import akka.actor._
import akka.stream.scaladsl._
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.MediaTypes._
import akka.util.ByteString
import akka.http.scaladsl.marshallers.sprayjson._
import spray.json._


trait Formats extends SprayJsonSupport with DefaultJsonProtocol
object sprayConverters extends Formats {
  case class Person(id: Int, name: String)
  implicit val userFormat = jsonFormat2(Person.apply)
}


object MarshalDemo extends App {
  import sprayConverters._

  implicit val sys = ActorSystem("marshaller")
  implicit val ec = sys.dispatcher

  val aChars = Array[Char]('h','e','l','l','o')
  val aBytes = Array[Byte](0,1,2,3)
  val strHello = Marshal("Hello").to[MessageEntity]
  val chHello = Marshal(aChars).to[MessageEntity]
  val bt0123 = Marshal(aBytes).to[MessageEntity]

  implicit val PersonMarshaller: ToEntityMarshaller[Person] = personMarshaller(`text/plain`)
  def personMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[Person] =
    Marshaller.withOpenCharset(mediaType) { (p, ps) ⇒ HttpEntity(mediaType withCharset ps, ByteString(p.toString)) }
  def personMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[Person] =
    Marshaller.withFixedContentType(mediaType) { p ⇒ HttpEntity(mediaType, ByteString(p.toString)) }



  val john = Person(12,"John")
  val futP = Marshal(john).to[MessageEntity]

  val ps = (1 to 5).map {i => Person(i,s"member#i")}
  val source = Source(ps)

  import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }
  import akka.http.scaladsl.server.Directives._
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()


  val route =
    path("data") {
 //     val fut = Marshal(source).to[Source[Person,NotUsed]]  //compile failed: implicit not found
      val fut2 = Marshal(source).toResponseFor(HttpRequest(method = HttpMethods.GET)) // compile ok!
      complete(source)  //ok due to magnet-patern type ToResponseMarshallable
    }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏吴伟祥

Velocity语法大全 转

本文转载自:http://www.cnblogs.com/codingsilence/archive/2011/03/29/2146580.html

484
来自专栏开源项目

Vue 2.0 学习总结,精华全在这里了

摘要:年后公司项目开始上vue2.0,自己对学习进行了总结,希望对大家有帮助! 1Vue 介绍 Vue 是什么? https://vuefe.cn/guide ...

27711
来自专栏前端侠2.0

asp。net5的依赖注入 原

昨天读asp.net5的doc,看到了configure的配置时,提到在controller中访问配置就是通过依赖注入的。asp.net5的很多功能都通过依赖注...

601
来自专栏web前端-

rem和em小插曲

1.对em来说,它的大小是相对于父层font-size来改变,但是如果其自身有font-size属性的话,em会优先考虑自身的font-size;

742
来自专栏IT 指南者专栏

前端系列之JavaScript基础知识概述

微信公众号:compassblog 欢迎关注、转发,互相学习,共同进步! 有任何问题,请后台留言联系! 1、什么是JavaScript (1)、JavaScri...

2929
来自专栏极客猴

Django 学习笔记之模板

本文是自己 Django 学习笔记系列的第四篇原创文章。主要接着篇文章的视图内容,讲解模板的用法。另外也说下 Django 学习笔记系列的安排。自己计划大概 1...

230
来自专栏LanceToBigData

struts2(四)之输入校验

前言   这个本来是昨天就写好的,但是不知道为什么没有保存成功!但是今天起来再写一遍就当巩固一下知识吧。 一、输入校验概述   在以前我们写一个登录页面时,并没...

1968
来自专栏向治洪

android代码混淆

proguard 原理 Java代码编译成二进制class 文件,这个class 文件也可以反编译成源代码 ,除了注释外,原来的code 基本都可以看到。为了...

1798
来自专栏塔奇克马敲代码

QDockWidget嵌套布局详解-实现Visual Studio布局

3066
来自专栏林德熙的博客

WPF UncommonField 类型是什么

本文告诉大家一个黑科技,这个黑科技在.net 框架外无法使用,这就是 UncommonField 。下面将会告诉大家这个类有什么用。

561

扫码关注云+社区