以太坊RPC机制

RPC简介

RPC(remote process call),即远程过程调用,意思就是两台物理位置不同的服务器,其中一台服务器的应用想调用另一台服务器上某个应用的函数或者方法,由于不在同一个内存空间不能直接调用,因此需要通过网络来表达语义以及传入的参数,RPC是跨操作系统,跨编程语言的网络通信方式。

RPC启动

我们可以通过执行以下命令来启动RPC:

geth --networkid 666 --datadir /home/ubuntu/Private_eth/eth1 --identity "node1" --rpc --rpcport "8545" --rpcaddr "192.168.174.212" --nodiscover --rpcapi "eth,net,web3,txpool,debug,miner" 

之后我们可以通过以下脚本进行RPC测试:

#!/usr/bin/env python3
import requests
URL = "http://192.168.174.212:8545/"
data = {
   "jsonrpc": "2.0", 
   "method": "eth_getBalance", 
   "params":["0x578efd53cf8342f4f5acfb6ee0ce9c7b3cfe2252", "latest"],
   "id":0 
}
response = requests.post(url=URL,json=data)
print(response.json())

源码分析

以太坊有四种RPC:HTTP RPC、Inproc RPC、IPC RPC、WS RPC,它们主要的实现逻辑都在rpc/server.go和rpc/client.go,各自根据自己的实现方式派生自己的client实例,建立各自的net.conn通道,由于HTTP RPC是基于短链接请求,实现方式和其他的不太一样,这里仅对RPC服务的启动以及HTTP RPC请求、HTTP RPC和非HTTP请求类的请求和响应做一个简单的介绍分析~

服务启动

RPC服务的启动与否是我们在通过geth来启动链节点时有参数--rpc来决定的,在geth函数中会调用startNode来启动一个node:

// filedir:go-ethereum-1.10.2\cmd\geth\main.go  L308
// geth is the main entry point into the system if no special subcommand is ran.
// It creates a default node based on the command line arguments and runs it in
// blocking mode, waiting for it to be shut down.
func geth(ctx *cli.Context) error {
  if args := ctx.Args(); len(args) > 0 {
    return fmt.Errorf("invalid command: %q", args[0])
  }

  prepare(ctx)
  stack, backend := makeFullNode(ctx)
  defer stack.Close()

  startNode(ctx, stack, backend)
  stack.Wait()
  return nil
}

startNode进而转去调用utils的StartNode函数,此处的utils为github.com/ethereum/go-ethereum/cmd/utils

// filedir:go-ethereum-1.10.2\cmd\geth\main.go  L325
// startNode boots up the system node and all registered protocols, after which
// it unlocks any requested accounts, and starts the RPC/IPC interfaces and the
// miner.
func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend) {
  debug.Memsize.Add("node", stack)

  // Start up the node itself
  utils.StartNode(ctx, stack)
......

之后再go-ethereum-1.10.2\cmd\utils\cmd.go的startNode函数中转而调用Node的start函数来启动服务,之后开启监听:

func StartNode(ctx *cli.Context, stack *node.Node) {
  if err := stack.Start(); err != nil {
    Fatalf("Error starting protocol stack: %v", err)
  }
  go func() {
    sigc := make(chan os.Signal, 1)
    signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(sigc)

    minFreeDiskSpace := ethconfig.Defaults.TrieDirtyCache
    if ctx.GlobalIsSet(MinFreeDiskSpaceFlag.Name) {
      minFreeDiskSpace = ctx.GlobalInt(MinFreeDiskSpaceFlag.Name)
    } else if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) {
      minFreeDiskSpace = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100
    }
    if minFreeDiskSpace > 0 {
      go monitorFreeDiskSpace(sigc, stack.InstanceDir(), uint64(minFreeDiskSpace)*1024*1024)
    }

    <-sigc
    log.Info("Got interrupt, shutting down...")
    go stack.Close()
    for i := 10; i > 0; i-- {
      <-sigc
      if i > 1 {
        log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
      }
    }
    debug.Exit() // ensure trace and CPU profile data is flushed.
    debug.LoudPanic("boom")
  }()
}

Node.start函数进而去调用openEndpoints去开启RPC端点:

// filedir: go-ethereum-1.10.2\node\node.go
// Start starts all registered lifecycles, RPC services and p2p networking.
// Node can only be started once.
func (n *Node) Start() error {
  n.startStopLock.Lock()
  defer n.startStopLock.Unlock()

  n.lock.Lock()
  switch n.state {
  case runningState:
    n.lock.Unlock()
    return ErrNodeRunning
  case closedState:
    n.lock.Unlock()
    return ErrNodeStopped
  }
  n.state = runningState
  // open networking and RPC endpoints
  err := n.openEndpoints()
  lifecycles := make([]Lifecycle, len(n.lifecycles))
  copy(lifecycles, n.lifecycles)
  n.lock.Unlock()

  // Check if endpoint startup failed.
  if err != nil {
    n.doClose(nil)
    return err
  }
  // Start all registered lifecycles.
  var started []Lifecycle
  for _, lifecycle := range lifecycles {
    if err = lifecycle.Start(); err != nil {
      break
    }
    started = append(started, lifecycle)
  }
  // Check if any lifecycle failed to start.
  if err != nil {
    n.stopServices(started)
    n.doClose(nil)
  }
  return err
}

