Python实现自动化清理临时文件的全攻略

 更新时间:2026年03月02日 14:39:25   作者:hefeng_aspnet  
这篇文章主要为大家详细介绍了Python实现自动化清理临时文件的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

一、临时文件的智能识别系统

1. 文件指纹识别引擎

import os
import hashlib
import magic 
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Set, Optional
import mimetypes
import json

class FileFingerprint:
    """File fingerprint engine - classifies files as temporary or permanent.

    Combines an extension/path pattern table, filename heuristics and a
    size/age rule to decide whether a file looks disposable.
    """

    def __init__(self):
        # Patterns grouped by category.  Leading '.' entries match the file
        # extension, trailing '/' entries match a path component, anything
        # else matches as a substring of the file name.
        self.temp_patterns = {
            'compilation': ['.o', '.obj', '.class', '.pyc', '.pyo'],
            'cache': ['.cache', '.tmp', '.swp', '.swo', '.swn'],
            'log': ['.log', '.out', '.err', '.trace'],
            'download': ['.part', '.crdownload', '.download'],
            'backup': ['~', '.bak', '.backup', '.old'],
            'ide': ['.idea/', '.vscode/', '.vs/', 'thumbs.db'],
            'build': ['node_modules/', '__pycache__/', 'dist/', 'build/']
        }

        # libmagic-backed MIME sniffer (third-party `python-magic`).
        self.mime = magic.Magic(mime=True)
        # Extensions assumed to be source/document files.
        # NOTE(review): currently unused by the classification logic.
        self.safe_extensions = {'.py', '.js', '.java', '.cpp', '.md', '.txt'}

    def analyze_file(self, filepath: Path) -> Dict:
        """Return a fingerprint dict for *filepath*.

        The dict contains size, the three stat timestamps, the lowercase
        extension, a sampled content hash, the detected MIME type and the
        classification fields filled in by ``_classify_file``.
        """
        stats = filepath.stat()  # single stat() reused for all fields

        fingerprint = {
            'path': str(filepath),
            'size': stats.st_size,
            'created': datetime.fromtimestamp(stats.st_ctime),
            'modified': datetime.fromtimestamp(stats.st_mtime),
            'accessed': datetime.fromtimestamp(stats.st_atime),
            'extension': filepath.suffix.lower(),
            'is_temp': False,
            'category': 'unknown',
            'risk_level': 'low',
            'content_hash': self._calculate_hash(filepath),
            'mime_type': self._detect_mime(filepath)
        }

        # Overwrite the placeholder classification fields.
        fingerprint.update(self._classify_file(filepath, fingerprint))
        return fingerprint

    def _classify_file(self, filepath: Path, fp: Dict) -> Dict:
        """Classify *filepath* via the pattern table plus two heuristics."""
        result = {'category': 'other', 'is_temp': False}

        # 1) Pattern table: extension, directory component, or name substring.
        for category, patterns in self.temp_patterns.items():
            for pattern in patterns:
                if pattern.startswith('.') and fp['extension'] == pattern:
                    result.update({'category': category, 'is_temp': True})
                    return result
                elif pattern.endswith('/') and pattern in str(filepath):
                    result.update({'category': category, 'is_temp': True})
                    return result
                elif pattern in filepath.name:
                    result.update({'category': category, 'is_temp': True})
                    return result

        # 2) Temp-looking file names.
        filename = filepath.name.lower()
        name_hints = ['temp', 'tmp', 'cache', 'swap', 'dump']
        if any(hint in filename for hint in name_hints):
            result.update({'category': 'temp_pattern', 'is_temp': True})

        # 3) Large files (>100MB) untouched for 30+ days.
        if (fp['size'] > 100 * 1024 * 1024 and
            datetime.now() - fp['accessed'] > timedelta(days=30)):
            result.update({'category': 'large_inactive', 'is_temp': True})

        return result

    def _calculate_hash(self, filepath: Path) -> str:
        """Return an MD5 hex digest of the content (sampled for big files)."""
        try:
            size = filepath.stat().st_size  # stat once instead of three times
            if size > 10 * 1024 * 1024:
                # Sample head, middle and tail so huge files hash quickly.
                with open(filepath, 'rb') as f:
                    head = f.read(4096)
                    f.seek(max(0, size // 2 - 2048))
                    middle = f.read(4096)
                    f.seek(max(0, size - 4096))
                    tail = f.read(4096)
                    data = head + middle + tail
            else:
                data = filepath.read_bytes()

            # MD5 is used as a cheap content fingerprint, not for security.
            return hashlib.md5(data).hexdigest()
        except OSError:
            # Unreadable file: return a sentinel so analysis can continue.
            return 'error'

    def _detect_mime(self, filepath: Path) -> str:
        """Detect the real MIME type; fall back to extension-based guessing."""
        try:
            return self.mime.from_file(str(filepath))
        except Exception:
            # libmagic failure (or unreadable file): guess from the extension.
            return mimetypes.guess_type(str(filepath))[0] or 'unknown'

2. 目录扫描与监控系统

import psutil
import platform
from dataclasses import dataclass
from collections import defaultdict
import shutil

@dataclass
class ScanResult:
    """Aggregated outcome of scanning one directory tree."""
    total_size: int  # combined size (bytes) of every file examined, temp or not
    file_count: int  # number of files examined
    temp_files: List[Dict]  # fingerprint dicts for files classified as temporary
    by_category: Dict[str, List[Dict]]  # temp fingerprints grouped by category name
    disk_usage: Dict  # disk stats for the scanned path (total/used/free/percent/threshold)

class TempFileScanner:
    """Temp-file scanner: walks directories and fingerprints every file."""

    def __init__(self):
        self.fingerprint = FileFingerprint()

        # Platform-specific temp directories plus dev-tool caches.
        self.system_temp_dirs = self._get_system_temp_dirs()

    def _get_system_temp_dirs(self) -> List[Path]:
        """Return the OS temp directories plus existing dev-tool cache dirs."""
        system = platform.system()
        temp_dirs = []

        # OS temp directories.
        if system == 'Windows':
            temp_dirs.extend([
                Path(os.environ.get('TEMP', 'C:\\Windows\\Temp')),
                Path(os.environ.get('TMP', 'C:\\Windows\\Temp')),
                # Fixed: os.environ['USERNAME'] raised KeyError when unset.
                Path('C:\\Users\\') / os.environ.get('USERNAME', 'Default') / 'AppData' / 'Local' / 'Temp'
            ])
        elif system == 'Linux' or system == 'Darwin':
            temp_dirs.extend([
                Path('/tmp'),
                Path('/var/tmp'),
                Path.home() / '.cache',
                Path.home() / '.tmp'
            ])

        # Developer-tool caches (only those that actually exist).
        dev_tools = [
            Path.home() / '.npm',             # npm cache
            Path.home() / '.m2',              # Maven repository
            Path.home() / '.gradle',          # Gradle cache
            Path.home() / '.cache/pip',       # pip cache
            Path.home() / '.cargo/registry',  # Rust registry cache
            Path.home() / 'Library/Caches',   # macOS application caches
        ]

        temp_dirs.extend([d for d in dev_tools if d.exists()])
        return temp_dirs

    def scan_directory(self, directory: Path, recursive: bool = True) -> ScanResult:
        """Scan *directory* and return a ScanResult with temp-file findings.

        Unreadable files (PermissionError/OSError) are skipped silently so a
        single protected file does not abort the scan.
        """
        temp_files = []
        by_category = defaultdict(list)
        total_size = 0
        file_count = 0

        scan_method = directory.rglob if recursive else directory.glob

        for filepath in scan_method('*'):
            if filepath.is_file():
                try:
                    fp = self.fingerprint.analyze_file(filepath)
                    file_count += 1
                    total_size += fp['size']

                    if fp['is_temp']:
                        temp_files.append(fp)
                        by_category[fp['category']].append(fp)

                except (PermissionError, OSError):
                    continue

        # Disk usage of the volume holding the scanned directory.
        disk_usage = self._get_disk_usage(directory)

        return ScanResult(
            total_size=total_size,
            file_count=file_count,
            temp_files=temp_files,
            by_category=dict(by_category),
            disk_usage=disk_usage
        )

    def _get_disk_usage(self, path: Path) -> Dict:
        """Return disk usage stats for *path*'s volume (via psutil)."""
        usage = psutil.disk_usage(str(path))
        return {
            'total': usage.total,
            'used': usage.used,
            'free': usage.free,
            'percent': usage.percent,
            'threshold': 85  # warning threshold: 85% used
        }

    def find_largest_temp_files(self, directory: Path, top_n: int = 20) -> List[Dict]:
        """Return the *top_n* largest temp files found under *directory*."""
        result = self.scan_directory(directory)  # renamed: this is a ScanResult, not a scanner

        # Sort by size, biggest first.
        sorted_files = sorted(
            result.temp_files,
            key=lambda x: x['size'],
            reverse=True
        )

        return sorted_files[:top_n]

    def _format_size(self, size_bytes: int) -> str:
        """Render a byte count as a human-readable string.

        Added because the CLI (`run_scan`) calls ``scanner._format_size``,
        which previously raised AttributeError — the method only existed on
        SmartCleanupManager.
        """
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f}{unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f}PB"

二、智能清理策略引擎

1. 基于规则的清理策略

from abc import ABC, abstractmethod
from typing import List, Tuple
import heapq

class CleanupStrategy(ABC):
    """Abstract base class for cleanup strategies.

    Implementations receive the fingerprint dict produced by
    FileFingerprint.analyze_file and decide whether, and how urgently,
    a file should be removed.
    """
    
    @abstractmethod
    def should_clean(self, file_info: Dict) -> bool:
        """Return True when the file described by *file_info* should be removed."""
        pass
    
    @abstractmethod
    def get_priority(self, file_info: Dict) -> int:
        """Return a priority score; larger values are cleaned first."""
        pass

class AgeBasedStrategy(CleanupStrategy):
    """Age-based strategy: files older than a cutoff qualify for cleanup."""

    def __init__(self, max_age_days: int = 7):
        # Cutoff expressed as a timedelta for direct comparison.
        self.max_age = timedelta(days=max_age_days)

    def should_clean(self, file_info: Dict) -> bool:
        """A file qualifies once its modification age exceeds the cutoff."""
        return datetime.now() - file_info['modified'] > self.max_age

    def get_priority(self, file_info: Dict) -> int:
        """Priority grows with age: one point per hour since last modification."""
        elapsed = datetime.now() - file_info['modified']
        return int(elapsed.total_seconds() / 3600)

class SizeBasedStrategy(CleanupStrategy):
    """Size-based strategy: only files above a minimum size qualify."""

    def __init__(self, min_size_mb: int = 10):
        # Threshold converted from MB to bytes once, up front.
        self.min_size = min_size_mb * 1024 * 1024

    def should_clean(self, file_info: Dict) -> bool:
        """Clean only files strictly larger than the threshold."""
        return self.min_size < file_info['size']

    def get_priority(self, file_info: Dict) -> int:
        """One priority point per full 100MB of file size."""
        hundred_mb = 100 * 1024 * 1024
        return file_info['size'] // hundred_mb

class AccessBasedStrategy(CleanupStrategy):
    """Access-recency strategy: files not read recently qualify for cleanup."""

    def __init__(self, min_access_days: int = 30):
        # Minimum idle period before a file becomes cleanable.
        self.min_access = timedelta(days=min_access_days)

    def should_clean(self, file_info: Dict) -> bool:
        """Clean files whose last access is older than the idle cutoff."""
        idle = datetime.now() - file_info['accessed']
        return idle > self.min_access

    def get_priority(self, file_info: Dict) -> int:
        """Priority is simply the number of whole days since last access."""
        return (datetime.now() - file_info['accessed']).days

class CompositeStrategy(CleanupStrategy):
    """Weighted combination of the age, size and access-recency strategies."""

    def __init__(self):
        # (strategy, weight) pairs; weights sum to 1.0.
        self.strategies = [
            (AgeBasedStrategy(max_age_days=7), 0.4),        # 40% weight
            (SizeBasedStrategy(min_size_mb=50), 0.3),       # 30% weight
            (AccessBasedStrategy(min_access_days=14), 0.3)  # 30% weight
        ]

    def should_clean(self, file_info: Dict) -> bool:
        """Union semantics: a single agreeing sub-strategy is enough."""
        for strategy, _weight in self.strategies:
            if strategy.should_clean(file_info):
                return True
        return False

    def get_priority(self, file_info: Dict) -> int:
        """Sum of each agreeing sub-strategy's priority, scaled by weight x100."""
        return sum(
            int(strategy.get_priority(file_info) * weight * 100)
            for strategy, weight in self.strategies
            if strategy.should_clean(file_info)
        )

2. 智能清理管理器

class SmartCleanupManager:
    """Smart cleanup manager: scans, prioritises, backs up and deletes temp files."""

    def __init__(self, strategy: CleanupStrategy = None):
        # Fall back to the weighted composite strategy when none is given.
        self.strategy = strategy or CompositeStrategy()
        self.scanner = TempFileScanner()
        self.cleaned_files = []
        # Safety net: everything removed is first copied here.
        self.backup_dir = Path.home() / '.temp_cleanup_backup'
        # parents=True so a missing intermediate directory cannot crash init.
        self.backup_dir.mkdir(parents=True, exist_ok=True)

        # Persistent cleanup history (JSON, capped at 1000 entries on save).
        self.history_file = self.backup_dir / 'cleanup_history.json'
        self.history = self._load_history()

    def _load_history(self) -> List:
        """Load the cleanup history, tolerating a missing or corrupt file."""
        if self.history_file.exists():
            try:
                with open(self.history_file, 'r') as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError):
                # A damaged history file should not break the manager.
                return []
        return []

    def _save_history(self):
        """Persist the most recent 1000 history entries as JSON."""
        with open(self.history_file, 'w') as f:
            # default=str serialises datetimes/Paths embedded in records.
            json.dump(self.history[-1000:], f, indent=2, default=str)

    def analyze_and_clean(self, directory: Path, 
                         dry_run: bool = True,
                         max_size_to_free: int = 0) -> Dict:
        """Scan *directory*, select files via the strategy, and clean them.

        Args:
            directory: root directory to scan.
            dry_run: when True, only report what would be removed.
            max_size_to_free: if >0, stop selecting once this many bytes
                (highest-priority first) have been accumulated.

        Returns:
            The summary dict produced by ``_execute_cleanup``.
        """
        print(f"🔍 扫描目录: {directory}")

        # Scan the tree.
        scan_result = self.scanner.scan_directory(directory)

        print(f"📊 扫描结果:")
        print(f"   总文件数: {scan_result.file_count}")
        print(f"   总大小: {self._format_size(scan_result.total_size)}")
        print(f"   临时文件数: {len(scan_result.temp_files)}")

        # Apply the cleanup strategy.
        files_to_clean = []
        for file_info in scan_result.temp_files:
            if self.strategy.should_clean(file_info):
                priority = self.strategy.get_priority(file_info)
                files_to_clean.append((priority, file_info))

        # Sort by priority only: comparing the raw tuples would fall through
        # to comparing dicts and raise TypeError when two priorities tie.
        files_to_clean.sort(key=lambda item: item[0], reverse=True)

        # Optionally cap the selection by target free-space size.
        if max_size_to_free > 0:
            files_to_clean = self._select_files_to_free_space(
                files_to_clean, max_size_to_free
            )

        # Execute (or simulate) the cleanup.
        cleanup_result = self._execute_cleanup(
            [file_info for _, file_info in files_to_clean],
            dry_run
        )

        # Record this run in the history.
        self.history.append({
            'timestamp': datetime.now().isoformat(),
            'directory': str(directory),
            'dry_run': dry_run,
            'result': cleanup_result
        })
        self._save_history()

        return cleanup_result

    def _select_files_to_free_space(self, files: List[Tuple], 
                                   target_size: int) -> List[Tuple]:
        """Pick highest-priority files until *target_size* bytes are covered."""
        selected = []
        freed_size = 0

        # key= avoids TypeError from dict comparison on equal priorities.
        for priority, file_info in sorted(files, key=lambda item: item[0], reverse=True):
            if freed_size >= target_size:
                break

            selected.append((priority, file_info))
            freed_size += file_info['size']

        return selected

    def _execute_cleanup(self, files: List[Dict], dry_run: bool) -> Dict:
        """Delete (or pretend to delete) *files* and return a summary dict."""
        total_freed = 0
        cleaned_count = 0
        errors = []

        print(f"\n{'🧪 模拟运行' if dry_run else '🧹 开始清理'}:")

        for file_info in files:
            filepath = Path(file_info['path'])

            try:
                if dry_run:
                    action = "将删除"
                else:
                    # Fixed: `action` was unbound on this branch, so the print
                    # below raised NameError after every real deletion and the
                    # file was wrongly counted as an error.
                    action = "已删除"

                    # Back up first (best effort), then delete.
                    backup_path = self._backup_file(filepath)

                    if filepath.is_file():
                        filepath.unlink()
                    elif filepath.is_dir():
                        shutil.rmtree(filepath)

                size_mb = file_info['size'] / (1024 * 1024)
                total_freed += file_info['size']
                cleaned_count += 1

                status = "✅" if not dry_run else "📝"
                print(f"{status} {action} {filepath.name} ({size_mb:.1f}MB)")

            except Exception as e:
                errors.append(str(e))
                print(f"❌ 失败: {filepath.name} - {e}")

        result = {
            'total_freed': total_freed,
            'cleaned_count': cleaned_count,
            'error_count': len(errors),
            'errors': errors,
            'dry_run': dry_run
        }

        print(f"\n📈 清理总结:")
        print(f"   释放空间: {self._format_size(total_freed)}")
        print(f"   清理文件: {cleaned_count}个")
        if errors:
            print(f"   错误: {len(errors)}个")

        return result

    def _backup_file(self, filepath: Path) -> Optional[Path]:
        """Copy *filepath* into the backup dir; returns None if it vanished."""
        if not filepath.exists():
            return None

        # Timestamped, path-flattened backup name.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        relative_path = filepath.relative_to(filepath.anchor)
        safe_name = str(relative_path).replace(os.sep, '_')
        backup_path = self.backup_dir / f"{timestamp}_{safe_name}"

        try:
            if filepath.is_file():
                shutil.copy2(filepath, backup_path)
            elif filepath.is_dir():
                shutil.copytree(filepath, backup_path)
        except OSError:
            # Best effort: a failed backup must not block the cleanup itself.
            pass

        return backup_path

    def _format_size(self, size_bytes: int) -> str:
        """Render a byte count as a human-readable string (B..PB)."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.2f}{unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.2f}PB"

    def restore_from_backup(self, backup_filename: str) -> bool:
        """Restore a file from the backup directory.

        TODO: the actual restore logic is not implemented — this currently
        only verifies that the named backup exists.
        """
        backup_path = self.backup_dir / backup_filename
        if not backup_path.exists():
            return False

        # Parse the original path from the backup file name,
        # then copy it back into place...
        return True

三、自动化监控与调度系统

1. 实时监控守护进程

import time
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import schedule

class TempFileMonitor(FileSystemEventHandler):
    """Watchdog handler that tracks newly created temporary files.

    Files with a temp-like extension are remembered on creation; one hour
    later a timer checks whether they went stale and, if so, triggers a
    cleanup of their parent directory.
    """

    def __init__(self, cleanup_manager: SmartCleanupManager):
        self.manager = cleanup_manager
        self.temp_extensions = {'.tmp', '.temp', '.cache', '.log'}
        # Maps Path -> last creation/modification timestamp (time.time()).
        self.recent_creations = {}

    def on_created(self, event):
        """Record temp-extension files and schedule a staleness check."""
        if not event.is_directory:
            filepath = Path(event.src_path)
            if filepath.suffix in self.temp_extensions:
                self.recent_creations[filepath] = time.time()
                print(f"📁 检测到临时文件: {filepath.name}")

                # Re-inspect the file after one hour of (possible) inactivity.
                threading.Timer(3600, self._check_if_stale, args=[filepath]).start()

    def on_modified(self, event):
        """Refresh the timestamp so active files are not treated as stale."""
        if not event.is_directory:
            filepath = Path(event.src_path)
            if filepath in self.recent_creations:
                self.recent_creations[filepath] = time.time()

    def _check_if_stale(self, filepath: Path):
        """Timer callback: clean up files untouched for over an hour.

        Fixed: entries are now removed from ``recent_creations`` once the
        file is gone or has been handled, so the dict no longer grows
        without bound over the daemon's lifetime.
        """
        create_time = self.recent_creations.get(filepath)
        if create_time is None:
            return

        if not filepath.exists():
            # File already deleted elsewhere — drop the bookkeeping entry.
            self.recent_creations.pop(filepath, None)
            return

        if time.time() - create_time > 3600:  # untouched for 1 hour
            print(f"⏰ 文件已过期: {filepath.name}")
            # Forget the entry before cleanup so it is not re-processed.
            self.recent_creations.pop(filepath, None)
            # Clean the containing directory (manager applies its own strategy).
            self.manager.analyze_and_clean(
                filepath.parent,
                dry_run=False,
                max_size_to_free=0
            )

class AutomatedCleanupScheduler:
    """Automated cleanup scheduler: file watching plus timed cleanup jobs."""

    def __init__(self):
        self.manager = SmartCleanupManager()
        self.monitor = TempFileMonitor(self.manager)
        self.observer = Observer()

        # Directories kept under observation by default.
        if platform.system() != 'Windows':
            system_temp = Path('/tmp')
        else:
            system_temp = Path(os.environ.get('TEMP', 'C:\\Windows\\Temp'))
        self.watch_dirs = [
            Path.home() / 'Downloads',
            Path.home() / 'Desktop',
            system_temp,
        ]

    def start_monitoring(self):
        """Attach the observer to each watch dir and loop on scheduled jobs."""
        for directory in (d for d in self.watch_dirs if d.exists()):
            self.observer.schedule(
                self.monitor,
                str(directory),
                recursive=True
            )
            print(f"👀 开始监控: {directory}")

        self.observer.start()

        # Recurring jobs: nightly deep clean plus hourly disk-usage check.
        schedule.every().day.at("02:00").do(self._nightly_cleanup)
        schedule.every().hour.do(self._check_disk_usage)

        print("🚀 临时文件监控器已启动")

        try:
            while True:
                schedule.run_pending()
                time.sleep(60)
        except KeyboardInterrupt:
            self.observer.stop()
        self.observer.join()

    def _nightly_cleanup(self):
        """Scheduled job: clean each watched directory, freeing up to 1GB."""
        print("🌙 执行夜间清理...")
        one_gb = 1024 * 1024 * 1024
        for directory in self.watch_dirs:
            if not directory.exists():
                continue
            self.manager.analyze_and_clean(
                directory,
                dry_run=False,
                max_size_to_free=one_gb
            )

    def _check_disk_usage(self):
        """Scheduled job: emergency-clean any watched disk above 85% usage."""
        for directory in self.watch_dirs:
            if not directory.exists():
                continue
            usage = psutil.disk_usage(str(directory))
            if usage.percent > 85:
                print(f"⚠️  磁盘空间不足: {directory} ({usage.percent}%)")
                # Try to free 5GB when space runs critically low.
                self.manager.analyze_and_clean(
                    directory,
                    dry_run=False,
                    max_size_to_free=1024 * 1024 * 1024 * 5
                )

2. 命令行工具集成

import argparse
import sys
from rich.console import Console
from rich.table import Table
from rich.progress import Progress

# Shared Rich console instance used by every CLI command handler below.
console = Console()

def main():
    """CLI entry point: build the argument parser and dispatch to a handler."""
    parser = argparse.ArgumentParser(
        description='智能临时文件清理工具',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""

使用示例:

  %(prog)s scan ~/Downloads              # 扫描目录
  %(prog)s clean ~/Downloads --dry-run   # 模拟清理
  %(prog)s clean ~/Downloads --force     # 实际清理
  %(prog)s monitor                       # 启动监控守护进程
  %(prog)s stats                         # 显示统计信息
        """
    )

    subparsers = parser.add_subparsers(dest='command', help='命令')

    # scan: analyse a directory without touching anything.
    sp = subparsers.add_parser('scan', help='扫描临时文件')
    sp.add_argument('directory', help='要扫描的目录')
    sp.add_argument('--recursive', '-r', action='store_true', 
                    help='递归扫描')
    sp.add_argument('--top', type=int, default=20,
                    help='显示最大的N个文件')

    # clean: actually (or dry-run) remove temp files.
    cp = subparsers.add_parser('clean', help='清理临时文件')
    cp.add_argument('directory', help='要清理的目录')
    cp.add_argument('--dry-run', '-d', action='store_true',
                    help='模拟运行,不实际删除')
    cp.add_argument('--force', '-f', action='store_true',
                    help='强制清理,无需确认')
    cp.add_argument('--free-size', type=int,
                    help='要释放的空间大小(MB)')

    # monitor: long-running watcher daemon.
    subparsers.add_parser('monitor', help='启动监控守护进程')

    # stats: cleanup history report.
    st = subparsers.add_parser('stats', help='显示统计信息')
    st.add_argument('--days', type=int, default=7,
                    help='显示最近N天的统计')

    args = parser.parse_args()

    # Dispatch table instead of an if/elif chain.
    handlers = {
        'scan': run_scan,
        'clean': run_clean,
        'monitor': run_monitor,
        'stats': run_stats,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        handler(args)

def run_scan(args):
    """Handle the `scan` CLI command: scan a directory and render report tables."""

    def _fmt(num: float) -> str:
        # Local size formatter.  Fixed: the original called
        # scanner._format_size, but TempFileScanner defines no such method
        # (only SmartCleanupManager does), raising AttributeError here.
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if num < 1024.0:
                return f"{num:.2f}{unit}"
            num /= 1024.0
        return f"{num:.2f}PB"

    scanner = TempFileScanner()
    directory = Path(args.directory).expanduser()

    if not directory.exists():
        console.print(f"[red]目录不存在: {directory}[/red]")
        return

    console.print(f"[bold blue]扫描目录: {directory}[/bold blue]")

    with Progress() as progress:
        task = progress.add_task("[cyan]扫描中...", total=None)

        # Walk the tree and fingerprint every file.
        result = scanner.scan_directory(directory, args.recursive)

        progress.update(task, completed=100)

    # Per-category summary table.
    table = Table(title="临时文件分析结果")
    table.add_column("分类", style="cyan")
    table.add_column("文件数", justify="right")
    table.add_column("总大小", justify="right")
    table.add_column("占比", justify="right")

    for category, files in result.by_category.items():
        category_size = sum(f['size'] for f in files)
        # Guard against division by zero on an empty directory.
        percentage = (category_size / result.total_size * 100) if result.total_size > 0 else 0

        table.add_row(
            category,
            str(len(files)),
            _fmt(category_size),
            f"{percentage:.1f}%"
        )

    console.print(table)

    # Optional top-N largest temp files listing.
    if args.top > 0:
        largest_files = scanner.find_largest_temp_files(directory, args.top)

        if largest_files:
            console.print(f"\n[bold yellow]最大的 {args.top} 个临时文件:[/bold yellow]")
            file_table = Table()
            file_table.add_column("文件名", style="green")
            file_table.add_column("大小", justify="right")
            file_table.add_column("修改时间", justify="right")
            file_table.add_column("分类", style="cyan")

            for file_info in largest_files:
                file_table.add_row(
                    Path(file_info['path']).name,
                    _fmt(file_info['size']),
                    file_info['modified'].strftime('%Y-%m-%d %H:%M'),
                    file_info['category']
                )

            console.print(file_table)

def run_clean(args):
    """Handle the `clean` CLI command: confirm, then run the cleanup manager."""
    manager = SmartCleanupManager()
    directory = Path(args.directory).expanduser()

    if not directory.exists():
        console.print(f"[red]目录不存在: {directory}[/red]")
        return

    # Interactive confirmation unless --force or --dry-run was given.
    needs_confirmation = not (args.force or args.dry_run)
    if needs_confirmation:
        console.print(f"[bold yellow]警告: 将清理目录: {directory}[/bold yellow]")
        response = input("确定继续吗? (y/N): ")
        if response.lower() != 'y':
            console.print("[red]操作已取消[/red]")
            return

    console.print(f"[bold blue]开始清理: {directory}[/bold blue]")

    # Convert the optional --free-size (MB) to bytes; 0 means "no cap".
    free_bytes = args.free_size * 1024 * 1024 if args.free_size else 0
    result = manager.analyze_and_clean(
        directory,
        dry_run=args.dry_run,
        max_size_to_free=free_bytes
    )

    freed = manager._format_size(result['total_freed'])
    if result['dry_run']:
        console.print(f"[yellow]模拟运行完成,可释放 {freed}[/yellow]")
    else:
        console.print(f"[green]清理完成,已释放 {freed}[/green]")

def run_monitor(args):
    """Handle the `monitor` CLI command: run the watcher daemon until Ctrl+C."""
    watcher = AutomatedCleanupScheduler()
    console.print("[bold green]启动临时文件监控守护进程...[/bold green]")
    console.print("按 Ctrl+C 停止监控")
    # Blocks until interrupted.
    watcher.start_monitoring()

def run_stats(args):
    """Handle the `stats` CLI command: show cleanup history for the last N days."""
    manager = SmartCleanupManager()

    if manager.history:
        console.print("[bold blue]清理历史统计:[/bold blue]")

        table = Table()
        table.add_column("时间", style="cyan")
        table.add_column("目录")
        table.add_column("释放空间", justify="right")
        table.add_column("清理文件", justify="right")

        # --days means "the last N days", so filter by record timestamp.
        # Fixed: the original sliced the last N *records* instead.
        cutoff = datetime.now() - timedelta(days=args.days)
        recent = [
            r for r in manager.history
            if datetime.fromisoformat(r['timestamp']) >= cutoff
        ]

        for record in recent:
            table.add_row(
                record['timestamp'][:16],
                Path(record['directory']).name,
                manager._format_size(record['result']['total_freed']),
                str(record['result']['cleaned_count'])
            )

        console.print(table)

        # Totals cover the whole stored history, not only the filtered window.
        total_freed = sum(r['result']['total_freed'] for r in manager.history)
        total_files = sum(r['result']['cleaned_count'] for r in manager.history)

        console.print(f"\n[bold green]总计:[/bold green]")
        console.print(f"  释放空间: {manager._format_size(total_freed)}")
        console.print(f"  清理文件: {total_files}个")
    else:
        console.print("[yellow]暂无清理历史记录[/yellow]")

# Script entry point: dispatch to the CLI defined in main().
if __name__ == '__main__':
    main()

四、使用示例与最佳实践

1. 基本使用示例

from pathlib import Path

# Create the cleanup manager.
manager = SmartCleanupManager()

# Scan the Downloads directory.
downloads = Path.home() / 'Downloads'
# Fixed: SmartCleanupManager has no scan_directory method — the call
# belongs on its `scanner` attribute (a TempFileScanner).
result = manager.scanner.scan_directory(downloads)

print(f"找到 {len(result.temp_files)} 个临时文件")
print(f"总大小: {manager._format_size(result.total_size)}")

2. 智能清理配置

# Custom strategy — the article's intent is "files over 100MB that have not
# been accessed for 7 days".
# NOTE(review): CompositeStrategy() actually uses its built-in defaults
# (7-day age, 50MB size, 14-day access); tune its sub-strategies to match.
custom_strategy = CompositeStrategy()
manager = SmartCleanupManager(custom_strategy)

3. 自动化监控

# Create the monitoring scheduler (watches Downloads, Desktop and the
# system temp directory by default).
scheduler = AutomatedCleanupScheduler()

# Add a custom directory to the watch list (must happen before
# start_monitoring() is called).
scheduler.watch_dirs.append(Path.home() / 'Projects' / 'builds')

4. 安全备份与恢复

# Redirect backups to a custom directory before cleaning.
# NOTE(review): reassigning backup_dir after construction does not create the
# new directory (mkdir ran in __init__ on the old path) — ensure it exists.
manager.backup_dir = Path.home() / '.safe_cleanup_backups'

# List restorable backups.
# NOTE(review): _backup_file names backups "<timestamp>_<name>" with no
# ".backup" suffix, so this glob may match nothing — verify the naming.
backup_files = list(manager.backup_dir.glob('*.backup'))
for backup in backup_files[:5]:
    print(f"备份: {backup.name}")

5. 集成到开发工作流

# 在构建脚本中添加清理
def build_project():
    """Run a build with temp-file cleanup before and after."""
    # One manager instance serves both cleanup passes.
    janitor = SmartCleanupManager()

    # Pre-build: clear stale artifacts from the build directory.
    janitor.analyze_and_clean(Path('build'), dry_run=False)

    # Run the build itself.
    # ... build code

    # Post-build: tidy the distribution directory.
    janitor.analyze_and_clean(Path('dist'), dry_run=False)

五、安全注意事项

class SafeCleanupValidator:
    """Safety validator - guards against deleting important files.

    Combines a user whitelist, protected path patterns and a small-file
    content heuristic; ``is_safe_to_delete`` returns False for anything
    that looks valuable.
    """

    # Substring patterns that mark a path as protected, by category.
    SAFE_PATTERNS = {
        'git': ['.git/', '.gitignore', '.gitmodules'],
        'config': ['.env', 'config.', 'settings.', 'secret'],
        'database': ['.db', '.sqlite', '.mdb'],
        'project': ['package.json', 'requirements.txt', 'pom.xml']
    }

    def __init__(self):
        self.whitelist = self._load_whitelist()

    def _load_whitelist(self):
        """Load the user's never-delete list from ~/.cleanup_whitelist.txt."""
        whitelist_file = Path.home() / '.cleanup_whitelist.txt'
        if whitelist_file.exists():
            return set(whitelist_file.read_text().splitlines())
        return set()

    def is_safe_to_delete(self, filepath: Path) -> bool:
        """Return True only if *filepath* matches no protection rule."""
        # 1) Explicit whitelist entries are always protected.
        if str(filepath) in self.whitelist:
            return False

        # 2) Protected path patterns (substring match on the full path).
        for category, patterns in self.SAFE_PATTERNS.items():
            for pattern in patterns:
                if pattern in str(filepath):
                    return False

        # 3) Content heuristic for small files: refuse anything that looks
        #    like it holds credentials.
        try:
            if filepath.stat().st_size < 1024:
                content = filepath.read_text()[:500]
                dangerous_keywords = ['password', 'secret', 'key', 'token']
                if any(keyword in content.lower() for keyword in dangerous_keywords):
                    return False
        except (OSError, UnicodeDecodeError, ValueError):
            # Unreadable or non-text file: the heuristic simply cannot apply.
            # (Was a bare `except:`, which also swallowed real bugs.)
            pass

        return True

    def add_to_whitelist(self, filepath: Path):
        """Add *filepath* to the whitelist and persist it."""
        self.whitelist.add(str(filepath))
        self._save_whitelist()

    def _save_whitelist(self):
        """Write the whitelist back to ~/.cleanup_whitelist.txt, sorted."""
        whitelist_file = Path.home() / '.cleanup_whitelist.txt'
        whitelist_file.write_text('\n'.join(sorted(self.whitelist)))

这个完整的临时文件管理工具提供了:

  • 智能识别 - 准确识别临时文件
  • 安全清理 - 多重验证防止误删
  • 自动化监控 - 实时监控和定时清理
  • 可视化报告 - 清晰的统计信息
  • 备份恢复 - 安全网机制

以上就是Python实现自动化清理临时文件的全攻略的详细内容,更多关于Python清理文件的资料请关注脚本之家其它相关文章!

相关文章

  • 详解python的循环

    详解python的循环

    这篇文章主要为大家介绍了python的循环,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助
    2021-12-12
  • 如何在Python中编写并发程序

    如何在Python中编写并发程序

    让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。
    2016-02-02
  • ubuntu 18.04 安装opencv3.4.5的教程(图解)

    ubuntu 18.04 安装opencv3.4.5的教程(图解)

    这篇文章主要介绍了ubuntu 18.04 安装opencv3.4.5的教程,本文图文并茂给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-11-11
  • python中wx将图标显示在右下角的脚本代码

    python中wx将图标显示在右下角的脚本代码

    python中wx将图标显示在右下角的代码,此程序摘自wxdemo,不够完善,只供参考用
    2013-03-03
  • python中使用xlrd、xlwt操作excel表格详解

    python中使用xlrd、xlwt操作excel表格详解

    这篇文章主要介绍了python中使用xlrd、xlwt操作excel表格详解,python操作excel主要用到xlrd和xlwt这两个库,即xlrd是读excel,xlwt是写excel的库,需要的朋友可以参考下
    2015-01-01
  • 总结Python编程中函数的使用要点

    总结Python编程中函数的使用要点

    这篇文章主要介绍了Python编程中函数的使用要点总结,文中也讲到了人民群众喜闻乐见的lambda表达式的用法,需要的朋友可以参考下
    2016-03-03
  • python IP地址转整数

    python IP地址转整数

    这篇文章主要介绍了python 如何将IP 地址转整数,帮助大家了解转换的原理与收益,更好的理解python,感兴趣的朋友可以了解下
    2020-11-11
  • 在Mac下使用python实现简单的目录树展示方法

    在Mac下使用python实现简单的目录树展示方法

    今天小编就为大家分享一篇在Mac下使用python实现简单的目录树展示方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-11-11
  • 我们为什么要减少Python中循环的使用

    我们为什么要减少Python中循环的使用

    这篇文章主要介绍了我们为什么要减少Python中循环的使用,我将阐述 Python 提供的一些简单但是非常有用的结构,一些小技巧以及一些我在数据科学工作中遇到的案例。我将讨论 Python 中的 for 循环,以及如何尽量避免使用它们,需要的朋友可以参考下
    2019-07-07
  • python基于机器学习预测股票交易信号

    python基于机器学习预测股票交易信号

    近年来,随着技术的发展,机器学习和深度学习在金融资产量化研究上的应用越来越广泛和深入。目前,大量数据科学家在Kaggle网站上发布了使用机器学习/深度学习模型对股票、期货、比特币等金融资产做预测和分析的文章。本文就来看看如何用python预测股票交易信号
    2021-05-05

最新评论