使用Python实现监测APP的使用流量

 更新时间:2026年04月23日 09:12:44   作者:lsp84ch80  
这篇文章主要为大家详细介绍了如何使用Python实现监测APP的使用流量,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

本文介绍了一种用于监测单个APK包上下行及总流量的方法,同时实现了定时备份与邮件提醒功能。通过Python脚本,可以获取设备UID,计算设备流量,并将数据保存至Excel表格中。定时备份功能确保数据安全,而邮件提醒则方便远程监控。 

实现代码如下:

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
'''
# @Time    : 2018/1/17 21:54
# @Author  : Soner
# @version : V1.0
# @license : Copyright(C), Your Company
# @实现的功能:
            1.单APK包的上下行及总流量监测
            2.定时备份
            3.定时发送邮件提醒
'''
import time
import subprocess
import xlwt
import os
import shutil
import datetime
# ----------------- 获取设备UID -----------------
def getUid(package_name):#获取UID
    cmd = 'adb shell dumpsys package ' + package_name + ' | findstr userId'
    p1 = subprocess.Popen(cmd, shell = True,
    stdout=subprocess.PIPE, stderr = subprocess.PIPE)#用adb获取信息
    uidLongString = str(p1.stdout.read().strip(), encoding="utf-8")
    uidLongList = uidLongString.split("=")
    uid = uidLongList[1]
    return uid[0:5]
# ----------------- 计算设备流量 -----------------
def getFlowFromUid(packagename, uid=None):
    '''
    # 通过应用uid,获取应用当前消耗的流量
    # 获取PID adb shell ps | findstr 包名   如果是LINUx 系统 findstr 换成 grep
    # 获取网络标识及流量 “adb shell cat /proc/应用PID码/net/dev”
    # return (rcv,snd)
    '''
    if uid is None:
        uid = getUid(packagename)
    cmd = 'adb shell cat /proc/net/xt_qtaguid/stats | findstr %s' % uid
    std = os.popen(cmd)
    net_rcv_bck = []
    net_rcv_front = []
    net_snd_bck = []
    net_snd_front = []
    lo_rcv_bck = []
    lo_rcv_front = []
    lo_snd_bck = []
    lo_snd_front = []
    for line in std:        # if 判断里的 “ccmni0”根据使用的SIM卡变换,具体的需要先去查询改掉这里,不然会出错“adb shell cat /proc/应用PID码/net/dev”
        data = line.split()
        if 'ccmni0' in line:
            background_flow = int(data[4]) == 0
            if background_flow:
                net_rcv_bck.append(int(data[5]))
                net_snd_bck.append(int(data[7]))
            else:
                net_rcv_front.append(int(data[5]))
                net_snd_front.append(int(data[7]))
        elif 'lo' in line:
            background_flow = int(data[4]) == 0
            if background_flow:
                lo_rcv_bck.append(int(data[5]))
                lo_snd_bck.append(int(data[7]))
            else:
                lo_rcv_front.append(int(data[5]))
                lo_snd_front.append(int(data[7]))
    return sum(net_rcv_bck), sum(net_rcv_front), sum(net_snd_bck), sum(net_snd_front), \
            sum(lo_rcv_bck), sum(lo_rcv_front), sum(lo_snd_bck), sum(lo_snd_front)
