Python实现监控系统资源的脚本工具

 更新时间:2025年12月04日 08:34:19   作者:零日失眠者  
系统资源监控是系统管理员和开发人员日常工作中的重要任务,本文将介绍一个实用的Python系统资源监控工具,感兴趣的小伙伴可以了解一下

简介

系统资源监控是系统管理员和开发人员日常工作中的重要任务。通过实时监控CPU、内存、磁盘和网络等关键资源的使用情况,我们可以及时发现系统性能瓶颈、预防系统故障并优化资源配置。本文将介绍一个实用的Python脚本——系统资源监控工具,它可以实时显示系统的各项关键指标,并支持历史数据记录和告警功能。

功能介绍

这个系统资源监控工具具有以下核心功能:

  • 实时监控:实时显示CPU、内存、磁盘和网络使用情况
  • 多维度监控:监控CPU使用率、内存占用、磁盘IO、网络流量等多个维度
  • 历史数据记录:将监控数据保存到CSV文件,便于后续分析
  • 可视化图表:生成资源使用趋势图,直观展示系统性能变化
  • 阈值告警:当资源使用超过设定阈值时发出告警
  • 定时监控:支持按指定间隔持续监控系统资源
  • 跨平台支持:支持Windows、Linux和macOS操作系统
  • 轻量级设计:占用系统资源极少,不影响被监控系统的正常运行

应用场景

这个工具适用于以下场景:

  • 服务器监控:监控生产服务器的资源使用情况
  • 性能调优:分析应用程序运行时的资源消耗
  • 故障排查:定位系统性能问题的根本原因
  • 容量规划:根据历史数据预测系统资源需求
  • 开发测试:在开发和测试阶段监控应用程序性能
  • 系统维护:定期检查系统健康状况

报错处理

脚本包含了完善的错误处理机制:

  • 权限检测:检测是否具有足够的权限访问系统信息
  • 依赖检查:检查必要的第三方库是否已安装
  • 网络异常处理:处理网络监控中的连接异常
  • 文件操作保护:安全地读写监控数据文件
  • 资源限制处理:在资源受限环境下优雅降级
  • 异常捕获:捕获并处理运行过程中可能出现的各种异常

代码实现

import psutil
import time
import csv
import argparse
import os
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import deque
import threading
import sys

