Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages

    从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。这其中:Source和Sink是stream的两个独立端点,而Flow处于stream Source和Sink中间可能由多个通道式的节点组成,每个节点代表某些数据流元素转化处理功能,它们的链接顺序则可能代表整体作业的流程。一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:从外表上看,数据流两头必须连接一个Source和一个Sink。我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据流,如下:

  Source(1 to 10).runWith(Sink.foreach(println))

从另一个角度说明:akka-stream又包括数据流图Graph及运算器Materializer两个部分。Graph代表运算方案,Materializer负责准备环境并把运算方案Graph放置到Actor系统里去实际运算产生效果(effects)及获取运算结果。所以:akka-stream必须有一个Graph描述功能和流程。每个Graph又可以由一些代表更细小功能的子Graph组成。一个可运行数据流必须由一个闭合的数据流图(closed graph)来代表,而这个ClosedGraph又是由代表不同数据转化处理功能的子图(sub-graph)组成。定制数据流功能就是针对Graph按功能需要进行自定义。

一个Graph可以用GraphShape和GraphStage两个部分来描述:GraphShape描述了Graph的输入输出端口数量,GraphStage描述数据在流通中的转化处理过程。我们先来分析一下GraphShape,它们的基类是Shape:

/**
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
 * philosophy that a Graph is a freely reusable blueprint, everything that
 * matters from the outside are the connections that can be made with it,
 * otherwise it is just a black box.
 */
abstract class Shape {
  /**
   * Scala API: get a list of all input ports
   */
  def inlets: immutable.Seq[Inlet[_]]

  /**
   * Scala API: get a list of all output ports
   */
  def outlets: immutable.Seq[Outlet[_]]

  /**
   * Create a copy of this Shape object, returning the same type as the
   * original; this constraint can unfortunately not be expressed in the
   * type system.
   */
  def deepCopy(): Shape
...}

Shape的子类必须实现上面这三个抽象函数。akka-stream预先提供了一些基本的形状,包括SourceShape/FlowShape/SinkShape: 

/**
 * A Source [[Shape]] has exactly one output and no inputs, it models a source
 * of data.
 */
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy())
}
object SourceShape {
  /** Java API */
  def of[T](outlet: Outlet[T @uncheckedVariance]): SourceShape[T] =
    SourceShape(outlet)
}

/**
 * A Flow [[Shape]] has exactly one input and one output, it looks from the
 * outside like a pipe (but it can be a complex topology of streams within of
 * course).
 */
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = in :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy())
}
object FlowShape {
  /** Java API */
  def of[I, O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]): FlowShape[I, O] =
    FlowShape(inlet, outlet)
}

还有一个稍微复杂点的双向流形状BidiShape: 

//#bidi-shape
/**
 * A bidirectional flow of elements that consequently has two inputs and two
 * outputs, arranged like this:
 *
 * {{{
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 * }}}
 */
final case class BidiShape[-In1, +Out1, -In2, +Out2](
  in1:  Inlet[In1 @uncheckedVariance],
  out1: Outlet[Out1 @uncheckedVariance],
  in2:  Inlet[In2 @uncheckedVariance],
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {
  //#implementation-details-elided
  override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: Nil

  /**
   * Java API for creating from a pair of unidirectional flows.
   */
  def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out)

  override def deepCopy(): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy())

  //#implementation-details-elided
}
//#bidi-shape
object BidiShape {
  def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
    BidiShape(top.in, top.out, bottom.in, bottom.out)

  /** Java API */
  def of[In1, Out1, In2, Out2](
    in1:  Inlet[In1 @uncheckedVariance],
    out1: Outlet[Out1 @uncheckedVariance],
    in2:  Inlet[In2 @uncheckedVariance],
    out2: Outlet[Out2 @uncheckedVariance]): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1, out1, in2, out2)

}

还有一对多的UniformFanOutShape和多对一的UniformFanInShape。下面是我们自定义的一个多对多的Shape:

  case class TwoThreeShape[I, I2, O, O2, O3](
                                              in1: Inlet[I],
                                              in2: Inlet[I2],
                                              out1: Outlet[O],
                                              out2: Outlet[O2],
                                              out3: Outlet[O3]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil

    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }

