13. 线程的奇淫巧技

  1. 线程的基本使用

在python中的threading库可以可以创建线程并执行指定的对象

比如

import time

def countdown(n):

while n > 0:

print(‘T-minus’, n)

n -= 1

time.sleep(5)

# Create and launch a thread

from threading import Thread

t = Thread(target=countdown, args=(10,))

t.start()

上面我们创建了一个Thread,在其中指定了countdown,并且配置了参数元组 (10,)

利用start函数,我们启动了一个线程

常见用法还有join函数,会将目标线程加入到当前线程,并持续等待其执行完成

或者给线程对象设置daemon为true,指定为后台线程,不过需要注意一点,Python中的线程会在一个单独的系统级线程中执行。并且由于由于全局解释锁(GIL)的原因。

Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以,Python 的线程更适用于处理I/O和其他需要并发执行的阻塞操作

如果希望完成线程之间的协调,那么可以使用一个简单的标志位

class CountdownTask:

def __init__(self):

self._running = True

def terminate(self):

self._running = False

def run(self, n):

while self._running and n > 0:

print(‘T-minus’, n)

n -= 1

time.sleep(5)

c = CountdownTask()

t = Thread(target=c.run, args=(10,))

t.start()

c.terminate() # Signal termination

t.join() # Wait for actual termination (if needed)

如果希望多进程来执行某些代码,可以使用multiprocessing来执行代码

  1. 线程间通信

我们可以利用threadingk库中的Event对象进行通信,可以设置一个信号标志,从而让线程间进行通信

比如如下的代码

from threading import Thread, Event

import time

# Code to execute in an independent thread

def countdown(n, started_evt):

print(‘countdown starting’)

started_evt.set()

while n > 0:

print(‘T-minus’, n)

n -= 1

time.sleep(5)

# Create the event object that will be used to signal startup

started_evt = Event()

# Launch the thread and pass the startup event

print(‘Launching countdown’)

t = Thread(target=countdown, args=(10,started_evt))

t.start()

# Wait for the thread to start

started_evt.wait()

print(‘countdown is running’)

我们创建了一个Event,并且在子线程中设置了这个Event

那么就只有当子线程set之后,主线程的wait才会放行

但是这种event对象,我们给出一个建议,就是只使用一次,不建议使用多次

如果希望多次使用,建议使用Condition

import threading

import time

class PeriodicTimer:

def __init__(self, interval):

self._interval = interval

self._flag = 0

self._cv = threading.Condition()

def start(self):

t = threading.Thread(target=self.run)

t.daemon = True

t.start()

def run(self):

”’

Run the timer and notify waiting threads after each interval

”’

while True:

time.sleep(self._interval)

with self._cv:

self._flag ^= 1

self._cv.notify_all()

def wait_for_tick(self):

”’

Wait for the next tick of the timer

”’

with self._cv:

last_flag = self._flag

while last_flag == self._flag:

self._cv.wait()

# Example use of the timer

ptimer = PeriodicTimer(5)

ptimer.start()

# Two threads that synchronize on the timer

def countdown(nticks):

while nticks > 0:

ptimer.wait_for_tick()

print(‘T-minus’, nticks)

nticks -= 1

def countup(last):

n = 0

while n < last:

ptimer.wait_for_tick()

print(‘Counting’, n)

n += 1

threading.Thread(target=countdown, args=(10,)).start()

threading.Thread(target=countup, args=(5,)).start()

Condition 会唤醒所有等待线程

  1. 并发容器

我们可以利用queue 库中的队列作为并发容器,通过使用 put() 和 get() 操作来向队列中添加或者删除元素,代码如下

from queue import Queue

from threading import Thread

# A thread that produces data

def producer(out_q):

while True:

# Produce some data

out_q.put(data)

# A thread that consumes data

def consumer(in_q):

while True:

# Get some data

data = in_q.get()

# Process the data

# Create the shared queue and launch both threads

q = Queue()

t1 = Thread(target=consumer, args=(q,))

t2 = Thread(target=producer, args=(q,))

t1.start()

t2.start

Queue对象可以保证在多个线程间多安全地共享数据。

如果想要终止,也是支持使用毒丸的使用方式

rom queue import Queue

