import binascii
import struct

import cantools
from cantools.database import Message

from logutil import Logs


class DBC():
    def __init__(self, dbc_path):
        # 加载dbc文件
        self.db =cantools.db.load_file(filename=dbc_path, database_format=None,
                                        encoding="UTF-8",
                                        frame_id_mask=None, strict=False, cache_dir=None)

        self.__log = Logs()


    def get_message_by_id(self, message_id: str) -> Message:
        """
        根据报文id获取当前dbc的报文name
        :param message_id: 报文id
        :return: 报文
        """
        message = self.db.get_message_by_frame_id(int(message_id, 16))
        self.__log.debug(f"message信息:{message}")

        return message


    def get_message_by_name(self, message_name: str) -> Message:
        """
        根据报文名称获取当前dbc的报文frame_id
        :param message_name: 报文名称
        :return: 报文
        """
        message = self.db.get_message_by_name(message_name)
        self.__log.debug(f"message信息:{message}")

        return message


    def get_single_value(self, select_data: dict, value_type_is_num: bool = False) -> [dict, None]:
        """
        根据包含有message_id的字典,查询当前报文的信号表示的含义
        :param select_data: 要查询的信息,包含message_id和data
        :param value_type_is_num: 查询结果为数字还是值
        :return: 报文详情
        """

        # 在dbc中找到该信号供解析使用
        select_message = self.db.get_message_by_frame_id(int(select_data['message_id'], 16))

        message_key: bytes = b""
        for i in select_data['data']:
            message_key += struct.pack('B', i)

        if value_type_is_num:

            result = select_message.decode(message_key,
                                           decode_choices=False,  # 值为数字为 False 值为 True
                                           scaling=True,
                                           decode_containers=False)
        else:
            result = select_message.decode(message_key,
                                           decode_choices=True,  # 值为数字为 False 值为 True
                                           scaling=True,
                                           decode_containers=False)

        self.__log.debug(f"报文{select_data['message_id']}值为:{result}")

        return result

    def get_single_array(self, name: str,single_data: dict = {}):
        """
        根据报文名称和详情,返回字节形式的报文
        :param name: 要查询的报文名称
        :param single_data: 报文详情
        :return: 报文详情
        """
        # 查询报文对象
        message = self.db.get_message_by_name(name)
        # 将dbc中所有的键值对取出来
        data_orig = {}
        for i in range(len(message.signals)):
            signal = message.signals[i]
            data_orig[signal.name] = 0
        # 更新该字典键值对的值
        data_orig.update(single_data)
        # 返回字节形式
        msgdata = message.encode(data_orig, scaling=True, padding=False, strict=False)
        data_final = self.single_format(str(binascii.b2a_hex(msgdata), 'utf-8'))
        
        return data_final

    def single_format(self, single: str) -> str:
        """
        格式化信号
        :param single: 信号
        :return: 增加空格的信号
        """
        single_length: int = len(single)
        result: str = ""
        for i in range(single_length):
            if i > 0 and i%2 ==0 and i < single_length:
                result += " "+ single[i]
            else:
                result += single[i]

        return result

if __name__ == "__main__":
    dbc = DBC(".\\演示用dbc.dbc")
    print(dbc.get_single_array("message_name",{'sigle_name1': 1,'sigle_name2': 1}))
    # 00 60 00 00 00 00 00 00
    print(dbc.get_message_by_name("message_name"))
    # message('message_name', 0x001, False, 8, None)
    print(dbc.get_message_by_id("0x001"))
    print(dbc.get_single_value({"message_id": "0x001",
                                         "data": [00, 60, 00, 00, 00, 00, 00, 00]
                                         }))
    # {'sigle_name1': ' OFF ', 'sigle_name2': ' Invalid ', 'sigle_name3': ' OFF ', 'sigle_name4': 'Not Active'}

最近接受封装周立功接口的工作,在解析dbc中的CAN信号时,发现cantools相关的网络资源少,阅读源码的效率太低,而且只需要用到格式的转换。所以特此发稿。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