Go高级特性探究之处理1分钟百万请求详解

 更新时间:2023年06月19日 08:29:38   作者:Goland猫  
对于大型的互联网应用程序,如电商平台、社交网络、金融交易平台等,每秒钟都会收到大量的请求,那么Go是如何处理这些百万请求的呢,下面就来和大家详细讲讲

对于大型的互联网应用程序,如电商平台、社交网络、金融交易平台等,每秒钟都会收到大量的请求。在这些应用程序中,需要使用高效的技术来应对高并发的请求,尤其是在短时间内处理大量的请求,如1分钟百万请求。

同时,为了降低用户的使用门槛和提升用户体验,前端需要实现参数的无感知传递。这样用户在使用时,无需担心参数传递的问题,能够轻松地享受应用程序的服务。

在处理1分钟百万请求时,需要使用高效的技术和算法,以提高请求的响应速度和处理能力。Go语言以其高效性和并发性而闻名,因此成为处理高并发请求的优秀选择。Go中有多种模式可供选择,如基于goroutine和channel的并发模型、使用池技术的协程模型等,以便根据具体应用的需要来选择适合的技术模式。

本文代码参考搬至

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

W1

W1 结构体类型,它有五个成员:

  • WgSend 用于等待任务发送的 goroutine 完成。
  • Wg 用于等待任务处理的 goroutine 完成。
  • MaxNum 表示 goroutine 池的大小。
  • Ch 是一个字符串类型的通道,用于传递任务。
  • DispatchStop 是一个空结构体类型的通道,用于停止任务分发。
type W1 struct {
	WgSend       *sync.WaitGroup
	Wg           *sync.WaitGroup
	MaxNum       int
	Ch           chan string
	DispatchStop chan struct{}
}

接下来是 Dispatch 方法,它将任务发送到通道 Ch 中。它通过 for 循环来发送 10 倍于 MaxNum 的任务,每个任务都是一个 goroutine。defer 语句用于在任务完成时减少 WgSend 的计数。select 语句用于在任务分发被中止时退出任务发送。

Dispatch

func (w *W1) Dispatch(job string) {
	w.WgSend.Add(10 * w.MaxNum)
	for i := 0; i < 10*w.MaxNum; i++ {
		go func(i int) {
			defer w.WgSend.Done()
			select {
			case w.Ch <- fmt.Sprintf("%d", i):
				return
			case <-w.DispatchStop:
				fmt.Println("退出发送 job: ", fmt.Sprintf("%d", i))
				return
			}
		}(i)
	}
}

StartPool

然后是 StartPool 方法,它创建了一个 goroutine 池来处理从通道 Ch 中读取到的任务。

如果通道 Ch 还没有被创建,那么它将被创建。如果计数器 WgSend 还没有被创建,那么它也将被创建。如果计数器 Wg 还没有被创建,那么它也将被创建。

如果通道 DispatchStop 还没有被创建,那么它也将被创建。

for 循环用于创建 MaxNum 个 goroutine 来处理从通道中读取到的任务。defer 语句用于在任务完成时减少 Wg 的计数。

func (w *W1) StartPool() {
	if w.Ch == nil {
		w.Ch = make(chan string, w.MaxNum)
	}
	if w.WgSend == nil {
		w.WgSend = &sync.WaitGroup{}
	}
	if w.Wg == nil {
		w.Wg = &sync.WaitGroup{}
	}
	if w.DispatchStop == nil {
		w.DispatchStop = make(chan struct{})
	}
	w.Wg.Add(w.MaxNum)
	for i := 0; i < w.MaxNum; i++ {
		go func() {
			defer w.Wg.Done()
			for v := range w.Ch {
				fmt.Printf("完成工作: %s \n", v)
			}
		}()
	}
}

Stop

最后是 Stop 方法,它停止任务分发并等待所有任务完成。

它关闭了通道 DispatchStop,等待 WgSend 中的任务发送 goroutine 完成,然后关闭通道 Ch,等待 Wg 中的任务处理 goroutine 完成。

func (w *W1) Stop() {
	close(w.DispatchStop)
	w.WgSend.Wait()
	close(w.Ch)
	w.Wg.Wait()
}

W2

SubWorker

type SubWorker struct {
	JobChan chan string
}

子协程,它有一个 JobChan,用于接收任务。

Run:SubWorker 的方法,用于启动一个子协程,从 JobChan 中读取任务并执行。

func (sw *SubWorker) Run(wg *sync.WaitGroup, poolCh chan chan string, quitCh chan struct{}) {
	if sw.JobChan == nil {
		sw.JobChan = make(chan string)
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			poolCh <- sw.JobChan
			select {
			case res := <-sw.JobChan:
				fmt.Printf("完成工作: %s \n", res)
			case <-quitCh:
				fmt.Printf("消费者结束...... \n")
				return
			}
		}
	}()
}