之后的openEndpoints调用startRPC来启动RPC服务:

// filedir:go-ethereum-1.10.2\node\node.go   L260
// openEndpoints starts all network and RPC endpoints.
func (n *Node) openEndpoints() error {
  // start networking endpoints
  n.log.Info("Starting peer-to-peer node", "instance", n.server.Name)
  if err := n.server.Start(); err != nil {
    return convertFileLockError(err)
  }
  // start RPC endpoints
  err := n.startRPC()
  if err != nil {
    n.stopRPC()
    n.server.Stop()
  }
  return err
}

startRPC具体实现如下:

// filedir: go-ethereum-1.10.2\node\node.go
// configureRPC is a helper method to configure all the various RPC endpoints during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
func (n *Node) startRPC() error {
  if err := n.startInProc(); err != nil {
    return err
  }

  // Configure IPC.
  if n.ipc.endpoint != "" {
    if err := n.ipc.start(n.rpcAPIs); err != nil {
      return err
    }
  }

  // Configure HTTP.
  if n.config.HTTPHost != "" {
    config := httpConfig{
      CorsAllowedOrigins: n.config.HTTPCors,
      Vhosts:             n.config.HTTPVirtualHosts,
      Modules:            n.config.HTTPModules,
      prefix:             n.config.HTTPPathPrefix,
    }
    if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil {
      return err
    }
    if err := n.http.enableRPC(n.rpcAPIs, config); err != nil {
      return err
    }
  }

  // Configure WebSocket.
  if n.config.WSHost != "" {
    server := n.wsServerForPort(n.config.WSPort)
    config := wsConfig{
      Modules: n.config.WSModules,
      Origins: n.config.WSOrigins,
      prefix:  n.config.WSPathPrefix,
    }
    if err := server.setListenAddr(n.config.WSHost, n.config.WSPort); err != nil {
      return err
    }
    if err := server.enableWS(n.rpcAPIs, config); err != nil {
      return err
    }
  }

  if err := n.http.start(); err != nil {
    return err
  }
  return n.ws.start()
}

在这里会调用startInProc来注册所有的RPC API接口信息:

// startInProc registers all RPC APIs on the inproc server.
func (n *Node) startInProc() error {
  for _, api := range n.rpcAPIs {
    if err := n.inprocHandler.RegisterName(api.Namespace, api.Service); err != nil {
      return err
    }
  }
  return nil
}

调用n.ipc.start(n.rpcAPIs)启动IPC

// filedir: go-ethereum-1.10.2\node\rpcstack.go
// Start starts the httpServer's http.Server
func (is *ipcServer) start(apis []rpc.API) error {
  is.mu.Lock()
  defer is.mu.Unlock()

  if is.listener != nil {
    return nil // already running
  }
  listener, srv, err := rpc.StartIPCEndpoint(is.endpoint, apis)
  if err != nil {
    is.log.Warn("IPC opening failed", "url", is.endpoint, "error", err)
    return err
  }
  is.log.Info("IPC endpoint opened", "url", is.endpoint)
  is.listener, is.srv = listener, srv
  return nil
}

调用n.http.enableRPC启动RPC服务并注册Handler:

// filedir:go-ethereum-1.10.2\node\rpcstack.go    L272
// enableRPC turns on JSON-RPC over HTTP on the server.
func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {
  h.mu.Lock()
  defer h.mu.Unlock()

  if h.rpcAllowed() {
    return fmt.Errorf("JSON-RPC over HTTP is already enabled")
  }

  // Create RPC server and handler.
  srv := rpc.NewServer()
  if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
    return err
  }
  h.httpConfig = config
  h.httpHandler.Store(&rpcHandler{
    Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts),
    server:  srv,
  })
  return nil
}

Handler注册跟踪如下(下面的ServeHTTP其实已经到了处理请求的逻辑了,这里不再深入,后面再做探究):

// filedir: go-ethereum-1.10.2\node\rpcstack.go
// NewHTTPHandlerStack returns wrapped http-related handlers
func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string) http.Handler {
  // Wrap the CORS-handler within a host-handler
  handler := newCorsHandler(srv, cors)
  handler = newVHostHandler(vhosts, handler)
  return newGzipHandler(handler)
}