这是一个二进三出的形状。我们只需要实现inlets,outlets和deepCopy这三个函数。

GraphStage描述了数据流构件的行为,通过数据流元素在构件中进出流动方式和在流动过程中的转变来定义流构件的具体功能。下面是GraphStage的类型定义:

/**
 * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
 * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
 * logic that ties the ports together.
 */
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

每个构件都需要根据需求通过实现createLogic来设计GraphStageLogic功能。GraphStageLogic定义如下:

/**
 * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a
 * collection of the following parts:
 *  * A set of [[InHandler]] and [[OutHandler]] instances and their assignments to the [[Inlet]]s and [[Outlet]]s
 *    of the enclosing [[GraphStage]]
 *  * Possible mutable state, accessible from the [[InHandler]] and [[OutHandler]] callbacks, but not from anywhere
 *    else (as such access would not be thread-safe)
 *  * The lifecycle hooks [[preStart()]] and [[postStop()]]
 *  * Methods for performing stream processing actions, like pulling or pushing elements
 *
 * The stage logic is completed once all its input and output ports have been closed. This can be changed by
 * setting `setKeepGoing` to true.
 *
 * The `postStop` lifecycle hook on the logic itself is called once all ports are closed. This is the only tear down
 * callback that is guaranteed to happen, if the actor system or the materializer is terminated the handlers may never
 * see any callbacks to `onUpstreamFailure`, `onUpstreamFinish` or `onDownstreamFinish`. Therefore stage resource
 * cleanup should always be done in `postStop`.
 */
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {...}

GraphStageLogic主要负责通过InHandler和OutHandler响应输出输入端口的事件,对元素的转变和在端口上的流动方式进行控制:

/**
 * Collection of callbacks for an input port of a [[GraphStage]]
 */
trait InHandler {
  /**
   * Called when the input port has a new element available. The actual element can be retrieved via the
   * [[GraphStageLogic.grab()]] method.
   */
  @throws(classOf[Exception])
  def onPush(): Unit

