Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages

    从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。这其中:Source和Sink是stream的两个独立端点,而Flow处于stream Source和Sink中间可能由多个通道式的节点组成,每个节点代表某些数据流元素转化处理功能,它们的链接顺序则可能代表整体作业的流程。一个完整的数据流(可运行数据流)必须是一个闭合的数据流,即:从外表上看,数据流两头必须连接一个Source和一个Sink。我们可以直接把一个Sink连接到一个Source来获取一个最简单的可运行数据流,如下:

  Source(1 to 10).runWith(Sink.foreach(println))

从另一个角度说明:akka-stream又包括数据流图Graph及运算器Materializer两个部分。Graph代表运算方案,Materializer负责准备环境并把运算方案Graph放置到Actor系统里去实际运算产生效果(effects)及获取运算结果。所以:akka-stream必须有一个Graph描述功能和流程。每个Graph又可以由一些代表更细小功能的子Graph组成。一个可运行数据流必须由一个闭合的数据流图(closed graph)来代表,而这个ClosedGraph又是由代表不同数据转化处理功能的子图(sub-graph)组成。定制数据流功能就是针对Graph按功能需要进行自定义。

一个Graph可以用GraphShape和GraphStage两个部分来描述:GraphShape描述了Graph的输入输出端口数量,GraphStage描述数据在流通中的转化处理过程。我们先来分析一下GraphShape,它们的基类是Shape:

/**
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
 * philosophy that a Graph is a freely reusable blueprint, everything that
 * matters from the outside are the connections that can be made with it,
 * otherwise it is just a black box.
 */
abstract class Shape {
  /**
   * Scala API: get a list of all input ports
   */
  def inlets: immutable.Seq[Inlet[_]]

  /**
   * Scala API: get a list of all output ports
   */
  def outlets: immutable.Seq[Outlet[_]]

  /**
   * Create a copy of this Shape object, returning the same type as the
   * original; this constraint can unfortunately not be expressed in the
   * type system.
   */
  def deepCopy(): Shape
...}

Shape的子类必须实现上面这三个抽象函数。akka-stream预先提供了一些基本的形状,包括SourceShape/FlowShape/SinkShape: 

/**
 * A Source [[Shape]] has exactly one output and no inputs, it models a source
 * of data.
 */
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy())
}
object SourceShape {
  /** Java API */
  def of[T](outlet: Outlet[T @uncheckedVariance]): SourceShape[T] =
    SourceShape(outlet)
}

/**
 * A Flow [[Shape]] has exactly one input and one output, it looks from the
 * outside like a pipe (but it can be a complex topology of streams within of
 * course).
 */
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = in :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy())
}
object FlowShape {
  /** Java API */
  def of[I, O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]): FlowShape[I, O] =
    FlowShape(inlet, outlet)
}

还有一个稍微复杂点的双向流形状BidiShape: 

//#bidi-shape
/**
 * A bidirectional flow of elements that consequently has two inputs and two
 * outputs, arranged like this:
 *
 * {{{
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 * }}}
 */
final case class BidiShape[-In1, +Out1, -In2, +Out2](
  in1:  Inlet[In1 @uncheckedVariance],
  out1: Outlet[Out1 @uncheckedVariance],
  in2:  Inlet[In2 @uncheckedVariance],
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {
  //#implementation-details-elided
  override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: Nil

  /**
   * Java API for creating from a pair of unidirectional flows.
   */
  def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out)

  override def deepCopy(): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy())

  //#implementation-details-elided
}
//#bidi-shape
object BidiShape {
  def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
    BidiShape(top.in, top.out, bottom.in, bottom.out)

  /** Java API */
  def of[In1, Out1, In2, Out2](
    in1:  Inlet[In1 @uncheckedVariance],
    out1: Outlet[Out1 @uncheckedVariance],
    in2:  Inlet[In2 @uncheckedVariance],
    out2: Outlet[Out2 @uncheckedVariance]): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1, out1, in2, out2)

}

还有一对多的UniformFanOutShape和多对一的UniformFanInShape。下面是我们自定义的一个多对多的Shape:

  case class TwoThreeShape[I, I2, O, O2, O3](
                                              in1: Inlet[I],
                                              in2: Inlet[I2],
                                              out1: Outlet[O],
                                              out2: Outlet[O2],
                                              out3: Outlet[O3]) extends Shape {

    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil

    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil

    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }

这是一个二进三出的形状。我们只需要实现inlets,outlets和deepCopy这三个函数。