from threading import Thread

# Object that signals shutdown

_sentinel = object()

# A thread that produces data

def producer(out_q):

while running:

# Produce some data

out_q.put(data)

# Put the sentinel on the queue to indicate completion

out_q.put(_sentinel)

# A thread that consumes data

def consumer(in_q):

while True:

# Get some data

data = in_q.get()

# Check for termination

if data is _sentinel:

in_q.put(_sentinel)

break

# Process the data

唯一需要注意的是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。

最后我们提一下

Queue 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。

还支持使用非阻塞的get和put,比如

import queue

q = queue.Queue()

try:

data = q.get(block=False)

except queue.Empty:

try:

q.put(item, block=False)

except queue.Full:

try:

data = q.get(timeout=5.0)

except queue.Empty:

以及使用get和put的超时

def consumer(q):

while _running:

try:

item = q.get(timeout=5.0)

# Process item

except queue.Empty:

pass

并且利用queue.Empty等判断状态

  1. Python的锁

利用threading的Lock对象可以进行枷锁

import threading

class SharedCounter:

”’

A counter object that can be shared by multiple threads.

”’

def __init__(self, initial_value = 0):

self._value = initial_value

self._value_lock = threading.Lock()

def incr(self,delta=1):

”’

Increment the counter with locking

”’

with self._value_lock:

self._value += delta

def decr(self,delta=1):

”’

Decrement the counter with locking

”’

with self._value_lock:

self._value -= delta

利用with语句块,可以保证互斥执行,每次只有一个线程可以进入with语句块执行

并且支持自动加锁和释放锁

当然我们可以进行显式的加锁和释放锁

def incr(self,delta=1):

”’

Increment the counter with locking

”’

self._value_lock.acquire()

self._value += delta

self._value_lock.release()

除了最简单的Lock,还提供了RLock和Semaphore对象

RLock (可重入锁)可以被同一个线程多次获取。Semaphore可以限制线程同时执行数量

  1. 防止死锁的加锁

为了避免出现死锁的情况,我们可以利用一个简单的方法实现

通过对锁排序,使得不管用户以什么样的顺序来请求锁,这些锁都会按照固定的顺序被获取。

@contextmanager

def acquire(*locks):

# Sort locks by object identifier

locks = sorted(locks, key=lambda x: id(x))

# Make sure lock order of previously acquired locks is not violated

acquired = getattr(_local,’acquired’,[])

if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):

raise RuntimeError(‘Lock Order Violation’)

# Acquire all of the locks

acquired.extend(locks)

_local.acquired = acquired

try:

for lock in locks:

lock.acquire()

yield

finally:

# Release locks in reverse order of acquisition

for lock in reversed(locks):

lock.release()

del acquired[-len(locks):]

  1. 线程本地状态

类似Java的ThreadLocal,Python也支持在线程头保存一些信息,利用thread.local() 创建一个本地线程存储对象。 对这个对象的属性的保存和读取操作都只会对执行线程可见,而其他线程并不可见。

我们给出一个简单的代码

from socket import socket, AF_INET, SOCK_STREAM

import threading

class LazyConnection:

def __init__(self, address, family=AF_INET, type=SOCK_STREAM):

self.address = address

self.family = AF_INET

self.type = SOCK_STREAM

self.local = threading.local()

def __enter__(self):

if hasattr(self.local, ‘sock’):

raise RuntimeError(‘Already connected’)

self.local.sock = socket(self.family, self.type)

self.local.sock.connect(self.address)

return self.local.sock

def __exit__(self, exc_ty, exc_val, tb):

self.local.sock.close()

del self.local.sock

利用这个方法我们确保了这个对象,会在线程内部设置保存一个conn进行使用,不会出现线程冲突

  1. 线程池

concurrent.futures 函数库有一个 ThreadPoolExecutor可以用来创建线程池

from socket import AF_INET, SOCK_STREAM, socket

from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):

”’

Handle a client connection

”’

print(‘Got connection from’, client_addr)

while True:

msg = sock.recv(65536)

if not msg:

break

sock.sendall(msg)

print(‘Client closed connection’)

sock.close()

def echo_server(addr):

pool = ThreadPoolExecutor(128)