  /**
   * Called when the input port is finished. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()

  /**
   * Called when the input port has failed. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}

/**
 * Collection of callbacks for an output port of a [[GraphStage]]
 */
trait OutHandler {
  /**
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
   * is now allowed to be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit

  /**
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
   * be called for this port.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit = {
    GraphInterpreter
      .currentInterpreter
      .activeStage
      .completeStage()
  }
}

可以看到:我们需要实现InHandler.onPush()和OutHandler.onPull。akka-stream在数据流的各环节都实现了Reactive-Stream-Specification,所以对于输入端口InHandler来讲需要响应上游推送信号onPush,输出端口OutHandler要响应下游的读取信号onPull。就构件自身来说需要:从输入端口pull(in),对输出端口push(out)。

下面我们就示范设计一个循环产生一串指定字符的Source。Source只有一个输出端口,我们只需要观察输出端口下游的读取信号。所以在这种情况下我们只需要重写函数OutHandler即可:

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport,new OutHandler {
        override def onPull(): Unit = {
          push(outport,chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}

GraphStage类是Graph子类:

abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {...}
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {...}

所以我们可以把AlphaSource当作Graph然后用Source.fromGraph来构建Source构件:

  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("A","B","C","D"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  alphaSource.runWith(Sink.foreach(println))

同样对于Sink:我们只需要观察上游推送信号然后读取数据:

class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {

      override def preStart(): Unit = pull(inport)

      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }

      setHandler(inport,this)

    }
}

从上面的AlphaSource,UppercaseSink我们略为尝试了一把数据流元素流动控制,主要是对输出输入端口状态变化采取一种被动的响应:通过push,pull来对端口进行操作。下面列出了一些常用的端口状态事件及操作方法:

输出端口状态变化事件是通过OutHandler中的回调函数(callback)来捕获的。用setHandler(out,outHandler)来注册OutHandler实例。下面是针对输出端口的操作函数:

1、push(out,elem):对端口推出数据,只容许在下游使用pull提出读取数据要求后才能进行,在此之前不容许多次调用

2、complete(out):正常手动关闭端口

3、fail(out,exeption):异常手动关闭端口

输出端口响应事件包括:

1、onPull():下游可以接收数据,此时可以用push(out,elem)来向输出端口发送数据

2、onDownStreamFinish():下游终止读取数据,此后不会再收到任何onPull事件

下面的函数可以获得输出端口的当前状态:

1、isAvailable(out):true代表可以使用push(out,elem)

2、isClosed(out):true代表输出端口已经关闭,无法聆听事件或者推送数据

同样,输入端口状态捕获是通过用setHandler(in,inHandler)登记的inHandler中callback实现的。输入端口操作函数包括:

1、pull(in):向上游提出读取数据要求,只容许在上游已经完成了数据推送后才能使用,在此之前不容许多次调用

2、grab(in):从端口读取当前数据,只有在上游完成了数据推送后才能使用,其中不容许多次调用

3、cancel(in):手动关闭输入端口

输入端口事件:

1、onPush():上游已经发送数据至输入端口,此时可以用grab(in)来读取当前数据,用pull(in)向上游要求下一个数据

2、onUpstreamFinish():上游已经终止数据发送,此后再不会捕获onPush事件,不得使用pull(in)向上游请求数据

3、onUpstreamFalure():上游异常终止

获取输入端口状态方法:

1、isAvailable(in):true代表现在可以使用grab(in)读取当前数据

2、hasBeenPulled(in):true代表已经使用pull(in)进行了数据读取要求,在此状态下不容许再次使用pull(in)

3、isClosed(in):true代表端口已经关闭,此后不可施用pull(in)及无法捕获onPush事件

从上面的pull(in)和push(out,elem)的功能描述可以得出它们是严格相互依赖、相互循环配合的,即:下游pull(in)前上游必须先push(out),而上游push(out,elem)前下游必须先pull(in)。这容易理解,因为akka-stream是Reactive-Stream,是push,pull结合模式上下游相互沟通的。但如此则很不方便某些应用场景,比如数据流动控制。akka-stream还提供了一套更简单的API使用户可以更灵活的对端口进行操作。这个API中的函数包括下面这些:

1、emit(out,elem):临时替换OutHandler,向端口发送elem,然后再恢复OutHandler

2、emitMultiple(out,Iterable(e1,e2,e3...)):临时替换OutHandler,向端口发送一串数据,然后再恢复OutHandler

3、read(in)(andThen):临时替换InHandler,从端口读取一个数据元素,然后再恢复InHandler

4、readN(in)(andThen):临时替换InHandler,从端口读取n个数据元素,然后再恢复InHandler

5、abortEmitting():取消输出端口上未完成的数据推送

6、abortReading():取消输入端口上未完成的读取操作

这个API实际上也支持reactive-stream-backpressure,我们从emitMultiple函数源代码中可以得出:

 /**
   * Emit a sequence of elements through the given outlet and continue with the given thunk
   * afterwards, suspending execution if necessary.
   * This action replaces the [[OutHandler]] for the given outlet if suspension
   * is needed and reinstalls the current handler upon receiving an `onPull()`
   * signal (before invoking the `andThen` function).
   */
  final protected def emitMultiple[T](out: Outlet[T], elems: Iterator[T], andThen: () ⇒ Unit): Unit =
    if (elems.hasNext) {
      if (isAvailable(out)) {
        push(out, elems.next())
        if (elems.hasNext)
          setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
        else andThen()
      } else {
        setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
      }
    } else andThen()

下面我们就定制一个Flow GraphStage,利用read/emit让用户自定义的函数可以控制数据流元素的流动和筛选。对于Flow,同时需要关注输入端口上游推送数据状态及输出端口上下游读取请求状态:

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move

class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          case Next(rows) => emitMultiple(outport,rows)
          case _ => pull(inport)
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport,outport,this)
    }
}

上面这个FlowValve类型是专门为施用一个用户自定义函数controller而设的。controller函数根据上游推送的数据元素内容来决定Stand越过当前数据元素或者Next(...)向下游发送一或多个元素。当下游可以接受数据发出pull请求时FlowValve会把它直接传递给上游。下面是用户自定义函数的一个例子:

 case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row

  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>

        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
    }
  }


  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
                    ,Order("plain",1),Order("beef",2))

  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()

试运算结果显示如下: 