GraphStage描述了数据流构件的行为,通过数据流元素在构件中进出流动方式和在流动过程中的转变来定义流构件的具体功能。下面是GraphStage的类型定义:

/**
 * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
 * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
 * logic that ties the ports together.
 */
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

每个构件都需要根据需求通过实现createLogic来设计GraphStageLogic功能。GraphStageLogic定义如下:

/**
 * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a
 * collection of the following parts:
 *  * A set of [[InHandler]] and [[OutHandler]] instances and their assignments to the [[Inlet]]s and [[Outlet]]s
 *    of the enclosing [[GraphStage]]
 *  * Possible mutable state, accessible from the [[InHandler]] and [[OutHandler]] callbacks, but not from anywhere
 *    else (as such access would not be thread-safe)
 *  * The lifecycle hooks [[preStart()]] and [[postStop()]]
 *  * Methods for performing stream processing actions, like pulling or pushing elements
 *
 * The stage logic is completed once all its input and output ports have been closed. This can be changed by
 * setting `setKeepGoing` to true.
 *
 * The `postStop` lifecycle hook on the logic itself is called once all ports are closed. This is the only tear down
 * callback that is guaranteed to happen, if the actor system or the materializer is terminated the handlers may never
 * see any callbacks to `onUpstreamFailure`, `onUpstreamFinish` or `onDownstreamFinish`. Therefore stage resource
 * cleanup should always be done in `postStop`.
 */
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {...}

GraphStageLogic主要负责通过InHandler和OutHandler响应输出输入端口的事件,对元素的转变和在端口上的流动方式进行控制:

/**
 * Collection of callbacks for an input port of a [[GraphStage]]
 */
trait InHandler {
  /**
   * Called when the input port has a new element available. The actual element can be retrieved via the
   * [[GraphStageLogic.grab()]] method.
   */
  @throws(classOf[Exception])
  def onPush(): Unit