W2

type W2 struct {
	SubWorkers []SubWorker
	Wg         *sync.WaitGroup
	MaxNum     int
	ChPool     chan chan string
	QuitChan   chan struct{}
}

Dispatch

Dispatch:W2 的方法,用于从 ChPool 中获取 TaskChan,将任务发送给一个 SubWorker 执行。

func (w *W2) Dispatch(job string) {
	jobChan := <-w.ChPool
	select {
	case jobChan <- job:
		fmt.Printf("发送任务 : %s 完成 \n", job)
		return
	case <-w.QuitChan:
		fmt.Printf("发送者(%s)结束 \n", job)
		return
	}
}

StartPool

StartPool:W2 的方法,用于初始化协程池,启动所有子协程并把 TaskChan 存储在 ChPool 中。

func (w *W2) StartPool() {
	if w.ChPool == nil {
		w.ChPool = make(chan chan string, w.MaxNum)
	}
	if w.SubWorkers == nil {
		w.SubWorkers = make([]SubWorker, w.MaxNum)
	}
	if w.Wg == nil {
		w.Wg = &sync.WaitGroup{}
	}
	for i := 0; i < len(w.SubWorkers); i++ {
		w.SubWorkers[i].Run(w.Wg, w.ChPool, w.QuitChan)
	}
}

Stop

Stop:W2 的方法,用于停止协程的工作,并等待所有协程结束。

func (w *W2) Stop() {
	close(w.QuitChan)
	w.Wg.Wait()
	close(w.ChPool)
}

DealW2 函数则是整个协程池的入口,它通过 NewWorker 方法创建一个 W2 实例,然后调用 StartPool 启动协程池,并通过 Dispatch 发送任务,最后调用 Stop 停止协程池。

func DealW2(max int) {
	w := NewWorker(w2, max)
	w.StartPool()
	for i := 0; i < 10*max; i++ {
		go w.Dispatch(fmt.Sprintf("%d", i))
	}
	w.Stop()
}

个人见解

看到这里对于w2我已经有点迷糊了,还能传递w.Wg, w.ChPool, w.QuitChan?

原来是golang里如果方法传递的不是地址,那么就会做一个拷贝,所以这里调用的wg根本就不是一个对象。

传递的地方传递地址就可以了,如果不传递地址,将会出现死锁

