作为一个TCP框架,肯定需要管理很多的用户连接,然后会有超级多的消息需要读和写,如果读到一个消息就要阻塞的发送肯定是不合理的,因此我们需要设计一堆消息队列,每一个消息队列我们放到一个协程里面去等待消耗。
//10个通道消息队列
workerPoolSize := 10
//注意,这里是初始化切片,并非初始化通道
worker := make([]chan Msg, workerPoolSize);
我们知道初始化通道使用make(chan Msg, 1)
,所以上面并不是初始化10个通道,得留意。
接着我们初始化每一个通道(消息队列),然后启动对应的10个协程去消耗这些消息。
maxWorkerTaskLen := 1024
for i:= 0; i < int(workerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
worker[i] = make(chan Msg, maxWorkerTaskLen)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go StartOneWorker(i, worker[i])
}
func StartOneWorker(workerID int, taskQueue chan Msg) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case msg := <-taskQueue:
...
//得到消息写给用户
}
}
}
这样我们就有了10个消息队列,他们在不同的协程中消耗消息发送给用户,每个消息队列能放1024个消息,如果其它某个队列消息已经满了,还没有被消耗的情况,这时如果在往这个队列写入消息则会阻塞当前的协程。
我们看看在其它协程里怎么随机平均的给这些队列写消息。
//将消息交给TaskQueue,由worker进行处理
func SendMsgToTaskQueue(reqrequest Msg) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetConnection().GetConnID() % workerPoolSize
worker[workerID] <- request
}
这里的request.GetConnection().GetConnID()
可以理解为用户连接的id,比如第一个用户连接服务器是1,第二个是2,以此类推,这些用户的消息会放到某个消息队列里等待消耗回写给用户。