前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Colly源码解析——框架

Colly源码解析——框架

作者头像
方亮
发布2019-01-16 17:08:27
1.1K0
发布2019-01-16 17:08:27
举报
文章被收录于专栏:方亮方亮

Colly是一个使用golang实现的数据抓取框架,我们可以使用它快速搭建类似网络爬虫这样的应用。本文我们将剖析其源码,以探析其中奥秘。(转载请指明出于breaksoftware的csdn博客)

        Collector是Colly的核心结构体,其中包含了用户对框架行为的定义。一般情况下,我们可以使用NewCollector方法构建一个它的指针

代码语言:javascript
复制
// NewCollector creates a new Collector instance with default configuration
func NewCollector(options ...func(*Collector)) *Collector {
	c := &Collector{}
	c.Init()

	for _, f := range options {
		f(c)
	}

	c.parseSettingsFromEnv()

	return c
}

        第4行调用了Init方法初始化了Collector的一些成员。然后遍历并调用不定长参数,这些参数都是函数类型——func(*Collector)。我们看个例子

代码语言:javascript
复制
	c := colly.NewCollector(
		// Visit only domains: coursera.org, www.coursera.org
		colly.AllowedDomains("coursera.org", "www.coursera.org"),
		// Cache responses to prevent multiple download of pages
		// even if the collector is restarted
		colly.CacheDir("./coursera_cache"),
	)

        AllowedDomains和CacheDir都返回一个匿名函数,其逻辑就是将Collector对象中对应的成员设置为指定的值

代码语言:javascript
复制
// AllowedDomains sets the domain whitelist used by the Collector.
func AllowedDomains(domains ...string) func(*Collector) {
	return func(c *Collector) {
		c.AllowedDomains = domains
	}
}

        Collector中绝大部分成员均有对应的方法,而且它们的名称(函数名和成员名)也一致。但是其中只有3个方法——ParseHTTPErrorResponse、AllowURLRevisit和IgnoreRobotsTxt比较特殊,因为它们没有参数。如果被调用,则对应的Collector成员会被设置为true

代码语言:javascript
复制
// AllowURLRevisit instructs the Collector to allow multiple downloads of the same URL
func AllowURLRevisit() func(*Collector) {
	return func(c *Collector) {
		c.AllowURLRevisit = true
	}
}

        再回到NewCollector函数,其最后一个逻辑是调用parseSettingsFromEnv方法。从名称我们可以看出它是用于解析环境变量的。将它放在最后是可以理解的,因为后面执行的逻辑可以覆盖前面的逻辑。这样我们可以让环境变量对应的设置生效。

代码语言:javascript
复制
func (c *Collector) parseSettingsFromEnv() {
	for _, e := range os.Environ() {
		if !strings.HasPrefix(e, "COLLY_") {
			continue
		}
		pair := strings.SplitN(e[6:], "=", 2)
		if f, ok := envMap[pair[0]]; ok {
			f(c, pair[1])
		} else {
			log.Println("Unknown environment variable:", pair[0])
		}
	}
}

        它从os.Environ()中获取系统环境变量,然后遍历它们。对于以COLLY_开头的变量,找到其在envMap中的对应方法,并调用之以覆盖之前设置的Collector成员变量值。envMap是一个<string,func>的映射,它是包内全局的。

