Custom output modul...
 
Notifications
Clear all

[Solved] Custom output module for grove I2C Motor Driver  

  RSS

mcfada
(@mcfada)
New Member
Joined: 2 months ago
Posts: 4
October 2, 2020 8:11 pm  

I'm working on a custom output module for the Grove I2C Motor Driver so we can use generic peristaltic pumps over I2C. I am only an amateur programmer (one or two classes in college, years ago, not python), and am using a few other modules as reference to help me along.

pump_generic.py, pump_atlas_ezo_pmp.py, pcf8574.py, example_dummy_output.py

and also an I2C motor driver from https://github.com/DexterInd/GrovePi/blob/master/Software/Python/grove_i2c_motor_driver/grove_i2c_motor_driver.py

I explain this all in case there is anything missing or just ridiculous in my code, I don't know better. Regardless, I actually think I've gotten pretty far. The interface from OUTPUT_INFORMATION looks how I think it should. But I'm having trouble making sense of how to actually get the OutputModule class to literally speak to the I2C register based on user inputs from the UI. I kind of understand how it's working in the GPIO generic example, but not fully, so having a tough time transfer over that to I2C, which I haven't worked with before. Here is an example excerpt of how the two compare, basically what I've been trying.

#self.GPIO.output(self.options_channels['pin'][0], self.options_channels['on_state'][0])
smbus.SMBus(1).write_i2c_block_data(self.I2CMotorDriverAdd, self.MotorSpeedSet, 100)

where some of the registers listed there are 

MotorSpeedSet = 0x82
PWMFrequenceSet = 0x84
DirectionSet = 0xaa
MotorSetA = 0xa1
MotorSetB = 0xa5
Nothing = 0x01
EnableStepper = 0x1a
UnenableStepper = 0x1b
Stepernu = 0x1c
I2CMotorDriverAdd = 0x0f #Set the address of the I2CMotorDriver

Here is the full code I have so far. Sorry for the formatting, only way I could copy it in with the tabs included. Any help is appreciated. If there is a better place to post this (GitHub maybe?) let me know and I will. Thank you

# coding=utf-8
#
# grove_i2c_motor_driver.py - Output for Grove I2C
#

#Import stuff
import copy
import datetime
import threading
import time

from flask_babel import lazy_gettext

from mycodo.databases.models import DeviceMeasurements
from mycodo.databases.models import OutputChannel
from mycodo.outputs.base_output import AbstractOutput
from mycodo.utils.database import db_retrieve_table_daemon
from mycodo.utils.influx import add_measurements_influxdb
from mycodo.utils.influx import read_last_influxdb


def constraints_pass_positive_value(mod_input,value):
    """
    Check if the user input is acceptable
    :param mod_input: SQL object with user-saved Input options
    :param value: float or int
    :return: tuple: (bool, list of strings)
    """
    errors = []
    all_passed = True
    # Ensure value is positive
    if value <= 0:
        all_passed = False
        
        errors.append("Must be a positive value")
    return all_passed, errors, mod_input


#Measurements
measurements_dict = {
    0: {
        'measurement': 'duration_time',
        'unit': 's',
        'name': 'Pump On',
    },
    1: {
        'measurement': 'volume',
        'unit': 'ml',
        'name': 'Dispense Volume',
    },
    2: {
        'measurement': 'duration_time',
        'unit': 's',
        'name': 'Dispense Duration',
    },
    3: {
        'measurement': 'duration_time',
        'unit': 's',
        'name': 'Pump On',
    },
    4: {
        'measurement': 'volume',
        'unit': 'ml',
        'name': 'Dispense Volume',
    },
    5: {
        'measurement': 'duration_time',
        'unit': 's',
        'name': 'Dispense Duration',
    }
}

channels_dict = {
    0: {
        'name': 'Pump A',
        'types': ['volume', 'on_off'],
        'measurements': [0, 1, 2]
    },
    1: {
        'name': 'Pump B',
        'types': ['volume', 'on_off'],
        'measurements': [3, 4, 5]
    }
}


