JulyT、通用爬虫框架

一、背景

一直准备做一个iOS优质文章的聚合,每天定向爬取大V们的博客。所以就萌生了尝试打造一个通用爬虫框架的想法。加上近期开始golang的学习,所以选择使用go来写。

各章内容总结的比较粗糙,有什么问题,欢迎交流,欢迎各位大佬来喷。

传送门JulyT

1.1 初识

我给它起名叫JulyT,目前0.1版本支持Xpath数据解析,批量任务并发。简单的规则编写。就可以完成定向数据的提取.

例如:实现抓取列表,再抓取列表页详情数据,接着翻页继续

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func rule(node *Xpath.Node,spider *JulySpider.Spider)  {
path := Xpath.MustCompile("//*[@id=\"archive-page\"]/section")
it := path.Iter(node)

for it.Next() {
urlPath := Xpath.MustCompile("a/@href")
url,_:= urlPath.String(it.Node())
//
spider.RunNextStep("http://lastdays.cn"+url,analysisData)
}
fmt.Println("================一页数据==================")
nextPath := Xpath.MustCompile("//*[@id=\"page-nav\"]/a[@class=\"extend next\"]/@href")
if nextPath.Exists(node) {
url,_ := nextPath.String(node)
spider.RunNextStep("http://lastdays.cn"+url,rule)
}
}

1.2 组件简介

1.2.1 任务池

1
为每一个爬虫实例提供独立的运行空间。自动调度,自动回收空闲任务节点,任务节点复用。提供最底层的任务环境

1.2.2 调度器

1
管理所有请求,实现请求优先级调度。过滤重复请求。

1.2.3 下载器

1
提供高并发的HTML下载。

1.2.4 引擎

1
处理数据流,控制各个模块之间的调度。监控所有请求流程

1.2.5 spider

1
爬虫实例,支持规则自定义。

1.3 结构图

1.4 总结

有很多优秀的开源的爬虫框架。自己造这个轮子的目的也是希望得到一些锻炼,JulyT还有很多需要修正改进的地方,我会持续更新这个玩具。目前还缺少一个输出组件,自定义输出规则。会在0.2版本中体现出来。欢迎大家来喷。

二、任务池

2.1简介

可以理解这个taskPool为julyT提供一个稳定、独立的运行空间。听起来很像是一个多线程的服务,原谅我标题党,其实可以理解这个taskPool是一个协程池。每一个框架都希望具备高并发,性能稳定。所以julyT就寄希望于taskPool。

golang从出生那天起就带着高并发的标签。通过使用goroutine可以轻松实现并发。但是无限制的使用还是会造成很大的问题,需要增加调度控制,以及复用等能力。

目前并发模型主要有:IO复用,多进程,多线程几种。很多大型的高并发框架设计根据不同的应用场景来协同使用这几种。因为目前julyT的场景轻量一些,所以我、选择基于多线程来实现。

2.2 任务节点

2.2.1 结构

运行每一个爬虫任务。

1
2
3
4
5
6
type TaskNode struct {
pool *TaskPool //所述任务池
task chan funcTask //通道
isMultiplexing bool //是否为复用节点
recentUsageTime time.Time //最近最后使用时间
}

2.2.2 Run

1
2
3
4
5
6
7
8
9
10
11
12
13
func (t *TaskNode) run() {
go func() {
for task := range t.task {
if task == nil {
break
}
//执行任务
task()
//回收任务节点
t.pool.taskNodeGC(t)
}
}()
}

task为我们要执行的任务,原理就是通过遍历task通道,来监控任务并执行。

  • task数据为nil,释放节点。
  • 任务执行后执行回收,以便复用。

这里提供一个demo模型,方便理解

1
2
3
4
5
6
7
8
9
10
11
12
int main{
queue := make(chan string, 0)
// range函数遍历每个从通道接收到的数据,因为queue没有关闭通道
// 所以不会结束,从而阻塞等待接受数据。
go func() {
for elem := range queue {
fmt.Println(elem)
}
}()
queue <- "one"
queue <- "two"
}

