Add Mqtt item

This commit is contained in:
Carl Philipp Klemm 2026-04-12 18:06:19 +02:00
parent 0fd50eb227
commit eb60f85604
19 changed files with 901 additions and 48 deletions

View file

@ -10,6 +10,7 @@
#include "auxitem.h"
#include "poweritem.h"
#include "rgbitem.h"
#include "mqttitem.h"
#include <QJsonArray>
@ -338,6 +339,8 @@ std::shared_ptr<Item> Item::loadItem(const QJsonObject& json)
newItem = std::shared_ptr<PowerItem>(new PowerItem);
else if(json["Type"].toString("") == "Rgb")
newItem = std::shared_ptr<RgbItem>(new RgbItem);
else if(json["Type"].toString("") == "Mqtt")
newItem = std::shared_ptr<MqttItem>(new MqttItem);
else
qWarning()<<"Unable to load unkown item type: "<<json["Type"].toString();
if(newItem)

164
src/items/mqttitem.cpp Normal file
View file

@ -0,0 +1,164 @@
#include "mqttitem.h"
#include <QJsonObject>
#include <QJsonDocument>
#include <QtMqtt/QMqttClient>
#include "mqttclient.h"
MqttItem::MqttItem(QString name, uint8_t value, QObject *parent)
: Item(0, name, value, parent),
topic_(""),
valueKey_("state"),
valueOn_("ON"),
valueOff_("OFF")
{
hashId();
std::shared_ptr<MqttClient> workClient = client.lock();
assert(workClient);
connect(workClient->getClient().get(), &QMqttClient::stateChanged, this, &MqttItem::onClientStateChanged);
}
MqttItem::~MqttItem()
{
qDebug()<<__func__;
std::shared_ptr<MqttClient> workClient = client.lock();
if(!workClient || topic_.isEmpty() || !subscription)
return;
workClient->unsubscribe(subscription);
}
void MqttItem::onClientStateChanged(QMqttClient::ClientState state)
{
if(state == QMqttClient::Connected)
refreshSubscription();
}
void MqttItem::refreshSubscription()
{
std::shared_ptr<MqttClient> workClient = client.lock();
if(!workClient || topic_.isEmpty())
return;
if(workClient->getClient()->state() != QMqttClient::Connected)
return;
if(subscription)
{
disconnect(subscription->subscription, &QMqttSubscription::messageReceived, this, &MqttItem::onMessageReceived);
workClient->unsubscribe(subscription);
}
subscription = workClient->subscribe(workClient->getBaseTopic() + "/" + getTopic());
connect(subscription->subscription, &QMqttSubscription::messageReceived, this, &MqttItem::onMessageReceived);
}
void MqttItem::onMessageReceived(const QMqttMessage& message)
{
QJsonDocument doc = QJsonDocument::fromJson(message.payload());
if(doc.isObject())
{
QJsonObject obj = doc.object();
if(obj.contains(getValueKey()))
{
QString value = obj[getValueKey()].toString();
ItemUpdateRequest req = createValueUpdateRequest(ITEM_UPDATE_BACKEND);
req.changes.value = true;
if(value == getValueOn())
req.payload.setValueData(true);
else
req.payload.setValueData(false);
requestUpdate(req);
}
}
}
void MqttItem::hashId()
{
QString hashString = topic_ + "/" + valueKey_;
itemId_ = qHash(hashString.toLatin1());
}
void MqttItem::setTopic(const QString& topic)
{
topic_ = topic;
hashId();
refreshSubscription();
}
void MqttItem::setValueKey(const QString& valueKey)
{
valueKey_ = valueKey;
hashId();
}
void MqttItem::setValueOn(const QString& valueOn)
{
valueOn_ = valueOn;
}
void MqttItem::setValueOff(const QString& valueOff)
{
valueOff_ = valueOff;
}
QString MqttItem::getTopic() const
{
return topic_;
}
QString MqttItem::getValueKey() const
{
return valueKey_;
}
QString MqttItem::getValueOn() const
{
return valueOn_;
}
QString MqttItem::getValueOff() const
{
return valueOff_;
}
void MqttItem::store(QJsonObject& json)
{
Item::store(json);
json["Type"] = "Mqtt";
json["Topic"] = topic_;
json["ValueKey"] = valueKey_;
json["ValueOn"] = valueOn_;
json["ValueOff"] = valueOff_;
}
void MqttItem::load(const QJsonObject& json, const bool preserve)
{
Item::load(json, preserve);
topic_ = json["Topic"].toString();
valueKey_ = json["ValueKey"].toString("state");
valueOn_ = json["ValueOn"].toString("ON");
valueOff_ = json["ValueOff"].toString("OFF");
hashId();
refreshSubscription();
}
void MqttItem::enactValue(uint8_t value)
{
std::shared_ptr<MqttClient> workClient = client.lock();
if(!workClient || topic_.isEmpty())
return;
QString fullTopic = workClient->getBaseTopic() + "/" + topic_ + "/set";
QJsonObject payload;
payload[valueKey_] = value ? valueOn_ : valueOff_;
QJsonDocument doc(payload);
QByteArray data = doc.toJson(QJsonDocument::Compact);
qDebug() << "MqttItem publishing to" << fullTopic << ":" << data;
workClient->getClient()->publish(fullTopic, data);
}

