Golang中fasthttp的使用详解

 更新时间:2024年03月20日 14:17:10   作者:~kiss~  
fasthttp 是一个使用 Go 语言开发的 HTTP 包,主打高性能,针对 HTTP 请求响应流程中的 hot path 代码进行了优化,下面我们就来看看它的具体使用吧

背景介绍

fasthttp was designed for some high performance edge cases.
Unless your server/client needs to handle thousands of small to medium requests per second and needs a consistent low millisecond response time fasthttp might not be for you.
For most cases net/http is much better as it’s easier to use and can handle more cases.
For most cases you won’t even notice the performance difference.

为一些高性能边缘情况而设计。除非服务器/客户端需要每秒处理数千个中小型请求(small to medium requests per second),并且需要一致的低毫秒响应时间,否则 fasthttp 可能不适合。

对于大多数情况,net/http 更好,因为它更易于使用并且可以处理更多情况。

Currently fasthttp is successfully used by VertaMedia in a production serving up to 200K rps from more than 1.5M concurrent keep-alive connections per physical server.

1台物理机1.5M的并发保活连接,200K的rps
(rps在互联网领域通常是指"Requests Per Second",每秒请求)

哪些是fasthttp的特性

1.Easily handles more than 100K qps and more than 1M concurrent keep-alive connections on modern hardware.

现代硬件架构下很容易处理超过100K qps、超过1M的并发连接

2.Optimized for low memory usage

低使用内存优化

3.Easy ‘Connection: Upgrade’ support via RequestCtx.Hijack.

Connection: Upgrade 很容易支持

4.Server provides the following anti-DoS limits:

S端提供了如下anti-DoS限制

  • The number of concurrent connections. 并发连接数
  • The number of concurrent connections per client IP. 每个C端IP的并发连接数
  • The number of requests per connection.每个连接的请求数
  • Request read timeout. 请求读超时
  • Response write timeout. 响应写超时
  • Maximum request header size. 最大请求header
  • Maximum request body size. 最大请求body
  • Maximum request execution time. 最大请求执行时长
  • Maximum keep-alive connection lifetime. 最大保活连接生命周期
  • Early filtering out non-GET requests.提早过滤非GET请求

A lot of additional useful info is exposed to request handler:

request handler中暴露出大量有用的额外信息

  • Server and client address. S/C地址
  • Per-request logger. 每次请求的日志
  • Unique request id. 唯一请求id
  • Request start time. 请求开始时间
  • Connection start time. 连接开始时间
  • Request sequence number for the current connection. 当前连接的请求序列号

Client supports automatic retry on idempotent requests’ failure. C支持幂等请求失败时自动重试

Fasthttp API is designed with the ability to extend existing client and server implementations or to write custom client and server implementations from scratch.

Fasthttp API 的设计能够扩展现有的客户端和服务器实现,或支持从头开始编写自定义客户端和服务器实现

个人理解:

看起来又是高并发、又是低内存使用、又是支持客制化的http、又是包含很多额外信息、又是支持大量反DOS的东西,不错不错

并发连接数限制

// ServeConn serves HTTP requests from the given connection.
//
// ServeConn returns nil if all requests from the c are successfully served.
// It returns non-nil error otherwise.
//
// Connection c must immediately propagate all the data passed to Write()
// to the client. Otherwise requests' processing may hang.
//
// ServeConn closes c before returning.
// 从给定的连接处理http请求
func (s *Server) ServeConn(c net.Conn) error {
	... ...
	n := atomic.AddUint32(&s.concurrency, 1) // 连接并发数+1
	if n > uint32(s.getConcurrency()) {
		atomic.AddUint32(&s.concurrency, ^uint32(0)) // 连接并发数-1
		c.Close()
		return ErrConcurrencyLimit
	}

	atomic.AddInt32(&s.open, 1)
	err := s.serveConn(c)
	atomic.AddUint32(&s.concurrency, ^uint32(0)) // 连接并发数-1
	... ...
	return err
}

默认的并发连接数

// 如果没设置并发连接数,则默认采用 DefaultConcurrency
// DefaultConcurrency is the maximum number of concurrent connections the Server may serve by default 
// (i.e. if Server.Concurrency isn't set).
const DefaultConcurrency = 256 * 1024

fasthttp的server定义

