using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using Rhea.Common;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Policy;
using System.Text;
using System.Threading.Tasks;
namespace Tiger.IBusiness.Utility
{
/// <summary>
/// EMQX helper: wraps an MQTTnet client (connect, subscribe, publish) and
/// queries the EMQX HTTP management API for topic information.
/// </summary>
public class EMQX : IDisposable
{
    /// <summary>Maximum number of publish attempts made by <see cref="SendAsync"/>.</summary>
    private const int MaxSendAttempts = 3;

    /// <summary>Value written to <c>MQTTMessage.Color</c> for successful results.</summary>
    private const string SuccessColor = "#878787";

    /// <summary>Value written to <c>MQTTMessage.Color</c> for failed results.</summary>
    private const string FailureColor = "#880000";

    /// <summary>Shared HTTP client for management API calls; reused instead of creating one per request.</summary>
    private static readonly HttpClient ApiHttpClient = new();

    /// <summary>
    /// Creates a helper for the given broker using the default MQTT port (1883).
    /// </summary>
    /// <param name="address">Broker host name or IP address.</param>
    /// <param name="username">MQTT user name.</param>
    /// <param name="password">MQTT password.</param>
    public EMQX(string address, string username, string password)
    {
        Address = address;
        Username = username;
        Password = password;
    }

    /// <summary>
    /// Creates a helper for the given broker and MQTT port.
    /// </summary>
    /// <param name="address">Broker host name or IP address.</param>
    /// <param name="port">MQTT TCP port.</param>
    /// <param name="username">MQTT user name.</param>
    /// <param name="password">MQTT password.</param>
    public EMQX(string address, int port, string username, string password)
        : this(address, username, password)
    {
        Port = port;
    }

    /// <summary>Broker host name or IP address; also used as the host of the HTTP API.</summary>
    public string Address { get; set; }

    /// <summary>MQTT TCP port. Defaults to 1883.</summary>
    public int Port { get; set; } = 1883;

    /// <summary>MQTT client id. Defaults to the machine name plus a random GUID.</summary>
    public string ClientId { get; set; } = $"{Environment.MachineName}-Client-{Guid.NewGuid():N}";

    /// <summary>MQTT user name.</summary>
    public string Username { get; set; }

    /// <summary>MQTT password.</summary>
    public string Password { get; set; }

    /// <summary>Port of the EMQX HTTP API. Defaults to 18083.</summary>
    public int ApiPort { get; set; } = 18083;

    /// <summary>API key used for HTTP basic authentication against the EMQX HTTP API.</summary>
    public string ApiKey { get; set; }

    /// <summary>API secret used for HTTP basic authentication against the EMQX HTTP API.</summary>
    public string ApiSecretKey { get; set; }

    /// <summary>Base64 of "<see cref="ApiKey"/>:<see cref="ApiSecretKey"/>", i.e. the Basic authentication token.</summary>
    public string ApiCredentials => Convert.ToBase64String(Encoding.ASCII.GetBytes($"{ApiKey}:{ApiSecretKey}"));

    /// <summary>
    /// The underlying MQTT client.
    /// </summary>
    public IMqttClient Client = new MqttFactory().CreateMqttClient();

    /// <summary>
    /// Topics subscribed through <see cref="Subscribe"/>, mapped to the message handler that was
    /// attached for them so the handler can be detached again on unsubscribe/disconnect.
    /// </summary>
    private readonly Dictionary<string, Func<MqttApplicationMessageReceivedEventArgs, Task>> _topicHandlers = new();

    /// <summary>Set once <see cref="Dispose"/> has run, making repeated calls harmless.</summary>
    private bool _disposed;

