using Microsoft.Extensions.Configuration;
|
using MQTTnet;
|
using MQTTnet.Client;
|
using MQTTnet.Protocol;
|
using MQTTnet.Server;
|
using Newtonsoft.Json;
|
using Rhea.Common;
|
using System;
|
using System.Collections.Generic;
|
using System.Net;
|
using System.Net.Http;
|
using System.Net.Http.Headers;
|
using System.Security.Policy;
|
using System.Text;
|
using System.Threading.Tasks;
|
|
namespace Tiger.IBusiness.Utility
|
{
|
/// <summary>
|
/// MQTTHelper
|
/// </summary>
|
public static class MQTTHelper
|
{
|
#region Variables
|
private static IConfiguration Setting = ApiConfig.Configuration;
|
#endregion
|
|
#region Propertys
|
|
#endregion
|
|
#region Functions
|
public static void Start()
|
{
|
try
|
{
|
if (Setting["MQTT:Enable"].ToBoolean() == true)
|
{
|
|
ConsoleExt.WriteLine("Start MQTT Service..........", ConsoleColor.Yellow);
|
Logger.Default.Info("Start MQTT Service");
|
}
|
}
|
catch (System.Exception ex)
|
{
|
Logger.Default.Fatal(ex, "Start MQTT Service Exception");
|
}
|
}
|
|
public static void Stop()
|
{
|
try
|
{
|
|
Logger.Console.Info("Stop MQTT Service");
|
}
|
catch (System.Exception ex)
|
{
|
Logger.Console.Fatal(ex, "Stop MQTT Service Exception");
|
}
|
}
|
|
|
#endregion
|
}
|
/// <summary>
|
/// EMQX 帮助类
|
/// </summary>
|
public class EMQX : IDisposable
|
{
|
public EMQX(string address, string username, string password)
|
{
|
Address = address;
|
Username = username;
|
Password = password;
|
}
|
|
public EMQX(string address, int port, string username, string password)
|
{
|
Address = address;
|
Port = port;
|
Username = username;
|
Password = password;
|
}
|
|
#region Variables
|
private List<string> TopicList = new();
|
#endregion
|
|
#region Propertys
|
public string Id { get; set; } = Guid.NewGuid().ToString("N");
|
public string Tag { get; set; } = "ClientMonitor";
|
public string Name { get; set; } = "ClientMonitor";
|
public bool IsRunning { get; set; }
|
public IMqttClient Client { get; set; } = new MqttFactory().CreateMqttClient();//MQTT客户端
|
public string Address { get; set; }
|
public int Port { get; set; } = 1883;
|
public string ClientId { get; set; } = $"{Environment.MachineName}-ApiClient-{Guid.NewGuid():N}";
|
public string Username { get; set; }
|
public string Password { get; set; }
|
public int ApiPort { get; set; } = 18083;
|
public string ApiKey { get; set; }
|
public string ApiSecretKey { get; set; }
|
public string ApiCredentials => Convert.ToBase64String(Encoding.ASCII.GetBytes($"{ApiKey}:{ApiSecretKey}"));
|
//遗嘱消息
|
public string WillMsgTopic { get; set; }
|
public MqttQualityOfServiceLevel WillMsgQos { get; set; }
|
public bool WillMsgRetain { get; set; }
|
public string WillMsgPayload { get; set; }
|
public int? WillMsgDelay { get; set; }
|
public int? WillMsgExpiry { get; set; }
|
#endregion
|
|
|
public Result<IMqttClient> Connect(string clientId = null)
|
{
|
Result<IMqttClient> result = new();
|
if (!Address.IsNullOrEmpty())
|
{
|
ClientId = clientId.IsNullOrEmpty(ClientId);
|
var options = new MqttClientOptionsBuilder()
|
.WithTcpServer(Address, Port) // MQTT broker address and port
|
.WithCredentials(Username, Password) // Set username and password
|
.WithClientId(ClientId)
|
.WithCleanSession()
|
.Build();
|
var connectResult = Client.ConnectAsync(options).Result;
|
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
|
{
|
result.Flag = Result.Flags.Success;
|
result.Data = Client;
|
result.Message = "Connect to MQTT server success.";
|
}
|
else
|
{
|
result.Flag = Result.Flags.Failed;
|
result.Data = null;
|
result.Message = $"Failed to connect to MQTT server: {connectResult.ResultCode}";
|
}
|
}
|
else
|
{
|
result.Flag = Result.Flags.Failed;
|
result.Data = null;
|
result.Message = $"Failed to connect to MQTT server: no server address.";
|
}
|
return result;
|
}
|
|
public void Disconnect()
|
{
|
foreach (var topic in TopicList)
|
{
|
Client.UnsubscribeAsync(topic).Wait();
|
}
|
Client.DisconnectAsync().Wait();
|
}
|
|
public void Dispose()
|
{
|
Disconnect();
|
}
|
|
public bool Subscribe(string topic, Action<MqttApplicationMessageReceivedEventArgs> onReceive, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtMostOnce)
|
{
|
if (TopicList.Contains(topic))
|
{
|
return true;
|
}
|
else
|
{
|
try
|
{
|
if (!Client.IsConnected)
|
{
|
Client.ReconnectAsync().Wait();
|
}
|
|
// Subscribe to a topic
|
Client.SubscribeAsync(topic, QoS).Wait();
|
TopicList.Add(topic);
|
|
// Callback function when a message is received
|
Client.ApplicationMessageReceivedAsync += e =>
|
{
|
onReceive.Invoke(e);
|
return Task.CompletedTask;
|
};
|
|
return true;
|
}
|
catch (System.Exception ex)
|
{
|
return false;
|
}
|
}
|
}
|
|
public void Unsubscribe(string topic)
|
{
|
// Unsubscribe
|
Client.UnsubscribeAsync(topic).Wait();
|
TopicList.Remove(topic);
|
}
|
|
/// <summary>
|
/// 发送一个MQ消息
|
/// </summary>
|
/// <param name="msg">消息</param>
|
/// <param name="topic">消息的Topic,移动端根据这个Topic来接收消息<param>
|
public async Task<bool> SendAsync(string topic, string msg, MqttQualityOfServiceLevel QoS)
|
{
|
bool isFinish = false;
|
int doTimes = 0;
|
do
|
{
|
doTimes++;
|
try
|
{
|
if (!Client.IsConnected)
|
{
|
await Client.ReconnectAsync();
|
}
|
|
// Publish a message
|
await Client.PublishAsync(new MqttApplicationMessageBuilder()
|
.WithTopic(topic)
|
.WithPayload(msg)
|
.WithQualityOfServiceLevel(QoS)
|
.WithRetainFlag()
|
.Build());
|
|
isFinish = true;
|
}
|
catch (System.Exception ex)
|
{
|
//Logger.Console.Fatal(ex, $"Send MQTT Message[{sessionId}] Exception(Text: {msg})");
|
}
|
} while (!isFinish && doTimes < 3);
|
return isFinish;
|
}
|
|
/// <summary>
|
/// 发送一个内容是ApiAction的MQ消息,以ApiAction的ID作为消息的Topic
|
/// </summary>
|
/// <typeparam name="T"></typeparam>
|
/// <param name="action"></param>
|
/// <returns></returns>
|
public ApiAction<T> Send<T>(ApiAction<T> action, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
|
{
|
MQTTMessage message = new MQTTMessage();
|
message.IsVoice = !action.IsSuccessed;
|
message.IsSuccessed = action.IsSuccessed;
|
message.Content = action.Message;
|
message.Color = action.IsSuccessed ? "#878787" : "#880000";
|
SendAsync(JsonConvert.SerializeObject(message), action.ID, QoS).Wait();
|
return action;
|
}
|
|
public bool Send<T>(string topic, Result<T> result, string msgTitle, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
|
{
|
try
|
{
|
MQTTMessage message = new MQTTMessage();
|
message.IsVoice = !result.IsSuccessed;
|
message.IsSuccessed = result.IsSuccessed;
|
message.Content = msgTitle + ":" + result.Message;
|
message.Color = result.IsSuccessed ? "#878787" : "#880000";
|
return SendAsync(topic, JsonConvert.SerializeObject(message), QoS).Result;
|
}
|
catch (System.Exception ex)
|
{
|
return false;
|
}
|
}
|
|
public bool Send(string topic, MQTTMessage msg, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
|
{
|
try
|
{
|
return SendAsync(topic, JsonConvert.SerializeObject(msg), QoS).Result;
|
}
|
catch (System.Exception ex)
|
{
|
return false;
|
}
|
}
|
|
public HttpResponseMessage GetTopicList(string topic = "")
|
{
|
//var content = new StringContent(json);
|
//content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
|
|
//var client = new HttpClient();
|
//var response = client.PostAsync(url, content).Result;
|
|
ApiKey = "ee26b0dd4af7e749";
|
ApiSecretKey = "XHOUC9BQSoa0j0jRINQEkQanIR1nzDQUY8GnYtaktEQJ";
|
string credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{ApiKey}:{ApiSecretKey}"));
|
|
var client = new HttpClient();
|
client.DefaultRequestHeaders.Add("Content-Type", "application/json");
|
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", ApiCredentials);// 添加基本认证头
|
|
string url = $"http://{Address}:{ApiPort}/api/v5/topics/";
|
var response = client.GetAsync(url).Result;
|
return response;
|
}
|
|
public bool DeleteTopic(string sessionId)
|
{
|
int doTimes = 0;
|
bool isFinish = false;
|
do
|
{
|
doTimes++;
|
try
|
{
|
|
|
isFinish = true;
|
}
|
catch (Exception ex)
|
{
|
//Logger.Console.Fatal(ex, $"Delete MQTT Topic[{sessionId}] Exception");
|
}
|
} while (!isFinish && doTimes < 3);
|
return isFinish;
|
}
|
}
|
|
}
|