好久没更新 blog 了。想了想大概一个星期之前弄了一个分布式爬虫。大概可以写(水一发)。感谢@ntzyz赞助了三个 VPS。
起因是因为某daailou想要看看B站到底有多少人充值了信仰。所以我就搞了个爬虫试了试。试的时候发现B站已经添加了反爬机制。所以只能利用多 IP 分布式来爬信息了。
Python 版本Python3.5,使用的包:requests,pymysql
首先我们得解决信息来源的问题。经过一番简单的查看。我们发现B站的大会员信息接口如下:http://space.bilibili.com/ajax/member/getVipStatus。
我们依然使用的是requests包。
params = {"mid":uid} r = requests.get(vipurl,params = {"mid":uid},headers = head);
我们使用json包对该接口的数据进行读取之后就可以判断这个用户是不是大会员。
这里需要注意的一点就是B站有些ID是空的。会产生错误。另外,由于添加了反爬机制,偶尔也会产生错误。
我们使用try…catch…来过滤掉信息。
try: info = json.loads(r.text); except: print("I'm killed. retrying...") return False; try: viptype = info["data"]["vipType"]; except: return True;
如果包是正确的,那么 json 包是可以解析的,而被B站反爬机制干掉的时候,会返回一个错误页面。json 包对此无能为力,就会报错。而如果该 ID 为空。那么则会返回一个 json 包,里面写着 status:false之类的字样,也是可以被 json 解析的,但是因为没有 vipType 字段,也会报错。此时,我们将其当作正常处理的数据略过即可。
接下来我们只需要读取大会员用户的等级即可。等级存放的接口是:http://space.bilibili.com/ajax/member/GetInfo
if(viptype==2): r = requests.post(levelurl,data = params,headers = head); info = json.loads(r.text); level = info["data"]["level_info"]["current_level"]; ansl.append({"uid":uid,"level":level}) return True
单个用户的查询就此完成。接下来我们需要一个主程序来分发任务。最基础的部分可以参考:Python 分布式计算 。我们接下来为以此为基础进行拓展。
首先,我们使用multiprocessing模块建立分布式队列。Python为其提供了锁。所以使用它可以方便的实现任务分发。
task_queue = queue.Queue(task_number); result_queue = queue.Queue(task_number); def gettask(): return task_queue; def getresult(): return result_queue; BaseManager.register('get_task',callable = gettask); BaseManager.register('get_result',callable = getresult); manager = BaseManager(address = ('0.0.0.0',5011),authkey = b'kasora'); manager.start(); task = manager.get_task(); result = manager.get_result(); for i in range(task_number): task.put(i);
这样我们就在本地服务器的5011端口暴露了一个队列接口以供其他分布式终端拉取信息。
由于B站用户量实在太多,另外我们也要考虑某些分布式终端在运行中不可预知的崩溃导致其领取了任务却没有完成。
我们的解决方案是。除了分发任务的队列之外,我们建立一个保存剩余任务的 set ,每次任务完成之后在 set 中删除对应的任务编号,这样,在队列中的任务被全部领取完毕之后,我们对 set 进行检索,如果还有未完成的任务,我们就将其再次添加到任务队列中,直到所有任务都被完成为止。
最后将结果保存到数据库中,以供查询。
while(len(lastSet)>0): time.sleep(60); print(str(len(lastSet))+" left") conn = pymysql.connect(host="127.0.0.1",user=user,passwd=passwd,db=db,use_unicode=True, charset="utf8") cur = conn.cursor() for i in range(result.qsize()): ans = result.get(); try: lastSet.remove(int(ans[0])); except KeyError: continue; if(len(ans[1])==0): pass; else: for userinfo in ans[1]: insertsql = 'insert into vipuserinfo (uid, ulevel) values (%s, %s)'; cur.execute(insertsql, [userinfo["uid"], userinfo["level"]]); conn.commit(); cur.close() conn.close() if(task.empty() and len(lastSet)>0): for i in lastSet: task.put(i);
现在我们再到任务程序中添加拉取任务的相关代码。
BaseManager.register('get_task'); BaseManager.register('get_result'); conn = BaseManager(address = (serverSite,5011), authkey = b'kasora'); def getvips(startid): lid = startid*jump; for i in range(lid,lid+jump): try: if(not getvip(i)): return False; except: pass; return True; def main(): try: conn.connect(); print("Connection succeeded.") except: print('Connection failed...retrying...'); time.sleep(5); main(); return; task = conn.get_task(); result = conn.get_result(); while not task.empty(): n = task.get(timeout = 5); print('deal with '+str(n*jump)+' to '+str((n+1)*jump)) ansl.clear() while(not getvips(n)): pass; rt = (n, ansl); result.put(rt); print('finish it.') print("finished all.") return;
尽管我们已经做了一些简单的防护措施。但是B站的服务器经常会传回一些不按套路来的数据。所以。我们还是加上try…catch…来保护任务进程。
最后我们不要忘记在主机进程中关闭管道。
manager.shutdown();
这样,我们的分布式爬虫就算完工了。
我们只需要在多个服务器上部署任务程序。并且选一台网络最好的机器部署主机程序即可。
完整项目代码:https://github.com/kasora/bilibili_bigvip_search
阶段性结果(数据太大还没跑完):