作者:E4b9a6, 创建:2022-04-16, 字数:6429, 已阅:56, 最后更新:2022-04-16
在Python中,如果有多个计算任务需要并行计算,通常会采用多线程(multithreading)或者协程(coroutine)的方式来完成
本文是基于Python3.9
官方文档说明,针对一些开发中常见的应用进行总结
线程是操作系统进行运算调度的最小单位,一个进程可以有多个线程,多个线程之间共享当前进程的全部系统资源
因此,线程也被称为"轻量级进程"
线程虽然共享进程的全部系统资源,但仍然拥有自己的独立资源部分
由于共享当前进程所有系统资源,所以一个线程可以轻易摧毁其他线程的独立资源部分
线程优点
线程缺点
Linux下可通过 ulimit -s
来查看为每一个线程默认分配内存大小
在Python3中启动一个线程非常简单
编辑一个main.py
,代码如下
import datetime
import time
from threading import Thread
letters = ['a', 'b', 'c', 'd', 'e', 'f']
def PrintLetter(sleep: int = 5):
while len(letters) > 0:
print('%s Pop letter: %s' % (datetime.datetime.now().strftime('%H:%M:%S'), letters.pop()))
time.sleep(sleep)
print('Print success')
t = Thread(target=PrintLetter, args=(1, ))
t.start()
输出如下
17:30:10 Pop letter: f
17:30:11 Pop letter: e
17:30:12 Pop letter: d
17:30:13 Pop letter: c
17:30:14 Pop letter: b
17:30:15 Pop letter: a
但需要注意的是,Python中的线程是通过操作系统提供的底层线程机制来实现的,Python解释器会将线程映射到操作系统的底层线程,比如POSIX线程(在类Unix系统上)或Windows线程(在Windows系统上)
这会导致一些常见的注意事项,如全局GIL锁
和平台兼容性
实际开发中,经常需要线程之间互相通信或订阅某些特定信息来决定是否执行下一步
线程之间的消息传递
queue.Queue
是线程安全的,可用于线程之间交互threding.Event
可用于状态监听,以满足特别数据在被线程处理完成后由其他线程进行下一步处理假设有2个线程,1个生产者线程负责推送字符串"hello",等待消费者线程在字符串后添加"world"后生产者线程立刻打印出来
线程间通信实际上是在线程间传递对象引用
编辑main.py
,实现如下
from multiprocessing import Event
from queue import Queue
from threading import Thread, Event
import time
def Producet(q: Queue):
while True:
e = Event()
hello = ['hello']
q.put((hello, e))
print('%s Producet push data "hello"' % time.time())
e.wait()
print('%s Producet say %s\n' % (time.time(), ' '.join(hello)))
def consumer(q: Queue):
while True:
# Until the queue has to value
data, e = q.get()
time.sleep(5)
data = data.append('world')
e.set()
print('%s Consumer add "world" to data \n' % time.time())
qe = Queue()
t1 = Thread(target=Producet, args=(qe, ))
t1.start()
t2 = Thread(target=consumer, args=(qe, ))
t2.start()
输出如下,可以看到生产者线程在加工后的1毫秒内拿到了”hello world“字符串
1652840455.1704156 Producet push data "hello"
1652840460.1712673 Consumer add "world" to data
1652840460.1713297 Producet say hello world
有时不好确认任务数量大小,那么线程池便派上用场了,以产生随机字符串为例,每次产生1个随机字符串需要1秒,我们使用2条线程来生成4个长度分别为5,6,7,8的随机字符串
编辑main.py
,代码如下
from concurrent.futures import ThreadPoolExecutor
import random
import threading
import time
def RomdomString(length: int):
romdom_Str = ''
while length > 0:
num = random.randint(0, 9)
s = str(random.choice([num, chr(random.randint(65, 90))]))
romdom_Str += s
length -= 1
time.sleep(1)
t = threading.currentThread()
print('Thread(%s) romdom string: %s' % (t.native_id, romdom_Str))
return romdom_Str
start = time.time()
pool = ThreadPoolExecutor(2)
t1 = pool.submit(RomdomString, 5)
t2 = pool.submit(RomdomString, 6)
t3 = pool.submit(RomdomString, 7)
t4 = pool.submit(RomdomString, 8)
print('All thread start')
print('T1 string: %s\nT2 string: %s\nT3 string: %s\nT4 string: %s' % (t1.result(), t2.result(), t3.result(), t4.result()))
print('Run time: %0.3f' % (time.time() - start))
输出如下
All thread start
Thread(13538) romdom string: VZFSV
Thread(13541) romdom string: E4PM0S
Thread(13538) romdom string: M2QN7XX
Thread(13541) romdom string: 2MZ33XH3
T1 string: VZFSV
T2 string: E4PM0S
T3 string: M2QN7XX
T4 string: 2MZ33XH3
Run time: 14.018
可以看到无论何时都只有2条线程在处理字符串的生成,且自行调度处理剩余任务,线程池非常适合处理大量IO堵塞型任务的场景
通常,你也应该只在I/O处理相关代码中使用线程池
GIL
(Global Interpreter Lock),即全局解释锁,在Python中,GIL是一种线程级别的锁,用于保护解释器内部的数据结构不受并发访问的影响
简单来说,GIL限制了Python解释器同一时间只能执行一个线程的字节码,即同一时刻只有一个线程在解释和执行Python代码,这意味着在多线程的情况下,Python无法真正实现多核的并行计算
GIL的存在是因为CPython
解释器的设计和实现方式,CPython
是Python的一种主要实现,它使用了引用计数(reference counting)作为内存管理的方式
GIL的主要作用是保护引用计数的一致性,确保在多线程环境下引用计数的操作是线程安全的
由于同一时间只有一个线程在执行Python代码,所以在多线程的情况下,无法充分利用多核处理器的性能优势
因此,在需要并行执行计算密集型任务时,使用多线程可能并不能提高性能,反而可能会导致性能下降
Python的解释器实现并不只有CPython一种,在JPython的实现中并没有GIL锁
综上,结论有2点
例如对于下列程序,计算 100000000 的累加值
import time
def Add(n: int):
count = 1
for i in range(2, n):
count = count + i
print('Count: %s' % count)
start = time.time()
Add(100000000)
Add(100000000)
print('Run time: %0.3f' % (time.time() - start))
输出如下
Count: 4999999950000000
Count: 4999999950000000
Run time: 15.024
我们采用线程的方式来并行计算2个 100000000 的累加值
import time
from threading import Thread
def Add(n: int):
count = 1
for i in range(2, n):
count = count + i
print('Count: %s ' % count)
start = time.time()
t1 = Thread(target=Add, args=(100000000, ))
t1.start()
t2 = Thread(target=Add, args=(100000000, ))
t2.start()
# Wait t1 and t2 completed
t1.join()
t2.join()
print('Run time: %0.3f' % (time.time() - start))
输出结果
Count: 4999999950000000
Count: 4999999950000000
Run time: 15.406
可以看到相对于顺序执行其速度没有区别,对于计算密集型任务而言,线程并不能使处理速度变快
如果一定要在Python中处理计算密集型任务,可以考虑进程池
现实中任何多线程调度的操作多半要考虑到原子操作
(atomic operation),即固定步骤的执行顺序不应被线程调度机制所打断乃至出现异常
如以下 Add
方法,每调用1次,则count的值增大500000,2条线程各调一次,理论上应该输出1000000
from threading import Thread
count = 0
def Add():
global count
i = 0
while i < 500000:
count += 1
i += 1
t1 = Thread(target=Add)
t1.start()
t2 = Thread(target=Add)
t2.start()
# Wait t1 and t2 completed
t1.join()
t2.join()
print('Count value : %d' % count)
输出如下,Count的值并不是1000000,多次执行可以发现这个数变化不一,这就是原子操作被破坏的体现
Count value : 804263
锁可以在实际开发中很好的避免这种问题,但通常也伴随着一定的性能损耗
from threading import Thread, Lock
count = 0
lock = Lock()
def Add():
global count
i = 0
while i < 500000:
with lock:
count += 1
i += 1
t1 = Thread(target=Add)
t1.start()
t2 = Thread(target=Add)
t2.start()
# Wait t1 and t2 completed
t1.join()
t2.join()
print('Count value : %d' % count)
输出如下,多次执行Count的值都是 1000000
Count value : 1000000
资料来源