2.3任务池

2.3.1 结构

TaskPool负责TaskNode的调度,管理等相关控制。

1
2
3
4
5
6
7
8
9
10
11
type TaskPool struct {
poolSize int32 //任务池大小
expiredDuration time.Duration //过期时间
taskNodes []*TaskNode //空闲任务节点
running int32 //运行中节点数
closeNotice chan closeSignal //TaskPool关闭通知
isUseCache bool //是否使用缓冲队列
cacheTasks []funcTask //TaskPool 缓冲队列
lock sync.Mutex //lock
once sync.Once //once
}

从注释可以看出来这些节点的基本作用,原理就是任务进来,先检测有没有空闲的任务节点,有的话使用,没有的话创建一个,任务完成后回收该节点。

2.3.2 处理Task

DealWithTask作为Task进入Pool的入口,通过这里统一分配节点。分配后通过通道设置任务,TaskNode便会自动执行。

初始化TaskPool,可以设置isUseCache,可以提供暂存待处理任务的能力。用于避免因为任务节点数限制,导致上层逻辑被阻塞。也可以作为一个串行入口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type funcTask func() error
//提交任务,由内部调度分配
func (p *TaskPool) DealWithTask(task funcTask) error {
if len(p.closeNotice) > 0 {
return ErrorTaskPoolClose
}
if p.isUseCache {
p.cacheTask(task)
} else {
taskNode := p.getTaskNode()
taskNode.task <- task
}
return nil
}

2.3.3 缓存组件

用于缓存TaskPool的待处理缓存。其实就是先将任务节点提交至队列。然后通过listenCache的检测,发现队列中有待执行任务就取出,提交至TaskNode。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (p *TaskPool) cacheTask(task funcTask) {
p.lock.Lock()
defer p.lock.Unlock()

p.cacheTasks = append(p.cacheTasks, task)
}

func (p *TaskPool) listenCache() {
for {
if len(p.cacheTasks) <= 0 {
continue
}
task := p.cacheTasks[0]
p.cacheTasks[0] = nil
p.cacheTasks = p.cacheTasks[1:]

taskNode := p.getTaskNode()
taskNode.task <- task
}
}

因为JulyT刚刚完成,现在回看这里还是有很大问题的。通过for来实现监听不是一个很好的选择,后面我会考虑使用channel来控制。

2.3.4 节点调度

这一块是taskPool最核心的一块内容,主要就是负责节点的调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (p *TaskPool) getTaskNode() *TaskNode {
var t *TaskNode
if len(p.taskNodes) > 0 {
//取出空闲队列最后可用节点
t = p.getNodeFromTaskNodes()
} else {
//当前任务池还有空间
if p.Running() < p.PoolSize() {
t = new(TaskNode)
t.pool = p
t.task = make(chan funcTask, 1)
t.run()
p.runningIncrease()
} else {
//阻塞等待
for {
p.lock.Lock()
if len(p.taskNodes) <= 0 {
p.lock.Unlock()
continue
}
t = p.getNodeFromTaskNodes()
p.lock.Unlock()

break
}
}
}
return t
}
  • 首先就是判断空闲队列中是否有可用节点。前面已经提到过。节点完成任务后,不会被销毁,会被加入到空闲队列中,等待复用。
  • 如果没有空闲节点可以使用,但是任务池存在空余空间,可以生成一个节点。
  • 如果空闲队列没有可用节点,并且taskpool中没有可用空间,这里阻塞等待。

tips:这里的阻塞方案用的也是for,其实可以用channel替代的。会在下一个版本优化这里。(分享总结也是一个很好的思考机会)

2.2.5 获取节点

从空闲队列中获取节点的策略是取出队尾的节点。因为回收节点是把最近使用的节点添加到队尾

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//从空闲任务列表中获取任务节点,并且移除
func (p *TaskPool) getNodeFromTaskNodes() *TaskNode {

tempNodes := p.taskNodes
n := len(p.taskNodes)
if n <= 0 {
return nil
}

t := tempNodes[n-1]
tempNodes[n-1] = nil
p.taskNodes = tempNodes[:n-1]

return t
}