53
src/items/mqttitem.h Normal file
View file

@ -0,0 +1,53 @@
#ifndef MQTTITEM_H
#define MQTTITEM_H
#include "item.h"
#include "mqttclient.h"
class QString;
class MqttItem : public Item
{
Q_OBJECT
public:
inline static std::weak_ptr<MqttClient> client;
private:
QString topic_;
QString valueKey_;
QString valueOn_;
QString valueOff_;
MqttClient::Subscription* subscription = nullptr;
void hashId();
void refreshSubscription();
void onMessageReceived(const QMqttMessage& message);
void onClientStateChanged(QMqttClient::ClientState state);
public:
explicit MqttItem(QString name = "MqttItem",
uint8_t value = 0,
QObject *parent = nullptr);
virtual ~MqttItem() override;
void setTopic(const QString& topic);
void setValueKey(const QString& valueKey);
void setBaseTopic(const QString& baseTopic);
void setValueOn(const QString& valueOn);
void setValueOff(const QString& valueOff);
QString getTopic() const;
QString getValueKey() const;
QString getBaseTopic() const;
QString getValueOn() const;
QString getValueOff() const;
virtual void store(QJsonObject& json) override;
virtual void load(const QJsonObject& json, const bool preserve = false) override;
protected:
virtual void enactValue(uint8_t value) override;
};
#endif // MQTTITEM_H

View file

