我们看下pool包,利用有缓冲的通道来实现资源池,来管理任意数量的goroutine之间共享和独立使用的资源,在需要共享一组静态资源的情况下非常有用,也就是手动实现资源池

首先是我们声明的结构

type Pool struct {

m sync.Mutex

resources chan io.Closer

factory func() (io.Closer, error)

closed bool

}

Pool声明了4个字段,每个字段用来辅助以goroutine安全的方式来管理资源池,sync.Mutex用来加锁解锁,保证线程安全,

然后是一个io.Closer接口的类型通道,保存共享的资源

然后是一个factory字段,创建新的资源

最后是bool类型的字段表明是否被关闭

接下来是创建函数

// 创建上面Pool对象的函数,分配一个新资源的函数,并规定池的大小

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {

//数值不对直接抛出异常

if size <= 0 {

return nil, errors.New(“Size value too small.”)

}

 

return &Pool{

factory: fn,

resources: make(chan io.Closer, size),

}, nil

}

接收两个参数,一个数值, 一个函数,函数是一个工厂函数,创建池化资源的值,第二个参数的Size表示为保存资源而创建的有缓冲的通道的缓冲区大小

在创建并初始化Pool类型的值之后,我们看下Acquire方法

// 从池中获取一个资源

func (p *Pool) Acquire() (io.Closer, error) {

select {

// 检查有空闲的资源

case r, ok := <-p.resources:

log.Println(“Acquire:”, “Shared Resource”)

if !ok {

return nil, ErrPoolClosed

}

return r, nil

// 如果没有空闲资源,就提供一个新的资源,可以选择直接返回nil

default:

log.Println(“Acquire:”, “New Resource”)

return p.factory()

}

}

在利用完成资源之后,我们需要将这个资源放回资源池之后,于是需要一个Release的方法,我们理解一下Release方法背后的机制

// 放回池中

func (p *Pool) Release(r io.Closer) {

// 确保编程安全

p.m.Lock()

defer p.m.Unlock()

// 池子已经被销毁的话,就销毁资源

if p.closed {

r.Close()

return

}

select {

// 放回池中

case p.resources <- r:

log.Println(“Release:”, “In Queue”)

// 队列已满,关闭资源,打印信息

default:

log.Println(“Release:”, “Closing”)

r.Close()

}

}

我们利用加锁来保证线程安全,然后我们读取close变量,来确定是否可以存入,如果close标志为true,则可以放进去

然后如果已经关闭了,直接销毁io并返回

最后就是关闭Pool的方法

// Close会让Pool对象关闭

func (p *Pool) Close() {

// 加锁保证线程安全

p.m.Lock()

defer p.m.Unlock()

// 如果已经关闭,直接返回

if p.closed {

return

}

// 设置关闭标志位

p.closed = true

// 清空通道里的资源,关闭通道

close(p.resources)

// 迭代关闭所有io资源

for r := range p.resources {

r.Close()

}

}

整体的代码基本如上,我们来看下main函数的执行流程

// This sample program demonstrates how to use the pool package

// to share a simulated set of database connections.

package main

import (

“io”

“log”

“math/rand”

“sync”

“sync/atomic”

“time”

“../../pool”

)

const (

maxGoroutines = 25 // the number of routines to use.

pooledResources = 2 // number of resources in the pool

)

// dbConnection simulates a resource to share.

type dbConnection struct {

ID int32

}

// Close implements the io.Closer interface so dbConnection

// can be managed by the pool. Close performs any resource

// release management.

func (dbConn *dbConnection) Close() error {

log.Println(“Close: Connection”, dbConn.ID)

return nil

}

// idCounter provides support for giving each connection a unique id.

var idCounter int32

// createConnection is a factory method that will be called by

// the pool when a new connection is needed.

func createConnection() (io.Closer, error) {

id := atomic.AddInt32(&idCounter, 1)

log.Println(“Create: New Connection”, id)

return &dbConnection{id}, nil

}

// main is the entry point for all Go programs.

func main() {

var wg sync.WaitGroup

wg.Add(maxGoroutines)

// Create the pool to manage our connections.

p, err := pool.New(createConnection, pooledResources)

if err != nil {

log.Println(err)

}

// Perform queries using connections from the pool.

for query := 0; query < maxGoroutines; query++ {

// Each goroutine needs its own copy of the query

// value else they will all be sharing the same query

// variable.

go func(q int) {

performQueries(q, p)

wg.Done()

}(query)

}

// Wait for the goroutines to finish.

wg.Wait()

// Close the pool.

log.Println(“Shutdown Program.”)

p.Close()

}

// performQueries tests the resource pool of connections.

func performQueries(query int, p *pool.Pool) {

// Acquire a connection from the pool.

conn, err := p.Acquire()

if err != nil {

log.Println(err)

return

}

// Release the connection back to the pool.

defer p.Release(conn)

// Wait to simulate a query response.

time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)

log.Printf(“Query: QID[%d] CID[%d]\n”, query, conn.(*dbConnection).ID)

}

我们首先声明了两个常量maxGoroutines和pooledResoure,来设置goroutine数量和要使用的资源数

然后设置一个实现了io.Close的结构,实现的代码人如下

type dbConnection struct {

ID int32

}

//实现了io.Closer接口,方便管理和实验

func (dbConn *dbConnection) Close() error {

log.Println(“Close: Connection”, dbConn.ID)

return nil

}

接下来创建这个结构的工厂函数,代码如下

// 创建的工厂函数

// 需要新的连接的时候,资源池会调用这个函数

func createConnection() (io.Closer, error) {

id := atomic.AddInt32(&idCounter, 1)

log.Println(“Create: New Connection”, id)

return &dbConnection{id}, nil

}

里面会创建一个唯一标识,我们利用这个工厂函数,来使用pool包

这个函数返回一个指向Pool值的指针,检查可能出现的错误,我们现在有了一个pool的实例,可以使用对应的代码了

//使用池中资源

for query := 0; query < maxGoroutines; query++ {

//开启多个go

go func(q int) {

//查询值的副本,取出资源

performQueries(q, p)

wg.Done()

}(query)

}

for循环创建要使用池的goroutine,然后调用performQueries函数,传入为一个Id值,以及一个Pool指针,然后main函数等待执行完成,并关闭pool

然后是PerformQueries函数,使用池的Acquire和Release方法

// 测试Pool相关代码

func performQueries(query int, p *pool.Pool) {

// 获得一个连接

conn, err := p.Acquire()

if err != nil {

log.Println(err)

return

}

// 释放会池中

defer p.Release(conn)

// 模拟工作

time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)

log.Printf(“Query: QID[%d] CID[%d]\n”, query, conn.(*dbConnection).ID)

}

整体的代码流程如上所示

发表评论

邮箱地址不会被公开。 必填项已用*标注