摘要:本篇教程探讨了大数据采集之分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
本篇教程探讨了大数据采集之分布式爬虫系统设计、实现与实战:爬取京东、苏宁易购全网手机商品数据+MySQL、HBase存储,希望阅读本篇文章以后大家有所收获,帮助大家对相关内容的理解更加深入。
在不用爬虫框架的情况,经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似MySQL、HBase等。
基于面向接口的编码思想来开发,因此这个系统具有一定的扩展性,有兴趣的朋友直接看一下代码,就能理解其设计思想,虽然代码目前来说很多地方还是比较紧耦合,但只要花些时间和精力,很多都是可抽取出来并且可配置化的。
因为时间的关系,我只写了京东和苏宁易购两个网站的爬虫,但是完全可以实现不同网站爬虫的随机调度,基于其代码结构,再写国美、天猫等的商品爬取,难度不大,但是估计需要花很多时间和精力。因为在解析网页的数据时,实际上需要花很多时间,比如我在爬取苏宁易购商品的价格时,价格是异步获取的,并且其api是一长串的数字组合,我花了几个小时的时间才发现其规律,当然也承认,我的经验不足。
这个系统的设计,除了基本的数据爬取以外,更关注以下几个方面的问题:
1.如何实现分布式,同一个程序打包后分发到不同的节点运行时,不影响整体的数据爬取
2.如何实现url随机循环调度,核心是针对不同的顶级域名做随机
3.如何定时向url仓库中添加种子url,达到不让爬虫系统停下来的目的
4.如何实现对爬虫节点程序的监控,并能够发邮件报警
5.如何实现一个随机IP代理库,目的跟第2点有点类似,都是为了反反爬虫
下面会针对这个系统来做一个整体的基本介绍,其实我在代码中都有非常详细的注释,有兴趣的朋友可以参考一下代码,最后我会给出一些我爬虫时的数据分析。
另外需要注意的是,这个爬虫系统是基于Java实现的,但是语言本身仍然不是最重要的,有兴趣的朋友可以尝试用Python实现。
整体系统架构如下:
所以从上面的架构可以看出,整个系统主要分为三个部分:
爬虫系统
URL调度系统
监控报警系统
爬虫系统就是用来爬取数据的,因为系统设计为分布式,因此,爬虫程序本身可以运行在不同的服务器节点上。
url调度系统核心在于url仓库,所谓的url仓库其实就是用Redis保存了需要爬取的url列表,并且在我们的url调度器中根据一定的策略来消费其中的url,从这个角度考虑,url仓库其实也是一个url队列。
监控报警系统主要是对爬虫节点进行监控,虽然并行执行的爬虫节点中的某一个挂掉了对整体数据爬取本身没有影响(只是降低了爬虫的速度),但是我们还是希望知道能够主动接收到节点挂掉的通知,而不是被动地发现。
下面将会针对以上三个方面并结合部分代码片段来对整个系统的设计思路做一些基本的介绍,对系统完整实现有浓厚兴趣的朋友可以直接参考源代码。
(说明:zookeeper监控属于监控报警系统,url调度器属于URL调度系统)
爬虫系统是一个独立运行的进程,我们把我们的爬虫系统打包成jar包,然后分发到不同的节点上执行,这样并行爬取数据可以提高爬虫的效率。
加入随机IP代理主要是为了反反爬虫,因此如果有一个IP代理库,并且可以在构建http客户端时可以随机地使用不同的代理,那么对我们进行反反爬虫则会有很大的帮助。
在系统中使用IP代理库,需要先在文本文件中添加可用的代理地址信息:
# IPProxyRepository.txt58.60.255.104:8118219.135.164.245:312827.44.171.27:9999219.135.164.245:312858.60.255.104:811858.252.6.165:9000......
需要注意的是,上面的代理IP是我在西刺代理上拿到的一些代理IP,不一定可用,建议是自己花钱购买一批代理IP,这样可以节省很多时间和精力去寻找代理IP。
然后在构建http客户端的工具类中,当第一次使用工具类时,会把这些代理IP加载进内存中,加载到Java的一个HashMap:
// IP地址代理库Mapprivate static Map<String, Integer> IPProxyRepository = new HashMap<>();private static String[] keysArray = null; // keysArray是为了方便生成随机的代理对象/** * 初次使用时使用静态代码块将IP代理库加载进set中 */static { InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // 加载包含代理IP的文本 // 构建缓冲流对象 InputStreamReader isr = new InputStreamReader(in); BufferedReader bfr = new BufferedReader(isr); String line = null; try { // 循环读每一行,添加进map中 while ((line = bfr.readLine()) != null) { String[] split = line.split(":"); // 以:作为分隔符,即文本中的数据格式应为192.168.1.1:4893 String host = split[0]; int port = Integer.valueOf(split[1]); IPProxyRepository.put(host, port); } Set<String> keys = IPProxyRepository.keySet(); keysArray = keys.toArray(new String[keys.size()]); // keysArray是为了方便生成随机的代理对象 } catch (IOException e) { e.printStackTrace(); }}
之后,在每次构建http客户端时,都会先到map中看是否有代理IP,有则使用,没有则不使用代理:
CloseableHttpClient httpClient = null;HttpHost proxy = null;if (IPProxyRepository.size() > 0) { // 如果ip代理地址库不为空,则设置代理 proxy = getRandomProxy(); httpClient = HttpClients.custom().setProxy(proxy).build(); // 创建httpclient对象} else { httpClient = HttpClients.custom().build(); // 创建httpclient对象}HttpGet request = new HttpGet(url); // 构建htttp get请求......
随机代理对象则通过下面的方法生成:
/** * 随机返回一个代理对象 * * @return */public static HttpHost getRandomProxy() { // 随机获取host:port,并构建代理对象 Random random = new Random(); String host = keysArray[random.nextInt(keysArray.length)]; int port = IPProxyRepository.get(host); HttpHost proxy = new HttpHost(host, port); // 设置http代理 return proxy;}
这样,通过上面的设计,基本就实现了随机IP代理器的功能,当然,其中还有很多可以完善的地方,比如,当使用这个IP代理而请求失败时,是否可以把这一情况记录下来,当超过一定次数时,再将其从代理库中删除,同时生成日志供开发人员或运维人员参考,这是完全可以实现的,不过我就不做这一步功能了。
网页下载器就是用来下载网页中的数据,主要基于下面的接口开发:
/** * 网页数据下载 */public interface IDownload { /** * 下载给定url的网页数据 * @param url * @return */ public Page download(String url);}
基于此,在系统中只实现了一个http get的下载器,但是也可以完成我们所需要的功能了:
/** * 数据下载实现类 */public class HttpGetDownloadImpl implements IDownload { @Override public Page download(String url) { Page page = new Page(); String content = HttpUtil.getHttpContent(url); // 获取网页数据 page.setUrl(url); page.setContent(content); return page; }}
网页解析器就是把下载的网页中我们感兴趣的数据解析出来,并保存到某个对象中,供数据存储器进一步处理以保存到不同的持久化仓库中,其基于下面的接口进行开发:
/** * 网页数据解析 */public interface IParser { public void parser(Page page);}
网页解析器在整个系统的开发中也算是比较重头戏的一个组件,功能不复杂,主要是代码比较多,针对不同的商城不同的商品,对应的解析器可能就不一样了,因此需要针对特别的商城的商品进行开发,因为很显然,京东用的网页模板跟苏宁易购的肯定不一样,天猫用的跟京东用的也肯定不一样,所以这个完全是看自己的需要来进行开发了,只是说,在解析器开发的过程当中会发现有部分重复代码,这时就可以把这些代码抽象出来开发一个工具类了。
目前在系统中爬取的是京东和苏宁易购的手机商品数据,因此与就写了这两个实现类:
/** * 解析京东商品的实现类 */public class JDHtmlParserImpl implements IParser { ......}/** * 苏宁易购网页解析 */public class SNHtmlParserImpl implements IParser { ......}
数据存储器主要是将网页解析器解析出来的数据对象保存到不同的,而对于本次爬取的手机商品,数据对象是下面一个Page对象:
/** * 网页对象,主要包含网页内容和商品数据 */public class Page { private String content; // 网页内容 private String id; // 商品Id private String source; // 商品来源 private String brand; // 商品品牌 private String title; // 商品标题 private float price; // 商品价格 private int commentCount; // 商品评论数 private String url; // 商品地址 private String imgUrl; // 商品图片地址 private String params; // 商品规格参数 private List<String> urls = new ArrayList<>(); // 解析列表页面时用来保存解析的商品url的容器}
对应的,在MySQL中,表数据结构如下:
-- ------------------------------ Table structure for phone-- ----------------------------DROP TABLE IF EXISTS `phone`;CREATE TABLE `phone` ( `id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id', `source` varchar(30) NOT NULL COMMENT '商品来源,如jd suning gome等', `brand` varchar(30) DEFAULT NULL COMMENT '手机品牌', `title` varchar(255) DEFAULT NULL COMMENT '商品页面的手机标题', `price` float(10,2) DEFAULT NULL COMMENT '手机价格', `comment_count` varchar(30) DEFAULT NULL COMMENT '手机评论', `url` varchar(500) DEFAULT NULL COMMENT '手机详细信息地址', `img_url` varchar(500) DEFAULT NULL COMMENT '图片地址', `params` text COMMENT '手机参数,json格式存储', PRIMARY KEY (`id`,`source`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
而在HBase中的表结构则为如下:
## cf1 存储 id source price comment brand url## cf2 存储 title params imgUrlcreate 'phone', 'cf1', 'cf2'## 在HBase shell中查看创建的表hbase(main):135:0> desc 'phone'Table phone is ENABLED phone COLUMN FAMILIES DESCRIPTION {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} {NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 2 row(s) in 0.0350 seconds
即在HBase中建立了两个列族,分别为cf1、cf2,其中cf1用来保存id source price comment brand url字段信息,cf2用来保存title params imgUrl字段信息。
不同的数据存储用的是不同的实现类,但是其都是基于下面同一个接口开发的:
/** * 商品数据的存储 */public interface IStore { public void store(Page page);}
然后基于此开发了MySQL的存储实现类、HBase的存储实现类还有控制台的输出实现类,如MySQL的存储实现类,其实就是简单的数据插入语句:
/** * 使用dbc数据库连接池将数据写入mysql表中 */public class MySQLStoreImpl implements IStore { private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource()); @Override public void store(Page page) { String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)"; try { queryRunner.update(sql, page.getId(), page.getSource(), page.getBrand(), page.getTitle(), page.getPrice(), page.getCommentCount(), page.getUrl(), page.getImgUrl(), page.getParams()); } catch (SQLException e) { e.printStackTrace(); } }}
而HBase的存储实现类,则是HBase Java API的常用插入语句代码:
......// cf1:pricePut pricePut = new Put(rowKey);// 必须要做是否为null判断,否则会有空指针异常pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());puts.add(pricePut);// cf1:commentPut commentPut = new Put(rowKey);commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());puts.add(commentPut);// cf1:brandPut brandPut = new Put(rowKey);brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());puts.add(brandPut);......
当然,至于要将数据存储在哪个地方,在初始化爬虫程序时,是可以手动选择的:
// 3.注入存储器iSpider.setStore(new HBaseStoreImpl());
目前还没有把代码写成可以同时存储在多个地方,按照目前代码的架构,要实现这一点也比较简单,修改一下相应代码就好了。实际上,是可以先把数据保存到MySQL中,然后通过Sqoop导入到HBase中,详细操作可以参考我写的Sqoop文章。
仍然需要注意的是,如果确定需要将数据保存到HBase中,请保证你有可用的集群环境,并且需要将如下配置文档添加到classpath下:
core-site.xmlhbase-site.xmlhdfs-site.xml
对大数据感兴趣的同学可以折腾一下这一点,如果之前没有接触过的,直接使用MySQL存储就好了,只需要在初始化爬虫程序时注入MySQL存储器即可:
// 3.注入存储器iSpider.setStore(new MySQLStoreImpl());
URL调度系统是实现整个爬虫系统分布式的桥梁与关键,正是通过URL调度系统的使用,才使得整个爬虫系统可以较为高效(Redis作为存储)随机地获取url,并实现整个系统的分布式。
通过架构图可以看出,所谓的URL仓库不过是Redis仓库,即在我们的系统中使用Redis来保存url地址列表,正是这样,才能保证我们的程序实现分布式,只要保存了url是唯一的,这样不管我们的爬虫程序有多少个,最终保存下来的数据都是只有唯一一份的,而不会重复,是通过这样来实现分布式的。
同时url仓库中的url地址在获取时的策略是通过队列的方式来实现的,待会通过URL调度器的实现即可知道。
另外,在我们的url仓库中,主要保存了下面的数据:
种子URL列表
Redis的数据类型为list。
种子URL是持久化存储的,一定时间后,由URL定时器通过种子URL获取URL,并将其注入到我们的爬虫程序需要使用的高优先级URL队列中,这样就可以保存我们的爬虫程序可以源源不断地爬取数据而不需要中止程序的执行。
高优先级URL队列
Redis的数据类型为set。
什么是高优先级URL队列?其实它就是用来保存列表url的。
那么什么是列表url呢?
说白了就是一个列表中含有多个商品,以京东为列,我们打开一个手机列表为例:
该地址中包含的不是一个具体商品的url,而是包含了多个我们需要爬取的数据(手机商品)的列表,通过对每个高级url的解析,我们可以获取到非常多的具体商品url,而具体的商品url,就是低优先url,其会保存到低优先级URL队列中。
那么以这个系统为例,保存的数据类似如下:
jd.com.higher --https://list.jd.com/list.html?cat=9987,653,655&page=1 ... suning.com.higher --https://list.suning.com/0-20006-0.html ...
低优先级URL队列
Redis的数据类型为set。
低优先级URL其实就是具体某个商品的URL,如下面一个手机商品:
通过下载该url的数据,并对其进行解析,就能够获取到我们想要的数据。
那么以这个系统为例,保存的数据类似如下:
jd.com.lower --https://item.jd.com/23545806622.html ...suning.com.lower --https://product.suning.com/0000000000/690128156.html ...
所谓url调度器,其实说白了就是url仓库java代码的调度策略,不过因为其核心在于调度,所以将其放到URL调度器中来进行说明,目前其调度基于以下接口开发:
/** * url 仓库 * 主要功能: * 向仓库中添加url(高优先级的列表,低优先级的商品url) * 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url) * */public interface IRepository { /** * 获取url的方法 * 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url) * @return */ public String poll(); /** * 向高优先级列表中添加商品列表url * @param highUrl */ public void offerHigher(String highUrl); /** * 向低优先级列表中添加商品url * @param lowUrl */ public void offerLower(String lowUrl);}
其基于Redis作为URL仓库的实现如下:
/** * 基于Redis的全网爬虫,随机获取爬虫url: * * Redis中用来保存url的数据结构如下: * 1.需要爬取的域名集合(存储数据类型为set,这个需要先在Redis中添加) * key * spider.website.domains * value(set) * jd.com suning.com gome.com * key由常量对象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 获得 * 2.各个域名所对应的高低优先url队列(存储数据类型为list,这个由爬虫程序解析种子url后动态添加) * key * jd.com.higher * jd.com.lower * suning.com.higher * suning.com.lower * gome.com.higher * gome.come.lower * value(list) * 相对应需要解析的url列表 * key由随机的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX获得 * 3.种子url列表 * key * spider.seed.urls * value(list) * 需要爬取的数据的种子url * key由常量SpiderConstants.SPIDER_SEED_URLS_KEY获得 * * 种子url列表中的url会由url调度器定时向高低优先url队列中 */public class RandomRedisRepositoryImpl implements IRepository { /** * 构造方法 */ public RandomRedisRepositoryImpl() { init(); } /** * 初始化方法,初始化时,先将redis中存在的高低优先级url队列全部删除 * 否则上一次url队列中的url没有消耗完时,再停止启动跑下一次,就会导致url仓库中有重复的url */ public void init() { Jedis jedis = JedisUtil.getJedis(); Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); String higherUrlKey; String lowerUrlKey; for(String domain : domains) { higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; jedis.del(higherUrlKey, lowerUrlKey); } JedisUtil.returnJedis(jedis); } /** * 从队列中获取url,目前的策略是: * 1.先从高优先级url队列中获取 * 2.再从低优先级url队列中获取 * 对应我们的实际场景,应该是先解析完列表url再解析商品url * 但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中 * 的url消耗完了,但实际上程序还在解析下一个高优先级url,此时,其它线程去获取高优先级队列url肯定获取不到 * 这时就会去获取低优先级队列中的url,在实际考虑分析时,这点尤其需要注意 * @return */ @Override public String poll() { // 从set中随机获取一个顶级域名 Jedis jedis = JedisUtil.getJedis(); String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher String url = jedis.lpop(key); if(url == null) { // 如果为null,则从低优先级中获取 key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower url = jedis.lpop(key); } JedisUtil.returnJedis(jedis); return url; } /** * 向高优先级url队列中添加url * @param highUrl */ @Override public void offerHigher(String highUrl) { offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX); } /** * 向低优先url队列中添加url * @param lowUrl */ @Override public void offerLower(String lowUrl) { offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX); } /** * 添加url的通用方法,通过offerHigher和offerLower抽象而来 * @param url 需要添加的url * @param urlTypeSuffix url类型后缀.higher或.lower */ public void offerUrl(String url, String urlTypeSuffix) { Jedis jedis = JedisUtil.getJedis(); String domain = SpiderUtil.getTopDomain(url); // 获取url对应的顶级域名,如jd.com String key = domain + urlTypeSuffix; // 拼接url队列的key,如jd.com.higher jedis.lpush(key, url); // 向url队列中添加url JedisUtil.returnJedis(jedis); }}
通过代码分析也是可以知道,其核心就在如何调度url仓库(Redis)中的url。
一段时间后,高优先级URL队列和低优先URL队列中的url都会被消费完,为了让程序可以继续爬取数据,同时减少人为的干预,可以预先在Redis中插入种子url,之后定时让URL定时器从种子url中取出url定存放到高优先级
本文由职坐标整理发布,学习更多的相关知识,请关注职坐标IT知识库!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号