Tornado源码分析之HTTP服务请求解析

 更新时间:2023年09月08日 10:08:27   作者:BruceChen7  
这篇文章主要为大家介绍了Tornado源码分析之HTTP服务请求解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

listen fd的读事件回调

代码版本 tornado1.2版本下httpserver.py

Tornado定义类HTTPServer来表示一个HTTP服务器,该类在构造函数中会传入事件循环ioloop,和Application对象。同时该HTTPServer提供了如下几种方法:

  • listen() 表示该Server的监听方法,调用该方法时,通过调用bind已经将套接字设置成non-blocking,并使用socket.SO_REUSEADDR。
  • bind() 方法表示Server绑定端口,并设置socket 为non-blocking.
  • start() 方法则是在ioloop中启动该服务器。在不给start()方法传入任何参数的情况下,使用单进程模型的IOLoop。

在start()方法启动服务器时,要向IOLoop中注册对listen fd的可读事件的回调,listen fd可读,表示有新的客户接入到HTTPServer中。我们来看看接入HTTPServer的事件回调_handle_events

def _handle_events(self, fd, events):
    while True:
        try:
            connection, address = self._socket.accept()
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            raise
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl.wrap_socket(connection,
                                             server_side=True,
                                             do_handshake_on_connect=False,
                                             **self.ssl_options)
            except ssl.SSLError, err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error, err:
                if err.args[0] == errno.ECONNABORTED:
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
            else:
                stream = iostream.IOStream(connection, io_loop=self.io_loop)
            HTTPConnection(stream, address, self.request_callback,
                           self.no_keep_alive, self.xheaders)
        except:
            logging.error("Error in connection callback", exc_info=True)

不看ssl处理部分。首先基于listen fd调用accept函数,获取connection和address,注意到tornado处理了spurious wakeup的情况:

if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
    return

也就是说,当前listen fd,没有数据可以读了,表示没有新的客户连接过来,那么就应该返回。回调函数_handle_events,创建了IOStream对象stream

  stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)

注意到创建IOStream对象会给单进程的IOLoop添加新的回调函数,该函数是用来处理accept fd的读事件,此时,在有客户接入过来,那么IOLoop中的handler有如下几个:

其中,

  • 描述符4是listen fd 读事件的处理回调
  • 描述符7 是用来唤醒IOLoop _read_waker
  • 描述符9 是新accept文件描述符读事件的回调.

accept fd 回调函数

 def _handle_events(self, fd, events):
        # 确保该连接还存在
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            # 如果该连接的读事件产生了,调用读回调
            if events & self.io_loop.READ:
                self._handle_read()
            if not self.socket:
                return
            # 如果是写事件
            if events & self.io_loop.WRITE:
                # 如果该socket是客户端创建的socket, 其已经被服务器处理
                if self._connecting:
                    # 连接服务器端
                    self._handle_connect()
                # 普通的写事件回调
                self._handle_write()
            if not self.socket:
                return
            # 错误
            if events & self.io_loop.ERROR:
                # IOLoop中删除该文件描述符对应的handler
                # 并关闭连接.
                self.close()
                return
            state = self.io_loop.ERROR
            # 如果正在读
            if self.reading():
                # 添加状态继续读
                state |= self.io_loop.READ
            if self.writing():
               # 如果正在写,添加状态写
                state |= self.io_loop.WRITE
            if state != self._state:
                self._state = state
               # 更新对应fd对应的关注状态
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise

HTTPConnection对象的创建

接着我们将思路放回到listen fd 的回调函数_handle_events中,在IOStream对象stream创建后,_handle_events将创建HTTPConnection

HTTPConnection(stream, address, self.request_callback,
                               self.no_keep_alive, self.xheaders)

在HTTPConnnection构造函数中,可以看到起调用了

self.stream.read_until("\r\n\r\n", self._header_callback)

此时,HTTPConnection中的成员stream,表示的是accept fd中的streamread_until函数将会从accept fd中一直读到\r\n\n,然后调用_header_callback来解析HTTP的头部字段。

read_until函数

read_until从套接字中就是读到指定的分隔符为止

 def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        self._read_delimiter = delimiter
        self._read_callback = stack_context.wrap(callback)
        while True:
            # See if we've already got the data from a previous read
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)

readl_until有两种退出方式:一种是从buffer中读到数据后,直接返回,另一种是将数据读到buffer,_\read_to_buffer就是将数据读到buffer,如果没有数据,则将该socket的读事件添加进来。

accept fd获取后,显然该fd对应的IOStream中的缓冲区为0,所以_read_from_buffer返回False,流程将执行_read_to_buffer