# ----------------- 发送邮件 -----------------
''''' 
函数说明:Send_email_text() 函数实现发送带有附件的邮件,可以群发,附件格式包括:xlsx,pdf,txt,jpg,mp3等 
参数说明: 
    1. subject:邮件主题 
    2. content:邮件正文 
    3. filepath:附件的地址, 输入格式为["","",...] 
    4. receive_email:收件人地址, 输入格式为["","",...] 
'''  
def Send_email_text(subject,content,filepath,receive_email):  
    import smtplib  
    from email.mime.multipart import MIMEMultipart   
    from email.mime.text import MIMEText   
    from email.mime.application import MIMEApplication  
    sender = "Youn Email"  # 发送人的账号, 已配置为 163 邮箱服务,如果使用其它的邮箱,可在方法下面修改
    passwd = "PASSWORD"     # 发送人的密码  
    receivers = receive_email   #收件人邮箱  
    msgRoot = MIMEMultipart()   
    msgRoot['Subject'] = subject  
    msgRoot['From'] = sender  
    if len(receivers)>1:  
        msgRoot['To'] = ','.join(receivers) #群发邮件  
    else:  
        msgRoot['To'] = receivers[0]  
    part = MIMEText(content)   
    msgRoot.attach(part)  
    ##添加附件部分--可以自己追加类型,或者全部打开自动判断  
    for path in filepath:
        # if ".jpg" in path:
        #     #jpg类型附件
        #     jpg_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=jpg_name)
        #     msgRoot.attach(part)
        #
        # if ".pdf" in path:
        #     #pdf类型附件
        #     pdf_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=pdf_name)
        #     msgRoot.attach(part)
        if ".xls" in path:  
            #xlsx类型附件  
            xlsx_name = path.split("\\")[-1]  
            part = MIMEApplication(open(path,'rb').read())   
            part.add_header('Content-Disposition', 'attachment', filename=xlsx_name)  
            msgRoot.attach(part)  
        # if ".txt" in path:
        #     #txt类型附件
        #     txt_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=txt_name)
        #     msgRoot.attach(part)
        #
        # if ".mp3" in path:
        #     #mp3类型附件
        #     mp3_name = path.split("\\")[-1]
        #     part = MIMEApplication(open(path,'rb').read())
        #     part.add_header('Content-Disposition', 'attachment', filename=mp3_name)
        #     msgRoot.attach(part)
    try:
        global m
        m = smtplib.SMTP()  
        m.connect("smtp.163.com") #这里我使用的是163邮箱,也可以使用其它邮箱
        m.login(sender, passwd)  
        m.sendmail(sender, receivers, msgRoot.as_string())  
        print("邮件发送成功")  
    except smtplib.SMTPException as e:  
        print("Error, 发送失败")  
    finally:  
        m.quit()
# ----------------- 创建EXCEL表格 -----------------
col =0
row =0
book_Mirror = xlwt.Workbook(encoding='utf-8', style_compression=0)  # 创建新的工作簿Mirror
sheet_load_Mirror = book_Mirror.add_sheet('流量', cell_overwrite_ok=True)  # 创建新的sheet,并命名为流量
sheet_load_Mirror.write(row, col, "时间")
sheet_load_Mirror.write(row, col + 1, "网络下行(KB)")
sheet_load_Mirror.write(row, col + 2, "网络上行(KB)")
sheet_load_Mirror.write(row, col + 3, "网络总流量(KB)")
sheet_load_Mirror.write(row, col + 4, "本地下行(KB)")
sheet_load_Mirror.write(row, col + 5, "本地上行(KB)")
sheet_load_Mirror.write(row, col + 6, "本地总流量(KB)")
# ----------------- 需要监测的包名 -----------------
time_end =0 # 监测初始时间,单位秒
package_name_Mirror= "You 的APK名称"  # 改成自己需要测试的APk包名
uid = (getUid(package_name_Mirror))[0:5]
try:
    uid_sdk = getUid(package_name_Mirror)
    print(time.strftime('%Y-%m-%d   %H:%M:%S',time.localtime(time.time())) +'  uid =  '+str(uid_sdk))
except:
    print('获取Mirror-uid失败')