// filedir:go-ethereum-1.10.2\node\rpcstack.go
func newGzipHandler(next http.Handler) http.Handler {
  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
      next.ServeHTTP(w, r)
      return
    }

    w.Header().Set("Content-Encoding", "gzip")

    gz := gzPool.Get().(*gzip.Writer)
    defer gzPool.Put(gz)

    gz.Reset(w)
    defer gz.Close()

    next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
  })
}
// filedir: go-ethereum-1.10.2\rpc\http.go
// ServeHTTP serves JSON-RPC requests over HTTP.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  // Permit dumb empty requests for remote health-checks (AWS)
  if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
    w.WriteHeader(http.StatusOK)
    return
  }
  if code, err := validateRequest(r); err != nil {
    http.Error(w, err.Error(), code)
    return
  }
  // All checks passed, create a codec that reads directly from the request body
  // until EOF, writes the response to w, and orders the server to process a
  // single request.
  ctx := r.Context()
  ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
  ctx = context.WithValue(ctx, "scheme", r.Proto)
  ctx = context.WithValue(ctx, "local", r.Host)
  if ua := r.Header.Get("User-Agent"); ua != "" {
    ctx = context.WithValue(ctx, "User-Agent", ua)
  }
  if origin := r.Header.Get("Origin"); origin != "" {
    ctx = context.WithValue(ctx, "Origin", origin)
  }

  w.Header().Set("content-type", contentType)
  codec := newHTTPServerConn(r, w)
  defer codec.close()
  s.serveSingleRequest(ctx, codec)
}

调用server.enableWS启动Websocket:

// filedir:go-ethereum-1.10.2\node\rpcstack.go
// enableWS turns on JSON-RPC over WebSocket on the server.
func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
  h.mu.Lock()
  defer h.mu.Unlock()

  if h.wsAllowed() {
    return fmt.Errorf("JSON-RPC over WebSocket is already enabled")
  }

  // Create RPC server and handler.
  srv := rpc.NewServer()
  if err := RegisterApisFromWhitelist(apis, config.Modules, srv, false); err != nil {
    return err
  }
  h.wsConfig = config
  h.wsHandler.Store(&rpcHandler{
    Handler: srv.WebsocketHandler(config.Origins),
    server:  srv,
  })
  return nil
}

Handler注册:

// filedir:  go-ethereum-1.10.2\rpc\websocket.go L45
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
//
// allowedOrigins should be a comma-separated list of allowed origin URLs.
// To allow connections with any origin, pass "*".
func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
  var upgrader = websocket.Upgrader{
    ReadBufferSize:  wsReadBuffer,
    WriteBufferSize: wsWriteBuffer,
    WriteBufferPool: wsBufferPool,
    CheckOrigin:     wsHandshakeValidator(allowedOrigins),
  }
  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
      log.Debug("WebSocket upgrade failed", "err", err)
      return
    }
    codec := newWebsocketCodec(conn)
    s.ServeCodec(codec, 0)
  })
}

// filedir: go-ethereum-1.10.2\rpc\websocket.go L241
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
  conn.SetReadLimit(wsMessageSizeLimit)
  wc := &websocketCodec{
    jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec),
    conn:      conn,
    pingReset: make(chan struct{}, 1),
  }
  wc.wg.Add(1)
  go wc.pingLoop()
  return wc
}
// filedir: go-ethereum-1.10.2\rpc\server.go
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
//
// Note that codec options are no longer supported.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
  defer codec.close()

  // Don't serve if server is stopped.
  if atomic.LoadInt32(&s.run) == 0 {
    return
  }

  // Add the codec to the set so it can be closed by Stop.
  s.codecs.Add(codec)
  defer s.codecs.Remove(codec)

  c := initClient(codec, s.idgen, &s.services)
  <-codec.closed()
  c.Close()
}

最后调用start启动HTTP Server

// start starts the HTTP server if it is enabled and not already running.
func (h *httpServer) start() error {
  h.mu.Lock()
  defer h.mu.Unlock()

  if h.endpoint == "" || h.listener != nil {
    return nil // already running or not configured
  }

  // Initialize the server.
  h.server = &http.Server{Handler: h}
  if h.timeouts != (rpc.HTTPTimeouts{}) {
    CheckTimeouts(&h.timeouts)
    h.server.ReadTimeout = h.timeouts.ReadTimeout
    h.server.WriteTimeout = h.timeouts.WriteTimeout
    h.server.IdleTimeout = h.timeouts.IdleTimeout
  }

  // Start the server.
  listener, err := net.Listen("tcp", h.endpoint)
  if err != nil {
    // If the server fails to start, we need to clear out the RPC and WS
    // configuration so they can be configured another time.
    h.disableRPC()
    h.disableWS()
    return err
  }
  h.listener = listener
  go h.server.Serve(listener)

  if h.wsAllowed() {
    url := fmt.Sprintf("ws://%v", listener.Addr())
    if h.wsConfig.prefix != "" {
      url += h.wsConfig.prefix
    }
    h.log.Info("WebSocket enabled", "url", url)
  }
  // if server is websocket only, return after logging
  if !h.rpcAllowed() {
    return nil
  }
  // Log http endpoint.
  h.log.Info("HTTP server started",
    "endpoint", listener.Addr(),
    "prefix", h.httpConfig.prefix,
    "cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","),
    "vhosts", strings.Join(h.httpConfig.Vhosts, ","),
  )

  // Log all handlers mounted on server.
  var paths []string
  for path := range h.handlerNames {
    paths = append(paths, path)
  }
  sort.Strings(paths)
  logged := make(map[string]bool, len(paths))
  for _, path := range paths {
    name := h.handlerNames[path]
    if !logged[name] {
      log.Info(name+" enabled", "url", "http://"+listener.Addr().String()+path)
      logged[name] = true
    }
  }
  return nil
}

