分布式爬虫
中级版-问题列表-待完善
1.UI
1.1 数据来源不够丰富,目前来源于文件不够灵活。
增加来源:
1)数据库可以添加
2)web界面可以自由添加任务
2.TaskSchedule
2.1 恢复机制进行升级解耦,用redis作为中间存储解耦
恢复机制很麻烦,是由于解耦不当造成的,需要持久化的数据放到了第三方内存数据库中
比如:doneURL,doneTask等都放到了内存中,导致关机或是重启进程均丢失
故要将该性质的数据进行独立存储,则关机或关进程就不需要恢复了
之前的schedule:1)由UIManager存入任务,形成任务池,然后其依据FIFO策略,将任务分发给下载线程。(但由于项目本身的特点,5个页面的url一直是可以快速循环迭代的,所以不必要进行恢复)
2)saveNewsEntityUrlSet:存储历史已入库任务的URL记录,用来作为数据入库时候的判重set使用。
3)监控日志中的当天共采集多少条数据,以及历史共采集多少条数据
持久化中间件选择:
1)mysql:关系型数据库,查询相对较慢,对数据库压力也大
2)redis:内存型数据库,查询块,无压力
1)进程内存和redis各放一份同等的数据,由内存往redis里同步数据
优点:1))简化了恢复过程
缺点:1))同步需要周期,中间会有丢失数据的可能性。如果宕机的话会丢失数据。
2))还依然需要恢复数据到进程内存,因为实际使用还是进程内存的数据
2)直接放在redis中,不在进程内存中放
优点:简单设计简洁
缺点:1))与redis直接通讯效率会低于进程自身内存
2))redis的IO请求会显著提高。会产生一定的redis请求压力
3.Download
3.1 解耦调用流程
将原先的download-->parser-->persistence
改为download-->parser,并将persistence独立出来。
3.2 分布式爬虫
1)由多个独立的进程共同完成一件事情
2)分布式爬虫的设计
*1)master/slave 角色定义/角色称呼
角色对应的功能:
master:UIManager,TaskSchedule,PersistenceManager
slave:DownloadManger,ParserManager
middleware(中间件)
第1个:作为中间媒介人,用来存储master需要下放给slave的任务列表。
第2个:用来存储slave给master返回的解析完的结果对象(NewsItemEntity)
*2)无中心化/去中心化
4.Parser
4.1 正则,Jsoup工具类
4.2 新增三个字段:标题,发布时间,URL,作者,来源,正文
则最终成为7个字段,分别为:标题,发布时间,URL,作者,来源,正文,插入时间
差异点:
之前是种子任务,现在是两种任务,一个是种子任务,一个是要采集出来的真正的实体数据
第1种设计方法:
两种任务对于TaskSchedule来讲,相当于平行复制了一份,里面的对象发生了改变
相当于两个池子,分发的时候各自去取不同种类的任务去下载,在解析的时候也要有区分,以及是不是要存储
第2种设计方法:
将2种任务化简成主线程单线程完成种子任务采集,由后边的多线程完成实体数据的采集。
5.Persistence
5.1 mysql版
5.2 elasticsearch版
WebSpiderAdvanced4job001
复制过来之后要修改git
在网页重新搞一个project,然后项目右键->team->share....
然后commit的时候把.文件都不选,其他都选然后注释,commit
com.tianliangedu.job001.controller
SystemController();
com.tianliangedu.job001.iface.download
com.tianliangedu.job001.iface.parser
com.tianliangedu.job001.iface.persister
DataPersistManager(); //数据持久化管理器
com.tianliangedu.job001.monitor
com.tianliangedu.job001.parser
com.tianliangedu.job001.persistence
com.tianliangedu.job001.pojos
com.tianliangedu.job001.pojos.entity
com.tianliangedu.job001.schedule
com.tianliangedu.job001.ui
com.tianliangedu.job001.utils
public class SystemController{
//将log4j配置文件放到jar包外边的路径更新
static{
PropertyConfigurator.configure(System.getProperites);
}
logger
//主线程任务采集
psvm(){
//1.起动UIManager,注入种子任务
//2.起动下载程序,解析
//3.起动系统监控管理器
//周期执行
}
//addSeedUrlsToTaskSchedule(){
//将给定的种子任务先进行采集和解析,将二级任务传递给任务管理器,去调度
parseSeedUrlsTaskToSchedule(){
//定义种子文件路径
String datafilePath = "";
//将种子任务从种子任务中读取出来,形成种子任务集合
List<UrlTaskPojo> seedUrlPojoList = UIManager(xxx);
//将任务不直接放调度器,而是先逐个种子url采集和解析,将解析出来的子任务再添加到TaskSchedule里面去。
for(UrlTaskPojo urlTaskPojo:seedUrlPojoList){
//遍历集合,拿到每一页的URL封装的对象
String htmlSource = downLoadInterface.download(urlTaskPojo.getUrl());
List<NewsItemEntity> itemEnityList = xxparser;
for(NewsItemEntity itemEntity:itemEnityList){
sout(itemEntity.toString());
}
}
}
}
UIManager{
//实例化一个网页下载接口的实现类,单线程
DownLoadInterface ....
start();
}
public class DataPersistManager{
//将实现类初始化
public static DataPersistentceInterface persistenceInterface = new DataPersist4MysqlImpl();
public static boolean persist(List<NewsItemEntity> itemList){
boolean flag = persistenceInterface.persist(itemList);
}
}
UrlTaskPojo{
//new
private TaskTypeEnum taskType = TaskTypeEnum.ROOT_URL;//默认是根url
public static enum TaskTypeEnum{
ROOT_URL,CRAWL_TASK
}
}
HtmlParserManager{
List<UrlTaskPojo> parserHtmlSource4rootUrl(){
return xxx
}
}
NewsItemParserInterface{
//new
public List<NewsItemEntity> parserHtmlSource4rootUrl();
}
分布式设计方法:
redis:
TaskScheduleManager:
savedNewsEntityUrlSet
pom.xml
<!-- https://mvnrepository..... -->
redis.clients jedis 2.8.2
RedisOpenUtil{
private Jedis jedis;
public static RedisOpenUtil getInstance(){
return new redisOpenUtil = new RedisOpenUtil(xxxx);
}
setter and getter
public RedisOpenUtil(String ip,int port,String password){
//连接本地的Redis服务
jedis = new Jedis(ip,port);
jedis.auth(password);
}
public Jedis getJedis(){
retrun jedis;
}
public String get(String key){
String val = jedis.get(key);
if(val == null){
val = "xxx";
}
return val;
}
public void set(String key,String value){
jedis.set(key,value);
}
psvm(){
String ip =.
int port
password
RedisOpenUtil
redisOpenUtil.set(xx);
sout(jedis.ping()); //结果为PONG则是通的
sout();
}
}
spider.properties:
#redis配置参数
redis_ip=xxx
redis_port=6379
redis_password=xxx
SystemController
静态声明调用配置文件
TaskScheduleManager:
//redis工具类初始化
public static RedisOpenUtil redisOpenUtil = RedisOpenUtil.getInstance();
addSavedNewsEntityUrlSet(xx){
redisOpenUtil.getJedis().sadd(uniqUrlSetKey,savedUrl);
//getJedis().sismember. 判断key,value是不是在库里面
}
//取得其长度
getSavedNewsEntityUrlSetSize(){
getJedis().scard(key);
}
缓存穿透:
1.缓存没有命中
原因:往往是缓存冷启动问题;(第一次起动)
解决方法:预计算
缓存不一致:
1.缓存数据与你的之前一般持久化数据不一致了。
原因:代码有bug
清空数据之类的人为导致两者不一致了
分布式设计:
spider.properties
#定义是master还是slave节点:true是master
is_master=true
SystemConfigParas
读取爬虫节点角色
boolean is_master
SystemController
//在第一步之前
if(SystemConfigParas.is_master){
//主节点
//启动系统监控管理器
//周期执行
int circleCounter = 1;
while(true){
//添加任务
}
}else{
//子节点
//启动下载进程
}
TaskScheduleManager
//redis中与队列对应的list结构的key声明:
public static String todoTaskPojoListKey = "todo_task_pojo_list_key";
redisOpenUtil.getJedis().lpush();
ObjectAndByteArrayConvertor:
java对象与字节数组的转化工具类
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(urlTaskPojo);
oos.flush();
byte[] objByteArray = baos.toByteArray();
addUrlTaskPojoList(List<UrlPojo> todoAddTaskList){
//将对象转成byte数组存到redis中
for(xx xxx: xxList){
byte[] byteArray = ToByteArray(xxx);
redisOpenUtil.getJedis().lpush(todoTaskPojoListKey.getBytes("utf-8",byteArray));
}
}
UrlTaskPojo implements Serializable{//序列化标志口
}
es实现:
DataPersist4Esimpl
persist(对象类){
}
pom.xml
5个依赖:
esxxx
log4j
sl4j
io.search
...
log4j.proerties
log4j2.proerties
工具类:
TransportClientUtil{
//声明一个client变量
public TransportClient client = null;
public TransportClientUtil() throws Exception(){
init();
}
//初始化TransportClient
public void init() throws Exception(){
/*
配置相关参数,包括集群名称:标识连接哪个es集群,是否开启嗅探
其嗅探功能使用效果并不理想,还是以直接显式添加服务节点
*/
Settings settings = Settings.builder()
.put("cluster.name","tianliangedu")
.put("client.transport.sniff",true);
//将参数应用到某个client对象中
}
//将指定的map对象索引到指定的索引名称和类型当中
public void addOneDocument(String indexName,String typeName,Map map){
//通过map定义kv结构数据对象
Map xxx
//将数据发送到服务器端
this.client.prepareIndex(indexName,typeName,xx)
.execute().actionGet();
}
}