MQTT通讯模块
📋 模块概述
MQTT(Message Queuing Telemetry Transport)通讯模块是H07设备与云端服务器通信的核心,基于Eclipse Paho MQTT Android库实现。该模块提供可靠的消息传输、自动重连、状态管理等功能,广泛应用于设备状态上报、远程指令接收、数据实时同步等场景。
核心类:
android.znhaas.util.MqttClient(现代化MQTT客户端,推荐)android.znhaas.util.MqttClientHelper(传统MQTT客户端,兼容)
协议版本: MQTT 3.1.1
QoS等级: 0 (最多一次), 1 (至少一次), 2 (恰好一次)
🎯 主要功能
- ✅ 连接管理 (连接/断开/自动重连)
- ✅ 消息发布 (设备数据上报)
- ✅ 主题订阅 (远程指令接收)
- ✅ 状态观察 (LiveData/状态流)
- ✅ QoS质量保证
- ✅ 消息保留 (Retained Message)
- ✅ 订阅恢复 (重连后自动恢复订阅)
- ✅ 协程支持 (Kotlin Coroutines)
📚 应用场景
1. 设备状态上报
// GPS位置上报
val locationData = JSONObject().apply {
put("deviceId", deviceId)
put("latitude", location.latitude)
put("longitude", location.longitude)
put("timestamp", System.currentTimeMillis())
}
mqttClient.publish(
topic = "device/$deviceId/location",
payload = locationData.toString(),
qos = 1 // 至少送达一次
)
常见上报类型:
- 📍 GPS位置数据 (每10秒)
- 🔋 电量状态 (每分钟或低电量时)
- 🌡️ 环境数据 (温度、湿度等)
- ⚠️ 报警事件 (跌倒、脱帽、SOS等)
- 📹 录制状态 (开始/停止录制)
- 📶 网络状态 (WiFi/4G信号强度)
2. 远程指令接收
// 订阅指令主题
mqttClient.subscribe("cmd/$deviceId", qos = 1)
// 监听消息
lifecycleScope.launch {
mqttClient.receivedMessages.observe(lifecycleOwner) { message ->
when (message.topic) {
"cmd/$deviceId" -> handleCommand(message.payload)
}
}
}
常见远程指令:
- 🎥 开始/停止录制 (指令码: 8001)
- 📸 拍照 (指令码: 8003)
- 📹 开始/停止推流 (指令码: 9527/9528)
- 🔊 TTS语音播报 (指令码: 8006)
- 🔄 OTA固件升级 (指令码: 8007)
- ⚙️ 配置更新 (指令码: 8002)
- 📋 查询配置 (指令码: 8008)
3. 设备配置同步
// 上报当前配置
val configData = JSONObject().apply {
put("deviceId", deviceId)
put("recordQuality", "1080p")
put("uploadInterval", 10)
put("language", "zh")
}
mqttClient.publish(
topic = "config/$deviceId",
payload = configData.toString(),
qos = 1,
retained = true // 保留消息,新订阅者立即收到最新配置
)
📚 核心API
1. 获取MQTT客户端实例
// 获取单例实例
val mqttClient = MqttClient.getInstance(context)
2. 连接MQTT服务器
// 创建连接配置
val config = MqttConfig(
serverUri = "tcp://mqtt.example.com:1883", // 服务器地址
clientId = "H07_$deviceId", // 客户端ID(唯一)
username = "device_user", // 用户名
password = "device_pass", // 密码
cleanSession = true, // 清除会话
keepAliveInterval = 60, // 心跳间隔(秒)
connectionTimeout = 30, // 连接超时(秒)
automaticReconnect = true // 自动重连
)
// 连接服务器(协程)
lifecycleScope.launch {
mqttClient.connect(config)
.onSuccess {
Log.d(TAG, "MQTT连接成功")
}
.onFailure { e ->
Log.e(TAG, "MQTT连接失败: ${e.message}")
}
}
连接参数说明:
| 参数 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
serverUri | 服务器地址 | 无 | tcp://host:1883 或 ssl://host:8883 |
clientId | 客户端唯一标识 | 无 | H07_${设备ID} |
username | MQTT用户名 | "" | 根据服务器配置 |
password | MQTT密码 | "" | 根据服务器配置 |
cleanSession | 是否清除会话 | true | true (每次全新连接) |
keepAliveInterval | 心跳间隔(秒) | 60 | 60-120 |
connectionTimeout | 连接超时(秒) | 30 | 30 |
automaticReconnect | 自动重连 | true | true (网络波动时自动恢复) |
3. 订阅主题
// 订阅单个主题
lifecycleScope.launch {
mqttClient.subscribe(
topic = "cmd/$deviceId",
qos = 1 // 0=最多一次, 1=至少一次, 2=恰好一次
)
}
// 订阅多个主题
lifecycleScope.launch {
mqttClient.subscribe("cmd/$deviceId", qos = 1)
mqttClient.subscribe("config/$deviceId", qos = 1)
mqttClient.subscribe("broadcast/all", qos = 0)
}
// 获取已订阅的主题
val topics = mqttClient.getSubscribedTopics()
Log.d(TAG, "已订阅主题: $topics")
主题通配符:
+: 单层通配符 (device/+/status匹配device/001/status,device/002/status)#: 多层通配符 (device/#匹配device/001/status,device/001/location等所有子主题)
4. 发布消息
// 发布普通消息
lifecycleScope.launch {
mqttClient.publish(
topic = "device/$deviceId/status",
payload = "online",
qos = 1,
retained = false
).onSuccess {
Log.d(TAG, "消息发布成功")
}.onFailure { e ->
Log.e(TAG, "消息发布失败: ${e.message}")
}
}
// 发布JSON数据
val data = JSONObject().apply {
put("deviceId", deviceId)
put("battery", 85)
put("signal", -65)
put("timestamp", System.currentTimeMillis())
}
lifecycleScope.launch {
mqttClient.publish(
topic = "device/$deviceId/telemetry",
payload = data.toString(),
qos = 1
)
}
// 发布保留消息(Retained)
lifecycleScope.launch {
mqttClient.publish(
topic = "device/$deviceId/lastWill",
payload = "offline",
qos = 1,
retained = true // 服务器保留最后一条消息
)
}
QoS等级选择建议:
| QoS | 说明 | 使用场景 | 网络开销 |
|---|---|---|---|
| 0 | 最多一次(可能丢失) | 实时数据(GPS),允许丢失 | 最低 |
| 1 | 至少一次(可能重复) | 重要数据(报警、状态) | 中等 (推荐) |
| 2 | 恰好一次(最可靠) | 关键指令(OTA升级) | 最高 |
5. 接收消息
// 使用LiveData观察消息
lifecycleScope.launch {
mqttClient.receivedMessages.observe(lifecycleOwner) { message ->
Log.d(TAG, "收到消息:")
Log.d(TAG, " 主题: ${message.topic}")
Log.d(TAG, " 内容: ${message.payload}")
Log.d(TAG, " QoS: ${message.qos}")
Log.d(TAG, " 保留: ${message.retained}")
// 解析JSON消息
try {
val json = JSONObject(message.payload)
val command = json.getString("cmd")
handleCommand(command, json)
} catch (e: JSONException) {
Log.e(TAG, "JSON解析失败", e)
}
}
}
6. 监听连接状态
// 观察连接状态
lifecycleScope.launch {
mqttClient.connectionState.observe(lifecycleOwner) { state ->
when (state) {
is MqttState.Disconnected -> {
Log.d(TAG, "MQTT未连接")
updateUI("❌ 未连接")
}
is MqttState.Connecting -> {
Log.d(TAG, "MQTT连接中...")
updateUI("🔄 连接中...")
}
is MqttState.Connected -> {
Log.d(TAG, "MQTT已连接")
updateUI("✅ 已连接")
}
is MqttState.Error -> {
Log.e(TAG, "MQTT错误: ${state.message}")
updateUI("⚠️ 错误")
}
}
}
}
// 检查当前连接状态
val isConnected = mqttClient.isConnected()
val currentState = mqttClient.getCurrentState()
7. 断开连接
// 手动断开连接
lifecycleScope.launch {
mqttClient.disconnect()
Log.d(TAG, "MQTT已断开")
}
8. 取消订阅
// 取消订阅主题
lifecycleScope.launch {
mqttClient.unsubscribe("cmd/$deviceId")
}
9. 清理资源
// Activity/Service销毁时清理
override fun onDestroy() {
super.onDestroy()
mqttClient.destroy()
}
📚 完整使用示例
示例1: 设备启动时的MQTT初始化
class MainActivity : AppCompatActivity() {
private val TAG = "MainActivity"
private lateinit var mqttClient: MqttClient
private lateinit var deviceId: String
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// 获取设备ID
deviceId = getDeviceId()
// 初始化MQTT客户端
mqttClient = MqttClient.getInstance(this)
// 监听连接状态
setupMqttStateObserver()
// 监听消息
setupMqttMessageObserver()
// 连接MQTT服务器
connectToMqtt()
}
/**
* 连接MQTT服务器
*/
private fun connectToMqtt() {
// 从SharedPreferences读取配置
val prefs = getSharedPreferences("mqtt_config", Context.MODE_PRIVATE)
val serverUri = prefs.getString("server_uri", "tcp://mqtt.example.com:1883")!!
val username = prefs.getString("username", "")!!
val password = prefs.getString("password", "")!!
val config = MqttConfig(
serverUri = serverUri,
clientId = "H07_$deviceId",
username = username,
password = password,
cleanSession = true,
keepAliveInterval = 60,
connectionTimeout = 30,
automaticReconnect = true
)
lifecycleScope.launch {
mqttClient.connect(config)
.onSuccess {
Log.d(TAG, "MQTT连接成功")
// 连接成功后订阅主题
subscribeMqttTopics()
}
.onFailure { e ->
Log.e(TAG, "MQTT连接失败: ${e.message}", e)
Toast.makeText(this@MainActivity, "MQTT连接失败", Toast.LENGTH_SHORT).show()
}
}
}
/**
* 订阅MQTT主题
*/
private suspend fun subscribeMqttTopics() {
// 订阅指令主题
mqttClient.subscribe("cmd/$deviceId", qos = 1)
// 订阅配置主题
mqttClient.subscribe("config/$deviceId", qos = 1)
// 订阅广播主题
mqttClient.subscribe("broadcast/all", qos = 0)
Log.d(TAG, "已订阅所有主题")
}
/**
* 监听MQTT连接状态
*/
private fun setupMqttStateObserver() {
mqttClient.connectionState.observe(this) { state ->
when (state) {
is MqttState.Connected -> {
statusText.text = "✅ MQTT已连接"
statusText.setTextColor(Color.GREEN)
}
is MqttState.Connecting -> {
statusText.text = "🔄 MQTT连接中..."
statusText.setTextColor(Color.YELLOW)
}
is MqttState.Disconnected -> {
statusText.text = "❌ MQTT未连接"
statusText.setTextColor(Color.GRAY)
}
is MqttState.Error -> {
statusText.text = "⚠️ MQTT错误"
statusText.setTextColor(Color.RED)
Log.e(TAG, "MQTT错误: ${state.message}")
}
}
}
}
/**
* 监听MQTT消息
*/
private fun setupMqttMessageObserver() {
mqttClient.receivedMessages.observe(this) { message ->
Log.d(TAG, "收到MQTT消息: ${message.topic}")
when {
message.topic == "cmd/$deviceId" -> {
// 处理指令
handleCommand(message.payload)
}
message.topic == "config/$deviceId" -> {
// 处理配置更新
handleConfigUpdate(message.payload)
}
message.topic == "broadcast/all" -> {
// 处理广播消息
handleBroadcast(message.payload)
}
}
}
}
/**
* 处理远程指令
*/
private fun handleCommand(payload: String) {
try {
val json = JSONObject(payload)
val cmd = json.getString("cmd")
when (cmd) {
"8001" -> {
// 开始录制
val action = json.getString("action")
if (action == "start") {
recordingService?.startRecording()
sendCommandAck(cmd, "success")
} else {
recordingService?.stopRecording()
sendCommandAck(cmd, "success")
}
}
"8003" -> {
// 拍照
recordingService?.takePhoto()
sendCommandAck(cmd, "success")
}
"9527" -> {
// 开始推流
val serverUrl = json.getString("webrtc_server")
val token = json.getString("token")
startLiveStream(serverUrl, token)
sendCommandAck(cmd, "success")
}
"8006" -> {
// TTS语音播报
val text = json.getString("text")
ttsManager.speak(text)
sendCommandAck(cmd, "success")
}
else -> {
Log.w(TAG, "未知指令: $cmd")
sendCommandAck(cmd, "unknown_command")
}
}
} catch (e: Exception) {
Log.e(TAG, "处理指令失败", e)
}
}
/**
* 发送指令响应
*/
private fun sendCommandAck(cmd: String, status: String) {
val ackData = JSONObject().apply {
put("deviceId", deviceId)
put("cmd", cmd)
put("status", status)
put("timestamp", System.currentTimeMillis())
}
lifecycleScope.launch {
mqttClient.publish(
topic = "ack/$deviceId",
payload = ackData.toString(),
qos = 1
)
}
}
override fun onDestroy() {
super.onDestroy()
mqttClient.destroy()
}
}
示例2: 定时上报GPS位置
class LocationReportManager(
private val context: Context,
private val deviceId: String
) {
private val TAG = "LocationReport"
private val mqttClient = MqttClient.getInstance(context)
private val locationService = NativeLocationService.getInstance(context)
private var reportTimer: Timer? = null
private val reportInterval = 10000L // 10秒上报一次
/**
* 开始定时上报位置
*/
fun startLocationReport() {
stopLocationReport() // 先停止旧的定时器
reportTimer = Timer().apply {
scheduleAtFixedRate(object : TimerTask() {
override fun run() {
reportLocation()
}
}, 0, reportInterval)
}
Log.d(TAG, "位置上报已启动,间隔: ${reportInterval}ms")
}
/**
* 停止定时上报
*/
fun stopLocationReport() {
reportTimer?.cancel()
reportTimer = null
Log.d(TAG, "位置上报已停止")
}
/**
* 上报位置数据
*/
private fun reportLocation() {
// 获取当前位置
val location = locationService.getLastLocation()
if (location.latitude == 0.0 || location.longitude == 0.0) {
Log.w(TAG, "位置无效,跳过上报")
return
}
// 构造位置数据
val locationData = JSONObject().apply {
put("deviceId", deviceId)
put("latitude", location.latitude)
put("longitude", location.longitude)
put("altitude", location.altitude)
put("accuracy", location.accuracy)
put("provider", location.provider)
put("speed", location.speed)
put("bearing", location.bearing)
put("timestamp", System.currentTimeMillis())
}
// 发布到MQTT
CoroutineScope(Dispatchers.IO).launch {
mqttClient.publish(
topic = "device/$deviceId/location",
payload = locationData.toString(),
qos = 0 // GPS数据允许丢失,使用QoS 0降低网络开销
).onSuccess {
Log.d(TAG, "位置上报成功: ${location.latitude}, ${location.longitude}")
}.onFailure { e ->
Log.e(TAG, "位置上报失败: ${e.message}")
}
}
}
}
示例3: 报警事件上报
class AlarmReporter(
private val context: Context,
private val deviceId: String
) {
private val TAG = "AlarmReporter"
private val mqttClient = MqttClient.getInstance(context)
private val soundPlayer = SoundPlayer.getInstance(context)
/**
* 上报跌倒报警
*/
fun reportFallAlarm() {
Log.e(TAG, "检测到跌倒!")
// 播放报警音
soundPlayer.playWavFileAsync(SoundPlayer.FALL_WAV)
// 获取当前位置
val location = NativeLocationService.getInstance(context).getLastLocation()
// 构造报警数据
val alarmData = JSONObject().apply {
put("deviceId", deviceId)
put("alarmType", "FALL_ALARM")
put("alarmCode", "1002")
put("latitude", location.latitude)
put("longitude", location.longitude)
put("timestamp", System.currentTimeMillis())
put("severity", "HIGH")
}
// 发布到MQTT (使用QoS 2确保送达)
CoroutineScope(Dispatchers.IO).launch {
mqttClient.publish(
topic = "alarm/$deviceId",
payload = alarmData.toString(),
qos = 2 // 报警消息必须送达
).onSuccess {
Log.d(TAG, "跌倒报警上报成功")
}.onFailure { e ->
Log.e(TAG, "跌倒报警上报失败: ${e.message}")
// 失败时重试
retryReportAlarm(alarmData.toString())
}
}
}
/**
* 重试上报报警
*/
private suspend fun retryReportAlarm(payload: String, maxRetries: Int = 3) {
repeat(maxRetries) { attempt ->
delay(1000 * (attempt + 1)) // 指数退避
mqttClient.publish(
topic = "alarm/$deviceId",
payload = payload,
qos = 2
).onSuccess {
Log.d(TAG, "报警重试上报成功 (第${attempt + 1}次)")
return
}
}
Log.e(TAG, "报警上报失败,已重试$maxRetries次")
}
}
🔧 注意事项
1. 客户端ID唯一性
问题: 多个设备使用相同的ClientId会导致相互踢下线
解决方案:
// 使用设备唯一ID作为ClientId
val deviceId = Settings.Secure.getString(
context.contentResolver,
Settings.Secure.ANDROID_ID
)
val clientId = "H07_$deviceId" // 确保全局唯一
2. 自动重连机制
// 配置自动重连
val config = MqttConfig(
automaticReconnect = true, // 启用自动重连
keepAliveInterval = 60 // 心跳间隔
)
// 监听连接丢失
mqttClient.connectionState.observe(lifecycleOwner) { state ->
if (state is MqttState.Disconnected) {
// 连接丢失会自动重连,无需手动处理
Log.w(TAG, "MQTT连接丢失,自动重连中...")
}
}
3. 订阅恢复
特性: 重连后自动恢复所有订阅
// 初次连接时订阅
mqttClient.subscribe("cmd/$deviceId", qos = 1)
mqttClient.subscribe("config/$deviceId", qos = 1)
// 网络断开后重连,会自动重新订阅上述主题,无需手动处理
4. 网络要求
- ✅ 支持WiFi和4G/5G网络
- ✅ 最低带宽: 10KB/s
- ✅ 建议延迟: 小于200ms
- ✅ TCP端口: 1883 (明文) / 8883 (SSL/TLS)
5. SSL/TLS加密连接
// 使用SSL/TLS加密连接
val config = MqttConfig(
serverUri = "ssl://mqtt.example.com:8883", // 使用ssl://
// ... 其他配置
)
// 可选: 配置自定义证书
val socketFactory = createSSLSocketFactory()
options.socketFactory = socketFactory
6. 权限要求
<!-- AndroidManifest.xml -->
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
7. 线程安全
- ✅ 所有MQTT操作在IO线程执行
- ✅ 使用Kotlin Coroutines保证线程安全
- ✅ LiveData在主线程回调
8. 消息大小限制
- MQTT协议限制: 单条消息最大256MB
- 实际建议: 小于10KB (避免网络超时)
- 大数据传输: 建议使用HTTP/FTP上传,MQTT只传URL
📖 相关资源
源码位置
- MQTTClient:
app/src/main/java/android/znhaas/util/MqttClient.kt - MQTTClientHelper:
app/src/main/java/android/znhaas/util/MqttClientHelper.kt - MQTTConfigHandler:
app/src/main/java/android/znhaas/util/MqttConfigHandler.kt
依赖库
// app/build.gradle.kts
dependencies {
// Eclipse Paho MQTT Android
implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
implementation("org.eclipse.paho:org.eclipse.paho.android.service:1.1.1")
}
相关模块
扩展阅读
最后更新: 2025-01-19
文档版本: v2.0
基于代码版本: H07 Android App (main分支)