- Article -

协程池的创建记录

分类于 后端开发 标签 协程 通道切片 发表于2023-02-22 20:00

作为一个TCP框架,肯定需要管理很多的用户连接,然后会有超级多的消息需要读和写,如果读到一个消息就要阻塞的发送肯定是不合理的,因此我们需要设计一堆消息队列,每一个消息队列我们放到一个协程里面去等待消耗。

//10个通道消息队列
workerPoolSize := 10
//注意,这里是初始化切片,并非初始化通道
worker := make([]chan Msg, workerPoolSize);

我们知道初始化通道使用make(chan Msg, 1),所以上面并不是初始化10个通道,得留意。

接着我们初始化每一个通道(消息队列),然后启动对应的10个协程去消耗这些消息。

maxWorkerTaskLen := 1024
for i:= 0; i < int(workerPoolSize); i++ {
		//一个worker被启动
		//给当前worker对应的任务队列开辟空间
		worker[i] = make(chan Msg, maxWorkerTaskLen)
		//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
		go StartOneWorker(i, worker[i])
	}

func StartOneWorker(workerID int, taskQueue chan Msg) {
	fmt.Println("Worker ID = ", workerID, " is started.")
	//不断的等待队列中的消息
	for {
		select {
			//有消息则取出队列的Request,并执行绑定的业务方法
			case msg := <-taskQueue:
      	...
				//得到消息写给用户
		}
	}
}

这样我们就有了10个消息队列,他们在不同的协程中消耗消息发送给用户,每个消息队列能放1024个消息,如果其它某个队列消息已经满了,还没有被消耗的情况,这时如果在往这个队列写入消息则会阻塞当前的协程。

我们看看在其它协程里怎么随机平均的给这些队列写消息。

//将消息交给TaskQueue,由worker进行处理
func SendMsgToTaskQueue(reqrequest Msg) {
   //根据ConnID来分配当前的连接应该由哪个worker负责处理
   //轮询的平均分配法则

   //得到需要处理此条连接的workerID
   workerID := request.GetConnection().GetConnID() % workerPoolSize
  
   worker[workerID] <- request
}

这里的request.GetConnection().GetConnID()可以理解为用户连接的id,比如第一个用户连接服务器是1,第二个是2,以此类推,这些用户的消息会放到某个消息队列里等待消耗回写给用户。