从缓冲区读_read_from_buffer

  def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.
        Returns True if the read was completed.
        """
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        elif self._read_delimiter:
            _merge_prefix(self._read_buffer, sys.maxint)
            loc = self._read_buffer[0].find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))
                return True
        return False

read_from_buffer有两种形式的读:

  • 一种是读指定字节的数据
  • 一种是读到指定的分隔符

应该注意的是_read_from_buffer中指定了回调函数,意思是从socket中读的数据后,使用回调函数来消费。应该注意的是_read_from_buffer并跟socket打交道,其假设所有的数据已经在buffer中了。

将数据读到缓冲区

从socket中读数据到缓冲区,使用的是_read_to_buffer。read_to_buffer实际调用的是_read_from_socket,其从non-blocking中读一次,最多读4096个字节。注意错误的处理,当我们发现从socket中读,发生了其他的错误(除了EAGAIN)的时候,就应该关闭连接。

def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.
        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            chunk = self._read_from_socket()
        except socket.error, e:
            # ssl.SSLError is a subclass of socket.error
            logging.warning("Read error on %d: %s",
                            self.socket.fileno(), e)
            self.close()
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        if self._read_buffer_size() >= self.max_buffer_size:
            logging.error("Reached maximum read buffer size")
            self.close()
            raise IOError("Reached maximum read buffer size")
        return len(chunk)

该函数总体来说就是讲将据读到bufferIOStream中的_read_buffer中,并返回实际读的大小。注意错误的处理:如果所读的数据过大,那么也应该关闭连接。

从socket读一次数据_read_from_socket

 def _read_from_socket(self):
        """Attempts to read from the socket
        Returns the data read or None if there is nothing to read.
        May be overridden in subclasses.
        """
        try:
            # 尽可能读得多
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
      # 没有数据      
                return None
            else:
                raise
        if not chunk:
    # 客户端数据没有了,那么表示客户端关闭了连接
            self.close()
            return None
        return chunk

LT模式下从socket中读数据_handle_read()

def _handle_read(self):
        while True:
            try:
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                result = self._read_to_buffer()
            except Exception:
                self.close()
                return
            if result == 0:
                break
            else:
                if self._read_from_buffer():
                    return

在LT模式下从socket中读分成了两步:

  • 从socket中读数据读到_read_to_buffer
  • 然后从buffer中读。

从socket读,要一直读到出现EWOULDBLOCK或者是EAGAIN。注意到while True,就是不停的读,读到没有数据。

以上就是Tornado源码分析之HTTP服务请求解析的详细内容,更多关于Tornado HTTP服务请求的资料请关注脚本之家其它相关文章!

相关文章

  • Python 添加命令行参数步骤

    Python 添加命令行参数步骤

    这篇文章主要介绍了Python 添加命令行参数步骤,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-08-08
  • python用BeautifulSoup库简单爬虫实例分析

    python用BeautifulSoup库简单爬虫实例分析

    文章给大家分享了关于python爬虫的相关实例以及相关代码,有兴趣的朋友们参考下。
    2018-07-07
  • Python中的通函数numpy.ufunc详解

    Python中的通函数numpy.ufunc详解

    这篇文章主要介绍了什么是通函数numpy.ufunc,简单说就是numpy的函数,因为numpy针对的是数组张量,因此,几乎每一个函数都是ufunc。本文针对ufunc的属性进行研究,需要的朋友可以参考下
    2023-04-04
  • Python分支语句与循环语句应用实例分析

    Python分支语句与循环语句应用实例分析

    这篇文章主要介绍了Python分支语句与循环语句应用,结合具体实例形式详细分析了Python分支语句与循环语句各种常见应用操作技巧与相关注意事项,需要的朋友可以参考下
    2019-05-05
  • 详解如何使用Pytest进行自动化测试

    详解如何使用Pytest进行自动化测试

    这篇文章主要介绍了详解如何使用Pytest进行自动化测试,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • Django Admin 管理工具的实现

    Django Admin 管理工具的实现

    这篇文章主要介绍了Django Admin 管理工具的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-05-05
  • python 用户交互输入input的4种用法详解

    python 用户交互输入input的4种用法详解

    这篇文章主要介绍了python 用户交互输入input的4种用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • SymPy库关于矩阵的基本操作和运算

    SymPy库关于矩阵的基本操作和运算

    本文主要介绍了SymPy库关于矩阵的基本操作和运算,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-03-03
  • 基于PyQt5实现一个串口接数据波形显示工具

    基于PyQt5实现一个串口接数据波形显示工具

    这篇文章主要为大家详细介绍了如何利用PyQt5实现一个串口接数据波形显示工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起了解一下
    2023-01-01
  • Python中numpy数组的维度增减方法详解

    Python中numpy数组的维度增减方法详解

    这篇文章主要介绍了Python中numpy数组的维度增减方法详解,在操作数组情况下,需要按照某个轴将不同数组的维度对齐,这时候需要为数组添加维度(特别是将二维数组变成高维张量的情况下),numpy提供了expand_dims()函数来为数组增加维度,需要的朋友可以参考下
    2023-09-09

最新评论