2.3.6 回收节点

负责将完成任务后的节点做回收,方便后续复用,并且将节点添加到队尾处。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//返回空闲队列
func (p *TaskPool) taskNodeGC(node *TaskNode) error {
if node == nil {
return ErrorGCNodeIsNil
}

p.lock.Lock()
defer p.lock.Unlock()

node.recentUsageTime = time.Now()
node.isMultiplexing = true
p.taskNodes = append(p.taskNodes, node)

return nil
}

2.3.6 删除节点

这是一个定期执行的任务,执行时间就是expiredDuration,为了防止大量的空闲节点存在,对每个空闲节点设置了最近时间时间,根据这个字段判断出这个节点被闲置的时间。如果超过了expiredDuration,就认为这个节点是多余的。

从队头开始遍历,找到过期节点。因为回收空闲节点与取出空闲节点都是从队尾开始。遍历到不满足过期节点的时候就可以停止遍历了。因为它的下一位一定是不满足过期时间的。所以只需要将头部节点与满足过期节点这个区间的所有节点清空就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (p *TaskPool) idleNodeGC() {

ticker := time.NewTicker(p.expiredDuration)
tempNodes := p.taskNodes
for _= range ticker.C {
nowTime := time.Now()

p.lock.Lock()
if len(p.closeNotice) <= 0 {
return
}
n := 0
//查找过期节点
for i, t := range tempNodes {
if nowTime.Sub(t.recentUsageTime) <= p.expiredDuration {
break
}
n = i
t.task <- nil
tempNodes[i] = nil
}
if n+1 >= len(tempNodes) {
p.taskNodes = tempNodes[:0]
} else {
p.taskNodes = tempNodes[n+1:]
}
p.lock.Unlock()
}
}

2.4 总结

其中很多地方也涉及到锁的操作,为了保证数据的一致性。也有很多需要改进的地方。希望阅读的小伙伴能够给出宝贵的建议。

1
2
3
一个从iOS到后台的小工,更多代码细节,欢迎review源码