# Output information
OUTPUT_INFORMATION = {
    # A unique output name used to distinguish it from others
    'output_name_unique': 'grove_i2c_motor_driver'


    # A friendly/common name for the output to display to the user
    'output_name': "{} ({})".format(lazy_gettext('Dual Peristaltic Pump'), lazy_gettext('Groe')),


    # Optional library name (for outputs that are named the same but use different libraries)
    #'output_library': 'library_name'


    # The dictionary of measurements for this output. Don't edit this.
    'measurements_dict': measurements_dict,
    
    'channels_dict': channels_dict


    # Type of output. Options: "on_off", "pwm", "volume"
    'output_types': ['volume','on_off'],
    
    'url_manufacturer': 'https://wiki.seeedstudio.com/Grove-I2C_Motor_Driver_V1.3/',
    'url_datasheet': 'https://wiki.seeedstudio.com/Grove-I2C_Motor_Driver_V1.3/',
    'url_product_purchase': 'https://amzn.to/3nbBCuX'


    # A message to display at the top of the output options
    'message': 'Grove I2C Motor Driver V1.3, used to power two peristaltic pumps by I2C. '
                'Pumps are available at https://amzn.to/3imFAO3 and dispense 5-100mL/min',
    
    # Form input options that are enabled or disabled
    'options_enabled': [
        'i2c_location',
        'button_on',            # Shows a button to turn the output on
        'button_send_volume',   # Shows an input field and a button to turn on for a volume
        'button_send_duration'  # Shows an input field and a button to turn on for a duration
    ],
    'options_disabled': [
        'interface'  # Show the interface (as a disabled input)
    ],
    
    # Any dependencies required by the output module
    'dependencies_module': [
        ('pip-pypi', 'smbus2', 'smbus2')
    ],
    
    # The interface or interfaces that can be used with this module
    # A custom interface can be used.
    # Options: SHELL, PYTHON, GPIO, I2C, FTDI, UART
    'interfaces': ['I2C'],
    
    'i2c_address_editable': True,
    'i2c_address_default': '0x0f',
    
    'custom_options_message':   "To accurately dispense specific volumes, the following options need to be correctly "
                                "set. To determine the flow rate of your pump, first purge the fluid line to remove "
                                "air. Next, turn the pump on for 60 seconds and collect the fluid that's dispensed. "
                                "Last, measure and enter the amount of fluid that was dispensed, in ml, into the "
                                "Fastest Rate (ml/min) field. Your pump should now be calibrated to dispense volumes "
                                "accurately. "
                                "Since Peristaltic Pump Output controllers are capable of accepting multiple different "
                                "dispersal value types, Default Dispersal Method must be set in order to specify whether "
                                "the peristaltic pump should output for a duration or a specific volume when other "
                                "controllers (such as PID controllers) send a value instructing it to dispense." ,
    
    'custom_channel_options': [
        {
            'id': 'flow_mode',
            'type': 'select',
            'default_value': 'fastest_flow_rate',
            'options_select': [
                ('fastest_flow_rate', 'Fastest Flow Rate'),
                ('specify_flow_rate', 'Specify Flow Rate')
            ],
            'name': lazy_gettext('Flow Rate Method'),
            'phrase': lazy_gettext('The flow rate to use when pumping a volume')
        },
        {
            'id': 'flow_rate',
            'type': 'float',
            'default_value': 10.0,
            'constraints_pass': constraints_pass_positive_value,
            'name': 'Desired Flow Rate (ml/min)',
            'phrase': 'Desired flow rate in ml/minute when Specify Flow Rate set'
        },
        {
            'id': 'fastest_dispense_rate_ml_min',
            'type': 'float',
            'default_value': 100.0,
            'constraints_pass': constraints_pass_positive_value,
            'name': 'Fastest Rate (ml/min)',
            'phrase': 'The fastest rate that the pump can dispense (ml/min)'
        }
    ],
}
    
    
class OutputModule(AbstractOutput):
    """ An output support class that operates an output """
    
    MotorSpeedSet       = 0x82
    PWMFrequenceSet     = 0x84
    DirectionSet        = 0xaa
    MotorSetA           = 0xa1
    MotorSetB           = 0xa5
    Nothing             = 0x01
    EnableStepper       = 0x1a
    UnenableStepper     = 0x1b
    Stepernu            = 0x1c
    I2CMotorDriverAdd   = 0x0f  #Set the address of the I2CMotorDriver
    
    def __init__(self, output, testing=False):
        super(OutputModule, self).__init__(output, testing=testing, name=__name__)
        
        # Initialize custom option variables to Non
        self.currently_dispensing = False
        
        # Set custom option variables to defaults or user-set values
        output_channels = db_retrieve_table_daemon(
            OutputChannel).filter(OutputChannel.output_id == output.unique_id).all()
        self.options_channels = self.setup_custom_channel_options_json(
            OUTPUT_INFORMATION['custom_channel_options'], output_channels)
    
    def setup_output(self):
        import smbus2
        
        self.setup_on_off_output(OUTPUT_INFORMATION)
        
        try:
            self.logger.debug("I2C: Address: {}, Bus: {}".format(self.output.i2c_location, self.output.i2c_bus))
            if self.output.i2c_location:
                i2c_address=int(str(self.output.i2c_location), 16),
                i2c_bus=self.output.i2c_bus,
                self.output_setup = True
        except:
            self.logger.exception("Could not set up output")
            return

        # Variable to store whether the output has been successfully set up
        self.logger.info("Output set up")
        self.output_setup = True


    #Maps speed from 0-100 to 0-255
    def map_vals(self,value, leftMin, leftMax, rightMin, rightMax):
        # http://stackoverflow.com/questions/1969240/mapping-a-range-of-values-to-another
        # Figure out how 'wide' each range is
        leftSpan = leftMax - leftMin
        rightSpan = rightMax - rightMin

        # Convert the left range into a 0-1 range (float)
        valueScaled = float(value - leftMin) / float(leftSpan)

        # Convert the 0-1 range into a value in the right range.
        return int(rightMin + (valueScaled * rightSpan))
    
    def dispense_volume_fastest(self, amount, total_dispense_seconds):
        """ Dispense at fastest flow rate, a 100 % duty cycle """
        self.currently_dispensing = True
        self.logger.debug("Output turned on")
        
        """
        #bus.write_i2c_block_data(self.I2CMotorDriverAdd, self.MotorSpeedSet, [self.map_vals(MotorSpeedA,0,100,0,255),self.map_vals(MotorSpeedB,0,100,0,255)])
        #self.GPIO.output(self.options_channels['pin'][0], self.options_channels['on_state'][0])
        """
        #self.GPIO.output(self.options_channels['pin'][0], self.options_channels['on_state'][0])
        smbus.SMBus(1).write_i2c_block_data(self.I2CMotorDriverAdd, self.MotorSpeedSet, 100)
        
        
        timer_dispense = time.time() + total_dispense_second


        while time.time() < timer_dispense and self.currently_dispensing:
            time.sleep(0.01)
        
        #####################
        #self.GPIO.output(self.options_channels['pin'][0], not self.options_channels['on_state'][0])
        self.currently_dispensing = False
        self.logger.debug("Output turned off")
        self.record_dispersal(amount, total_dispense_seconds, total_dispense_seconds)

    def dispense_volume_rate(self, amount, dispense_rate):
        """ Dispense at a specific flow rate """
        # Calculate total disperse time and durations to cycle on/off to reach total volume
        total_dispense_seconds = amount / dispense_rate * 60
        self.logger.debug("Total duration to run: {0:.1f} seconds".format(total_dispense_seconds))

        duty_cycle = dispense_rate / self.options_channels['fastest_dispense_rate_ml_min'][0]
        self.logger.debug("Duty Cycle: {0:.1f} %".format(duty_cycle * 100))

        total_seconds_on = total_dispense_seconds * duty_cycle
        self.logger.debug("Total seconds on: {0:.1f}".format(total_seconds_on))

        total_seconds_off = total_dispense_seconds - total_seconds_on
        self.logger.debug("Total seconds off: {0:.1f}".format(total_seconds_off))

        repeat_seconds_on = self.options_channels['minimum_sec_on_per_min'][0]
        repeat_seconds_off = self.options_channels['minimum_sec_on_per_min'][0] / duty_cycle
        self.logger.debug("Repeat for {rep:.2f} seconds: on {on:.1f} seconds, off {off:.1f} seconds".format(
            rep=repeat_seconds_off, on=repeat_seconds_on, off=repeat_seconds_off))

        self.currently_dispensing = True
        timer_dispense = time.time() + total_dispense_seconds

        while time.time() < timer_dispense and self.currently_dispensing:
            # On for duration
            timer_dispense_on = time.time() + repeat_seconds_on
            self.logger.debug("Output turned on")
            ########################
            #self.GPIO.output(self.options_channels['pin'][0], self.options_channels['on_state'][0])
            while time.time() < timer_dispense_on and self.currently_dispensing:
                time.sleep(0.01)

            # Off for duration
            timer_dispense_off = time.time() + repeat_seconds_off
            self.logger.debug("Output turned off")
            ########################
            #self.GPIO.output(self.options_channels['pin'][0], not self.options_channels['on_state'][0])
            while time.time() < timer_dispense_off and self.currently_dispensing:
                time.sleep(0.01)
        ########################
        #self.GPIO.output(self.options_channels['pin'][0], not self.options_channels['on_state'][0])
        self.currently_dispensing = False
        self.logger.debug("Output turned off")
        self.record_dispersal(amount, total_seconds_on, total_dispense_seconds)

    def record_dispersal(self, amount, total_on_seconds, total_dispense_seconds):
        measure_dict = copy.deepcopy(measurements_dict)
        measure_dict[0]['value'] = total_on_seconds
        measure_dict[1]['value'] = amount
        measure_dict[2]['value'] = total_dispense_seconds
        add_measurements_influxdb(self.unique_id, measure_dict)

    def output_switch(self, state, output_type=None, amount=None, output_channel=None):
        """
        Set the output on, off, to an amount, or to a duty cycle
        output_type can be None, 'sec', 'vol', or 'pwm', and determines the amount's unit
        """
        if state == 'on':
            self.logger.info("Output turned on")
            self.output_states[output_channel] = True
        elif state == 'off':
            self.logger.info("Output turned off")
            self.output_states[output_channel] = False

        self.logger.debug("state: {}, output_type: {}, amount: {}".format(
            state, output_type, amount))

        if state == 'off':
            if self.currently_dispensing:
                self.currently_dispensing = False
            self.logger.debug("Output turned off")
            #self.GPIO.output(self.options_channels['pin'][0], not self.options_channels['on_state'][0])

        elif state == 'on' and output_type == ['vol', None] and amount:
            if self.currently_dispensing:
                self.logger.debug("Pump instructed to turn on for a duration while it's already dispensing. "
                                  "Overriding current dispense with new instruction.")
            
            if self.options_channels['flow_mode'][0] == 'fastest_flow_rate':
                total_dispense_seconds = amount / self.options_channels['fastest_dispense_rate_ml_min'][0] * 60
                msg = "Turning pump on for {sec:.1f} seconds to dispense {ml:.1f} ml (at {rate:.1f} ml/min, " \
                      "the fastest flow rate).".format(
                        sec=total_dispense_seconds,
                        ml=amount,
                        rate=self.options_channels['fastest_dispense_rate_ml_min'][0])
                self.logger.debug(msg)

                write_db = threading.Thread(
                    target=self.dispense_volume_fastest,
                    args=(amount, total_dispense_seconds,))
                write_db.start()
                return

            elif self.options_channels['flow_mode'][0] == 'specify_flow_rate':
                slowest_rate_ml_min = (self.options_channels['fastest_dispense_rate_ml_min'][0] /
                                       60 * self.options_channels['minimum_sec_on_per_min'][0])
                if self.options_channels['flow_rate'][0] < slowest_rate_ml_min:
                    self.logger.debug(
                        "Instructed to dispense {ir:.1f} ml/min, "
                        "however the slowest rate is set to {sr:.1f} ml/min.".format(
                            ir=self.options_channels['flow_rate'][0], sr=slowest_rate_ml_min))
                    dispense_rate = slowest_rate_ml_min
                elif self.options_channels['flow_rate'][0] > self.options_channels['fastest_dispense_rate_ml_min'][0]:
                    self.logger.debug(
                        "Instructed to dispense {ir:.1f} ml/min, "
                        "however the fastest rate is set to {fr:.1f} ml/min.".format(
                            ir=self.options_channels['flow_rate'][0],
                            fr=self.options_channels['fastest_dispense_rate_ml_min'][0]))
                    dispense_rate = self.options_channels['fastest_dispense_rate_ml_min'][0]
                else:
                    dispense_rate = self.options_channels['flow_rate'][0]

                self.logger.debug("Turning pump on to dispense {ml:.1f} ml at {rate:.1f} ml/min.".format(
                    ml=amount, rate=dispense_rate))
                write_db = threading.Thread(
                    target=self.dispense_volume_rate,
                    args=(amount, dispense_rate,))

                write_db.start()
                return

            else:
                self.logger.error("Invalid Output Mode: '{}'. Make sure it is properly set.".format(
                    self.options_channels['flow_mode'][0]))
                return

        elif state == 'on' and output_type == 'sec':
            if self.currently_dispensing:
                self.logger.debug(
                    "Pump instructed to turn on while it's already dispensing. "
                    "Overriding current dispense with new instruction.")
            self.logger.debug("Output turned on")
            #self.GPIO.output(self.options_channels['pin'][0], self.options_channels['on_state'][0])

        else:
            self.logger.error(
                "Invalid parameters: State: {state}, Type: {ot}, Mode: {mod}, Amount: {amt}, Flow Rate: {fr}".format(
                    state=state,
                    ot=output_type,
                    mod=self.options_channels['flow_mode'][0],
                    amt=amount,
                    fr=self.options_channels['flow_rate'][0]))
            return

    def is_on(self, output_channel=None):
        if self.is_setup():
            if self.currently_dispensing:
                return True

    def is_setup(self):
        return self.output_setup

 

 

 

 

    

 

