基于sanic和爬虫创建的代理ip池

搭建免费的代理ip池


需要解决的问题 :

  1. 使用什么方式存储ip


    • 文件存储

      缺点: 打开文件修改文件操作较麻烦

    • mysql

      缺点: 查询速度较慢

    • mongodb

      缺点: 查询速度较慢. 没有查重功能

    • redis --> 使用redis存储最为合适


    所以 -> 数据结构采用redis中的zset有序集合

  2. 获取ip的网站


    • https://ip.jiangxianli.com/

    • https://free.kuaidaili.com/free/intr/

  3. 项目架构???

项目架构

  1. 获取api
  2. 筛选api
  3. 验证api的有效性
  4. 提供api

项目结构图

项目结构如下 :

项目代码

code 文件夹

redis_proxy.py

 # -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:32
# @author: Maxs_hu
"""
这里用来做redis中间商. 去控制redis和ip之间的调用关系
"""
from redis import Redis
import random


class RedisProxy:
def __init__(self):
# 连接到redis数据库
self.red = Redis(
host='localhost',
port=6379,
db=9,
password=123456,
decode_responses=True
)

# 1. 存储到redis中. 存储之前需要提前判断ip是否存在. 防止将已存在的ip的score抵掉
# 2. 需要校验所有的ip. 查询ip
# 3. 验证可用性. 可用分值拉满. 不可用扣分
# 4. 将可用的ip查出来返回给用户
# 先给满分的
# 再给有分的
# 都没有分. 就不给

def add_ip(self, ip): # 外界调用并传入ip
# 判断ip在redis中是否存在
if not self.red.zscore('proxy_ip', ip):
self.red.zadd('proxy_ip', {ip: 10})
print('proxy_ip存储完毕', ip)
else:
print('存在重复', ip)

def get_all_proxy(self):
# 查询所有的ip功能
return self.red.zrange('proxy_ip', 0, -1)

def set_max_score(self, ip):
self.red.zadd('proxy_ip', {ip: 100}) # 注意是引号的格式

def deduct_score(self, ip):
# 先将分数查询出来
score = self.red.zscore('proxy_ip', ip)
# 如果有分值.那就扣一分
if score > 0:
self.red.zincrby('proxy_ip', -1, ip)
else:
# 如果分值已经扣的小于0了. 那么可以直接删除了
self.red.zrem('proxy_ip', ip)

def effect_ip(self):
# 先将ip通过分数筛选出来
ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)
if ips:
return random.choice(ips)
else: # 没有满分的
# 将九十分以上的筛选出来
ips = self.red.zrangebyscore('proxy_ip', 11, 99, 0, -1)
if ips:
return random.choice(ips)
else:
print('无可用ip')
return None

ip_collection.py

 # -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:32
# @author: Maxs_hu
"""
这里用来收集ip
"""
from redis_proxy import RedisProxy
import requests
from lxml import html
from multiprocessing import Process
import time
import random


def get_kuai_ip(red):
url = "https://free.kuaidaili.com/free/intr/"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}
resp = requests.get(url, headers=headers)
etree = html.etree
et = etree.HTML(resp.text)
trs = et.xpath('//table//tr')
for tr in trs:
ip = tr.xpath('./td[1]/text()')
port = tr.xpath('./td[2]/text()')
if not ip: # 将不含有ip值的筛除
continue
proxy_ip = ip[0] + ":" + port[0]
red.add_ip(proxy_ip)


def get_unknown_ip(red):
url = "https://ip.jiangxianli.com/"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}
resp = requests.get(url, headers=headers)
etree = html.etree
et = etree.HTML(resp.text)
trs = et.xpath('//table//tr')
for tr in trs:
ip = tr.xpath('./td[1]/text()')
port = tr.xpath('./td[2]/text()')
if not ip: # 将不含有ip值的筛除
continue
proxy_ip = ip[0] + ":" + port[0]
red.add_ip(proxy_ip)


def get_happy_ip(red):
page = random.randint(1, 5)
url = f'http://www.kxdaili.com/dailiip/2/{page}.html'
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}
resp = requests.get(url, headers=headers)
etree = html.etree
et = etree.HTML(resp.text)
trs = et.xpath('//table//tr')
for tr in trs:
ip = tr.xpath('./td[1]/text()')
port = tr.xpath('./td[2]/text()')
if not ip: # 将不含有ip值的筛除
continue
proxy_ip = ip[0] + ":" + port[0]
red.add_ip(proxy_ip)


def get_nima_ip(red):
url = 'http://www.nimadaili.com/'
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}
resp = requests.get(url, headers=headers)
etree = html.etree
et = etree.HTML(resp.text)
trs = et.xpath('//table//tr')
for tr in trs:
ip = tr.xpath('./td[1]/text()') # 这里存在空值. 所以不能在后面加[0]
if not ip:
continue
red.add_ip(ip[0])


def get_89_ip(red):
page = random.randint(1, 26)
url = f'https://www.89ip.cn/index_{page}.html'
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"
}
resp = requests.get(url, headers=headers)
etree = html.etree
et = etree.HTML(resp.text)
trs = et.xpath('//table//tr')
for tr in trs:
ip = tr.xpath('./td[1]/text()')
if not ip:
continue
red.add_ip(ip[0].strip())