  /**
   * Called when the input port is finished. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()

  /**
   * Called when the input port has failed. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}

/**
 * Collection of callbacks for an output port of a [[GraphStage]]
 */
trait OutHandler {
  /**
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
   * is now allowed to be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit

  /**
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
   * be called for this port.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit = {
    GraphInterpreter
      .currentInterpreter
      .activeStage
      .completeStage()
  }
}

可以看到:我们需要实现InHandler.onPush()和OutHandler.onPull。akka-stream在数据流的各环节都实现了Reactive-Stream-Specification,所以对于输入端口InHandler来讲需要响应上游推送信号onPush,输出端口OutHandler要响应下游的读取信号onPull。就构件自身来说需要:从输入端口pull(in),对输出端口push(out)。

下面我们就示范设计一个循环产生一串指定字符的Source。Source只有一个输出端口,我们只需要观察输出端口下游的读取信号。所以在这种情况下我们只需要重写函数OutHandler即可:

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport,new OutHandler {
        override def onPull(): Unit = {
          push(outport,chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}

GraphStage类是Graph子类:

abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {...}
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {...}

所以我们可以把AlphaSource当作Graph然后用Source.fromGraph来构建Source构件:

  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("A","B","C","D"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  alphaSource.runWith(Sink.foreach(println))

同样对于Sink:我们只需要观察上游推送信号然后读取数据:

class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {

      override def preStart(): Unit = pull(inport)

      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }

      setHandler(inport,this)

    }
}

从上面的AlphaSource,UppercaseSink我们略为尝试了一把数据流元素流动控制,主要是对输出输入端口状态变化采取一种被动的响应:通过push,pull来对端口进行操作。下面列出了一些常用的端口状态事件及操作方法:

输出端口状态变化事件是通过OutHandler中的回调函数(callback)来捕获的。用setHandler(out,outHandler)来注册OutHandler实例。下面是针对输出端口的操作函数:

1、push(out,elem):对端口推出数据,只容许在下游使用pull提出读取数据要求后才能进行,在此之前不容许多次调用

2、complete(out):正常手动关闭端口

3、fail(out,exeption):异常手动关闭端口

输出端口响应事件包括:

1、onPull():下游可以接收数据,此时可以用push(out,elem)来向输出端口发送数据

2、onDownStreamFinish():下游终止读取数据,此后不会再收到任何onPull事件

下面的函数可以获得输出端口的当前状态:

1、isAvailable(out):true代表可以使用push(out,elem)

2、isClosed(out):true代表输出端口已经关闭,无法聆听事件或者推送数据

同样,输入端口状态捕获是通过用setHandler(in,inHandler)登记的inHandler中callback实现的。输入端口操作函数包括:

1、pull(in):向上游提出读取数据要求,只容许在上游已经完成了数据推送后才能使用,在此之前不容许多次调用

2、grab(in):从端口读取当前数据,只有在上游完成了数据推送后才能使用,其中不容许多次调用

3、cancel(in):手动关闭输入端口

输入端口事件:

1、onPush():上游已经发送数据至输入端口,此时可以用grab(in)来读取当前数据,用pull(in)向上游要求下一个数据

2、onUpstreamFinish():上游已经终止数据发送,此后再不会捕获onPush事件,不得使用pull(in)向上游请求数据

3、onUpstreamFalure():上游异常终止

获取输入端口状态方法:

1、isAvailable(in):true代表现在可以使用grab(in)读取当前数据

2、hasBeenPulled(in):true代表已经使用pull(in)进行了数据读取要求,在此状态下不容许再次使用pull(in)

3、isClosed(in):true代表端口已经关闭,此后不可施用pull(in)及无法捕获onPush事件

从上面的pull(in)和push(out,elem)的功能描述可以得出它们是严格相互依赖、相互循环配合的,即:下游pull(in)前上游必须先push(out),而上游push(out,elem)前下游必须先pull(in)。这容易理解,因为akka-stream是Reactive-Stream,是push,pull结合模式上下游相互沟通的。但如此则很不方便某些应用场景,比如数据流动控制。akka-stream还提供了一套更简单的API使用户可以更灵活的对端口进行操作。这个API中的函数包括下面这些:

1、emit(out,elem):临时替换OutHandler,向端口发送elem,然后再恢复OutHandler

2、emitMultiple(out,Iterable(e1,e2,e3...)):临时替换OutHandler,向端口发送一串数据,然后再恢复OutHandler

3、read(in)(andThen):临时替换InHandler,从端口读取一个数据元素,然后再恢复InHandler

4、readN(in)(andThen):临时替换InHandler,从端口读取n个数据元素,然后再恢复InHandler

5、abortEmitting():取消输出端口上未完成的数据推送

6、abortReading():取消输入端口上未完成的读取操作

这个API实际上也支持reactive-stream-backpressure,我们从emitMultiple函数源代码中可以得出:

 /**
   * Emit a sequence of elements through the given outlet and continue with the given thunk
   * afterwards, suspending execution if necessary.
   * This action replaces the [[OutHandler]] for the given outlet if suspension
   * is needed and reinstalls the current handler upon receiving an `onPull()`
   * signal (before invoking the `andThen` function).
   */
  final protected def emitMultiple[T](out: Outlet[T], elems: Iterator[T], andThen: () ⇒ Unit): Unit =
    if (elems.hasNext) {
      if (isAvailable(out)) {
        push(out, elems.next())
        if (elems.hasNext)
          setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
        else andThen()
      } else {
        setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
      }
    } else andThen()

下面我们就定制一个Flow GraphStage,利用read/emit让用户自定义的函数可以控制数据流元素的流动和筛选。对于Flow,同时需要关注输入端口上游推送数据状态及输出端口上下游读取请求状态:

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move

class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          case Next(rows) => emitMultiple(outport,rows)
          case _ => pull(inport)
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport,outport,this)
    }
}

上面这个FlowValve类型是专门为施用一个用户自定义函数controller而设的。controller函数根据上游推送的数据元素内容来决定Stand越过当前数据元素或者Next(...)向下游发送一或多个元素。当下游可以接受数据发出pull请求时FlowValve会把它直接传递给上游。下面是用户自定义函数的一个例子:

 case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row

  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>

        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
    }
  }


  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
                    ,Order("plain",1),Order("beef",2))

  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()

试运算结果显示如下: 

Burger(cheeze 1 of 2)
Burger(cheeze 2 of 2)
Burger(beef 1 of 3)
Burger(beef 2 of 3)
Burger(beef 3 of 3)
Burger(pepper 1 of 1)
Burger(plain 1 of 1)
Burger(beef 1 of 2)
Burger(beef 2 of 2)

正是我们预料的结果。对于一对多扩散型和多对一合并型形状的数据流构件akka-stream提供了UniformFanIn和UniformFanOut两种GraphStage。把这两个结合起来使用可以构建多对多形状的构件,所以预设定的GraphStage已经够用。