sock = socket(AF_INET, SOCK_STREAM)

sock.bind(addr)

sock.listen(5)

while True:

client_sock, client_addr = sock.accept()

pool.submit(echo_client, client_sock, client_addr)

echo_server((”,15000))

我们设置了128个线程,从而去处理用户的请求

  1. 并行编程

由于存在全局解释器GIL,所以Python执行CPU密集型任务的时候并不快

Concurrent.futures提供了一个ProcessPoolExector类,可以在一个单独的Python解释器中执行密集型函数

我们这里给出一个简单的使用示例

我们有一个目录下多个压缩包,压缩包内文件写有

124.115.6.12 – – [10/Jul/2012:00:18:50 -0500] “GET /robots.txt …” 200 71

210.212.209.67 – – [10/Jul/2012:00:18:51 -0500] “GET /ply/ …” 200 11875

210.212.209.67 – – [10/Jul/2012:00:18:51 -0500] “GET /favicon.ico …” 404 369

61.135.216.105 – – [10/Jul/2012:00:20:04 -0500] “GET /blog/atom.xml …” 304 –

我们需要在其中找到访问了robots.text的文件

我们就可以书写如下的线性代码

def find_robots(filename):

”’

Find all of the hosts that access robots.txt in a single log file

”’

robots = set()

with gzip.open(filename) as f:

for line in io.TextIOWrapper(f,encoding=’ascii’):

fields = line.split()

if fields[6] == ‘/robots.txt’:

robots.add(fields[0])

return robots

def find_all_robots(logdir):

”’

Find all hosts across and entire sequence of files

”’