Burger(cheeze 1 of 2)
Burger(cheeze 2 of 2)
Burger(beef 1 of 3)
Burger(beef 2 of 3)
Burger(beef 3 of 3)
Burger(pepper 1 of 1)
Burger(plain 1 of 1)
Burger(beef 1 of 2)
Burger(beef 2 of 2)

正是我们预料的结果。对于一对多扩散型和多对一合并型形状的数据流构件akka-stream提供了UniformFanIn和UniformFanOut两种GraphStage。把这两个结合起来使用可以构建多对多形状的构件,所以预设定的GraphStage已经够用。

下面是本次示范涉及的源代码:

import akka.NotUsed
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
import scala.concurrent.duration._
import scala.collection.immutable.Iterable

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport,new OutHandler {
        override def onPull(): Unit = {
          push(outport,chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}
class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {

      override def preStart(): Unit = pull(inport)

      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }

      setHandler(inport,this)

    }
}

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move

class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          case Next(rows) => emitMultiple(outport,rows)
          case _ => pull(inport)
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport,outport,this)
    }
}


object GraphStages extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("a","b","c","d"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  // alphaSource.runWith(Sink.foreach(println))

  val sinkGraph: Graph[SinkShape[String],NotUsed] = new UppercaseSink
  val upperSink = Sink.fromGraph(sinkGraph)
  alphaSource.runWith(upperSink)

  case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row

  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>

        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
    }
  }


  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
                    ,Order("plain",1),Order("beef",2))

  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()


  // Source(1 to 10).runWith(Sink.foreach(println))

    scala.io.StdIn.readLine()
  sys.terminate()

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏偏前端工程师的驿站

Design Pattern: Not Just Mixin Pattern

Brief                                 从Mix-In模式到Mixin模式,中文常用翻译为“混入/织入模式”。单纯从名字上看...

1836
来自专栏ascii0x03的安全笔记

【C】用C语言提取bmp图片像素,并进行K-means聚类分析——容易遇到的问题

关于bmp图片的格式,网上有很多文章,具体可以参考百度百科,也有例子程序。这里只提要注意的问题。 (1)结构体定义问题:首先按照百度百科介绍的定义了结构体,但是...

4006
来自专栏函数式编程语言及工具

Akka(21): Stream:实时操控:人为中断-KillSwitch

 akka-stream是多线程non-blocking模式的,一般来说,运算任务提交到另外线程后这个线程就会在当前程序控制之外自由运行了。任何时候如果需要终...

2226
来自专栏Felix的技术分享

Poj第1083题--Moving Tables

1866
来自专栏QQ音乐技术团队的专栏

​关于 M4A 文件的随机访问

文章介绍了 M4A 文件的大概结构,详细解读了其中的 Sample Table Box,并结合图例,详细讲解了如何使用它来完成 M4A 文件的随机访问。

2050
来自专栏bboysoul

1499: C语言实验题――鸡兔同笼

描述:“鸡兔同笼”是我国古代著名趣题之一。大约在1500年前,《孙子算经》中就记载了这个有趣的问题。书中是这样叙述的:“今有雉兔同笼,上有三十五头,下有九十四足...

482
来自专栏QQ音乐技术团队的专栏

​关于M4A文件的随机访问

文章介绍了M4A文件的大概结构,详细解读了其中的Sample Table Box,并结合图例,详细讲解了如何使用它来完成M4A文件的随机访问。 本文属原创作品,...

2408
来自专栏高性能服务器开发

(三)dict哈希结构2

dict.c; /* Hash Tables Implementation. * * This file implements in memory ha...

2619
来自专栏美团技术团队

函数式编程在Redux/React中的应用

本文简述了软件复杂度问题及应对策略:抽象和组合;展示了抽象和组合在函数式编程中的应用;并展示了Redux/React在解决前端状态管理的复杂度方面对上述理论的实...

3399
来自专栏更流畅、简洁的软件开发方式

好大一棵树,新春的祝福(一):n级分类的数据结构

     这个树的结构几年前在csdn里面也发过了一次,现在看看,主体结构居然没有什么变化,用了这么长的时间,自我感觉还是很好用的。而且在这个基础之上把其他的功...

2135

扫码关注云+社区