def main():
# 创建一个redis实例化对象
red = RedisProxy()
print("开始采集数据")
while 1:
try:
# 这里可以添加各种采集的网站
print('>>>开始收集快代理ip')
get_kuai_ip(red) # 收集快代理
# get_unknown_ip(red) # 收集ip
print(">>>开始收集开心代理ip")
get_happy_ip(red) # 收集开心代理
print(">>>开始收集泥马代理ip")
# get_nima_ip(red) # 收集泥马代理
print(">>>开始收集89代理ip")
get_89_ip(red)
time.sleep(60)
except Exception as e:
print('ip储存出错了', e)
time.sleep(60)


if __name__ == '__main__':
main()
# 创建一个子进程
# p = Process(target=main)
# p.start()

ip_verify.py

 # -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:34
# @author: Maxs_hu
"""
这里用来验证ip的可用性: 使用携程发送请求增加效率
"""
from redis_proxy import RedisProxy
from multiprocessing import Process
import asyncio
import aiohttp
import time


async def verify_ip(ip, red, sem):
timeout = aiohttp.ClientTimeout(total=10) # 设置网页等待时间不超过十秒
try:
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get(url='http://www.baidu.com/',
proxy='http://'+ip,
timeout=timeout) as resp:
page_source = await resp.text()
if resp.status in [200, 302]:
# 如果可用. 加分
red.set_max_score(ip)
print('验证没有问题. 分值拉满~', ip)
else:
# 如果不可用. 扣分
red.deduct_score(ip)
print('问题ip. 扣一分', ip)
except Exception as e:
print('出错了', e)
red.deduct_score(ip)
print('问题ip. 扣一分', ip)


async def task(red):
ips = red.get_all_proxy()
sem = asyncio.Semaphore(30) # 设置每次三十的信号量
tasks = []
for ip in ips:
tasks.append(asyncio.create_task(verify_ip(ip, red, sem)))
if tasks:
await asyncio.wait(tasks)


def main():
red = RedisProxy()
time.sleep(5) # 初始的等待时间. 等待采集到数据
print("开始验证可用性")
while 1:
try:
asyncio.run(task(red))
time.sleep(100)
except Exception as e:
print("ip_verify出错了", e)
time.sleep(100)


if __name__ == '__main__':
main()
# 创建一个子进程
# p = Process(target=main())
# p.start()

ip_api.py

 # -*- encoding:utf-8 -*-
# @time: 2022/7/4 11:35
# @author: Maxs_hu

"""
这里用来提供给用户ip接口. 通过写后台服务器. 用户访问我们的服务器就可以得到可用的代理ip:
1. flask
2. sanic --> 今天使用这个要稍微简单一点
"""
from redis_proxy import RedisProxy
from sanic import Sanic, json
from sanic_cors import CORS
from multiprocessing import Process

# 创建一个app
app = Sanic('ip') # 随便给个名字
# 解决跨域问题
CORS(app)
red = RedisProxy()


@app.route('maxs_hu_ip') # 添加路由
def api(req): # 第一个请求参数固定. 请求对象
ip = red.effect_ip()
return json({"ip": ip})


def main():
# 让sanic跑起来
app.run(host='127.0.0.1', port=1234)


if __name__ == '__main__':
main()
# p = Process(target=main())
# p.start()

runner.py

 # -*- encoding:utf-8 -*-
# @time: 2022/7/5 17:36
# @author: Maxs_hu
from ip_api import main as api_run
from ip_collection import main as coll_run
from ip_verify import main as veri_run
from multiprocessing import Process


def main():
# 设置互不干扰的三个进程
p1 = Process(target=api_run) # 只需要将目标函数的内存地址传过去即可
p2 = Process(target=coll_run)
p3 = Process(target=veri_run)

p1.start()
p2.start()
p3.start()


if __name__ == '__main__':
main()

测试ip是否可用.py

 # -*- encoding:utf-8 -*-
# @time: 2022/7/5 18:15
# @author: Maxs_hu
import requests


def get_proxy():
url = "http://127.0.0.1:1234/maxs_hu_ip"
resp = requests.get(url)
return resp.json()


def main():
url = 'http://mip.chinaz.com/?query=' + get_proxy()["ip"]
proxies = {
"http": 'http://' + get_proxy()["ip"],
"https": 'http://' + get_proxy()["ip"] # 目前代理只支持http请求
}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36",
}
resp = requests.get(url, proxies=proxies, headers=headers)
resp.encoding = 'utf-8'
print(resp.text) # 物理位置


if __name__ == '__main__':
main()

运行效果

项目运行截图:

redis储存截图:

总结

  1. 免费代理ip只支持http的网页操作. 并不好用. 如果有需求可以进行购买然后加入ip代理池
  2. 网页部署到自己的服务器上. 别人访问自己的服务器. 以后学了全栈可以加上登录. 和付费功能. 实现功能的进一步拓展
  3. 项目架构是生产者消费者模型. 三个模块同时运行. 每个模块为一个进程. 互不影响
  4. 代理设计细节有待处理. 但总体运行效果还可以. 遇到问题再修改

标签: python

添加新评论