files = glob.glob(logdir+’/*.log.gz’)

all_robots = set()

for robots in map(find_robots, files):

all_robots.update(robots)

return all_robots

if __name__ == ‘__main__’:

robots = find_all_robots(‘logs’)

for ipaddr in robots:

print(ipaddr)

这样我们利用find_robots这个函数返回了所有的访问ip

如果希望将find_all中的操作替换为多核CPU,只需要将map()操作替换为一个 concurrent.futures 库中生成的类似操作即可

def find_all_robots(logdir):

”’

Find all hosts across and entire sequence of files

”’

files = glob.glob(logdir+’/*.log.gz’)

all_robots = set()

with futures.ProcessPoolExecutor() as pool:

for robots in pool.map(find_robots, files):

all_robots.update(robots)

return all_robots

利用ProcessPoolExecutor进行了加快速度

with ProcessPoolExecutor() as pool:

这样的pool的提交方式可以选择pool.map

或者是利用pool.submit来提交单个任务

future_result = pool.submit(work, arg)

# Obtaining the result (blocks until done)

r = future_result.result()

获取到一个Future实例,利用result方法获取到结果

其还支持设置回调函数来进行在完成之后自动触发回调操作

def when_done(r):

print(‘Got:’, r.result())

with ProcessPoolExecutor() as pool:

future_result = pool.submit(work, arg)

future_result.add_done_callback(when_done)

回调函数接受一个Future并在其中获取到结果

在说完了基本的使用之后,我们看下其需要注意的一些地方

1.提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持

2.函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化

3. 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情

  1. 全局锁问题

说了这么多并发问题之后,可能会好奇全局锁问题

这是由于解释器的C语言实现部分在完全并行执行时并不是线程安全的。C相关解释器被一个全局解释器锁保护,确保任何时候只有一个Python线程执行。

这也导致Python中的多线程运行,如果是严重依赖CPU的程序性能并不好

而如果是涉及IO或者网络交互的,那么就很合适,因为大部分都在等待

那么如果我们想要绕开GIL,解决GIL的缺点,那么会有两种方法

1.利用multiprocessing来创建一个进程池

# Processing pool (see below for initiazation)

pool = None

# Performs a large calculation (CPU bound)

def some_work(args):

return result

# A thread that calls the above function

def some_thread():

while True:

r = pool.apply(some_work, (args))

# Initiaze the pool

if __name__ == ‘__main__’:

import multiprocessing

pool = multiprocessing.Pool()

或者是利用C扩展编程技术,将计算密集型任务交给C,跟Python独立,这一点就不赘述了

  1. 定义一个Actor任务

在Python中定义actor任务模式

Actor模型是一个非常简单的分布式计算解决方案,就是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。我们简单的给出一个定义

from queue import Queue

from threading import Thread, Event

# Sentinel used for shutdown

class ActorExit(Exception):

pass

class Actor:

def __init__(self):

self._mailbox = Queue()

def send(self, msg):

”’

Send a message to the actor

”’

self._mailbox.put(msg)

def recv(self):

”’

Receive an incoming message

”’

msg = self._mailbox.get()

if msg is ActorExit:

raise ActorExit()

return msg

def close(self):

”’

Close the actor, thus shutting it down

”’

self.send(ActorExit)

def start(self):

”’

Start concurrent execution

”’

self._terminated = Event()

t = Thread(target=self._bootstrap)

t.daemon = True

t.start()

def _bootstrap(self):

try:

self.run()

except ActorExit:

pass

finally:

self._terminated.set()

def join(self):

self._terminated.wait()

def run(self):

”’

Run method to be implemented by the user

”’

while True:

msg = self.recv()

# Sample ActorTask

class PrintActor(Actor):

def run(self):

while True:

msg = self.recv()

print(‘Got:’, msg)

# Sample use

p = PrintActor()

p.start()

p.send(‘Hello’)

p.send(‘World’)

p.close()

p.join()

我们利用actor实例的 send() 方法发送消息给它们

并且利用run不断的读取这个方法内部的队列

其次在close中我们利用毒丸来关闭线程

这样的一个模型,我们可以利用其在一些大型分布式项目中传输数据

  1. 消息订阅模型

如果我们利用并发容器,也就是Queue的get和put来执行,那么可以书写如下的代码

from collections import defaultdict

class Exchange:

def __init__(self):

self._subscribers = set()

def attach(self, task):

self._subscribers.add(task)

def detach(self, task):

self._subscribers.remove(task)

def send(self, msg):

for subscriber in self._subscribers:

subscriber.send(msg)

# Dictionary of all created exchanges

_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name

def get_exchange(name):

return _exchanges[name]

上面我们利用attachh和detach 来在exchage中维护一个集合

从而在send的发送msg出去

这样就是一个简单的发布订阅模型的实现

这种模式带来了一个全新的通信模式,可以解耦程序中多个任务

而且为了方便的绑定和解绑,可以使用@contextmanager

@contextmanager

def subscribe(self, *tasks):

for task in tasks:

self.attach(task)

try:

yield

finally:

for task in tasks:

self.detach(task)

with exc.subscribe(task_a, task_b):

exc.send(‘msg1’)

exc.send(‘msg2’)

  1. 生成器代替线程

生成器是一种协程,有时候又被称为用户级线程或绿色线程。

对于基本的yield的使用

# Two simple generator functions

def countdown(n):

while n > 0:

print(‘T-minus’, n)

yield

n -= 1

print(‘Blastoff!’)

def countup(n):

x = 0

while x < n:

print(‘Counting up’, x)

yield

x += 1

from collections import deque

class TaskScheduler:

def __init__(self):

self._task_queue = deque()

def new_task(self, task):

”’

Admit a newly started task to the scheduler

”’

self._task_queue.append(task)

def run(self):

”’

Run until there are no more tasks

”’

while self._task_queue:

task = self._task_queue.popleft()

try:

# Run until the next yield statement

next(task)

self._task_queue.append(task)

except StopIteration:

# Generator is no longer executing

pass

# Example use

sched = TaskScheduler()

sched.new_task(countdown(10))

sched.new_task(countdown(5))

sched.new_task(countup(15))

sched.run()

主线程不断的轮询tast_queue

刺激协程继续往下走

那么我们这里在对yield补充一点

def some_generator():

result = yield data

那么这是一种yield的使用形式

这种情况是,yield后面的是返回数据

Result是传入数据

整体可以是

f = some_generator()

# Initial result. Is None to start since nothing has been computed

result = None

while True:

try:

data = f.send(result)

result = … do some calculation …

except StopIteration:

break

最后提一嘴,使用生成器进行编程,是有很多缺点的,因为很多类库并不支持生成器的线程

发表评论

邮箱地址不会被公开。 必填项已用*标注