# ----------------- 定位文件目录 -----------------
file_dir = "D:\\"   # 可改成自己的目录,要与下边对称
os.chdir(file_dir)
row =1
col =0
s = 1
net_bck_start_rx, net_front_start_rx, net_bck_start_tx, net_front_start_tx, \
lo_bck_start_rx, lo_front_start_rx, lo_bck_start_tx, lo_front_start_tx = getFlowFromUid(package_name_Mirror, uid)
net_start_rx = net_bck_start_rx + net_front_start_rx
net_start_tx = net_bck_start_tx + net_front_start_tx
lo_start_rx = lo_bck_start_rx + lo_front_start_rx
lo_start_tx = lo_bck_start_tx + lo_front_start_tx
i = 1
time_k,time_s = 0,0
while   time_end <= 259200:
    net_bck_end_rx, net_front_end_rx, net_bck_end_tx, net_front_end_tx, \
    lo_bck_end_rx, lo_front_end_rx, lo_bck_end_tx, lo_front_end_tx = getFlowFromUid(package_name_Mirror, uid)
    net_end_rx = net_bck_end_rx + net_front_end_rx
    net_end_tx = net_bck_end_tx + net_front_end_tx
    lo_end_rx = lo_bck_end_rx + lo_front_end_rx
    lo_end_tx = lo_bck_end_tx + lo_front_end_tx
    net_flow_rx, net_flow_tx = net_end_rx - net_start_rx, net_end_tx - net_start_tx
    lo_flow_rx, lo_flow_tx = lo_end_rx - lo_start_rx, lo_end_tx - lo_start_tx
    net_rx_kb, net_tx_kb = round(net_flow_rx / 1024, 3), round(net_flow_tx / 1024, 3)
    lo_rx_kb, lo_tx_kb = round(lo_flow_rx / 1024, 3), round(lo_flow_tx / 1024, 3)
    timeNow = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))  # 获取当前时间用于输出
    timenew = time.strftime('%Y-%m-%d %H-%M', time.localtime(time.time()))  # 获取当前时间用户备份时的命名
    sheet_load_Mirror.write(row, col, timeNow)  # 写入时间
    sheet_load_Mirror.write(row, col + 1, net_rx_kb)  # 写入网络下行(KB)
    sheet_load_Mirror.write(row, col + 2, net_tx_kb)  # 写入网络上行(KB)
    sheet_load_Mirror.write(row, col + 3, round(net_rx_kb + net_tx_kb, 3))  # 写入网络总流量(KB)
    sheet_load_Mirror.write(row, col + 4, lo_rx_kb)  # 写入本地上行(KB)
    sheet_load_Mirror.write(row, col + 5, lo_tx_kb)  # 写入本地下行(KB)
    sheet_load_Mirror.write(row, col + 6, round(lo_rx_kb + lo_tx_kb, 3))  # 写入本地总流量(KB)
    book_Mirror.save("d:\Mirror_Folw.xls")  # 保存EXCEL表
    print(" %s ---------- %s %s ----------" % (row, package_name_Mirror, timeNow))
    print(
          '网络下行:', net_rx_kb, 'KB\t',
          '网络上行:', net_tx_kb, 'KB\t',
          '网络总流量', round(net_rx_kb + net_tx_kb, 3), 'KB\t\t',
          '本地下行:', lo_rx_kb, 'KB\t',
          '本地上行:', lo_tx_kb, 'KB\t',
          '本地总流量', round(lo_rx_kb + lo_tx_kb, 3), 'KB\t\n'
          )
    row = row + 1   # EXCEL表格追加下一行写入
    time.sleep(10)  # 控制监测频率
    time_end += 10  # 监测计时器
    time_s += 10    # 备份计时器
    time_k += 10    # 邮件计时器
    # 定时备份
    if time_s == 300:
        shutil.copy("Mirror_Folw.xls", "d:\\test\\Mirror_Folw_%s.xls" % timenew)    # 改成自己需要复制的文件(此处的路径为上面定位目录),和复制后的文件路径以及名称
        time_s = 0
        print('备份成功~!')
    try:
        # 定时发送邮件
        if time_end == (1800 * i):
            subject = "邮件标题"
            content = "邮件正文"
            Mirror_path = "d:\\test\\Mirror_Folw_%s.xls" % timenew # 附件地址
            file_path = [Mirror_path]  # 可添加多个附件到邮箱
            receive_email = ["接收人的邮箱"]
            Send_email_text(subject,content,file_path,receive_email)
            i += 1
    except:
        continue
print("---------- END  统计时长:%s----------" % str(time_k))

知识扩展:

下面我们就来看看如何使用 Python 获取 Android App 的网络数据,核心思路主要有两种:

1) 直接模拟 App 请求,这通常是绕过 App 复杂逻辑进行高效数据采集的最终目的;

2) 进行实时的网络流量捕获与分析。最终采用哪种路径,取决于你的技术栈和目标 App 的防护强度。

