满天星
Fork me on GitHub

天亮爬虫学习笔记03

分布式爬虫

中级版-问题列表-待完善
1.UI
    1.1 数据来源不够丰富,目前来源于文件不够灵活。
        增加来源:
            1)数据库可以添加
            2)web界面可以自由添加任务
2.TaskSchedule
    2.1 恢复机制进行升级解耦,用redis作为中间存储解耦
        恢复机制很麻烦,是由于解耦不当造成的,需要持久化的数据放到了第三方内存数据库中
        比如:doneURL,doneTask等都放到了内存中,导致关机或是重启进程均丢失
        故要将该性质的数据进行独立存储,则关机或关进程就不需要恢复了
    之前的schedule:1)由UIManager存入任务,形成任务池,然后其依据FIFO策略,将任务分发给下载线程。(但由于项目本身的特点,5个页面的url一直是可以快速循环迭代的,所以不必要进行恢复)
                  2)saveNewsEntityUrlSet:存储历史已入库任务的URL记录,用来作为数据入库时候的判重set使用。
                  3)监控日志中的当天共采集多少条数据,以及历史共采集多少条数据
    持久化中间件选择:
                  1)mysql:关系型数据库,查询相对较慢,对数据库压力也大
                  2)redis:内存型数据库,查询块,无压力
                    1)进程内存和redis各放一份同等的数据,由内存往redis里同步数据
                    优点:1))简化了恢复过程
                    缺点:1))同步需要周期,中间会有丢失数据的可能性。如果宕机的话会丢失数据。
                         2))还依然需要恢复数据到进程内存,因为实际使用还是进程内存的数据
                    2)直接放在redis中,不在进程内存中放
                    优点:简单设计简洁
                    缺点:1))与redis直接通讯效率会低于进程自身内存
                         2))redis的IO请求会显著提高。会产生一定的redis请求压力


3.Download
    3.1 解耦调用流程
        将原先的download-->parser-->persistence
        改为download-->parser,并将persistence独立出来。
    3.2 分布式爬虫
        1)由多个独立的进程共同完成一件事情
        2)分布式爬虫的设计
            *1)master/slave  角色定义/角色称呼
                角色对应的功能:
                    master:UIManager,TaskSchedule,PersistenceManager
                    slave:DownloadManger,ParserManager
                    middleware(中间件)
                    第1个:作为中间媒介人,用来存储master需要下放给slave的任务列表。
                    第2个:用来存储slave给master返回的解析完的结果对象(NewsItemEntity)

            *2)无中心化/去中心化
4.Parser
    4.1 正则,Jsoup工具类
    4.2 新增三个字段:标题,发布时间,URL,作者,来源,正文
        则最终成为7个字段,分别为:标题,发布时间,URL,作者,来源,正文,插入时间
        差异点:
            之前是种子任务,现在是两种任务,一个是种子任务,一个是要采集出来的真正的实体数据
        第1种设计方法:
            两种任务对于TaskSchedule来讲,相当于平行复制了一份,里面的对象发生了改变
            相当于两个池子,分发的时候各自去取不同种类的任务去下载,在解析的时候也要有区分,以及是不是要存储
        第2种设计方法:
            将2种任务化简成主线程单线程完成种子任务采集,由后边的多线程完成实体数据的采集。

5.Persistence
    5.1 mysql版
    5.2 elasticsearch版

WebSpiderAdvanced4job001
复制过来之后要修改git
在网页重新搞一个project,然后项目右键->team->share....
然后commit的时候把.文件都不选,其他都选然后注释,commit

com.tianliangedu.job001.controller
    SystemController();
com.tianliangedu.job001.iface.download
com.tianliangedu.job001.iface.parser
com.tianliangedu.job001.iface.persister
    DataPersistManager();  //数据持久化管理器
com.tianliangedu.job001.monitor
com.tianliangedu.job001.parser
com.tianliangedu.job001.persistence
com.tianliangedu.job001.pojos
com.tianliangedu.job001.pojos.entity
com.tianliangedu.job001.schedule
com.tianliangedu.job001.ui
com.tianliangedu.job001.utils



public class SystemController{
    //将log4j配置文件放到jar包外边的路径更新
    static{
        PropertyConfigurator.configure(System.getProperites);
    }

    logger

    //主线程任务采集
    psvm(){
        //1.起动UIManager,注入种子任务
        //2.起动下载程序,解析
        //3.起动系统监控管理器



        //周期执行
    }

    //addSeedUrlsToTaskSchedule(){
    //将给定的种子任务先进行采集和解析,将二级任务传递给任务管理器,去调度
    parseSeedUrlsTaskToSchedule(){
        //定义种子文件路径
        String datafilePath = "";
        //将种子任务从种子任务中读取出来,形成种子任务集合
        List<UrlTaskPojo> seedUrlPojoList = UIManager(xxx);
        //将任务不直接放调度器,而是先逐个种子url采集和解析,将解析出来的子任务再添加到TaskSchedule里面去。
        for(UrlTaskPojo urlTaskPojo:seedUrlPojoList){
            //遍历集合,拿到每一页的URL封装的对象
            String htmlSource = downLoadInterface.download(urlTaskPojo.getUrl());
            List<NewsItemEntity> itemEnityList = xxparser;
            for(NewsItemEntity itemEntity:itemEnityList){
                sout(itemEntity.toString());
            }
        }

    }

}