type Server struct {
    // The maximum number of concurrent connections the server may serve.
    //
    // DefaultConcurrency is used if not set.
    //
    // Concurrency only works if you either call Serve once, or only ServeConn multiple times.
    // It works with ListenAndServe as well.
    Concurrency int
}

每个C端的并发连接数限制

func wrapPerIPConn(s *Server, c net.Conn) net.Conn {
    // 读取连接c的RemoteAddr()
    ip := getUint32IP(c)
    if ip == 0 {
        return c
    }
    // 本地缓存中远端ip+1
    n := s.perIPConnCounter.Register(ip)
    if n > s.MaxConnsPerIP {
        s.perIPConnCounter.Unregister(ip)
        s.writeFastError(c, StatusTooManyRequests, "The number of connections from your ip exceeds MaxConnsPerIP")
        c.Close()
        return nil
    }
    return acquirePerIPConn(c, ip, &s.perIPConnCounter)
}

类型定义

type Server struct {
    // Maximum number of concurrent client connections allowed per IP.
    //
    // By default unlimited number of concurrent connections
    // may be established to the server from a single IP address.
    // 默认不限制
    MaxConnsPerIP int
    
    perIPConnCounter perIPConnCounter
}

本地缓存

type perIPConnCounter struct {
	pool sync.Pool // 存储下文的perIPConn
	lock sync.Mutex
	m    map[uint32]int // C端的ip计数保存在这里
}

func (cc *perIPConnCounter) Register(ip uint32) int {
	cc.lock.Lock()
	if cc.m == nil {
		cc.m = make(map[uint32]int)
	}
	n := cc.m[ip] + 1
	cc.m[ip] = n
	cc.lock.Unlock()
	return n
}

func (cc *perIPConnCounter) Unregister(ip uint32) {
	cc.lock.Lock()
	defer cc.lock.Unlock()
	if cc.m == nil {
		// developer safeguard
		panic("BUG: perIPConnCounter.Register() wasn't called")
	}
	n := cc.m[ip] - 1
	if n < 0 {
		n = 0
	}
	cc.m[ip] = n
}

perIPConn的net.Conn

type perIPConn struct {
	net.Conn

	ip               uint32
	perIPConnCounter *perIPConnCounter
}

func acquirePerIPConn(conn net.Conn, ip uint32, counter *perIPConnCounter) *perIPConn {
	v := counter.pool.Get()
	if v == nil {
		return &perIPConn{
			perIPConnCounter: counter,
			Conn:             conn,
			ip:               ip,
		}
	}
	c := v.(*perIPConn) // 从内容池获取
	c.Conn = conn
	c.ip = ip
	return c
}

func releasePerIPConn(c *perIPConn) {
	c.Conn = nil
	c.perIPConnCounter.pool.Put(c) // 归还到内容池
}

perIPConn的Close

// 什么时候close?当然是在net.Conn想close的时候
func (c *perIPConn) Close() error {
    err := c.Conn.Close() // 关闭连接
    c.perIPConnCounter.Unregister(c.ip) // 更新缓存计数
    releasePerIPConn(c) // 释放PerIPConn资源
    return err
}

低使用内存

大量运用 sync.Pool,比如上文的perIPConn

如何早过滤GET

这个没找到,不会是在reqHandler中过滤吧

超时测试

func TimeoutHandler(h RequestHandler, timeout time.Duration, msg string) RequestHandler {
	return TimeoutWithCodeHandler(h, timeout, msg, StatusRequestTimeout)
}

func TimeoutWithCodeHandler(h RequestHandler, timeout time.Duration, msg string, statusCode int) RequestHandler {
	if timeout <= 0 {
		return h
	}

	// 返回一个什么样的 RequestHandler?
	// 先往server的concurrencyCh中写struct{}{},写失败则填充statusCode=429,并发请求过多
	// ctx中创建一个缓存为1的ch,开启子协程执行:h(ctx),ch写入(标识h执行完成),server的concurrencyCh中读出
	// 主协程初始化定时器ctx.timeoutTimer
	// ch先来则不做处理,ctx.timeoutTimer.C先来则填充statusCode为传入的statusCode,body为传入的msg
	// 最后关闭ctx.timeoutTimer
	return func(ctx *RequestCtx) {
		concurrencyCh := ctx.s.concurrencyCh
		select {
		case concurrencyCh <- struct{}{}:
		default:
			ctx.Error(msg, StatusTooManyRequests)
			return
		}

		ch := ctx.timeoutCh
		if ch == nil {
			ch = make(chan struct{}, 1)
			ctx.timeoutCh = ch
		}
		go func() {
			h(ctx)
			ch <- struct{}{}
			<-concurrencyCh
		}()
		ctx.timeoutTimer = initTimer(ctx.timeoutTimer, timeout)
		select {
		case <-ch:
		case <-ctx.timeoutTimer.C:
			ctx.TimeoutErrorWithCode(msg, statusCode)
		}
		stopTimer(ctx.timeoutTimer)
	}
}

