# 订阅主题(在一个终端窗口)
mosquitto_sub -h localhost -t "test/topic" -v
# 发布消息(在另一个终端窗口)
mosquitto_pub -h localhost -t "test/topic" -m "Hello MQTT"
mosquitto_pub -h localhost -t "test/topic" -m "auth test" -u username -P password
systemctl status mosquitto # 对于使用systemd的系统
/etc/init.d/mosquitto status # 对于使用init.d的系统
netstat -tulnp | grep 1883 # 默认非加密端口
netstat -tulnp | grep 8883 # 常用SSL/TLS端口
使用mqtt-benchmark进行压力测试:
# 安装Node.js后
npm install -g mqtt-benchmark
# 运行基准测试
mqtt-benchmark --broker mqtt://localhost --count 1000 --size 200 --qos 2 --clients 10
# 使用mosquitto客户端测试SSL连接
mosquitto_sub -h localhost -p 8883 -t "test/topic" --cafile /path/to/ca.crt
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("test/topic")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.publish("test/topic", "Hello from Python")
client.loop_forever()
基本连接测试
功能测试
性能测试
安全测试
稳定性测试
连接被拒绝
认证失败
消息未收到
性能问题
通过以上方法,您可以全面测试Linux环境下搭建的MQTT服务的功能、性能和稳定性。