This topic was modified 2 months ago 2 times by mcfada

Quote
Topic Tags
mcfada
(@mcfada)
New Member
Joined: 2 months ago
Posts: 4
October 2, 2020 8:51 pm  

PS. There are syntax errors in the code above because it didn't paste right. I'm trying to figure out how to edit my post again to fix the missing commas. If there's a way to upload a file I will just share my .py file. It passes syntax tests on my actual file


ReplyQuote
Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 368
October 18, 2020 7:59 pm  

output_switch() is the function that is called when the output need to turn on, either for a duration or a volume. Depending how you set up your output, you would either send the I2C command for the pump to run in this function to any helper functions you are using, such as dispense_volume_fastest(). I would work on simple on/off command first and replace your code to turn the pump on below self.logger.info("Output turned on") and your code to turn the pump off below self.logger.info("Output turned off"). Once you get that working, moving to more complex tasks such as dispersing a volume seems appropriate.

Mycodo Developer


ReplyQuote
mcfada
(@mcfada)
New Member
Joined: 2 months ago
Posts: 4
November 1, 2020 7:17 pm  

@kylegabriel Hi Kyle thanks for the feedback. I've got the plugin running where I can turn on/off and change direction of each motor individually. I will keep working on it myself for now, but when I'm ready, what is the best way to contribute something like this to the project? As well as receive coding help, especially for the more in-depth debugging like making sure data records correctly (current level of debug is getting the motors to actually vary their speed...)?


ReplyQuote
Kyle Gabriel
(@kylegabriel)
Member Admin
Joined: 6 years ago
Posts: 368
November 6, 2020 7:56 pm  

You can make pull requests directly to github.com/kizniche/Mycodo or post the code here and I can incorporate it. This forum is the best place for questions related to developing a module.

Mycodo Developer


ReplyQuote