使用无缓冲的通道来创建一个goroutine池,来执行并控制一组工作,使用无缓冲的通道要比指定一个有缓冲的通道要好,这种情况下,无法接受新的工作的时候,也能及时通过通道来通知调用者,避免被卡主或者丢失

我们先整体的看一下代码

package work

import “sync”

// 接口,实现了Task的才是Worker

type Worker interface {

Task()

}

// 整体结构,包含了Worker接口的通道和一个wg

type Pool struct {

work chan Worker

wg sync.WaitGroup

}

// New函数

func New(maxGoroutines int) *Pool {

p := Pool{

work: make(chan Worker),

}

//增加固定数量

p.wg.Add(maxGoroutines)

//完成固定额度的任务

for i := 0; i < maxGoroutines; i++ {

go func() {

//不断的获取

for w := range p.work {

//执行函数

w.Task()

}

p.wg.Done()

}()

}

return &p

}

// 提交工作到工作池

func (p *Pool) Run(w Worker) {

p.work <- w

}

// 直接强制停止,等待所有的goroutine停止工作

func (p *Pool) Shutdown() {

close(p.work)

p.wg.Wait()

}

首先是上面声明的名为Worker的接口和名为Pool的结构

// 接口,实现了Task的才是Worker

type Worker interface {

Task()

}

// 整体结构,包含了Worker接口的通道和一个wg

type Pool struct {

work chan Worker

wg sync.WaitGroup

}

Worker接口声明了一个名为Task的方法,声明了名为Pool的结构,内部包含两个字段,一个名为work的通道,一个名为wg的sync.WatiGroup类型

然后,是一个工厂函数

// New函数

func New(maxGoroutines int) *Pool {

p := Pool{

work: make(chan Worker),

}

//增加固定数量

p.wg.Add(maxGoroutines)

//完成固定额度的任务

for i := 0; i < maxGoroutines; i++ {

go func() {

//不断的获取

for w := range p.work {

//执行函数

w.Task()

}

p.wg.Done()

}()

}

return &p

}

最后返回了这个对象的指针,初始化了一个无缓冲的通道来代表work字段

然后利用一个异步方法,接收worker类型的对象,执行对应的Task方法

如果隧道关闭,for循环也会关闭,如果到达了指定的数量,也会进行关闭

然后就是提交工作的方法和关闭工作池的方法

分别是Run和shutdown

Run提交工作到工作池

// 提交工作到工作池

func (p *Pool) Run(w Worker) {

p.work <- w

}

以及shutdown方法关闭工作池

// 直接强制停止,等待所有的goroutine停止工作

func (p *Pool) Shutdown() {

close(p.work)

p.wg.Wait()

}

直接关闭工作池,并且等待Waitgroup的Done方法,然后,完成后才会返回

然后是main.go源代码中的,代码如下

// This sample program demonstrates how to use the work package

// to use a pool of goroutines to get work done.

package main

import (

“log”

“sync”

“time”

“../../work”

)

// names provides a set of names to display.

var names = []string{

“steve”,

“bob”,

“mary”,

“therese”,

“jason”,

}

// namePrinter provides special support for printing names.

type namePrinter struct {

name string

}

// Task implements the Worker interface.

func (m *namePrinter) Task() {

log.Println(m.name)

time.Sleep(time.Second)

}

// main is the entry point for all Go programs.

func main() {

// Create a work pool with 2 goroutines.

p := work.New(2)

var wg sync.WaitGroup

wg.Add(100 * len(names))

for i := 0; i < 100; i++ {

// Iterate over the slice of names.

for _, name := range names {

// Create a namePrinter and provide the

// specific name.

np := namePrinter{

name: name,

}

go func() {

// Submit the task to be worked on. When RunTask

// returns we know it is being handled.

p.Run(&np)

wg.Done()

}()

}

}

wg.Wait()

// Shutdown the work pool and wait for all existing work

// to be completed.

p.Shutdown()

}

代码中,我们声明了一个names的切片,初始化了5个名字,声明了一个名为namePrinter的类型结构,里面有一个name的String类型字段,对应的Task函数,只是打印这个name,等待一秒,直接退出

main函数内部.我们使用两个goroutine来创建工作池

这两个goroutine,不断的从通道中拉取Worker,在run中,只要有人接收worker,就会返回,导致main的waitgroup计数递减,终止goroutine

一旦所有的goroutine完成,main函数就会调用waitGroup的wait方法,等待所有创建的goroutine提交其工作,调用Shutdown()关闭工作池

本次我们小结一下,使用default分支的select语句,可以用来尝试向通道发送或者接受数据,而不会阻塞

发表评论

邮箱地址不会被公开。 必填项已用*标注