使用无缓冲的通道来创建一个goroutine池,来执行并控制一组工作,使用无缓冲的通道要比指定一个有缓冲的通道要好,这种情况下,无法接受新的工作的时候,也能及时通过通道来通知调用者,避免被卡主或者丢失
我们先整体的看一下代码
package work
import “sync”
// 接口,实现了Task的才是Worker
type Worker interface {
Task()
}
// 整体结构,包含了Worker接口的通道和一个wg
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
// New函数
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
//增加固定数量
p.wg.Add(maxGoroutines)
//完成固定额度的任务
for i := 0; i < maxGoroutines; i++ {
go func() {
//不断的获取
for w := range p.work {
//执行函数
w.Task()
}
p.wg.Done()
}()
}
return &p
}
// 提交工作到工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
// 直接强制停止,等待所有的goroutine停止工作
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
首先是上面声明的名为Worker的接口和名为Pool的结构
// 接口,实现了Task的才是Worker
type Worker interface {
Task()
}
// 整体结构,包含了Worker接口的通道和一个wg
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
Worker接口声明了一个名为Task的方法,声明了名为Pool的结构,内部包含两个字段,一个名为work的通道,一个名为wg的sync.WatiGroup类型
然后,是一个工厂函数
// New函数
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
//增加固定数量
p.wg.Add(maxGoroutines)
//完成固定额度的任务
for i := 0; i < maxGoroutines; i++ {
go func() {
//不断的获取
for w := range p.work {
//执行函数
w.Task()
}
p.wg.Done()
}()
}
return &p
}
最后返回了这个对象的指针,初始化了一个无缓冲的通道来代表work字段
然后利用一个异步方法,接收worker类型的对象,执行对应的Task方法
如果隧道关闭,for循环也会关闭,如果到达了指定的数量,也会进行关闭
然后就是提交工作的方法和关闭工作池的方法
分别是Run和shutdown
Run提交工作到工作池
// 提交工作到工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
以及shutdown方法关闭工作池
// 直接强制停止,等待所有的goroutine停止工作
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
直接关闭工作池,并且等待Waitgroup的Done方法,然后,完成后才会返回
然后是main.go源代码中的,代码如下
// This sample program demonstrates how to use the work package
// to use a pool of goroutines to get work done.
package main
import (
“log”
“sync”
“time”
“../../work”
)
// names provides a set of names to display.
var names = []string{
“steve”,
“bob”,
“mary”,
“therese”,
“jason”,
}
// namePrinter provides special support for printing names.
type namePrinter struct {
name string
}
// Task implements the Worker interface.
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
// main is the entry point for all Go programs.
func main() {
// Create a work pool with 2 goroutines.
p := work.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
// Iterate over the slice of names.
for _, name := range names {
// Create a namePrinter and provide the
// specific name.
np := namePrinter{
name: name,
}
go func() {
// Submit the task to be worked on. When RunTask
// returns we know it is being handled.
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
// Shutdown the work pool and wait for all existing work
// to be completed.
p.Shutdown()
}
代码中,我们声明了一个names的切片,初始化了5个名字,声明了一个名为namePrinter的类型结构,里面有一个name的String类型字段,对应的Task函数,只是打印这个name,等待一秒,直接退出
main函数内部.我们使用两个goroutine来创建工作池
这两个goroutine,不断的从通道中拉取Worker,在run中,只要有人接收worker,就会返回,导致main的waitgroup计数递减,终止goroutine
一旦所有的goroutine完成,main函数就会调用waitGroup的wait方法,等待所有创建的goroutine提交其工作,调用Shutdown()关闭工作池
本次我们小结一下,使用default分支的select语句,可以用来尝试向通道发送或者接受数据,而不会阻塞