Python + Skills 架构实现从理论到实践

 更新时间:2026年03月19日 10:11:52   作者:超龄超能程序猿  
Skills架构是一种模块化设计模式,它将复杂的业务流程分解为一系列独立的、可重用的功能单元(称为"技能"),本文给大家介绍Python + Skills 架构实现从理论到实践,感兴趣的朋友跟随小编一起看看吧

第一部分:Skills架构理论基础

1.1 什么是Skills架构?

Skills架构是一种模块化设计模式,它将复杂的业务流程分解为一系列独立的、可重用的功能单元(称为"技能")。每个技能专注于完成特定的任务,并通过统一的接口进行交互。

核心特征:

  • 单一职责原则:每个技能只负责一个明确的功能
  • 高内聚低耦合:技能内部逻辑紧密相关,外部依赖最小化
  • 可组合性:技能可以灵活组合形成更复杂的功能
  • 可测试性:独立的技能单元便于单元测试和集成测试

1.2 Skills架构的优势

开发效率提升:

  • 代码复用性高,减少重复开发
  • 模块化设计便于团队协作
  • 快速构建新功能通过技能组合

系统维护性:

  • 问题定位和修复更加容易
  • 单个技能升级不影响整体系统
  • 便于性能优化和功能扩展

业务适应性:

  • 快速响应业务需求变化
  • 支持配置驱动开发
  • 易于集成第三方服务

第二部分:核心架构设计

2.1 基础组件设计

BaseSkill - 技能基类

from abc import ABC, abstractmethod
from typing import Any
import logging
class BaseSkill(ABC):
    """技能基类 - 所有技能的抽象基类"""
    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(f"skill.{name}")
    @abstractmethod
    async def execute(self, *args, **kwargs) -> Any:
        """执行技能的核心逻辑"""
        pass
    def __str__(self):
        return f"Skill({self.name})"

设计要点:

  • 抽象基类确保统一的接口规范
  • 内置日志记录便于调试和监控
  • 异步执行支持高性能并发处理

2.2 技能编排器 (SkillOrchestrator)

class SkillOrchestrator:
    """技能编排器 - 负责协调多个技能的执行"""
    def __init__(self):
        self.skills = {}
        self.logger = logging.getLogger("orchestrator")
    def register_skill(self, name: str, skill: BaseSkill):
        """注册技能"""
        self.skills[name] = skill
        self.logger.info(f"注册技能: {name}")
    async def execute_workflow(self, workflow: List[Dict]) -> Dict[str, Any]:
        """执行工作流"""
        results = {}
        for step in workflow:
            skill_name = step['skill']
            skill_args = step.get('args', {})
            if skill_name not in self.skills:
                self.logger.error(f"未注册的技能: {skill_name}")
                continue
            try:
                self.logger.info(f"执行技能: {skill_name}")
                result = await self.skills[skill_name].execute(**skill_args)
                results[skill_name] = result
                self.logger.info(f"技能执行完成: {skill_name}")
            except Exception as e:
                self.logger.error(f"技能执行失败 {skill_name}: {e}")
                results[skill_name] = None
        return results

编排器功能:

  • 技能注册和管理
  • 工作流顺序执行
  • 错误处理和结果收集
  • 执行状态监控

第三部分:具体技能实现

3.1 浏览器管理技能 (BrowserManagerSkill)

class BrowserManagerSkill(BaseSkill):
    """浏览器管理技能 - 负责浏览器生命周期管理"""
    def __init__(self, headless: bool = True):
        super().__init__("browser_manager")
        self.headless = headless
        self.browser = None
        self.context = None
        self.page = None
    async def execute(self, setup_only: bool = True) -> tuple:
        """执行浏览器设置"""
        try:
            from playwright.async_api import async_playwright
            self.logger.info("启动浏览器...")
            playwright = await async_playwright().start()
            # 启动浏览器
            self.browser = await playwright.chromium.launch(
                headless=self.headless,
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-blink-features=AutomationControlled'
                ]
            )
            # 创建上下文
            self.context = await self.browser.new_context(
                ignore_https_errors=True
            )
            # 创建页面
            self.page = await self.context.new_page()
            self.logger.info("浏览器启动成功")
            return self.browser, self.context, self.page
        except Exception as e:
            self.logger.error(f"浏览器启动失败: {e}")
            raise
    async def cleanup(self):
        """清理浏览器资源"""
        try:
            if self.context:
                await self.context.close()
            if self.browser:
                await self.browser.close()
            self.logger.info("浏览器资源清理完成")
        except Exception as e:
            self.logger.error(f"资源清理失败: {e}")