至此,启动完成~

请求发起

这里我们以RPC请求接口eth_getBalance为例进行分析,首先我们在全局搜索"eth_getBalance"关键字,确定其引用位置——ethclient.go L354

eth_getBalance的具体接口为——BalanceAt

// filedir:go-ethereum-1.10.2\ethclient\ethclient.go  L351
// BalanceAt returns the wei balance of the given account.
// The block number can be nil, in which case the balance is taken from the latest known block.
func (ec *Client) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
  var result hexutil.Big
  err := ec.c.CallContext(ctx, &result, "eth_getBalance", account, toBlockNumArg(blockNumber))
  return (*big.Int)(&result), err
}

之后跟进这里的关键操作函数ec.c.CallContext逻辑代码如下:

// filedir: go-ethereum-1.10.2\rpc\client.go  L286
// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
  if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
    return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
  }
  msg, err := c.newMessage(method, args...)
  if err != nil {
    return err
  }
  op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}

  if c.isHTTP {
    err = c.sendHTTP(ctx, op, msg)
  } else {
    err = c.send(ctx, op, msg)
  }
  if err != nil {
    return err
  }

  // dispatch has accepted the request and will close the channel when it quits.
  switch resp, err := op.wait(ctx, c); {
  case err != nil:
    return err
  case resp.Error != nil:
    return resp.Error
  case len(resp.Result) == 0:
    return ErrNoResult
  default:
    return json.Unmarshal(resp.Result, &result)
  }
}

上面的newMessage用于处理请求数据:

// filedir: go-ethereum-1.10.2\rpc\client.go
func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
  msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
  if paramsIn != nil { // prevent sending "params":null
    var err error
    if msg.Params, err = json.Marshal(paramsIn); err != nil {
      return nil, err
    }
  }
  return msg, nil
}

这里的&jsonrpcMessage的数据格式和我们python脚本构建的请求相差无几:

#!/usr/bin/env python3
import requests
URL = "http://192.168.174.212:8545/"
data = {
   "jsonrpc": "2.0", 
   "method": "eth_getBalance", 
   "params":["0x578efd53cf8342f4f5acfb6ee0ce9c7b3cfe2252", "latest"],
   "id":0 
}
response = requests.post(url=URL,json=data)
print(response.json())

再这里我们的请求为http请求,所以会进入到sendHTTP方法中:

// filedir: go-ethereum-1.10.2\rpc\http.go
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
  hc := c.writeConn.(*httpConn)
  respBody, err := hc.doRequest(ctx, msg)
  if respBody != nil {
    defer respBody.Close()
  }

  if err != nil {
    if respBody != nil {
      buf := new(bytes.Buffer)
      if _, err2 := buf.ReadFrom(respBody); err2 == nil {
        return fmt.Errorf("%v: %v", err, buf.String())
      }
    }
    return err
  }
  var respmsg jsonrpcMessage
  if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
    return err
  }
  op.resp <- &respmsg
  return nil
}

在这里首先调用doRequest来读取请求并记录请求数据的长度,之后设置请求头信息,然后调用http的do请求,获取到请求的返回值resp

// filedir: go-ethereum-1.10.2\rpc\http.go  L175
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
  body, err := json.Marshal(msg)
  if err != nil {
    return nil, err
  }
  req, err := http.NewRequestWithContext(ctx, "POST", hc.url, ioutil.NopCloser(bytes.NewReader(body)))
  if err != nil {
    return nil, err
  }
  req.ContentLength = int64(len(body))

  // set headers
  hc.mu.Lock()
  req.Header = hc.headers.Clone()
  hc.mu.Unlock()

  // do request
  resp, err := hc.client.Do(req)
  if err != nil {
    return nil, err
  }
  if resp.StatusCode < 200 || resp.StatusCode >= 300 {
    return resp.Body, errors.New(resp.Status)
  }
  return resp.Body, nil
}

之后传进管道op.resp,向上回溯到CallContext()里面的op.wait(ctx)方法:

func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
  ......
    
  // dispatch has accepted the request and will close the channel when it quits.
  switch resp, err := op.wait(ctx, c); {
  case err != nil:
    return err
  case resp.Error != nil:
    return resp.Error
  case len(resp.Result) == 0:
    return ErrNoResult
  default:
    return json.Unmarshal(resp.Result, &result)
  }
}

wait的具体实现如下所示:

func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
  select {
  case <-ctx.Done():
    // Send the timeout to dispatch so it can remove the request IDs.
    if !c.isHTTP {
      select {
      case c.reqTimeout <- op:
      case <-c.closing:
      }
    }
    return nil, ctx.Err()
  case resp := <-op.resp:
    return resp, op.err
  }
}

