mqtt中间件和influxdb时序数据库的安装另外介绍,这里不说明

一、服务架构

IoT Simulator(publisher)----> MQTT broker---->Telegraf(subscriber)---->InfluxDB

二、Telegraf下载地址

下面主要以windows版本为例,linux安装过程也差不多
https://portal.influxdata.com/downloads/

三、安装和插入数据到数据库

1: 安装并解压telegraf (如果没有wget请自行下载)
2: 修改配置文件telegraf.conf(主要配置input,output&processor plugins)
processor plugin的功能主要是打印从mqtt broker订阅的数据并显示在console中

参考资料:https://docs.influxdata.com/telegraf/v1.15/data_formats/input/json/

[[outputs.influxdb]]
  urls = ["https://localhost:8086"] # required
  # The target database for metrics (telegraf will create it if not exists)
  database = "telegraf" # required
  precision = "s"
 ## Name of existing retention policy to write to.  Empty string writes to
  ## the default retention policy.
  retention_policy = ""
  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
  write_consistency = "any"

  ## Write timeout (for the InfluxDB client), formatted as a string.
  ## If not provided, will default to 5s. 0s means no timeout (not recommended).
  timeout = "5s"



  [[inputs.mqtt_consumer]]
  ## MQTT broker URLs to be used. The format should be scheme://host:port,
  ## schema can be tcp, ssl, or ws.
  servers = ["tcp://localhost:1883"]
  ## MQTT QoS, must be 0, 1, or 2
  qos = 2
  ## Connection timeout for initial connection in seconds
  connection_timeout = "30s"

  ## Topics to subscribe to
  topics = [
    "Network Quality/+/UU"
  ]

  # if true, messages that can't be delivered while the subscriber is offline
  # will be delivered when it comes back (such as on service restart).
  # NOTE: if true, client_id MUST be set
  persistent_session = true
  # If empty, a random client ID will be generated.
  client_id = "networkQualityUU"

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  insecure_skip_verify = true

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  ## List of tag names to extract from top-level of JSON server response
  tag_keys = [
    "APN",
    "Bandwidth",
    "Cell ID",
    "EARFCN",
    "IMSI",
    "IPAddress",
    "LTE EARFCN",
    "LTE PCI",
    "Lat",
    "Lon",
    "MSISDN",
    "NetworkMode",
    "PCI",
    "SubnetMask",
    "eNB ID"
  ]
  #所有JSON数字都将转换为float字段。除非在tag_key或json_string_fields选项中指定,否则将忽略JSON字符串。
  json_string_fields = [
    "BER",
    "BytesReceived",
    "BytesSent",
    "DownlinkRATE",
    "LTE RSRP",
    "Neighbor RSRP",
    "Neighbor SINR",
    "PacketsReceived",
    "PacketsSent",
    "RSRP",
    "RSRQ",
    "RSSI",
    "SINR",
    "UplinkRATE"
  ]
  json_time_key = "Timestamp"   #要插入的时间主键名称
  json_time_format = "unix_ms"   #消息时间的格式,此为时间戳
  name_override = "msg_quality_uu"   #插入到的表名


[[inputs.exec]]
  ## Commands array
  commands = []
#"/tmp/test.sh", "/usr/bin/mycollector --foo=bar"
  ## measurement name suffix (for separating different commands)
  name_suffix = "mqtt_consumer_uu"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  
  #语句打印出来
[[processors.printer]]

注意:如果有多个主题需要插入到时序数据库的不同表里,设置多个 [[inputs.mqtt_consumer]]就行

3:运行telegraf,运行前先开启数据模拟发射器和MQTT broker确保influxdb能订阅到稳定的数据流,否则influxdb有可能会报错监听不到数据写入。
到安装目录下运行:

telegraf -config telegraf.conf

由于配置了printer plugin,在telegraf正常运行的情况下可以看到数据流打印在console中

4: 发送消息可以看到数据自动写入数据库

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