@ -0,0 +1,205 @@
#include "mqttitemsource.h"
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
MqttItemSource::MqttItemSource(QObject *parent)
: ItemSource(parent)
{
}
MqttItemSource::~MqttItemSource()
{
}
void MqttItemSource::start(const QJsonObject& settings)
{
baseTopicName_ = settings["BaseTopic"].toString("zigbee2mqtt");
connect(&client_, &QMqttClient::stateChanged, this, &MqttItemSource::onClientStateChanged);
connect(&client_, &QMqttClient::errorChanged, this, &MqttItemSource::onClientError);
client_.setHostname(settings["Host"].toString("127.0.0.1"));
client_.setPort(settings["Port"].toInt(1883));
if(settings.contains("User"))
client_.setUsername(settings["User"].toString());
if(settings.contains("Password"))
client_.setPassword(settings["Password"].toString());
client_.setProtocolVersion(QMqttClient::MQTT_5_0);
client_.connectToHost();
QJsonArray itemsArray = settings["Items"].toArray();
for(QJsonValueRef itemRef : itemsArray)
{
QJsonObject itemObject = itemRef.toObject();
if(!itemObject.contains("Topic"))
continue;
MqttItemConfig config;
config.topic = itemObject["Topic"].toString();
config.name = itemObject.contains("Name") ? itemObject["Name"].toString() : config.topic;
config.valueKey = itemObject["ValueKey"].toString("state");
config.valueOn = itemObject["ValueOn"].toString("ON");
config.valueOff = itemObject["ValueOff"].toString("OFF");
// Determine value type
QString valueTypeStr = itemObject["ValueType"].toString("bool");
if(valueTypeStr == "uint")
config.valueType = ITEM_VALUE_UINT;
else
config.valueType = ITEM_VALUE_BOOL;
config.id = qHash(baseTopicName_ + "/" + config.topic);
// Create the item
config.item = std::make_shared<MqttItem>(config.id, config.name, 0);
config.item->setTopic(config.topic);
config.item->setValueKey(config.valueKey);
config.item->setBaseTopic(baseTopicName_);
config.item->setValueOn(config.valueOn);
config.item->setValueOff(config.valueOff);
config.item->setMqttClient(&client_);
items_.push_back(config);
}
}
MqttItemSource::MqttItemConfig* MqttItemSource::findConfig(const QString& topic)
{
for(MqttItemConfig& config : items_)
{
if(baseTopicName_ + "/" + config.topic == topic)
return &config;
}
return nullptr;
}
void MqttItemSource::onClientError(QMqttClient::ClientError error)
{
qWarning() << "MqttItemSource Client error:" << error;
}
void MqttItemSource::onClientStateChanged(QMqttClient::ClientState state)
{
if(state == QMqttClient::ClientState::Connected)
{
qInfo() << "MqttItemSource connected to MQTT broker at " << client_.hostname() << client_.port();
for(MqttItemConfig& config : items_)
{
// Subscribe to state topic to receive updates from devices
QString stateTopic = baseTopicName_ + "/" + config.topic;
qDebug() << "MqttItemSource subscribing to" << stateTopic;
QMqttSubscription* subscription = client_.subscribe(stateTopic);
if(subscription)
{
connect(subscription, &QMqttSubscription::messageReceived, this, &MqttItemSource::onMessageReceived);
}
}
}
else if(state == QMqttClient::ClientState::Disconnected)
{
qWarning() << "MqttItemSource lost connection to MQTT broker";
}
else if(state == QMqttClient::ClientState::Connecting)
{
qInfo() << "MqttItemSource connecting to MQTT broker at " << client_.hostname() << client_.port();
}
}
void MqttItemSource::onMessageReceived(const QMqttMessage& message)
{
MqttItemConfig* config = findConfig(message.topic().name());
if(!config)
return;
QJsonDocument doc = QJsonDocument::fromJson(message.payload());
if(doc.isObject())
{
QJsonObject obj = doc.object();
QString valueKey = config->valueKey;
if(obj.contains(valueKey))
{
uint8_t newValue = 0;
// Handle different value types
if(config->valueType == ITEM_VALUE_UINT)
{
// Numeric value (brightness, etc.)
newValue = obj[valueKey].toInt(0);
}
else
{
// Binary value (state on/off, etc.)
QString value = obj[valueKey].toString();
if(value == config->valueOn || value == "ON" || value == "true")
newValue = 1;
else if(value == config->valueOff || value == "OFF" || value == "false")
newValue = 0;
}
// Only update if value changed
if(config->item->getValue() != newValue)
{
ItemUpdateRequest update;
update.type = ITEM_UPDATE_BACKEND;
update.payload = *config->item;
update.payload.setValueData(newValue);
update.changes.value = true;
config->item->requestUpdate(update);
}
}
}
}
void MqttItemSource::refresh()
{
std::vector<ItemAddRequest> requests;
for(MqttItemConfig& config : items_)
{
ItemAddRequest request;
request.type = ITEM_UPDATE_BACKEND;
request.payload = config.item;
request.changes = ItemFieldChanges(true);
requests.push_back(request);
}
gotItems(requests);
}
void MqttItemSource::store(QJsonObject& json)
{
json["Host"] = client_.hostname();
json["Port"] = client_.port();
json["BaseTopic"] = baseTopicName_;
if(client_.username() != "")
json["User"] = client_.username();
if(client_.password() != "")
json["Password"] = client_.password();
QJsonArray itemsArray;
for(const MqttItemConfig& config : items_)
{
QJsonObject itemObject;
itemObject["Name"] = config.name;
itemObject["Topic"] = config.topic;
itemObject["ValueKey"] = config.valueKey;
itemObject["ValueOn"] = config.valueOn;
itemObject["ValueOff"] = config.valueOff;
if(config.valueType == ITEM_VALUE_UINT)
itemObject["ValueType"] = "uint";
else
itemObject["ValueType"] = "bool";
itemsArray.append(itemObject);
}
json["Items"] = itemsArray;
}
QMqttClient* MqttItemSource::getClient()
{
return &client_;
}

View file

@ -0,0 +1,34 @@
#ifndef MQTTITEMSOURCE_H
#define MQTTITEMSOURCE_H
#include <QObject>
#include <QJsonObject>
#include <QtMqtt/QMqttClient>
#include <vector>
#include <memory>
#include "itemsource.h"
#include "mqttitem.h"
class MqttItemSource : public ItemSource
{
Q_OBJECT
QString baseTopicName_;
QMqttClient client_;
private slots:
void onClientStateChanged(QMqttClient::ClientState state);
void onMessageReceived(const QMqttMessage& message);
void onClientError(QMqttClient::ClientError error);
public:
explicit MqttItemSource(QObject *parent = nullptr);
virtual ~MqttItemSource() override;
virtual void refresh() override;
void start(const QJsonObject& settings);
void store(QJsonObject& json);
QMqttClient* getClient();
};
#endif // MQTTITEMSOURCE_H