func TestTimeoutHandlerSuccess(t *testing.T) {
	t.Parallel()

	ln := fasthttputil.NewInmemoryListener()
	h := func(ctx *RequestCtx) {
		if string(ctx.Path()) == "/" {
			ctx.Success("aaa/bbb", []byte("real response"))
		}
	}
	s := &Server{
		// 一个什么样的 RequestHandler?
		// 先往server的concurrencyCh中写struct{}{},写失败则填充statusCode=429,并发请求过多
		// ctx中创建一个缓存为1的ch,开启子协程执行:h(ctx),ch写入(标识h执行完成),server的concurrencyCh中读出
		// 主协程初始化定时器ctx.timeoutTimer
		// ch先来则不做处理,ctx.timeoutTimer.C先来则填充statusCode为传入的408,body为传入的timeout!!!
		// 最后关闭ctx.timeoutTimer
		Handler: TimeoutHandler(h, 10*time.Second, "timeout!!!"),
	}
	// serverCh 标识子协程是否处理结束,主协程要监控它,子协程在执行完后close(serverCh),发送0值给chan
	serverCh := make(chan struct{})
	go func() {
		if err := s.Serve(ln); err != nil {
			t.Errorf("unexpected error: %v", err)
		}
		close(serverCh)
	}()

	concurrency := 20
	clientCh := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			// Dial creates new client<->server connection.
			// Just like a real Dial it only returns once the server has accepted the connection.
			// It is safe calling Dial from concurrently running goroutines.
			conn, err := ln.Dial()
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			if _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n\r\n")); err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			br := bufio.NewReader(conn)
			verifyResponse(t, br, StatusOK, "aaa/bbb", "real response")
			clientCh <- struct{}{}
		}()
	}

	for i := 0; i < concurrency; i++ {
		select {
		case <-clientCh: // 客户端连接子协程执行完成
		case <-time.After(time.Second): // 超时
			t.Fatal("timeout")
		}
	}

	if err := ln.Close(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	select {
	case <-serverCh: // s.Serve(ln)先执行完,则不做处理,
	case <-time.After(time.Second):
		t.Fatal("timeout")
	}
}

func TestTimeoutHandlerTimeout(t *testing.T) {
	t.Parallel()

	ln := fasthttputil.NewInmemoryListener()
	readyCh := make(chan struct{})
	doneCh := make(chan struct{}) // 无缓冲,只要发送侧ready,接收侧才ready
	h := func(ctx *RequestCtx) {
		ctx.Success("aaa/bbb", []byte("real response"))
		// 默认耗时20ms仍未完成的操作,测试做完,主协程才close
		// 服务端协程卡在这里期间,返回超时
		fmt.Println("handler before readyCh")
		<-readyCh
		fmt.Println("handler after readyCh")
		doneCh <- struct{}{}
	}
	s := &Server{
		Handler: TimeoutHandler(h, 20*time.Millisecond, "timeout!!!"),
	}
	serverCh := make(chan struct{})
	go func() {
		// 设置好服务端监听
		if err := s.Serve(ln); err != nil {
			t.Errorf("unexpected error: %v", err)
		}
		close(serverCh)
	}()

	// 客户端执行
	concurrency := 20
	clientCh := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			conn, err := ln.Dial()
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			if _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n\r\n")); err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			br := bufio.NewReader(conn)
			// 验证response
			verifyResponse(t, br, StatusRequestTimeout, string(defaultContentType), "timeout!!!")
			clientCh <- struct{}{}
		}()
	}

	for i := 0; i < concurrency; i++ {
		select {
		case <-clientCh: // 客户端都执行完
			fmt.Println("main clientCh finish")
		case <-time.After(time.Second):
			t.Fatal("timeout")
		}
	}

	close(readyCh) // 一次close惊醒了全部的卡住的handler,牛哇
	for i := 0; i < concurrency; i++ {
		select {
		case <-doneCh: // 服务端handler都执行完,打印20遍,监听到20个消息
			fmt.Println("main handler finish")
		case <-time.After(time.Second):
			t.Fatal("timeout")
		}
	}

	// 关闭监听
	if err := ln.Close(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// 服务端执行结束,预期不超过1秒
	select {
	case <-serverCh:
	case <-time.After(time.Second):
		t.Fatal("timeout")
	}
}