之后resp recieve到op.resp管道的数据,然后对resp数据进行json序列化并返回~

请求处理

这里我们直接回溯到之前服务启动部分的HTTP请求处理模块(当时我们只是分析启动没有深入,这里我们深入追踪分析一波),在这里会首先通过调用validateRequest来验证请求的合法性,之后调用newHTTPServerConn建立一个HTTP请求连接,最后调用serveSingleRequest来处理请求:

// filedir: go-ethereum-1.10.2\rpc\http.go  L225
// ServeHTTP serves JSON-RPC requests over HTTP.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  // Permit dumb empty requests for remote health-checks (AWS)
  if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
    w.WriteHeader(http.StatusOK)
    return
  }
  if code, err := validateRequest(r); err != nil {
    http.Error(w, err.Error(), code)
    return
  }
  // All checks passed, create a codec that reads directly from the request body
  // until EOF, writes the response to w, and orders the server to process a
  // single request.
  ctx := r.Context()
  ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
  ctx = context.WithValue(ctx, "scheme", r.Proto)
  ctx = context.WithValue(ctx, "local", r.Host)
  if ua := r.Header.Get("User-Agent"); ua != "" {
    ctx = context.WithValue(ctx, "User-Agent", ua)
  }
  if origin := r.Header.Get("Origin"); origin != "" {
    ctx = context.WithValue(ctx, "Origin", origin)
  }

  w.Header().Set("content-type", contentType)
  codec := newHTTPServerConn(r, w)
  defer codec.close()
  s.serveSingleRequest(ctx, codec)
}

validateRequest实现代码如下所示,这里会校验请求的方法(不允许PUT、DELETE)、请求数据包大小(缺失可能会带来安全风险)、Content-Type等:

// validateRequest returns a non-zero response code and error message if the
// request is invalid.
func validateRequest(r *http.Request) (int, error) {
  if r.Method == http.MethodPut || r.Method == http.MethodDelete {
    return http.StatusMethodNotAllowed, errors.New("method not allowed")
  }
  if r.ContentLength > maxRequestContentLength {
    err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength)
    return http.StatusRequestEntityTooLarge, err
  }
  // Allow OPTIONS (regardless of content-type)
  if r.Method == http.MethodOptions {
    return 0, nil
  }
  // Check content-type
  if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil {
    for _, accepted := range acceptedContentTypes {
      if accepted == mt {
        return 0, nil
      }
    }
  }
  // Invalid content-type
  err := fmt.Errorf("invalid content type, only %s is supported", contentType)
  return http.StatusUnsupportedMediaType, err
}

serveSingleRequest实现逻辑如下所示,在这里会调用newHandler来获取用于处理请求的Handler:

// filedir:go-ethereum-1.10.2\rpc\server.go  L94
// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
  // Don't serve if server is stopped.
  if atomic.LoadInt32(&s.run) == 0 {
    return
  }

  h := newHandler(ctx, codec, s.idgen, &s.services)
  h.allowSubscribe = false
  defer h.close(io.EOF, nil)

  reqs, batch, err := codec.readBatch()
  if err != nil {
    if err != io.EOF {
      codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"}))
    }
    return
  }
  if batch {
    h.handleBatch(reqs)
  } else {
    h.handleMsg(reqs[0])
  }
}

newHandler实现代码如下所示,这里会调用newCallBack回调函数:

func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
  rootCtx, cancelRoot := context.WithCancel(connCtx)
  h := &handler{
    reg:            reg,
    idgen:          idgen,
    conn:           conn,
    respWait:       make(map[string]*requestOp),
    clientSubs:     make(map[string]*ClientSubscription),
    rootCtx:        rootCtx,
    cancelRoot:     cancelRoot,
    allowSubscribe: true,
    serverSubs:     make(map[ID]*Subscription),
    log:            log.Root(),
  }
  if conn.remoteAddr() != "" {
    h.log = h.log.New("conn", conn.remoteAddr())
  }
  h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
  return h
}

newCallback函数实现如下所示:

// filedir:go-ethereum-1.10.2\rpc\service.go
// newCallback turns fn (a function) into a callback object. It returns nil if the function
// is unsuitable as an RPC callback.
func newCallback(receiver, fn reflect.Value) *callback {
  fntype := fn.Type()
  c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
  // Determine parameter types. They must all be exported or builtin types.
  c.makeArgTypes()

  // Verify return types. The function must return at most one error
  // and/or one other non-error value.
  outs := make([]reflect.Type, fntype.NumOut())
  for i := 0; i < fntype.NumOut(); i++ {
    outs[i] = fntype.Out(i)
  }
  if len(outs) > 2 {
    return nil
  }
  // If an error is returned, it must be the last returned value.
  switch {
  case len(outs) == 1 && isErrorType(outs[0]):
    c.errPos = 0
  case len(outs) == 2:
    if isErrorType(outs[0]) || !isErrorType(outs[1]) {
      return nil
    }
    c.errPos = 1
  }
  return c
}

