Python实现智能磁盘监控并自动清理垃圾文件

 更新时间:2026年05月07日 08:22:45   作者:红魔Y  
磁盘空间不足是IT运维最高频的问题之一,本文我们将为大家详细介绍如何通过 Python 实现磁盘空间监控,大文件扫描,重复文件检测和自动清理功能,希望对大家有所帮助

"C 盘满了"——这句话大概是 IT 运维中听到最多的求助了。与其每次手动清理,不如让 Python 自动搞定。

磁盘空间问题的真相

企业环境中磁盘空间不足的常见原因:

  1. 日志文件膨胀:IIS 日志、应用日志、数据库日志没人清理
  2. 临时文件堆积:Windows 临时目录、浏览器缓存、安装残留
  3. 重复文件:同一个文件被复制了 N 份到不同位置
  4. 大文件遗忘:测试时拷入的 ISO、数据库备份、录像文件
  5. 回收站和缩略图缓存:用户删了文件但没清回收站

手动排查?右键→属性→看看哪个盘满了→再一层层找……效率极低。

基础篇:磁盘空间全景扫描

import os
import shutil
from pathlib import Path
from collections import defaultdict

def get_disk_usage():
    """获取所有磁盘分区的使用情况"""
    print("=" * 60)
    print(f"{'盘符':<8}{'总容量':<12}{'已用':<12}{'可用':<12}{'使用率':<8}")
    print("=" * 60)

    results = []
    for partition in os.popen("wmic logicaldisk get caption,size,freespace").readlines()[1:]:
        parts = partition.strip().split()
        if len(parts) < 3:
            continue

        drive = parts[0]
        total = int(parts[1])  # 字节
        free = int(parts[2])
        used = total - free
        usage_percent = (used / total * 100) if total > 0 else 0

        # 格式化显示
        total_gb = total / 1024**3
        used_gb = used / 1024**3
        free_gb = free / 1024**3

        # 根据使用率选择显示标记
        if usage_percent >= 90:
            icon = "🔴"
        elif usage_percent >= 80:
            icon = "🟡"
        else:
            icon = "🟢"

        print(f"{icon} {drive:<6}{total_gb:>8.1f} GB{used_gb:>8.1f} GB"
              f"{free_gb:>8.1f} GB{usage_percent:>6.1f}%")

        results.append({
            "drive": drive,
            "total_gb": total_gb,
            "used_gb": used_gb,
            "free_gb": free_gb,
            "usage_percent": usage_percent,
        })

    return results


# 使用
# disks = get_disk_usage()

实战一:大文件扫描器

快速找出占用空间最多的文件:

def scan_large_files(
    directory,
    min_size_mb=100,
    top_n=20,
    exclude_dirs=None
):
    """
    扫描大文件
    directory: 扫描目录
    min_size_mb: 最小文件大小(MB)
    top_n: 返回前 N 个
    exclude_dirs: 排除的目录列表
    """
    if exclude_dirs is None:
        exclude_dirs = [
            r"C:\Windows", r"C:\Program Files",
            r"C:\Program Files (x86)",
        ]

    large_files = []
    min_size_bytes = min_size_mb * 1024 * 1024

    print(f"正在扫描 {directory}(最小 {min_size_mb} MB)...")

    for root, dirs, files in os.walk(directory):
        # 排除目录
        dirs[:] = [
            d for d in dirs
            if not any(
                os.path.normpath(os.path.join(root, d)).startswith(
                    os.path.normpath(excl)
                )
                for excl in exclude_dirs
            )
        ]

        for file in files:
            try:
                filepath = os.path.join(root, file)
                size = os.path.getsize(filepath)
                if size >= min_size_bytes:
                    mtime = os.path.getmtime(filepath)
                    large_files.append({
                        "path": filepath,
                        "size_mb": round(size / 1024**2, 1),
                        "size_bytes": size,
                        "modified": mtime,
                    })
            except (PermissionError, OSError):
                continue

    # 按大小排序
    large_files.sort(key=lambda x: x["size_bytes"], reverse=True)

    print(f"\n发现 {len(large_files)} 个大文件,TOP {top_n}:\n")

    for f in large_files[:top_n]:
        from datetime import datetime
        mod_time = datetime.fromtimestamp(f["modified"]).strftime("%Y-%m-%d")
        print(f"  {f['size_mb']:>8.1f} MB  {mod_time}  {f['path']}")

    return large_files