核心方法一:App 请求模拟(直接数据采集)

这是指通过分析 App 的网络请求,找到规律后,直接用 Python 脚本(如 requests 库)构造请求来模拟 App 的行为,直接获取数据。它的目标是实现高效、长期稳定的数据采集。

实践流程:使用抓包工具分析 → 定位关键接口 → 分析参数规律 → 使用 requests 库等编写 Python 脚本模拟请求。

核心方法二:网络流量捕获与解析

通过设置代理或在设备上抓包,实时拦截 App 产生的网络数据包,再进行解析。

  • mitmproxy:一款强大的可编程中间人代理,原生支持 Python 脚本,可以实现请求和响应的实时拦截、修改、自动化处理。它非常适合需要实时介入或高度自动化处理的场景,且跨平台。
  • tcpdump:一个经典的网络抓包工具,通过 adb 在 Android 设备上运行,将网络数据包保存为 .pcap 文件。
  • r0capture:一款基于 Frida 的“通杀型”抓包工具,能够在应用层直接捕获解密后的明文数据。它尤其适合用于对抗 SSL Pinning 和强加密的 App,是目前处理高防护 App 的最强方案之一。

核心方法三:深度分析辅助

  • Frida:一款强大的动态插桩框架,允许你在运行时注入 JavaScript 代码来 Hook App 的函数。它主要用于突破客户端安全防护,如绕过 SSL Pinning 证书校验或代理检测,是使用 r0capture 的基础,也为分析加密参数提供强大支持。
  • Scapy:一款强大的 Python 库,可以用来处理 .pcap 文件,从原始数据包中过滤和解码 TCP、UDP 等协议数据。

实战指南

以下是三种主流方案的详细操作指南,请根据你的技术背景和目标 App 的防护强度选择。

方案一:mitmproxy 实时代理(适合实时介入与自动化)

该方案的核心原理是在电脑上启动 mitmproxy 服务,并将手机的网络代理指向电脑,从而让所有流量都经过代理,实现拦截、修改或自动化处理。该方案无法绕过证书绑定(SSL Pinning)

安装与启动:通过 pip install mitmproxy 安装。之后,可使用 mitmweb(Web界面)或 mitmdump -s your_script.py(加载脚本)来启动。

配置手机:确保手机与电脑在同一 Wi-Fi,在手机 Wi-Fi 设置中开启代理,填入电脑的局域网 IP 和 mitmproxy 端口(默认 8080)。首次使用需访问 mitm.it 下载并安装证书。

编写 Python 脚本:新建 add_header.py 文件,编写如下示例脚本,用于在请求头中添加自定义字段:

# add_header.py
def request(flow):
    flow.request.headers["X-Custom-Header"] = "Value"
    print(f"Modified headers for {flow.request.url}")

运行并查看:执行 mitmdump -s add_header.py,手机上的流量便会实时流经代理。你添加的请求头会出现在每一条请求中,其效果等同于 curl -H

方案二:r0capture + Frida 应用层抓包(最强方案,可绕过 SSL Pinning)

该方案利用 Frida 将 JavaScript 代码注入到目标 App 进程,直接从其内存中 Hook 加密函数,在数据加密前捕获明文内容。它能绕过 SSL Pinning,是对付高防护 App 的“大杀器”

环境准备:电脑端安装 frida-tools (pip install frida-tools)。准备一台已 Root 的安卓手机或模拟器,下载对应架构的 frida-server并推送到手机运行。

克隆与运行git clone https://github.com/r0ysue/r0capture.git。通过 adb devices 确认设备已连接。最后,使用以下命令启动抓包(-f 后跟目标应用包名,-p 指定输出文件):

python3 r0capture.py -U -f com.example.app -v -p captured.pcap

分析数据:该命令会生成 captured.pcap 文件。你可以用 Wireshark 打开它进行深度分析,也可以配合下文提到的 Scapy 进行自动化解析。

方案三:Scapy 离线解析 PCAP 文件(适合离线深度分析)

当完成抓包并得到 .pcap 文件后,Scapy 是解析它并提取业务数据的利器。