代码语言:javascript
复制
var envMap = map[string]func(*Collector, string){
	"ALLOWED_DOMAINS": func(c *Collector, val string) {
		c.AllowedDomains = strings.Split(val, ",")
	},
	"CACHE_DIR": func(c *Collector, val string) {
		c.CacheDir = val
	},
……

        初始化完Collector,我们就可以让其发送请求。目前Colly公开了5个方法,其中3个是和Post相关的:Post、PostRaw和PostMultipart。一个Get请求方法:Visit。以及一个用户可以高度定制的方法:Request。这些方法底层都调用了scrape方法。比如Visit的实现是

代码语言:javascript
复制
func (c *Collector) Visit(URL string) error {
	return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

scrape

        scrape方法是需要我们展开分析的。因为它是Colly库中两个最重要的方法之一。

代码语言:javascript
复制
// scrape method
func (c *Collector) scrape(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, checkRevisit bool) error {
	if err := c.requestCheck(u, method, depth, checkRevisit); err != nil {
		return err
	}

        首先requestCheck方法检测一些和递归深度以及URL相关的信息

代码语言:javascript
复制
func (c *Collector) requestCheck(u, method string, depth int, checkRevisit bool) error {
	if u == "" {
		return ErrMissingURL
	}
	if c.MaxDepth > 0 && c.MaxDepth < depth {
		return ErrMaxDepth
	}

        Collector的MaxDepth默认设置为0,即不用比较深度。如果它被设置值,则递归深度不可以超过它。

        然后检测URL是否在被禁止的URL过滤器中。如果在,则返回错误。

代码语言:javascript
复制
	if len(c.DisallowedURLFilters) > 0 {
		if isMatchingFilter(c.DisallowedURLFilters, []byte(u)) {
			return ErrForbiddenURL
		}
	}

        之后检测URL是否在准入的URL过滤器中。如果不在,则返回错误

代码语言:javascript
复制
	if len(c.URLFilters) > 0 {
		if !isMatchingFilter(c.URLFilters, []byte(u)) {
			return ErrNoURLFiltersMatch
		}
	}

        最后针对GET请求,检查其是否被请求过。

代码语言:javascript
复制
	if checkRevisit && !c.AllowURLRevisit && method == "GET" {
		h := fnv.New64a()
		h.Write([]byte(u))
		uHash := h.Sum64()
		visited, err := c.store.IsVisited(uHash)
		if err != nil {
			return err
		}
		if visited {
			return ErrAlreadyVisited
		}
		return c.store.Visited(uHash)
	}
	return nil
}

        通过这些检测后,scrape会对URL组成进行分析补齐

代码语言:javascript
复制
// scrape method
	parsedURL, err := url.Parse(u)
	if err != nil {
		return err
	}
	if parsedURL.Scheme == "" {
		parsedURL.Scheme = "http"
	}

        然后针对host进行精确匹配(在requestCheck中,是对URL使用正则进行匹配)。先检测host是否在被禁止的列表中,然后检测其是否在准入的列表中。

代码语言:javascript
复制
// scrape method
	if !c.isDomainAllowed(parsedURL.Host) {
		return ErrForbiddenDomain
	}
代码语言:javascript
复制
func (c *Collector) isDomainAllowed(domain string) bool {
	for _, d2 := range c.DisallowedDomains {
		if d2 == domain {
			return false
		}
	}
	if c.AllowedDomains == nil || len(c.AllowedDomains) == 0 {
		return true
	}
	for _, d2 := range c.AllowedDomains {
		if d2 == domain {
			return true
		}
	}
	return false
}

        通过上面检测,还需要检查是否需要遵从Robots协议

代码语言:javascript
复制
// scrape method
	if !c.IgnoreRobotsTxt {
		if err = c.checkRobots(parsedURL); err != nil {
			return err
		}
	}

        所有检测通过后,就需要填充请求了

代码语言:javascript
复制
// scrape method
	if hdr == nil {
		hdr = http.Header{"User-Agent": []string{c.UserAgent}}
	}
	rc, ok := requestData.(io.ReadCloser)
	if !ok && requestData != nil {
		rc = ioutil.NopCloser(requestData)
	}
	req := &http.Request{
		Method:     method,
		URL:        parsedURL,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     hdr,
		Body:       rc,
		Host:       parsedURL.Host,
	}
	setRequestBody(req, requestData)

        第5~8行,使用类型断言等方法,将请求的数据(requestData)转换成io.ReadCloser接口数据。setRequestBody方法则是根据数据(requestData)的原始类型,设置Request结构中的GetBody方法

代码语言:javascript
复制
func setRequestBody(req *http.Request, body io.Reader) {
	if body != nil {
		switch v := body.(type) {
		case *bytes.Buffer:
			req.ContentLength = int64(v.Len())
			buf := v.Bytes()
			req.GetBody = func() (io.ReadCloser, error) {
				r := bytes.NewReader(buf)
				return ioutil.NopCloser(r), nil
			}
		case *bytes.Reader:
			req.ContentLength = int64(v.Len())
			snapshot := *v
			req.GetBody = func() (io.ReadCloser, error) {
				r := snapshot
				return ioutil.NopCloser(&r), nil
			}
		case *strings.Reader:
			req.ContentLength = int64(v.Len())
			snapshot := *v
			req.GetBody = func() (io.ReadCloser, error) {
				r := snapshot
				return ioutil.NopCloser(&r), nil
			}
		}
		if req.GetBody != nil && req.ContentLength == 0 {
			req.Body = http.NoBody
			req.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil }
		}
	}
}

        这种抽象方式,使得不同类型的requestData都可以通过统一的GetBody方法获取内容。目前Colly中发送数据有3种复合结构,分别是:mapstringstring、requestData []byte和mapstringbyte。对于普通的Post传送mapstringstring数据,Colly会使用createFormReader方法将其转换成Reader结构指针

代码语言:javascript
复制
func createFormReader(data map[string]string) io.Reader {
	form := url.Values{}
	for k, v := range data {
		form.Add(k, v)
	}
	return strings.NewReader(form.Encode())
}

        如果是一个二进制切片,则使用bytes.NewReader直接将其转换为Reader结构指针

        如果是mapstringbyte,则是Post数据的Multipart结构,使用createMultipartReader方法将其转换成Buffer结构指针。

代码语言:javascript
复制
func createMultipartReader(boundary string, data map[string][]byte) io.Reader {
	dashBoundary := "--" + boundary

	body := []byte{}
	buffer := bytes.NewBuffer(body)

	buffer.WriteString("Content-type: multipart/form-data; boundary=" + boundary + "\n\n")
	for contentType, content := range data {
		buffer.WriteString(dashBoundary + "\n")
		buffer.WriteString("Content-Disposition: form-data; name=" + contentType + "\n")
		buffer.WriteString(fmt.Sprintf("Content-Length: %d \n\n", len(content)))
		buffer.Write(content)
		buffer.WriteString("\n")
	}
	buffer.WriteString(dashBoundary + "--\n\n")
	return buffer
}

        回到scrape方法中,数据准备结束,开始正式获取数据

代码语言:javascript
复制
// scrape method
	u = parsedURL.String()
	c.wg.Add(1)
	if c.Async {
		go c.fetch(u, method, depth, requestData, ctx, hdr, req)
		return nil
	}
	return c.fetch(u, method, depth, requestData, ctx, hdr, req)
}

        通过第4行我们可以看到,可以通过Async参数决定是否异步的获取数据。

fetch

        在解析fetch方法前,我们要先介绍Collector的几个回调函数

代码语言:javascript
复制
	htmlCallbacks     []*htmlCallbackContainer
	xmlCallbacks      []*xmlCallbackContainer
	requestCallbacks  []RequestCallback
	responseCallbacks []ResponseCallback
	errorCallbacks    []ErrorCallback
	scrapedCallbacks  []ScrapedCallback

        以requestCallbacks为例,Colly提供了OnRequest方法用于注册回调。由于这些回调函数通过切片保存,所以可以多次调用注册方法。(即不是覆盖之前的注册回调)

代码语言:javascript
复制
// OnRequest registers a function. Function will be executed on every
// request made by the Collector
func (c *Collector) OnRequest(f RequestCallback) {
	c.lock.Lock()
	if c.requestCallbacks == nil {
		c.requestCallbacks = make([]RequestCallback, 0, 4)
	}
	c.requestCallbacks = append(c.requestCallbacks, f)
	c.lock.Unlock()
}

        用户则可以使用下面方法进行注册

代码语言:javascript
复制
	// Before making a request print "Visiting ..."
	c.OnRequest(func(r *colly.Request) {
		fmt.Println("Visiting", r.URL.String())
	})

        这些回调会被在handleOnXXXX类型的函数中被调用。调用的顺序和注册的顺序一致。

代码语言:javascript
复制
func (c *Collector) handleOnResponse(r *Response) {
	if c.debugger != nil {
		c.debugger.Event(createEvent("response", r.Request.ID, c.ID, map[string]string{
			"url":    r.Request.URL.String(),
			"status": http.StatusText(r.StatusCode),
		}))
	}
	for _, f := range c.responseCallbacks {
		f(r)
	}
}

        每次调用fetch方法都会构建一个全新Request结构。

代码语言:javascript
复制
// fetch method
func (c *Collector) fetch(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, req *http.Request) error {
	defer c.wg.Done()
	if ctx == nil {
		ctx = NewContext()
	}
	request := &Request{
		URL:       req.URL,
		Headers:   &req.Header,
		Ctx:       ctx,
		Depth:     depth,
		Method:    method,
		Body:      requestData,
		collector: c,
		ID:        atomic.AddUint32(&c.requestCount, 1),
	}

        这儿注意一下3~5行ctx(上下文)的构建逻辑。如果传入的ctx为nil,则构建一个新的,否则使用老的。这就意味着Request结构体(以及之后出现的Response结构体)中的ctx可以是每次调用fetch时全新产生的,也可以是各个Request公用的。我们回溯下ctx的调用栈,发现只有func (c *Collector) Request(……)方法使用的不是nil

代码语言:javascript
复制
func (c *Collector) Request(method, URL string, requestData io.Reader, ctx *Context, hdr http.Header) error {
	return c.scrape(URL, method, 1, requestData, ctx, hdr, true)
}

        这也就意味着,调用Visit、Post、PostRaw和PostMultipart方法在每次调用fetch时都会产生一个新的上下文。

        由于Context存在被多个goroutine共享访问的可能性,所以其定义了读写锁进行保护

代码语言:javascript
复制
type Context struct {
	contextMap map[string]interface{}
	lock       *sync.RWMutex
}

        再回到fetch方法。数据填充完毕后,就提供了一次给用户干预之后流程的机会

代码语言:javascript
复制
// fetch method
	c.handleOnRequest(request)

	if request.abort {
		return nil
	}

        之前我们讲解过,handleOnRequest调用的是用户通过OnRequest注册个所有回调函数。如果用户在该回调中调用了下面方法,则之后的流程都不走了。

代码语言:javascript
复制
// Abort cancels the HTTP request when called in an OnRequest callback
func (r *Request) Abort() {
	r.abort = true
}

        如果用户没用终止执行,则开始发送请求

代码语言:javascript
复制
// fetch method
	if method == "POST" && req.Header.Get("Content-Type") == "" {
		req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
	}

	if req.Header.Get("Accept") == "" {
		req.Header.Set("Accept", "*/*")
	}

	origURL := req.URL
	response, err := c.backend.Cache(req, c.MaxBodySize, c.CacheDir)

        对于这次请求,不管是否出错都会触发用户定义的Error回调

代码语言:javascript
复制
// fetch method
	if err := c.handleOnError(response, err, request, ctx); err != nil {
		return err
	}

        在handleOnError函数中,回调函数会接收到err原因,所以用户自定义的错误处理函数需要通过该值来做区分。

代码语言:javascript
复制
	for _, f := range c.errorCallbacks {
		f(response, err)
	}
	return err

        正常请求后,fetch会使用ctx和修复后的request填充到response中

代码语言:javascript
复制
// fetch method
	if req.URL != origURL {
		request.URL = req.URL
		request.Headers = &req.Header
	}
	if proxyURL, ok := req.Context().Value(ProxyURLKey).(string); ok {
		request.ProxyURL = proxyURL
	}
	atomic.AddUint32(&c.responseCount, 1)
	response.Ctx = ctx
	response.Request = request

	err = response.fixCharset(c.DetectCharset, request.ResponseCharacterEncoding)
	if err != nil {
		return err
	}

        最后在一系列调用用户回调中结束fetch

代码语言:javascript
复制
// fetch method
	c.handleOnResponse(response)

	err = c.handleOnHTML(response)
	if err != nil {
		c.handleOnError(response, err, request, ctx)
	}

	err = c.handleOnXML(response)
	if err != nil {
		c.handleOnError(response, err, request, ctx)
	}

	c.handleOnScraped(response)

	return err
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年11月27日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • scrape
  • fetch
相关产品与服务
数据库一体机 TData
数据库一体机 TData 是融合了高性能计算、热插拔闪存、Infiniband 网络、RDMA 远程直接存取数据的数据库解决方案,为用户提供高可用、易扩展、高性能的数据库服务,适用于 OLAP、 OLTP 以及混合负载等各种应用场景下的极限性能需求,支持 Oracle、SQL Server、MySQL 和 PostgreSQL 等各种主流数据库。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档