下面是本次示范涉及的源代码:

import akka.NotUsed
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
import scala.concurrent.duration._
import scala.collection.immutable.Iterable

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport,new OutHandler {
        override def onPull(): Unit = {
          push(outport,chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}
class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {

      override def preStart(): Unit = pull(inport)

      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }

      setHandler(inport,this)

    }
}

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move

class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row,Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport,outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          case Next(rows) => emitMultiple(outport,rows)
          case _ => pull(inport)
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport,outport,this)
    }
}


object GraphStages extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val sourceGraph: Graph[SourceShape[String],NotUsed] = new AlphaSource(Seq("a","b","c","d"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second,DelayOverflowStrategy.backpressure)
  // alphaSource.runWith(Sink.foreach(println))

  val sinkGraph: Graph[SinkShape[String],NotUsed] = new UppercaseSink
  val upperSink = Sink.fromGraph(sinkGraph)
  alphaSource.runWith(upperSink)

  case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row

  def orderDeliver: Row => Move = order => {
    order match {
      case Order(name,qty) =>

        if (qty > 0) {
          val burgers: Iterable[Burger] =
            (1 to qty).foldLeft(Iterable[Burger]()) { (b, a) =>
              b ++ Iterable(Burger(s"$name $a of ${qty}"))
            }
          Next(burgers)
        } else Stand
    }
  }


  val flowGraph: Graph[FlowShape[Row,Row],NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row,Row,NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze",2),Order("beef",3),Order("pepper",1),Order("Rice",0)
                    ,Order("plain",1),Order("beef",2))

  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()


  // Source(1 to 10).runWith(Sink.foreach(println))

    scala.io.StdIn.readLine()
  sys.terminate()

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏跟着阿笨一起玩NET

LINQ to XML 从逗号分隔值 (CSV) 文件生成 XML 文件

参考:http://msdn.microsoft.com/zh-cn/library/bb387090.aspx

581
来自专栏.net core新时代

List,DataTable实现行转列的通用方案

  最近在做报表统计方面的需求,涉及到行转列报表。根据以往经验使用SQL可以比较容易完成,这次决定挑战一下直接通过代码方式完成行转列。期间遇到几个问题和用到的新...

1967
来自专栏菩提树下的杨过

[你必须知道的.Net]读书笔记--override与new在继承中的区别

前言: 这本书拿到手已经好长时间了,但由于种种原因一直没读完,也许是我太懒了,应该好好反省自我检讨一下。 所谓“书读百遍,其义自见”,虽然糊里糊涂做web开发六...

2059
来自专栏Golang语言社区

Golang下通过syscall调用win32的api

源于golang群中再次提到windows下获取磁盘空间的方法 由于golang的api并非完全跨平台, golang本身并没有直接提供windows下的方式 ...

3155
来自专栏吴生的专栏

spring boot 源码解析-SpringApplication初始化

就是这么简单的代码,构成了spring boot的世界. 那么代码中只有⼀个@SpringBootApplication 注解 和 调⽤了SpringAppli...

3525
来自专栏跟着阿笨一起玩NET

LINQ 从 CSV 文件生成 XML

本文参考:http://msdn.microsoft.com/zh-cn/library/bb387090.aspx

721
来自专栏尚国

CVE-2018-8174 EXP 0day python

usage: CVE-2018-8174.py [-h] -u URL -o OUTPUT [-i IP] [-p PORT]

2293
来自专栏葡萄城控件技术团队

Table-values parameter(TVP)系列之三: 利用Collection将其作为参数传给SP

一,回顾 上一部分讲述了“在ADO.NET中利用DataTable对象,将其作为参数传给存贮过程”。 通过DataTable实例,完成了两部分的内容: ...

1749
来自专栏数据结构与算法

树上倍增求LCA及例题

先瞎扯几句 树上倍增的经典应用是求两个节点的LCA 当然它的作用不仅限于求LCA,还可以维护节点的很多信息 求LCA的方法除了倍增之外,还有树链剖分、离线tar...

3196
来自专栏SeanCheney的专栏

《Pandas Cookbook》第08章 数据清理1. 用stack清理变量值作为列名2. 用melt清理变量值作为列名3. 同时stack多组变量4. 反转stacked数据5. 分组聚合后uns

第01章 Pandas基础 第02章 DataFrame运算 第03章 数据分析入门 第04章 选取数据子集 第05章 布尔索引 第06章 索引对齐 ...

712

扫码关注云+社区