小天管理 发表于 2024年8月29日

我遇到的问题:

2024-08-29 07:13:10 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_5 , error = pingresp not received, disconnecting
2024-08-29 07:13:14 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_88 , error = pingresp not received, disconnecting
2024-08-29 07:13:15 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_43 , error = pingresp not received, disconnecting
2024-08-29 07:13:15 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_72 , error = pingresp not received, disconnecting
2024-08-29 07:13:15 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_1 , error = pingresp not received, disconnecting
2024-08-29 07:13:17 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_37 , error = pingresp not received, disconnecting
2024-08-29 07:13:18 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_10 , error = pingresp not received, disconnecting
2024-08-29 07:14:13 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_52 , error = pingresp not received, disconnecting
2024-08-29 07:14:18 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_59 , error = pingresp not received, disconnecting
2024-08-29 07:14:19 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_84 , error = pingresp not received, disconnecting
2024-08-29 07:14:19 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_54 , error = pingresp not received, disconnecting
2024-08-29 07:14:21 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_22 , error = pingresp not received, disconnecting
2024-08-29 07:14:22 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_12 , error = pingresp not received, disconnecting
2024-08-29 07:14:23 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_25 , error = pingresp not received, disconnecting
2024-08-29 07:14:24 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_97 , error = pingresp not received,
disconnecting
2024-08-29 07:14:26 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_36 , error = pingresp not received, disconnecting
2024-08-29 07:15:08 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_63 , error = pingresp not received, disconnecting
2024-08-29 07:15:16 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_23 , error = pingresp not received, disconnecting
2024-08-29 07:15:19 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_96 , error = pingresp not received, disconnecting
2024-08-29 07:15:20 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_50 , error = pingresp not received, disconnecting
2024-08-29 07:15:25 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_75 , error = pingresp not received, disconnecting
2024-08-29 07:15:30 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_78 , error = pingresp not received, disconnecting
2024-08-29 07:15:36 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_7 , error = pingresp not received, disconnecting
2024-08-29 07:15:39 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_97 , error = pingresp not received, disconnecting
2024-08-29 07:16:17 error go-iot/mqtt_service.go:40 mqtt connection lost id = TT_79 , error = pingresp not received, disconnecting

这是我正在使用的程序代码

package main

import (
	"encoding/json"
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
	"sync"
	"time"
)

// MqttInterface 定义了 MQTT 客户端的基本接口
type MqttInterface struct {
	client mqtt.Client
	Id string
	Chan chan []byte
	Config MqttConfig
	wg sync.WaitGroup
}

// NewMqttClient 初始化并返回一个新的 MqttInterface 实例
func NewMqttClient(id string, config MqttConfig) *MqttInterface {
	return &MqttInterface{
		Id: id,
		Chan: make(chan []byte, 1000),
		Config: config,
	}
}

// Connect 连接到 MQTT 服务器
func (m *MqttInterface) Connect(host, username, password string, port int) error {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
	opts.SetUsername(username)
	opts.SetAutoReconnect(false)
	opts.SetPassword(password)
	opts.SetClientID(m.Id)
	//opts.SetDefaultPublishHandler(m.messageHandler)
	opts.OnConnectionLost = func(client mqtt.Client, err error) {
		zap.S().Errorf("mqtt connection lost id = %s , error = %+v", m.Id, err)
		StopMqttClient(m.Id, m.Config)
	}
	opts.SetOrderMatters(false)
	opts.SetKeepAlive(60 * time.Second)
	// 创建并启动客户端
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}
	m.client = client
	return nil
}

// messageHandler 处理接收到的消息
func (m *MqttInterface) messageHandler(client mqtt.Client, msg mqtt.Message) {
}

// Subscribe 订阅一个或多个主题
func (m *MqttInterface) Subscribe(topics string) error {
	var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
		m.wg.Add(1)
		defer func() {
			m.wg.Done()
			//zap.S().Errorf("mqtt subscribe id = %s , topic = %s", m.Id, msg.Topic())
		}()
		mqttMsg := MQTTMessage{
			MQTTClientID: m.Id,
			Message: string(msg.Payload()),
		}
		jsonData, _ := json.Marshal(mqttMsg)
		m.Chan <- jsonData
	})
	if token.Wait() && token.Error() != nil {
		zap.S().Errorf(token.Error().Error())
		return token.Error()
	}
	return nil
}

// Publish 向一个主题发布消息
func (m *MqttInterface) Publish(topic string, payload interface{}) {
	token := m.client.Publish(topic, 0, false, payload)
	token.Wait()
}

// Disconnect 断开与 MQTT 服务器的连接
func (m *MqttInterface) Disconnect() {
	m.client.Disconnect(250)
}

func (m *MqttInterface) HandlerMsg() {
	for {
		c := <-m.Chan
		PushToQueue("pre_handler", c)
	}
}

创建 MQTT 客户端和开启订阅

client := NewMqttClient(clientId,config)
err := client.Connect(broker, username, password, port)
if err != nil {
	zap.S().Errorf("mqtt connect err = %v", err)
	return false
}
go client.Subscribe(subTopic)
go client.HandlerMsg()

请问这个问题应该如何解决？

我的尝试

我发起了一个 Issue，我理解是让消息接收后进行异步处理
https://github.com/eclipse/paho.mqtt.golang/issues/686

修改程序如下

var token = m.client.Subscribe(topics, 0, func(client mqtt.Client, msg mqtt.Message) {
	go func() {
		mqttMsg := MQTTMessage{
			MQTTClientID: m.Id,
			Message: string(msg.Payload()),
		}
		jsonData, _ := json.Marshal(mqttMsg)
		m.Chan <- jsonData
	}()
})

上述两种做法均未能解决问题。请问应当如何解决这个问题？
已推荐帖子