go doSomething(i, &wg, ch)
func doSomething(index int, wg *sync.WaitGroup, ch chan int) {

w1也有一个比较大的问题。在处理请求时,每个 Goroutine 都会占用一定的系统资源,如果请求量过大,会造成 Goroutine 数量的剧增消耗过多系统资源,程序可能会崩溃

能不能用上我之前写的协程池 探究 Go 的高级特性之 【Go 协程池】

探究原文

在这段代码中,poolCh代表工作者池,sw.JobChan代表工作者的工作通道。当一个工作者完成了工作后,它会将工作结果发送到sw.JobChan,此时可以通过case res := <-sw.JobChan:来接收该工作的结果。

在这个代码块中,还需要处理一个退出信号quitCh。因此,第二个case <-quitCh:用于检测是否接收到了退出信号。如果接收到了退出信号,程序将打印出消息并结束。

需要注意的是,这两个case语句是互斥的,只有当工作者完成工作或收到退出信号时,才会进入其中一个语句。因此,这个循环可以保证在工作者完成工作或收到退出信号时退出。

需要读取两次sw.JobChan的原因是:第一次读取用于将工作者的工作通道放回工作者池中,这样其他工作者就可以使用该通道。第二次读取用于接收工作者的工作结果或退出信号。因此,这两次读取是为了确保能够在正确的时刻将工作者的工作通道放回工作者池中并正确地处理工作结果或退出信号。

根据w2的特点 我自己写了一个w2

import (
   "fmt"
   "sync"
)
type SubWorkerNew struct {
   JobChan chan string
}
type W2New struct {
   SubWorkers []SubWorkerNew
   Wg         *sync.WaitGroup
   MaxNum     int
   ChPool     chan chan string
   QuitChan   chan struct{}
}
func NewW2(maxNum int) *W2New {
   subWorkers := make([]SubWorkerNew, maxNum)
   for i := 0; i < maxNum; i++ {
      subWorkers[i] = SubWorkerNew{JobChan: make(chan string)}
   }
   pool := make(chan chan string, maxNum)
   for i := 0; i < maxNum; i++ {
      pool <- subWorkers[i].JobChan
   }
   return &W2New{
      SubWorkers: subWorkers,
      Wg:         &sync.WaitGroup{},
      MaxNum:     maxNum,
      ChPool:     pool,
      QuitChan:   make(chan struct{}),
   }
}
func (w *W2New) Dispatch(job string) {
   select {
   case jobChannel := <-w.ChPool:
      jobChannel <- job
   default:
      fmt.Println("All workers busy")
   }
}
func (w *W2New) StartPool() {
   for i := 0; i < w.MaxNum; i++ {
      go func(subWorker *SubWorkerNew) {
         w.Wg.Add(1)
         defer w.Wg.Done()
         for {
            select {
            case job := <-subWorker.JobChan:
               fmt.Println("processing ", job)
            case <-w.QuitChan:
               return
            }
         }
      }(&w.SubWorkers[i])
   }
}
func (w *W2New) Stop() {
   close(w.QuitChan)
   w.Wg.Wait()
   close(w.ChPool)
   for _, subWorker := range w.SubWorkers {
      close(subWorker.JobChan)
   }
}
func main() {
   w := NewW2(5)
   w.StartPool()
   for i := 0; i < 20; i++ {
      w.Dispatch(fmt.Sprintf("job %d", i))
   }
   w.Stop()
}

但是有几个点需要注意

1.没有考虑JobChan通道的缓冲区大小,如果有大量任务被并发分配,容易导致内存占用过高;

2.每个线程都会执行无限循环,此时线程退出的条件是接收到QuitChan通道的信号,可能导致线程的阻塞等问题;

3.Dispatch函数的默认情况下只会输出"All workers busy",而不是阻塞,这意味着当所有线程都处于忙碌状态时,任务会丢失

4.线程池启动后无法动态扩展或缩小。

优化

这个优化版本改了很多次。有一些需要注意的点是,不然会一直死锁

1.使用sync.WaitGroup来确保线程池中所有线程都能够启动并运行;

2.在Stop函数中,先向SubWorker的JobChan中发送一个关闭信号,再等待所有SubWorker线程退出;

3.在Dispatch函数中,将默认情况下的输出改为阻塞等待可用通道;

w2new

package handle_million_requests
import (
	"fmt"
	"sync"
	"time"
)
type SubWorkerNew struct {
	Id      int
	JobChan chan string
}
type W2New struct {
	SubWorkers []SubWorkerNew
	MaxNum     int
	ChPool     chan chan string
	QuitChan   chan struct{}
	Wg         *sync.WaitGroup
}
func NewW2(maxNum int) *W2New {
	chPool := make(chan chan string, maxNum)
	subWorkers := make([]SubWorkerNew, maxNum)
	for i := 0; i < maxNum; i++ {
		subWorkers[i] = SubWorkerNew{Id: i, JobChan: make(chan string)}
		chPool <- subWorkers[i].JobChan
	}
	wg := new(sync.WaitGroup)
	wg.Add(maxNum)
	return &W2New{
		MaxNum:     maxNum,
		SubWorkers: subWorkers,
		ChPool:     chPool,
		QuitChan:   make(chan struct{}),
		Wg:         wg,
	}
}
func (w *W2New) StartPool() {
	for i := 0; i < w.MaxNum; i++ {
		go func(wg *sync.WaitGroup, subWorker *SubWorkerNew) {
			defer wg.Done()
			for {
				select {
				case job := <-subWorker.JobChan:
					fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
					time.Sleep(time.Second) // 模拟任务处理过程
				case <-w.QuitChan:
					return
				}
			}
		}(w.Wg, &w.SubWorkers[i])
	}
}
func (w *W2New) Stop() {
	close(w.QuitChan)
	for i := 0; i < w.MaxNum; i++ {
		close(w.SubWorkers[i].JobChan)
	}
	w.Wg.Wait()
}
func (w *W2New) Dispatch(job string) {
	select {
	case jobChan := <-w.ChPool:
		jobChan <- job
	default:
		fmt.Println("All workers busy")
	}
}
func (w *W2New) AddWorker() {
	newWorker := SubWorkerNew{Id: w.MaxNum, JobChan: make(chan string)}
	w.SubWorkers = append(w.SubWorkers, newWorker)
	w.ChPool <- newWorker.JobChan
	w.MaxNum++
	w.Wg.Add(1)
	go func(subWorker *SubWorkerNew) {
		defer w.Wg.Done()
		for {
			select {
			case job := <-subWorker.JobChan:
				fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
				time.Sleep(time.Second) // 模拟任务处理过程
			case <-w.QuitChan:
				return
			}
		}
	}(&newWorker)
}
func (w *W2New) RemoveWorker() {
	if w.MaxNum > 1 {
		worker := w.SubWorkers[w.MaxNum-1]
		close(worker.JobChan)
		w.MaxNum--
		w.SubWorkers = w.SubWorkers[:w.MaxNum]
	}
}

AddWorkerRemoveWorker,用于动态扩展/缩小线程池。

  • AddWorker函数中,我们首先将MaxNum增加了1,然后创建一个新的SubWorkerNew结构体,将其添加到SubWorkers中,并将其JobChan通道添加到ChPool通道中。最后,我们创建一个新的协程来处理新添加的SubWorkerNew并让它进入无限循环,等待接收任务。
  • RemoveWorker函数中,我们首先将MaxNum减少1,然后获取最后一个SubWorkerNew结构体,将它的JobChan通道发送到ChPool通道中,并从其通道中读取任何待处理的任务,最后创建一个新的协程来处理SubWorkerNew,继续处理任务。

测试用例

func TestW2New(t *testing.T) {  
    pool := NewW2(3)  
    pool.StartPool()  
    pool.Dispatch("task 1")  
    pool.Dispatch("task 2")  
    pool.Dispatch("task 3")  
    pool.AddWorker()  
    pool.AddWorker()  
    pool.RemoveWorker()  
    pool.Stop()  
}

当Dispatch函数向ChPool通道获取可用通道时,会从通道中取出一个SubWorker的JobChan通道,并将任务发送到该通道中。而对于SubWorker来说,并没有进行任务的使用次数限制,所以它可以处理多个任务。

在这个例子中,当任务数量比SubWorker数量多时,一个SubWorker的JobChan通道会接收到多个任务,它们会在SubWorker的循环中按顺序依次处理,直到JobChan中没有未处理的任务为止。因此,如果任务数量特别大,可能会导致某些SubWorker的JobChan通道暂时处于未处理任务状态,而其他的SubWorker在执行任务。

在测试结果中,最后三行中出现了多个"SubWorker 0 processing job",说明SubWorker 0的JobChan通道接收了多个任务,并且在其循环中处理这些任务。下面的代码片段显示了这个过程:

SubWorker 0 的循环部分

for {
    select {
    case job := <-subWorker.JobChan:
        fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
    case <-w.QuitChan:
        return
    }
}

到此这篇关于Go高级特性探究之处理1分钟百万请求详解的文章就介绍到这了,更多相关Go处理百万请求内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 使用Go实现在命令行输出好看的表格

    使用Go实现在命令行输出好看的表格

    这篇文章主要介绍了使用Go实现在命令行输出好看的表格方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • Go语言实现AzDG可逆加密算法实例

    Go语言实现AzDG可逆加密算法实例

    这篇文章主要介绍了Go语言实现AzDG可逆加密算法,实例分析了AzDG可逆加密算法的实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-02-02
  • Go语言驱动低代码应用引擎工具Yao开发管理系统

    Go语言驱动低代码应用引擎工具Yao开发管理系统

    这篇文章主要为大家介绍了Go语言驱动低代码应用引擎工具Yao开发管理系统使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • 使用Go实现一个百行聊天服务器的示例代码

    使用Go实现一个百行聊天服务器的示例代码

    前段时间, redis作者整了个c语言版本的聊天服务器,代码量拢共不过百行,于是, 心血来潮下, 我也整了个Go语言版本, 简单来说就是实现了一个聊天室的功能,文中通过代码示例给大家介绍的非常详细,需要的朋友可以参考下
    2023-12-12
  • Go语言单元测试基础从入门到放弃

    Go语言单元测试基础从入门到放弃

    这篇文章主要介绍了Go单元测试基础从入门到放弃为大家开启Go语言单元测试第一篇章,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • Go语言类型转换及问题探讨

    Go语言类型转换及问题探讨

    本文探讨了Go语言中的类型转换机制,特别是在使用atomic.Pointer和unsafe时可能引发的问题,通过深入分析Go语言的类型转换规则,如数值类型间转换、字符串与byte/rune切片转换、slice与数组转换,以及底层类型相同时的转换,文章最后给出了一些在Go中使用类型转换的建议
    2024-10-10
  • Go语言中获取IP地址的方法小结

    Go语言中获取IP地址的方法小结

    这篇文章主要为大家详细介绍了Go语言中获取IP地址的常用方法,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-12-12
  • Golang之模糊测试工具的使用

    Golang之模糊测试工具的使用

    本文主要介绍了Golang之模糊测试工具的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-03-03
  • Golang Makefile示例深入讲解使用

    Golang Makefile示例深入讲解使用

    一次偶然的机会,在 github 上看到有人用 Makefile,就尝试了一下,发现真的非常合适,Makefile 本身就是用来描述依赖的,可读性非常好,而且与强大的 shell 结合在一起,基本可以实现任何想要的功能
    2023-01-01
  • Go语言sync.Pool对象池使用场景基本示例

    Go语言sync.Pool对象池使用场景基本示例

    这篇文章主要为大家介绍了Go语言sync.Pool对象池使用场景的基本示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12

最新评论