    /// <summary>
    /// Connects <see cref="Client"/> to the broker at <see cref="Address"/>:<see cref="Port"/>
    /// with a clean session, blocking until the attempt completes.
    /// </summary>
    /// <param name="clientId">Client id to use; when null or empty the current <see cref="ClientId"/> is kept.</param>
    /// <returns>
    /// A result flagged Success with <see cref="Client"/> as data when the broker accepts the connection;
    /// otherwise a result flagged Failed with a message describing the reason.
    /// </returns>
    /// <remarks>
    /// Exceptions thrown by the connection attempt are not caught here; because the call blocks on
    /// the task they surface wrapped in an <see cref="AggregateException"/>.
    /// </remarks>
    public Result Connect(string clientId = null)
    {
        Result result = new();

        if (Address.IsNullOrEmpty())
        {
            result.Flag = Result.Flags.Failed;
            result.Data = null;
            result.Message = $"Failed to connect to MQTT server: no server address.";
            return result;
        }

        ClientId = clientId.IsNullOrEmpty(ClientId);
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(Address, Port) // MQTT broker address and port
            .WithCredentials(Username, Password) // Set username and password
            .WithClientId(ClientId)
            .WithCleanSession()
            .Build();

        var connectResult = Client.ConnectAsync(options).Result;
        if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
        {
            result.Flag = Result.Flags.Success;
            result.Data = Client;
            result.Message = "Connect to MQTT server success.";
        }
        else
        {
            result.Flag = Result.Flags.Failed;
            result.Data = null;
            result.Message = $"Failed to connect to MQTT server: {connectResult.ResultCode}";
        }
        return result;
    }

    /// <summary>
    /// Unsubscribes from every topic registered through <see cref="Subscribe"/> and disconnects the client.
    /// Does nothing on the wire when the client is not connected.
    /// </summary>
    /// <remarks>
    /// The local subscription bookkeeping is always cleared, so topics can be subscribed again
    /// after a later <see cref="Connect"/>.
    /// </remarks>
    public void Disconnect()
    {
        try
        {
            if (Client.IsConnected)
            {
                foreach (var topic in _topicHandlers.Keys)
                {
                    Client.UnsubscribeAsync(topic).Wait();
                }
                Client.DisconnectAsync().Wait();
            }
        }
        finally
        {
            // Detach our handlers and forget the topics even if the broker round-trip failed;
            // otherwise a later Subscribe would report success without subscribing.
            foreach (var handler in _topicHandlers.Values)
            {
                Client.ApplicationMessageReceivedAsync -= handler;
            }
            _topicHandlers.Clear();
        }
    }

    /// <summary>
    /// Disconnects from the broker and disposes <see cref="Client"/>. Safe to call more than once
    /// and never throws.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        try
        {
            Disconnect();
        }
        catch (System.Exception ex)
        {
            // Dispose must not throw; record the failure and continue releasing the client.
            TraceFailure("disconnect during dispose", ex);
        }
        finally
        {
            Client?.Dispose();
        }
    }

    /// <summary>
    /// Subscribes to <paramref name="topic"/> and registers <paramref name="onReceive"/> as a
    /// message callback, reconnecting first when the client is not connected.
    /// </summary>
    /// <param name="topic">Topic (filter) to subscribe to.</param>
    /// <param name="onReceive">
    /// Callback invoked for each received application message. It is attached to the client's
    /// message-received event, so it is called for every message the client receives, not only
    /// those matching <paramref name="topic"/>.
    /// </param>
    /// <param name="QoS">Requested quality-of-service level.</param>
    /// <returns>
    /// true when the topic is subscribed (including when it already was, in which case
    /// <paramref name="onReceive"/> is ignored); false when the topic is null/empty or subscribing failed.
    /// </returns>
    public bool Subscribe(string topic, Action<MqttApplicationMessageReceivedEventArgs> onReceive, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtMostOnce)
    {
        if (string.IsNullOrEmpty(topic))
        {
            return false;
        }
        if (_topicHandlers.ContainsKey(topic))
        {
            return true;
        }

        // Callback function when a message is received
        Func<MqttApplicationMessageReceivedEventArgs, Task> handler = e =>
        {
            onReceive?.Invoke(e);
            return Task.CompletedTask;
        };

        var handlerAttached = false;
        try
        {
            if (!Client.IsConnected)
            {
                Client.ReconnectAsync().Wait();
            }

            // Attach before subscribing so messages delivered immediately after the
            // subscription is acknowledged (e.g. retained ones) are not missed.
            Client.ApplicationMessageReceivedAsync += handler;
            handlerAttached = true;

            // Subscribe to a topic
            Client.SubscribeAsync(topic, QoS).Wait();
            _topicHandlers[topic] = handler;
            return true;
        }
        catch (System.Exception ex)
        {
            if (handlerAttached)
            {
                Client.ApplicationMessageReceivedAsync -= handler;
            }
            TraceFailure($"subscribe to topic '{topic}'", ex);
            return false;
        }
    }