传送门[JulyT](https://github.com/ldaysjun/JulyT)

三、调度器

3.1 队列

3.1.1 简介

负责请求数据的管理,过滤重复下载链接。下载队列的优先级调度。

3.1.2 结构

1
2
3
4
5
6
7
8
9
type Queue struct {

status int //运行状态
matrix *Matrix //资源矩阵
matrixSize int //资源矩阵大小

pullHandle PullRequestHandle
pushHandle PushRequestHandle
}

pullHandle用于出队通知,pushHandle用于入队通知。

3.1.3 添加请求

1
2
3
4
5
6
func (queue *Queue)PushRequest(request *julyNet.CrawlRequest)  {
queue.matrix.addRequest(request)
if queue.pushHandle != nil {
queue.pushHandle()
}
}

通过操作资源矩阵,将数据加入队列。并且通知外部。

3.1.4 取出请求

1
2
3
4
5
6
7
8
func (queue *Queue)PullRequest() (request *julyNet.CrawlRequest){
request =queue.matrix.pullRequest()
if request!=nil && queue.pullHandle!=nil{
queue.pullHandle(request)
}

return request
}

通过资源矩阵,取出request,并且通知外部

3.2 资源矩阵

为queue提供存储服务,其中过滤器过滤重复的资源请求。

3.2.1 结构

1
2
3
4
5
6
7
type Matrix struct {
requests map[int][]*julyNet.CrawlRequest //所有资源
dupeFilter *DupeFilter //过滤器
priorities []int //优先级
resCount int32
lock sync.Mutex
}

3.2.2 添加资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//添加请求
func (matrix *Matrix)addRequest(request *julyNet.CrawlRequest) {
matrix.lock.Lock()
defer matrix.lock.Unlock()

if request.Priority == 0 {
request.Priority = 1
}

//过滤检验,避免重复下载,设置NotFilter可以不检验,
if !request.NotFilter {
isRepeat:=matrix.dupeFilter.filter(request)
if isRepeat {
return
}
}

priority := request.Priority
if _,found:=matrix.requests[priority];!found {
matrix.requests[priority] = []*julyNet.CrawlRequest{}
}

//添加请求到队列
matrix.requests[priority] = append(matrix.requests[priority], request)
atomic.AddInt32(&matrix.resCount,1)
}

如果发现没有设置priority,那么默认设置为HighPriority,并使用过滤器过滤重复请求。

3.3.3 取出资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//取出请求
func (matrix *Matrix)pullRequest() *julyNet.CrawlRequest{
matrix.lock.Lock()
defer matrix.lock.Unlock()
var request *julyNet.CrawlRequest


//按照优先级出队
for i:=0;i<len(matrix.priorities);i++ {
priority := matrix.priorities[i]
if len(matrix.requests[priority])>0 {

//取出队首元素
request = matrix.requests[priority][0]
matrix.requests[priority] = matrix.requests[priority][1:]
atomic.AddInt32(&matrix.resCount,-1)
break
}
}
return request
}

按照优先级顺序,取出数据。

3.4 过滤器

3.4.1 自定义

过滤器支持自定义,如果没有实现则默认使用july自带的过滤器

1
2
3
4
5
6
7
func (f *DupeFilter)RequestFilter(request *julyNet.CrawlRequest)  {
if f.Filter == nil {
f.filter(request)
}else {
f.Filter(request)
}
}

3.4.2 sha-1

利用SHA-1生成对应的指纹,最为url的唯一标识。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (f *DupeFilter)filter(request *julyNet.CrawlRequest) bool {

//生成指纹,过滤相同URL
h := sha1.New()
h.Write([]byte(request.Url))
bs:= h.Sum(nil)

finger := hex.EncodeToString(bs)
if !fingerSet.Contains(finger){
fingerSet.Add(finger)
return false
}
return true
}

3.5 总结

调度器为Engine提供所有请求的管理操作。存在一些需要优化的点:

  • 有可能存在相同URL但是对应不同的爬取规则,但是却被过滤掉了。所以这里需要做一个映射关系的管理。同一资源,可以对应不同的处理规则

四、下载器

4.1 Request

4.1.1 结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type CrawlRequest struct {

UUID string //请求标识,作为映射
Url string //请求URL
Method string //请求方法
PostData string //post请求数据
Header http.Header //请求头
UseCookie bool //是否使用cookeie
DialTimeout time.Duration //创建超时时间
ConnTimeout time.Duration //连接超时时间
RetryTimes int //重试次数
RetryPause time.Duration //重试延时
RedirectTimes string //重定向次数
Proxy string //请求代理
NotFilter bool //是否入队校验
Priority int //优先级
Once sync.Once //Once控制,避免重复
DownloaderEngine int //下载引擎
}

通过注释基本上就可以了解每个字段的作为,这里直接把字段宝楼给外部。后面会进行一下封装,避免直接暴露。

4.2 Param

4.2.1 简介

提供参数的创建。默认使用GET方法。默认不使用Keep-Alive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func CreateParam(req *CrawlRequest) (param *Param, err error) {
param = new(Param)
param.url, err = StrToUrl(req.Url)

if err != nil {
return nil, err
}

param.header = req.Header
if param.header == nil {
param.header = make(http.Header)
}

if req.Proxy != "" {
param.proxy, err = url.Parse(req.Proxy)
if err != nil {
return nil, err
}
}

param.enableCookie = req.UseCookie

if param.dialTimeout = req.DialTimeout; param.dialTimeout < 0 {
param.dialTimeout = 0
}
param.connTimeout = req.ConnTimeout
param.retryTimes = req.RetryTimes
param.redirectTimes = req.RedirectTimes
param.retryPause = req.RetryPause

method := strings.ToUpper(req.Method)
switch method {
case "GET":
param.method = method
case "POST":
param.method = method
param.header.Add("Content-Type", "application/x-www-form-urlencoded")
strings.NewReader(req.PostData)
param.body = strings.NewReader(req.PostData)

default:
param.method = "GET"
}

//默认不使用Keep-Alive
param.header.Set("Connection", "Close")

return param, nil
}

4.3 Downloader

提供下载服务的分发。提供统一的下载接口。

4.3.1 接口

为了后面下载器实例的扩展,目前仅仅支持页面的基础访问,如果页面涉及到登入,数据请求等操作,需要模拟浏览器。所以设置Downer接口,方便扩展下载实例。

1
2
3
type Downer interface {
DownLoad(request *CrawlRequest) (rsp *http.Response, err error)
}

4.3.2下载入口

统一的下载入口,目前仅有一个消灾实例服,所以没有添加区分。后面会支持phantomjs下载、模拟登入等。

1
2
3
4
5
6
7
8
9
10
11
func (d *Downloader)DownLoad(req *CrawlRequest) (rsp *http.Response, err error) {
rsp, err = d.julyHttp.DownLoad(req)
if err != nil{
fmt.Println(err.Error())
return
}
if d.DownFinishHandle !=nil {
d.DownFinishHandle(rsp,req.UUID)
}
return rsp,err
}

4.4 download

4.4.1 下载策略

具体下载服务实例,根据retryTimes判断是否有重试机制,并且重试几次。如果设置重试,根据retryPause字段控制重试时间间隔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (self *JulyHttp) httpRequest(param *Param) (rsp *http.Response, err error) {
//fmt.Println("httpRequest:",)

req, err := http.NewRequest(param.method, param.url.String(), param.body)
if err != nil {
return nil, err
}

req.Header = param.header
if param.retryTimes <= 0 {
rsp, err = param.client.Do(req)
} else {
for i := 0; i < param.retryTimes; i++ {
rsp, err = param.client.Do(req)
if err != nil {
time.Sleep(param.retryPause)
continue
}
break
}

}

return rsp, err
}

4.4.2 下载实例

使用http.Client,并且自定义Transport设置是否使用cookie,是否使用代理等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (self *JulyHttp) createClient(param *Param) *http.Client {

client := &http.Client{}

transparent := &http.Transport{
Dial: func(network, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(network, addr, param.dialTimeout)
if err != nil {
return nil, err
}
return conn, nil
},
}
if param.url.Scheme == "https" {
transparent.TLSClientConfig = &tls.Config{RootCAs: nil, InsecureSkipVerify: true}
transparent.DisableCompression = true
}

if param.proxy != nil {
transparent.Proxy = http.ProxyURL(param.proxy)
}

if param.enableCookie {
client.Jar = self.CookieJar
}

client.Transport = transparent
return client
}

4.5 总结

go本身对http相关服务已经支持的很好了。只需要根据http的接口做相关的自定义,添加一些策略就可以。可以理解为复杂的http请求。下载组件还有多需要完善的地方,比如提供phantomjs实例,添加请求中间件,提供下载前的数据处理等操作。

五、爬取

5.1 Cralwer

负责spider的调度,这里有一个概念,我之前为每一个spider分配了一个独立的运行空间。因为考虑到翻页等逻辑,每一个独立空间使用串行方式。

5.1.1 调度

判断该spider是否为子spider,如果是则采用直连的方式,将该spider直接送入下载队列中。(为什么会有SonSpider的概念,后面会详细描述),非SonSpider,通知engine,添加任务,分配运行空间。根据规则进行流转。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//spider入队、如果需要异步入队,需要加锁
func (crawler *Crawler)PushSpider(spider *Spider) {
crawler.matrix.pushSpider(spider)

if spider.SonSpider {
if crawler.CrawlerPushRequestHandle!=nil {
crawler.CrawlerPushRequestHandle(spider.Request)
}
}else {
if crawler.crawlerPushHandle != nil{
crawler.crawlerPushHandle()
}
}
}

提取spider,通知engine进行接收。

1
2
3
4
5
6
7
8
//提取spider
func (crawler *Crawler)PullSpider() {
spider := crawler.matrix.pullSpider()

if crawler.crawlerPullHandle != nil && spider!=nil{
crawler.crawlerPullHandle(spider)
}
}

5.1.2 资源矩阵

该资源矩阵用于管理所有的spider实例。基本的添加和提取spider能力。需要特殊说明的是,在添加的时候,判断该spider是否为SonSpider,如果是,则直接添加到待处理队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//添加spider
func (matrix *Matrix)pushSpider(spider *Spider){
matrix.lock.Lock()
defer matrix.lock.Unlock()

if !spider.SonSpider {
//添加请求到队列
matrix.spiders = append(matrix.spiders, spider)
atomic.AddInt32(&matrix.resCount,1)
}else {
//将数据加到待处理
if _,found:=crawler.Process[spider.Request.UUID];!found {
crawler.Process[spider.Request.UUID] = spider
}
}
}

func (matrix *Matrix)pullSpider() *Spider{
matrix.lock.Lock()
defer matrix.lock.Unlock()

if len(matrix.spiders)<=0 {
return nil
}

//取出队首元素
spider := matrix.spiders[0]
matrix.spiders = matrix.spiders[1:]

//将数据加到待处理
if _,found:=crawler.Process[spider.Request.UUID];!found {
crawler.Process[spider.Request.UUID] = spider
}

atomic.AddInt32(&matrix.resCount,-1)
return spider
}

5.2 spider

5.2.1 结构

1
2
3
4
5
6
7
type Spider struct {
SpiderName string //Spider名字
SonSpider bool //是否为子SonSpider
Request *julyNet.CrawlRequest

ParseHandle func(node *Xpath.Node,spider *Spider) //解析处理
}
  • ParseHandle:需要自定义的爬取规则。

  • SonSpider:判断该SonSpider是否为子spider

  • Request:爬取目标

  • SpiderName:spider名字

    为什么会是否为子SonSpider的概念?思考下这种模式,想要爬取一个博客的所有详情页数据。那就需要遍历所有的列表页,以及翻页操作。

所以顺序如下:

爬取第一页列表->遍历第一页列表->爬取第二页列表->遍历第二页列表…..这么一直循环下去,知道没有下一页。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func parse(node *Xpath.Node,spider *JulySpider.Spider)  {

path := Xpath.MustCompile("//*[@id=\"archive-page\"]/section")
it := path.Iter(node)
fmt.Println(GoID)

for it.Next() {
urlPath := Xpath.MustCompile("a/@href")
url,_:= urlPath.String(it.Node())
spider.RunNextStep("http://lastdays.cn"+url,analysisData)
}

fmt.Println("================一页数据==================")
nextPath := Xpath.MustCompile("//*[@id=\"page-nav\"]/a[@class=\"extend next\"]/@href")
if nextPath.Exists(node) {
url,_ := nextPath.String(node)
spider.RunNextStep("http://lastdays.cn"+url,parse)
}
}

func analysisData(node *Xpath.Node,spider *JulySpider.Spider) {
if node == nil {
return
}
fmt.Println(GoID)
titlePath := Xpath.MustCompile("//*[@id=\"main\"]/article/header/h1/a")
title,_:= titlePath.String(node)
fmt.Println(title)
}
  • 获取了第一页数据后,遍历第一页数据
  • 通过RunNextStep生成新的spider,这个spider称之为sonspider,sonspider有自己对应的规则analysisData,以及request。
  • 遍历当前页后,判断是否有下一页,如果有,生成新的sonspider,规则重新指向parse

其实sonspider,作为分支任务的一种体现。

5.2.2 分支任务

分支任务的实现方式,也就是创建sonspider的过程。

1
2
3
4
5
6
7
8
9
10
func (spider *Spider)RunNextStep(url string,nextStep func(node *Xpath.Node,spider *Spider))  {
nextStepSpider := new(Spider)
req := new(julyNet.CrawlRequest)
req.Url = url
req.NotFilter = true
nextStepSpider.Request = req
nextStepSpider.ParseHandle = nextStep
nextStepSpider.SonSpider = true
nextStepSpider.Registered()
}

5.3 总结

这里分支任务的处理、方便的规则自定义方案想了很多种,最终选择这种串行方案配合sonspider实现。目前还有很多缺点。

  • sonspider应该独立存在,spider和sonspider的关系映射管理目前没有。也就是无法感知分支任务的状态。
  • ParseHandle 规则自定义的方式通过fun回调。为了先简单实现功能没有做统一的管理和约束,下一个版本考虑使用interface。
  • sonspider的创建效率过低,影响爬取速度,需要修改
  • sonspider的直连方式不是很友好。不方便后面代码的维护升级。需要优化

六、引擎

负责所以组件的调度、管理工作。

6.1 初始化

负责各个组件的创建,以及通知回调的注册、

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewEngine() *Engine {

engine := new(Engine)

//初始化各个组件
engine.taskPool = julyTaskPool.NewTaskPool(100,50,false)
engine.requestQueue = julyScheduler.NewQueue(engine.queuePullHandle,engine.queueAfterPushHandle)
engine.crawler = JulySpider.NewCrawler()
engine.crawler.SetCrawlerHandle(engine.crawlerPullHandle,engine.crawlerPushHandle)
engine.crawler.CrawlerPushRequestHandle = engine.pushRequestToQueue

engine.downLoad = julyNet.NewDownLoad()
engine.downLoad.DownFinishHandle = engine.downFinishHandle

return engine
}

6.2 通知回调

6.2.1 Crawler相关回调

所有的spider在Crawler中注册后,都会抛出一个通知。Engine接收到通知后,会调用crawler的PullSpider提取spider。从这里开始,engine就会用过TaskPool分配一个运行空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*Crawler相关处理函数*/
//提取spider处理
func (engine *Engine)crawlerPullHandle(spider *JulySpider.Spider) {
engine.pushRequestToQueue(spider.Request)
}
//spider入队处理
func (engine *Engine)crawlerPushHandle() {
fmt.Println("入队")

engine.taskPool.SubmitTask(func() error {
engine.crawler.PullSpider()
return nil
})
}

6.2.2 Queue相关回调

spider从Crawler被提取后,engine会将spider的Request加入Queue中。

1
2
3
4
5
6
7
8
9
10
/*Queue相关处理函数*/
//队列入队后相关操作
func (engine *Engine)queueAfterPushHandle() {
engine.requestQueue.PullRequest()
}

//队列拉取处理
func (engine *Engine)queuePullHandle(request *julyNet.CrawlRequest) {
engine.downloadHTML(request)
}

6.2.3 Download相关回调

从Queue提取出的Request交给Download处理,Download处理下载结束后会将Response返回。这里会生成对应的根节点,并且交给归属spider处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*Download相关处理函数*/
//下载完成处理
func (engine *Engine)downFinishHandle(rsp *http.Response,uuid string){
engine.lock.Lock()
body,err:= ioutil.ReadAll(rsp.Body)
if err != nil {
log.Println(err.Error())
return
}

inputReader := strings.NewReader(string(body))
node,err:=Xpath.ParseHTML(inputReader)
if err!=nil {
log.Println(err.Error())
//return
}
spiders := engine.crawler.Process
spider := spiders[uuid]
engine.lock.Unlock()

if spider != nil{
spider.ParseHandleTest(node)
}
}

6.3 流程

  • spider向crawler注册,通知Engine提取
  • Engine提取spider,将Request推送给Queue,同时Queue通知Engine提取未处理的Request。
  • Engine将提取出的Request交给Download处理。
  • Download将下载结果同步给spider,按照spider的规则进行爬取。

6.4 总结

Engine的作用主要负责数据的流转。这里也有很多不足。各个组件通知Engine进行数据的提取应该使用异步,可以通过channel来实现会更好一些。

  • Engine目前不具备监控的能力,需要添加一些状态更新
  • Engine没有停止,暂停,继续等接口,用于爬取控制。