13. 线程的奇淫巧技
- 线程的基本使用
在python中的threading库可以可以创建线程并执行指定的对象
比如
import time
def countdown(n): while n > 0: print(‘T-minus’, n) n -= 1 time.sleep(5) # Create and launch a thread from threading import Thread t = Thread(target=countdown, args=(10,)) t.start() |
上面我们创建了一个Thread,在其中指定了countdown,并且配置了参数元组 (10,)
利用start函数,我们启动了一个线程
常见用法还有join函数,会将目标线程加入到当前线程,并持续等待其执行完成
或者给线程对象设置daemon为true,指定为后台线程,不过需要注意一点,Python中的线程会在一个单独的系统级线程中执行。并且由于由于全局解释锁(GIL)的原因。
Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以,Python 的线程更适用于处理I/O和其他需要并发执行的阻塞操作
如果希望完成线程之间的协调,那么可以使用一个简单的标志位
class CountdownTask:
def __init__(self): self._running = True def terminate(self): self._running = False def run(self, n): while self._running and n > 0: print(‘T-minus’, n) n -= 1 time.sleep(5) c = CountdownTask() t = Thread(target=c.run, args=(10,)) t.start() c.terminate() # Signal termination t.join() # Wait for actual termination (if needed) |
如果希望多进程来执行某些代码,可以使用multiprocessing来执行代码
- 线程间通信
我们可以利用threadingk库中的Event对象进行通信,可以设置一个信号标志,从而让线程间进行通信
比如如下的代码
from threading import Thread, Event
import time # Code to execute in an independent thread def countdown(n, started_evt): print(‘countdown starting’) started_evt.set() while n > 0: print(‘T-minus’, n) n -= 1 time.sleep(5) # Create the event object that will be used to signal startup started_evt = Event() # Launch the thread and pass the startup event print(‘Launching countdown’) t = Thread(target=countdown, args=(10,started_evt)) t.start() # Wait for the thread to start started_evt.wait() print(‘countdown is running’) |
我们创建了一个Event,并且在子线程中设置了这个Event
那么就只有当子线程set之后,主线程的wait才会放行
但是这种event对象,我们给出一个建议,就是只使用一次,不建议使用多次
如果希望多次使用,建议使用Condition
import threading
import time class PeriodicTimer: def __init__(self, interval): self._interval = interval self._flag = 0 self._cv = threading.Condition() def start(self): t = threading.Thread(target=self.run) t.daemon = True t.start() def run(self): ”’ Run the timer and notify waiting threads after each interval ”’ while True: time.sleep(self._interval) with self._cv: self._flag ^= 1 self._cv.notify_all() def wait_for_tick(self): ”’ Wait for the next tick of the timer ”’ with self._cv: last_flag = self._flag while last_flag == self._flag: self._cv.wait() # Example use of the timer ptimer = PeriodicTimer(5) ptimer.start() # Two threads that synchronize on the timer def countdown(nticks): while nticks > 0: ptimer.wait_for_tick() print(‘T-minus’, nticks) nticks -= 1 def countup(last): n = 0 while n < last: ptimer.wait_for_tick() print(‘Counting’, n) n += 1 threading.Thread(target=countdown, args=(10,)).start() threading.Thread(target=countup, args=(5,)).start() |
Condition 会唤醒所有等待线程
- 并发容器
我们可以利用queue 库中的队列作为并发容器,通过使用 put() 和 get() 操作来向队列中添加或者删除元素,代码如下
from queue import Queue
from threading import Thread # A thread that produces data def producer(out_q): while True: # Produce some data … out_q.put(data) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Process the data … # Create the shared queue and launch both threads q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start |
Queue对象可以保证在多个线程间多安全地共享数据。
如果想要终止,也是支持使用毒丸的使用方式
rom queue import Queue
from threading import Thread # Object that signals shutdown _sentinel = object() # A thread that produces data def producer(out_q): while running: # Produce some data … out_q.put(data) # Put the sentinel on the queue to indicate completion out_q.put(_sentinel) # A thread that consumes data def consumer(in_q): while True: # Get some data data = in_q.get() # Check for termination if data is _sentinel: in_q.put(_sentinel) break # Process the data |
唯一需要注意的是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。
最后我们提一下
Queue 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。
还支持使用非阻塞的get和put,比如
import queue
q = queue.Queue() try: data = q.get(block=False) except queue.Empty: … try: q.put(item, block=False) except queue.Full: … try: data = q.get(timeout=5.0) except queue.Empty: … |
以及使用get和put的超时
def consumer(q):
while _running: try: item = q.get(timeout=5.0) # Process item … except queue.Empty: pass |
并且利用queue.Empty等判断状态
- Python的锁
利用threading的Lock对象可以进行枷锁
import threading
class SharedCounter: ”’ A counter object that can be shared by multiple threads. ”’ def __init__(self, initial_value = 0): self._value = initial_value self._value_lock = threading.Lock() def incr(self,delta=1): ”’ Increment the counter with locking ”’ with self._value_lock: self._value += delta def decr(self,delta=1): ”’ Decrement the counter with locking ”’ with self._value_lock: self._value -= delta |
利用with语句块,可以保证互斥执行,每次只有一个线程可以进入with语句块执行
并且支持自动加锁和释放锁
当然我们可以进行显式的加锁和释放锁
def incr(self,delta=1):
”’ Increment the counter with locking ”’ self._value_lock.acquire() self._value += delta self._value_lock.release() |
除了最简单的Lock,还提供了RLock和Semaphore对象
RLock (可重入锁)可以被同一个线程多次获取。Semaphore可以限制线程同时执行数量
- 防止死锁的加锁
为了避免出现死锁的情况,我们可以利用一个简单的方法实现
通过对锁排序,使得不管用户以什么样的顺序来请求锁,这些锁都会按照固定的顺序被获取。
@contextmanager
def acquire(*locks): # Sort locks by object identifier locks = sorted(locks, key=lambda x: id(x)) # Make sure lock order of previously acquired locks is not violated acquired = getattr(_local,’acquired’,[]) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError(‘Lock Order Violation’) # Acquire all of the locks acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: # Release locks in reverse order of acquisition for lock in reversed(locks): lock.release() del acquired[-len(locks):] |
- 线程本地状态
类似Java的ThreadLocal,Python也支持在线程头保存一些信息,利用thread.local() 创建一个本地线程存储对象。 对这个对象的属性的保存和读取操作都只会对执行线程可见,而其他线程并不可见。
我们给出一个简单的代码
from socket import socket, AF_INET, SOCK_STREAM
import threading class LazyConnection: def __init__(self, address, family=AF_INET, type=SOCK_STREAM): self.address = address self.family = AF_INET self.type = SOCK_STREAM self.local = threading.local() def __enter__(self): if hasattr(self.local, ‘sock’): raise RuntimeError(‘Already connected’) self.local.sock = socket(self.family, self.type) self.local.sock.connect(self.address) return self.local.sock def __exit__(self, exc_ty, exc_val, tb): self.local.sock.close() del self.local.sock |
利用这个方法我们确保了这个对象,会在线程内部设置保存一个conn进行使用,不会出现线程冲突
- 线程池
concurrent.futures 函数库有一个 ThreadPoolExecutor可以用来创建线程池
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor def echo_client(sock, client_addr): ”’ Handle a client connection ”’ print(‘Got connection from’, client_addr) while True: msg = sock.recv(65536) if not msg: break sock.sendall(msg) print(‘Client closed connection’) sock.close() def echo_server(addr): pool = ThreadPoolExecutor(128) sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() pool.submit(echo_client, client_sock, client_addr) echo_server((”,15000)) |
我们设置了128个线程,从而去处理用户的请求
- 并行编程
由于存在全局解释器GIL,所以Python执行CPU密集型任务的时候并不快
Concurrent.futures提供了一个ProcessPoolExector类,可以在一个单独的Python解释器中执行密集型函数
我们这里给出一个简单的使用示例
我们有一个目录下多个压缩包,压缩包内文件写有
124.115.6.12 – – [10/Jul/2012:00:18:50 -0500] “GET /robots.txt …” 200 71
210.212.209.67 – – [10/Jul/2012:00:18:51 -0500] “GET /ply/ …” 200 11875 210.212.209.67 – – [10/Jul/2012:00:18:51 -0500] “GET /favicon.ico …” 404 369 61.135.216.105 – – [10/Jul/2012:00:20:04 -0500] “GET /blog/atom.xml …” 304 – |
我们需要在其中找到访问了robots.text的文件
我们就可以书写如下的线性代码
def find_robots(filename):
”’ Find all of the hosts that access robots.txt in a single log file ”’ robots = set() with gzip.open(filename) as f: for line in io.TextIOWrapper(f,encoding=’ascii’): fields = line.split() if fields[6] == ‘/robots.txt’: robots.add(fields[0]) return robots def find_all_robots(logdir): ”’ Find all hosts across and entire sequence of files ”’ files = glob.glob(logdir+’/*.log.gz’) all_robots = set() for robots in map(find_robots, files): all_robots.update(robots) return all_robots if __name__ == ‘__main__’: robots = find_all_robots(‘logs’) for ipaddr in robots: print(ipaddr) |
这样我们利用find_robots这个函数返回了所有的访问ip
如果希望将find_all中的操作替换为多核CPU,只需要将map()操作替换为一个 concurrent.futures 库中生成的类似操作即可
def find_all_robots(logdir):
”’ Find all hosts across and entire sequence of files ”’ files = glob.glob(logdir+’/*.log.gz’) all_robots = set() with futures.ProcessPoolExecutor() as pool: for robots in pool.map(find_robots, files): all_robots.update(robots) return all_robots |
利用ProcessPoolExecutor进行了加快速度
with ProcessPoolExecutor() as pool:
这样的pool的提交方式可以选择pool.map
或者是利用pool.submit来提交单个任务
future_result = pool.submit(work, arg)
# Obtaining the result (blocks until done) r = future_result.result() |
获取到一个Future实例,利用result方法获取到结果
其还支持设置回调函数来进行在完成之后自动触发回调操作
def when_done(r):
print(‘Got:’, r.result()) with ProcessPoolExecutor() as pool: future_result = pool.submit(work, arg) future_result.add_done_callback(when_done) |
回调函数接受一个Future并在其中获取到结果
在说完了基本的使用之后,我们看下其需要注意的一些地方
1.提交的任务必须是简单函数形式。对于方法、闭包和其他类型的并行执行还不支持
2.函数参数和返回值必须兼容pickle,因为要使用到进程间的通信,所有解释器之间的交换数据必须被序列化
3. 被提交的任务函数不应保留状态或有副作用。除了打印日志之类简单的事情
- 全局锁问题
说了这么多并发问题之后,可能会好奇全局锁问题
这是由于解释器的C语言实现部分在完全并行执行时并不是线程安全的。C相关解释器被一个全局解释器锁保护,确保任何时候只有一个Python线程执行。
这也导致Python中的多线程运行,如果是严重依赖CPU的程序性能并不好
而如果是涉及IO或者网络交互的,那么就很合适,因为大部分都在等待
那么如果我们想要绕开GIL,解决GIL的缺点,那么会有两种方法
1.利用multiprocessing来创建一个进程池
# Processing pool (see below for initiazation)
pool = None # Performs a large calculation (CPU bound) def some_work(args): … return result # A thread that calls the above function def some_thread(): while True: … r = pool.apply(some_work, (args)) … # Initiaze the pool if __name__ == ‘__main__’: import multiprocessing pool = multiprocessing.Pool() |
或者是利用C扩展编程技术,将计算密集型任务交给C,跟Python独立,这一点就不赘述了
- 定义一个Actor任务
在Python中定义actor任务模式
Actor模型是一个非常简单的分布式计算解决方案,就是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。我们简单的给出一个定义
from queue import Queue
from threading import Thread, Event # Sentinel used for shutdown class ActorExit(Exception): pass class Actor: def __init__(self): self._mailbox = Queue() def send(self, msg): ”’ Send a message to the actor ”’ self._mailbox.put(msg) def recv(self): ”’ Receive an incoming message ”’ msg = self._mailbox.get() if msg is ActorExit: raise ActorExit() return msg def close(self): ”’ Close the actor, thus shutting it down ”’ self.send(ActorExit) def start(self): ”’ Start concurrent execution ”’ self._terminated = Event() t = Thread(target=self._bootstrap) t.daemon = True t.start() def _bootstrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set() def join(self): self._terminated.wait() def run(self): ”’ Run method to be implemented by the user ”’ while True: msg = self.recv() # Sample ActorTask class PrintActor(Actor): def run(self): while True: msg = self.recv() print(‘Got:’, msg) # Sample use p = PrintActor() p.start() p.send(‘Hello’) p.send(‘World’) p.close() p.join() |
我们利用actor实例的 send() 方法发送消息给它们
并且利用run不断的读取这个方法内部的队列
其次在close中我们利用毒丸来关闭线程
这样的一个模型,我们可以利用其在一些大型分布式项目中传输数据
- 消息订阅模型
如果我们利用并发容器,也就是Queue的get和put来执行,那么可以书写如下的代码
from collections import defaultdict
class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) def send(self, msg): for subscriber in self._subscribers: subscriber.send(msg) # Dictionary of all created exchanges _exchanges = defaultdict(Exchange) # Return the Exchange instance associated with a given name def get_exchange(name): return _exchanges[name] |
上面我们利用attachh和detach 来在exchage中维护一个集合
从而在send的发送msg出去
这样就是一个简单的发布订阅模型的实现
这种模式带来了一个全新的通信模式,可以解耦程序中多个任务
而且为了方便的绑定和解绑,可以使用@contextmanager
@contextmanager
def subscribe(self, *tasks): for task in tasks: self.attach(task) try: yield finally: for task in tasks: self.detach(task) with exc.subscribe(task_a, task_b): … exc.send(‘msg1’) exc.send(‘msg2’) |
- 生成器代替线程
生成器是一种协程,有时候又被称为用户级线程或绿色线程。
对于基本的yield的使用
# Two simple generator functions
def countdown(n): while n > 0: print(‘T-minus’, n) yield n -= 1 print(‘Blastoff!’) def countup(n): x = 0 while x < n: print(‘Counting up’, x) yield x += 1 |
from collections import deque
class TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): ”’ Admit a newly started task to the scheduler ”’ self._task_queue.append(task) def run(self): ”’ Run until there are no more tasks ”’ while self._task_queue: task = self._task_queue.popleft() try: # Run until the next yield statement next(task) self._task_queue.append(task) except StopIteration: # Generator is no longer executing pass # Example use sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countdown(5)) sched.new_task(countup(15)) sched.run() |
主线程不断的轮询tast_queue
刺激协程继续往下走
那么我们这里在对yield补充一点
def some_generator():
… result = yield data |
那么这是一种yield的使用形式
这种情况是,yield后面的是返回数据
Result是传入数据
整体可以是
f = some_generator()
# Initial result. Is None to start since nothing has been computed result = None while True: try: data = f.send(result) result = … do some calculation … except StopIteration: break |
最后提一嘴,使用生成器进行编程,是有很多缺点的,因为很多类库并不支持生成器的线程