线程安全:Python 多线程最容易忽视的五个点

作者:互联网

2026-03-24

AI模型库

多线程,听起来高大上,用起来真香!几行代码就能让程序“分身有术”,性能起飞。

但等等,你的线程安全吗?

我见过太多新手,甚至一些老鸟,写出的多线程代码就像在悬崖边跳舞。平时跑得挺欢,一上线就各种诡异bug:数据莫名丢失、计算结果随机出错、程序偶尔卡死...

线程安全问题,就是多线程世界的“隐形杀手”。它不一定会每次都发作,但一旦发作,调试起来能让你怀疑人生。

今天,小甲鱼就带你扒一扒Python多线程里最容易被忽视的5个线程安全大坑。看完这篇,保证你写多线程代码时,后背发凉,然后... 写出更安全的代码!

一、核心原理:先搞懂什么是“线程不安全”

想象一下这个场景:你和你女朋友同时去ATM机查余额(假设能同时操作)。

  • 你查到余额:1000元
  • 你女朋友同时查到余额:1000元
  • 你取了500元,余额应为500元
  • 你女朋友取了600元,余额应为400元

但最终余额是多少?可能是500元,也可能是400元,甚至可能是-100元!这就是典型的竞态条件(Race Condition)。

1. 线程安全的本质

线程安全就是保证多个线程同时访问共享资源时,程序的行为是可预测且正确的。

关键就两点:

  • 共享资源:多个线程都能访问的数据(全局变量、共享对象等)
  • 非原子操作:看起来是一步,实际上需要多个CPU指令完成的操作

比如 count += 1 这个操作,在Python里至少需要三步:

  • 读取count的当前值
  • 计算count + 1
  • 将新值写回count

线程A刚做完第一步,线程B可能就插进来了! 这就是问题的根源。

2. 锁:多线程世界的“交通信号灯”

Python的threading.Lock就是为了解决这个问题。它就像一个单人卫生间的门锁:

  • 线程A进去后,把门锁上(acquire())
  • 线程B想进去?等着! 直到线程A出来(release())
  • 这样保证同一时间只有一个线程能访问共享资源
lock = threading.Lock()

def safe_function():
    lock.acquire()  # ? 锁上门
    try:
        # 操作共享资源的代码
        pass
    finally:
        lock.release()  # ? 打开门
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

但锁不是万能的,用不好反而会制造更多问题...

二、实战案例:三个让你惊掉下巴的线程安全问题

1. 案例一:全局变量的“量子态”

你以为的代码:

import threading

counter = 0
results = []

def increment():
    global counter
    for _ in range(100000):
        counter += 1
    results.append(counter)

# 创建10个线程
threads = []
for i in range(10):
    t = threading.Thread(target=increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"最终计数: {counter}")
print(f"预期结果: 1000000")
print(f"实际结果列表: {results}")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.

你看到的结果:

最终计数: 643289  # 每次运行都不一样!
预期结果: 1000000
实际结果列表: [100000, 200000, 300000, 387654, 487654, ...]  # 乱序且有缺失
  • 1.
  • 2.
  • 3.

为什么?counter += 1 不是原子操作!多个线程同时读取、修改、写回,导致大量操作被覆盖。

修复方案:加锁!

import threading

counter = 0
lock = threading.Lock()  # ? 创建一把锁
results = []

def safe_increment():
    global counter
    for _ in range(100000):
        lock.acquire()  # 上锁
        try:
            counter += 1
        finally:
            lock.release()  # 必须释放锁!
    lock.acquire()
    results.append(counter)
    lock.release()

# 测试代码同上...
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.

优化:使用上下文管理器

def better_increment():
    global counter
    for _ in range(100000):
        with lock:  # 自动管理锁的获取和释放
            counter += 1
    with lock:
        results.append(counter)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

2. 案例二:列表操作的“魔法消失术”

场景: 多线程向同一个列表添加元素

import threading

shared_list = []

def add_items(thread_id):
    for i in range(1000):
        # 模拟一些处理时间
        temp = f"Thread-{thread_id}-Item-{i}"
        shared_list.append(temp)

# 启动5个线程
threads = []
for i in range(5):
    t = threading.Thread(target=add_items, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"预期元素数量: 5000")
print(f"实际元素数量: {len(shared_list)}")
print(f"列表前10个元素: {shared_list[:10]}")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.

可能的结果:

  • 元素数量可能正确(运气好)
  • 可能出现 IndexError(列表内部结构损坏)
  • 可能元素丢失或重复

为什么列表不安全?列表的append()操作虽然看起来是一个方法调用,但在CPython内部,它可能触发列表的重新分配和复制。多个线程同时触发这个过程,就会导致内部状态混乱。

正确做法:

import threading

shared_list = []
list_lock = threading.Lock()

def safe_add_items(thread_id):
    for i in range(1000):
        temp = f"Thread-{thread_id}-Item-{i}"
        with list_lock:
            shared_list.append(temp)