安装 Scapypip install scapy

解析代码示例:编写以下 Python 脚本,从 .pcap 文件中提取 HTTP 请求信息。

from scapy.all import rdpcap, TCP, IP, Raw
packets = rdpcap('captured.pcap') # 加载 PCAP 文件
for pkt in packets:
    if IP in pkt and TCP in pkt and Raw in pkt:
        payload = pkt[Raw].load
        try:
            # 简单解码 HTTP 请求行
            http_text = payload.decode('utf-8', errors='ignore')
            if http_text.startswith('GET') or http_text.startswith('POST'):
                print(f"发现 HTTP 请求: {http_text.splitlines()[0]}")
        except:
            pass

到此这篇关于使用Python实现监测APP的使用流量的文章就介绍到这了,更多相关Python监测APP流量内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用Python读取json文件的方法小结

    使用Python读取json文件的方法小结

    这篇文章主要给大家介绍了Python读取json文件的方法,使用python读取json文件,输出结果为字符串或python对象,文中有详细的代码示例和图解,感兴趣的小伙伴可以自己动手试一试
    2023-09-09
  • OpenCV特征提取与检测之Shi-Tomasi角点检测器

    OpenCV特征提取与检测之Shi-Tomasi角点检测器

    在角点检测的世界里哈瑞斯无疑是最重要的方法之一,但Shi-Tomasi作为改进的算法也有很大应用场景,尤其是动态跟踪用的还比较多,这篇文章主要给大家介绍了关于OpenCV特征提取与检测之Shi-Tomasi角点检测器的相关资料,需要的朋友可以参考下
    2021-08-08
  • Python使用matplotlib绘制多个图形单独显示的方法示例

    Python使用matplotlib绘制多个图形单独显示的方法示例

    这篇文章主要介绍了Python使用matplotlib绘制多个图形单独显示的方法,结合实例形式分析了matplotlib实现绘制多个图形单独显示的具体操作技巧与注意事项,代码备有较为详尽的注释便于理解,需要的朋友可以参考下
    2018-03-03
  • Python之Numpy的超实用基础详细教程

    Python之Numpy的超实用基础详细教程

    这篇文章主要介绍了Python之Numpy的超实用基础详细教程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10
  • python中字符串的编码与解码详析

    python中字符串的编码与解码详析

    这篇文章主要给大家介绍了关于python中字符串的编码与解码的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • 基于Python+smtplib实现邮件自动发送功能

    基于Python+smtplib实现邮件自动发送功能

    工作中总有各种邮件需要定期发送,手动操作不仅繁琐还容易忘记,Python的smtplib库完美解决这个痛点,几行代码就能搞定邮件自动发送,还能加上附件、HTML格式美化、定时任务等花样玩法,所以本文介绍了基于Python+smtplib实现邮件自动发送功能,需要的朋友可以参考下
    2025-09-09
  • 浅谈django开发者模式中的autoreload是如何实现的

    浅谈django开发者模式中的autoreload是如何实现的

    下面小编就为大家带来一篇浅谈django开发者模式中的autoreload是如何实现的。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • Python图像处理之图像金字塔的向上和向下取样

    Python图像处理之图像金字塔的向上和向下取样

    图像金字塔是指由一组图像且不同分别率的子图集合,它是图像多尺度表达的一种,以多分辨率来解释图像的结构,主要用于图像的分割或压缩。本文主要介绍了图像金字塔的图像向下取样和向上取样,感兴趣的可以了解一下
    2022-09-09
  • Python爬虫入门案例之回车桌面壁纸网美女图片采集

    Python爬虫入门案例之回车桌面壁纸网美女图片采集

    读万卷书不如行万里路,学的扎不扎实要通过实战才能看出来,今天小编给大家带来一个python爬虫案例,采集回车桌面网站的美女图片,大家可以在过程中查缺补漏,看看自己掌握程度怎么样
    2021-10-10
  • python中利用h5py模块读取h5文件中的主键方法

    python中利用h5py模块读取h5文件中的主键方法

    今天小编就为大家分享一篇python中利用h5py模块读取h5文件中的主键方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-06-06

最新评论