diff --git a/micropython/changelog.txt b/micropython/changelog.txt new file mode 100644 index 00000000..da46795b --- /dev/null +++ b/micropython/changelog.txt @@ -0,0 +1,2 @@ +2026-03-19: 2.0.0 () +- Initial version diff --git a/micropython/example_authenticate.py b/micropython/example_authenticate.py new file mode 100644 index 00000000..a8bafc4d --- /dev/null +++ b/micropython/example_authenticate.py @@ -0,0 +1,57 @@ +#!/usr/bin/env micropython +# -*- coding: utf-8 -*- + +HOST = "192.168.1.100" # Change to the IP of your PC running brickd +PORT = 4223 +SECRET = "My Authentication Secret!" + +# For WiFi-capable boards (e.g. ESP32), connect to your network first: +#import network +#wlan = network.WLAN(network.STA_IF) +#wlan.active(True) +#wlan.connect("YOUR_SSID", "YOUR_PASSWORD") +#while not wlan.isconnected(): +# pass +#print("Connected:", wlan.ifconfig()) + +# NOTE: Authentication requires the hmac module. If your MicroPython build +# does not include it, install it first: +# import mip +# mip.install("hmac") + +from ip_connection import IPConnection + +# Authenticate each time the connection got established +def cb_connected(connect_reason): + if connect_reason == IPConnection.CONNECT_REASON_REQUEST: + print("Connected by request") + + # Authenticate first... + try: + ipcon.authenticate(SECRET) + print("Authentication succeeded") + except: + print("Could not authenticate") + return + + # ...then trigger enumerate + ipcon.enumerate() + +# Print incoming enumeration +def cb_enumerate(uid, connected_uid, position, hardware_version, firmware_version, + device_identifier, enumeration_type): + print("UID: " + uid + ", Enumeration Type: " + str(enumeration_type)) + +# Create IPConnection +ipcon = IPConnection() + +# Register Connected Callback +ipcon.register_callback(IPConnection.CALLBACK_CONNECTED, cb_connected) + +# Register Enumerate Callback +ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE, cb_enumerate) + +# Connect to brickd +ipcon.connect(HOST, PORT) + +ipcon.dispatch_callbacks(-1) # Dispatch callbacks forever diff --git a/micropython/example_enumerate.py b/micropython/example_enumerate.py new file mode 100644 index 00000000..36f0e326 --- /dev/null +++ b/micropython/example_enumerate.py @@ -0,0 +1,45 @@ +#!/usr/bin/env micropython +# -*- coding: utf-8 -*- + +HOST = "192.168.1.100" # Change to the IP of your PC running brickd +PORT = 4223 + +# For WiFi-capable boards (e.g. ESP32), connect to your network first: +#import network +#wlan = network.WLAN(network.STA_IF) +#wlan.active(True) +#wlan.connect("YOUR_SSID", "YOUR_PASSWORD") +#while not wlan.isconnected(): +# pass +#print("Connected:", wlan.ifconfig()) + +from ip_connection import IPConnection + +# Print incoming enumeration +def cb_enumerate(uid, connected_uid, position, hardware_version, firmware_version, + device_identifier, enumeration_type): + print("UID: " + uid) + print("Enumeration Type: " + str(enumeration_type)) + + if enumeration_type == IPConnection.ENUMERATION_TYPE_DISCONNECTED: + print("") + return + + print("Connected UID: " + connected_uid) + print("Position: " + position) + print("Hardware Version: " + str(hardware_version)) + print("Firmware Version: " + str(firmware_version)) + print("Device Identifier: " + str(device_identifier)) + print("") + +# Create connection and connect to brickd +ipcon = IPConnection() +ipcon.connect(HOST, PORT) + +# Register Enumerate Callback +ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE, cb_enumerate) + +# Trigger Enumerate +ipcon.enumerate() + +ipcon.dispatch_callbacks(-1) # Dispatch callbacks forever diff --git a/micropython/generate_micropython_bindings.py b/micropython/generate_micropython_bindings.py new file mode 100644 index 00000000..09673499 --- /dev/null +++ b/micropython/generate_micropython_bindings.py @@ -0,0 +1,721 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MicroPython Bindings Generator +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +generate_micropython_bindings.py: Generator for MicroPython bindings + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +import sys + +if sys.hexversion < 0x3040000: + print('Python >= 3.4 required') + sys.exit(1) + +import os +import importlib.util +import importlib.machinery + +def create_generators_module(): + generators_dir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + + if sys.hexversion < 0x3050000: + generators_module = importlib.machinery.SourceFileLoader('generators', os.path.join(generators_dir, '__init__.py')).load_module() + else: + generators_spec = importlib.util.spec_from_file_location('generators', os.path.join(generators_dir, '__init__.py')) + generators_module = importlib.util.module_from_spec(generators_spec) + + generators_spec.loader.exec_module(generators_module) + + sys.modules['generators'] = generators_module + +if 'generators' not in sys.modules: + create_generators_module() + +from generators import common +from generators.micropython import micropython_common + +class MicroPythonBindingsDevice(micropython_common.MicroPythonDevice): + def get_micropython_import(self): + template = """# -*- coding: utf-8 -*- +{0}{1} +from collections import namedtuple + +from ip_connection import Device, IPConnection, Error, create_char, create_char_list, create_string, create_chunk_data + +""" + + if not self.is_released(): + released = '\n#### __DEVICE_IS_NOT_RELEASED__ ####\n' + else: + released = '' + + return template.format(self.get_generator().get_header_comment('hash'), + released) + + def get_micropython_namedtuples(self): + tuples = '' + template = """{0} = namedtuple('{1}', [{2}]) +""" + + for packet in self.get_packets('function'): + if len(packet.get_elements(direction='out')) < 2: + continue + + name = packet.get_name() + + if name.space.startswith('Get '): + name_tup = name.camel[3:] + else: + name_tup = name.camel + + params = [] + + for element in packet.get_elements(direction='out'): + params.append("'{0}'".format(element.get_name().under)) + + tuples += template.format(name.camel, name_tup, ", ".join(params)) + + for packet in self.get_packets('function'): + if not packet.has_high_level(): + continue + + if len(packet.get_elements(direction='out', high_level=True)) < 2: + continue + + name = packet.get_name(skip=-2) + + if name.space.startswith('Get '): + name_tup = name.camel[3:] + else: + name_tup = name.camel + + params = [] + + for element in packet.get_elements(direction='out', high_level=True): + params.append("'{0}'".format(element.get_name().under)) + + tuples += template.format(name.camel, name_tup, ", ".join(params)) + + return tuples + + def get_micropython_class(self): + template = """ +class {0}(Device): + r\"\"\" + {1} + \"\"\" + + DEVICE_IDENTIFIER = {2} + DEVICE_DISPLAY_NAME = '{3}' + DEVICE_URL_PART = '{4}' # internal + +""" + + return template.format(self.get_micropython_class_name(), + common.select_lang(self.get_description()), + self.get_device_identifier(), + self.get_long_display_name(), + self.get_name().under) + + def get_micropython_callback_id_definitions(self): + callback_ids = '' + template = ' CALLBACK_{0} = {1}\n' + + for packet in self.get_packets('callback'): + callback_ids += template.format(packet.get_name().upper, packet.get_function_id()) + + if self.get_long_display_name() == 'RS232 Bricklet': + callback_ids += ' CALLBACK_READ_CALLBACK = 8 # for backward compatibility\n' + callback_ids += ' CALLBACK_ERROR_CALLBACK = 9 # for backward compatibility\n' + + callback_ids += '\n' + + for packet in self.get_packets('callback'): + if packet.has_high_level(): + callback_ids += template.format(packet.get_name(skip=-2).upper, -packet.get_function_id()) + + return callback_ids + + def get_micropython_function_id_definitions(self): + function_ids = '\n' + template = ' FUNCTION_{0} = {1}\n' + + for packet in self.get_packets('function'): + function_ids += template.format(packet.get_name().upper, packet.get_function_id()) + + return function_ids + + def get_micropython_constants(self): + constant_format = ' {constant_group_name_upper}_{constant_name_upper} = {constant_value}\n' + + return '\n' + self.get_formatted_constants(constant_format) + + def get_micropython_init_method(self): + template = """ + def __init__(self, uid, ipcon): + r\"\"\" + Creates an object with the unique device ID *uid* and adds it to + the IP Connection *ipcon*. + \"\"\" + Device.__init__(self, uid, ipcon, {0}.DEVICE_IDENTIFIER, {0}.DEVICE_DISPLAY_NAME) + + self.api_version = ({1}, {2}, {3}) + +""" + response_expected = '' + + for packet in self.get_packets('function'): + response_expected += ' self.response_expected[{0}.FUNCTION_{1}] = {0}.RESPONSE_EXPECTED_{2}\n' \ + .format(self.get_micropython_class_name(), packet.get_name().upper, + packet.get_response_expected().upper()) + + return template.format(self.get_micropython_class_name(), *self.get_api_version()) + common.wrap_non_empty('', response_expected, '\n') + + def get_micropython_callback_formats(self): + callback_formats = '' + template = " self.callback_formats[{0}.CALLBACK_{1}] = ({2}, '{3}')\n" + + for packet in self.get_packets('callback'): + callback_formats += template.format(self.get_micropython_class_name(), + packet.get_name().upper, + packet.get_response_size(), + packet.get_micropython_format_list('out')) + + return callback_formats + '\n' + + def get_micropython_high_level_callbacks(self): + high_level_callbacks = '' + template = " self.high_level_callbacks[{0}.CALLBACK_{1}] = [{4}, {{'fixed_length': {2}, 'single_chunk': {3}}}, None]\n" + + for packet in self.get_packets('callback'): + stream = packet.get_high_level('stream_*') + + if stream != None: + roles = [] + + for element in packet.get_elements(direction='out'): + roles.append(element.get_role()) + + high_level_callbacks += template.format(self.get_micropython_class_name(), + packet.get_name(skip=-2).upper, + stream.get_fixed_length(), + stream.has_single_chunk(), + repr(tuple(roles))) + + return high_level_callbacks + + def get_micropython_add_device(self): + return ' ipcon.add_device(self)\n' + + def get_micropython_methods(self): + m_tup = """ + def {0}(self{8}{4}): + r\"\"\" + {10} + \"\"\"{11}{12} + return {1}(*self.ipcon.send_request(self, {2}.FUNCTION_{3}, ({4}{9}), '{5}', {6}, '{7}')) +""" + m_ret = """ + def {0}(self{7}{3}): + r\"\"\" + {9} + \"\"\"{10}{11} + return self.ipcon.send_request(self, {1}.FUNCTION_{2}, ({3}{8}), '{4}', {5}, '{6}') +""" + m_nor = """ + def {0}(self{5}{3}): + r\"\"\" + {7} + \"\"\"{8}{9} + self.ipcon.send_request(self, {1}.FUNCTION_{2}, ({3}{6}), '{4}', 0, '') +""" + methods = '' + cls = self.get_micropython_class_name() + + # normal and low-level + for packet in self.get_packets('function'): + nb = packet.get_name().camel + ns = packet.get_name().under + nh = ns.upper() + par = packet.get_micropython_parameters() + doc = packet.get_micropython_formatted_doc() + cp = '' + ct = '' + + if par != '': + cp = ', ' + + if not ',' in par: + ct = ',' + + in_f = packet.get_micropython_format_list('in') + out_l = packet.get_response_size() + out_f = packet.get_micropython_format_list('out') + + if packet.get_function_id() == 255: # .get_identity + check = '' + else: + check = '\n self.check_validity()\n' + + coercions = common.wrap_non_empty('\n ', packet.get_micropython_parameter_coercions(), '\n') + out_c = len(packet.get_elements(direction='out')) + + if out_c > 1: + methods += m_tup.format(ns, nb, cls, nh, par, in_f, out_l, out_f, cp, ct, doc, check, coercions) + elif out_c == 1: + methods += m_ret.format(ns, cls, nh, par, in_f, out_l, out_f, cp, ct, doc, check, coercions) + else: + methods += m_nor.format(ns, cls, nh, par, in_f, cp, ct, doc, check, coercions) + + # high-level + template_stream_in = """ + def {function_name}(self{high_level_parameters}): + r\"\"\" + {doc} + \"\"\"{coercions} + if len({stream_name_under}) > {stream_max_length}: + raise Error(Error.INVALID_PARAMETER, '{stream_name_space} can be at most {stream_max_length} items long') + + {stream_name_under}_length = len({stream_name_under}) + {stream_name_under}_chunk_offset = 0 + + if {stream_name_under}_length == 0: + {stream_name_under}_chunk_data = [{chunk_padding}] * {chunk_cardinality} + ret = self.{function_name}_low_level({parameters}) + else: + while {stream_name_under}_chunk_offset < {stream_name_under}_length: + {stream_name_under}_chunk_data = create_chunk_data({stream_name_under}, {stream_name_under}_chunk_offset, {chunk_cardinality}, {chunk_padding}) + ret = self.{function_name}_low_level({parameters}) + {stream_name_under}_chunk_offset += {chunk_cardinality} +{result} +""" + template_stream_in_fixed_length = """ + def {function_name}(self{high_level_parameters}): + r\"\"\" + {doc} + \"\"\"{coercions} + {stream_name_under}_length = {fixed_length} + {stream_name_under}_chunk_offset = 0 + + if len({stream_name_under}) != {stream_name_under}_length: + raise Error(Error.INVALID_PARAMETER, '{stream_name_space} has to be exactly {{0}} items long'.format({stream_name_under}_length)) + + while {stream_name_under}_chunk_offset < {stream_name_under}_length: + {stream_name_under}_chunk_data = create_chunk_data({stream_name_under}, {stream_name_under}_chunk_offset, {chunk_cardinality}, {chunk_padding}) + ret = self.{function_name}_low_level({parameters}) + {stream_name_under}_chunk_offset += {chunk_cardinality} +{result} +""" + template_stream_in_result = """ + return ret""" + template_stream_in_namedtuple_result = """ + return {result_camel_name}(*ret)""" + template_stream_in_short_write = """ + def {function_name}(self{high_level_parameters}): + r\"\"\" + {doc} + \"\"\"{coercions} + if len({stream_name_under}) > {stream_max_length}: + raise Error(Error.INVALID_PARAMETER, '{stream_name_space} can be at most {stream_max_length} items long') + + {stream_name_under}_length = len({stream_name_under}) + {stream_name_under}_chunk_offset = 0 + + if {stream_name_under}_length == 0: + {stream_name_under}_chunk_data = [{chunk_padding}] * {chunk_cardinality} + ret = self.{function_name}_low_level({parameters}) + {chunk_written_0} + else: + {stream_name_under}_written = 0 + + while {stream_name_under}_chunk_offset < {stream_name_under}_length: + {stream_name_under}_chunk_data = create_chunk_data({stream_name_under}, {stream_name_under}_chunk_offset, {chunk_cardinality}, {chunk_padding}) + ret = self.{function_name}_low_level({parameters}) + {chunk_written_n} + + if {chunk_written_test} < {chunk_cardinality}: + break # either last chunk or short write + + {stream_name_under}_chunk_offset += {chunk_cardinality} +{result} +""" + template_stream_in_short_write_chunk_written = ['{stream_name_under}_written = ret', + '{stream_name_under}_written += ret', + 'ret'] + template_stream_in_short_write_namedtuple_chunk_written = ['{stream_name_under}_written = ret.{stream_name_under}_chunk_written', + '{stream_name_under}_written += ret.{stream_name_under}_chunk_written', + 'ret.{stream_name_under}_chunk_written'] + template_stream_in_short_write_result = """ + return {stream_name_under}_written""" + template_stream_in_short_write_namedtuple_result = """ + return {result_camel_name}({result_fields})""" + template_stream_in_single_chunk = """ + def {function_name}(self{high_level_parameters}): + r\"\"\" + {doc} + \"\"\"{coercions} + {stream_name_under}_length = len({stream_name_under}) + {stream_name_under}_data = list({stream_name_under}) # make a copy so we can potentially extend it + + if {stream_name_under}_length > {chunk_cardinality}: + raise Error(Error.INVALID_PARAMETER, '{stream_name_space} can be at most {chunk_cardinality} items long') + + if {stream_name_under}_length < {chunk_cardinality}: + {stream_name_under}_data += [{chunk_padding}] * ({chunk_cardinality} - {stream_name_under}_length) +{result} +""" + template_stream_in_single_chunk_result = """ + return self.{function_name}_low_level({parameters})""" + template_stream_in_single_chunk_namedtuple_result = """ + return {result_camel_name}(*self.{function_name}_low_level({parameters}))""" + template_stream_out = """ + def {function_name}(self{high_level_parameters}): + r\"\"\" + {doc} + \"\"\"{coercions}{fixed_length} + ret = self.{function_name}_low_level({parameters}){dynamic_length_2} + {chunk_offset_check}{stream_name_under}_out_of_sync = ret.{stream_name_under}_chunk_offset != 0 + {chunk_offset_check_indent}{stream_name_under}_data = ret.{stream_name_under}_chunk_data + + while not {stream_name_under}_out_of_sync and len({stream_name_under}_data) < {stream_name_under}_length: + ret = self.{function_name}_low_level({parameters}){dynamic_length_3} + {stream_name_under}_out_of_sync = ret.{stream_name_under}_chunk_offset != len({stream_name_under}_data) + {stream_name_under}_data += ret.{stream_name_under}_chunk_data + + if {stream_name_under}_out_of_sync: # discard remaining stream to bring it back in-sync + while ret.{stream_name_under}_chunk_offset + {chunk_cardinality} < {stream_name_under}_length: + ret = self.{function_name}_low_level({parameters}){dynamic_length_4} + + raise Error(Error.STREAM_OUT_OF_SYNC, '{stream_name_space} stream is out-of-sync') +{result} +""" + template_stream_out_fixed_length = """ + {stream_name_under}_length = {fixed_length} +""" + template_stream_out_dynamic_length = """ +{{indent}}{stream_name_under}_length = ret.{stream_name_under}_length""" + template_stream_out_chunk_offset_check = """ + if ret.{stream_name_under}_chunk_offset == (1 << {shift_size}) - 1: # maximum chunk offset -> stream has no data + {stream_name_under}_length = 0 + {stream_name_under}_out_of_sync = False + {stream_name_under}_data = () + else: + """ + template_stream_out_single_chunk = """ + def {function_name}(self{high_level_parameters}): + r\"\"\" + {doc} + \"\"\"{coercions} + ret = self.{function_name}_low_level({parameters}) +{result} +""" + template_stream_out_result = """ + return {stream_name_under}_data[:{stream_name_under}_length]""" + template_stream_out_single_chunk_result = """ + return ret.{stream_name_under}_data[:ret.{stream_name_under}_length]""" + template_stream_out_namedtuple_result = """ + return {result_name}({result_fields})""" + + for packet in self.get_packets('function'): + stream_in = packet.get_high_level('stream_in') + stream_out = packet.get_high_level('stream_out') + + if stream_in != None: + if stream_in.get_fixed_length() != None: + template = template_stream_in_fixed_length + elif stream_in.has_short_write() and stream_in.has_single_chunk(): + template = template_stream_in_single_chunk + elif stream_in.has_short_write(): + template = template_stream_in_short_write + elif stream_in.has_single_chunk(): + template = template_stream_in_single_chunk + else: + template = template_stream_in + + if stream_in.has_short_write(): + if len(packet.get_elements(direction='out')) < 2: + chunk_written_0 = template_stream_in_short_write_chunk_written[0].format(stream_name_under=stream_in.get_name().under) + chunk_written_n = template_stream_in_short_write_chunk_written[1].format(stream_name_under=stream_in.get_name().under) + chunk_written_test = template_stream_in_short_write_chunk_written[2].format(stream_name_under=stream_in.get_name().under) + else: + chunk_written_0 = template_stream_in_short_write_namedtuple_chunk_written[0].format(stream_name_under=stream_in.get_name().under) + chunk_written_n = template_stream_in_short_write_namedtuple_chunk_written[1].format(stream_name_under=stream_in.get_name().under) + chunk_written_test = template_stream_in_short_write_namedtuple_chunk_written[2].format(stream_name_under=stream_in.get_name().under) + + if len(packet.get_elements(direction='out', high_level=True)) < 2: + if stream_in.has_single_chunk(): + result = template_stream_in_single_chunk_result.format(function_name=packet.get_name(skip=-2).under, + parameters=packet.get_micropython_parameters()) + else: + result = template_stream_in_short_write_result.format(stream_name_under=stream_in.get_name().under) + else: + if stream_in.has_single_chunk(): + result = template_stream_in_single_chunk_namedtuple_result.format(function_name=packet.get_name(skip=-2).under, + parameters=packet.get_micropython_parameters(), + result_camel_name=packet.get_name(skip=-2).camel) + else: + fields = [] + + for element in packet.get_elements(direction='out', high_level=True): + if element.get_role() == 'stream_written': + fields.append('{0}_written'.format(stream_in.get_name().under)) + else: + fields.append('ret.{0}'.format(element.get_name().under)) + + result = template_stream_in_short_write_namedtuple_result.format(result_camel_name=packet.get_name(skip=-2).camel, + result_fields=', '.join(fields)) + else: + chunk_written_0 = '' + chunk_written_n = '' + chunk_written_test = '' + + if len(packet.get_elements(direction='out', high_level=True)) < 2: + if stream_in.has_single_chunk(): + result = template_stream_in_single_chunk_result.format(function_name=packet.get_name(skip=-2).under, + parameters=packet.get_micropython_parameters()) + else: + result = template_stream_in_result + else: + if stream_in.has_single_chunk(): + result = template_stream_in_single_chunk_namedtuple_result.format(function_name=packet.get_name(skip=-2).under, + parameters=packet.get_micropython_parameters(), + result_camel_name=packet.get_name(skip=-2).camel) + else: + result = template_stream_in_namedtuple_result.format(result_camel_name=packet.get_name(skip=-2).camel) + + methods += template.format(doc=packet.get_micropython_formatted_doc(), + coercions=common.wrap_non_empty('\n ', packet.get_micropython_parameter_coercions(high_level=True), '\n'), + function_name=packet.get_name(skip=-2).under, + parameters=packet.get_micropython_parameters(), + high_level_parameters=common.wrap_non_empty(', ', packet.get_micropython_parameters(high_level=True), ''), + stream_name_space=stream_in.get_name().space, + stream_name_under=stream_in.get_name().under, + stream_max_length=abs(stream_in.get_data_element().get_cardinality()), + fixed_length=stream_in.get_fixed_length(), + chunk_cardinality=stream_in.get_chunk_data_element().get_cardinality(), + chunk_padding=stream_in.get_chunk_data_element().get_micropython_default_item_value(), + chunk_written_0=chunk_written_0, + chunk_written_n=chunk_written_n, + chunk_written_test=chunk_written_test, + result=result) + elif stream_out != None: + if stream_out.get_fixed_length() != None: + fixed_length = template_stream_out_fixed_length.format(stream_name_under=stream_out.get_name().under, + fixed_length=stream_out.get_fixed_length()) + dynamic_length = '' + shift_size = int(stream_out.get_chunk_offset_element().get_type().replace('uint', '')) + chunk_offset_check = template_stream_out_chunk_offset_check.format(stream_name_under=stream_out.get_name().under, + shift_size=shift_size) + chunk_offset_check_indent = ' ' + else: + fixed_length = '' + dynamic_length = template_stream_out_dynamic_length.format(stream_name_under=stream_out.get_name().under) + chunk_offset_check = '' + chunk_offset_check_indent = '' + + if len(packet.get_elements(direction='out', high_level=True)) < 2: + if stream_out.has_single_chunk(): + result = template_stream_out_single_chunk_result.format(stream_name_under=stream_out.get_name().under) + else: + result = template_stream_out_result.format(stream_name_under=stream_out.get_name().under) + else: + fields = [] + + for element in packet.get_elements(direction='out', high_level=True): + if element.get_role() == 'stream_data': + if stream_out.has_single_chunk(): + fields.append('ret.{0}_data[:ret.{0}_length]'.format(stream_out.get_name().under)) + else: + fields.append('{0}_data[:{0}_length]'.format(stream_out.get_name().under)) + else: + fields.append('ret.{0}'.format(element.get_name().under)) + + result = template_stream_out_namedtuple_result.format(result_name=packet.get_name(skip=-2).camel, + result_fields=', '.join(fields)) + + if stream_out.has_single_chunk(): + template = template_stream_out_single_chunk + else: + template = template_stream_out + + methods += template.format(doc=packet.get_micropython_formatted_doc(), + coercions=common.wrap_non_empty('\n ', packet.get_micropython_parameter_coercions(high_level=True), '\n'), + function_name=packet.get_name(skip=-2).under, + parameters=packet.get_micropython_parameters(), + high_level_parameters=common.wrap_non_empty(', ', packet.get_micropython_parameters(high_level=True), ''), + stream_name_space=stream_out.get_name().space, + stream_name_under=stream_out.get_name().under, + fixed_length=fixed_length, + dynamic_length_2=dynamic_length.format(indent=' ' * 2), + dynamic_length_3=dynamic_length.format(indent=' ' * 3), + dynamic_length_4=dynamic_length.format(indent=' ' * 4), + chunk_offset_check=chunk_offset_check, + chunk_offset_check_indent=chunk_offset_check_indent, + chunk_cardinality=stream_out.get_chunk_data_element().get_cardinality(), + result=result) + + return methods + + def get_micropython_register_callback_method(self): + if len(self.get_packets('callback')) == 0: + return '' + + return """ + def register_callback(self, callback_id, function): + r\"\"\" + Registers the given *function* with the given *callback_id*. + \"\"\" + if function is None: + self.registered_callbacks.pop(callback_id, None) + else: + self.registered_callbacks[callback_id] = function +""" + + def get_micropython_old_name(self): + template = """ +{0} = {1} # for backward compatibility +""" + + return template.format(self.get_name().camel, self.get_micropython_class_name()) + + def get_micropython_source(self): + source = self.get_micropython_import() + source += self.get_micropython_namedtuples() + source += self.get_micropython_class() + source += self.get_micropython_callback_id_definitions() + source += self.get_micropython_function_id_definitions() + source += self.get_micropython_constants() + source += self.get_micropython_init_method() + source += self.get_micropython_callback_formats() + source += self.get_micropython_high_level_callbacks() + source += self.get_micropython_add_device() + source += self.get_micropython_methods() + source += self.get_micropython_register_callback_method() + + if self.is_brick() or self.is_bricklet(): + source += self.get_micropython_old_name() + + return common.strip_trailing_whitespace(source) + +class MicroPythonBindingsPacket(micropython_common.MicroPythonPacket): + def get_micropython_formatted_doc(self): + text = common.select_lang(self.get_doc_text()) + + def format_parameter(name): + return '``{0}``'.format(name) # FIXME + + text = common.handle_rst_param(text, format_parameter) + text = common.handle_rst_word(text) + text = common.handle_rst_substitutions(text, self) + text += common.format_since_firmware(self.get_device(), self) + + return '\n '.join(text.strip().split('\n')) + + def get_micropython_format_list(self, io): + forms = [] + + for element in self.get_elements(direction=io): + forms.append(element.get_micropython_struct_format()) + + return ' '.join(forms) + + def get_micropython_parameter_coercions(self, high_level=False): + coercions = [] + + for element in self.get_elements(direction='in', high_level=high_level): + name = element.get_name().under + + coercions.append('{0} = {1}'.format(name, element.get_micropython_parameter_coercion().format(name))) + + return '\n '.join(coercions) + +class MicroPythonBindingsGenerator(micropython_common.MicroPythonGeneratorTrait, common.BindingsGenerator): + def get_device_class(self): + return MicroPythonBindingsDevice + + def get_packet_class(self): + return MicroPythonBindingsPacket + + def get_element_class(self): + return micropython_common.MicroPythonElement + + def prepare(self): + common.BindingsGenerator.prepare(self) + + self.device_factory_all_classes = [] + self.device_factory_released_classes = [] + + def generate(self, device): + filename = '{0}_{1}.py'.format(device.get_category().under, device.get_name().under) + + with open(os.path.join(self.get_bindings_dir(), filename), 'w') as f: + f.write(device.get_micropython_source()) + + self.device_factory_all_classes.append((device.get_micropython_import_name(), device.get_micropython_class_name())) + + if device.is_released(): + self.device_factory_released_classes.append((device.get_micropython_import_name(), device.get_micropython_class_name())) + self.released_files.append(filename) + + def finish(self): + template_import = """from {0} import {1} +""" + template = """# -*- coding: utf-8 -*- +{0} +{1} + +DEVICE_CLASSES = {{ +{2} +}} + +def get_device_class(device_identifier): + return DEVICE_CLASSES[device_identifier] + +def get_device_display_name(device_identifier): + return get_device_class(device_identifier).DEVICE_DISPLAY_NAME + +def create_device(device_identifier, uid, ipcon): + return get_device_class(device_identifier)(uid, ipcon) +""" + for filename, device_factory_classes in [('device_factory_all.py', self.device_factory_all_classes), + ('device_factory.py', self.device_factory_released_classes)]: + imports = [] + classes = [] + + for import_name, class_name in sorted(device_factory_classes): + imports.append(template_import.format(import_name, class_name)) + classes.append(' {0}.DEVICE_IDENTIFIER: {0},'.format(class_name)) + + with open(os.path.join(self.get_bindings_dir(), filename), 'w') as f: + f.write(template.format(self.get_header_comment('hash'), + '\n'.join(imports), + '\n'.join(classes))) + + common.BindingsGenerator.finish(self) + +def generate(root_dir, language, internal): + common.generate(root_dir, language, internal, MicroPythonBindingsGenerator) + +if __name__ == '__main__': + args = common.dockerize('micropython', __file__, add_internal_argument=True) + + generate(os.getcwd(), 'en', args.internal) diff --git a/micropython/generate_micropython_doc.py b/micropython/generate_micropython_doc.py new file mode 100644 index 00000000..5f94e19a --- /dev/null +++ b/micropython/generate_micropython_doc.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MicroPython Documentation Generator +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +generate_micropython_doc.py: Generator for MicroPython documentation + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +import sys + +if sys.hexversion < 0x3040000: + print('Python >= 3.4 required') + sys.exit(1) + +import os +import importlib.util +import importlib.machinery + +def create_generators_module(): + generators_dir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + + if sys.hexversion < 0x3050000: + generators_module = importlib.machinery.SourceFileLoader('generators', os.path.join(generators_dir, '__init__.py')).load_module() + else: + generators_spec = importlib.util.spec_from_file_location('generators', os.path.join(generators_dir, '__init__.py')) + generators_module = importlib.util.module_from_spec(generators_spec) + + generators_spec.loader.exec_module(generators_module) + + sys.modules['generators'] = generators_module + +if 'generators' not in sys.modules: + create_generators_module() + +from generators import common +from generators.micropython import micropython_common + +class MicroPythonDocDevice(micropython_common.MicroPythonDevice): + def specialize_micropython_doc_function_links(self, text): + def specializer(packet, high_level): + if packet.get_type() == 'callback': + return ':py:attr:`CALLBACK_{1} <{0}.CALLBACK_{1}>`'.format(packet.get_device().get_micropython_class_name(), + packet.get_name(skip=-2 if high_level else 0).upper) + else: + return ':py:func:`{1}() <{0}.{1}>`'.format(packet.get_device().get_micropython_class_name(), + packet.get_name(skip=-2 if high_level else 0).under) + + return self.specialize_doc_rst_links(text, specializer, prefix='py') + + def get_micropython_examples(self): + def title_from_filename(filename): + filename = filename.replace('example_', '').replace('.py', '') + return common.under_to_space(filename) + + return common.make_rst_examples(title_from_filename, self) + + def get_micropython_functions(self, type_): + functions = [] + template = '.. py:function:: {0}.{1}({2})\n\n{3}{4}\n' + cls = self.get_micropython_class_name() + + for packet in self.get_packets('function'): + if packet.get_doc_type() != type_: + continue + + skip = -2 if packet.has_high_level() else 0 + name = packet.get_name(skip=skip).under + params = packet.get_micropython_parameters(high_level=True) + meta = packet.get_formatted_element_meta(lambda element, cardinality=None: element.get_micropython_type(cardinality=cardinality), + lambda element, index=None: element.get_micropython_name(index=index), + return_object='conditional', + no_out_value={'en': 'None', 'de': 'None'}, + explicit_string_cardinality=True, + explicit_variable_stream_cardinality=True, + explicit_fixed_stream_cardinality=True, + explicit_common_cardinality=True, + high_level=True) + meta_table = common.make_rst_meta_table(meta) + desc = packet.get_micropython_formatted_doc() + + functions.append(template.format(cls, name, params, meta_table, desc)) + + return ''.join(functions) + + def get_micropython_callbacks(self): + callbacks = [] + template = '.. py:attribute:: {0}.CALLBACK_{1}\n\n{2}{3}\n' + cls = self.get_micropython_class_name() + + for packet in self.get_packets('callback'): + skip = -2 if packet.has_high_level() else 0 + meta = packet.get_formatted_element_meta(lambda element, cardinality=None: element.get_micropython_type(cardinality=cardinality), + lambda element, index=None: element.get_micropython_name(index=index), + no_out_value={'en': 'no parameters', 'de': 'keine Parameter'}, + explicit_string_cardinality=True, + explicit_variable_stream_cardinality=True, + explicit_fixed_stream_cardinality=True, + explicit_common_cardinality=True, + high_level=True) + meta_table = common.make_rst_meta_table(meta) + desc = packet.get_micropython_formatted_doc() + + callbacks.append(template.format(cls, packet.get_name(skip=skip).upper, meta_table, desc)) + + return ''.join(callbacks) + + def get_micropython_api(self): + create_str = { + 'en': """ +.. py:function:: {0}(uid, ipcon) + +{2} + + Creates an object with the unique device ID ``uid``: + + .. code-block:: python + + {1} = {0}("YOUR_DEVICE_UID", ipcon) + + This object can then be used after the IP Connection is connected. +""", + 'de': """ +.. py:function:: {0}(uid, ipcon) + +{2} + + Erzeugt ein Objekt mit der eindeutigen Geräte ID ``uid``: + + .. code-block:: python + + {1} = {0}("YOUR_DEVICE_UID", ipcon) + + Dieses Objekt kann benutzt werden, nachdem die IP Connection verbunden ist. +""" + } + + register_str = { + 'en': """ +.. py:function:: {2}{1}.register_callback(callback_id, function) + +{3} + + Registers the given ``function`` with the given ``callback_id``. + + The available callback IDs with corresponding function signatures are listed + :ref:`below <{0}_micropython_callbacks>`. +""", + 'de': """ +.. py:function:: {2}{1}.register_callback(callback_id, function) + +{3} + + Registriert die ``function`` für die gegebene ``callback_id``. + + Die verfügbaren Callback IDs mit den zugehörigen Funktionssignaturen sind + :ref:`unten <{0}_micropython_callbacks>` zu finden. +""" + } + + c_str = { + 'en': """ +.. _{0}_micropython_callbacks: + +Callbacks +^^^^^^^^^ + +Callbacks can be registered to receive +time critical or recurring data from the device. The registration is done +with the :py:func:`register_callback() <{1}.register_callback>` function of +the device object. The first parameter is the callback ID and the second +parameter the callback function: + +.. code-block:: python + + def my_callback(param): + print(param) + + {2}.register_callback({1}.CALLBACK_EXAMPLE, my_callback) + +The available constants with inherent number and type of parameters are +described below. + +.. note:: + Because MicroPython doesn't support threads, you need to call + :py:func:`dispatch_callbacks() ` periodically + to ensure that incoming callbacks are handled. + +{3} +""", + 'de': """ +.. _{0}_micropython_callbacks: + +Callbacks +^^^^^^^^^ + +Callbacks können registriert werden um zeitkritische +oder wiederkehrende Daten vom Gerät zu erhalten. Die Registrierung kann +mit der Funktion :py:func:`register_callback() <{1}.register_callback>` des +Geräte Objektes durchgeführt werden. Der erste Parameter ist die Callback ID +und der zweite Parameter die Callback-Funktion: + +.. code-block:: python + + def my_callback(param): + print(param) + + {2}.register_callback({1}.CALLBACK_EXAMPLE, my_callback) + +Die verfügbaren IDs mit der dazugehörigen Parameteranzahl und -typen werden +weiter unten beschrieben. + +.. note:: + Da MicroPython keine Threads unterstützt, muss + :py:func:`dispatch_callbacks() ` periodisch + aufgerufen werden, damit eingehende Callbacks verarbeitet werden. + +{3} +""" + } + + api = { + 'en': """ +.. _{0}_micropython_api: + +API +--- + +Generally, every function of the MicroPython bindings can throw an +``ip_connection.Error`` exception that has a ``value`` and a +``description`` property. ``value`` can have different values: + +* Error.TIMEOUT = -1 +* Error.ALREADY_CONNECTED = -7 +* Error.NOT_CONNECTED = -8 +* Error.INVALID_PARAMETER = -9 +* Error.NOT_SUPPORTED = -10 +* Error.UNKNOWN_ERROR_CODE = -11 +* Error.STREAM_OUT_OF_SYNC = -12 +* Error.INVALID_UID = -13 +* Error.NON_ASCII_CHAR_IN_SECRET = -14 +* Error.WRONG_DEVICE_TYPE = -15 +* Error.DEVICE_REPLACED = -16 +* Error.WRONG_RESPONSE_LENGTH = -17 + +Because MicroPython doesn't support threads, none of the functions listed below +are thread-safe. The MicroPython bindings use synchronous I/O. Use +:py:func:`dispatch_callbacks() ` to handle +callbacks. + +{1} + +{2} +""", + 'de': """ +.. _{0}_micropython_api: + +API +--- + +Prinzipiell kann jede Funktion der MicroPython Bindings +``ip_connection.Error`` Exception werfen, welche ein ``value`` und +eine ``description`` Property hat. ``value`` kann verschiedene Werte haben: + +* Error.TIMEOUT = -1 +* Error.ALREADY_CONNECTED = -7 +* Error.NOT_CONNECTED = -8 +* Error.INVALID_PARAMETER = -9 +* Error.NOT_SUPPORTED = -10 +* Error.UNKNOWN_ERROR_CODE = -11 +* Error.STREAM_OUT_OF_SYNC = -12 +* Error.INVALID_UID = -13 +* Error.NON_ASCII_CHAR_IN_SECRET = -14 +* Error.WRONG_DEVICE_TYPE = -15 +* Error.DEVICE_REPLACED = -16 +* Error.WRONG_RESPONSE_LENGTH = -17 + +Da MicroPython keine Threads unterstützt, ist keine der folgend aufgelisteten +Funktionen Thread-sicher. Die MicroPython Bindings verwenden synchrone I/O. +Verwende :py:func:`dispatch_callbacks() ` um +Callbacks zu verarbeiten. + +{1} + +{2} +""" + } + + const_str = { + 'en': """ +.. _{0}_micropython_constants: + +Constants +^^^^^^^^^ + +.. py:attribute:: {1}.DEVICE_IDENTIFIER + + This constant is used to identify a {3}. + + The :py:func:`get_identity() <{1}.get_identity>` function and the + :py:attr:`IPConnection.CALLBACK_ENUMERATE ` + callback of the IP Connection have a ``device_identifier`` parameter to specify + the Brick's or Bricklet's type. + +.. py:attribute:: {1}.DEVICE_DISPLAY_NAME + + This constant represents the human readable name of a {3}. +""", + 'de': """ +.. _{0}_micropython_constants: + +Konstanten +^^^^^^^^^^ + +.. py:attribute:: {1}.DEVICE_IDENTIFIER + + Diese Konstante wird verwendet um {2} {3} zu identifizieren. + + Die :py:func:`get_identity() <{1}.get_identity>` Funktion und der + :py:attr:`IPConnection.CALLBACK_ENUMERATE ` + Callback der IP Connection haben ein ``device_identifier`` Parameter um den Typ + des Bricks oder Bricklets anzugeben. + +.. py:attribute:: {1}.DEVICE_DISPLAY_NAME + + Diese Konstante stellt den Anzeigenamen eines {3} dar. +""" + } + + create_meta = common.format_simple_element_meta([('uid', 'str', 1, 'in'), + ('ipcon', 'IPConnection', 1, 'in'), + (self.get_name().under, self.get_micropython_class_name(), 1, 'out')]) + create_meta_table = common.make_rst_meta_table(create_meta) + + cre = common.select_lang(create_str).format(self.get_micropython_class_name(), + self.get_name().under, + create_meta_table) + + reg_meta = common.format_simple_element_meta([('callback_id', 'int', 1, 'in'), + ('function', 'callable', 1, 'in')], + no_out_value={'en': 'None', 'de': 'None'}) + reg_meta_table = common.make_rst_meta_table(reg_meta) + + reg = common.select_lang(register_str).format(self.get_doc_rst_ref_name(), + self.get_name().camel, + self.get_category().camel, + reg_meta_table) + + bf = self.get_micropython_functions('bf') + af = self.get_micropython_functions('af') + ccf = self.get_micropython_functions('ccf') + c = self.get_micropython_callbacks() + vf = self.get_micropython_functions('vf') + if_ = self.get_micropython_functions('if') + api_str = '' + + if bf: + api_str += common.select_lang(common.bf_str).format(cre, bf) + + if af: + api_str += common.select_lang(common.af_str).format(af) + + if c: + api_str += common.select_lang(common.ccf_str).format(reg, ccf) + api_str += common.select_lang(c_str).format(self.get_doc_rst_ref_name(), + self.get_micropython_class_name(), + self.get_name().under, + c) + + if vf: + api_str += common.select_lang(common.vf_str).format(vf) + + if if_: + api_str += common.select_lang(common.if_str).format(if_) + + article = 'ein' + + if self.is_brick(): + article = 'einen' + + api_str += common.select_lang(const_str).format(self.get_doc_rst_ref_name(), + self.get_micropython_class_name(), + article, + self.get_long_display_name()) + + return common.select_lang(api).format(self.get_doc_rst_ref_name(), + self.specialize_micropython_doc_function_links(common.select_lang(self.get_doc())), + api_str) + + def get_micropython_doc(self): + doc = common.make_rst_header(self) + doc += common.make_rst_summary(self) + doc += self.get_micropython_examples() + doc += self.get_micropython_api() + + return doc + +class MicroPythonDocPacket(micropython_common.MicroPythonPacket): + def get_micropython_formatted_doc(self): + text = common.select_lang(self.get_doc_text()) + text = self.get_device().specialize_micropython_doc_function_links(text) + + def format_parameter(name): + return '``{0}``'.format(name) # FIXME + + text = common.handle_rst_param(text, format_parameter) + text = common.handle_rst_word(text) + text = common.handle_rst_substitutions(text, self) + + prefix = self.get_device().get_micropython_class_name() + '.' + + def format_element_name(element, index): + if index == None: + return element.get_name().under + + return '{0}[{1}]'.format(element.get_name().under, index) + + text += common.format_constants(prefix, self, format_element_name) + text += common.format_since_firmware(self.get_device(), self) + + return common.shift_right(text, 1) + +class MicroPythonDocGenerator(micropython_common.MicroPythonGeneratorTrait, common.DocGenerator): + def get_doc_rst_filename_part(self): + return 'MicroPython' + + def get_doc_example_regex(self): + return r'^example_.*\.py$' + + def get_device_class(self): + return MicroPythonDocDevice + + def get_packet_class(self): + return MicroPythonDocPacket + + def get_element_class(self): + return micropython_common.MicroPythonElement + + def generate(self, device): + with open(device.get_doc_rst_path(), 'w') as f: + f.write(device.get_micropython_doc()) + +def generate(root_dir, language, internal): + common.generate(root_dir, language, internal, MicroPythonDocGenerator) + +if __name__ == '__main__': + args = common.dockerize('micropython', __file__, add_internal_argument=True) + + for language in ['en', 'de']: + generate(os.getcwd(), language, args.internal) diff --git a/micropython/generate_micropython_examples.py b/micropython/generate_micropython_examples.py new file mode 100644 index 00000000..94240b4f --- /dev/null +++ b/micropython/generate_micropython_examples.py @@ -0,0 +1,685 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MicroPython Examples Generator +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +generate_micropython_examples.py: Generator for MicroPython examples + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +import sys + +if sys.hexversion < 0x3040000: + print('Python >= 3.4 required') + sys.exit(1) + +import os +import importlib.util +import importlib.machinery + +def create_generators_module(): + generators_dir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + + if sys.hexversion < 0x3050000: + generators_module = importlib.machinery.SourceFileLoader('generators', os.path.join(generators_dir, '__init__.py')).load_module() + else: + generators_spec = importlib.util.spec_from_file_location('generators', os.path.join(generators_dir, '__init__.py')) + generators_module = importlib.util.module_from_spec(generators_spec) + + generators_spec.loader.exec_module(generators_module) + + sys.modules['generators'] = generators_module + +if 'generators' not in sys.modules: + create_generators_module() + +from generators import common +from generators.micropython import micropython_common + +global_line_prefix = '' + +class MicroPythonConstant(common.Constant): + def get_micropython_source(self, callback=False): + templateA = '{device_class}.{constant_group_name}_{constant_name}' + templateB = '{device_name}.{constant_group_name}_{constant_name}' + + if callback: + template = templateA + else: + template = templateB + + return template.format(device_class=self.get_device().get_micropython_class_name(), + device_name=self.get_device().get_initial_name(), + constant_group_name=self.get_constant_group().get_name().upper, + constant_name=self.get_name().upper) + +class MicroPythonExample(common.Example): + def get_micropython_source(self): + template = r"""#!/usr/bin/env micropython +# -*- coding: utf-8 -*-{incomplete}{description} + +HOST = "localhost" +PORT = 4223 +UID = "{dummy_uid}" # Change {dummy_uid} to the UID of your {device_name_long_display} +{imports} +from ip_connection import IPConnection +from {device_category_under}_{device_name_under} import {device_category_camel}{device_name_camel} +{functions} +if __name__ == "__main__": + ipcon = IPConnection() # Create IP connection + {device_name_initial} = {device_category_camel}{device_name_camel}(UID, ipcon) # Create device object + + ipcon.connect(HOST, PORT) # Connect to brickd + # Don't use device before ipcon is connected +{sources} + {wait_or_sleep}{cleanups} + ipcon.disconnect() +""" + + if self.is_incomplete(): + incomplete = '\n\n# FIXME: This example is incomplete' + else: + incomplete = '' + + if self.get_description() != None: + description = '\n\n# {0}'.format(self.get_description().replace('\n', '\n# ')) + else: + description = '' + + imports = [] + functions = [] + sources = [] + cleanups = [] + + for function in self.get_functions(): + imports += function.get_micropython_imports() + functions.append(function.get_micropython_function()) + sources.append(function.get_micropython_source()) + + for cleanup in self.get_cleanups(): + imports += cleanup.get_micropython_imports() + functions.append(cleanup.get_micropython_function()) + cleanups.append(cleanup.get_micropython_source()) + + unique_imports = [] + + for import_ in imports: + if import_ not in unique_imports: + unique_imports.append(import_) + + while None in functions: + functions.remove(None) + + while None in sources: + sources.remove(None) + + if len(sources) == 0: + sources = [' # TODO: Add example code here\n'] + + while None in cleanups: + cleanups.remove(None) + + # Use dispatch_callbacks for callback examples, sleep for others + has_callbacks = any(isinstance(f, MicroPythonExampleCallbackFunction) for f in self.get_functions()) + + if has_callbacks: + wait_or_sleep = 'ipcon.dispatch_callbacks(-1) # Dispatch callbacks forever' + else: + wait_or_sleep = 'time.sleep(1)' + + if 'import time\n' not in unique_imports: + unique_imports.append('import time\n') + + return template.format(incomplete=incomplete, + description=description, + device_category_camel=self.get_device().get_category().camel, + device_category_under=self.get_device().get_category().under, + device_name_camel=self.get_device().get_name().camel, + device_name_under=self.get_device().get_name().under, + device_name_initial=self.get_device().get_initial_name(), + device_name_long_display=self.get_device().get_long_display_name(), + dummy_uid=self.get_dummy_uid(), + imports=common.wrap_non_empty('\n', ''.join(unique_imports), ''), + functions=common.wrap_non_empty('\n', '\n'.join(functions), ''), + sources='\n' + '\n'.join(sources).replace('\n\r', '').lstrip('\r'), + wait_or_sleep=wait_or_sleep, + cleanups=common.wrap_non_empty('\n\n', '\n'.join(cleanups).replace('\n\r', '').lstrip('\r').rstrip('\n'), '\n')) + +class MicroPythonExampleArgument(common.ExampleArgument): + def get_micropython_source(self): + type_ = self.get_type() + + def helper(value): + if type_ == 'float': + return common.format_float(value) + elif type_ == 'bool': + return str(bool(value)) + elif type_ in ['char', 'string']: + return '"{0}"'.format(value.replace('"', '\\"')) + elif ':bitmask:' in type_: + return common.make_c_like_bitmask(value) + elif type_.endswith(':constant'): + return self.get_value_constant(value).get_micropython_source() + else: + return str(value) + + value = self.get_value() + + if isinstance(value, list): + return '[{0}]'.format(', '.join([helper(item) for item in value])) + + return helper(value) + +class MicroPythonExampleArgumentsMixin(object): + def get_micropython_arguments(self): + return [argument.get_micropython_source() for argument in self.get_arguments()] + +class MicroPythonExampleParameter(common.ExampleParameter): + def get_micropython_source(self): + return self.get_name().under + + def get_micropython_prints(self): + if self.get_type().split(':')[-1] == 'constant': + if self.get_label_name() == None: + return [] + + # FIXME: need to handle multiple labels + assert self.get_label_count() == 1 + + template = '{global_line_prefix} {else_}if {name} == {constant_name}:\n{global_line_prefix} print("{label}: {constant_title}"){comment}' + constant_group = self.get_constant_group() + result = [] + + for constant in constant_group.get_constants(): + result.append(template.format(global_line_prefix=global_line_prefix, + else_='el' if len(result) > 0 else '', + name=self.get_name().under, + label=self.get_label_name(), + constant_name=constant.get_micropython_source(callback=True), + constant_title=constant.get_name().space, + comment=self.get_formatted_comment(' # {0}'))) + + result = ['\r' + '\n'.join(result) + '\r'] + else: + template = '{global_line_prefix} print("{label}: " + {format_prefix}{name}{index}{divisor}{format_suffix}{unit}){comment}' + + if self.get_label_name() == None: + return [] + + if self.get_cardinality() < 0: + return [] # FIXME: streaming + + type_ = self.get_type() + + if ':bitmask:' in type_: + format_prefix = 'format(' + format_suffix = ', "0{0}b")'.format(int(type_.split(':')[2])) + elif type_ in ['char', 'string']: + format_prefix = '' + format_suffix = '' + else: + format_prefix = 'str(' + format_suffix = ')' + + result = [] + + for index in range(self.get_label_count()): + result.append(template.format(global_line_prefix=global_line_prefix, + name=self.get_name().under, + label=self.get_label_name(index=index), + index='[{0}]'.format(index) if self.get_label_count() > 1 else '', + divisor=self.get_formatted_divisor('/{0}'), + unit=self.get_formatted_unit_name(' + " {0}"'), + format_prefix=format_prefix, + format_suffix=format_suffix, + comment=self.get_formatted_comment(' # {0}'))) + + return result + +class MicroPythonExampleResult(common.ExampleResult): + def get_micropython_variable(self): + name = self.get_name().under + + if name == self.get_device().get_initial_name(): + name += '_' + + return name + + def get_micropython_prints(self): + if self.get_type().split(':')[-1] == 'constant': + # FIXME: need to handle multiple labels + assert self.get_label_count() == 1 + + template = '{global_line_prefix} {else_}if {name} == {constant_name}:\n{global_line_prefix} print("{label}: {constant_title}"){comment}' + constant_group = self.get_constant_group() + result = [] + + for constant in constant_group.get_constants(): + result.append(template.format(global_line_prefix=global_line_prefix, + else_='el' if len(result) > 0 else '', + name=self.get_name().under, + label=self.get_label_name(), + constant_name=constant.get_micropython_source(), + constant_title=constant.get_name().space, + comment=self.get_formatted_comment(' # {0}'))) + + result = ['\r' + '\n'.join(result) + '\r'] + else: + template = '{global_line_prefix} print("{label}: " + {format_prefix}{name}{index}{divisor}{format_suffix}{unit}){comment}' + + if self.get_label_name() == None: + return [] + + if self.get_cardinality() < 0: + return [] # FIXME: streaming + + name = self.get_name().under + + if name == self.get_device().get_initial_name(): + name += '_' + + type_ = self.get_type() + + if ':bitmask:' in type_: + format_prefix = 'format(' + format_suffix = ', "0{0}b")'.format(int(type_.split(':')[2])) + elif type_ in ['char', 'string']: + format_prefix = '' + format_suffix = '' + else: + format_prefix = 'str(' + format_suffix = ')' + + result = [] + + for index in range(self.get_label_count()): + result.append(template.format(global_line_prefix=global_line_prefix, + name=name, + label=self.get_label_name(index=index), + index='[{0}]'.format(index) if self.get_label_count() > 1 else '', + divisor=self.get_formatted_divisor('/{0}'), + unit=self.get_formatted_unit_name(' + " {0}"'), + format_prefix=format_prefix, + format_suffix=format_suffix, + comment=self.get_formatted_comment(' # {0}'))) + + return result + +class MicroPythonExampleGetterFunction(common.ExampleGetterFunction, MicroPythonExampleArgumentsMixin): + def get_micropython_imports(self): + return [] + + def get_micropython_function(self): + return None + + def get_micropython_source(self): + template = r"""{global_line_prefix} # Get current {function_name_comment} +{global_line_prefix} {variables} = {device_name}.{function_name_under}({arguments}) +{prints} +""" + variables = [] + prints = [] + + for result in self.get_results(): + variables.append(result.get_micropython_variable()) + prints += result.get_micropython_prints() + + while None in prints: + prints.remove(None) + + if len(prints) > 1: + prints.insert(0, '\b') + + result = template.format(global_line_prefix=global_line_prefix, + device_name=self.get_device().get_initial_name(), + function_name_under=self.get_name().under, + function_name_comment=self.get_comment_name(), + variables=','.join(variables), + prints='\n'.join(prints).replace('\b\n\r', '\n').replace('\b', '').replace('\r\n\r', '\n\n').rstrip('\r').replace('\r', '\n'), + arguments=', '.join(self.get_micropython_arguments())) + + return common.break_string(result, ' ', continuation=' \\', indent_suffix=' ') + +class MicroPythonExampleSetterFunction(common.ExampleSetterFunction, MicroPythonExampleArgumentsMixin): + def get_micropython_imports(self): + return [] + + def get_micropython_function(self): + return None + + def get_micropython_source(self): + template = '{comment1}{global_line_prefix} {device_name}.{function_name}({arguments}){comment2}\n' + + result = template.format(global_line_prefix=global_line_prefix, + device_name=self.get_device().get_initial_name(), + function_name=self.get_name().under, + arguments=','.join(self.get_micropython_arguments()), + comment1=self.get_formatted_comment1(global_line_prefix + ' # {0}\n', '\r', '\n' + global_line_prefix + ' # '), + comment2=self.get_formatted_comment2(' # {0}', '')) + + return common.break_string(result, '.{0}('.format(self.get_name().under)) + +class MicroPythonExampleCallbackFunction(common.ExampleCallbackFunction): + def get_micropython_imports(self): + return [] + + def get_micropython_function(self): + template1A = r"""# Callback function for {function_name_comment} callback +""" + template1B = r"""{override_comment} +""" + template2 = r"""def cb_{function_name_under}({parameters}): +{prints}{extra_message} +""" + override_comment = self.get_formatted_override_comment('# {0}', None, '\n# ') + + if override_comment == None: + template1 = template1A + else: + template1 = template1B + + parameters = [] + prints = [] + + for parameter in self.get_parameters(): + parameters.append(parameter.get_micropython_source()) + prints += parameter.get_micropython_prints() + + while None in prints: + prints.remove(None) + + if len(prints) > 1: + prints.append(' print("")') + + extra_message = self.get_formatted_extra_message(' print("{0}")') + + if len(extra_message) > 0 and len(prints) > 0: + extra_message = '\n' + extra_message + + if len(prints) == 0 and len(extra_message) == 0: + prints_str = ' pass' + else: + prints_str = '\n'.join(prints).replace('\r\n\r', '\n\n').strip('\r').replace('\r', '\n') + + result = template1.format(function_name_comment=self.get_comment_name(), + override_comment=override_comment) + \ + template2.format(function_name_under=self.get_name().under, + parameters=','.join(parameters), + prints=prints_str, + extra_message=extra_message) + + return common.break_string(result, 'cb_{}('.format(self.get_name().under)) + + def get_micropython_source(self): + template1 = r""" # Register {function_name_comment}callbacktofunctioncb_{function_name_under} +""" + template2 = r""" {device_name}.register_callback({device_name}.CALLBACK_{function_name_upper},cb_{function_name_under}) +""" + + result1 = template1.format(function_name_under=self.get_name().under, + function_name_comment=self.get_comment_name()) + result2 = template2.format(device_name=self.get_device().get_initial_name(), + function_name_under=self.get_name().under, + function_name_upper=self.get_name().upper) + + return common.break_string(result1, '# ', indent_tail='# ') + \ + common.break_string(result2, 'register_callback(') + +class MicroPythonExampleCallbackPeriodFunction(common.ExampleCallbackPeriodFunction, MicroPythonExampleArgumentsMixin): + def get_micropython_imports(self): + return [] + + def get_micropython_function(self): + return None + + def get_micropython_source(self): + templateA = r""" # Set period for {function_name_comment} callback to {period_sec_short} ({period_msec}ms) + {device_name}.set_{function_name_under}_period({arguments}{period_msec}) +""" + templateB = r""" # Set period for {function_name_comment} callback to {period_sec_short} ({period_msec}ms) + # Note: The {function_name_comment} callback is only called every {period_sec_long} + # if the {function_name_comment} has changed since the last call! + {device_name}.set_{function_name_under}_callback_period({arguments}{period_msec}) +""" + + if self.get_device().get_name().space.startswith('IMU'): + template = templateA # FIXME: special hack for IMU Brick (2.0) callback behavior and name mismatch + else: + template = templateB + + period_msec, period_sec_short, period_sec_long = self.get_formatted_period() + + return template.format(device_name=self.get_device().get_initial_name(), + function_name_under=self.get_name().under, + function_name_comment=self.get_comment_name(), + arguments=common.wrap_non_empty('', ', '.join(self.get_micropython_arguments()), ', '), + period_msec=period_msec, + period_sec_short=period_sec_short, + period_sec_long=period_sec_long) + +class MicroPythonExampleCallbackThresholdMinimumMaximum(common.ExampleCallbackThresholdMinimumMaximum): + def get_micropython_source(self): + template = '{minimum}, {maximum}' + + return template.format(minimum=self.get_formatted_minimum(), + maximum=self.get_formatted_maximum()) + +class MicroPythonExampleCallbackThresholdFunction(common.ExampleCallbackThresholdFunction, MicroPythonExampleArgumentsMixin): + def get_micropython_imports(self): + return [] + + def get_micropython_function(self): + return None + + def get_micropython_source(self): + template = r""" # Configure threshold for {function_name_comment} "{option_comment}" + {device_name}.set_{function_name_under}_callback_threshold({arguments}"{option_char}", {minimum_maximums}) +""" + minimum_maximums = [] + + for minimum_maximum in self.get_minimum_maximums(): + minimum_maximums.append(minimum_maximum.get_micropython_source()) + + return template.format(device_name=self.get_device().get_initial_name(), + function_name_under=self.get_name().under, + function_name_comment=self.get_comment_name(), + arguments=common.wrap_non_empty('', ', '.join(self.get_micropython_arguments()), ', '), + option_char=self.get_option_char(), + option_comment=self.get_option_comment(), + minimum_maximums=', '.join(minimum_maximums)) + +class MicroPythonExampleCallbackConfigurationFunction(common.ExampleCallbackConfigurationFunction, MicroPythonExampleArgumentsMixin): + def get_micropython_imports(self): + return [] + + def get_micropython_function(self): + return None + + def get_micropython_source(self): + templateA = r""" # Set period for {function_name_comment} callback to {period_sec_short} ({period_msec}ms) + {device_name}.set_{function_name_under}_callback_configuration({arguments}{period_msec}{value_has_to_change}) +""" + templateB = r""" # Set period for {function_name_comment} callback to {period_sec_short} ({period_msec}ms) without a threshold + {device_name}.set_{function_name_under}_callback_configuration({arguments}{period_msec}{value_has_to_change}, "{option_char}", {minimum_maximums}) +""" + templateC = r""" # Configure threshold for {function_name_comment} "{option_comment}" + # with a debounce period of {period_sec_short} ({period_msec}ms) + {device_name}.set_{function_name_under}_callback_configuration({arguments}{period_msec}{value_has_to_change}, "{option_char}", {minimum_maximums}) +""" + + if self.get_option_char() == None: + template = templateA + elif self.get_option_char() == 'x': + template = templateB + else: + template = templateC + + period_msec, period_sec_short, period_sec_long = self.get_formatted_period() + + minimum_maximums = [] + + for minimum_maximum in self.get_minimum_maximums(): + minimum_maximums.append(minimum_maximum.get_micropython_source()) + + return template.format(device_name=self.get_device().get_initial_name(), + function_name_under=self.get_name().under, + function_name_comment=self.get_comment_name(), + arguments=common.wrap_non_empty('', ', '.join(self.get_micropython_arguments()), ', '), + period_msec=period_msec, + period_sec_short=period_sec_short, + period_sec_long=period_sec_long, + value_has_to_change=common.wrap_non_empty(', ', self.get_value_has_to_change('True', 'False', ''), ''), + option_char=self.get_option_char(), + option_comment=self.get_option_comment(), + minimum_maximums=', '.join(minimum_maximums)) + +class MicroPythonExampleSpecialFunction(common.ExampleSpecialFunction): + def get_micropython_imports(self): + if self.get_type() == 'sleep': + return ['import time\n'] + else: + return [] + + def get_micropython_function(self): + return None + + def get_micropython_source(self): + global global_line_prefix + + type_ = self.get_type() + + if type_ == 'empty': + return '' + elif type_ == 'debounce_period': + template = r""" # Get threshold callbacks with a debounce time of {period_sec} ({period_msec}ms) + {device_name_initial}.set_debounce_period({period_msec}) +""" + period_msec, period_sec = self.get_formatted_debounce_period() + + return template.format(device_name_initial=self.get_device().get_initial_name(), + period_msec=period_msec, + period_sec=period_sec) + elif type_ == 'sleep': + template = '{comment1}{global_line_prefix} time.sleep({duration}){comment2}\n' + duration = self.get_sleep_duration() + + if duration % 1000 == 0: + duration //= 1000 + else: + duration /= 1000.0 + + return template.format(global_line_prefix=global_line_prefix, + duration=duration, + comment1=self.get_formatted_sleep_comment1(global_line_prefix + ' # {0}\n', '\r', '\n' + global_line_prefix + ' # '), + comment2=self.get_formatted_sleep_comment2(' # {0}', '')) + elif type_ == 'wait': + return None + elif type_ == 'loop_header': + template = '{comment} for i in range({limit}):\n' + global_line_prefix = ' ' + + return template.format(limit=self.get_loop_header_limit(), + comment=self.get_formatted_loop_header_comment(' # {0}\n', '', '\n # ')) + elif type_ == 'loop_footer': + global_line_prefix = '' + + return '\r' + +class MicroPythonExamplesGenerator(micropython_common.MicroPythonGeneratorTrait, common.ExamplesGenerator): + def get_constant_class(self): + return MicroPythonConstant + + def get_device_class(self): + return micropython_common.MicroPythonDevice + + def get_example_class(self): + return MicroPythonExample + + def get_example_argument_class(self): + return MicroPythonExampleArgument + + def get_example_parameter_class(self): + return MicroPythonExampleParameter + + def get_example_result_class(self): + return MicroPythonExampleResult + + def get_example_getter_function_class(self): + return MicroPythonExampleGetterFunction + + def get_example_setter_function_class(self): + return MicroPythonExampleSetterFunction + + def get_example_callback_function_class(self): + return MicroPythonExampleCallbackFunction + + def get_example_callback_period_function_class(self): + return MicroPythonExampleCallbackPeriodFunction + + def get_example_callback_threshold_minimum_maximum_class(self): + return MicroPythonExampleCallbackThresholdMinimumMaximum + + def get_example_callback_threshold_function_class(self): + return MicroPythonExampleCallbackThresholdFunction + + def get_example_callback_configuration_function_class(self): + return MicroPythonExampleCallbackConfigurationFunction + + def get_example_special_function_class(self): + return MicroPythonExampleSpecialFunction + + def generate(self, device): + if os.getenv('TINKERFORGE_GENERATE_EXAMPLES_FOR_DEVICE', device.get_name().camel) != device.get_name().camel: + common.print_verbose(' \033[01;31m- skipped\033[0m') + return + + examples_dir = self.get_examples_dir(device) + examples = device.get_examples() + + if len(examples) == 0: + common.print_verbose(' \033[01;31m- no examples\033[0m') + return + + if not os.path.exists(examples_dir): + os.makedirs(examples_dir) + + for example in examples: + filename = 'example_{0}.py'.format(example.get_name().under) + filepath = os.path.join(examples_dir, filename) + + if example.is_incomplete(): + if os.path.exists(filepath) and self.skip_existing_incomplete_example: + common.print_verbose(' - ' + filename + ' \033[01;35m(incomplete, skipped)\033[0m') + continue + else: + common.print_verbose(' - ' + filename + ' \033[01;31m(incomplete)\033[0m') + else: + common.print_verbose(' - ' + filename) + + with open(filepath, 'w') as f: + f.write(example.get_micropython_source()) + +def generate(root_dir, language, internal): + common.generate(root_dir, language, internal, MicroPythonExamplesGenerator) + +if __name__ == '__main__': + args = common.dockerize('micropython', __file__, add_internal_argument=True) + + generate(os.getcwd(), 'en', args.internal) diff --git a/micropython/generate_micropython_stubs.py b/micropython/generate_micropython_stubs.py new file mode 100644 index 00000000..f73dbfd2 --- /dev/null +++ b/micropython/generate_micropython_stubs.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MicroPython Stubs Generator +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +generate_micropython_stubs.py: Generator for MicroPython .pyi type stubs + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +import sys + +if sys.hexversion < 0x3040000: + print('Python >= 3.4 required') + sys.exit(1) + +import os +import importlib.util +import importlib.machinery +import shutil + +def create_generators_module(): + generators_dir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + + if sys.hexversion < 0x3050000: + generators_module = importlib.machinery.SourceFileLoader('generators', os.path.join(generators_dir, '__init__.py')).load_module() + else: + generators_spec = importlib.util.spec_from_file_location('generators', os.path.join(generators_dir, '__init__.py')) + generators_module = importlib.util.module_from_spec(generators_spec) + + generators_spec.loader.exec_module(generators_module) + + sys.modules['generators'] = generators_module + +if 'generators' not in sys.modules: + create_generators_module() + +from generators import common +from generators.micropython import micropython_common + +class MicroPythonStubsDevice(micropython_common.MicroPythonDevice): + def get_stub_imports(self): + imports = ['from typing import Optional, Callable, Sequence'] + imports.append('from collections import namedtuple') + imports.append('from ip_connection import Device, IPConnection, Error') + + return '\n'.join(imports) + '\n' + + def get_stub_namedtuples(self): + tuples = '' + template = """{0} = namedtuple('{1}', [{2}]) +""" + + for packet in self.get_packets('function'): + if len(packet.get_elements(direction='out')) < 2: + continue + + name = packet.get_name() + + if name.space.startswith('Get '): + name_tup = name.camel[3:] + else: + name_tup = name.camel + + params = [] + + for element in packet.get_elements(direction='out'): + params.append("'{0}'".format(element.get_name().under)) + + tuples += template.format(name.camel, name_tup, ", ".join(params)) + + for packet in self.get_packets('function'): + if not packet.has_high_level(): + continue + + if len(packet.get_elements(direction='out', high_level=True)) < 2: + continue + + name = packet.get_name(skip=-2) + + if name.space.startswith('Get '): + name_tup = name.camel[3:] + else: + name_tup = name.camel + + params = [] + + for element in packet.get_elements(direction='out', high_level=True): + params.append("'{0}'".format(element.get_name().under)) + + tuples += template.format(name.camel, name_tup, ", ".join(params)) + + return tuples + + def get_stub_class_header(self): + template = """ +class {0}(Device): + r\"\"\" + {1} + \"\"\" + + DEVICE_IDENTIFIER: int + DEVICE_DISPLAY_NAME: str + DEVICE_URL_PART: str + +""" + + return template.format(self.get_micropython_class_name(), + common.select_lang(self.get_description())) + + def get_stub_callback_id_definitions(self): + lines = '' + + for packet in self.get_packets('callback'): + lines += ' CALLBACK_{0}: int\n'.format(packet.get_name().upper) + + if self.get_long_display_name() == 'RS232 Bricklet': + lines += ' CALLBACK_READ_CALLBACK: int\n' + lines += ' CALLBACK_ERROR_CALLBACK: int\n' + + for packet in self.get_packets('callback'): + if packet.has_high_level(): + lines += ' CALLBACK_{0}: int\n'.format(packet.get_name(skip=-2).upper) + + return lines + + def get_stub_function_id_definitions(self): + lines = '' + + for packet in self.get_packets('function'): + lines += ' FUNCTION_{0}: int\n'.format(packet.get_name().upper) + + return lines + + def get_stub_constants(self): + lines = '' + + for constant_group in self.get_constant_groups(): + if constant_group.is_virtual(): + continue + + for constant in constant_group.get_constants(): + if constant_group.get_type() == 'bool': + lines += ' {0}_{1}: bool\n'.format(constant_group.get_name().upper, constant.get_name().upper) + elif constant_group.get_type() in ['char', 'string']: + lines += ' {0}_{1}: str\n'.format(constant_group.get_name().upper, constant.get_name().upper) + else: + lines += ' {0}_{1}: int\n'.format(constant_group.get_name().upper, constant.get_name().upper) + + return lines + + def get_stub_init_method(self): + return """ + def __init__(self, uid: str, ipcon: IPConnection) -> None: + r\"\"\" + Creates an object with the unique device ID *uid* and adds it to + the IP Connection *ipcon*. + \"\"\" + ... + +""" + + def get_stub_methods(self): + methods = '' + + # normal and low-level + for packet in self.get_packets('function'): + methods += self._get_stub_method(packet) + + # high-level + for packet in self.get_packets('function'): + stream_in = packet.get_high_level('stream_in') + stream_out = packet.get_high_level('stream_out') + + if stream_in is not None or stream_out is not None: + methods += self._get_stub_method(packet, high_level=True) + + return methods + + def _get_stub_method(self, packet, high_level=False): + if high_level: + name = packet.get_name(skip=-2).under + else: + name = packet.get_name().under + + # Build typed parameter list + params = self._get_typed_parameters(packet, high_level) + param_str = ', '.join(['self'] + params) + + # Build return type + ret_type = self._get_return_type(packet, high_level) + + # Get docstring + doc = packet.get_stub_formatted_doc() + + method = ' def {0}({1}) -> {2}:\n'.format(name, param_str, ret_type) + method += ' r"""\n' + method += ' {0}\n'.format(doc) + method += ' """\n' + method += ' ...\n\n' + + return method + + def _get_typed_parameters(self, packet, high_level=False): + params = [] + + for element in packet.get_elements(direction='in', high_level=high_level): + name = element.get_name().under + type_str = self._get_element_type_hint(element, direction='in') + params.append('{0}: {1}'.format(name, type_str)) + + return params + + def _get_return_type(self, packet, high_level=False): + out_elements = packet.get_elements(direction='out', high_level=high_level) + + if len(out_elements) == 0: + return 'None' + + if len(out_elements) > 1: + if high_level: + name = packet.get_name(skip=-2) + else: + name = packet.get_name() + return name.camel + + # Single return value + element = out_elements[0] + return self._get_element_type_hint(element, direction='out') + + def _get_element_type_hint(self, element, direction='in'): + type_map = { + 'int8': 'int', + 'uint8': 'int', + 'int16': 'int', + 'uint16': 'int', + 'int32': 'int', + 'uint32': 'int', + 'int64': 'int', + 'uint64': 'int', + 'float': 'float', + 'bool': 'bool', + 'char': 'str', + 'string': 'str' + } + + base_type = type_map[element.get_type()] + cardinality = element.get_cardinality() + + if element.get_type() == 'string': + return 'str' + + if cardinality == 1: + return base_type + + if abs(cardinality) > 1: + if direction == 'in': + return 'Sequence[{0}]'.format(base_type) + else: + return 'tuple[{0}, ...]'.format(base_type) + + # variable length (negative cardinality from high-level) + if direction == 'in': + return 'Sequence[{0}]'.format(base_type) + else: + return 'list[{0}]'.format(base_type) + + def get_stub_register_callback_method(self): + if len(self.get_packets('callback')) == 0: + return '' + + return """ def register_callback(self, callback_id: int, function: Optional[Callable]) -> None: + r\"\"\" + Registers the given *function* with the given *callback_id*. + \"\"\" + ... + +""" + + def get_stub_old_name(self): + return '\n{0} = {1}\n'.format(self.get_name().camel, self.get_micropython_class_name()) + + def get_stub_source(self): + source = '# -*- coding: utf-8 -*-\n' + source += self.get_generator().get_header_comment('hash') + source += '\n' + source += self.get_stub_imports() + source += '\n' + source += self.get_stub_namedtuples() + source += self.get_stub_class_header() + source += self.get_stub_callback_id_definitions() + source += self.get_stub_function_id_definitions() + source += self.get_stub_constants() + source += self.get_stub_init_method() + source += self.get_stub_methods() + source += self.get_stub_register_callback_method() + + if self.is_brick() or self.is_bricklet(): + source += self.get_stub_old_name() + + return common.strip_trailing_whitespace(source) + +class MicroPythonStubsPacket(micropython_common.MicroPythonPacket): + def get_stub_formatted_doc(self): + text = common.select_lang(self.get_doc_text()) + + def format_parameter(name): + return '``{0}``'.format(name) + + text = common.handle_rst_param(text, format_parameter) + text = common.handle_rst_word(text) + text = common.handle_rst_substitutions(text, self) + text += common.format_since_firmware(self.get_device(), self) + + return '\n '.join(text.strip().split('\n')) + +class MicroPythonStubsGenerator(micropython_common.MicroPythonGeneratorTrait, common.BindingsGenerator): + recreate_bindings_dir = False + + def get_device_class(self): + return MicroPythonStubsDevice + + def get_packet_class(self): + return MicroPythonStubsPacket + + def get_element_class(self): + return micropython_common.MicroPythonElement + + def prepare(self): + self.stubs_dir = os.path.join(self.get_root_dir(), 'stubs') + + if os.path.exists(self.stubs_dir): + shutil.rmtree(self.stubs_dir) + + os.makedirs(self.stubs_dir) + + def generate(self, device): + filename = '{0}_{1}.pyi'.format(device.get_category().under, device.get_name().under) + + with open(os.path.join(self.stubs_dir, filename), 'w') as f: + f.write(device.get_stub_source()) + + def finish(self): + # Copy the ip_connection stub + ip_connection_stub = os.path.join(self.get_root_dir(), 'ip_connection.pyi') + + if os.path.exists(ip_connection_stub): + shutil.copy(ip_connection_stub, self.stubs_dir) + +def generate(root_dir, language, internal): + common.generate(root_dir, language, internal, MicroPythonStubsGenerator) + +if __name__ == '__main__': + args = common.dockerize('micropython', __file__, add_internal_argument=True) + + generate(os.getcwd(), 'en', args.internal) diff --git a/micropython/generate_micropython_zip.py b/micropython/generate_micropython_zip.py new file mode 100644 index 00000000..e8d92917 --- /dev/null +++ b/micropython/generate_micropython_zip.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MicroPython ZIP Generator +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +generate_micropython_zip.py: Generator for MicroPython ZIP + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +import sys + +if sys.hexversion < 0x3040000: + print('Python >= 3.4 required') + sys.exit(1) + +import os +import shutil +import importlib.util +import importlib.machinery + +def create_generators_module(): + generators_dir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + + if sys.hexversion < 0x3050000: + generators_module = importlib.machinery.SourceFileLoader('generators', os.path.join(generators_dir, '__init__.py')).load_module() + else: + generators_spec = importlib.util.spec_from_file_location('generators', os.path.join(generators_dir, '__init__.py')) + generators_module = importlib.util.module_from_spec(generators_spec) + + generators_spec.loader.exec_module(generators_module) + + sys.modules['generators'] = generators_module + +if 'generators' not in sys.modules: + create_generators_module() + +from generators import common +from generators.micropython import micropython_common + +class MicroPythonZipGenerator(micropython_common.MicroPythonGeneratorTrait, common.ZipGenerator): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.tmp_dir = self.get_zip_dir() + self.tmp_source_dir = os.path.join(self.tmp_dir, 'source') + self.tmp_examples_dir = os.path.join(self.tmp_dir, 'examples') + self.tmp_stubs_dir = os.path.join(self.tmp_dir, 'stubs') + + def prepare(self): + super().prepare() + + os.makedirs(self.tmp_source_dir) + os.makedirs(self.tmp_examples_dir) + os.makedirs(self.tmp_stubs_dir) + + def generate(self, device): + if not device.is_released(): + return + + # Copy device examples + tmp_examples_device = os.path.join(self.tmp_examples_dir, + device.get_category().under, + device.get_name().under) + + if not os.path.exists(tmp_examples_device): + os.makedirs(tmp_examples_device) + + for example in common.find_device_examples(device, r'^example_.*\.py$'): + shutil.copy(example[1], tmp_examples_device) + + def finish(self): + root_dir = self.get_root_dir() + + # Copy IP Connection examples + if self.get_config_name().space == 'Tinkerforge': + for example in common.find_examples(root_dir, r'^example_.*\.py$'): + shutil.copy(example[1], self.tmp_examples_dir) + + # Copy bindings and readme (flat layout, no package subdirectory) + for filename in self.get_released_files() + ['device_factory.py']: + shutil.copy(os.path.join(self.get_bindings_dir(), filename), self.tmp_source_dir) + + shutil.copy(os.path.join(root_dir, 'ip_connection.py'), self.tmp_source_dir) + shutil.copy(os.path.join(root_dir, 'changelog.txt'), self.tmp_dir) + shutil.copy(os.path.join(root_dir, 'readme.txt'), self.tmp_dir) + shutil.copy(os.path.join(root_dir, '..', 'configs', 'license.txt'), self.tmp_dir) + + # Copy type stubs for IDE support + stubs_dir = os.path.join(root_dir, 'stubs') + + if os.path.exists(stubs_dir): + for filename in os.listdir(stubs_dir): + if filename.endswith('.pyi'): + shutil.copy(os.path.join(stubs_dir, filename), self.tmp_stubs_dir) + + # Make zip + self.create_zip_file(self.tmp_dir) + +def generate(root_dir, language, internal): + common.generate(root_dir, language, internal, MicroPythonZipGenerator) + +if __name__ == '__main__': + args = common.dockerize('micropython', __file__, add_internal_argument=True) + + generate(os.getcwd(), 'en', args.internal) diff --git a/micropython/ip_connection.py b/micropython/ip_connection.py new file mode 100644 index 00000000..f0d9ec59 --- /dev/null +++ b/micropython/ip_connection.py @@ -0,0 +1,990 @@ +# -*- coding: utf-8 -*- +# Created by René Rohner +# Copyright (C) 2026 Tinkerforge GmbH +# +# Redistribution and use in source and binary forms of this file, +# with or without modification, are permitted. See the Creative +# Commons Zero (CC0 1.0) License for more details. + +# MicroPython IP Connection implementation. +# Designed for ESP32 and other MicroPython-capable boards. +# Uses synchronous/blocking I/O with explicit callback dispatch, +# following the same pattern as the PHP bindings. + +import struct +import socket +import sys +import time +import math +import hashlib + +try: + import hmac +except ImportError: + hmac = None + +try: + import os +except ImportError: + os = None + +try: + from collections import namedtuple +except ImportError: + pass + +def get_uid_from_data(data): + return struct.unpack('> 4) & 0x0F + +def get_error_code_from_data(data): + return (struct.unpack('> 6) & 0x03 + +BASE58 = '123456789abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ' + +def base58encode(value): + encoded = '' + + while value >= 58: + div, mod = divmod(value, 58) + encoded = BASE58[mod] + encoded + value = div + + return BASE58[value] + encoded + +def base58decode(encoded): + value = 0 + column_multiplier = 1 + + for c in reversed(encoded): + try: + column = BASE58.index(c) + except ValueError: + raise Error(Error.INVALID_UID, 'UID "{0}" contains invalid character'.format(encoded)) + + value += column * column_multiplier + column_multiplier *= 58 + + return value + +def uid64_to_uid32(uid64): + value1 = uid64 & 0xFFFFFFFF + value2 = (uid64 >> 32) & 0xFFFFFFFF + + uid32 = (value1 & 0x00000FFF) + uid32 |= (value1 & 0x0F000000) >> 12 + uid32 |= (value2 & 0x0000003F) << 16 + uid32 |= (value2 & 0x000F0000) << 6 + uid32 |= (value2 & 0x3F000000) << 2 + + return uid32 + +def create_chunk_data(data, chunk_offset, chunk_length, chunk_padding): + chunk_data = data[chunk_offset:chunk_offset + chunk_length] + + if len(chunk_data) < chunk_length: + chunk_data += [chunk_padding] * (chunk_length - len(chunk_data)) + + return chunk_data + +def create_char(value): + if isinstance(value, str) and len(value) == 1 and ord(value) <= 255: + return value + elif isinstance(value, (bytes, bytearray)) and len(value) == 1: + return chr(value[0]) + elif isinstance(value, int) and value >= 0 and value <= 255: + return chr(value) + else: + raise ValueError('Invalid char value: ' + repr(value)) + +def create_char_list(value, expected_type='char list'): + if isinstance(value, list): + return list(map(create_char, value)) + elif isinstance(value, str): + chars = list(value) + + for char in chars: + if ord(char) > 255: + raise ValueError('Invalid {0} value: {1}'.format(expected_type, repr(value))) + + return chars + elif isinstance(value, (bytes, bytearray)): + return list(map(chr, value)) + else: + raise ValueError('Invalid {0} value: {1}'.format(expected_type, repr(value))) + +def create_string(value): + if isinstance(value, str): + for char in value: + if ord(char) > 255: + raise ValueError('Invalid string value: {0}'.format(repr(value))) + + return value + elif isinstance(value, (bytes, bytearray)): + return ''.join(map(chr, value)) + else: + return ''.join(create_char_list(value, expected_type='string')) + +def pack_payload(data, form): + packed = b'' + + for f, d in zip(form.split(' '), data): + if '!' in f: + if len(f) > 1: + if int(f.replace('!', '')) != len(d): + raise ValueError('Incorrect bool list length') + + p = [0] * int(math.ceil(len(d) / 8.0)) + + for i, b in enumerate(d): + if b: + p[i // 8] |= 1 << (i % 8) + + packed += struct.pack('<{0}B'.format(len(p)), *p) + else: + packed += struct.pack(' 1: + packed += struct.pack('<' + bf, *list(map(lambda char: ord(char), d))) + else: + packed += struct.pack('<' + bf, ord(d)) + elif 's' in f: + packed += struct.pack('<' + f, bytes(map(ord, d))) + elif len(f) > 1: + packed += struct.pack('<' + f, *d) + else: + packed += struct.pack('<' + f, d) + + return packed + +def unpack_payload(data, form): + ret = [] + + for f in form.split(' '): + o = f + + if '!' in f: + if len(f) > 1: + f = '{0}B'.format(int(math.ceil(int(f.replace('!', '')) / 8.0))) + else: + f = 'B' + + # MicroPython struct doesn't support 'c', use 'B' (unsigned byte) instead + f = '<' + f.replace('c', 'B') + length = struct.calcsize(f) + x = struct.unpack(f, data[:length]) + + if '!' in o: + y = [] + + if len(o) > 1: + for i in range(int(o.replace('!', ''))): + y.append(x[i // 8] & (1 << (i % 8)) != 0) + else: + y.append(x[0] != 0) + + x = tuple(y) + + if 'c' in o: + if len(o) > 1: + ret.append(tuple(map(lambda item: chr(item), x))) + else: + ret.append(chr(x[0])) + elif 's' in f: + s = ''.join(map(chr, x[0])) + + i = s.find('\x00') + + if i >= 0: + s = s[:i] + + ret.append(s) + elif len(x) > 1: + ret.append(x) + else: + ret.append(x[0]) + + data = data[length:] + + if len(ret) == 1: + return ret[0] + else: + return ret + +class Error(Exception): + TIMEOUT = -1 + NOT_ADDED = -6 # obsolete since v2.0 + ALREADY_CONNECTED = -7 + NOT_CONNECTED = -8 + INVALID_PARAMETER = -9 + NOT_SUPPORTED = -10 + UNKNOWN_ERROR_CODE = -11 + STREAM_OUT_OF_SYNC = -12 + INVALID_UID = -13 + NON_ASCII_CHAR_IN_SECRET = -14 + WRONG_DEVICE_TYPE = -15 + DEVICE_REPLACED = -16 + WRONG_RESPONSE_LENGTH = -17 + + def __init__(self, value, description): + super().__init__('{0} ({1})'.format(description, value)) + + self.value = value + self.description = description + +class Device: + DEVICE_IDENTIFIER_CHECK_PENDING = 0 + DEVICE_IDENTIFIER_CHECK_MATCH = 1 + DEVICE_IDENTIFIER_CHECK_MISMATCH = 2 + + RESPONSE_EXPECTED_INVALID_FUNCTION_ID = 0 + RESPONSE_EXPECTED_ALWAYS_TRUE = 1 # getter + RESPONSE_EXPECTED_TRUE = 2 # setter + RESPONSE_EXPECTED_FALSE = 3 # setter, default + + def __init__(self, uid, ipcon, device_identifier, device_display_name): + uid_ = base58decode(uid) + + if uid_ > (1 << 64) - 1: + raise Error(Error.INVALID_UID, 'UID "{0}" is too big'.format(uid)) + + if uid_ > (1 << 32) - 1: + uid_ = uid64_to_uid32(uid_) + + if uid_ == 0: + raise Error(Error.INVALID_UID, 'UID "{0}" is empty or maps to zero'.format(uid)) + + self.replaced = False + self.uid = uid_ + self.uid_string = uid + self.ipcon = ipcon + self.device_identifier = device_identifier + self.device_display_name = device_display_name + self.device_identifier_check = Device.DEVICE_IDENTIFIER_CHECK_PENDING + self.wrong_device_display_name = '?' + self.api_version = (0, 0, 0) + self.registered_callbacks = {} + self.callback_formats = {} + self.high_level_callbacks = {} + self.pending_callbacks = [] + self.expected_response_function_id = None + self.expected_response_sequence_number = None + self.received_response = None + + self.response_expected = [Device.RESPONSE_EXPECTED_INVALID_FUNCTION_ID] * 256 + self.response_expected[IPConnection.FUNCTION_ADC_CALIBRATE] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE + self.response_expected[IPConnection.FUNCTION_GET_ADC_CALIBRATION] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE + self.response_expected[IPConnection.FUNCTION_READ_BRICKLET_UID] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE + self.response_expected[IPConnection.FUNCTION_WRITE_BRICKLET_UID] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE + + def get_api_version(self): + """ + Returns the API version (major, minor, revision) of the bindings for + this device. + """ + return self.api_version + + def get_response_expected(self, function_id): + """ + Returns the response expected flag for the function specified by the + *function_id* parameter. It is *true* if the function is expected to + send a response, *false* otherwise. + """ + if function_id < 0 or function_id >= len(self.response_expected): + raise ValueError('Function ID {0} out of range'.format(function_id)) + + flag = self.response_expected[function_id] + + if flag == Device.RESPONSE_EXPECTED_INVALID_FUNCTION_ID: + raise ValueError('Invalid function ID {0}'.format(function_id)) + + return flag in [Device.RESPONSE_EXPECTED_ALWAYS_TRUE, Device.RESPONSE_EXPECTED_TRUE] + + def set_response_expected(self, function_id, response_expected): + """ + Changes the response expected flag of the function specified by the + *function_id* parameter. This flag can only be changed for setter + (default value: *false*) and callback configuration functions + (default value: *true*). For getter functions it is always enabled. + """ + if function_id < 0 or function_id >= len(self.response_expected): + raise ValueError('Function ID {0} out of range'.format(function_id)) + + flag = self.response_expected[function_id] + + if flag == Device.RESPONSE_EXPECTED_INVALID_FUNCTION_ID: + raise ValueError('Invalid function ID {0}'.format(function_id)) + + if flag == Device.RESPONSE_EXPECTED_ALWAYS_TRUE: + raise ValueError('Response Expected flag cannot be changed for function ID {0}'.format(function_id)) + + if bool(response_expected): + self.response_expected[function_id] = Device.RESPONSE_EXPECTED_TRUE + else: + self.response_expected[function_id] = Device.RESPONSE_EXPECTED_FALSE + + def set_response_expected_all(self, response_expected): + """ + Changes the response expected flag for all setter and callback + configuration functions of this device at once. + """ + if bool(response_expected): + flag = Device.RESPONSE_EXPECTED_TRUE + else: + flag = Device.RESPONSE_EXPECTED_FALSE + + for i in range(len(self.response_expected)): + if self.response_expected[i] in [Device.RESPONSE_EXPECTED_TRUE, Device.RESPONSE_EXPECTED_FALSE]: + self.response_expected[i] = flag + + def check_validity(self): + if self.replaced: + raise Error(Error.DEVICE_REPLACED, 'Device has been replaced') + + if self.device_identifier < 0: + return + + if self.device_identifier_check == Device.DEVICE_IDENTIFIER_CHECK_MATCH: + return + + if self.device_identifier_check == Device.DEVICE_IDENTIFIER_CHECK_PENDING: + device_identifier = self.ipcon.send_request(self, 255, (), '', 33, '8s 8s c 3B 3B H')[5] # .get_identity + + if device_identifier == self.device_identifier: + self.device_identifier_check = Device.DEVICE_IDENTIFIER_CHECK_MATCH + else: + self.device_identifier_check = Device.DEVICE_IDENTIFIER_CHECK_MISMATCH + self.wrong_device_display_name = str(device_identifier) + + if self.device_identifier_check == Device.DEVICE_IDENTIFIER_CHECK_MISMATCH: + raise Error(Error.WRONG_DEVICE_TYPE, + 'UID {0} belongs to a {1} instead of the expected {2}' + .format(self.uid_string, self.wrong_device_display_name, self.device_display_name)) + + def _dispatch_pending_callbacks(self): + """Dispatch all pending callbacks for this device.""" + pending = self.pending_callbacks + self.pending_callbacks = [] + + for packet in pending: + if self.ipcon.socket is None: + break + + try: + self.check_validity() + except: + continue # silently ignoring callbacks from mismatching devices + + self.ipcon.dispatch_packet(packet) + +class BrickDaemon(Device): + FUNCTION_GET_AUTHENTICATION_NONCE = 1 + FUNCTION_AUTHENTICATE = 2 + + def __init__(self, uid, ipcon): + Device.__init__(self, uid, ipcon, 0, 'Brick Daemon') + + self.api_version = (2, 0, 0) + + self.response_expected[BrickDaemon.FUNCTION_GET_AUTHENTICATION_NONCE] = BrickDaemon.RESPONSE_EXPECTED_ALWAYS_TRUE + self.response_expected[BrickDaemon.FUNCTION_AUTHENTICATE] = BrickDaemon.RESPONSE_EXPECTED_TRUE + + ipcon.add_device(self) + + def get_authentication_nonce(self): + return self.ipcon.send_request(self, BrickDaemon.FUNCTION_GET_AUTHENTICATION_NONCE, (), '', 12, '4B') + + def authenticate(self, client_nonce, digest): + self.ipcon.send_request(self, BrickDaemon.FUNCTION_AUTHENTICATE, (client_nonce, digest), '4B 20B', 0, '') + +class IPConnection: + FUNCTION_ENUMERATE = 254 + FUNCTION_ADC_CALIBRATE = 251 + FUNCTION_GET_ADC_CALIBRATION = 250 + FUNCTION_READ_BRICKLET_UID = 249 + FUNCTION_WRITE_BRICKLET_UID = 248 + FUNCTION_DISCONNECT_PROBE = 128 + + CALLBACK_ENUMERATE = 253 + CALLBACK_CONNECTED = 0 + CALLBACK_DISCONNECTED = 1 + + BROADCAST_UID = 0 + + # enumeration_type parameter to the enumerate callback + ENUMERATION_TYPE_AVAILABLE = 0 + ENUMERATION_TYPE_CONNECTED = 1 + ENUMERATION_TYPE_DISCONNECTED = 2 + + # connect_reason parameter to the connected callback + CONNECT_REASON_REQUEST = 0 + + # disconnect_reason parameter to the disconnected callback + DISCONNECT_REASON_REQUEST = 0 + DISCONNECT_REASON_ERROR = 1 + DISCONNECT_REASON_SHUTDOWN = 2 + + # returned by get_connection_state + CONNECTION_STATE_DISCONNECTED = 0 + CONNECTION_STATE_CONNECTED = 1 + + DISCONNECT_PROBE_INTERVAL = 5 + + def __init__(self): + """ + Creates an IP Connection object that can be used to enumerate the + available devices. It is also required for the constructor of Bricks + and Bricklets. + """ + self.host = None + self.port = None + self.timeout = 2.5 + self.next_sequence_number = 0 + self.next_authentication_nonce = 0 + self.devices = {} + self.registered_callbacks = {} + self.socket = None + self.pending_data = b'' + self.pending_callbacks = [] # for enumerate callbacks + self.next_disconnect_probe = 0 + self.disconnect_probe_request = None + self.brickd = BrickDaemon('2', self) + + def connect(self, host, port): + """ + Creates a TCP/IP connection to the given *host* and *port*. The host + and port can point to a Brick Daemon or to a WIFI/Ethernet Extension. + + Devices can only be controlled when the connection was established + successfully. + + Blocks until the connection is established and throws an exception if + there is no Brick Daemon or WIFI/Ethernet Extension listening at the + given host and port. + """ + if self.socket is not None: + raise Error(Error.ALREADY_CONNECTED, + 'Already connected to {0}:{1}'.format(self.host, self.port)) + + self.host = host + self.port = port + + try: + addr = socket.getaddrinfo(host, port)[0][-1] + self.socket = socket.socket() + self.socket.connect(addr) + except Exception as e: + if self.socket is not None: + try: + self.socket.close() + except: + pass + self.socket = None + raise + + # Pre-build disconnect probe request + self.disconnect_probe_request, _, _ = self.create_packet_header(None, 8, IPConnection.FUNCTION_DISCONNECT_PROBE) + self.next_disconnect_probe = time.time() + IPConnection.DISCONNECT_PROBE_INTERVAL + self.pending_data = b'' + + cb = self.registered_callbacks.get(IPConnection.CALLBACK_CONNECTED) + if cb is not None: + cb(IPConnection.CONNECT_REASON_REQUEST) + + def disconnect(self): + """ + Disconnects the TCP/IP connection from the Brick Daemon or the + WIFI/Ethernet Extension. + """ + if self.socket is None: + raise Error(Error.NOT_CONNECTED, 'Not connected') + + disconnect_reason = IPConnection.DISCONNECT_REASON_REQUEST + + try: + self.socket.close() + except: + pass + + self.socket = None + + cb = self.registered_callbacks.get(IPConnection.CALLBACK_DISCONNECTED) + if cb is not None: + cb(disconnect_reason) + + def authenticate(self, secret): + """ + Performs an authentication handshake with the connected Brick Daemon or + WIFI/Ethernet Extension. + """ + if hmac is None: + raise Error(Error.NOT_SUPPORTED, 'authenticate requires the hmac module which is not available in this MicroPython build') + + try: + secret_bytes = secret.encode('ascii') + except UnicodeEncodeError: + raise Error(Error.NON_ASCII_CHAR_IN_SECRET, 'Authentication secret contains non-ASCII characters') + + if self.next_authentication_nonce == 0: + try: + self.next_authentication_nonce = struct.unpack('> 6) & 0xFFFFFFFF) + subseconds + + server_nonce = self.brickd.get_authentication_nonce() + client_nonce = struct.unpack('<4B', struct.pack(' 0: + payload = pack_payload(data, form) + else: + payload = b'' + + header, response_expected, sequence_number = self.create_packet_header(device, 8 + len(payload), function_id) + request = header + payload + + if response_expected: + device.expected_response_function_id = function_id + device.expected_response_sequence_number = sequence_number + device.received_response = None + + try: + self._send(request) + self._receive(self.timeout, device, False) + finally: + device.expected_response_function_id = None + device.expected_response_sequence_number = None + + if device.received_response is None: + msg = 'Did not receive response for function {0} in time'.format(function_id) + raise Error(Error.TIMEOUT, msg) + + response = device.received_response + device.received_response = None + + error_code = get_error_code_from_data(response) + + if error_code == 0: + if length_ret == 0: + length_ret = 8 # setter with response-expected enabled + + if len(response) != length_ret: + msg = 'Expected response of {0} byte for function ID {1}, got {2} byte instead' \ + .format(length_ret, function_id, len(response)) + raise Error(Error.WRONG_RESPONSE_LENGTH, msg) + elif error_code == 1: + msg = 'Got invalid parameter for function {0}'.format(function_id) + raise Error(Error.INVALID_PARAMETER, msg) + elif error_code == 2: + msg = 'Function {0} is not supported'.format(function_id) + raise Error(Error.NOT_SUPPORTED, msg) + else: + msg = 'Function {0} returned an unknown error'.format(function_id) + raise Error(Error.UNKNOWN_ERROR_CODE, msg) + + if len(form_ret) > 0: + return unpack_payload(response[8:], form_ret) + else: + self._send(request) + + def _send(self, packet): + if self.socket is None: + raise Error(Error.NOT_CONNECTED, 'Not connected') + + try: + self.socket.send(packet) + except OSError: + self._handle_disconnect(IPConnection.DISCONNECT_REASON_ERROR) + raise Error(Error.NOT_CONNECTED, 'Not connected') + + self.next_disconnect_probe = time.time() + IPConnection.DISCONNECT_PROBE_INTERVAL + + def _receive(self, seconds, device, direct_callback_dispatch): + if seconds < 0: + seconds = 0 + + start = time.time() + end = start + seconds + + while True: + if self.socket is None: + return + + now = time.time() + + # Send disconnect probe if needed + if self.disconnect_probe_request is not None and \ + (self.next_disconnect_probe < now or + (self.next_disconnect_probe - now) > IPConnection.DISCONNECT_PROBE_INTERVAL): + try: + self.socket.send(self.disconnect_probe_request) + except OSError: + self._handle_disconnect(IPConnection.DISCONNECT_REASON_ERROR) + return + + now = time.time() + self.next_disconnect_probe = now + IPConnection.DISCONNECT_PROBE_INTERVAL + + timeout = end - now + + if timeout < 0: + timeout = 0 + + # Set socket timeout for this receive cycle + try: + self.socket.settimeout(timeout) + except OSError: + self._handle_disconnect(IPConnection.DISCONNECT_REASON_ERROR) + return + + try: + data = self.socket.recv(8192) + except OSError as e: + # Check for timeout + if hasattr(e, 'errno'): + import errno + if e.errno == errno.ETIMEDOUT or e.errno == errno.EAGAIN: + if device is not None and device.received_response is not None: + return + now = time.time() + if now >= end and now >= start: + return + continue + # On MicroPython, timeout raises OSError with ETIMEDOUT + # Try to detect timeout by checking time + now = time.time() + if now >= end and now >= start: + return + continue + + if len(data) == 0: + self._handle_disconnect(IPConnection.DISCONNECT_REASON_SHUTDOWN) + return + + self.pending_data += data + + while True: + if len(self.pending_data) < 8: + break + + length = get_length_from_data(self.pending_data) + + if len(self.pending_data) < length: + break + + packet = self.pending_data[0:length] + self.pending_data = self.pending_data[length:] + + self._handle_response(packet, direct_callback_dispatch) + + if device is not None and device.received_response is not None: + return + + now = time.time() + if now >= end and now >= start: + return + + def _handle_response(self, packet, direct_callback_dispatch): + self.next_disconnect_probe = time.time() + IPConnection.DISCONNECT_PROBE_INTERVAL + + function_id = get_function_id_from_data(packet) + sequence_number = get_sequence_number_from_data(packet) + + if sequence_number == 0 and function_id == IPConnection.CALLBACK_ENUMERATE: + if IPConnection.CALLBACK_ENUMERATE in self.registered_callbacks: + if direct_callback_dispatch: + if self.socket is None: + return + self.dispatch_packet(packet) + else: + self.pending_callbacks.append(packet) + return + + uid = get_uid_from_data(packet) + device = self.devices.get(uid) + + if device is None: + return # Response from an unknown device, ignoring it + + if sequence_number == 0: + if function_id in device.registered_callbacks or \ + -function_id in device.high_level_callbacks: + if direct_callback_dispatch: + if self.socket is None: + return + self.dispatch_packet(packet) + else: + device.pending_callbacks.append(packet) + return + + if device.expected_response_function_id == function_id and \ + device.expected_response_sequence_number == sequence_number: + device.received_response = packet + return + + # Response seems to be OK, but can't be handled + + def dispatch_packet(self, packet): + uid = get_uid_from_data(packet) + function_id = get_function_id_from_data(packet) + payload = packet[8:] + + if function_id == IPConnection.CALLBACK_ENUMERATE: + cb = self.registered_callbacks.get(IPConnection.CALLBACK_ENUMERATE) + + if cb is None: + return + + if len(packet) != 34: + return # silently ignoring callback with wrong length + + uid_str, connected_uid, position, hardware_version, \ + firmware_version, device_identifier, enumeration_type = \ + unpack_payload(payload, '8s 8s c 3B 3B H B') + + cb(uid_str, connected_uid, position, hardware_version, + firmware_version, device_identifier, enumeration_type) + + return + + device = self.devices.get(uid) + + if device is None: + return + + try: + device.check_validity() + except Error: + return # silently ignoring callback for invalid device + + if -function_id in device.high_level_callbacks: + hlcb = device.high_level_callbacks[-function_id] # [roles, options, data] + length, form = device.callback_formats[function_id] + + if len(packet) != length: + return # silently ignoring callback with wrong length + + llvalues = unpack_payload(payload, form) + has_data = False + data = None + + if hlcb[1]['fixed_length'] is not None: + length = hlcb[1]['fixed_length'] + else: + length = llvalues[hlcb[0].index('stream_length')] + + if not hlcb[1]['single_chunk']: + chunk_offset = llvalues[hlcb[0].index('stream_chunk_offset')] + else: + chunk_offset = 0 + + chunk_data = llvalues[hlcb[0].index('stream_chunk_data')] + + if hlcb[2] is None: # no stream in-progress + if chunk_offset == 0: # stream starts + hlcb[2] = chunk_data + + if len(hlcb[2]) >= length: # stream complete + has_data = True + data = hlcb[2][:length] + hlcb[2] = None + else: # ignore tail of current stream, wait for next stream start + pass + else: # stream in-progress + if chunk_offset != len(hlcb[2]): # stream out-of-sync + has_data = True + data = None + hlcb[2] = None + else: # stream in-sync + hlcb[2] += chunk_data + + if len(hlcb[2]) >= length: # stream complete + has_data = True + data = hlcb[2][:length] + hlcb[2] = None + + cb = device.registered_callbacks.get(-function_id) + + if has_data and cb is not None: + result = [] + + for role, llvalue in zip(hlcb[0], llvalues): + if role == 'stream_chunk_data': + result.append(data) + elif role is None: + result.append(llvalue) + + cb(*tuple(result)) + + cb = device.registered_callbacks.get(function_id) + + if cb is not None: + length, form = device.callback_formats.get(function_id, (None, None)) + + if length is None: + return # silently ignore registered but unknown callback + + if len(packet) != length: + return # silently ignoring callback with wrong length + + if len(form) == 0: + cb() + elif ' ' not in form: + cb(unpack_payload(payload, form)) + else: + cb(*unpack_payload(payload, form)) + + def _dispatch_pending_callbacks(self): + """Dispatch all pending IPConnection-level and device-level callbacks.""" + # Dispatch IPConnection-level pending callbacks (enumerate) + pending = self.pending_callbacks + self.pending_callbacks = [] + + for packet in pending: + if self.socket is None: + break + self.dispatch_packet(packet) + + # Dispatch device-level pending callbacks + for device in list(self.devices.values()): + if self.socket is None: + break + device._dispatch_pending_callbacks() + + def _handle_disconnect(self, disconnect_reason): + if self.socket is not None: + try: + self.socket.close() + except: + pass + self.socket = None + + cb = self.registered_callbacks.get(IPConnection.CALLBACK_DISCONNECTED) + if cb is not None: + cb(disconnect_reason) + + def get_next_sequence_number(self): + sequence_number = self.next_sequence_number + 1 + self.next_sequence_number = sequence_number % 15 + return sequence_number + + def create_packet_header(self, device, length, function_id): + uid = IPConnection.BROADCAST_UID + sequence_number = self.get_next_sequence_number() + r_bit = 0 + + if device is not None: + uid = device.uid + + if device.get_response_expected(function_id): + r_bit = 1 + + sequence_number_and_options = (sequence_number << 4) | (r_bit << 3) + + return (struct.pack(' int: ... +def get_length_from_data(data: bytes) -> int: ... +def get_function_id_from_data(data: bytes) -> int: ... +def get_sequence_number_from_data(data: bytes) -> int: ... +def get_error_code_from_data(data: bytes) -> int: ... + +def base58encode(value: int) -> str: ... +def base58decode(encoded: str) -> int: ... +def uid64_to_uid32(uid64: int) -> int: ... + +def create_chunk_data(data: Sequence, chunk_offset: int, chunk_length: int, chunk_padding: Any) -> list: ... +def create_char(value: Union[str, bytes, bytearray, int]) -> str: + """ + Tries to convert the given value to a single character string. + """ + ... + +def create_char_list(value: Union[list, str, bytes, bytearray], expected_type: str = 'char list') -> list[str]: + """ + Tries to convert the given value to a list of single character strings. + """ + ... + +def create_string(value: Union[str, bytes, bytearray, list]) -> str: + """ + Tries to convert the given value to a string. + """ + ... + +def pack_payload(data: tuple, form: str) -> bytes: ... +def unpack_payload(data: bytes, form: str) -> Any: ... + +class Error(Exception): + TIMEOUT: int + NOT_ADDED: int + ALREADY_CONNECTED: int + NOT_CONNECTED: int + INVALID_PARAMETER: int + NOT_SUPPORTED: int + UNKNOWN_ERROR_CODE: int + STREAM_OUT_OF_SYNC: int + INVALID_UID: int + NON_ASCII_CHAR_IN_SECRET: int + WRONG_DEVICE_TYPE: int + DEVICE_REPLACED: int + WRONG_RESPONSE_LENGTH: int + + value: int + description: str + + def __init__(self, value: int, description: str) -> None: ... + +class Device: + DEVICE_IDENTIFIER_CHECK_PENDING: int + DEVICE_IDENTIFIER_CHECK_MATCH: int + DEVICE_IDENTIFIER_CHECK_MISMATCH: int + + RESPONSE_EXPECTED_INVALID_FUNCTION_ID: int + RESPONSE_EXPECTED_ALWAYS_TRUE: int + RESPONSE_EXPECTED_TRUE: int + RESPONSE_EXPECTED_FALSE: int + + uid: int + uid_string: str + ipcon: 'IPConnection' + device_identifier: int + device_display_name: str + api_version: tuple[int, int, int] + registered_callbacks: dict + callback_formats: dict + high_level_callbacks: dict + response_expected: list[int] + + def __init__(self, uid: str, ipcon: 'IPConnection', device_identifier: int, device_display_name: str) -> None: ... + + def get_api_version(self) -> tuple[int, int, int]: + """ + Returns the API version (major, minor, revision) of the bindings for + this device. + """ + ... + + def get_response_expected(self, function_id: int) -> bool: + """ + Returns the response expected flag for the function specified by the + *function_id* parameter. It is *true* if the function is expected to + send a response, *false* otherwise. + """ + ... + + def set_response_expected(self, function_id: int, response_expected: bool) -> None: + """ + Changes the response expected flag of the function specified by the + *function_id* parameter. This flag can only be changed for setter + (default value: *false*) and callback configuration functions + (default value: *true*). For getter functions it is always enabled. + """ + ... + + def set_response_expected_all(self, response_expected: bool) -> None: + """ + Changes the response expected flag for all setter and callback + configuration functions of this device at once. + """ + ... + + def check_validity(self) -> None: ... + +class IPConnection: + FUNCTION_ENUMERATE: int + FUNCTION_ADC_CALIBRATE: int + FUNCTION_GET_ADC_CALIBRATION: int + FUNCTION_READ_BRICKLET_UID: int + FUNCTION_WRITE_BRICKLET_UID: int + FUNCTION_DISCONNECT_PROBE: int + + CALLBACK_ENUMERATE: int + CALLBACK_CONNECTED: int + CALLBACK_DISCONNECTED: int + + BROADCAST_UID: int + + ENUMERATION_TYPE_AVAILABLE: int + ENUMERATION_TYPE_CONNECTED: int + ENUMERATION_TYPE_DISCONNECTED: int + + CONNECT_REASON_REQUEST: int + + DISCONNECT_REASON_REQUEST: int + DISCONNECT_REASON_ERROR: int + DISCONNECT_REASON_SHUTDOWN: int + + CONNECTION_STATE_DISCONNECTED: int + CONNECTION_STATE_CONNECTED: int + + DISCONNECT_PROBE_INTERVAL: int + + host: Optional[str] + port: Optional[int] + timeout: float + + def __init__(self) -> None: + """ + Creates an IP Connection object that can be used to enumerate the + available devices. It is also required for the constructor of Bricks + and Bricklets. + """ + ... + + def connect(self, host: str, port: int) -> None: + """ + Creates a TCP/IP connection to the given *host* and *port*. The host + and port can point to a Brick Daemon or to a WIFI/Ethernet Extension. + + Devices can only be controlled when the connection was established + successfully. + + Blocks until the connection is established and throws an exception if + there is no Brick Daemon or WIFI/Ethernet Extension listening at the + given host and port. + """ + ... + + def disconnect(self) -> None: + """ + Disconnects the TCP/IP connection from the Brick Daemon or the + WIFI/Ethernet Extension. + """ + ... + + def authenticate(self, secret: str) -> None: + """ + Performs an authentication handshake with the connected Brick Daemon or + WIFI/Ethernet Extension. + """ + ... + + def get_connection_state(self) -> int: + """ + Can return the following states: + + - CONNECTION_STATE_DISCONNECTED: No connection is established. + - CONNECTION_STATE_CONNECTED: A connection to the Brick Daemon or + the WIFI/Ethernet Extension is established. + """ + ... + + def set_timeout(self, timeout: float) -> None: + """ + Sets the timeout in seconds for getters and for setters for which the + response expected flag is activated. + + Default timeout is 2.5. + """ + ... + + def get_timeout(self) -> float: + """ + Returns the timeout as set by set_timeout. + """ + ... + + def enumerate(self) -> None: + """ + Broadcasts an enumerate request. All devices will respond with an + enumerate callback. + """ + ... + + def register_callback(self, callback_id: int, function: Optional[Callable]) -> None: + """ + Registers the given *function* with the given *callback_id*. + """ + ... + + def dispatch_callbacks(self, seconds: float) -> None: + """ + Dispatches incoming callbacks for the given amount of time in seconds + (negative value means infinity). Because MicroPython doesn't support + threads you need to call this method periodically to ensure that + incoming callbacks are handled. If you don't use callbacks you don't + need to call this method. + + The recommended dispatch time is 0. This will just dispatch all pending + callbacks without waiting for further callbacks. + """ + ... diff --git a/micropython/micropython_common.py b/micropython/micropython_common.py new file mode 100644 index 00000000..a91bcee8 --- /dev/null +++ b/micropython/micropython_common.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- + +""" +MicroPython Generator +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +micropython_common.py: Common library for generation of MicroPython bindings and documentation + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +from generators import common + +class MicroPythonDevice(common.Device): + def get_micropython_import_name(self): + return self.get_category().under + '_' + self.get_name().under + + def get_micropython_class_name(self): + return self.get_category().camel + self.get_name().camel + +class MicroPythonPacket(common.Packet): + def get_micropython_parameters(self, high_level=False): + parameters = [] + + for element in self.get_elements(direction='in', high_level=high_level): + parameters.append(element.get_name().under) + + return ', '.join(parameters) + +class MicroPythonElement(common.Element): + micropython_types = { + 'int8': 'int', + 'uint8': 'int', + 'int16': 'int', + 'uint16': 'int', + 'int32': 'int', + 'uint32': 'int', + 'int64': 'int', + 'uint64': 'int', + 'float': 'float', + 'bool': 'bool', + 'char': 'chr', + 'string': 'str' + } + + micropython_struct_formats = { + 'int8': 'b', + 'uint8': 'B', + 'int16': 'h', + 'uint16': 'H', + 'int32': 'i', + 'uint32': 'I', + 'int64': 'q', + 'uint64': 'Q', + 'float': 'f', + 'bool': '!', + 'char': 'c', + 'string': 's' + } + + micropython_default_item_values = { + 'int8': '0', + 'uint8': '0', + 'int16': '0', + 'uint16': '0', + 'int32': '0', + 'uint32': '0', + 'int64': '0', + 'uint64': '0', + 'float': '0.0', + 'bool': 'False', + 'char': "'\\0'", + 'string': None + } + + micropython_parameter_coercions = { + 'int8': ('int({0})', 'list(map(int, {0}))'), + 'uint8': ('int({0})', 'list(map(int, {0}))'), + 'int16': ('int({0})', 'list(map(int, {0}))'), + 'uint16': ('int({0})', 'list(map(int, {0}))'), + 'int32': ('int({0})', 'list(map(int, {0}))'), + 'uint32': ('int({0})', 'list(map(int, {0}))'), + 'int64': ('int({0})', 'list(map(int, {0}))'), + 'uint64': ('int({0})', 'list(map(int, {0}))'), + 'float': ('float({0})', 'list(map(float, {0}))'), + 'bool': ('bool({0})', 'list(map(bool, {0}))'), + 'char': ('create_char({0})', 'create_char_list({0})'), + 'string': ('create_string({0})', 'create_string({0})') + } + + def format_value(self, value): + if isinstance(value, list): + result = [] + + for subvalue in value: + result.append(self.format_value(subvalue)) + + return '[{0}]'.format(', '.join(result)) + + type_ = self.get_type() + + if type_ == 'float': + return common.format_float(value) + + if type_ == 'bool': + return str(bool(value)) + + if type_ in ['char', 'string']: + return '"{0}"'.format(value.replace('"', '\\"')) + + return str(value) + + def get_micropython_name(self, index=None): + return self.get_name(index=index).under + + def get_micropython_type(self, cardinality=None): + assert cardinality == None or (isinstance(cardinality, int) and cardinality > 0), cardinality + + micropython_type = MicroPythonElement.micropython_types[self.get_type()] + + if cardinality == None: + cardinality = self.get_cardinality() + + if cardinality == 1 or self.get_type() == 'string': + return micropython_type + + return '[{0}, ...]'.format(micropython_type) + + def get_micropython_struct_format(self): + f = MicroPythonElement.micropython_struct_formats[self.get_type()] + cardinality = self.get_cardinality() + + if cardinality > 1: + f = str(cardinality) + f + + return f + + def get_micropython_default_item_value(self): + value = MicroPythonElement.micropython_default_item_values[self.get_type()] + + if value == None: + raise common.GeneratorError('Invalid array item type: ' + self.get_type()) + + return value + + def get_micropython_parameter_coercion(self): + coercion = MicroPythonElement.micropython_parameter_coercions[self.get_type()] + + if self.get_cardinality() == 1: + return coercion[0] + else: + return coercion[1] + +class MicroPythonGeneratorTrait: + def get_bindings_name(self): + return 'micropython' + + def get_bindings_display_name(self): + return 'MicroPython' + + def get_doc_null_value_name(self): + return 'None' + + def get_doc_formatted_param(self, element): + return element.get_name().under + + def generates_high_level_callbacks(self): + return True diff --git a/micropython/readme.txt b/micropython/readme.txt new file mode 100644 index 00000000..17f81dda --- /dev/null +++ b/micropython/readme.txt @@ -0,0 +1,54 @@ +This ZIP contains the MicroPython bindings for all Tinkerforge Bricks and +Bricklets. The bindings are designed for MicroPython on ESP32 and other +MicroPython-capable boards. + +The ZIP file for the bindings is structured as follows: + + source/ -- source code of the bindings + examples/ -- examples for every supported Brick and Bricklet + stubs/ -- .pyi type stubs for IDE code completion + +The source/ folder contains the ip_connection.py and all device bindings as +flat Python modules. To use the bindings, copy ip_connection.py and the device +bindings you need to your MicroPython board. + +You can copy files to your board using tools such as: + - mpremote (recommended): mpremote cp source/ip_connection.py : + - Thonny IDE: Use the file browser to upload files + - ampy: ampy --port /dev/ttyUSB0 put source/ip_connection.py + +For WiFi-capable boards (e.g. ESP32), connect to your network first: + + import network + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect("YOUR_SSID", "YOUR_PASSWORD") + +If you want to use authentication (ipcon.authenticate()), the hmac module is +required. Most MicroPython builds do not include it by default. Install it +using MicroPython's package manager (requires network connection): + + import mip + mip.install("hmac") + +Because MicroPython does not support threads, you need to call +ipcon.dispatch_callbacks(seconds) periodically if you want to receive +callbacks. The recommended dispatch time is 0. This will just dispatch all +pending callbacks without waiting for further callbacks. + +To reduce file size on your board, you can compile .py files to .mpy bytecode +using mpy-cross. Make sure the mpy-cross version matches your MicroPython +firmware version. + +See the documentation for details: + + pip install mpy-cross==1.23.0 # adjust version to match your firmware + mpy-cross source/bricklet_temperature_v2.py + +The stubs/ folder contains .pyi type stub files for IDE code completion and +type checking (e.g. VS Code with Pylance). Add the stubs/ folder to your IDE's +analysis paths. These files are not needed on the board. + +Documentation for the API can be found at: + + https://www.tinkerforge.com/en/doc/Software/API_Bindings_MicroPython.html diff --git a/micropython/test_micropython_bindings.py b/micropython/test_micropython_bindings.py new file mode 100644 index 00000000..10cf42b5 --- /dev/null +++ b/micropython/test_micropython_bindings.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MicroPython Bindings Tester +Created by René Rohner +Copyright (C) 2026 Tinkerforge GmbH + +test_micropython_bindings.py: Tests the MicroPython bindings + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU General Public +License along with this program; if not, write to the +Free Software Foundation, Inc., 59 Temple Place - Suite 330, +Boston, MA 02111-1307, USA. +""" + +import sys + +if sys.hexversion < 0x3040000: + print('Python >= 3.4 required') + sys.exit(1) + +import os +import importlib.util +import importlib.machinery + +def create_generators_module(): + generators_dir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0] + + if sys.hexversion < 0x3050000: + generators_module = importlib.machinery.SourceFileLoader('generators', os.path.join(generators_dir, '__init__.py')).load_module() + else: + generators_spec = importlib.util.spec_from_file_location('generators', os.path.join(generators_dir, '__init__.py')) + generators_module = importlib.util.module_from_spec(generators_spec) + + generators_spec.loader.exec_module(generators_module) + + sys.modules['generators'] = generators_module + +if 'generators' not in sys.modules: + create_generators_module() + +from generators import common + +class MicroPythonTester(common.Tester): + def __init__(self, root_dir, extra_paths): + common.Tester.__init__(self, 'micropython', '.py', root_dir, + subdirs=['examples', 'source'], + extra_paths=extra_paths) + + def test(self, cookie, tmp_dir, scratch_dir, path, extra): + args = ['python3', + '-c', + 'import py_compile; py_compile.compile("{0}", doraise=True)'.format(path)] + + self.execute(cookie, args) + +def test(root_dir): + extra_paths = [os.path.join(root_dir, '../stubs/generate_tng_stubs.py')] + + return MicroPythonTester(root_dir, extra_paths).run() + +if __name__ == '__main__': + common.dockerize('micropython', __file__) + + test(os.getcwd())