P4-自动化采集系统代码优化-实例化浏览器管理类
- ✅接口交互:FastAPI(基于异步,高性能)
- ✅Token校验
- ✅DP获取网页源码
- ✅前三者联系起来实现一个最简单的系统:fastapi接受的参数,校验token,获取网页源码用接口并返回
- ✅传递参数丰富:睡眠时间,截图功能,等待元素
- ✅FastAPI接口文档无法显示问题
- ✅接口文档界面完善
- 实例化浏览器管理类
python
import asyncio
from pathlib import Path
from functools import wraps
from fastapi import FastAPI, HTTPException, Depends, APIRouter
from fastapi.responses import HTMLResponse, Response, JSONResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.staticfiles import StaticFiles
from fastapi.openapi.docs import get_swagger_ui_html
from pydantic import BaseModel, HttpUrl, Field
from typing import Optional, Union
from DrissionPage import ChromiumOptions, Chromium
from pydantic import BaseModel
from typing import Optional, Any
class ChromiumManager:
def __init__(self):
self.chromium: Optional[Chromium] = None # Chromium 实例
self.last_request_time: Optional[float] = None # 记录最后一次请求时间
self.lock = asyncio.Lock() # 确保线程安全
self.shutdown_timer: Optional[asyncio.Task] = None # 定时器任务
self.active_tasks = 0 # 活跃任务计数器
self.BROWSER_IDLE_TIMEOUT = 120 # 浏览器空闲多久(秒)后关闭
self.CHECK_INTERVAL = 10 # 检查间隔(秒)
async def start_chromium(self):
if self.chromium is None:
co = ChromiumOptions().auto_port()
co.ignore_certificate_errors(True)
self.chromium = Chromium(addr_or_opts=co)
self.chromium.set.auto_handle_alert(True)
print("Chromium 已启动")
async def stop_chromium(self):
if self.chromium is not None:
await asyncio.to_thread(self.chromium.quit)
self.chromium = None
print("Chromium 已关闭")
async def monitor_inactivity(self):
while True:
await asyncio.sleep(self.CHECK_INTERVAL)
async with self.lock:
if (
self.active_tasks == 0
and self.last_request_time
and (asyncio.get_event_loop().time() - self.last_request_time)
> self.BROWSER_IDLE_TIMEOUT
):
await self.stop_chromium()
self.shutdown_timer = None
break
async def reset_timer(self):
self.last_request_time = asyncio.get_event_loop().time()
async with self.lock:
if self.shutdown_timer is None:
self.shutdown_timer = asyncio.create_task(self.monitor_inactivity())
def chromium_task(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
if self.chromium and not self.chromium.states.is_alive:
self.chromium = None
async with self.lock:
if self.chromium is None:
await self.start_chromium()
async with self.lock:
self.active_tasks += 1
try:
return await func(*args, **kwargs)
finally:
async with self.lock:
self.active_tasks -= 1
if self.chromium and not self.chromium.states.is_alive:
self.chromium = None
return wrapper
class ResponseData(BaseModel):
code: int # 状态码
msg: str # 提示信息
data: Optional[Union[str, bytes]] = (
None # data 字段可以是字符串(HTML)或字节(图片)
)
class RequestData(BaseModel):
r_type: Optional[bool] = Field(True, description="是否以json格式返回,否的话会根据接口返回html或者图片字节")
url: HttpUrl = Field(default="http://localhost:8001/", description="目标网址")
dp_timeout: Optional[int] = Field(
10, description="等待页面加载的最大时间(秒)", ge=0, le=30
)
dp_sleep: Optional[float] = Field(
0.1, description="等待页面加载的最大时间(秒)", ge=0, le=10
)
dp_wait_for: Optional[str] = Field(description="等待页面元素的xpath选择器")
dp_retry: Optional[int] = Field(1, description="请求页面的重试次数", ge=0, le=3)
# 初始化 Chromium 管理器
chromium_manager = ChromiumManager()
# 获取 HTML 页面
async def get_html_from_tab(
tab, url: str, dp_timeout, dp_sleep, dp_wait_for, dp_retry
) -> str:
await asyncio.to_thread(tab.get, url=url, timeout=dp_timeout, retry=dp_retry)
await asyncio.sleep(dp_sleep)
return await asyncio.to_thread(lambda: tab.html)
# 获取 Screenshot
async def get_screenshot_from_tab(
tab, url, dp_timeout, dp_sleep, dp_wait_for, dp_retry
) -> bytes:
await asyncio.to_thread(tab.get, url=url, timeout=dp_timeout, retry=dp_retry)
await asyncio.sleep(dp_sleep)
return await asyncio.to_thread(tab.get_screenshot, as_bytes="png", full_page=True)
# 获取 HTML 页面
@chromium_manager.chromium_task
async def get_html(url, dp_timeout, dp_sleep, dp_wait_for, dp_retry):
new_tab = await asyncio.to_thread(chromium_manager.chromium.new_tab)
try:
return await get_html_from_tab(
new_tab, url, dp_timeout, dp_sleep, dp_wait_for, dp_retry
)
finally:
await asyncio.to_thread(new_tab.close)
# 获取 Screenshot
@chromium_manager.chromium_task
async def get_screenshot(url: str, dp_timeout, dp_sleep, dp_wait_for, dp_retry):
new_tab = await asyncio.to_thread(chromium_manager.chromium.new_tab)
try:
return await get_screenshot_from_tab(
new_tab, url, dp_timeout, dp_sleep, dp_wait_for, dp_retry
)
finally:
await asyncio.to_thread(new_tab.close)
# 创建 FastAPI 应用
app = FastAPI(
title="自动化采集中间系统",
description="FastAPI + DrissionPage 实现的自动化采集中间系统,功能多样,体验丝滑,还有一丢丢智能~",
version="1.0.0",
docs_url=None,
)
app.mount("/static", StaticFiles(directory="static"), name="static")
router = APIRouter()
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
return get_swagger_ui_html(
openapi_url="/openapi.json",
title="接口文档",
oauth2_redirect_url="/docs/oauth2-redirect",
swagger_js_url="/static/swagger-ui-bundle.js",
swagger_css_url="/static/swagger-ui.css",
)
@router.post(
"/fetch_html/",
summary="获取html源码",
description="返回目标网址源代码,HTML格式的响应",
response_model=ResponseData,
)
async def fetch_html(data: RequestData):
url = str(data.url)
dp_timeout = data.dp_timeout
dp_sleep = data.dp_sleep
dp_wait_for = data.dp_wait_for
dp_retry = data.dp_retry
await chromium_manager.reset_timer() # 更新请求时间,重置定时器
try:
result: str = await get_html(
url=url,
dp_timeout=dp_timeout,
dp_sleep=dp_sleep,
dp_wait_for=dp_wait_for,
dp_retry=dp_retry,
)
return HTMLResponse(content=result, status_code=200)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post(
"/fetch_screenshot/",
summary="获取网页截图",
description="返回目标网站截图,图片格式的响应",
)
async def fetch_screenshot(data: RequestData):
url = str(data.url)
dp_timeout = data.dp_timeout
dp_sleep = data.dp_sleep
dp_wait_for = data.dp_wait_for
dp_retry = data.dp_retry
await chromium_manager.reset_timer() # 更新请求时间,重置定时器
image_data = await get_screenshot(
url=url,
dp_timeout=dp_timeout,
dp_sleep=dp_sleep,
dp_wait_for=dp_wait_for,
dp_retry=dp_retry,
)
return Response(content=image_data, media_type="image/png")
app.include_router(router, tags=["网页采集"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8010)