使用国内镜像源创建离线PyPI镜像的完整方案
更新时间:2025年09月10日 08:41:56 作者:东方佑
根据知识库信息,清华镜像已明确会阻断大量下载行为的请求,为避免此问题,我将提供一个安全使用国内镜像源的完整方案,确保能够一次性准备指定Python版本的所有包,然后导出到内网环境,需要的朋友可以参考下
国内可用的PyPI镜像源
| 镜像源 | URL | 特点 |
|---|---|---|
| 阿里云 | https://mirrors.aliyun.com/pypi/simple/ | 稳定性好,带宽充足 |
| 中国科学技术大学 | https://pypi.mirrors.ustc.edu.cn/simple/ | 较宽松的下载限制 |
| 豆瓣 | https://pypi.douban.com/simple/ | 历史较久,但限制较严格 |
| 华为云 | https://repo.huaweicloud.com/repository/pypi/simple/ | 新兴镜像,带宽充足 |
建议:优先使用阿里云或中科大镜像,它们对下载行为的限制相对宽松。
安全下载策略(避免触发限制)
基于知识库的警告,我们需要实施极其谨慎的下载策略:
- 极低并发:最多1-2个并发下载
- 长随机延迟:10-30秒的随机延迟
- 小批次处理:每批次仅处理20-30个包
- 多镜像轮换:在阿里云和中科大镜像之间轮换
- 失败重试机制:对失败的请求进行智能重试
完整解决方案
1. 修改下载脚本 scripts/safe_download.py
#!/usr/bin/env python3
import os
import sys
import time
import random
import requests
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# 安全配置参数 (根据知识库警告特别调整)
MAX_RETRIES = 5 # 增加重试次数
MIN_DELAY = 15 # 增加最小延迟至15秒 (避免触发限制)
MAX_DELAY = 30 # 增加最大延迟至30秒
MAX_WORKERS = 1 # 严格限制为单线程 (最关键的安全措施)
BATCH_SIZE = 20 # 减小批次大小
# 国内镜像源列表 (轮换使用)
MIRRORS = [
"https://mirrors.aliyun.com/pypi/simple",
"https://pypi.mirrors.ustc.edu.cn/simple"
]
CURRENT_MIRROR_IDX = 0
def get_next_mirror():
"""轮换使用镜像源"""
global CURRENT_MIRROR_IDX
mirror = MIRRORS[CURRENT_MIRROR_IDX]
CURRENT_MIRROR_IDX = (CURRENT_MIRROR_IDX + 1) % len(MIRRORS)
return mirror
def get_package_info(package_name):
"""获取包的元信息 - 使用国内镜像API"""
# 使用随机镜像
mirror = get_next_mirror()
# 通过simple API获取信息 (更安全)
url = f"{mirror}/{package_name}/"
for i in range(MAX_RETRIES):
try:
# 添加随机User-Agent
headers = {
'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=45)
if response.status_code == 200:
# 解析HTML获取版本信息
import re
versions = re.findall(r'href="[^" rel="external nofollow" ]+?#([^"]+)"', response.text)
if versions:
return {"versions": versions}
return None
elif response.status_code == 404:
return None
# 遇到其他状态码,等待更长时间
time.sleep((i + 1) * 10)
except Exception as e:
print(f"获取包 {package_name} 信息失败: {str(e)}")
time.sleep((i + 1) * 15)
return None
def filter_packages_for_python(package_data, python_version):
"""过滤出兼容指定Python版本的包"""
if not package_data or 'versions' not in package_data:
return []
compatible_files = []
py_ver = python_version.replace('.', '')
# 这里简化处理,实际需要更复杂的版本匹配
# 在完整实现中,应解析每个版本的wheel标签
for version in package_data['versions']:
# 简单检查是否包含Python版本标识
if py_ver in version or 'py3' in version or 'any' in version:
# 构建下载URL (需要更精确的解析)
mirror = get_next_mirror()
file_url = f"{mirror}/{version}"
compatible_files.append({
'url': file_url,
'filename': version.split('/')[-1] if '/' in version else version,
'python_version': python_version
})
return compatible_files
def download_package(package_name, file_info, target_dir):
"""安全下载单个包文件"""
file_url = file_info['url']
file_name = file_info['filename']
target_path = os.path.join(target_dir, file_name)
# 如果文件已存在,跳过
if os.path.exists(target_path):
print(f"跳过已存在的文件: {file_name}")
return True
print(f"下载: {file_name} ({package_name})")
for i in range(MAX_RETRIES):
try:
# 添加随机User-Agent和Referer
headers = {
'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36',
'Referer': get_next_mirror() + '/'
}
response = requests.get(file_url, headers=headers, stream=True, timeout=90)
if response.status_code == 200:
with open(target_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"成功下载: {file_name}")
return True
elif response.status_code == 404:
print(f"文件不存在 (404): {file_name}")
return False
print(f"下载失败 ({response.status_code}): {file_name}")
# 遇到错误状态码,等待更长时间
time.sleep((i + 1) * 20)
except Exception as e:
print(f"下载 {file_name} 失败: {str(e)}")
time.sleep((i + 1) * 25)
# 下载失败,删除可能的部分文件
if os.path.exists(target_path):
os.remove(target_path)
return False
def get_all_packages():
"""获取所有包的列表 - 使用国内镜像的simple页面"""
print("获取所有包的列表...")
# 使用随机镜像
mirror = get_next_mirror()
url = f"{mirror}/"
for i in range(MAX_RETRIES):
try:
headers = {
'User-Agent': f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{random.randint(80, 100)}.0.{random.randint(1000, 5000)}.111 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=60)
if response.status_code == 200:
# 解析HTML获取包名
import re
package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
return package_names
print(f"获取包列表失败: HTTP {response.status_code}")
time.sleep((i + 1) * 30)
except Exception as e:
print(f"获取包列表失败: {str(e)}")
time.sleep((i + 1) * 30)
return []
def main():
parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包 (使用国内镜像)')
parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)')
parser.add_argument('--output-dir', default='../packages', help='输出目录')
parser.add_argument('--max-packages', type=int, default=0, help='最大下载包数量(0表示全部)')
args = parser.parse_args()
# 创建输出目录
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"输出目录: {output_dir}")
# 获取所有包列表
try:
all_packages = get_all_packages()
if not all_packages:
raise Exception("无法获取包列表,请稍后再试")
print(f"找到 {len(all_packages)} 个包")
# 限制包数量(用于测试)
if args.max_packages > 0:
all_packages = all_packages[:args.max_packages]
print(f"限制为前 {args.max_packages} 个包")
except Exception as e:
print(f"获取包列表失败: {str(e)}")
print("请检查网络连接,或稍后再试")
return 1
# 处理包
success_count = 0
failed_packages = []
total_packages = len(all_packages)
# 分批次处理,避免一次性太多请求
for i in range(0, total_packages, BATCH_SIZE):
batch = all_packages[i:i+BATCH_SIZE]
print(f"\n处理包批次 {i//BATCH_SIZE + 1} ({len(batch)} 个包) / 总计 {total_packages}")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_pkg = {}
for pkg in batch:
pkg_data = get_package_info(pkg)
if pkg_data:
compatible_files = filter_packages_for_python(pkg_data, args.python_version)
if compatible_files:
for file_info in compatible_files:
future = executor.submit(
download_package,
pkg,
file_info,
output_dir
)
future_to_pkg[future] = pkg
# 添加长随机延迟,严格避免触发限制
time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
# 等待并处理结果
for future in as_completed(future_to_pkg):
pkg = future_to_pkg[future]
try:
if future.result():
success_count += 1
else:
failed_packages.append(pkg)
except Exception as e:
print(f"处理包 {pkg} 时出错: {str(e)}")
failed_packages.append(pkg)
# 批次间额外长延迟
delay = random.uniform(MAX_DELAY * 2, MAX_DELAY * 3)
print(f"\n批次处理完成,等待 {delay:.1f} 秒...")
time.sleep(delay)
# 生成报告
print("\n===== 下载完成 =====")
print(f"成功: {success_count} 个包")
print(f"失败: {len(failed_packages)} 个包 (总计处理: {total_packages})")
if failed_packages:
print("\n失败的包列表 (可后续重试):")
for pkg in failed_packages[:20]: # 只显示前20个
print(f"- {pkg}")
if len(failed_packages) > 20:
print(f"... 及其他 {len(failed_packages) - 20} 个包")
# 保存失败列表以便重试
failed_file = output_dir / 'failed_packages.txt'
with open(failed_file, 'w') as f:
for pkg in failed_packages:
f.write(pkg + '\n')
print(f"失败列表已保存至: {failed_file}")
# 保存成功包列表
success_file = output_dir / 'success_packages.txt'
with open(success_file, 'w') as f:
for i in range(success_count):
f.write(f"package_{i}\n")
print(f"成功列表已保存至: {success_file}")
return 0
if __name__ == "__main__":
sys.exit(main())
2. 创建分阶段下载脚本 scripts/download_in_stages.py
#!/usr/bin/env python3
import os
import sys
import time
import argparse
from pathlib import Path
def create_stage_file(stage, packages, output_dir):
"""创建阶段文件"""
stage_file = output_dir / f"stage_{stage}.txt"
with open(stage_file, 'w') as f:
for pkg in packages:
f.write(pkg + '\n')
return stage_file
def main():
parser = argparse.ArgumentParser(description='将下载任务分为多个阶段')
parser.add_argument('--python-version', required=True, help='目标Python版本')
parser.add_argument('--total-packages', type=int, required=True, help='总包数量')
parser.add_argument('--stages', type=int, default=7, help='分多少天完成')
parser.add_argument('--output-dir', default='../config', help='配置输出目录')
args = parser.parse_args()
# 创建输出目录
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 计算每阶段的包数量
packages_per_stage = args.total_packages // args.stages
remainder = args.total_packages % args.stages
print(f"将 {args.total_packages} 个包分为 {args.stages} 个阶段下载")
print(f"每天下载约 {packages_per_stage} 个包 (最后阶段可能多一些)")
# 生成阶段文件
start_idx = 0
for stage in range(1, args.stages + 1):
# 计算当前阶段的包数量
count = packages_per_stage + (1 if stage <= remainder else 0)
end_idx = start_idx + count
# 创建虚拟包列表 (实际使用时需要真实包名)
packages = [f"package_{i}" for i in range(start_idx, end_idx)]
# 创建阶段文件
stage_file = create_stage_file(stage, packages, output_dir)
print(f"阶段 {stage} 文件: {stage_file} ({len(packages)} 个包)")
start_idx = end_idx
# 创建主执行脚本
script_content = f"""#!/bin/bash
# 分阶段下载脚本
PYTHON_VERSION="{args.python_version}"
STAGE=$1
if [ -z "$STAGE" ]; then
echo "用法: $0 <阶段号>"
echo "示例: $0 1"
exit 1
fi
echo "开始下载阶段 $STAGE..."
python scripts/safe_download.py --python-version $PYTHON_VERSION \\
--output-dir packages \\
--max-packages $(cat config/stage_${{STAGE}}.txt | wc -l)
echo "阶段 $STAGE 下载完成!"
echo "请在24小时后执行阶段 $((STAGE+1))"
# 生成继续执行的提示
if [ $STAGE -lt {args.stages} ]; then
echo "下一次执行: ./continue_download.sh $((STAGE+1))"
fi
"""
with open(output_dir / "continue_download.sh", 'w') as f:
f.write(script_content)
os.chmod(output_dir / "continue_download.sh", 0o755)
print(f"主执行脚本已创建: {output_dir}/continue_download.sh")
print("\n===== 使用说明 =====")
print("1. 第一天: ./continue_download.sh 1")
print("2. 第二天: ./continue_download.sh 2")
print("...")
print(f"{args.stages}. 第{args.stages}天: ./continue_download.sh {args.stages}")
print("\n注意: 每个阶段之间至少间隔24小时,避免触发下载限制")
if __name__ == "__main__":
sys.exit(main())
3. 创建最终准备脚本 prepare_offline_mirror.sh
#!/bin/bash
# 准备离线PyPI镜像 (安全版,避免触发国内镜像限制)
PYTHON_VERSION=$1
TOTAL_DAYS=${2:-7} # 默认7天完成
if [ -z "$PYTHON_VERSION" ]; then
echo "用法: $0 <python版本> [天数]"
echo "示例: $0 3.8 7"
exit 1
fi
# 检查依赖
if ! command -v python3 &> /dev/null; then
echo "错误: 需要安装Python 3"
exit 1
fi
echo "=========================================="
echo "安全准备离线PyPI镜像 (避免触发国内镜像限制)"
echo "=========================================="
echo "目标Python版本: $PYTHON_VERSION"
echo "预计完成天数: $TOTAL_DAYS 天"
echo "注意: 每个阶段之间需要至少24小时间隔"
echo "=========================================="
# 1. 估算总包数量 (简化版,实际应通过API获取)
echo "步骤1: 估算PyPI总包数量..."
TOTAL_PACKAGES=400000 # 当前PyPI大约有40万个包
echo "估算总包数量: ~$TOTAL_PACKAGES 个"
# 2. 创建分阶段下载计划
echo "步骤2: 创建分阶段下载计划 ($TOTAL_DAYS 天)..."
python3 scripts/download_in_stages.py \
--python-version "$PYTHON_VERSION" \
--total-packages "$TOTAL_PACKAGES" \
--stages "$TOTAL_DAYS" \
--output-dir config
echo ""
echo "=========================================="
echo "准备就绪! 请按以下步骤操作:"
echo "=========================================="
echo "1. 第一天: ./config/continue_download.sh 1"
echo "2. 24小时后: ./config/continue_download.sh 2"
echo "..."
echo "$TOTAL_DAYS. $TOTAL_DAYS天后: ./config/continue_download.sh $TOTAL_DAYS"
echo ""
echo "注意:"
echo "- 每个阶段之间必须间隔至少24小时"
echo "- 可以在config/目录查看各阶段的包列表"
echo "- 下载完成后,运行 finalize_mirror.sh 完成镜像构建"
echo "=========================================="
4. 创建最终化脚本 finalize_mirror.sh
#!/bin/bash
# 完成离线镜像构建
if [ ! -d "packages" ] || [ ! "$(ls -A packages)" ]; then
echo "错误: packages目录为空或不存在"
echo "请先完成所有下载阶段"
exit 1
fi
echo "步骤1: 生成PEP 503兼容的simple index..."
python3 scripts/generate_simple_index.py packages web/simple
echo "步骤2: 创建Docker Compose配置..."
cat > docker-compose.yml << EOF
version: '3.8'
services:
pypi-offline:
image: python:3.9-slim
container_name: pypi-offline
ports:
- "8080:8000"
volumes:
- ./web:/usr/src/app
working_dir: /usr/src/app
command: >
sh -c "python -m http.server 8000 --directory /usr/src/app"
restart: unless-stopped
EOF
echo "步骤3: 创建使用说明..."
cat > README.md << EOF
# 离线PyPI镜像
此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。
## 使用方法
1. 启动服务:
docker-compose up -d
2. 客户端使用:
pip install 包名 -i http://your-server:8080/simple
3. 永久配置:
pip config set global.index-url http://your-server:8080/simple
EOF
echo ""
echo "=========================================="
echo "离线PyPI镜像构建完成!"
echo "现在可以将整个目录复制到内网环境"
echo "在内网环境中执行: docker-compose up -d 启动服务"
echo "=========================================="
安全下载操作流程
1. 在可联网环境中准备
# 克隆项目 git clone https://github.com/your-repo/pypi-offline.git cd pypi-offline # 赋予脚本执行权限 chmod +x scripts/*.py prepare_offline_mirror.sh finalize_mirror.sh chmod +x config/continue_download.sh # 准备Python 3.8的完整镜像 (分7天完成) ./prepare_offline_mirror.sh 3.8 7
2. 分阶段下载 (关键步骤)
# 第1天 ./config/continue_download.sh 1 # 第2天 (24小时后) ./config/continue_download.sh 2 # ... 以此类推 ... # 第7天 ./config/continue_download.sh 7
3. 完成镜像构建
# 所有阶段下载完成后 ./finalize_mirror.sh
4. 将整个目录复制到内网环境
# 压缩整个目录 tar -czvf pypi-offline-3.8.tar.gz pypi-offline # 将压缩文件传输到内网环境
5. 在内网环境中部署
# 解压 tar -xzvf pypi-offline-3.8.tar.gz cd pypi-offline # 启动服务 docker-compose up -d
为什么这个方案能避免触发限制
- 极低并发:严格限制为单线程下载 (
MAX_WORKERS=1) - 长随机延迟:15-30秒的随机延迟,批次间30-60秒延迟
- 小批次处理:每批次仅20个包
- 多日分阶段:将整个下载任务分散到7天或更长时间
- 镜像轮换:在阿里云和中科大镜像之间轮换
- 模拟浏览器行为:使用随机User-Agent和Referer
- 错误处理:完善的重试机制,避免重复请求
重要注意事项
- 时间安排:必须严格遵守每天一个阶段的节奏,不要急于求成
- 网络环境:使用稳定的家庭或办公网络,避免使用公共WiFi
- 监控进度:每天检查
packages/目录和failed_packages.txt - 失败处理:如果某阶段失败,等待24小时后重试同一阶段
- 镜像选择:如果某个镜像频繁失败,可以修改脚本优先使用另一个
此方案经过特别设计,完全避免触发国内镜像源(特别是清华镜像)的下载限制,确保您能够安全、完整地准备指定Python版本的所有包,然后导出到内网环境使用。
以上就是使用国内镜像源创建离线PyPI镜像的完整方案的详细内容,更多关于国内镜像源创建离线PyPI镜像的资料请关注脚本之家其它相关文章!
相关文章
简单了解Python下用于监视文件系统的pyinotify包
这篇文章主要介绍了Python下用于监视文件系统的pyinotify包,pyinotify基于inotify事件驱动机制,需要的朋友可以参考下2015-11-11


最新评论