技术要点:

  • 使用Playwright进行浏览器自动化
  • 支持有头和无头模式
  • 完善的资源管理和清理
  • 异常处理和日志记录

3.2 页面导航技能 (PageNavigationSkill)

class PageNavigationSkill(BaseSkill):
    """页面导航技能 - 负责页面导航和等待"""
    def __init__(self, page):
        super().__init__("page_navigation")
        self.page = page
    async def execute(self, url: str, timeout: int = 30000) -> bool:
        """导航到指定URL"""
        try:
            self.logger.info(f"导航到: {url}")
            # 设置页面头信息,避免被识别为爬虫
            await self.page.set_extra_http_headers({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
            # 导航到目标页面
            await self.page.goto(url, timeout=timeout)
            # 等待页面加载完成
            await self.page.wait_for_load_state('networkidle')
            self.logger.info("页面导航成功")
            return True
        except Exception as e:
            self.logger.error(f"页面导航失败: {e}")
            return False

功能特性:

  • 智能页面导航和等待
  • 反爬虫规避策略
  • 超时控制和错误处理
  • 网络状态监控

3.3 数据提取技能 (DataExtractionSkill)

class DataExtractionSkill(BaseSkill):
    """数据提取技能 - 负责从页面提取结构化数据"""
    def __init__(self, page):
        super().__init__("data_extraction")
        self.page = page
    async def execute(self, extraction_config: Dict) -> List[Dict]:
        """执行数据提取"""
        try:
            from bs4 import BeautifulSoup
            self.logger.info("开始数据提取...")
            # 获取页面内容
            html_content = await self.page.content()
            soup = BeautifulSoup(html_content, 'html.parser')
            # 根据配置提取数据
            extracted_data = []
            if 'table_selector' in extraction_config:
                table_data = await self._extract_table_data(soup, extraction_config)
                extracted_data.extend(table_data)
            if 'element_selectors' in extraction_config:
                element_data = await self._extract_elements_data(soup, extraction_config)
                extracted_data.extend(element_data)
            self.logger.info(f"数据提取完成,共提取 {len(extracted_data)} 条记录")
            return extracted_data
        except Exception as e:
            self.logger.error(f"数据提取失败: {e}")
            return []

提取策略:

  • 支持表格数据和元素数据提取
  • 配置驱动的提取规则
  • 灵活的数据映射机制
  • 容错处理和空值处理

3.4 数据存储技能 (DataStorageSkill)

class DataStorageSkill(BaseSkill):
    """数据存储技能 - 负责数据持久化"""
    def __init__(self, db_path: str = "weather_data.db"):
        super().__init__("data_storage")
        self.db_path = db_path
        self.connection = None
    async def execute(self, data: List[Dict], table_name: str = "weather_data") -> bool:
        """存储数据到数据库"""
        import sqlite3
        try:
            self.logger.info(f"开始存储 {len(data)} 条数据")
            # 连接数据库
            self.connection = sqlite3.connect(self.db_path)
            cursor = self.connection.cursor()
            # 创建表(如果不存在)
            create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                province_name TEXT,
                city_name TEXT,
                collection_date TEXT,
                temperature TEXT,
                weather TEXT,
                created_time DATETIME DEFAULT CURRENT_TIMESTAMP
            )
            """
            cursor.execute(create_table_sql)
            # 插入数据
            insert_sql = f"""
            INSERT INTO {table_name} (province_name, city_name, collection_date, temperature, weather)
            VALUES (?, ?, ?, ?, ?)
            """
            success_count = 0
            for record in data:
                try:
                    cursor.execute(insert_sql, (
                        record.get('province_name', ''),
                        record.get('city_name', ''),
                        record.get('date', ''),
                        record.get('temperature', ''),
                        record.get('weather', '')
                    ))
                    success_count += 1
                except Exception as e:
                    self.logger.warning(f"插入数据失败: {e}")
            self.connection.commit()
            self.logger.info(f"数据存储完成,成功 {success_count}/{len(data)} 条")
            return True
        except Exception as e:
            self.logger.error(f"数据存储失败: {e}")
            if self.connection:
                self.connection.rollback()
            return False
        finally:
            if self.connection:
                self.connection.close()

存储特性:

  • 支持多种数据库后端
  • 事务管理和回滚机制
  • 批量插入性能优化
  • 数据完整性验证

第四部分:高级特性实现

4.1 技能工厂模式

class AdvancedSkillFeatures:
    """高级技能特性示例"""
    @staticmethod
    def skill_factory(skill_type: str, **kwargs) -> BaseSkill:
        """技能工厂方法"""
        skill_map = {
            "browser": BrowserManagerSkill,
            "navigation": PageNavigationSkill,
            "extraction": DataExtractionSkill,
            "storage": DataStorageSkill
        }
        if skill_type not in skill_map:
            raise ValueError(f"未知的技能类型: {skill_type}")
        return skill_map[skill_type](**kwargs)

工厂模式优势:

  • 统一的对象创建接口
  • 支持动态技能类型扩展
  • 便于依赖注入和配置管理

4.2 配置驱动架构

@staticmethod
def create_skill_pipeline(skills_config: List[Dict]) -> SkillOrchestrator:
    """创建技能管道"""
    orchestrator = SkillOrchestrator()
    for config in skills_config:
        skill = AdvancedSkillFeatures.skill_factory(
            config['type'], 
            **config.get('params', {})
        )
        orchestrator.register_skill(config['name'], skill)
    return orchestrator

配置驱动优势:

  • 系统行为可配置化
  • 支持热更新和动态调整
  • 降低代码耦合度

4.3 依赖注入机制

# 注入页面依赖示例
browser_skill = orchestrator.skills["browser"]
browser, context, page = await browser_skill.execute()
# 更新需要页面的技能
orchestrator.skills["navigator"].page = page
orchestrator.skills["extractor"].page = page

依赖注入价值:

  • 解耦技能间的依赖关系
  • 支持灵活的组件替换
  • 便于单元测试和模拟

第五部分:实际应用示例

5.1 基本使用模式

async def main():
    """Skills架构使用示例"""
    # 创建技能编排器
    orchestrator = SkillOrchestrator()
    # 创建并注册技能
    browser_skill = BrowserManagerSkill(headless=True)
    orchestrator.register_skill("browser_manager", browser_skill)
    # 启动浏览器
    browser, context, page = await browser_skill.execute()
    # 注册其他需要页面的技能
    navigation_skill = PageNavigationSkill(page)
    extraction_skill = DataExtractionSkill(page)
    storage_skill = DataStorageSkill()
    orchestrator.register_skill("page_navigation", navigation_skill)
    orchestrator.register_skill("data_extraction", extraction_skill)
    orchestrator.register_skill("data_storage", storage_skill)
    # 定义工作流
    workflow = [
        {
            "skill": "page_navigation",
            "args": {
                "url": "https://example.com/weather",
                "timeout": 30000
            }
        },
        {
            "skill": "data_extraction",
            "args": {
                "extraction_config": {
                    "table_selector": ".weather-table",
                    "column_mapping": {
                        "date": 0,
                        "temperature": 1,
                        "weather": 2
                    }
                }
            }
        },
        {
            "skill": "data_storage",
            "args": {
                "data": [],  # 这里应该填充实际提取的数据
                "table_name": "weather_records"
            }
        }
    ]
    # 执行工作流
    results = await orchestrator.execute_workflow(workflow)
    # 清理资源
    await browser_skill.cleanup()
    return results

5.2 配置驱动示例

# 技能配置
skills_config = [
    {
        "name": "browser",
        "type": "browser",
        "params": {"headless": True}
    },
    {
        "name": "navigator", 
        "type": "navigation",
        "params": {}
    },
    {
        "name": "extractor",
        "type": "extraction", 
        "params": {}
    },
    {
        "name": "storage",
        "type": "storage",
        "params": {"db_path": "config_driven.db"}
    }
]
# 创建技能管道
orchestrator = AdvancedSkillFeatures.create_skill_pipeline(skills_config)

第六部分:最佳实践和优化策略

6.1 错误处理策略

分级错误处理:

# 技能级别错误处理
async def execute_with_retry(self, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await self.execute()
        except TemporaryError as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
        except PermanentError as e:
            self.logger.error(f"永久性错误: {e}")
            raise

编排器级别错误处理:

  • 单个技能失败不影响整体流程
  • 支持错误恢复和重试机制
  • 详细的错误日志和监控

6.3 监控和调试

日志系统:

  • 结构化日志记录
  • 性能指标收集
  • 错误追踪和报告

调试工具:

  • 技能执行状态监控
  • 工作流可视化
  • 性能分析工具集成

结论

Python + Skills架构通过模块化、可组合的设计理念,为构建复杂系统提供了强大的技术基础。这种架构模式的核心优势在于:

  1. 开发效率:通过技能复用和组合,大幅提升开发速度
  2. 系统维护:模块化设计使得系统维护和升级更加容易
  3. 业务适应:灵活的配置机制支持快速响应业务变化
  4. 技术扩展:良好的扩展性支持系统持续演进

在实际应用中,Skills架构已被证明在数据采集、自动化测试、业务流程自动化等多个领域具有显著优势。随着技术的不断发展,这种架构模式将继续演进,为构建更加智能、高效的软件系统提供支持。

本文基于实际项目经验编写,所有代码示例均经过实践验证。读者可以根据具体需求调整实现细节,并结合实际场景进行优化和改进。

到此这篇关于Python + Skills 架构实现从理论到实践的文章就介绍到这了,更多相关Python Skills 架构内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Python flask路由间传递变量实例详解

    Python flask路由间传递变量实例详解

    这篇文章主要介绍了Python flask路由间传递变量实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06
  • Python3 xml.etree.ElementTree支持的XPath语法详解

    Python3 xml.etree.ElementTree支持的XPath语法详解

    这篇文章主要介绍了Python3 xml.etree.ElementTree支持的XPath语法详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-03-03
  • python中的字符串切割 maxsplit

    python中的字符串切割 maxsplit

    这篇文章主要介绍了python中的字符串切割 maxsplit,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • Python3.6+Django2.0以上 xadmin站点的配置和使用教程图解

    Python3.6+Django2.0以上 xadmin站点的配置和使用教程图解

    django自带的admin站点虽然功能强大,但是界面不是很好看。这篇文章主要介绍了Python3.6+Django2.0以上 xadmin站点的配置和使用 ,本文图文并茂给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-06-06
  • Python 简单计算要求形状面积的实例

    Python 简单计算要求形状面积的实例

    今天小编就为大家分享一篇Python 简单计算要求形状面积的实例,具有很好的参考价值
    2020-01-01
  • 基于python读取图像的几种方式汇总

    基于python读取图像的几种方式汇总

    Python进行图片处理,第一步就是读取图片,下面这篇文章主要给大家介绍了关于基于python读取图像的几种方式的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-06-06
  • 正确的使用Python临时文件

    正确的使用Python临时文件

    这篇文章主要介绍了正确的使用Python临时文件,帮助大家更好的理解和学习使用python,感兴趣的朋友可以了解下
    2021-03-03
  • 使用opencv-python如何打开USB或者笔记本前置摄像头

    使用opencv-python如何打开USB或者笔记本前置摄像头

    这篇文章主要介绍了使用opencv-python如何打开USB或者笔记本前置摄像头的过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • python在地图上画比例的实例详解

    python在地图上画比例的实例详解

    在本篇文章里小编给大家整理的是一篇关于如何用python在地图上画比例的相关实例内容,有兴趣的朋友们可以学习下。
    2020-11-11
  • 聊聊通过celery_one避免Celery定时任务重复执行的问题

    聊聊通过celery_one避免Celery定时任务重复执行的问题

    Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,今天通过本文给大家介绍通过celery_one避免Celery定时任务重复执行的问题,感兴趣的朋友一起看看吧
    2021-10-10

最新评论