# 使用:扫描 D 盘超过 500MB 的文件
# large_files = scan_large_files(r"D:\", min_size_mb=500, top_n=30)

实战二:目录大小分析

找出哪些目录占用了最多空间:

def get_directory_sizes(
    root_dir,
    max_depth=2,
    top_n=15,
    exclude_dirs=None
):
    """
    分析目录大小
    max_depth: 递归深度
    """
    if exclude_dirs is None:
        exclude_dirs = []

    dir_sizes = defaultdict(int)

    print(f"正在分析 {root_dir} 的目录大小...")

    for root, dirs, files in os.walk(root_dir):
        # 计算当前深度
        rel_path = os.path.relpath(root, root_dir)
        depth = rel_path.count(os.sep) + (1 if rel_path != "." else 0)

        if depth > max_depth:
            dirs.clear()  # 不再深入
            continue

        # 排除目录
        dirs[:] = [
            d for d in dirs
            if not any(os.path.join(root, d).startswith(excl) for excl in exclude_dirs)
        ]

        # 统计当前目录下文件大小
        current_size = 0
        for file in files:
            try:
                current_size += os.path.getsize(os.path.join(root, file))
            except (PermissionError, OSError):
                pass

        # 累加到各级父目录
        if depth <= max_depth:
            dir_sizes[root] += current_size

    # 排序
    sorted_dirs = sorted(dir_sizes.items(), key=lambda x: x[1], reverse=True)

    print(f"\nTOP {top_n} 目录:\n")
    total = sum(s for _, s in sorted_dirs)

    for path, size in sorted_dirs[:top_n]:
        size_gb = size / 1024**3
        percent = (size / total * 100) if total > 0 else 0
        # 进度条可视化
        bar_len = 30
        filled = int(bar_len * percent / 100)
        bar = "█" * filled + "░" * (bar_len - filled)
        print(f"  {size_gb:>8.2f} GB  {bar} {percent:>5.1f}%")
        print(f"             {path}")

    return sorted_dirs


# 使用
# dirs = get_directory_sizes(r"C:\Users", max_depth=2, exclude_dirs=[r"C:\Users\Default"])

实战三:重复文件检测

清理重复文件可以释放大量空间:

import hashlib

def get_file_hash(filepath, block_size=65536):
    """计算文件的 MD5 哈希"""
    hasher = hashlib.md5()
    try:
        with open(filepath, "rb") as f:
            for block in iter(lambda: f.read(block_size), b""):
                hasher.update(block)
        return hasher.hexdigest()
    except (PermissionError, OSError):
        return None


def find_duplicate_files(directory, min_size_kb=100):
    """
    找出重复文件(基于文件大小+MD5双重校验)
    min_size_kb: 只检测大于此大小的文件
    """
    # 第一步:按文件大小分组
    print("第一步:按文件大小分组...")
    size_groups = defaultdict(list)

    for root, dirs, files in os.walk(directory):
        for file in files:
            try:
                filepath = os.path.join(root, file)
                size = os.path.getsize(filepath)
                if size >= min_size_kb * 1024:
                    size_groups[size].append(filepath)
            except (PermissionError, OSError):
                continue

    # 只保留有多个文件的组
    potential_duplicates = {
        size: files for size, files in size_groups.items()
        if len(files) >= 2
    }

    print(f"发现 {len(potential_duplicates)} 组大小相同的文件")

    # 第二步:对大小相同的文件计算 MD5
    print("第二步:计算 MD5 确认重复...")
    duplicates = {}  # hash -> [filepath, ...]

    total_to_check = sum(len(files) for files in potential_duplicates.values())
    checked = 0

    for size, files in potential_duplicates.items():
        for filepath in files:
            file_hash = get_file_hash(filepath)
            checked += 1

            if file_hash:
                if file_hash not in duplicates:
                    duplicates[file_hash] = []
                duplicates[file_hash].append(filepath)

    # 只保留真正重复的
    true_duplicates = {
        h: files for h, files in duplicates.items()
        if len(files) >= 2
    }

    # 统计可释放空间
    total_wasted = 0
    for h, files in true_duplicates.items():
        # 保留一个,其余都是浪费
        size = os.path.getsize(files[0])
        total_wasted += size * (len(files) - 1)

    print(f"\n发现 {len(true_duplicates)} 组重复文件")
    print(f"可释放空间: {total_wasted / 1024**3:.2f} GB\n")

    # 按浪费空间排序
    dup_sorted = sorted(
        true_duplicates.items(),
        key=lambda x: os.path.getsize(x[1][0]) * (len(x[1]) - 1),
        reverse=True
    )

    for h, files in dup_sorted[:10]:
        size = os.path.getsize(files[0])
        wasted = size * (len(files) - 1)
        print(f"  重复 {len(files)} 份, 各 {size/1024**2:.1f} MB, "
              f"浪费 {wasted/1024**2:.1f} MB")
        for f in files:
            print(f"    {f}")
        print()

    return true_duplicates


