using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using Rhea.Common;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Policy;
using System.Text;
using System.Threading.Tasks;

namespace Tiger.IBusiness.Utility
{
    /// <summary>
    /// EMQX helper class. Wraps an MQTT client (connect, subscribe, publish) and
    /// a call to the EMQX HTTP management API.
    /// </summary>
    public class EMQX : IDisposable
    {
        /// <summary>Default MQTT TCP port used when none is supplied.</summary>
        private const int DefaultPort = 1883;

        /// <summary>Maximum number of publish attempts made by <see cref="SendAsync"/>.</summary>
        private const int MaxSendAttempts = 3;

        /// <summary>Color written into messages that report success.</summary>
        private const string SuccessColor = "#878787";

        /// <summary>Color written into messages that report failure.</summary>
        private const string FailureColor = "#880000";

        // One shared HttpClient for all management API calls; per-request headers are
        // set on the HttpRequestMessage so no state leaks between calls.
        private static readonly HttpClient s_httpClient = new();

        // Topic -> message handler registered for that topic. Keeping the delegate lets
        // Unsubscribe/Disconnect detach exactly the handler that Subscribe attached.
        private readonly Dictionary<string, Func<MqttApplicationMessageReceivedEventArgs, Task>> _topicHandlers = new();

        private bool _disposed;

        /// <summary>
        /// Creates a helper for the broker at <paramref name="address"/> on the default port (1883).
        /// </summary>
        /// <param name="address">Broker host name or IP address.</param>
        /// <param name="username">User name sent when connecting.</param>
        /// <param name="password">Password sent when connecting.</param>
        public EMQX(string address, string username, string password)
            : this(address, DefaultPort, username, password)
        {
        }

        /// <summary>
        /// Creates a helper for the broker at <paramref name="address"/>:<paramref name="port"/>.
        /// </summary>
        /// <param name="address">Broker host name or IP address.</param>
        /// <param name="port">Broker MQTT TCP port.</param>
        /// <param name="username">User name sent when connecting.</param>
        /// <param name="password">Password sent when connecting.</param>
        public EMQX(string address, int port, string username, string password)
        {
            Address = address;
            Port = port;
            Username = username;
            Password = password;
        }

        /// <summary>Broker host name or IP address; also used as the host of the management API.</summary>
        public string Address { get; set; }

        /// <summary>Broker MQTT TCP port.</summary>
        public int Port { get; set; } = DefaultPort;

        /// <summary>MQTT client id; defaults to the machine name plus a random GUID.</summary>
        public string ClientId { get; set; } = $"{Environment.MachineName}-Client-{Guid.NewGuid():N}";

        /// <summary>User name sent when connecting.</summary>
        public string Username { get; set; }

        /// <summary>Password sent when connecting.</summary>
        public string Password { get; set; }

        /// <summary>Port of the HTTP management API.</summary>
        public int ApiPort { get; set; } = 18083;

        /// <summary>API key used for HTTP basic authentication against the management API.</summary>
        public string ApiKey { get; set; }

        /// <summary>API secret used for HTTP basic authentication against the management API.</summary>
        public string ApiSecretKey { get; set; }

        /// <summary>Base64 of "<see cref="ApiKey"/>:<see cref="ApiSecretKey"/>" (ASCII encoded).</summary>
        public string ApiCredentials => Convert.ToBase64String(Encoding.ASCII.GetBytes($"{ApiKey}:{ApiSecretKey}"));

        /// <summary>
        /// MQTT client.
        /// </summary>
        public IMqttClient Client = new MqttFactory().CreateMqttClient();

        /// <summary>
        /// Connects <see cref="Client"/> to the broker at <see cref="Address"/>:<see cref="Port"/>
        /// with a clean session, blocking until the attempt completes.
        /// </summary>
        /// <param name="clientId">Client id to use; when null or empty the current <see cref="ClientId"/> is kept.</param>
        /// <returns>
        /// A result whose Flag is Success (with Data set to <see cref="Client"/>) when the broker
        /// accepted the connection, otherwise Failed with an explanatory Message.
        /// </returns>
        public Result Connect(string clientId = null)
        {
            Result result = new();

            if (Address.IsNullOrEmpty())
            {
                result.Flag = Result.Flags.Failed;
                result.Data = null;
                result.Message = $"Failed to connect to MQTT server: no server address.";
                return result;
            }

            ClientId = clientId.IsNullOrEmpty(ClientId);

            var options = new MqttClientOptionsBuilder()
                .WithTcpServer(Address, Port)
                .WithCredentials(Username, Password)
                .WithClientId(ClientId)
                .WithCleanSession()
                .Build();

            // Exceptions raised by the connect attempt propagate to the caller unchanged.
            var connectResult = Client.ConnectAsync(options).Result;

            if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
            {
                result.Flag = Result.Flags.Success;
                result.Data = Client;
                result.Message = "Connect to MQTT server success.";
            }
            else
            {
                result.Flag = Result.Flags.Failed;
                result.Data = null;
                result.Message = $"Failed to connect to MQTT server: {connectResult.ResultCode}";
            }

            return result;
        }

