python对于requests的封装方法详解

 更新时间:2019年01月03日 09:11:42   作者:可昌   我要评论

今天小编就为大家分享一篇python对于requests的封装方法详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

由于requests是http类接口的核心,因此封装前考虑问题比较多:

1. 对多种接口类型的支持;

2. 连接异常时能够重连;

3. 并发处理的选择;

4. 使用方便,容易维护;

当前并未全部实现,后期会不断完善。重点提一下并发处理的选择:python的并发处理机制由于存在GIL的原因,实现起来并不是很理想,综合考虑多进程、多线程、协程,在不考虑大并发性能测试的前提下使用了多线程-线程池的形式实现。使用的是

concurrent.futures模块。当前仅方便支持webservice接口。

# -*- coding:utf-8 -*-
 
import requests
from concurrent.futures import ThreadPoolExecutor
from Tools.Config import Config # 配置文件读取
from Tools.Log import Log # 日志管理
from Tools.tools import decoLOG # 日志装饰
 
'''
  功能:   Requests类
  使用方法: 
  作者:   郭可昌
  作成时间: 20180224
  更新内容:
  更新时间:
'''
class Requests(object):
  def __init__(self):
    self.session = requests.session()
    self.header = {}
    # URL默认来源于配置文件,方便不同测试环境的切换,也可以动态设定
    self.URL = Config().getURL()
    # 默认60s,可以动态设定
    self.timeout = 60
    #http连接异常的场合,重新连接的次数,默认为3,可以动态设定
    self.iRetryNum = 3
 
    self.errorMsg = ""
    # 内容 = {用例编号:响应数据}
    self.responses = {}
    # 内容 = {用例编号:异常信息}
    self.resErr={}
 
 
  # 原始post使用保留
  # bodyData: request's data
  @decoLOG
  def post(self, bodyData):
    response = None
    self.errorMsg = ""
 
    try:
      response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout)
      response.raise_for_status()
    except Exception as e:
      self.errorMsg = str(e)
      Log().logger.error("HTTP请求异常,异常信息:%s" % self.errorMsg)
    return response
 
 
  # 复数请求并发处理,采用线程池的形式,用例数>线程池的容量:线程池的容量为并发数,否则,用例数为并发数
  # dicDatas: {用例编号:用例数据}
  @decoLOG
  def req_all(self, dicDatas, iThreadNum=5):
 
    if len(dict(dicDatas)) < 1:
      Log().logger.error("没有测试对象,请确认后再尝试。。。")
      return self.responses.clear()
 
    # 请求用例集合转换(用例编号,用例数据)
    seed = [i for i in dicDatas.items()]
    self.responses.clear()
 
    # 线程池并发执行,iThreadNum为并发数
    with ThreadPoolExecutor(iThreadNum) as executor:
      executor.map(self.req_single,seed)
 
    # 返回所有请求的响应信息({用例编号:响应数据}),http连接异常:对应None
    return self.responses
 
  # 用于单用例提交,http连接失败可以重新连接,最大重新连接数可以动态设定
  def req_single(self, listData, reqType="post", iLoop=1):
    response = None
    # 如果达到最大重连次数,连接后提交结束
    if iLoop == self.iRetryNum:
      if reqType == "post":
        try:
          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
                       timeout=self.timeout)
          response.raise_for_status()
        except Exception as e:
          # 异常信息保存只在最大连接次数时进行,未达到最大连接次数,异常信息为空
          self.resErr[listData[0]] = str(e)
          Log().logger.error("HTTP请求异常,异常信息:%s【%d】" % (str(e), iLoop))
 
        self.responses[listData[0]] = response
      else:
        # for future: other request method expand
        pass
    # 未达到最大连接数,如果出现异常,则重新连接尝试
    else:
      if reqType == "post":
        try:
          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
                       timeout=self.timeout)
          response.raise_for_status()
        except Exception as e:
          Log().logger.error("HTTP请求异常,异常信息:%s【%d】" % (str(e), iLoop))
          # 重连次数递增
          iLoop += 1
          # 进行重新连接
          self.req_single(listData, reqType, iLoop)
          # 当前连接终止
          return None
        self.responses[listData[0]] = response
      else:
        # for future: other request method expand
        pass
 
  # 设定SoapAction, 快捷完成webservice接口header设定
  def setSoapAction(self, soapAction):
    self.header["SOAPAction"] = soapAction
    self.header["Content-Type"] = "text/xml;charset=UTF-8"
    self.header["Connection"] = "Keep-Alive"
    self.header["User-Agent"] = "InterfaceAutoTest-run"
 

以上这篇python对于requests的封装方法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • python如何重载模块实例解析

    python如何重载模块实例解析

    这篇文章主要介绍了python如何重载模块实例解析,涉及模块的概念,载入和重载的实例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • 初步解析Python下的多进程编程

    初步解析Python下的多进程编程

    这篇文章主要介绍了初步解析Python下的多进程编程,使用多进程编程一直是Python编程当中的重点和难点,需要的朋友可以参考下
    2015-04-04
  • Python 分析Nginx访问日志并保存到MySQL数据库实例

    Python 分析Nginx访问日志并保存到MySQL数据库实例

    这篇文章主要介绍了Python 分析Nginx访问日志并保存到MySQL数据库实例,需要的朋友可以参考下
    2014-03-03
  • Python 类的继承实例详解

    Python 类的继承实例详解

    这篇文章主要介绍了Python 类的继承实例详解的相关资料,需要的朋友可以参考下
    2017-03-03
  • 一个小示例告诉你Python语言的优雅之处

    一个小示例告诉你Python语言的优雅之处

    本篇中, 我们展示一下一段非常小的代码, 这段代码十分吸引我们, 因为它使用十分优雅和直接的方式解决了一个常见的问题.
    2014-07-07
  • Python pymongo模块用法示例

    Python pymongo模块用法示例

    这篇文章主要介绍了Python pymongo模块用法,结合实例形式分析了Python中pymongo模块的安装与简单使用相关操作技巧,需要的朋友可以参考下
    2018-03-03
  • Python基础之getpass模块详细介绍

    Python基础之getpass模块详细介绍

    最近在看Python标准库官方文档的时候偶然发现了这个模块。仔细一看内容挺少的,只有两个主要api,就花了点时间阅读了一下源码,感觉挺实用的,在这安利给大家。下面这篇文章主要给大家介绍了关于Python基础之getpass模块的相关资料,需要的朋友可以参考下。
    2017-08-08
  • python使用xauth方式登录饭否网然后发消息

    python使用xauth方式登录饭否网然后发消息

    这篇文章主要介绍了python使用xauth方式登录饭否网然后发消息示例,需要的朋友可以参考下
    2014-04-04
  • Numpy截取指定范围内的数据方法

    Numpy截取指定范围内的数据方法

    今天小编就为大家分享一篇Numpy截取指定范围内的数据方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-11-11
  • Python中使用strip()方法删除字符串中空格的教程

    Python中使用strip()方法删除字符串中空格的教程

    这篇文章主要介绍了Python中使用strip()方法删除字符串中空格的教程,是Python入门学习中的基础知识,需要的朋友可以参考下
    2015-05-05

最新评论