def cleanup_duplicates(duplicate_map, dry_run=True):
    """
    清理重复文件(保留每组中的第一个)
    dry_run: True=只报告不删除
    """
    total_freed = 0

    for h, files in duplicate_map.items():
        # 保留第一个,删除其余
        keep = files[0]
        remove = files[1:]

        for filepath in remove:
            if dry_run:
                size = os.path.getsize(filepath)
                total_freed += size
                print(f"  [模拟删除] {filepath} ({size/1024**2:.1f} MB)")
            else:
                try:
                    size = os.path.getsize(filepath)
                    os.remove(filepath)
                    total_freed += size
                    print(f"  [已删除] {filepath} ({size/1024**2:.1f} MB)")
                except Exception as e:
                    print(f"  [失败] {filepath}: {e}")

    action = "可释放" if dry_run else "已释放"
    print(f"\n{action}空间: {total_freed / 1024**3:.2f} GB")

    if dry_run:
        print("\n⚠️ 这是模拟运行,没有实际删除文件。")
        print("确认无误后,设置 dry_run=False 重新运行。")

    return total_freed


# 使用
# dups = find_duplicate_files(r"D:\Data", min_size_kb=1024)
# cleanup_duplicates(dups, dry_run=True)  # 先模拟
# cleanup_duplicates(dups, dry_run=False)  # 确认后删除

实战四:智能清理策略

不同类型的垃圾文件需要不同的清理策略:

from datetime import datetime, timedelta
import re

