NOTICE: This forum has been archived (i.e. registration and posting are disabled).

Please go to the new forum at https://forum.kylegabriel.com


Further MQTT integration

Page 1 / 3

not_5
(@not_5)
Eminent Member
Joined: 2 years ago
Posts: 45
Topic starter  

Kyle, would it be possible to further integrate MQTT into Mycodo?

I know there's an MQTT paho input [subscribe]; however, as I was thinking about how to send sensor readings to another application (Home Assistant) and then receive requests, I thought the best course of action would be to create an MQTT paho output [publish]. I don't mind taking the time to create a custom output that posts sensor readings to an MQTT topic, but as I thought about it further, a lot of the arguments would be duplicated (broker, port, user, password, etc.) across the inputs/outputs, so it might make more sense to bake the MQTT broker information into the Mycodo settings page. I also think it would be very useful to be able to post MQTT messages with high-level logging and other content similar to the email notifications that are built into Mycodo.
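As a rough illustration of the publish idea, here is a minimal sketch in Python. It assumes a client object that exposes `publish(topic, payload, qos=..., retain=...)`, as paho-mqtt's `Client` does; the function name `publish_reading`, the topic, and the JSON payload layout are made up for the example and are not part of Mycodo. A stand-in client is used so the sketch runs without a broker.

```python
import json


def publish_reading(client, topic, value, unit):
    """Publish one sensor reading as a JSON payload and return the payload.

    `client` is assumed to behave like a connected paho-mqtt Client.
    The payload layout here is illustrative only.
    """
    payload = json.dumps({"value": value, "unit": unit})
    client.publish(topic, payload, qos=0, retain=False)
    return payload


class RecordingClient:
    """Stand-in for an MQTT client: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def publish(self, topic, payload, qos=0, retain=False):
        self.sent.append((topic, payload))


client = RecordingClient()
publish_reading(client, "mycodo/input/temperature", 21.5, "C")
```

With a real broker, the stand-in would be replaced by a connected paho-mqtt client; everything else stays the same.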

I know there are a couple of ways to get at current data right now (URL-based and the new API), but I do think MQTT integration would be a worthy effort.

Anyway, let me know your thoughts. Thanks!


Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 612
 

An Input Action system could be built, similar to the Actions that currently exist for Functions, but these would be actions that are added to Inputs and triggered every Input Period when measurements are acquired. The interface can remain clean, with just a dropdown at the end of the Input settings to select whichever Action you want to add; the added Actions would then appear as a list, with their settings, for that particular Input.
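To make the Input Action idea concrete, here is a small sketch of how actions attached to an Input might run after each acquisition. The class `InputSketch` and its methods are hypothetical and only illustrate the flow described above; they are not Mycodo's actual classes.

```python
class InputSketch:
    """Hypothetical Input that runs its attached actions after every acquisition."""

    def __init__(self, name, read_func):
        self.name = name
        self.read_func = read_func  # callable returning a dict of measurements
        self.actions = []           # callables added by the user, run in order

    def add_action(self, action):
        self.actions.append(action)

    def acquire(self):
        """Called once per Period: read measurements, then trigger each action."""
        measurements = self.read_func()
        for action in self.actions:
            action(self.name, measurements)
        return measurements


log = []
sensor = InputSketch("greenhouse_temp", lambda: {"temperature": 21.5})
# An action could publish to MQTT; here it just records what it was given
sensor.add_action(lambda name, meas: log.append((name, meas["temperature"])))
sensor.acquire()
```

Each action receives the fresh measurements, so an MQTT publish action would only be one of several possible action types.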

Mycodo Developer


not_5
(@not_5)
Eminent Member
Joined: 2 years ago
Posts: 45
Topic starter  

Yes, I think that would be the second piece of it. The first part would be to have the MQTT broker information saved to the influx database so it can be referenced. I figured the best spot to enter that information would be the config page. The action would then be to publish a payload to a specific topic.


Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 612
 

I'd like to make it as configurable as possible from the start, to avoid having to refactor it later to add a use case. So, perhaps it would be better to have credentials supplied for each Input Action rather than a global configuration. This would allow measurements to be sent to different servers or with different credentials. A global configuration would prevent this.
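A sketch of the per-action approach, where each action carries its own broker settings instead of reading one global configuration. The names `BrokerSettings` and `PublishAction`, and the example hostnames, are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BrokerSettings:
    """Connection details stored with each action (hypothetical layout)."""
    hostname: str = "localhost"
    port: int = 1883
    username: Optional[str] = None
    password: Optional[str] = None


@dataclass
class PublishAction:
    """One action: where to publish, and which broker to publish through."""
    topic: str
    broker: BrokerSettings

    def target(self):
        return "{}:{}/{}".format(self.broker.hostname, self.broker.port, self.topic)


# The same measurement can go to two different servers with different credentials
local = PublishAction("greenhouse/temp", BrokerSettings())
remote = PublishAction(
    "greenhouse/temp",
    BrokerSettings("mqtt.example.com", 8883, "user", "secret"))
targets = [local.target(), remote.target()]
```

The trade-off is the duplication raised earlier in the thread: every action repeats the broker fields, in exchange for being able to point each one somewhere different.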

Mycodo Developer


not_5
(@not_5)
Eminent Member
Joined: 2 years ago
Posts: 45
Topic starter  

Maybe the answer, then, is still to create an MQTT output where the arguments passed are the topic and payload. The output could contain all the broker information along with the publish function.


muehlbucks
(@muehlbucks)
New Member
Joined: 10 months ago
Posts: 3
 

@not_5 Thank you for starting the discussion. I came to the forum looking for info on how to hack something together with conditionals or write a tiny middleware to poll the API.


choc_fudge
(@choc_fudge)
Active Member
Joined: 10 months ago
Posts: 8
 

Hey Kyle, thanks for all your work, your system is amazing. I also think it would be great to see an MQTT output module.

I have been trying to implement LED strip pattern control running on an Arduino ESP32, and figured the best way to communicate with the Arduino would be MQTT.

I tried building a custom MQTT output by starting from the dummy example and copying over the relevant parts of the MQTT input file, but my Python knowledge is very limited.

Below is my attempt (poor) so far. I was wondering if you would mind taking a look? Thanks :)

 

# MQTT OUTPUT TEST
#
# example_dummy_output.py - Example Output module
#

 

from flask_babel import lazy_gettext
from mycodo.outputs.base_output import AbstractOutput

from mycodo.databases.models import Conversion

from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import add_measurements_influxdb
from mycodo.utils.influx import parse_measurement

def constraints_pass_positive_value(mod_input, value):
"""
Check if the user input is acceptable
:param mod_input: SQL object with user-saved Input options
:param value: float or int
:return: tuple: (bool, list of strings)
"""
errors = []
all_passed = True
# Ensure value is positive
if value <= 0:
all_passed = False
errors.append("Must be a positive value")
return all_passed, errors, mod_input

 

# Measurements
measurements_dict = {}

# Output information
OUTPUT_INFORMATION = {

'output_name_unique': 'MQTT_PAHO',
'output_manufacturer': 'Mycodo',
'output_name': 'MQTT Protocol (paho)',
'output_library': 'paho-mqtt',
'measurements_name': 'Variable measurements',
'measurements_dict': measurements_dict,

# Any dependencies required by the output module
'dependencies_module': [
('pip-pypi', 'paho', 'paho-mqtt')],

 

# The interface or interfaces that can be used with this module
# A custom interface can be used.
# Options: SHELL, PYTHON, GPIO, I2C, FTDI, UART
'interfaces': ['Mycodo'],

# Form input options that are enabled or disabled
'options_enabled': [
'measurements_select'
],

 

'options_disabled': [
'interface' # Show the interface (as a disabled input)
],

 

# NOT REQUIRED?
# Should the output controller handle storing whether the output is on or off?
# If this output module should handle determining the output state, choose False
'on_state_internally_handled': False,

# Type of output. Options: "on_off", "pwm", "volume"
'output_types': ['on_off'],

# A message to display at the top of the output options
'message': 'Information about this output.',

 

'measurements_variable_amount': True,
'measurements_use_same_timestamp': False,
'listener': True,

# Custom actions that will appear at the top of the options in the user interface.
# Buttons are required to have a function with the same name that will be executed
# when the button is pressed. Input values will be passed to the button in a
# dictionary. See the function input_button() at the end of this module.

'custom_actions_message': 'This is a message displayed for custom actions.',
'custom_actions': [
{
'id': 'input_value',
'type': 'float',
'name': lazy_gettext('Value Name'),
'phrase': 'A description for this input'
},
{
'id': 'input_button',
'type': 'button',
'name': lazy_gettext('Button Name')
}
],

# Custom options that can be set by the user in the web interface.
'custom_options_message': 'This is a message displayed for custom options.',

 

# ADDED FROM MQTT INPUT

'custom_options': [
{
'id': 'mqtt_hostname',
'type': 'text',
'default_value': 'localhost',
'required': True,
'name': lazy_gettext('Hostname'),
'phrase': lazy_gettext('The hostname of the MQTT server')
},
{
'id': 'mqtt_port',
'type': 'integer',
'default_value': 1883,
'required': True,
'name': lazy_gettext('Port'),
'phrase': lazy_gettext('The port of the MQTT server')
},
{
'id': 'mqtt_keepalive',
'type': 'integer',
'default_value': 60,
'required': True,
'constraints_pass': constraints_pass_positive_value,
'name': lazy_gettext('Keep Alive'),
'phrase': lazy_gettext('Maximum amount of time between received signals. Set to 0 to disable.')
},
{
'id': 'mqtt_clientid',
'type': 'text',
'default_value': 'mycodo_mqtt_client',
'required': True,
'name': lazy_gettext('Client ID'),
'phrase': lazy_gettext('Unique client ID for connecting to the MQTT server')
}
]
}

# ADDED FROM MQTT INPUT

class OutputModule(AbstractOutput):
"""
An output support class that operates an output
"""
def __init__(self, output, testing=False):
super(OutputModule, self).__init__(output, testing=testing, name=__name__)

# ADDED FROM MQTT INPUT
# Initialize custom option variables to None
self.mqtt_hostname = None
self.mqtt_port = None
self.mqtt_channel = None
self.mqtt_keepalive = None
self.mqtt_clientid = None

# ADDED FROM MQTT INPUT

# Set custom option variables to defaults or user-set values
self.setup_custom_options(
OUTPUT_INFORMATION['custom_options'], output)

if not testing:
import paho.mqtt.client as mqtt

# Variable to store whether the output has been successfully set up
self.output_setup = None

# Since on_state_internally_handled is False, we will store the state of the output
self.output_state = False

# Variables set by the user interface
self.gpio_pin = output.pin

#MQTT
self.logger.debug("Client created with ID {}".format(
self.mqtt_clientid))
self.client = mqtt.Client(self.mqtt_clientid)

 

# ADDED FROM MQTT INPUT - NEEDS MODIFICATION FOR OUTPUT

def callbacks_connect(self):
""" Connect the callback functions """
try:
self.logger.debug("Connecting MQTT callback functions")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_subscribe = self.on_subscribe
self.client.on_disconnect = self.on_disconnect
self.logger.debug("MQTT callback functions connected")
except:
self.logger.error("Unable to connect mqtt callback functions")

def connect(self):
""" Set up the connection to the MQTT Server """
try:
self.client.connect(
self.mqtt_hostname,
port=self.mqtt_port,
keepalive=self.mqtt_keepalive)
self.logger.info("Connected to {} as {}".format(
self.mqtt_hostname, self.mqtt_clientid))
except:
self.logger.error("Could not connect to mqtt host: {}:{}".format(
self.mqtt_hostname, self.mqtt_port))

def subscribe(self):
""" Set up the subscriptions to the proper MQTT channels to listen to """
try:
for channel in self.channels_measurement:
self.client.subscribe(self.channels_measurement[channel].name)
self.logger.debug("Subscribed to MQTT channel '{}'".format(
self.channels_measurement[channel].name))
except:
self.logger.error("Could not subscribe to MQTT channel '{}'".format(
self.mqtt_channel))

def on_connect(self, client, obj, flags, rc):
self.logger.debug("Connected to '{}', rc: {}".format(
self.mqtt_channel, rc))

def on_subscribe(self, client, obj, mid, granted_qos):
self.logger.debug("Subscribing to mqtt topic: {}, {}, {}".format(
self.mqtt_channel, mid, granted_qos))

def on_log(self, mqttc, obj, level, string):
self.logger.info("Log: {}".format(string))

def on_message(self, client, userdata, msg):
datetime_utc = datetime.datetime.utcnow()
self.logger.debug("Message received: Channel: {}, Value: {}".format(
msg.topic, msg.payload.decode()))
measurement = {}
channel = None
for each_channel in self.channels_measurement:
if self.channels_measurement[each_channel].name == msg.topic:
channel = each_channel

if channel is None:
self.logger.error("Could not determine channel for '{}'".format(msg.topic))
return

try:
value = float(msg.payload.decode())
except Exception:
self.logger.exception("Message doesn't represent a float value.")
return

# Original value/unit
measurement[channel] = {}
measurement[channel]['measurement'] = self.channels_measurement[channel].measurement
measurement[channel]['unit'] = self.channels_measurement[channel].unit
measurement[channel]['value'] = value
measurement[channel]['timestamp_utc'] = datetime_utc

self.add_measurement_influxdb(channel, measurement)

def add_measurement_influxdb(self, channel, measurement):
# Convert value/unit is conversion_id present and valid
if self.channels_conversion[channel]:
conversion = db_retrieve_table_daemon(
Conversion,
unique_id=self.channels_measurement[channel].conversion_id)
if conversion:
meas = parse_measurement(
self.channels_conversion[channel],
self.channels_measurement[channel],
measurement,
channel,
measurement[channel],
timestamp=measurement[channel]['timestamp_utc'])

measurement[channel]['measurement'] = meas[channel]['measurement']
measurement[channel]['unit'] = meas[channel]['unit']
measurement[channel]['value'] = meas[channel]['value']

if measurement:
self.logger.debug(
"Adding measurement to influxdb: {}".format(measurement))
add_measurements_influxdb(
self.unique_id,
measurement,
use_same_timestamp=INPUT_INFORMATION['measurements_use_same_timestamp'])

def on_disconnect(self, client, userdata, rc=0):
self.logger.debug("Disconnected result code {}".format(rc))

def stop_input(self):
""" Called when Input is deactivated """
self.running = False
self.client.loop_stop()
self.client.disconnect()

def listener(self):
self.callbacks_connect()
self.connect()
self.subscribe()
self.client.loop_start()

 


Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 612
 

I don't mind taking a look, but you have to present it in a way that preserves the formatting. The way you posted it, there are no indentations. Either use the forum <pre> tags (<> code button), or attach it as a text (.txt extension) file.

Mycodo Developer


not_5
(@not_5)
Eminent Member
Joined: 2 years ago
Posts: 45
Topic starter  

@kylegabriel I'm going to fork mycodo and take a crack at adding in some MQTT functionality similar to how SMTP is added. I personally think some basic things would be helpful from an IoT perspective to just have built in. The MQTT output is helpful for some use cases, but honestly most things should be publishing to a uniform topic prefix with some changes to the specific ending.

Prefix would be something like: location/mycodo-{UID}/{thing: (input, output, conditional, etc)}/

Then the specific ending could be like: status, measurement, etc.
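The prefix-plus-ending scheme could be assembled along these lines. This is only a sketch; the function name `build_topic` and the sample values are illustrative.

```python
def build_topic(location, uid, thing, ending):
    """Join the pieces into location/mycodo-{UID}/{thing}/{ending}."""
    parts = [location, "mycodo-{}".format(uid), thing, ending]
    # Strip stray slashes so a piece cannot introduce an empty topic level
    return "/".join(part.strip("/") for part in parts)


topic_measurement = build_topic("greenhouse", "a1b2", "input", "measurement")
topic_status = build_topic("greenhouse", "a1b2", "output", "status")
```

A subscriber such as Home Assistant could then use an MQTT wildcard, for example `greenhouse/mycodo-a1b2/#`, to receive everything from one Mycodo instance.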


choc_fudge
(@choc_fudge)
Active Member
Joined: 10 months ago
Posts: 8
 

@kylegabriel, Sorry about that, I couldn't find an edit existing post button to fix this up. Thanks for taking a look, when you have time.

@not_5, thanks and look forward to seeing what you come up with.

Yes, I imagined the simple output functionality would be publishing a 1/0 or On/Off value to a list of topics, with an optional time duration.
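One possible shape for that, as a sketch: a function that turns a state, a list of topics, and an optional duration into the messages to publish. How the duration is encoded (here appended after a comma) is purely an assumption for the example, as are the function and parameter names.

```python
def build_switch_messages(topics, state, duration_sec=None,
                          payload_on="1", payload_off="0"):
    """Return a list of (topic, payload) pairs for an on/off command.

    Assumption: an "on" command with a duration is encoded as "<on>,<seconds>".
    The receiving device would have to agree on that format.
    """
    payload = payload_on if state else payload_off
    if state and duration_sec is not None:
        payload = "{},{}".format(payload, duration_sec)
    return [(topic, payload) for topic in topics]


messages = build_switch_messages(["leds/strip1", "leds/strip2"], True, duration_sec=30)
```

Passing `payload_on="On"` and `payload_off="Off"` would give the On/Off variant instead of 1/0.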


Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 612
 
Posted by: @choc_fudge

Thanks for taking a look, when you have time.

I don't see an attachment or that you edited your post. There should be an edit button at the bottom of your post when you're logged in. Otherwise, you can just make a new post.

Mycodo Developer


choc_fudge
(@choc_fudge)
Active Member
Joined: 10 months ago
Posts: 8
 

When I'm logged in with Chrome or Firefox, I can't see an edit post option at the bottom of the existing post (only reply, quote, like, report).

I can't see an attach file button on a new post either; there is only insert external link and source code. Sorry, I may be looking straight past it...

EDIT: edit is now showing on this post, but not on the original post. Still no attach file option that I can see. I can upload it to Dropbox and share the link that way if that helps.

This post was modified 10 months ago by choc_fudge

Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 612
 
Posted by: @kylegabriel

Either use the forum <pre> tags (<> code button), or attach it as a text (.txt extension) file.

The easiest solution is to just use the <pre>/code button in a reply.

Mycodo Developer


choc_fudge
(@choc_fudge)
Active Member
Joined: 10 months ago
Posts: 8
 

mqtt_output_test.txt file attached. Thanks for your patience! :)

Edit: it won't let me attach a .txt file.

Attempting the code button below:

# coding=utf-8
#
# example_dummy_output.py - Example Output module
#
# MQTT OUTPUT
import datetime  # used by on_message() further down

from flask_babel import lazy_gettext
from mycodo.outputs.base_output import AbstractOutput

from mycodo.databases.models import Conversion

from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import add_measurements_influxdb
from mycodo.utils.influx import parse_measurement


def constraints_pass_positive_value(mod_input, value):
    """
    Check if the user input is acceptable
    :param mod_input: SQL object with user-saved Input options
    :param value: float or int
    :return: tuple: (bool, list of strings)
    """
    errors = []
    all_passed = True
    # Ensure value is positive
    if value <= 0:
        all_passed = False
        errors.append("Must be a positive value")
    return all_passed, errors, mod_input



# Measurements
measurements_dict = {}

# Output information
OUTPUT_INFORMATION = {
    'output_name_unique': 'MQTT_PAHO',
    'output_manufacturer': 'Mycodo',
    'output_name': 'MQTT Protocol (paho)',
    'output_library': 'paho-mqtt',
    'measurements_name': 'Variable measurements',
    'measurements_dict': measurements_dict,

    # Any dependencies required by the output module
    'dependencies_module': [
        ('pip-pypi', 'paho', 'paho-mqtt')],

    # The interface or interfaces that can be used with this module
    # A custom interface can be used.
    # Options: SHELL, PYTHON, GPIO, I2C, FTDI, UART
    'interfaces': ['Mycodo'],

    # Form input options that are enabled or disabled
    'options_enabled': [
        'measurements_select'
    ],

    'options_disabled': [
        'interface'  # Show the interface (as a disabled input)
    ],

    # NOT REQUIRED?
    # Should the output controller handle storing whether the output is on or off?
    # If this output module should handle determining the output state, choose False
    'on_state_internally_handled': False,

    # Type of output. Options: "on_off", "pwm", "volume"
    'output_types': ['on_off'],

    # A message to display at the top of the output options
    'message': 'Information about this output.',

    'measurements_variable_amount': True,
    'measurements_use_same_timestamp': False,
    'listener': True,

    # Custom actions that will appear at the top of the options in the user interface.
    # Buttons are required to have a function with the same name that will be executed
    # when the button is pressed. Input values will be passed to the button in a
    # dictionary. See the function input_button() at the end of this module.
    'custom_actions_message': 'This is a message displayed for custom actions.',
    'custom_actions': [
        {
            'id': 'input_value',
            'type': 'float',
            'name': lazy_gettext('Value Name'),
            'phrase': 'A description for this input'
        },
        {
            'id': 'input_button',
            'type': 'button',
            'name': lazy_gettext('Button Name')
        }
    ],

    # Custom options that can be set by the user in the web interface.
    'custom_options_message': 'This is a message displayed for custom options.',

    # ADDED FROM MQTT INPUT
    'custom_options': [
        {
            'id': 'mqtt_hostname',
            'type': 'text',
            'default_value': 'localhost',
            'required': True,
            'name': lazy_gettext('Hostname'),
            'phrase': lazy_gettext('The hostname of the MQTT server')
        },
        {
            'id': 'mqtt_port',
            'type': 'integer',
            'default_value': 1883,
            'required': True,
            'name': lazy_gettext('Port'),
            'phrase': lazy_gettext('The port of the MQTT server')
        },
        {
            'id': 'mqtt_keepalive',
            'type': 'integer',
            'default_value': 60,
            'required': True,
            'constraints_pass': constraints_pass_positive_value,
            'name': lazy_gettext('Keep Alive'),
            'phrase': lazy_gettext('Maximum amount of time between received signals. Set to 0 to disable.')
        },
        {
            'id': 'mqtt_clientid',
            'type': 'text',
            'default_value': 'mycodo_mqtt_client',
            'required': True,
            'name': lazy_gettext('Client ID'),
            'phrase': lazy_gettext('Unique client ID for connecting to the MQTT server')
        }
    ]
}


# ADDED FROM MQTT INPUT

class OutputModule(AbstractOutput):
    """
    An output support class that operates an output
    """
    def __init__(self, output, testing=False):
        super(OutputModule, self).__init__(output, testing=testing, name=__name__)

        # ADDED FROM MQTT INPUT
        # Initialize custom option variables to None
        self.mqtt_hostname = None
        self.mqtt_port = None
        self.mqtt_channel = None
        self.mqtt_keepalive = None
        self.mqtt_clientid = None

        # ADDED FROM MQTT INPUT

        # Set custom option variables to defaults or user-set values
        self.setup_custom_options(
            OUTPUT_INFORMATION['custom_options'], output)

        if not testing:
            import paho.mqtt.client as mqtt

            # Variable to store whether the output has been successfully set up
            self.output_setup = None

            # Since on_state_internally_handled is False, we will store the state of the output
            self.output_state = False

            # Variables set by the user interface
            self.gpio_pin = output.pin

            # MQTT
            self.logger.debug("Client created with ID {}".format(
                self.mqtt_clientid))
            self.client = mqtt.Client(self.mqtt_clientid)

    # ADDED FROM MQTT INPUT - NEEDS MODIFICATION FOR OUTPUT

    def callbacks_connect(self):
        """ Connect the callback functions """
        try:
            self.logger.debug("Connecting MQTT callback functions")
            self.client.on_connect = self.on_connect
            self.client.on_message = self.on_message
            self.client.on_subscribe = self.on_subscribe
            self.client.on_disconnect = self.on_disconnect
            self.logger.debug("MQTT callback functions connected")
        except Exception:
            self.logger.error("Unable to connect mqtt callback functions")

    def connect(self):
        """ Set up the connection to the MQTT Server """
        try:
            self.client.connect(
                self.mqtt_hostname,
                port=self.mqtt_port,
                keepalive=self.mqtt_keepalive)
            self.logger.info("Connected to {} as {}".format(
                self.mqtt_hostname, self.mqtt_clientid))
        except Exception:
            self.logger.error("Could not connect to mqtt host: {}:{}".format(
                self.mqtt_hostname, self.mqtt_port))

    def subscribe(self):
        """ Set up the subscriptions to the proper MQTT channels to listen to """
        try:
            for channel in self.channels_measurement:
                self.client.subscribe(self.channels_measurement[channel].name)
                self.logger.debug("Subscribed to MQTT channel '{}'".format(
                    self.channels_measurement[channel].name))
        except Exception:
            self.logger.error("Could not subscribe to MQTT channel '{}'".format(
                self.mqtt_channel))

    def on_connect(self, client, obj, flags, rc):
        self.logger.debug("Connected to '{}', rc: {}".format(
            self.mqtt_channel, rc))

    def on_subscribe(self, client, obj, mid, granted_qos):
        self.logger.debug("Subscribing to mqtt topic: {}, {}, {}".format(
            self.mqtt_channel, mid, granted_qos))

    def on_log(self, mqttc, obj, level, string):
        self.logger.info("Log: {}".format(string))

    def on_message(self, client, userdata, msg):
        datetime_utc = datetime.datetime.utcnow()
        self.logger.debug("Message received: Channel: {}, Value: {}".format(
            msg.topic, msg.payload.decode()))
        measurement = {}
        channel = None
        for each_channel in self.channels_measurement:
            if self.channels_measurement[each_channel].name == msg.topic:
                channel = each_channel

        if channel is None:
            self.logger.error("Could not determine channel for '{}'".format(msg.topic))
            return

        try:
            value = float(msg.payload.decode())
        except Exception:
            self.logger.exception("Message doesn't represent a float value.")
            return

        # Original value/unit
        measurement[channel] = {}
        measurement[channel]['measurement'] = self.channels_measurement[channel].measurement
        measurement[channel]['unit'] = self.channels_measurement[channel].unit
        measurement[channel]['value'] = value
        measurement[channel]['timestamp_utc'] = datetime_utc

        self.add_measurement_influxdb(channel, measurement)

    def add_measurement_influxdb(self, channel, measurement):
        # Convert value/unit if conversion_id is present and valid
        if self.channels_conversion[channel]:
            conversion = db_retrieve_table_daemon(
                Conversion,
                unique_id=self.channels_measurement[channel].conversion_id)
            if conversion:
                meas = parse_measurement(
                    self.channels_conversion[channel],
                    self.channels_measurement[channel],
                    measurement,
                    channel,
                    measurement[channel],
                    timestamp=measurement[channel]['timestamp_utc'])

                measurement[channel]['measurement'] = meas[channel]['measurement']
                measurement[channel]['unit'] = meas[channel]['unit']
                measurement[channel]['value'] = meas[channel]['value']

        if measurement:
            self.logger.debug(
                "Adding measurement to influxdb: {}".format(measurement))
            add_measurements_influxdb(
                self.unique_id,
                measurement,
                use_same_timestamp=OUTPUT_INFORMATION['measurements_use_same_timestamp'])

    def on_disconnect(self, client, userdata, rc=0):
        self.logger.debug("Disconnected result code {}".format(rc))

    def stop_input(self):
        """ Called when Input is deactivated """
        self.running = False
        self.client.loop_stop()
        self.client.disconnect()

    def listener(self):
        self.callbacks_connect()
        self.connect()
        self.subscribe()
        self.client.loop_start()
This post was modified 10 months ago 3 times by choc_fudge

Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 612
 

Thanks for formatting the code. I just reviewed it and made some changes. I see you took elements from the input module and added them to the output module, which is an issue because some elements work in output modules and others do not. I have stripped the output module of the things that don't work and it's shaping up nicely. I'm at a point where I need to look a little deeper into how MQTT works in order to determine the best way to design what happens when the controller receives a command to turn on or off. Any assistance in this area will be helpful. Once I have what appears to be a working output module (i.e. able to be imported without causing errors), I'll post it.
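One simple way to think about the on/off question, offered only as a sketch: on each command, publish a configured payload to a topic and remember the last state sent. The class `MqttOnOffSketch` and its method names are hypothetical and are not Mycodo's output API; the client is assumed to expose `publish(topic, payload)` as paho-mqtt's `Client` does, and a stand-in client is used so the sketch runs without a broker.

```python
class MqttOnOffSketch:
    """Hypothetical on/off output: publishes a payload and tracks the last state."""

    def __init__(self, client, topic, payload_on="on", payload_off="off"):
        self.client = client
        self.topic = topic
        self.payload_on = payload_on
        self.payload_off = payload_off
        self.state = None  # unknown until the first command is sent

    def switch(self, state):
        """Handle an 'on' or 'off' command from the controller."""
        if state not in ("on", "off"):
            raise ValueError("state must be 'on' or 'off'")
        payload = self.payload_on if state == "on" else self.payload_off
        self.client.publish(self.topic, payload)
        self.state = (state == "on")

    def is_on(self):
        return self.state


class RecordingClient:
    """Stand-in for an MQTT client: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def publish(self, topic, payload, qos=0, retain=False):
        self.sent.append((topic, payload))


client = RecordingClient()
output = MqttOnOffSketch(client, "leds/strip1/set")
output.switch("on")
output.switch("off")
```

Note that this only tracks what was sent, not what the device actually did; confirming the real state would need the device to publish its status back on another topic.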

Mycodo Developer