# 或者使用线程安全的队列
from queue import Queue
safe_queue = Queue()

def producer(thread_id):
    for i in range(1000):
        safe_queue.put(f"Thread-{thread_id}-Item-{i}")

def consumer():
    while True:
        item = safe_queue.get()
        if item is None:
            break
        # 处理item...
        safe_queue.task_done()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

3. 案例三:文件操作的“内容混搭”

场景: 多个线程同时写入同一个文件

import threading
import time

def write_to_file(thread_id):
    with open('shared_log.txt', 'a') as f:
        for i in range(100):
            f.write(f"Thread {thread_id}: Line {i}n")
            time.sleep(0.001)  # 模拟耗时操作

# 启动3个线程同时写入
threads = []
for i in range(3):
    t = threading.Thread(target=write_to_file, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# 检查文件内容
with open('shared_log.txt', 'r') as f:
    lines = f.readlines()
    print(f"总行数: {len(lines)}")
    print("最后10行:")
    for line in lines[-10:]:
        print(line, end='')
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

文件内容可能变成这样:

Thread 0: Line 99
Thread 1: Line 9Thread 2: Line 88
Thread 0: Line 98
Thread 1: Line 9Thread 2: Line 87
...
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

为什么?文件的write()操作不是原子的,多个线程的写入会相互干扰,导致内容错乱。

解决方案:

import threading

file_lock = threading.Lock()

def safe_write_to_file(thread_id):
    for i in range(100):
        with file_lock:
            with open('shared_log.txt', 'a') as f:
                f.write(f"Thread {thread_id}: Line {i}n")
        time.sleep(0.001)

# 或者使用专门的日志模块,它内部已经处理了线程安全
import logging
logging.basicConfig(
    filename='thread_safe.log',
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(message)s'
)

def log_with_logging(thread_id):
    for i in range(100):
        logging.info(f"Thread {thread_id}: Line {i}")
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.

三、高级技巧:不只是Lock那么简单

1. 技巧一:RLock(可重入锁)

问题场景: 函数递归调用时,普通锁会死锁

import threading

lock = threading.Lock()

def recursive_function(n):
    lock.acquire()
    try:
        if n > 0:
            print(f"进入第{n}层")
            recursive_function(n-1)  # ? 这里会再次尝试获取锁,导致死锁!
            print(f"离开第{n}层")
    finally:
        lock.release()

# 这会导致死锁!
# recursive_function(3)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

解决方案:使用RLock

import threading

rlock = threading.RLock()  # 可重入锁

def safe_recursive_function(n):
    with rlock:  # 同一线程可以多次获取同一个RLock
        if n > 0:
            print(f"进入第{n}层")
            safe_recursive_function(n-1)  # ✅ 可以再次获取
            print(f"离开第{n}层")

safe_recursive_function(3)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

RLock原理:

  • 内部维护一个拥有者线程和递归计数器
  • 同一线程多次获取时,计数器+1
  • 每次释放时,计数器-1
  • 只有计数器归零时,其他线程才能获取

2. 技巧二:Semaphore(信号量)

场景: 需要控制同时访问资源的线程数量(比如数据库连接池)

import threading
import time
from random import random

# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)

def access_database(thread_id):
    print(f"线程 {thread_id} 等待数据库连接...")
    
    with semaphore:  # 获取信号量
        print(f"线程 {thread_id} 获得连接,正在查询...")
        time.sleep(random() * 2)  # 模拟查询时间
        print(f"线程 {thread_id} 查询完成,释放连接")

# 启动10个线程,但同时只有3个能访问数据库
threads = []
for i in range(10):
    t = threading.Thread(target=access_database, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.

输出示例:

线程 0 等待数据库连接...
线程 0 获得连接,正在查询...
线程 1 等待数据库连接...
线程 1 获得连接,正在查询...
线程 2 等待数据库连接...
线程 2 获得连接,正在查询...
线程 3 等待数据库连接...  # 必须等待,直到有信号量释放
...
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

3. 技巧三:Condition(条件变量)

场景: 生产者-消费者模型

import threading
import time
from collections import deque

class ProducerConsumer:
    def __init__(self, capacity=10):
        self.buffer = deque()
        self.capacity = capacity
        self.condition = threading.Condition()
    
    def produce(self, item):
        with self.condition:
            # 缓冲区满时等待
            while len(self.buffer) >= self.capacity:
                print("缓冲区已满,生产者等待...")
                self.condition.wait()
            
            self.buffer.append(item)
            print(f"生产: {item}, 缓冲区大小: {len(self.buffer)}")
            
            # 通知消费者
            self.condition.notify_all()
    
    def consume(self):
        with self.condition:
            # 缓冲区空时等待
            while len(self.buffer) == 0:
                print("缓冲区为空,消费者等待...")
                self.condition.wait()
            
            item = self.buffer.popleft()
            print(f"消费: {item}, 缓冲区大小: {len(self.buffer)}")
            
            # 通知生产者
            self.condition.notify_all()
            return item

# 测试
pc = ProducerConsumer(capacity=5)

def producer():
    for i in range(20):
        pc.produce(f"产品-{i}")
        time.sleep(0.1)

def consumer():
    for _ in range(20):
        item = pc.consume()
        time.sleep(0.15)

# 启动生产者和消费者
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.

4. 技巧四:Event(事件)

场景: 线程间简单的信号通知

import threading
import time

# 创建事件对象
data_ready = threading.Event()
data = None

def data_producer():
    global data
    time.sleep(2)  # 模拟数据准备
    data = {"status": "success", "value": 42}
    
    # 设置事件,通知消费者数据已准备好
    data_ready.set()
    print("生产者: 数据已准备好")

def data_consumer():
    print("消费者: 等待数据...")
    
    # 等待事件被设置
    data_ready.wait()
    
    print(f"消费者: 收到数据 {data}")

# 启动线程
t1 = threading.Thread(target=data_consumer)
t2 = threading.Thread(target=data_producer)

t1.start()
t2.start()

t1.join()
t2.join()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.

四、常见误区:这些错误你可能正在犯

1. 误区一:忘记释放锁

错误代码:

lock = threading.Lock()

def dangerous_function():
    lock.acquire()
    # 做一些事情
    if some_condition:
        return  # ? 直接返回,锁永远不会释放!
    # 更多代码...
    lock.release()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

正确做法:

def safe_function():
    lock.acquire()
    try:
        # 做一些事情
        if some_condition:
            return  # ✅ 即使返回,finally也会执行
        # 更多代码...
    finally:
        lock.release()  # 确保锁被释放

# 或者使用上下文管理器(推荐)
def even_better_function():
    with lock:
        # 做一些事情
        if some_condition:
            return  # ✅ 自动释放锁
        # 更多代码...
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

2. 误区二:锁的粒度太大

错误代码: 锁住整个函数,性能极差

lock = threading.Lock()

def slow_function():
    with lock:  # ? 整个函数都被锁住
        # 一些不需要锁的操作
        time.sleep(1)  # 耗时操作
        # 只有这一小部分需要锁
        update_shared_data()
        # 更多不需要锁的操作
        process_data()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

正确做法: 只锁住必要的部分

def fast_function():
    # 不需要锁的操作
    time.sleep(1)
    prepare_data()
    
    # 只锁住真正需要同步的部分
    with lock:
        update_shared_data()
    
    # 不需要锁的操作
    process_data()
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

3. 误区三:在锁内调用外部函数

危险代码:

lock = threading.Lock()

def function_a():
    with lock:
        # 调用外部函数,不知道它内部是否也会获取锁
        external_function()  # ? 可能导致死锁!

def external_function():
    # 如果这个函数内部也尝试获取同一个锁...
    with lock:  # ? 死锁!
        pass
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

原则:

  • 在锁内尽量只操作简单的数据结构
  • 避免在锁内调用复杂的外部函数
  • 如果必须调用,确保了解被调用函数的锁行为

4. 误区四:以为某些操作是线程安全的

常见误解:

# 很多人以为这些是线程安全的,其实不是!

# 1. 列表操作
shared_list = []
shared_list.append(item)  # ❌ 不安全
shared_list.pop()         # ❌ 不安全

# 2. 字典操作
shared_dict = {}
shared_dict[key] = value  # ❌ 不安全(在resize时可能出问题)
value = shared_dict[key]  # ❌ 不安全

# 3. 文件操作
with open('file.txt', 'a') as f:
    f.write('data')  # ❌ 不安全

# 4. 甚至print()也不是完全安全的!
print("Hello")  # ❌ 可能与其他线程的输出混在一起
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

正确做法: 对所有共享资源的访问都要加锁!

5. 误区五:过度同步导致性能问题

错误模式:

# 每个微小的操作都加锁
lock = threading.Lock()

def over_synced():
    for i in range(1000000):
        with lock:
            x = i * 2  # ? 这根本不需要锁!
        with lock:
            y = x + 1  # ? 这也不需要!
        with lock:
            results.append(y)  # 只有这里需要锁
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

优化策略:

  • 批处理操作:积累一批数据,一次性加锁处理
  • 使用线程本地存储:每个线程有自己的数据副本
  • 考虑无锁数据结构:如queue.Queue、collections.deque(在特定操作下)

五、总结:线程安全的五个黄金法则

  • 识别共享资源:所有能被多个线程访问的数据都是潜在风险点
  • 保护所有访问:对共享资源的每一次读写操作都要加锁
  • 锁的粒度要适中:太大会影响性能,太小容易遗漏
  • 使用高级同步工具:根据场景选择RLock、Semaphore、Condition等
  • 测试!测试!测试!:多线程bug难以复现,要充分测试并发场景

记住: 在多线程世界里,没有所谓的"大部分时候正确"。要么完全正确,要么就是定时炸弹。

相关标签:

AI 大模型 资讯