class DiskCleaner:
    """智能磁盘清理器"""

    # 常见可清理的目录和模式
    CLEANUP_TARGETS = {
        "Windows 临时文件": r"C:\Windows\Temp",
        "用户临时文件": os.path.join(os.environ.get("TEMP", ""), ""),
        "缩略图缓存": os.path.join(
            os.environ.get("LOCALAPPDATA", ""), "Microsoft", "Windows", "Explorer"
        ),
        "Windows 更新缓存": r"C:\Windows\SoftwareDistribution\Download",
        "Prefetch 缓存": r"C:\Windows\Prefetch",
    }

    # 可清理的文件模式
    CLEANUP_PATTERNS = {
        "日志文件": [r"\.log$"],
        "临时文件": [r"\.tmp$", r"\.temp$", r"~\$"],
        "缓存文件": [r"\.cache$", r"\.bak$"],
        "缩略图数据库": [r"thumbcache_.*\.db$"],
    }

    # 可清理的目录模式
    CLEANUP_DIR_PATTERNS = {
        "node_modules": r"node_modules$",
        "__pycache__": r"__pycache__$",
        ".pytest_cache": r"\.pytest_cache$",
        "pip cache": r"pip[\\/]cache$",
    }

    def __init__(self, dry_run=True):
        self.dry_run = dry_run
        self.report = defaultdict(lambda: {"count": 0, "size": 0})

    def clean_temp_directories(self):
        """清理系统临时目录"""
        print("\n=== 清理临时目录 ===")

        for name, path in self.CLEANUP_TARGETS.items():
            if not os.path.exists(path):
                continue

            total_size = 0
            file_count = 0

            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                try:
                    if os.path.isfile(item_path):
                        size = os.path.getsize(item_path)
                        total_size += size
                        file_count += 1
                        if not self.dry_run:
                            os.remove(item_path)
                    elif os.path.isdir(item_path):
                        size = sum(
                            os.path.getsize(os.path.join(dp, f))
                            for dp, dn, fn in os.walk(item_path)
                            for f in fn
                        )
                        total_size += size
                        file_count += 1
                        if not self.dry_run:
                            shutil.rmtree(item_path)
                except (PermissionError, OSError):
                    pass

            self.report[name]["count"] = file_count
            self.report[name]["size"] = total_size

            action = "可清理" if self.dry_run else "已清理"
            if file_count > 0:
                print(f"  {name}: {action} {file_count} 项 "
                      f"({total_size/1024/1024:.1f} MB)")

    def clean_old_logs(self, days=30, log_dirs=None):
        """清理 N 天前的旧日志"""
        print(f"\n=== 清理 {days} 天前的日志 ===")

        if log_dirs is None:
            log_dirs = [
                r"C:\inetpub\logs",
                r"C:\Windows\System32\winevt\Logs",
            ]

        cutoff = time.time() - (days * 86400)
        total_cleaned = 0

        for log_dir in log_dirs:
            if not os.path.exists(log_dir):
                continue

            for root, dirs, files in os.walk(log_dir):
                for file in files:
                    if not file.endswith(".log"):
                        continue

                    filepath = os.path.join(root, file)
                    try:
                        mtime = os.path.getmtime(filepath)
                        if mtime < cutoff:
                            size = os.path.getsize(filepath)
                            total_cleaned += size

                            if self.dry_run:
                                from datetime import datetime as dt
                                mod_date = dt.fromtimestamp(mtime).strftime("%Y-%m-%d")
                                print(f"  [模拟] {filepath} ({size/1024/1024:.1f} MB, {mod_date})")
                            else:
                                os.remove(filepath)
                    except (PermissionError, OSError):
                        pass

        action = "可释放" if self.dry_run else "已释放"
        print(f"\n日志清理: {action} {total_cleaned/1024/1024:.1f} MB")

    def clean_recycle_bin(self):
        """清空回收站"""
        import ctypes

        print("\n=== 清空回收站 ===")

        if self.dry_run:
            # 估算回收站大小
            print("  [模拟] 清空回收站")
            return

        try:
            # Windows API 清空回收站
            result = ctypes.windll.shell32.SHEmptyRecycleBinW(None, None, 7)
            if result == 0:
                print("  回收站已清空")
            else:
                print(f"  清空回收站返回: {result}")
        except Exception as e:
            print(f"  清空回收站失败: {e}")

    def run_full_cleanup(self):
        """执行完整清理"""
        mode = "模拟" if self.dry_run else "实际"
        print(f"{'=' * 60}")
        print(f"  磁盘清理 ({mode}模式)")
        print(f"{'=' * 60}")

        self.clean_temp_directories()
        self.clean_old_logs(days=30)
        self.clean_recycle_bin()

        # 汇总
        total_size = sum(item["size"] for item in self.report.values())
        print(f"\n{'=' * 60}")
        action = "总计可释放" if self.dry_run else "总计已释放"
        print(f"  {action}: {total_size / 1024 / 1024:.1f} MB")
        print(f"{'=' * 60}")

        if self.dry_run:
            print("\n确认无误后,使用 DiskCleaner(dry_run=False) 执行实际清理")

        return total_size


import time

# 使用示例
# cleaner = DiskCleaner(dry_run=True)   # 先模拟
# cleaner.run_full_cleanup()
# cleaner = DiskCleaner(dry_run=False)   # 确认后执行
# cleaner.run_full_cleanup()

实战五:磁盘空间监控与告警

import json
from datetime import datetime, timedelta
from pathlib import Path