func verifyResponse(t *testing.T, r *bufio.Reader, expectedStatusCode int, expectedContentType, expectedBody string) *Response {
	var resp Response
	if err := resp.Read(r); err != nil {
		t.Fatalf("Unexpected error when parsing response: %v", err)
	}

	if !bytes.Equal(resp.Body(), []byte(expectedBody)) {
		t.Fatalf("Unexpected body %q. Expected %q", resp.Body(), []byte(expectedBody))
	}
	verifyResponseHeader(t, &resp.Header, expectedStatusCode, len(resp.Body()), expectedContentType, "")
	return &resp
}

func verifyResponseHeader(t *testing.T, h *ResponseHeader, expectedStatusCode, expectedContentLength int, expectedContentType, expectedContentEncoding string) {
	if h.StatusCode() != expectedStatusCode {
		t.Fatalf("Unexpected status code %d. Expected %d", h.StatusCode(), expectedStatusCode)
	}
	if h.ContentLength() != expectedContentLength {
		t.Fatalf("Unexpected content length %d. Expected %d", h.ContentLength(), expectedContentLength)
	}
	if string(h.ContentType()) != expectedContentType {
		t.Fatalf("Unexpected content type %q. Expected %q", h.ContentType(), expectedContentType)
	}
	if string(h.ContentEncoding()) != expectedContentEncoding {
		t.Fatalf("Unexpected content encoding %q. Expected %q", h.ContentEncoding(), expectedContentEncoding)
	}
}

TCP_DEFER_ACCEPT

TCP 套接字选项,用于告诉内核直到数据已经准备好被读取后,才接受一个连接。

这样可以避免一些无用的连接唤醒,只有当真的有数据来到时,应用程序才会被唤醒处理连接。

这个选项主要用于在高并发、低延迟的环境中,减少不必要的系统负载。

在一个 TCP 连接建立之后,如果没有立即收到数据,那么这个连接就可能是一个空连接或者是一个攻击浪费系统资源

通过设置 TCP_DEFER_ACCEPT,内核可以筛选掉这部分无效的连接,提高系统性能。

have positive impact on performance

TCP_FASTOPEN - TFO

在TCP层上的的优化手段,旨在减少网络应用在建立TCP连接时的延迟。它的主要原理是通过允许数据在初始的SYN段中发送来减少传统三次握手建立TCP连接所需的往返次数。这样,就可以消除一个完整的往返时延,从而改善网络的性能。

TFO在2011年被引入Linux内核,并已被许多后续的UNIX和类UNIX系统以及Windows Server 2018采用。

Google的Chrome浏览器和许多Google服务器也使用了这一技术。

虽然TFO能够提高性能,但也可能导致一些安全问题

例如防火墙的规避和抵御SYN洪泛攻击的困难等。

因此,任何决定启用TFO的系统都必须采取相应的安全措施。

通过 RequestCtx.Hijack 轻易支持 HTTP 的 Connection: Upgrade

"Connection: Upgrade"是HTTP协议中的一个头信息,用于从当前的协议切换到其他协议

例如,从HTTP切换到Websocket

RequestCtx.Hijack是一种控制方案,允许接管HTTP连接并发送自定义字节,常用在实现长连接协议(如WebSocket)上

hijack示例

package main

import (
	"fmt"
	"log"
	"net"

	"github.com/valyala/fasthttp"
)