之后调用startCallProc在一个新的goroutine中执行function并跟踪:

// filedir:go-ethereum-1.10.2\rpc\handler.go L219
// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
func (h *handler) startCallProc(fn func(*callProc)) {
  h.callWG.Add(1)
  go func() {
    ctx, cancel := context.WithCancel(h.rootCtx)
    defer h.callWG.Done()
    defer cancel()
    fn(&callProc{ctx: ctx})
  }()
}

这里的handleCallMsg用于处理请求并返回执行结果:

// filedir:go-ethereum-1.10.2\rpc\handler.go  L290
// handleCallMsg executes a call message and returns the answer.
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  start := time.Now()
  switch {
  case msg.isNotification():
    h.handleCall(ctx, msg)
    h.log.Debug("Served "+msg.Method, "t", time.Since(start))
    return nil
  case msg.isCall():
    resp := h.handleCall(ctx, msg)
    var ctx []interface{}
    ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
    if resp.Error != nil {
      ctx = append(ctx, "err", resp.Error.Message)
      if resp.Error.Data != nil {
        ctx = append(ctx, "errdata", resp.Error.Data)
      }
      h.log.Warn("Served "+msg.Method, ctx...)
    } else {
      h.log.Debug("Served "+msg.Method, ctx...)
    }
    return resp
  case msg.hasValidID():
    return msg.errorResponse(&invalidRequestError{"invalid request"})
  default:
    return errorMessage(&invalidRequestError{"invalid request"})
  }
}

这里的HandleCall用于处理方法调用

// filedir:go-ethereum-1.10.2\rpc\handler.go  L318
// handleCall processes method calls.
func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
  if msg.isSubscribe() {
    return h.handleSubscribe(cp, msg)
  }
  var callb *callback
  if msg.isUnsubscribe() {
    callb = h.unsubscribeCb
  } else {
    callb = h.reg.callback(msg.Method)
  }
  if callb == nil {
    return msg.errorResponse(&methodNotFoundError{method: msg.Method})
  }
  args, err := parsePositionalArguments(msg.Params, callb.argTypes)
  if err != nil {
    return msg.errorResponse(&invalidParamsError{err.Error()})
  }
  start := time.Now()
  answer := h.runMethod(cp.ctx, msg, callb, args)

  // Collect the statistics for RPC calls if metrics is enabled.
  // We only care about pure rpc call. Filter out subscription.
  if callb != h.unsubscribeCb {
    rpcRequestGauge.Inc(1)
    if answer.Error != nil {
      failedReqeustGauge.Inc(1)
    } else {
      successfulRequestGauge.Inc(1)
    }
    rpcServingTimer.UpdateSince(start)
    newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
  }
  return answer
}

之后调用runMethod方法来处理请求:

// runMethod runs the Go callback for an RPC method.
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
  result, err := callb.call(ctx, msg.Method, args)
  if err != nil {
    return msg.errorResponse(err)
  }
  return msg.response(result)
}

紧接着调用Call方法执行最后的调用并返回执行结果:

// call invokes the callback.
func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) {
  // Create the argument slice.
  fullargs := make([]reflect.Value, 0, 2+len(args))
  if c.rcvr.IsValid() {
    fullargs = append(fullargs, c.rcvr)
  }
  if c.hasCtx {
    fullargs = append(fullargs, reflect.ValueOf(ctx))
  }
  fullargs = append(fullargs, args...)

  // Catch panic while running the callback.
  defer func() {
    if err := recover(); err != nil {
      const size = 64 << 10
      buf := make([]byte, size)
      buf = buf[:runtime.Stack(buf, false)]
      log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf))
      errRes = errors.New("method handler crashed")
    }
  }()
  // Run the callback.
  results := c.fn.Call(fullargs)
  if len(results) == 0 {
    return nil, nil
  }
  if c.errPos >= 0 && !results[c.errPos].IsNil() {
    // Method has returned non-nil error value.
    err := results[c.errPos].Interface().(error)
    return reflect.Value{}, err
  }
  return results[0].Interface(), nil
}
非 HTTP

当请求不是HTTP请求时会转而走向c.send(ctx, op, msg)方法,这里之所以这样设计是因为http是一个短连接,每次请求都是同步的,直接返回请求结果,而IPC、InProc、 websocket请求都是长连接,每次请求都是异步的,需要在网络线程外监听请求返回的结果

// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
  if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
    return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
  }
  msg, err := c.newMessage(method, args...)
  if err != nil {
    return err
  }
  op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}

  if c.isHTTP {
    err = c.sendHTTP(ctx, op, msg)
  } else {
    err = c.send(ctx, op, msg)
  }
  if err != nil {
    return err
  }

  // dispatch has accepted the request and will close the channel when it quits.
  switch resp, err := op.wait(ctx, c); {
  case err != nil:
    return err
  case resp.Error != nil:
    return resp.Error
  case len(resp.Result) == 0:
    return ErrNoResult
  default:
    return json.Unmarshal(resp.Result, &result)
  }
}

