Connection Handler (连接处理器)
Connection Handler 是 Pydoll 架构的基础层,充当 Python 代码与浏览器 Chrome DevTools Protocol (CDP) 之间的桥梁。该组件管理与浏览器的 WebSocket 连接,处理命令执行,并以非阻塞、异步的方式处理事件。
graph TD
A[Python 代码] --> B[Connection Handler]
B <--> C[WebSocket]
C <--> D[浏览器 CDP 端点]
subgraph "Connection Handler"
E[命令管理器]
F[事件处理器]
G[WebSocket 客户端]
end
B --> E
B --> F
B --> G
异步编程模型
Pydoll 构建于 Python 的 asyncio 框架之上,该框架支持非阻塞 I/O 操作。这种设计选择对于高性能的浏览器自动化至关重要,因为它允许多个操作并发执行,而无需等待每个操作完成。
理解 Async/Await
为了理解 async/await 在实践中如何工作,让我们看一个包含两个并发操作的更详细的示例:
import asyncio
from pydoll.browser.chrome import Chrome
async def fetch_page_data(url):
print(f"开始抓取 {url}")
browser = Chrome()
await browser.start()
page = await browser.get_page()
# 导航需要时间 - 这是我们让出控制权的地方
await page.go_to(url)
# 获取页面标题
title = await page.execute_script("return document.title")
# 提取一些数据
description = await page.execute_script(
"return document.querySelector('meta[name=\"description\"]')?.content || ''"
)
await browser.stop()
print(f"完成抓取 {url}")
return {"url": url, "title": title, "description": description}
async def main():
# 并发启动两个页面操作
task1 = asyncio.create_task(fetch_page_data("https://example.com"))
task2 = asyncio.create_task(fetch_page_data("https://github.com"))
# 等待两者完成并获取结果
result1 = await task1
result2 = await task2
return [result1, result2]
# 运行异步函数
results = asyncio.run(main())
此示例演示了我们如何并发地从两个不同的网站获取数据,与顺序执行相比,这可能将总执行时间缩短近一半。
异步执行流程图
以下是执行上述代码时事件循环中发生的情况:
sequenceDiagram
participant A as 主代码
participant B as 任务 1<br/> (example.com)
participant C as 任务 2<br/> (github.com)
participant D as 事件循环
A->>B: 创建任务 1
B->>D: 在循环中注册
A->>C: 创建任务 2
C->>D: 在循环中注册
D->>B: 执行直到 browser.start()
D->>C: 执行直到 browser.start()
D-->>B: WebSocket 连接后恢复
D-->>C: WebSocket 连接后恢复
D->>B: 执行直到 page.go_to()
D->>C: 执行直到 page.go_to()
D-->>B: 页面加载后恢复
D-->>C: 页面加载后恢复
B-->>A: 返回结果
C-->>A: 返回结果
此序列图说明了 Python 的 asyncio 如何管理我们示例代码中的两个并发任务:
- 主函数创建两个任务,用于从不同网站获取数据
- 两个任务都在事件循环中注册
- 事件循环执行每个任务,直到遇到
await语句(如browser.start()) - 当异步操作完成时(如 WebSocket 连接建立),任务恢复执行
- 循环在每个
await点继续在任务之间切换 - 当每个任务完成时,它将其结果返回给主函数
在 fetch_page_data 示例中,这允许两个浏览器实例并发工作 - 当一个实例等待页面加载时,另一个实例可以取得进展。这比顺序处理每个网站要高效得多,因为 I/O 等待时间不会阻塞其他任务的执行。
协作式多任务
Asyncio 使用协作式多任务,其中任务在 await 点自愿让出控制权。这不同于抢占式多任务(线程),后者中任务可能在任何时候被中断。协作式多任务可以为 I/O 密集型操作提供更好的性能,但需要仔细编码以避免阻塞事件循环。
Connection Handler 实现
ConnectionHandler 类旨在管理命令执行和事件处理,为 CDP WebSocket 连接提供了一个健壮的接口。
类初始化
def __init__(
self,
connection_port: int,
page_id: str = 'browser',
ws_address_resolver: Callable[[int], str] = get_browser_ws_address,
ws_connector: Callable = websockets.connect,
):
# 初始化组件...
ConnectionHandler 接受几个参数:
| 参数 | 类型 | 描述 |
|---|---|---|
connection_port |
int |
浏览器 CDP 端点正在监听的端口号 |
page_id |
str |
特定页面/目标的标识符(用于浏览器级别的连接时使用 'browser') |
ws_address_resolver |
Callable |
从端口号解析 WebSocket URL 的函数 |
ws_connector |
Callable |
建立 WebSocket 连接的函数 |
内部组件
ConnectionHandler 协调三个主要组件:
- WebSocket 连接:管理与浏览器的实际 WebSocket 通信
- 命令管理器:处理发送命令和接收响应
- 事件处理器:处理来自浏览器的事件并触发适当的回调
classDiagram
class ConnectionHandler {
-_connection_port: int
-_page_id: str
-_ws_connection
-_command_manager: CommandManager
-_events_handler: EventsHandler
+execute_command(command, timeout) async
+register_callback(event_name, callback) async
+remove_callback(callback_id) async
+ping() async
+close() async
-_receive_events() async
}
class CommandManager {
-_pending_commands: dict
+create_command_future(command)
+resolve_command(id, response)
+remove_pending_command(id)
}
class EventsHandler {
-_callbacks: dict
-_network_logs: list
-_dialog: dict
+register_callback(event_name, callback, temporary)
+remove_callback(callback_id)
+clear_callbacks()
+process_event(event) async
}
ConnectionHandler *-- CommandManager
ConnectionHandler *-- EventsHandler
命令执行流程
通过 CDP 执行命令时,ConnectionHandler 遵循特定模式:
- 确保存在活动的 WebSocket 连接
- 创建一个 Future 对象来表示挂起的响应
- 通过 WebSocket 发送命令
- 等待 Future 被响应解析
- 将响应返回给调用者
async def execute_command(self, command: dict, timeout: int = 10) -> dict:
# 验证命令
if not isinstance(command, dict):
logger.error('Command must be a dictionary.')
raise exceptions.InvalidCommand('Command must be a dictionary')
# 确保连接处于活动状态
await self._ensure_active_connection()
# 为此命令创建 future
future = self._command_manager.create_command_future(command)
command_str = json.dumps(command)
# 发送命令并等待响应
try:
await self._ws_connection.send(command_str)
response: str = await asyncio.wait_for(future, timeout)
return json.loads(response)
except asyncio.TimeoutError as exc:
self._command_manager.remove_pending_command(command['id'])
raise exc
except websockets.ConnectionClosed as exc:
await self._handle_connection_loss()
raise exc
命令超时
未在指定超时期限内收到响应的命令将引发 TimeoutError。这可以防止自动化脚本因缺少响应而无限期挂起。默认超时为 10 秒,但可以根据复杂操作的预期响应时间进行调整。
事件处理系统
事件系统是启用 Pydoll 中反应式编程模式的关键架构组件。它允许您为特定浏览器事件注册回调,并在这些事件发生时自动执行它们。
事件流
事件处理流程遵循以下步骤:
_receive_events方法作为后台任务运行,持续从 WebSocket 接收消息- 每条消息被解析并分类为命令响应或事件
- 事件被传递给 EventsHandler 进行处理
- EventsHandler 识别该事件已注册的回调并调用它们
flowchart TD
A[WebSocket 消息] --> B{是命令响应吗?}
B -->|是| C[解析命令 Future]
B -->|否| D[作为事件处理]
D --> E[查找匹配的回调]
E --> F[执行回调]
F --> G{是临时的吗?}
G -->|是| H[移除回调]
G -->|否| I[保留回调]
回调注册
ConnectionHandler 提供了注册、移除和管理事件回调的方法:
# 为特定事件注册回调
callback_id = await connection.register_callback(
'Page.loadEventFired',
handle_page_load
)
# 移除特定回调
await connection.remove_callback(callback_id)
# 移除所有回调
await connection.clear_callbacks()
临时回调
您可以将回调注册为临时的,这意味着它在触发一次后将自动移除。这对于一次性事件(如处理对话框)很有用:
异步回调执行
回调可以是同步函数或异步协程。EventsHandler(由 ConnectionHandler 管理)可以正确处理这两种类型:
# 同步回调
def synchronous_callback(event):
print(f"Event received: {event['method']}")
# 异步回调
async def asynchronous_callback(event):
await asyncio.sleep(0.1) # 执行一些异步操作
print(f"Event processed asynchronously: {event['method']}")
# 两者都可以用相同的方式注册
await connection.register_callback('Network.requestWillBeSent', synchronous_callback)
await connection.register_callback('Network.responseReceived', asynchronous_callback)
顺序执行模型:
异步回调由 EventsManager 顺序等待 (await)。这确保了对于单个事件,回调按照它们注册的顺序执行,防止了多个回调修改共享状态时出现竞争条件。
# 在 EventsManager.process_event() 内部
for callback_data in callbacks:
if asyncio.iscoroutinefunction(callback_data['callback']):
await callback_data['callback'](event_data) # 顺序 await
else:
callback_data['callback'](event_data) # 同步执行
非阻塞执行(用于不应阻塞其他操作的 UI 回调)是在 更高层 实现的,例如在 Tab.on() 方法中,它在注册用户回调之前将其包装在 asyncio.create_task() 中。这种架构提供了:
- 底层 (ConnectionHandler/EventsManager):保证顺序执行和可预测的顺序
- 高层 (Tab.on()):在需要时提供非阻塞语义
事件架构详情
有关多层事件系统和顺序回调执行原理的完整详细信息,请参阅 事件架构深入探讨。
连接管理
ConnectionHandler 实现了多种策略以确保连接的健壮性:
延迟连接建立
仅在需要时才建立连接,通常是在执行第一个命令时或明确请求时。这种延迟初始化方法可以节省资源,并允许更灵活的连接管理。
自动重新连接
如果 WebSocket 连接意外丢失或关闭,ConnectionHandler 将在执行下一个命令时尝试自动重新建立连接。这提供了对瞬态网络问题的弹性。
async def _ensure_active_connection(self):
"""
保证在继续之前存在活动连接。
"""
if self._ws_connection is None or self._ws_connection.closed:
await self._establish_new_connection()
资源清理
ConnectionHandler 实现了显式清理方法和 Python 的异步上下文管理器协议(__aenter__ 和 __aexit__),确保在不再需要资源时正确释放它们:
async def close(self):
"""
关闭 WebSocket 连接并清除所有回调。
"""
await self.clear_callbacks()
if self._ws_connection is not None:
try:
await self._ws_connection.close()
except websockets.ConnectionClosed as e:
logger.info(f'WebSocket connection has closed: {e}')
logger.info('WebSocket connection closed.')
上下文管理器用法
将 ConnectionHandler 用作上下文管理器是确保正确清理资源的的推荐模式:
消息处理管道
ConnectionHandler 实现了一个复杂的消息处理管道,用于处理来自 WebSocket 连接的连续消息流:
sequenceDiagram
participant WS as WebSocket
participant RCV as _receive_events
participant MSG as _process_single_message
participant PARSE as _parse_message
participant CMD as _handle_command_message
participant EVT as _handle_event_message
loop 当连接时
WS->>RCV: 消息
RCV->>MSG: 原始消息
MSG->>PARSE: 原始消息
PARSE-->>MSG: 解析后的 JSON 或 None
alt 是命令响应
MSG->>CMD: 消息
CMD->>CMD: 解析命令 future
else 是事件通知
MSG->>EVT: 消息
EVT->>EVT: 处理事件并触发回调
end
end
该管道确保了命令响应和异步事件的高效处理,使 Pydoll 即使在大量消息的情况下也能保持响应灵敏的操作。
高级用法
ConnectionHandler 通常通过 Browser 和 Page 类间接使用,但也可以直接用于高级场景:
直接事件监控
对于特殊用例,您可能希望绕过更高级别的 API,直接监控特定的 CDP 事件:
from pydoll.connection.connection import ConnectionHandler
async def monitor_network():
connection = ConnectionHandler(9222)
async def log_request(event):
url = event['params']['request']['url']
print(f"Request: {url}")
await connection.register_callback(
'Network.requestWillBeSent',
log_request
)
# 通过 CDP 命令启用网络事件
await connection.execute_command({
"id": 1,
"method": "Network.enable"
})
# 持续运行直到被中断
try:
while True:
await asyncio.sleep(1)
finally:
await connection.close()
自定义命令执行
您可以直接执行任意 CDP 命令:
async def custom_cdp_command(connection, method, params=None):
command = {
"id": random.randint(1, 10000),
"method": method,
"params": params or {}
}
return await connection.execute_command(command)
# 示例:不使用 Page 类获取文档 HTML
async def get_html(connection):
result = await custom_cdp_command(
connection,
"Runtime.evaluate",
{"expression": "document.documentElement.outerHTML"}
)
return result['result']['result']['value']
高级接口
直接使用 ConnectionHandler 需要深入了解 Chrome DevTools 协议。对于大多数用例,更高级别的 Browser 和 Page API 提供了更直观、更安全的接口。
高级并发模式
ConnectionHandler 的异步设计支持复杂的并发模式:
并行命令执行
并发执行多个命令并等待所有结果:
async def get_page_metrics(connection):
commands = [
{"id": 1, "method": "Performance.getMetrics"},
{"id": 2, "method": "Network.getResponseBody", "params": {"requestId": "..."}},
{"id": 3, "method": "DOM.getDocument"}
]
results = await asyncio.gather(
*(connection.execute_command(cmd) for cmd in commands)
)
return results
结论
ConnectionHandler 是 Pydoll 架构的基础,为 Chrome DevTools 协议提供了健壮、高效的接口。通过利用 Python 的 asyncio 框架和 WebSocket 通信,它支持高性能的浏览器自动化,并具有优雅的、事件驱动的编程模式。
理解 ConnectionHandler 的设计和操作,有助于深入了解 Pydoll 的内部工作原理,并为在特殊场景下进行高级定制和优化提供了机会。
对于大多数用例,您将通过更高级别的 Browser 和 Page API 间接与 ConnectionHandler 交互,这些 API 提供了更直观的接口,同时利用了 ConnectionHandler 的强大功能。