我们看下pool包,利用有缓冲的通道来实现资源池,来管理任意数量的goroutine之间共享和独立使用的资源,在需要共享一组静态资源的情况下非常有用,也就是手动实现资源池
首先是我们声明的结构
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
Pool声明了4个字段,每个字段用来辅助以goroutine安全的方式来管理资源池,sync.Mutex用来加锁解锁,保证线程安全,
然后是一个io.Closer接口的类型通道,保存共享的资源
然后是一个factory字段,创建新的资源
最后是bool类型的字段表明是否被关闭
接下来是创建函数
// 创建上面Pool对象的函数,分配一个新资源的函数,并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
//数值不对直接抛出异常
if size <= 0 {
return nil, errors.New(“Size value too small.”)
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
接收两个参数,一个数值, 一个函数,函数是一个工厂函数,创建池化资源的值,第二个参数的Size表示为保存资源而创建的有缓冲的通道的缓冲区大小
在创建并初始化Pool类型的值之后,我们看下Acquire方法
// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 检查有空闲的资源
case r, ok := <-p.resources:
log.Println(“Acquire:”, “Shared Resource”)
if !ok {
return nil, ErrPoolClosed
}
return r, nil
// 如果没有空闲资源,就提供一个新的资源,可以选择直接返回nil
default:
log.Println(“Acquire:”, “New Resource”)
return p.factory()
}
}
在利用完成资源之后,我们需要将这个资源放回资源池之后,于是需要一个Release的方法,我们理解一下Release方法背后的机制
// 放回池中
func (p *Pool) Release(r io.Closer) {
// 确保编程安全
p.m.Lock()
defer p.m.Unlock()
// 池子已经被销毁的话,就销毁资源
if p.closed {
r.Close()
return
}
select {
// 放回池中
case p.resources <- r:
log.Println(“Release:”, “In Queue”)
// 队列已满,关闭资源,打印信息
default:
log.Println(“Release:”, “Closing”)
r.Close()
}
}
我们利用加锁来保证线程安全,然后我们读取close变量,来确定是否可以存入,如果close标志为true,则可以放进去
然后如果已经关闭了,直接销毁io并返回
最后就是关闭Pool的方法
// Close会让Pool对象关闭
func (p *Pool) Close() {
// 加锁保证线程安全
p.m.Lock()
defer p.m.Unlock()
// 如果已经关闭,直接返回
if p.closed {
return
}
// 设置关闭标志位
p.closed = true
// 清空通道里的资源,关闭通道
close(p.resources)
// 迭代关闭所有io资源
for r := range p.resources {
r.Close()
}
}
整体的代码基本如上,我们来看下main函数的执行流程
// This sample program demonstrates how to use the pool package
// to share a simulated set of database connections.
package main
import (
“io”
“log”
“math/rand”
“sync”
“sync/atomic”
“time”
“../../pool”
)
const (
maxGoroutines = 25 // the number of routines to use.
pooledResources = 2 // number of resources in the pool
)
// dbConnection simulates a resource to share.
type dbConnection struct {
ID int32
}
// Close implements the io.Closer interface so dbConnection
// can be managed by the pool. Close performs any resource
// release management.
func (dbConn *dbConnection) Close() error {
log.Println(“Close: Connection”, dbConn.ID)
return nil
}
// idCounter provides support for giving each connection a unique id.
var idCounter int32
// createConnection is a factory method that will be called by
// the pool when a new connection is needed.
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println(“Create: New Connection”, id)
return &dbConnection{id}, nil
}
// main is the entry point for all Go programs.
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// Create the pool to manage our connections.
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
// Perform queries using connections from the pool.
for query := 0; query < maxGoroutines; query++ {
// Each goroutine needs its own copy of the query
// value else they will all be sharing the same query
// variable.
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
// Wait for the goroutines to finish.
wg.Wait()
// Close the pool.
log.Println(“Shutdown Program.”)
p.Close()
}
// performQueries tests the resource pool of connections.
func performQueries(query int, p *pool.Pool) {
// Acquire a connection from the pool.
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// Release the connection back to the pool.
defer p.Release(conn)
// Wait to simulate a query response.
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf(“Query: QID[%d] CID[%d]\n”, query, conn.(*dbConnection).ID)
}
我们首先声明了两个常量maxGoroutines和pooledResoure,来设置goroutine数量和要使用的资源数
然后设置一个实现了io.Close的结构,实现的代码人如下
type dbConnection struct {
ID int32
}
//实现了io.Closer接口,方便管理和实验
func (dbConn *dbConnection) Close() error {
log.Println(“Close: Connection”, dbConn.ID)
return nil
}
接下来创建这个结构的工厂函数,代码如下
// 创建的工厂函数
// 需要新的连接的时候,资源池会调用这个函数
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println(“Create: New Connection”, id)
return &dbConnection{id}, nil
}
里面会创建一个唯一标识,我们利用这个工厂函数,来使用pool包
这个函数返回一个指向Pool值的指针,检查可能出现的错误,我们现在有了一个pool的实例,可以使用对应的代码了
//使用池中资源
for query := 0; query < maxGoroutines; query++ {
//开启多个go
go func(q int) {
//查询值的副本,取出资源
performQueries(q, p)
wg.Done()
}(query)
}
for循环创建要使用池的goroutine,然后调用performQueries函数,传入为一个Id值,以及一个Pool指针,然后main函数等待执行完成,并关闭pool
然后是PerformQueries函数,使用池的Acquire和Release方法
// 测试Pool相关代码
func performQueries(query int, p *pool.Pool) {
// 获得一个连接
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// 释放会池中
defer p.Release(conn)
// 模拟工作
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf(“Query: QID[%d] CID[%d]\n”, query, conn.(*dbConnection).ID)
}
整体的代码流程如上所示