Python文件批量处理(重命名/备份/同步运维)的实战指南
老王在一家小公司管服务器。每天最烦的事,就是开发同事丢来一堆日志文件,文件名乱七八糟——有的叫log1.txt,有的叫1212.log,还有的直接叫新建文本文档(1).log。更糟的是,每周五要手动备份一遍配置文件,还得确保备份目录和源目录同步。老王决定写几个Python脚本把这些破事自动化。
场景一:批量重命名——让文件名规规矩矩
先拿日志文件开刀。老王想把所有.log文件改成2024-12-01_001.log这种格式,按修改时间排序。
import os
from datetime import datetime
def rename_logs(folder_path):
log_files = [f for f in os.listdir(folder_path) if f.endswith('.log')]
for index, filename in enumerate(log_files, 1):
old_path = os.path.join(folder_path, filename)
# 获取修改时间
mtime = os.path.getmtime(old_path)
date_str = datetime.fromtimestamp(mtime).strftime('%Y-%m-%d')
new_name = f"{date_str}_{index:03d}.log"
new_path = os.path.join(folder_path, new_name)
os.rename(old_path, new_path)
print(f"{filename} -> {new_name}")
rename_logs('/var/logs')
运行一次,新建文本文档(1).log变成了2024-12-15_005.log,一目了然。
但os.rename有个坑:目标文件已存在时会报错。加上覆盖检查更稳妥:
def safe_rename(old_path, new_path):
if os.path.exists(new_path):
base, ext = os.path.splitext(new_path)
counter = 1
while os.path.exists(f"{base}_{counter}{ext}"):
counter += 1
new_path = f"{base}_{counter}{ext}"
os.rename(old_path, new_path)
return new_path
更灵活的模式匹配:用glob模块按规则筛选文件,比如只改report_*.xlsx:
import glob
for filepath in glob.glob('/data/report_*.xlsx'):
dirname, filename = os.path.split(filepath)
# 提取月份,比如 report_202412.xlsx -> 202412
month = filename.split('_')[1].split('.')[0]
new_name = f"财务报告_{month}.xlsx"
os.rename(filepath, os.path.join(dirname, new_name))
正则表达式批量替换:某些文件命名有规律但混乱,比如IMG_001 (1).jpg、IMG_001 (2).jpg这种括号空格让人头疼。
import re
def clean_filenames(folder, pattern, replacement):
for filename in os.listdir(folder):
new_name = re.sub(pattern, replacement, filename)
if new_name != filename:
old_path = os.path.join(folder, filename)
new_path = os.path.join(folder, new_name)
os.rename(old_path, new_path)
print(f"修正: {filename} -> {new_name}")
# 去掉文件名中的空格和括号
clean_filenames('./photos', r'[(\s)]', '')
# IMG_001 (1).jpg -> IMG_0011.jpg (虽然不太完美,但至少没空格了)
场景二:智能备份——别每次都全量复制
老王之前用cp -r全量备份,几十G的配置文件越来越慢。他需要增量备份——只复制改动过的文件。
import shutil
import hashlib
def md5_file(filepath):
"""计算文件MD5,用于判断内容是否变化"""
hash_md5 = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def incremental_backup(src_dir, dst_dir, state_file='backup_state.json'):
import json
# 加载上次备份的状态
if os.path.exists(state_file):
with open(state_file, 'r') as f:
last_state = json.load(f)
else:
last_state = {}
new_state = {}
copied_count = 0
for root, dirs, files in os.walk(src_dir):
rel_path = os.path.relpath(root, src_dir)
dst_root = os.path.join(dst_dir, rel_path)
os.makedirs(dst_root, exist_ok=True)
for file in files:
src_file = os.path.join(root, file)
dst_file = os.path.join(dst_root, file)
file_md5 = md5_file(src_file)
file_key = os.path.relpath(src_file, src_dir)
new_state[file_key] = file_md5
# 检查是否需要备份
if file_key not in last_state or last_state[file_key] != file_md5:
shutil.copy2(src_file, dst_file) # copy2保留元数据
copied_count += 1
print(f"备份: {file_key}")
# 保存新状态
with open(state_file, 'w') as f:
json.dump(new_state, f, indent=2)
print(f"完成,共备份 {copied_count} 个文件")
incremental_backup('/etc/nginx', '/backup/nginx')
第一次运行会全量备份,第二次只复制改动过的文件。状态文件backup_state.json记录了每个文件的MD5,拿掉它就能强制全量。
定时自动备份:结合schedule库设置每天凌晨2点执行。
import schedule
import time
def auto_backup_job():
print(f"{datetime.now()} 开始自动备份")
incremental_backup('/data/mysql', '/backup/mysql')
print("备份完成")
schedule.every().day.at("02:00").do(auto_backup_job)
while True:
schedule.run_pending()
time.sleep(60)
场景三:目录同步——像rsync一样工作
老王还负责维护两台服务器之间的文件同步。rsync很好用,但某些环境不允许装额外软件,自己写一个轻量版。
import filecmp
import shutil
def sync_dirs(src, dst, delete_extra=False):
"""
同步目录,dst会变成和src一模一样
delete_extra=True时删除dst中多余的文件
"""
if not os.path.exists(dst):
os.makedirs(dst)
# 比较两个目录
comparison = filecmp.dircmp(src, dst)
# 复制src有但dst没有的
for file in comparison.left_only:
src_path = os.path.join(src, file)
dst_path = os.path.join(dst, file)
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path)
else:
shutil.copy2(src_path, dst_path)
print(f"新增: {file}")
# 复制有更新的
for file in comparison.diff_files:
src_path = os.path.join(src, file)
dst_path = os.path.join(dst, file)
shutil.copy2(src_path, dst_path)
print(f"更新: {file}")
# 删除dst多余的
if delete_extra:
for file in comparison.right_only:
dst_path = os.path.join(dst, file)
if os.path.isdir(dst_path):
shutil.rmtree(dst_path)
else:
os.remove(dst_path)
print(f"删除: {file}")
# 递归处理子目录
for common_dir in comparison.common_dirs:
sync_dirs(
os.path.join(src, common_dir),
os.path.join(dst, common_dir),
delete_extra
)
sync_dirs('/home/user/project', '/backup/project', delete_extra=True)
filecmp.dircmp一次性告诉你三类差异:left_only(源独有)、right_only(目标独有)、diff_files(内容不同)。递归调用就能同步整个目录树。
实时同步:监听文件系统事件,文件一改动立刻同步。
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class SyncHandler(FileSystemEventHandler):
def __init__(self, src, dst):
self.src = src
self.dst = dst
def on_modified(self, event):
if not event.is_directory:
rel_path = os.path.relpath(event.src_path, self.src)
dst_path = os.path.join(self.dst, rel_path)
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
shutil.copy2(event.src_path, dst_path)
print(f"实时同步: {rel_path}")
observer = Observer()
handler = SyncHandler('/watch/src', '/watch/dst')
observer.schedule(handler, '/watch/src', recursive=True)
observer.start()
运行后,在/watch/src里新建或修改任何文件,都会瞬间复制到/watch/dst。适合配置文件热同步场景。
场景四:批量清理旧文件
日志文件占满磁盘是家常便饭。写个脚本自动删除30天前的文件。
import time
def clean_old_files(folder, days=30, pattern='*'):
now = time.time()
cutoff = now - (days * 86400)
for root, dirs, files in os.walk(folder):
for file in files:
if not glob.fnmatch.fnmatch(file, pattern):
continue
filepath = os.path.join(root, file)
mtime = os.path.getmtime(filepath)
if mtime < cutoff:
os.remove(filepath)
print(f"删除旧文件: {filepath} ({datetime.fromtimestamp(mtime).date()})")
# 删除空目录
for dir in dirs:
dirpath = os.path.join(root, dir)
try:
os.rmdir(dirpath)
print(f"删除空目录: {dirpath}")
except OSError:
pass
clean_old_files('/var/log/nginx', days=7, pattern='*.log*')
更安全的干跑模式:先预览不执行,确认无误再真正删除。
def dry_run_clean(folder, days=30):
now = time.time()
cutoff = now - (days * 86400)
to_delete = []
for root, dirs, files in os.walk(folder):
for file in files:
filepath = os.path.join(root, file)
if os.path.getmtime(filepath) < cutoff:
to_delete.append(filepath)
print(f"将删除 {len(to_delete)} 个文件:")
for path in to_delete[:10]: # 只显示前10个
print(f" {path}")
if input("确认删除?(y/n): ").lower() == 'y':
for path in to_delete:
os.remove(path)
print("删除完成")
踩坑与进阶技巧
文件名编码问题:Windows和Linux的文件名编码不同。跨平台时用os.fsencode和os.fsdecode处理。
def safe_listdir(path):
try:
return os.listdir(path)
except UnicodeEncodeError:
return [os.fsdecode(f) for f in os.listdir(os.fsencode(path))]
大量文件性能优化:os.listdir在几十万文件的目录里很慢,用scandir代替。
with os.scandir('/big_folder') as entries:
for entry in entries:
if entry.is_file() and entry.name.endswith('.log'):
print(entry.name, entry.stat().st_size)
scandir一次性返回文件属性,避免额外调用stat,速度提升2-5倍。
移动而不是复制:备份到同一磁盘时,移动文件比复制快得多。
def move_to_archive(src, dst):
os.makedirs(dst, exist_ok=True)
for filename in os.listdir(src):
shutil.move(os.path.join(src, filename), os.path.join(dst, filename))
日志记录:所有操作记下来,出问题能追溯。
import logging
logging.basicConfig(
filename='file_ops.log',
level=logging.INFO,
format='%(asctime)s - %(message)s'
)
def log_op(op, src, dst=None):
msg = f"{op}: {src}"
if dst:
msg += f" -> {dst}"
logging.info(msg)
print(msg)
一键搞定:综合运维脚本
把常用功能打包成命令行工具:
import argparse
def main():
parser = argparse.ArgumentParser(description='文件批量处理工具')
subparsers = parser.add_subparsers(dest='command')
rename_parser = subparsers.add_parser('rename')
rename_parser.add_argument('folder')
rename_parser.add_argument('--pattern', default='*')
rename_parser.add_argument('--template', default='{date}_{index}')
backup_parser = subparsers.add_parser('backup')
backup_parser.add_argument('src')
backup_parser.add_argument('dst')
backup_parser.add_argument('--incremental', action='store_true')
sync_parser = subparsers.add_parser('sync')
sync_parser.add_argument('src')
sync_parser.add_argument('dst')
sync_parser.add_argument('--delete', action='store_true')
clean_parser = subparsers.add_parser('clean')
clean_parser.add_argument('folder')
clean_parser.add_argument('--days', type=int, default=30)
args = parser.parse_args()
if args.command == 'rename':
rename_files(args.folder, args.pattern, args.template)
elif args.command == 'backup':
if args.incremental:
incremental_backup(args.src, args.dst)
else:
shutil.copytree(args.src, args.dst)
elif args.command == 'sync':
sync_dirs(args.src, args.dst, args.delete)
elif args.command == 'clean':
clean_old_files(args.folder, args.days)
# 使用示例:
# python file_tool.py rename ./logs --pattern "*.log" --template "{date}_{index}"
# python file_tool.py backup /data /backup --incremental
# python file_tool.py sync /project /backup/project --delete
老王把这些脚本整合起来,设置好定时任务。现在每天早上一到公司,检查一下日志文件有没有按规范命名,备份有没有成功同步。之前每周五加班备份的日子一去不返,他泡杯茶就能开始一天的工作。
以上就是Python文件批量处理(重命名/备份/同步运维)的实战指南的详细内容,更多关于Python文件批量处理的资料请关注脚本之家其它相关文章!
相关文章
Django media static外部访问Django中的图片设置教程
这篇文章主要介绍了Django media static外部访问Django中的图片设置教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧2020-04-04
pip安装库报错[notice] A new release of pip available: 22.2
这篇文章主要给大家介绍了关于pip安装库报错[notice] A new release of pip available: 22.2 -> 22.2.2的相关资料,文中通过图文将解决的方法介绍的非常详细,需要的朋友可以参考下2023-03-03


最新评论