Maintaining a Free Proxy Pool with Python 3
I. Project Background
I have recently been learning Python web scraping and wanted some free proxies to practice with, which is how this project came about.
II. Project Structure
1. Scrape two free-proxy sites (kuaidaili and xicidaili; the URLs appear in the code below).
2. Check each scraped proxy for availability by requesting Baidu through it (see the sketch after this list).
3. Save the scraped proxy records:
3.1. Unverified proxies go into the database's uncheck_proxies table.
3.2. Verified proxies go into the database's available_proxies table.
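The core of step 2 is a simple liveness probe: send a request to https://www.baidu.com through the candidate proxy and treat an HTTP 200 response as proof that the proxy works. A minimal standalone sketch of that idea (the helper name is_alive and the 5-second timeout are illustrative choices, not part of the project code):

import requests

def is_alive(ip, port, scheme="http"):
    """Return True if Baidu answers with HTTP 200 through the given proxy."""
    proxy = {scheme: scheme + "://" + ip + ":" + str(port)}
    try:
        res = requests.get("https://www.baidu.com", proxies=proxy, timeout=5)
    except requests.RequestException:
        return False
    return res.status_code == 200

The full project below does the same thing inside MaintainProxyPool.get_status_code, additionally rotating a randomized User-Agent header.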
III. Project Code
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# --author:valecalida--
# Edit time: 2020/4/13 16:08
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from requests import RequestException
from bs4 import BeautifulSoup
import threading
import requests
import urllib3
import random
import re
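# Silence urllib3's InsecureRequestWarning so the console stays readable if any
# request is later sent with verify=False (i.e. TLS certificate checks skipped).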
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class MaintainProxyPool(object):
def __init__(self):
self.kuai_proxy = "https://www.kuaidaili.com/free/inha/"
self.xici_proxy = "https://www.xicidaili.com/nn/"
self.check_proxy = 'https://www.baidu.com'
self.kuai_infos, self.xici_infos = [], []
self.alive_infos = []
self.merge_infos = []
@staticmethod
def get_user_agent():
user_agent = [
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
"Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
"Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24"
]
header = random.choice(user_agent)
return header
@staticmethod
def get_html(url):
header = {"User-Agent": MaintainProxyPool.get_user_agent()}
        try:
            res = requests.get(url, headers=header, timeout=10)
        except RequestException:
            # Return an empty list so callers can safely iterate when the
            # request fails instead of crashing on None.
            return []
        soup = BeautifulSoup(res.text, 'lxml')
        html = soup.find_all('tr')
        return html
def get_status_code(self, proxy):
header = {"User-Agent": MaintainProxyPool.get_user_agent()}
        try:
            # A short timeout keeps a dead proxy from blocking the check.
            res = requests.get(self.check_proxy, headers=header, proxies=proxy, timeout=5)
        except RequestException:
            return None
        if res.status_code == 200:
            return res.status_code
def get_kuai_proxy(self):
        print("[..] Scraping proxies from kuaidaili")
for i in range(1, 5):
url = self.kuai_proxy + str(i) + "/"
html = self.get_html(url)
for index in range(1, len(html)):
                infos = re.findall("<td data.*?>(.*)</td>", str(html[index]))
                # Drop the anonymity, location and speed columns so that only
                # ip, port, type and last-verified time remain.
                infos.pop(2)
                infos.pop(3)
                infos.pop(3)
self.kuai_infos.append(infos)
return self.kuai_infos
def get_xici_proxy(self):
        print("[..] Scraping proxies from xicidaili")
for i in range(1, 5):
url = self.xici_proxy + str(i)
html = self.get_html(url)
for index in range(1, len(html)):
                infos = re.findall("<td>(.*)</td>", str(html[index]))
                # Trim the extra column; downstream code reads each record as
                # ip, port, type, verify time.
                infos.pop(3)
self.xici_infos.append(infos)
return self.xici_infos
def check_alive(self):
self.merge_infos = self.get_kuai_proxy() + self.get_xici_proxy()
        print("[..] Checking whether the scraped proxies are usable...")
        for info in self.merge_infos:
            # Compare the protocol field explicitly; without this the HTTPS
            # branch is unreachable.
            if info[2].upper() == "HTTP":
                proxy_http = {"http": "http://" + info[0] + ":" + info[1]}
                if self.get_status_code(proxy_http):
                    self.alive_infos.append(info)
            elif info[2].upper() == "HTTPS":
                proxy_https = {"https": "https://" + info[0] + ":" + info[1]}
                if self.get_status_code(proxy_https):
                    self.alive_infos.append(info)
return self.merge_infos, self.alive_infos
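# Standard SQLAlchemy usage follows: declarative_base() returns a base class
# whose subclasses are mapped to tables via __tablename__, and
# Base.metadata.create_all(engine) issues CREATE TABLE only for tables that do
# not exist yet, so re-running the script leaves existing data untouched.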
class CreateTables(object):
"""此类用于生成表结构"""
engine = create_engine('mysql+mysqlconnector://root:root@localhost:3306/proxies', encoding="utf-8", echo=True, max_overflow=5)
Base = declarative_base()
class UncheckProxies(Base):
__tablename__ = 'uncheck_proxies'
id = Column(Integer, primary_key=True, autoincrement=True)
ip_address = Column(String(20))
link_port = Column(String(6))
conn_type = Column(String(5))
verify_time = Column(String(30))
class UsableProxy(Base):
__tablename__ = 'available_proxies'
id = Column(Integer, primary_key=True, autoincrement=True)
ip_address = Column(String(20))
link_port = Column(String(6))
conn_type = Column(String(5))
verify_time = Column(String(30))
Base.metadata.create_all(engine)
class MysqlOperation(object):
def __init__(self):
self.engine = create_engine('mysql+mysqlconnector://root:root@localhost:3306/proxies', encoding="utf-8", echo=False, max_overflow=5)
        self.Session_Class = sessionmaker(bind=self.engine)
        # Sessions are created per writer thread in the methods below, since
        # a single Session instance must not be shared across threads.
self.merge_infos, self.alive_infos = MaintainProxyPool().check_alive()
    def get_start_index(self, session):
        # Current row counts of both tables, used to continue the manual id
        # sequence when inserting new records.
        start_index_uncheck = session.query(CreateTables.UncheckProxies).count()
        start_index_checked = session.query(CreateTables.UsableProxy).count()
        return start_index_uncheck, start_index_checked
    def write_uncheck_to_database(self):
        # One Session per writer: SQLAlchemy Session objects are not
        # thread-safe, and this method runs in its own thread.
        session = self.Session_Class()
        start_index, _ = self.get_start_index(session)
        for info in self.merge_infos:
            address, port, conn_type, v_time = info[0], info[1], info[2], info[3]
            res = session.query(CreateTables.UncheckProxies).filter(CreateTables.UncheckProxies.ip_address == address).all()
            if len(res) == 0:
                start_index += 1
                t = CreateTables.UncheckProxies(id=start_index, ip_address=address, link_port=port, conn_type=conn_type, verify_time=v_time)
                session.add(t)
                session.commit()
                print("[+] Writing a new proxy record to the 'uncheck_proxies' table...")
            else:
                print("[-] This proxy is already in the database; skipping the insert")
    def write_alive_to_database(self):
        # As above, this writer gets its own Session for thread safety.
        session = self.Session_Class()
        _, start_index = self.get_start_index(session)
        for info in self.alive_infos:
            address, port, conn_type, v_time = info[0], info[1], info[2], info[3]
            res = session.query(CreateTables.UsableProxy).filter(CreateTables.UsableProxy.ip_address == address).all()
            if len(res) == 0:
                start_index += 1
                t = CreateTables.UsableProxy(id=start_index, ip_address=address, link_port=port, conn_type=conn_type, verify_time=v_time)
                session.add(t)
                session.commit()
                print("[+] Writing a new proxy record to the 'available_proxies' table...")
            else:
                print("[-] This proxy is already in the database; skipping the insert")
    def run(self):
        # Pass the bound methods themselves (no parentheses) so each write
        # runs in its own worker thread rather than in the main thread.
        t1 = threading.Thread(target=self.write_uncheck_to_database)
        t2 = threading.Thread(target=self.write_alive_to_database)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
if __name__ == '__main__':
MysqlOperation().run()
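A few notes on running this: create_all creates the two tables but not the database itself, so a MySQL database named proxies (reachable as root:root@localhost:3306, per the engine URL above) must already exist, and the script depends on requests, beautifulsoup4, lxml, SQLAlchemy and mysql-connector-python. Once it has run, the verified proxies can be read back for use in a crawler; a minimal sketch, assuming it is executed in the same module as the code above so that CreateTables is in scope:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('mysql+mysqlconnector://root:root@localhost:3306/proxies')
session = sessionmaker(bind=engine)()
# Print the first ten verified proxies from the available_proxies table.
for row in session.query(CreateTables.UsableProxy).limit(10):
    print(row.ip_address, row.link_port, row.conn_type, row.verify_time)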