class DiskMonitor:
    """磁盘空间持续监控器"""

    def __init__(self, data_file="disk_history.json"):
        self.data_file = Path(data_file)
        self.history = self._load_history()
        self.threshold_warning = 80  # 警告阈值 %
        self.threshold_critical = 95  # 严重阈值 %

    def _load_history(self):
        """加载历史数据"""
        if self.data_file.exists():
            with open(self.data_file, "r") as f:
                return json.load(f)
        return []

    def _save_history(self):
        """保存历史数据"""
        with open(self.data_file, "w") as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)

    def record_snapshot(self):
        """记录当前磁盘快照"""
        snapshot = {
            "timestamp": datetime.now().isoformat(),
            "drives": [],
        }

        for partition in os.popen(
            "wmic logicaldisk get caption,size,freespace"
        ).readlines()[1:]:
            parts = partition.strip().split()
            if len(parts) < 3:
                continue

            total = int(parts[1])
            free = int(parts[2])
            usage = ((total - free) / total * 100) if total > 0 else 0

            snapshot["drives"].append({
                "drive": parts[0],
                "total_gb": round(total / 1024**3, 2),
                "free_gb": round(free / 1024**3, 2),
                "usage_percent": round(usage, 1),
            })

        self.history.append(snapshot)

        # 只保留最近 90 天的数据
        cutoff = (datetime.now() - timedelta(days=90)).isoformat()
        self.history = [
            h for h in self.history
            if h["timestamp"] >= cutoff
        ]
        self._save_history()

        return snapshot

    def check_alerts(self, snapshot=None):
        """检查是否需要告警"""
        if not snapshot:
            snapshot = self.record_snapshot()

        alerts = []

        for drive in snapshot["drives"]:
            usage = drive["usage_percent"]
            free_gb = drive["free_gb"]

            if usage >= self.threshold_critical:
                alerts.append({
                    "severity": "CRITICAL",
                    "drive": drive["drive"],
                    "message": (
                        f"{drive['drive']} 磁盘空间严重不足!"
                        f"已用 {usage}%,仅剩 {free_gb:.1f} GB"
                    ),
                })
            elif usage >= self.threshold_warning:
                alerts.append({
                    "severity": "WARNING",
                    "drive": drive["drive"],
                    "message": (
                        f"{drive['drive']} 磁盘空间不足警告:"
                        f"已用 {usage}%,剩余 {free_gb:.1f} GB"
                    ),
                })

        return alerts

    def analyze_trend(self, drive_letter, days=30):
        """分析磁盘使用趋势"""
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        recent = [
            h for h in self.history
            if h["timestamp"] >= cutoff
        ]

        # 提取目标盘符的数据点
        data_points = []
        for h in recent:
            for d in h["drives"]:
                if d["drive"] == drive_letter:
                    data_points.append({
                        "time": h["timestamp"],
                        "usage": d["usage_percent"],
                        "free_gb": d["free_gb"],
                    })
                    break

        if len(data_points) < 2:
            print(f"数据不足(需要至少 2 个数据点,当前 {len(data_points)} 个)")
            return None

        # 计算趋势
        first = data_points[0]
        last = data_points[-1]

        usage_change = last["usage"] - first["usage"]
        free_change = last["free_gb"] - first["free_gb"]
        days_span = len(data_points)  # 近似天数

        daily_usage_increase = usage_change / days_span if days_span > 0 else 0
        daily_free_decrease = abs(free_change) / days_span if days_span > 0 else 0

        # 预计何时达到阈值
        if daily_usage_increase > 0:
            days_to_warning = (self.threshold_warning - last["usage"]) / daily_usage_increase
            days_to_critical = (self.threshold_critical - last["usage"]) / daily_usage_increase
        else:
            days_to_warning = float('inf')
            days_to_critical = float('inf')

        report = {
            "drive": drive_letter,
            "period": f"最近 {days} 天",
            "data_points": len(data_points),
            "current_usage": f"{last['usage']}%",
            "current_free": f"{last['free_gb']} GB",
            "usage_change": f"+{usage_change:.1f}%" if usage_change > 0 else f"{usage_change:.1f}%",
            "free_change": f"{free_change:.1f} GB",
            "daily_increase": f"+{daily_usage_increase:.2f}%" if daily_usage_increase > 0 else "稳定",
            "days_to_warning": f"{days_to_warning:.0f} 天" if days_to_warning < 365 else "无风险",
            "days_to_critical": f"{days_to_critical:.0f} 天" if days_to_critical < 365 else "无风险",
        }

        print(f"\n=== {drive_letter} 磁盘使用趋势 ({report['period']}) ===")
        print(f"  当前使用率: {report['current_usage']}, 剩余: {report['current_free']}")
        print(f"  变化: 使用率 {report['usage_change']}, 空间 {report['free_change']}")
        print(f"  日均增长: {report['daily_increase']}")
        print(f"  预计触发警告: {report['days_to_warning']}")
        print(f"  预计磁盘满: {report['days_to_critical']}")

        return report