    /// <summary>
    /// Unsubscribes from <paramref name="topic"/> and detaches the callback that was registered
    /// for it by <see cref="Subscribe"/>.
    /// </summary>
    /// <param name="topic">Topic (filter) to unsubscribe from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
    public void Unsubscribe(string topic)
    {
        if (topic is null)
        {
            throw new ArgumentNullException(nameof(topic));
        }

        // Unsubscribe
        Client.UnsubscribeAsync(topic).Wait();

        if (_topicHandlers.TryGetValue(topic, out var handler))
        {
            Client.ApplicationMessageReceivedAsync -= handler;
            _topicHandlers.Remove(topic);
        }
    }

    /// <summary>
    /// Publishes a retained MQTT message, reconnecting first when needed and retrying on failure.
    /// </summary>
    /// <param name="topic">Topic to publish to; receivers subscribe to this topic to get the message.</param>
    /// <param name="msg">Message payload.</param>
    /// <param name="QoS">Quality-of-service level for the publish.</param>
    /// <returns>
    /// true when a publish attempt completed without throwing; false when all
    /// <see cref="MaxSendAttempts"/> attempts threw. Exceptions are traced, not propagated.
    /// </returns>
    public async Task<bool> SendAsync(string topic, string msg, MqttQualityOfServiceLevel QoS)
    {
        for (var attempt = 1; attempt <= MaxSendAttempts; attempt++)
        {
            try
            {
                // ConfigureAwait(false): the synchronous Send overloads block on this task, which
                // would deadlock if continuations had to resume on a captured synchronization context.
                if (!Client.IsConnected)
                {
                    await Client.ReconnectAsync().ConfigureAwait(false);
                }

                // Publish a message
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(msg)
                    .WithQualityOfServiceLevel(QoS)
                    .WithRetainFlag()
                    .Build();
                await Client.PublishAsync(message).ConfigureAwait(false);
                return true;
            }
            catch (System.Exception ex)
            {
                TraceFailure($"publish to topic '{topic}' (attempt {attempt}/{MaxSendAttempts})", ex);
            }
        }
        return false;
    }

    /// <summary>
    /// Publishes an MQ message describing <paramref name="action"/>, using the action's ID as the topic.
    /// </summary>
    /// <param name="action">Action whose success flag and message are sent; its ID is the topic.</param>
    /// <param name="QoS">Quality-of-service level for the publish.</param>
    /// <returns>The same <paramref name="action"/> instance, whether or not the publish succeeded.</returns>
    public ApiAction Send(ApiAction action, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        var message = new MQTTMessage
        {
            IsVoice = !action.IsSuccessed,
            IsSuccessed = action.IsSuccessed,
            Content = action.Message,
            Color = action.IsSuccessed ? SuccessColor : FailureColor,
        };

        // SendAsync takes (topic, payload): the action ID is the topic, the JSON is the payload.
        SendAsync(action.ID, JsonConvert.SerializeObject(message), QoS).Wait();
        return action;
    }

