多线程应用案例
简单多线程示例
简单多线程使用示例,如下所示创建线程A、B,各自做自己的独立任务。因此在需要任务长时间运行,并且会阻塞后续任务执行时,可以采用多线程的方式进行执行。
import _thread
import utime
# 线程 B 函数入口,每隔 3 秒打印一次。
def thread_entry_B(id):
while True:
print('thread {} is running.'.format(id))
utime.sleep(3)
# 线程 A 函数入口,每隔 3 秒打印一次。
def thread_entry_A(id):
while True:
print('thread {} is running.'.format(id))
utime.sleep(3)
# 创建线程
_thread.start_new_thread(thread_entry_A, ('A',))
_thread.start_new_thread(thread_entry_B, ('B',))
多线程原子性操作
原子操作指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会切换到其他线程。
多线程原子性操作,它表示在多个线程访问同一个共享资源的时候,能够确保所有其他的线程都不在同一时间内访问相同的资源。因此在多线程对同一资源进行访问时,但是该资源无法同时进行访问可以采用该方法实现。
如下示例线程 A、B 同时操作共享资源,加锁对共享资源进行保护,保证共享资源在同时只能一个线程访问。
import _thread
import utime
lock = _thread.allocate_lock()
count = 1
def shared_res(id):
global count
with lock:
print( 'thread {} count {}.'.format(id, count))
count += 1
# 线程 B 函数入口,对共享资源 count 进行累加操作。
def thread_entry_B(id):
while True:
shared_res(id)
utime.sleep(1)
# 线程 A 函数入口,对共享资源 count 进行累加操作。
def thread_entry_A(id):
global count
while True:
shared_res(id)
utime.sleep(1)
# 创建线程
_thread.start_new_thread(thread_entry_A, ('A',))
_thread.start_new_thread(thread_entry_B, ('B',))
多线程顺序执行
多线程顺序执行,通过信号量控制,实现多线程按照顺序进行执行。常用于对于多线程执行有依赖关系,可以通过该种方式进行控制运行。
如下示例线程 A、B、C 三个线程,通过信号量控制,实现顺序运行,按照A、B、C 顺序打印。
import _thread
import utime
semphore_A = _thread.allocate_semphore(1)
semphore_B = _thread.allocate_semphore(1)
semphore_C = _thread.allocate_semphore(1)
# 线程 C 函数入口,通过信号量控制线程 A 运行。
def thread_entry_C(id):
count = 0
while count < 30:
semphore_B.acquire()
print('this is thread {}.'.format(id))
utime.sleep_ms(100)
semphore_C.release()
count += 1
# 线程 B 函数入口,通过信号量控制线程 C 运行。
def thread_entry_B(id):
count = 0
while count < 30:
semphore_A.acquire()
print('this is thread {}.'.format(id))
utime.sleep_ms(100)
semphore_B.release()
count += 1
# 线程 A 函数入口,等待信号量运行。
def thread_entry_A(id):
count = 0
while count < 30:
semphore_C.acquire()
print('this is thread {}.'.format(id))
utime.sleep_ms(100)
semphore_A.release()
count += 1
# 清空 A、B 信号量。保证只能线程 A 先运行,及多余信号量干扰。
semphore_A.acquire()
semphore_B.acquire()
# 创建线程A、B、C。
_thread.start_new_thread(thread_entry_A, ('A',))
_thread.start_new_thread(thread_entry_B, ('B',))
_thread.start_new_thread(thread_entry_C, ('C',))
生产者-消费者模式
生产者-消费者模式一般用于将生产数据的一方和消费数据的一方分割开来,将生产数据与消费数据的过程解耦开来。
在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,因为生产那么多也无法处理,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式。
如下示例线程生产者每隔 3 秒发送给消费者信息、消费者等待生产者消息进行打印处理。
# 该示例线程 B 通过将消息压栈,线程 A 专门读取消息队列的内容进行处理。
import _thread
import utime
from queue import Queue
q = Queue()
# 线程生产者函数入口,输出信息供给消费者。
def thread_producer(id):
data = 'Hello QuecPython!'
while True:
q.put(data)
print('thread {} send: {}.'.format(id, data))
utime.sleep(3)
# 线程消费者函数入口,处理生产者信息。
def thread_consumer(id):
while True:
data = q.get()
print('thread {} recv: {}.'.format(id, data))
# 创建线程producer、consumer。
_thread.start_new_thread(thread_producer, ('producer',))
_thread.start_new_thread(thread_consumer, ('consumer',))
中断处理流程
在多线程处理中,对于提供的中断/回调接口是其他线程的操作,在使用时需要注意不要在中断/回调中做延时任务,延时任务会导致后续中断/回调阻塞,从而导致中断/回调触发接口异常。对于在接收中断/回调后处理过程中需要耗时操作,一般是通过线程间通信方式,交给其其他独立线程进行任务处理。
如下示例实操对于串口数据中断触发,通过消息队列控制另一个接口进行数据读取及处理,防止对中断/回调接口阻塞丢包等问题。
# 该示例串口触发中断/回调,防止读取串口及串口数据处理延时,通过消息队列控制专门线程对串口数读取及处理。
import _thread
from machine import UART
from queue import Queue
# 串口数据读取线程函数入口,通过消息队列触发消息读取。
def thread_entry():
while True:
len = uart_msg_q.get()
data = UART2.read(len)
print('uart read len {}, recv: {}.'.format(len, data))
def callback(para):
print("uart call para:{}".format(para))
if(0 == para[0]):
uart_msg_q.put(para[2])
uart_msg_q = Queue()
UART2 = UART(UART.UART2, 115200, 8, 0, 1, 0)
UART2.set_callback(callback)
# 创建线程
_thread.start_new_thread(thread_entry, ())
API 说明
线程栈大小设置及查询
线程栈大小设置及查询,方便根据自己业务动态调整栈大小
import _thread
import utime
# 线程函数入口,实现每隔一秒进行一次打印。
def thread_func_entry(name):
while True:
print( 'thread {} id is {}.'.format(name, thread_id))
utime.sleep(1)
#查询当前协议栈大小
thread_size_old = _thread.stack_size()
print('current thread_size_old {}'.format(thread_size_old))
#设置线程栈大小为 10k
_thread.stack_size(10*1024)
# 创建线程
thread_id = _thread.start_new_thread(thread_func_entry, ('QuecPython'))
#还原线程栈大小
_thread.stack_size(thread_size_old)
线程创建及删除
线程创建及删除接口,方便通过该接口建立并行任务或者中断某个任务执行。删除某个任务指中断某个正常执行的任务,对于自动完成的任务,直接退出即可。
通过 thread.stopthread(thread_id) 暴力关闭线程,释放线程资源,需要注意对应线程是否有锁、内存申请等需要用户释放的相关操作,防止导致死锁或者内存泄漏情况。
import _thread
import utime
# 线程函数入口,实现每隔一秒进行一次打印。
def thread_func_entry(name):
while True:
print( 'thread {} id is {}.'.format(name, thread_id))
utime.sleep(1)
# 创建线程
thread_id = _thread.start_new_thread(thread_func_entry, ('QuecPython'))
utime.sleep(10)
#删除线程
_thread.thread_stop(thread_id)
互斥锁 - API 说明
互斥锁是为了解决在多线程访问共享资源时,多个线程同时对共享资源操作产生的冲突而提出的一种解决方法。
如下示例介绍互斥锁创建、加锁、解锁、删除锁的使用过程。
# 该示例线程 B 一定条件下通过互斥锁控制线程 A 运行,达到线程间通信目的。
import _thread
import utime
# 创建线程锁
lock = _thread.allocate_lock()
count = 1
# 线程 B 函数入口,通过锁控制,防止同时操作操作全局变量count。
def thread_entry_B(id):
global count
while True:
# 获取线程锁,加锁。
lock.acquire()
print( 'thread {} count {}.'.format(id, count))
count += 1
utime.sleep(1)
# 释放线程锁,解锁。
lock.release()
# 线程 A 函数入口,通过锁控制,防止同时操作操作全局变量count。
def thread_entry_A(id):
global count
while True:
# 获取线程锁,加锁。
lock.acquire()
print('thread {} count {}.'.format(id, count))
count += 1
# 释放线程锁,解锁。
lock.release()
# 创建线程
thread_id_A = _thread.start_new_thread(thread_entry_A, ('A',))
thread_id_B = _thread.start_new_thread(thread_entry_B, ('B',))
utime.sleep(10)
# 删除锁
_thread.thread_stop(thread_id_A)
_thread.thread_stop(thread_id_B)
_thread.delete_lock(lock)
信号量 - API 说明
信号量是操作系统用来解决并发中的互斥和同步问题的一种方法,是进化版的互斥锁。
如下示例介绍信号量创建、释放信号量、消耗信号量、删除信号量的使用过程。
# 该示例线程 B 一定条件下通过信号量控制线程 A 运行,达到线程间通信目的。
import _thread
import utime
semphore = _thread.allocate_semphore(1)
# 线程 B 函数入口,通过信号量控制线程 A 运行。
def thread_entry_B(id):
while True:
print('this is thread {}.'.format(id))
utime.sleep(1)
# 释放信号量
semphore.release()
# 线程 A 函数入口,等待信号量运行。
def thread_entry_A(id):
while True:
# 消耗信号量
semphore.acquire()
print('this is thread {}.'.format(id))
# 创建线程
_thread.start_new_thread(thread_entry_A, ('A',))
_thread.start_new_thread(thread_entry_B, ('B',))
# 删除信号量
_thread.thread_stop(thread_id_A)
_thread.thread_stop(thread_id_B)
_thread.delete_semphore(semphore)