# 使用示例
# monitor = DiskMonitor()
# monitor.record_snapshot()  # 记录快照
# alerts = monitor.check_alerts()
# monitor.analyze_trend("C:", days=30)

小结

需求方案复杂度
磁盘全景wmic + shutil入门
大文件扫描os.walk + 大小排序入门
目录大小分析os.walk + 深度控制中等
重复文件检测文件大小 + MD5 双重校验中等
自动清理按类型策略清理中等
趋势分析历史数据 + 线性回归进阶
告警通知阈值检查 + 邮件/微信推送进阶

磁盘空间管理是运维的基本功。建好监控,设好告警,自动清理,把"C 盘满了"这种低级问题消灭在萌芽状态。

以上就是Python实现智能磁盘监控并自动清理垃圾文件的详细内容,更多关于Python磁盘监控的资料请关注脚本之家其它相关文章!

相关文章

  • Python之tkinter面板PanedWindow的使用

    Python之tkinter面板PanedWindow的使用

    这篇文章主要介绍了Python之tkinter面板PanedWindow的使用及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-05-05
  • python 使用re.search()筛选后 选取部分结果的方法

    python 使用re.search()筛选后 选取部分结果的方法

    今天小编就为大家分享一篇python 使用re.search()筛选后 选取部分结果的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-11-11
  • VSCode运行或调试python文件无反应的问题解决

    VSCode运行或调试python文件无反应的问题解决

    这篇文章主要给大家介绍了关于VSCode运行或调试python文件无反应的问题解决,使用VScode编译运行C/C++没有问题,但是运行Python的时候出了问题,所以这里给大家总结下,需要的朋友可以参考下
    2023-07-07
  • python selenium循环登陆网站的实现

    python selenium循环登陆网站的实现

    这篇文章主要介绍了python selenium循环登陆网站的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-11-11
  • 利用Playwright实现文件上传与下载的完成判断全指南

    利用Playwright实现文件上传与下载的完成判断全指南

    在自动化测试或网页数据交互场景中,文件上传与下载是极为常见的操作,Playwright作为强大的自动化测试工具,不仅能模拟用户触发上传和下载行为,更能精准判断操作是否完成,本文将从原理到实践,全面讲解如何利用Playwright实现文件上传、下载的完成判断
    2025-10-10
  • python 一些常用的小脚本

    python 一些常用的小脚本

    本文主要介绍了python 一些常用的小脚本,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2007-10-10
  • Python时间转化方法超全总结

    Python时间转化方法超全总结

    在生活和工作中,我们每个人每天都在和时间打交道。本文就为大家总结了Python实现时间转化的多种方法,快来跟随小编一起学习一下吧
    2022-03-03
  • Python调试代码的高效方法分享

    Python调试代码的高效方法分享

    写代码最让人抓狂的,不是逻辑想不明白,而是代码跑起来——直接爆炸,所以本文小编给大家介绍如何快速找出 Bug,并解决它们,感兴趣的小伙伴跟着小编一起来看看吧
    2025-04-04
  • Python实现C#代码生成器应用服务于Unity示例解析

    Python实现C#代码生成器应用服务于Unity示例解析

    为了满足项目需要,需要实现一个c#代码生成器,为此设计了一个语法模板适用于Unity的代码生成器。本次使用了Python的Template模板,使用python开发
    2021-10-10
  • 学python最电脑配置有要求么

    学python最电脑配置有要求么

    在本篇内容中小编给大家整理的是关于学习python中电脑配置的相关文章,需要的朋友们可以学习下。
    2020-07-07

最新评论