    /// <summary>
    /// Publishes an MQ message describing <paramref name="result"/> to <paramref name="topic"/>.
    /// The content is "<paramref name="msgTitle"/>:" followed by the result message.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="result">Result whose success flag and message are sent.</param>
    /// <param name="msgTitle">Title prefixed to the result message.</param>
    /// <param name="QoS">Quality-of-service level for the publish.</param>
    /// <returns>true when the message was published; false on any failure.</returns>
    public bool Send(string topic, Result result, string msgTitle, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        try
        {
            var message = new MQTTMessage
            {
                IsVoice = !result.IsSuccessed,
                IsSuccessed = result.IsSuccessed,
                Content = msgTitle + ":" + result.Message,
                Color = result.IsSuccessed ? SuccessColor : FailureColor,
            };
            return SendAsync(topic, JsonConvert.SerializeObject(message), QoS).Result;
        }
        catch (System.Exception ex)
        {
            TraceFailure($"send result message to topic '{topic}'", ex);
            return false;
        }
    }

    /// <summary>
    /// Serializes <paramref name="msg"/> to JSON and publishes it to <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="msg">Message to serialize and send.</param>
    /// <param name="QoS">Quality-of-service level for the publish.</param>
    /// <returns>true when the message was published; false on any failure.</returns>
    public bool Send(string topic, MQTTMessage msg, MqttQualityOfServiceLevel QoS = MqttQualityOfServiceLevel.AtLeastOnce)
    {
        try
        {
            return SendAsync(topic, JsonConvert.SerializeObject(msg), QoS).Result;
        }
        catch (System.Exception ex)
        {
            TraceFailure($"send message to topic '{topic}'", ex);
            return false;
        }
    }

    /// <summary>
    /// Calls the EMQX HTTP API topics endpoint (<c>/api/v5/topics/</c>) with basic authentication
    /// built from <see cref="ApiKey"/> and <see cref="ApiSecretKey"/>.
    /// </summary>
    /// <param name="topic">
    /// Optional topic name; when non-empty it is URL-escaped and appended to the endpoint path.
    /// When empty the endpoint is queried as-is.
    /// </param>
    /// <returns>The raw HTTP response. The caller owns it and should dispose it.</returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="ApiKey"/> or <see cref="ApiSecretKey"/> has not been set.
    /// </exception>
    public HttpResponseMessage GetTopicList(string topic = "")
    {
        // Credentials must be supplied by the caller (configuration); they are never embedded here.
        if (string.IsNullOrEmpty(ApiKey) || string.IsNullOrEmpty(ApiSecretKey))
        {
            throw new InvalidOperationException("ApiKey and ApiSecretKey must be set before calling the EMQX HTTP API.");
        }

        string url = $"http://{Address}:{ApiPort}/api/v5/topics/";
        if (!string.IsNullOrEmpty(topic))
        {
            url += Uri.EscapeDataString(topic);
        }

        // Per-request headers keep the shared HttpClient free of instance-specific state.
        // Content-Type is a content header and cannot be set on a body-less GET; Accept is the
        // request header that asks for JSON.
        using var request = new HttpRequestMessage(HttpMethod.Get, url);
        request.Headers.Authorization = new AuthenticationHeaderValue("Basic", ApiCredentials);
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

        return ApiHttpClient.SendAsync(request).Result;
    }

    /// <summary>
    /// Placeholder for topic deletion.
    /// </summary>
    /// <param name="sessionId">Currently unused.</param>
    /// <returns>Always true.</returns>
    /// <remarks>
    /// NOTE(review): no deletion is implemented; the method performs no work and reports success.
    /// </remarks>
    public bool DeleteTopic(string sessionId)
    {
        return true;
    }

    /// <summary>Writes a swallowed failure to the trace listeners so it is not lost silently.</summary>
    private static void TraceFailure(string operation, System.Exception ex)
    {
        System.Diagnostics.Trace.TraceWarning($"EMQX: failed to {operation}: {ex}");
    }
}
}