UIManager{
    //实例化一个网页下载接口的实现类,单线程
    DownLoadInterface ....

    start();

}

public class DataPersistManager{
    //将实现类初始化
    public static DataPersistentceInterface persistenceInterface = new DataPersist4MysqlImpl();
    public static boolean persist(List<NewsItemEntity> itemList){
        boolean flag = persistenceInterface.persist(itemList);
    }

}


UrlTaskPojo{
    //new
    private TaskTypeEnum taskType = TaskTypeEnum.ROOT_URL;//默认是根url

    public static enum TaskTypeEnum{
        ROOT_URL,CRAWL_TASK
    }

}


HtmlParserManager{
    List<UrlTaskPojo> parserHtmlSource4rootUrl(){
        return xxx
    }
}

NewsItemParserInterface{
    //new    
    public List<NewsItemEntity> parserHtmlSource4rootUrl();
}



分布式设计方法:

redis:
TaskScheduleManager:
savedNewsEntityUrlSet

pom.xml

<!-- https://mvnrepository..... -->
redis.clients       jedis    2.8.2

RedisOpenUtil{
    private Jedis jedis;

    public static RedisOpenUtil getInstance(){
        return new redisOpenUtil = new RedisOpenUtil(xxxx);
    }

    setter and getter

    public RedisOpenUtil(String ip,int port,String password){
        //连接本地的Redis服务
        jedis = new Jedis(ip,port);
        jedis.auth(password);
    }

    public Jedis getJedis(){
        retrun jedis;
    }

    public String get(String key){
        String val = jedis.get(key);
        if(val == null){
            val = "xxx"; 
        }
        return val;
    }

    public void set(String key,String value){
        jedis.set(key,value);
    }

    psvm(){
        String ip =. 
        int port
        password
        RedisOpenUtil 
        redisOpenUtil.set(xx);

        sout(jedis.ping());   //结果为PONG则是通的
        sout();
    }

}

spider.properties:

#redis配置参数
redis_ip=xxx
redis_port=6379
redis_password=xxx

SystemController 
静态声明调用配置文件

TaskScheduleManager:
//redis工具类初始化
public static RedisOpenUtil redisOpenUtil = RedisOpenUtil.getInstance();

addSavedNewsEntityUrlSet(xx){
    redisOpenUtil.getJedis().sadd(uniqUrlSetKey,savedUrl);
    //getJedis().sismember. 判断key,value是不是在库里面
}

//取得其长度
getSavedNewsEntityUrlSetSize(){
    getJedis().scard(key);
}

缓存穿透:
    1.缓存没有命中
        原因:往往是缓存冷启动问题;(第一次起动)
        解决方法:预计算

缓存不一致:
    1.缓存数据与你的之前一般持久化数据不一致了。
        原因:代码有bug
             清空数据之类的人为导致两者不一致了



分布式设计:

spider.properties

#定义是master还是slave节点:true是master
is_master=true

SystemConfigParas

读取爬虫节点角色
boolean is_master


SystemController
//在第一步之前
if(SystemConfigParas.is_master){
    //主节点
    //启动系统监控管理器
    //周期执行
    int circleCounter = 1;
    while(true){
        //添加任务
    }

}else{
    //子节点
    //启动下载进程

}

TaskScheduleManager

//redis中与队列对应的list结构的key声明:
public static String todoTaskPojoListKey = "todo_task_pojo_list_key";


redisOpenUtil.getJedis().lpush(); 

ObjectAndByteArrayConvertor:
java对象与字节数组的转化工具类

ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(urlTaskPojo);
oos.flush();
byte[] objByteArray = baos.toByteArray();

addUrlTaskPojoList(List<UrlPojo> todoAddTaskList){
    //将对象转成byte数组存到redis中
    for(xx xxx: xxList){
        byte[] byteArray = ToByteArray(xxx);
        redisOpenUtil.getJedis().lpush(todoTaskPojoListKey.getBytes("utf-8",byteArray));  

    }
}


UrlTaskPojo implements Serializable{//序列化标志口
}


es实现:
DataPersist4Esimpl

persist(对象类){

}

pom.xml
5个依赖:
esxxx
log4j
sl4j
io.search
...


log4j.proerties
log4j2.proerties

工具类:
TransportClientUtil{
    //声明一个client变量
    public TransportClient client = null;
    public TransportClientUtil() throws Exception(){
        init();
    }

    //初始化TransportClient
    public void init() throws Exception(){
        /*
        配置相关参数,包括集群名称:标识连接哪个es集群,是否开启嗅探
        其嗅探功能使用效果并不理想,还是以直接显式添加服务节点
        */
        Settings settings = Settings.builder()
                        .put("cluster.name","tianliangedu")
                        .put("client.transport.sniff",true);
        //将参数应用到某个client对象中

    }

    //将指定的map对象索引到指定的索引名称和类型当中
    public void addOneDocument(String indexName,String typeName,Map map){
        //通过map定义kv结构数据对象
        Map xxx

        //将数据发送到服务器端
        this.client.prepareIndex(indexName,typeName,xx)
                    .execute().actionGet();
    }

}
-------------本文结束期待您的评论-------------