func main1() {
	// hijackHandler is called on hijacked connection.
	hijackHandler := func(c net.Conn) {
		fmt.Fprintf(c, "This message is sent over a hijacked connection to the client %s\n", c.RemoteAddr())
		fmt.Fprintf(c, "Send me something and I'll echo it to you\n")
		var buf [1]byte
		for {
			if _, err := c.Read(buf[:]); err != nil {
				log.Printf("error when reading from hijacked connection: %v", err)
				return
			}
			fmt.Fprintf(c, "You sent me %q. Waiting for new data\n", buf[:])
		}
	}

	// requestHandler is called for each incoming request.
	requestHandler := func(ctx *fasthttp.RequestCtx) {
		path := ctx.Path()
		switch {
		case string(path) == "/hijack":
			// Note that the connection is hijacked only after
			// returning from requestHandler and sending http response.
			ctx.Hijack(hijackHandler)

			// The connection will be hijacked after sending this response.
			fmt.Fprintf(ctx, "Hijacked the connection!")
		case string(path) == "/":
			fmt.Fprintf(ctx, "Root directory requested")
		default:
			fmt.Fprintf(ctx, "Requested path is %q", path)
		}
	}

	if err := fasthttp.ListenAndServe(":8081", requestHandler); err != nil {
		log.Fatalf("error in ListenAndServe: %v", err)
	}
}

附录

go的一个类如何设置禁止拷贝

type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

type Server struct {
	// It is forbidden copying Server instances. Create new Server instances
	// instead.
	noCopy noCopy // 私有类型和其私有方法
}

牛逼的代码连减1都这么清晰脱俗

var concurrency uint32
concurrency = 2
atomic.AddUint32(&concurrency, ^uint32(0))
fmt.Println(concurrency) // 1
concurrency = 200
atomic.AddUint32(&concurrency, ^uint32(0))
fmt.Println(concurrency) // 199
concurrency = 0
atomic.AddUint32(&concurrency, ^uint32(0))
fmt.Println(concurrency) // 4294967295

计算机世界中,无符号的三位相加:

011 + 111 = 010 => 3-1=2

到此这篇关于Golang中fasthttp的使用详解的文章就介绍到这了,更多相关Go fasthttp内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅析Go语言版本的forgery

    浅析Go语言版本的forgery

    使用过Python语言的朋友们可能使用过 forgery_py ,它是一个伪造数据的工具。这篇文章主要介绍了Go语言版本的forgery,需要的朋友可以参考下
    2018-08-08
  • 代码之美:探索Go语言断行规则的奥秘

    代码之美:探索Go语言断行规则的奥秘

    Go语言是一门以简洁、清晰和高效著称的编程语言,而断行规则是其代码风格的重要组成部分,通过深入研究Go语言的断行规则,我们可以更好地理解和编写优雅的代码,本文将从语法规范、代码风格和最佳实践等方面进行探讨,帮助读者更好地理解和应用Go语言的断行规则
    2023-10-10
  • golang数组-----寻找数组中缺失的整数方法

    golang数组-----寻找数组中缺失的整数方法

    这篇文章主要介绍了golang数组-----寻找数组中缺失的整数方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • Golang与其他语言不同的九个特性

    Golang与其他语言不同的九个特性

    近来关于对Golang的讨论有很多,七牛的几个大牛们也断定Go语言在未来将会快速发展,并且很可能会取代Java成为互联网时代最受欢迎的编程语言。本文将带你了解它不同于其他语言的九个特性
    2021-09-09
  • Golang反射获取结构体的值和修改值的代码示例

    Golang反射获取结构体的值和修改值的代码示例

    这篇文章主要给大家介绍了golang反射获取结构体的值和修改值的代码示例及演示效果,对我们的学习或工作有一定的帮助,感兴趣的同学可以参考阅读本文
    2023-08-08
  • 如何使用Go语言实现远程执行命令

    如何使用Go语言实现远程执行命令

    远程执行命令最常用的方法就是利用SSH协议,将命令发送到远程机器上执行,并获取返回结果。本文将介绍如何使用Go语言实现远程执行命令。下面一起来看看。
    2016-08-08
  • 使用golang实现一个MapReduce的示例代码

    使用golang实现一个MapReduce的示例代码

    这篇文章主要给大家介绍了关于如何使用golang实现一个MapReduce,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-09-09
  • 使用Go语言实现远程传输文件

    使用Go语言实现远程传输文件

    本文主要介绍如何利用Go语言实现远程传输文件的功能,有需要的小伙伴们可以参考学习。下面跟着小编一起来学习学习。
    2016-08-08
  • Go语言实现基于websocket浏览器通知功能

    Go语言实现基于websocket浏览器通知功能

    这篇文章主要介绍了Go语言实现基于websocket浏览器通知功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • go语言工程结构

    go语言工程结构

    这篇文章主要简单介绍了go语言工程结构,对于我们学习go语言很有帮助,需要的朋友可以参考下
    2015-01-01

最新评论