        /// <summary>
        /// Unsubscribes every topic registered through <see cref="Subscribe"/>, disconnects the
        /// client, and clears the local subscription state. Does nothing on the wire when the
        /// client is not connected.
        /// </summary>
        public void Disconnect()
        {
            try
            {
                if (Client.IsConnected)
                {
                    foreach (var topic in _topicHandlers.Keys)
                    {
                        Client.UnsubscribeAsync(topic).Wait();
                    }

                    Client.DisconnectAsync().Wait();
                }
            }
            finally
            {
                // Always reset local state: the session is clean, so a later Subscribe for the
                // same topic must really subscribe again instead of short-circuiting.
                foreach (var handler in _topicHandlers.Values)
                {
                    Client.ApplicationMessageReceivedAsync -= handler;
                }

                _topicHandlers.Clear();
            }
        }

        /// <summary>
        /// Disconnects and disposes <see cref="Client"/>. Safe to call more than once; never throws
        /// because of a failed disconnect.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                Disconnect();
            }
            catch (System.Exception)
            {
                // Dispose must not throw; a failed disconnect is ignored here.
            }
            finally
            {
                Client?.Dispose();
            }

            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/> and registers <paramref name="onReceive"/> as a
        /// message callback. Reconnects first when the client is not connected.
        /// </summary>
        /// <remarks>
        /// The callback is attached to the client's message-received event, so it is invoked for
        /// every message the client receives, not only for messages on <paramref name="topic"/>.
        /// </remarks>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="onReceive">Callback invoked with the received-message event arguments.</param>
        /// <param name="QoS">Quality of service level requested for the subscription.</param>
        /// <returns>
        /// True when the topic is already registered or the subscription succeeded; false when
        /// <paramref name="topic"/> is null or empty or the subscription failed.
        /// </returns>
        public bool Subscribe(string topic, Action<MqttApplicationMessageReceivedEventArgs> onReceive, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtMostOnce)
        {
            if (string.IsNullOrEmpty(topic))
            {
                return false;
            }

            if (_topicHandlers.ContainsKey(topic))
            {
                return true;
            }

            Func<MqttApplicationMessageReceivedEventArgs, Task> handler = e =>
            {
                onReceive?.Invoke(e);
                return Task.CompletedTask;
            };

            try
            {
                if (!Client.IsConnected)
                {
                    Client.ReconnectAsync().Wait();
                }

                // Attach the handler before subscribing so messages delivered immediately
                // after the subscription is acknowledged are not missed.
                Client.ApplicationMessageReceivedAsync += handler;
                Client.SubscribeAsync(topic, QoS).Wait();
                _topicHandlers.Add(topic, handler);
                return true;
            }
            catch (System.Exception)
            {
                // Roll back the handler so a failed subscription leaves no callback behind.
                if (Client != null)
                {
                    Client.ApplicationMessageReceivedAsync -= handler;
                }

                return false;
            }
        }

        /// <summary>
        /// Unsubscribes from <paramref name="topic"/> and detaches the callback that
        /// <see cref="Subscribe"/> registered for it.
        /// </summary>
        /// <param name="topic">Topic to unsubscribe from.</param>
        public void Unsubscribe(string topic)
        {
            Client.UnsubscribeAsync(topic).Wait();

            if (topic != null && _topicHandlers.TryGetValue(topic, out var handler))
            {
                Client.ApplicationMessageReceivedAsync -= handler;
                _topicHandlers.Remove(topic);
            }
        }

        /// <summary>
        /// Sends an MQ message, reconnecting first when the client is not connected.
        /// The message is published with the retain flag set. Up to three attempts are made.
        /// </summary>
        /// <param name="topic">Topic of the message; mobile clients receive messages by this topic.</param>
        /// <param name="msg">Message payload.</param>
        /// <param name="QoS">Quality of service level for the publish.</param>
        /// <returns>True when one attempt completed without an exception; otherwise false.</returns>
        public async Task<bool> SendAsync(string topic, string msg, MqttQualityOfServiceLevel QoS)
        {
            for (int attempt = 1; attempt <= MaxSendAttempts; attempt++)
            {
                try
                {
                    // ConfigureAwait(false): the synchronous Send overloads block on this task,
                    // so continuations must not need the caller's synchronization context.
                    if (!Client.IsConnected)
                    {
                        await Client.ReconnectAsync().ConfigureAwait(false);
                    }

                    var message = new MqttApplicationMessageBuilder()
                        .WithTopic(topic)
                        .WithPayload(msg)
                        .WithQualityOfServiceLevel(QoS)
                        .WithRetainFlag()
                        .Build();

                    await Client.PublishAsync(message).ConfigureAwait(false);
                    return true;
                }
                catch (System.Exception)
                {
                    // Best effort: a failed attempt is retried until MaxSendAttempts is reached.
                }
            }

            return false;
        }