send方法的具体实现代码如下所示,在这里请求会被select阻塞直到c.reqInit接收到Op,或者receive到ctx.Done():

// filedir:go-ethereum-1.10.2\rpc\client.go   L478
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
  select {
  case c.reqInit <- op:
    err := c.write(ctx, msg, false)
    c.reqSent <- err
    return err
  case <-ctx.Done():
    // This can happen if the client is overloaded or unable to keep up with
    // subscription notifications.
    return ctx.Err()
  case <-c.closing:
    return ErrClientQuit
  }
}

当c.reqInit接收到Op后会把请求的内容写入到conn通道中去

func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
  // The previous write failed. Try to establish a new connection.
  if c.writeConn == nil {
    if err := c.reconnect(ctx); err != nil {
      return err
    }
  }
  err := c.writeConn.writeJSON(ctx, msg)
  if err != nil {
    c.writeConn = nil
    if !retry {
      return c.write(ctx, msg, true)
    }
  }
  return err
}

之后向上回溯到op.wait(ctx, c),在该函数中调用ctx.Done添加到请求队列中去:

// filedir :go-ethereum-1.10.2\rpc\client.go
func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
  select {
  case <-ctx.Done():
    // Send the timeout to dispatch so it can remove the request IDs.
    if !c.isHTTP {
      select {
      case c.reqTimeout <- op:
      case <-c.closing:
      }
    }
    return nil, ctx.Err()
  case resp := <-op.resp:
    return resp, op.err
  }
}

之后client的dispactch方法会收到这个结果

// filedir: go-ethereum-1.10.2\rpc\client.go L538
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(codec ServerCodec) {
  var (
    lastOp      *requestOp  // tracks last send operation
    reqInitLock = c.reqInit // nil while the send lock is held
    conn        = c.newClientConn(codec)
    reading     = true
  )
  defer func() {
    close(c.closing)
    if reading {
      conn.close(ErrClientQuit, nil)
      c.drainRead()
    }
    close(c.didClose)
  }()

  // Spawn the initial read loop.
  go c.read(codec)

  for {
    select {
    case <-c.close:
      return

    // Read path:
    case op := <-c.readOp:
      if op.batch {
        conn.handler.handleBatch(op.msgs)
      } else {
        conn.handler.handleMsg(op.msgs[0])
      }

    case err := <-c.readErr:
      conn.handler.log.Debug("RPC connection read error", "err", err)
      conn.close(err, lastOp)
      reading = false

    // Reconnect:
    case newcodec := <-c.reconnected:
      log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.remoteAddr())
      if reading {
        // Wait for the previous read loop to exit. This is a rare case which
        // happens if this loop isn't notified in time after the connection breaks.
        // In those cases the caller will notice first and reconnect. Closing the
        // handler terminates all waiting requests (closing op.resp) except for
        // lastOp, which will be transferred to the new handler.
        conn.close(errClientReconnected, lastOp)
        c.drainRead()
      }
      go c.read(newcodec)
      reading = true
      conn = c.newClientConn(newcodec)
      // Re-register the in-flight request on the new handler
      // because that's where it will be sent.
      conn.handler.addRequestOp(lastOp)

    // Send path:
    case op := <-reqInitLock:
      // Stop listening for further requests until the current one has been sent.
      reqInitLock = nil
      lastOp = op
      conn.handler.addRequestOp(op)

    case err := <-c.reqSent:
      if err != nil {
        // Remove response handlers for the last send. When the read loop
        // goes down, it will signal all other current operations.
        conn.handler.removeRequestOp(lastOp)
      }
      // Let the next request in.
      reqInitLock = c.reqInit
      lastOp = nil

    case op := <-c.reqTimeout:
      conn.handler.removeRequestOp(op)
    }
  }
}

之后通过c.read(codec)读取server通过conn返回的数据:

// filedir:go-ethereum-1.10.2\rpc\client.go  L630
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
  for {
    msgs, batch, err := codec.readBatch()
    if _, ok := err.(*json.SyntaxError); ok {
      codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
    }
    if err != nil {
      c.readErr <- err
      return
    }
    c.readOp <- readOp{msgs, batch}
  }
}

之后将server返回数据send到c.readOp,之后调用handler(handleBatch\handleMsg)处理请求,后续逻辑和HTTP请求处理一致,这里不再赘述~

RPC使用

以太坊JSON RPC接口文档可以访问以下链接进行查看:

http://cw.hubwiz.com/card/c/ethereum-json-rpc-api/

这里主要分为以下几个模块:

  • web3:web3.js相关操作
  • net:与网络相关的操作
  • eth:以太坊关键RPC交互
  • db:数据库交互
  • shh:whisper相关操作

在以太坊中我们还可以通过JSON RPC来管理API,具体示例可以参考以下连接:

http://cw.hubwiz.com/card/c/geth-rpc-api/

其Geth模块又可以划分为以下几个模块:

  • admin:Geth节点管理
  • debug:Geth节点调试
  • miner:矿工和DAG管理
  • personal:帐户管理
  • txpool:事务池检查