class SystemResourceMonitor:
    def __init__(self, interval=1, history_size=100):
        self.interval = interval
        self.history_size = history_size
        self.monitoring = False
        self.data_history = deque(maxlen=history_size)
        
        # 初始化历史数据存储
        self.timestamps = deque(maxlen=history_size)
        self.cpu_percentages = deque(maxlen=history_size)
        self.memory_percentages = deque(maxlen=history_size)
        self.disk_usages = deque(maxlen=history_size)
        self.network_bytes_sent = deque(maxlen=history_size)
        self.network_bytes_recv = deque(maxlen=history_size)
        
        # 网络流量基准值
        self.net_io_baseline = psutil.net_io_counters()
        
    def get_cpu_info(self):
        """获取CPU信息"""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()
            cpu_freq = psutil.cpu_freq()
            
            return {
                'percent': cpu_percent,
                'count': cpu_count,
                'frequency': cpu_freq.current if cpu_freq else 0
            }
        except Exception as e:
            return {'error': str(e)}
            
    def get_memory_info(self):
        """获取内存信息"""
        try:
            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()
            
            return {
                'total': memory.total,
                'available': memory.available,
                'used': memory.used,
                'percent': memory.percent,
                'swap_total': swap.total,
                'swap_used': swap.used,
                'swap_percent': swap.percent
            }
        except Exception as e:
            return {'error': str(e)}
            
    def get_disk_info(self):
        """获取磁盘信息"""
        try:
            disk = psutil.disk_usage('/')
            disk_io = psutil.disk_io_counters()
            
            return {
                'total': disk.total,
                'used': disk.used,
                'free': disk.free,
                'percent': disk.percent,
                'read_bytes': disk_io.read_bytes if disk_io else 0,
                'write_bytes': disk_io.write_bytes if disk_io else 0
            }
        except Exception as e:
            return {'error': str(e)}
            
    def get_network_info(self):
        """获取网络信息"""
        try:
            net_io = psutil.net_io_counters()
            
            # 计算相对于基准值的流量
            bytes_sent = net_io.bytes_sent - self.net_io_baseline.bytes_sent
            bytes_recv = net_io.bytes_recv - self.net_io_baseline.bytes_recv
            
            return {
                'bytes_sent': bytes_sent,
                'bytes_recv': bytes_recv,
                'packets_sent': net_io.packets_sent,
                'packets_recv': net_io.packets_recv
            }
        except Exception as e:
            return {'error': str(e)}
            
    def format_bytes(self, bytes_value):
        """格式化字节单位"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_value < 1024.0:
                return f"{bytes_value:.2f} {unit}"
            bytes_value /= 1024.0
        return f"{bytes_value:.2f} PB"
        
    def display_system_info(self):
        """显示系统信息"""
        # 清屏(兼容不同操作系统)
        os.system('cls' if os.name == 'nt' else 'clear')
        
        print("=" * 80)
        print("系统资源监控工具")
        print("=" * 80)
        print(f"监控时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print()
        
        # CPU信息
        cpu_info = self.get_cpu_info()
        if 'error' not in cpu_info:
            print("CPU信息:")
            print(f"  使用率: {cpu_info['percent']:.1f}%")
            print(f"  核心数: {cpu_info['count']}")
            print(f"  频率: {cpu_info['frequency']:.2f} MHz")
        else:
            print(f"CPU信息获取失败: {cpu_info['error']}")
        print()
        
        # 内存信息
        memory_info = self.get_memory_info()
        if 'error' not in memory_info:
            print("内存信息:")
            print(f"  总计: {self.format_bytes(memory_info['total'])}")
            print(f"  已用: {self.format_bytes(memory_info['used'])}")
            print(f"  可用: {self.format_bytes(memory_info['available'])}")
            print(f"  使用率: {memory_info['percent']:.1f}%")
            if memory_info['swap_total'] > 0:
                print(f"  交换分区: {memory_info['swap_percent']:.1f}%")
        else:
            print(f"内存信息获取失败: {memory_info['error']}")
        print()
        
        # 磁盘信息
        disk_info = self.get_disk_info()
        if 'error' not in disk_info:
            print("磁盘信息:")
            print(f"  总计: {self.format_bytes(disk_info['total'])}")
            print(f"  已用: {self.format_bytes(disk_info['used'])}")
            print(f"  可用: {self.format_bytes(disk_info['free'])}")
            print(f"  使用率: {disk_info['percent']:.1f}%")
        else:
            print(f"磁盘信息获取失败: {disk_info['error']}")
        print()
        
        # 网络信息
        network_info = self.get_network_info()
        if 'error' not in network_info:
            print("网络信息:")
            print(f"  发送: {self.format_bytes(network_info['bytes_sent'])}")
            print(f"  接收: {self.format_bytes(network_info['bytes_recv'])}")
            print(f"  发送包: {network_info['packets_sent']:,}")
            print(f"  接收包: {network_info['packets_recv']:,}")
        else:
            print(f"网络信息获取失败: {network_info['error']}")
        print()
        
        print("=" * 80)
        print("按 Ctrl+C 停止监控")
        
    def collect_data(self):
        """收集监控数据"""
        timestamp = datetime.now()
        
        data = {
            'timestamp': timestamp,
            'cpu': self.get_cpu_info(),
            'memory': self.get_memory_info(),
            'disk': self.get_disk_info(),
            'network': self.get_network_info()
        }
        
        # 添加到历史记录
        self.data_history.append(data)
        
        # 更新图表数据
        self.timestamps.append(timestamp)
        if 'error' not in data['cpu']:
            self.cpu_percentages.append(data['cpu']['percent'])
        if 'error' not in data['memory']:
            self.memory_percentages.append(data['memory']['percent'])
        if 'error' not in data['disk']:
            self.disk_usages.append(data['disk']['percent'])
            
        return data
        
    def save_to_csv(self, filename="system_monitor.csv"):
        """保存数据到CSV文件"""
        try:
            file_exists = os.path.exists(filename)
            
            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                fieldnames = [
                    'timestamp', 'cpu_percent', 'memory_percent', 'disk_percent',
                    'network_sent', 'network_recv'
                ]
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                
                # 如果是新文件,写入表头
                if not file_exists:
                    writer.writeheader()
                    
                # 写入最新数据
                if self.data_history:
                    latest_data = self.data_history[-1]
                    row = {
                        'timestamp': latest_data['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
                        'cpu_percent': latest_data['cpu'].get('percent', 0) if 'error' not in latest_data['cpu'] else 0,
                        'memory_percent': latest_data['memory'].get('percent', 0) if 'error' not in latest_data['memory'] else 0,
                        'disk_percent': latest_data['disk'].get('percent', 0) if 'error' not in latest_data['disk'] else 0,
                        'network_sent': latest_data['network'].get('bytes_sent', 0) if 'error' not in latest_data['network'] else 0,
                        'network_recv': latest_data['network'].get('bytes_recv', 0) if 'error' not in latest_data['network'] else 0
                    }
                    writer.writerow(row)
                    
            return True
        except Exception as e:
            print(f"保存CSV文件时出错: {e}")
            return False
            
    def generate_chart(self, filename="resource_usage.png"):
        """生成资源使用趋势图"""
        try:
            if len(self.timestamps) < 2:
                print("数据不足,无法生成图表")
                return False
                
            plt.figure(figsize=(12, 8))
            
            # 创建子图
            ax1 = plt.subplot(2, 2, 1)
            if self.cpu_percentages:
                ax1.plot(self.timestamps, self.cpu_percentages, 'b-', label='CPU使用率')
                ax1.set_title('CPU使用率')
                ax1.set_ylabel('百分比 (%)')
                ax1.legend()
                ax1.grid(True)
            
            ax2 = plt.subplot(2, 2, 2)
            if self.memory_percentages:
                ax2.plot(self.timestamps, self.memory_percentages, 'g-', label='内存使用率')
                ax2.set_title('内存使用率')
                ax2.set_ylabel('百分比 (%)')
                ax2.legend()
                ax2.grid(True)
            
            ax3 = plt.subplot(2, 2, 3)
            if self.disk_usages:
                ax3.plot(self.timestamps, self.disk_usages, 'r-', label='磁盘使用率')
                ax3.set_title('磁盘使用率')
                ax3.set_ylabel('百分比 (%)')
                ax3.set_xlabel('时间')
                ax3.legend()
                ax3.grid(True)
            
            ax4 = plt.subplot(2, 2, 4)
            if self.network_bytes_sent and self.network_bytes_recv:
                ax4.plot(self.timestamps, [b/1024/1024 for b in self.network_bytes_sent], 'c-', label='发送(MB)')
                ax4.plot(self.timestamps, [b/1024/1024 for b in self.network_bytes_recv], 'm-', label='接收(MB)')
                ax4.set_title('网络流量')
                ax4.set_ylabel('流量 (MB)')
                ax4.set_xlabel('时间')
                ax4.legend()
                ax4.grid(True)
            
            # 格式化时间轴
            for ax in [ax1, ax2, ax3, ax4]:
                ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
                ax.xaxis.set_major_locator(mdates.MinuteLocator(interval=5))
                plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
            
            plt.tight_layout()
            plt.savefig(filename, dpi=300, bbox_inches='tight')
            plt.close()
            
            print(f"图表已保存到: {filename}")
            return True
        except Exception as e:
            print(f"生成图表时出错: {e}")
            return False
            
    def check_thresholds(self, thresholds):
        """检查阈值并发出告警"""
        if not self.data_history:
            return
            
        latest_data = self.data_history[-1]
        
        # 检查CPU阈值
        if 'error' not in latest_data['cpu'] and thresholds.get('cpu'):
            cpu_percent = latest_data['cpu']['percent']
            if cpu_percent > thresholds['cpu']:
                print(f"⚠️  CPU使用率过高: {cpu_percent:.1f}% (阈值: {thresholds['cpu']}%)")
                
        # 检查内存阈值
        if 'error' not in latest_data['memory'] and thresholds.get('memory'):
            memory_percent = latest_data['memory']['percent']
            if memory_percent > thresholds['memory']:
                print(f"⚠️  内存使用率过高: {memory_percent:.1f}% (阈值: {thresholds['memory']}%)")
                
        # 检查磁盘阈值
        if 'error' not in latest_data['disk'] and thresholds.get('disk'):
            disk_percent = latest_data['disk']['percent']
            if disk_percent > thresholds['disk']:
                print(f"⚠️  磁盘使用率过高: {disk_percent:.1f}% (阈值: {thresholds['disk']}%)")
                
    def start_monitoring(self, duration=None, csv_file=None, chart_file=None, thresholds=None):
        """开始监控"""
        self.monitoring = True
        start_time = time.time()
        
        try:
            while self.monitoring:
                # 收集数据
                self.collect_data()
                
                # 显示信息
                self.display_system_info()
                
                # 保存到CSV
                if csv_file:
                    self.save_to_csv(csv_file)
                    
                # 检查阈值
                if thresholds:
                    self.check_thresholds(thresholds)
                    
                # 检查是否达到指定时长
                if duration and (time.time() - start_time) >= duration:
                    break
                    
                # 等待下次监控
                time.sleep(self.interval)
                
        except KeyboardInterrupt:
            print("\n\n停止监控...")
        finally:
            self.monitoring = False
            
            # 生成图表
            if chart_file:
                self.generate_chart(chart_file)
                
            print("监控已停止")

def main():
    parser = argparse.ArgumentParser(description="系统资源监控工具")
    parser.add_argument("-i", "--interval", type=int, default=5, 
                       help="监控间隔(秒,默认:5)")
    parser.add_argument("-d", "--duration", type=int, 
                       help="监控时长(秒,不指定则持续监控)")
    parser.add_argument("-c", "--csv", help="保存数据到CSV文件")
    parser.add_argument("-g", "--graph", help="生成图表文件")
    parser.add_argument("--cpu-threshold", type=float, 
                       help="CPU使用率告警阈值(%)")
    parser.add_argument("--memory-threshold", type=float, 
                       help="内存使用率告警阈值(%)")
    parser.add_argument("--disk-threshold", type=float, 
                       help="磁盘使用率告警阈值(%)")
    
    args = parser.parse_args()
    
    # 检查依赖
    try:
        import psutil
    except ImportError:
        print("错误: 缺少psutil库,请安装: pip install psutil")
        sys.exit(1)
        
    if args.graph and not plt:
        print("警告: 缺少matplotlib库,无法生成图表")
        args.graph = None
        
    # 设置阈值
    thresholds = {}
    if args.cpu_threshold:
        thresholds['cpu'] = args.cpu_threshold
    if args.memory_threshold:
        thresholds['memory'] = args.memory_threshold
    if args.disk_threshold:
        thresholds['disk'] = args.disk_threshold
        
    try:
        monitor = SystemResourceMonitor(interval=args.interval)
        monitor.start_monitoring(
            duration=args.duration,
            csv_file=args.csv,
            chart_file=args.graph,
            thresholds=thresholds
        )
        
    except Exception as e:
        print(f"程序执行出错: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

使用方法

安装依赖

在使用此脚本之前,需要安装必要的库:

pip install psutil matplotlib

基本使用

# 基本用法,每5秒刷新一次
python system_monitor.py

# 设置监控间隔为10秒
python system_monitor.py -i 10

# 监控60秒后自动停止
python system_monitor.py -d 60

# 保存监控数据到CSV文件
python system_monitor.py -c monitor_data.csv

# 生成资源使用趋势图
python system_monitor.py -g resource_trend.png

# 设置告警阈值
python system_monitor.py --cpu-threshold 80 --memory-threshold 85 --disk-threshold 90

命令行参数说明

  • -i, --interval: 监控间隔(秒),默认5秒
  • -d, --duration: 监控时长(秒),不指定则持续监控
  • -c, --csv: 保存数据到指定的CSV文件
  • -g, --graph: 生成资源使用趋势图到指定文件
  • --cpu-threshold: CPU使用率告警阈值(%)
  • --memory-threshold: 内存使用率告警阈值(%)
  • --disk-threshold: 磁盘使用率告警阈值(%)

使用示例

持续监控系统资源

python system_monitor.py

监控并保存数据

python system_monitor.py -i 10 -d 300 -c system_data.csv -g trend.png

带告警的监控

python system_monitor.py --cpu-threshold 80 --memory-threshold 85

总结

这个系统资源监控工具提供了一个全面的系统监控解决方案,能够实时显示CPU、内存、磁盘和网络等关键资源的使用情况。它支持数据持久化、可视化图表生成和阈值告警等功能,适用于服务器监控、性能调优和故障排查等多种场景。通过这个工具,用户可以更好地了解系统运行状态,及时发现和解决性能问题,确保系统的稳定运行。

以上就是Python实现监控系统资源的脚本工具的详细内容,更多关于Python监控系统资源的资料请关注脚本之家其它相关文章!

相关文章

  • 如何基于Python批量下载音乐

    如何基于Python批量下载音乐

    这篇文章主要介绍了如何基于Python批量下载音乐,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • 如何使用Python对日期和时间进行排序

    如何使用Python对日期和时间进行排序

    本文将教我们如何使用Python对日期和时间进行排序,我们还将学习datetime模块和sorted方法,本文结合示例代码给大家介绍的非常详细,需要的朋友参考下吧
    2023-06-06
  • python pygame模块编写飞机大战

    python pygame模块编写飞机大战

    这篇文章主要为大家详细介绍了python pygame模块编写飞机大战,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-11-11
  • Python实现视频自动打码的示例代码

    Python实现视频自动打码的示例代码

    我们在观看视频的时候,有时候会出现一些奇怪的马赛克,影响我们的观影体验,那么这些马赛克是如何精确的加上去的呢?本文就来为大家详细讲讲
    2022-04-04
  • Python将运行结果导出为CSV格式的两种常用方法

    Python将运行结果导出为CSV格式的两种常用方法

    这篇文章主要给大家介绍了关于Python将运行结果导出为CSV格式的两种常用方法,Python生成(导出)csv文件其实很简单,我们一般可以用csv模块或者pandas库来实现,需要的朋友可以参考下
    2023-07-07
  • Python 创建格式化字符串方法

    Python 创建格式化字符串方法

    这篇文章主要介绍了Python 创建格式化字符串方法,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-08-08
  • 使用Termux在手机上运行Python的详细过程

    使用Termux在手机上运行Python的详细过程

    这篇文章主要介绍了使用Termux在手机上运行Python的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-10-10
  • Python编写春联的示例代码(支持行书隶书楷书)

    Python编写春联的示例代码(支持行书隶书楷书)

    这篇文章主要介绍了如何通过Python代码编写春联,其中春联字体支持行书隶书楷书。文中的示例代码讲解详细,感兴趣的小伙伴可以动手试一试
    2022-01-01
  • Python代码阅读--列表元素逻辑判断

    Python代码阅读--列表元素逻辑判断

    本篇阅读的三份代码的功能分别是判断列表中的元素是否都符合给定的条件;判断列表中是否存在符合给定的条件的元素;以及判断列表中的元素是否都不符合给定的条件,下面小编将在文章里详细介绍,需要的朋友可以参考下
    2021-09-09
  • python:socket传输大文件示例

    python:socket传输大文件示例

    本篇文章主要介绍了python:socket传输大文件示例,具有一定的参考价值,有兴趣的可以了解一下,
    2017-01-01

最新评论