        /// <summary>
        /// Sends an MQ message whose content is built from an ApiAction, using the ApiAction's ID
        /// as the message topic. Blocks until the send attempts finish.
        /// </summary>
        /// <param name="action">Action whose ID, IsSuccessed and Message are sent.</param>
        /// <param name="QoS">Quality of service level for the publish.</param>
        /// <returns>The same <paramref name="action"/> instance.</returns>
        public ApiAction Send(ApiAction action, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
        {
            MQTTMessage message = BuildMessage(action.IsSuccessed, action.Message);

            // Topic first, payload second: the action ID is the topic, the JSON is the payload.
            SendAsync(action.ID, JsonConvert.SerializeObject(message), QoS).Wait();
            return action;
        }

        /// <summary>
        /// Sends an MQ message built from a result, with content "msgTitle:result.Message".
        /// </summary>
        /// <param name="topic">Topic of the message.</param>
        /// <param name="result">Result whose IsSuccessed and Message are sent.</param>
        /// <param name="msgTitle">Title prefixed to the result message.</param>
        /// <param name="QoS">Quality of service level for the publish.</param>
        /// <returns>True when the message was sent; false on failure or exception.</returns>
        public bool Send(string topic, Result result, string msgTitle, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
        {
            try
            {
                MQTTMessage message = BuildMessage(result.IsSuccessed, msgTitle + ":" + result.Message);
                return SendAsync(topic, JsonConvert.SerializeObject(message), QoS).Result;
            }
            catch (System.Exception)
            {
                return false;
            }
        }

        /// <summary>
        /// Sends <paramref name="msg"/> serialized as JSON.
        /// </summary>
        /// <param name="topic">Topic of the message.</param>
        /// <param name="msg">Message to serialize and send.</param>
        /// <param name="QoS">Quality of service level for the publish.</param>
        /// <returns>True when the message was sent; false on failure or exception.</returns>
        public bool Send(string topic, MQTTMessage msg, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
        {
            try
            {
                return SendAsync(topic, JsonConvert.SerializeObject(msg), QoS).Result;
            }
            catch (System.Exception)
            {
                return false;
            }
        }

        /// <summary>
        /// Queries the topic list from the HTTP management API at
        /// http://<see cref="Address"/>:<see cref="ApiPort"/>/api/v5/topics/ using basic
        /// authentication built from <see cref="ApiKey"/> and <see cref="ApiSecretKey"/>.
        /// </summary>
        /// <remarks>
        /// SECURITY: credentials must be supplied through <see cref="ApiKey"/> and
        /// <see cref="ApiSecretKey"/> (for example from configuration); they are never embedded
        /// in source. Any key that was previously committed to source control should be rotated.
        /// </remarks>
        /// <param name="topic">
        /// Optional topic name; when non-empty it is sent as the "topic" query parameter.
        /// NOTE(review): confirm the parameter name against the deployed EMQX API version.
        /// </param>
        /// <returns>The raw HTTP response; the caller is responsible for disposing it.</returns>
        /// <exception cref="InvalidOperationException">
        /// <see cref="ApiKey"/> or <see cref="ApiSecretKey"/> has not been set.
        /// </exception>
        public HttpResponseMessage GetTopicList(string topic = "")
        {
            if (string.IsNullOrEmpty(ApiKey) || string.IsNullOrEmpty(ApiSecretKey))
            {
                throw new InvalidOperationException(
                    $"EMQX API credentials are not configured: set {nameof(ApiKey)} and {nameof(ApiSecretKey)} before calling {nameof(GetTopicList)}.");
            }

            string url = $"http://{Address}:{ApiPort}/api/v5/topics/";
            if (!string.IsNullOrEmpty(topic))
            {
                url += $"?topic={Uri.EscapeDataString(topic)}";
            }

            using var request = new HttpRequestMessage(HttpMethod.Get, url);

            // Content-Type is a content header and cannot be added to request headers
            // (doing so throws); a GET has no body, so ask for JSON via Accept instead.
            request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

            // Add the basic authentication header.
            request.Headers.Authorization = new AuthenticationHeaderValue("Basic", ApiCredentials);

            return s_httpClient.SendAsync(request).Result;
        }

        /// <summary>
        /// Placeholder for deleting a topic; performs no work.
        /// </summary>
        /// <param name="sessionId">Currently unused.</param>
        /// <returns>Always true.</returns>
        public bool DeleteTopic(string sessionId)
        {
            // NOTE(review): no deletion is implemented yet; the method reports success
            // unconditionally, exactly as before.
            return true;
        }

        /// <summary>Builds the message payload shared by the Send overloads.</summary>
        private static MQTTMessage BuildMessage(bool isSuccessed, string content)
        {
            MQTTMessage message = new MQTTMessage();
            message.IsVoice = !isSuccessed;
            message.IsSuccessed = isSuccessed;
            message.Content = content;
            message.Color = isSuccessed ? SuccessColor : FailureColor;
            return message;
        }
    }
}