文末小结

这里关于RPC的调用不再展开进行介绍了,有兴趣的读者可以结合前面的章节自我搭建以太坊测试链之后开启RPC调用支持后结合上面的说明文件进行测试~

本文分享自微信公众号 - 七芒星实验室(HeptagramSec),作者:Al1ex

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-05-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ​【刘文彬】以太坊RPC机制与API实例

    原文链接:醒者呆的博客园,https://www.cnblogs.com/Evsward/p/eth-rpc.html

    圆方圆学院
  • 以太坊源码机制:挖矿

    狗年吉祥,开工利是,我们继续研究以太坊源码。从本篇文章开始,我们会深入到以太坊核心源码中去,进而分析与研究以太坊的核心技术。 关键字:拜占庭,挖矿,矿工...

    文彬
  • 原 以太坊-rpc原理及实现

    魂祭心
  • 以太坊rpc接口调用之nonce

    背景 我们在使用以太坊相关的json-rpc借口发送交易时,往往会出现这种现象:交易已经发送出去,也获得了交易的hash值。dev模式的geth也在正常挖矿,可...

    程序新视界
  • 以太坊源码分析---go-ethereum之rpc

    版权声明:本文为作者原创,如需转载请通知本人,并标明出处和作者。擅自转...

    月牙寂道长
  • 【深度知识】RPC原理及以太坊RPC的实现

    Remote Procedure Calls 远程过程调用 (RPC) 是一种协议,就是从一台机器(客户端)上通过参数传递的方式调用另一台机器(服务器)上的一个...

    辉哥
  • php程序员如何开发区块链、以太坊、智能合约的教程

    以太坊是备受关注的区块链,它基于密码学技术和P2P通信技术 构建了一个去中心化的平台,所有的交易同步保存在每个节点中, 通过将区块单向级联成链,以太坊有效的保证...

    笔阁
  • android和java程序员使用web3j进行区块链以太坊开发详解

    如何使用web3j为Java应用或Android App增加以太坊区块链支持,教程内容即涉及以太坊中的核心概念,例如账户管理包括账户的创建、钱包创建、交易转账,...

    用户2192618
  • java程序员使用web3j进行以太坊开发详解

    如何使用web3j为Java应用或Android App增加以太坊区块链支持,教程内容即涉及以太坊中的核心概念,例如账户管理包括账户的创建、钱包创建、交易转账,...

    笔阁
  • 以太坊应用开发接口:JSON RPC API

    以太坊应用开发接口指的是以太坊节点软件提供的API接口,去中心化应用可以利用这个接口访问以太坊上的智能合约。以太坊应用开发接口采用JSON-PRC标准,通常是通...

    用户1408045
  • 慢雾科技:EOS、以太坊网络攻防情报及智能合约安全分享

    区块链生态中恶意攻击事件频发?冲击过后我们还应当如何搭建安全堡垒?安全是区块链行业发展背后的坚实力量,技术则是在攻防战争中矛与盾的力量转化。这里有一份以技术为导...

    辉哥
  • 以太坊区块链 Asp.Net Core的安全API设计 (上)

    去中心化应用程序(DApp)的常见设计不仅依赖于以太坊区块链,还依赖于API层。在这种情况下,DApp通过用户的以太坊帐户与智能合约进行交互,并通过交换用户凭据...

    笔阁
  • 收藏贴 :2019年必备43种区块链开发工具 原

    本文列出2019年最新整理的用于区块链开发的43种流行的开发库、开发工具与开发框架。

    用户1408045
  • Microsoft Azure 以太坊节点自动化部署方案漏洞分析

    作者:sunsama@知道创宇404区块链安全研究团队 时间:2018/07/10

    Seebug漏洞平台
  • 动手编写一个以太坊智能合约

    区块链大本营
  • Microsoft Azure 以太坊节点自动化部署方案漏洞分析

    为了迎合以太坊区块链1发展需求,Microsoft Azure2早在2016年9月九推出了以太坊节点走自动部署的模块。部署情况如下:

    Seebug漏洞平台
  • 以太坊中GraphQL简介及使用

    以太坊在去年升级的go-ethereum(geth)1.9.0大版本,除了性能得到大幅提升之外,引入了GraphQL,一种节点接口查询机制,用以补充JSON-R...

    Tiny熊
  • 以太坊潜伏多年令全球黑客为之疯狂的“偷渡”漏洞引发偷币狂潮

    世界上有一群人,互联网对于他们来说就是提款机。 是的,过去是,现在更是,因为电子货币的出现,他们提款的速度变得更疯狂。 在2017年,我们的蜜罐监测到一起针对以...

    区块链领域
  • 以太坊预言机与智能合约开发

    什么是以太坊预言机?智能合约就其性质而言,能够运行各种算法并可以存储和查询数据。预言机可以监控区块链事件并能将监控结果发回智能合约。因为每个节点每次都需要大量计...

    笔阁

扫码关注云+社区

领取腾讯云代金券