官方公布了umqtt库的源码,昨天查了一下,原因其实很简单,就是官方的stop_thread并不会释放线程内的线程锁,导致这个锁一直锁在wait_msg上
self.mqttmsglock.acquire() #umqtt.py 644行
ret = super().wait_msg()
self.mqttmsglock.release()
我在使用这个方法的时候,为了实现断开连接的功能用的是
def run(self): #注意client就是实例化的MqttClient类
if self.get_connect_state() == 0:
print("Mqtt connect server")
self.connect()
if self.thread_id == None:
print("Mqtt listen run")
self.listen() #listen 启动了线程
def stop(self):
if self.thread_id != None:
print("stop listen") #发现问题了吗?结束线程之后并不会让wait_msg释放锁
_thread.stop_thread(self.thread_id)
self.thread_id = None
if self.get_connect_state() == 1:
try:
print("mqtt close connect")
self.connect_state = 0
self.client.close()
return True
except Exception as e:
print("disconnect failed:", e)
return False
def listen_thread_worker(self):#这是waitmsg线程,线程栈按照文档建议给的16*1024
while True:
try:
if (self.connect_state != 0):
self.client.wait_msg()
else:
utime.sleep_ms(10)
continue
except Exception as e:
self.put_error(error.ListenError())
if(self.connect_state != 0):
self.client.close()
self.connect()
注意上面的代码我是直接用的stop_thread,但是很不幸,这不会让wait_msg把锁放开,锁一直锁着,那后边的线程自然也没法调用super.wait_msg。所以剑走偏锋,直接先mqttclient.close()等wait_msg报错自己退出去不就好了,但是,我们的移远芯片会因为调用已经关闭的资源而概率性重启芯片,何等天才的设计啊!既不提供exception也不给错误信息,直接重启!遥想我刚从事开发还因为这种事情被领导疯狂批评为屎山代码。你这样做wait_msg必定会至少有一次调用已经关闭的资源,不过幸好,移远没有在官方库里使用大量的私有属性,这为我们解决这个问题提供了契机。
所以最终得咋办呢,你得在外面手动开锁,设想一下如果umqtt的这个mqttmsglock是个私有属性,哈哈哈
def stop(self):
if self.thread_id != None: #stop listen thread
print("stop listen")
try:
_thread.stop_thread(self.thread_id)
except Exception as e:
print("Error stop thread failed:", e)
try:
if self.client.mqttmsglock.locked():
self.client.mqttmsglock.release()
except Exception as e:
print("Error release lock failed:", e)
self.thread_id = None
self.client.close()
这样就不会倒是wait_msg调用已经关闭的资源了,而且释放掉了锁,得益于socket.close的特性也把网络资源释放掉了,虽不优雅但有用