Skip to main content

适配器与MQTT集成

Octopus 提供了两种的方法与MQTT集成:

  1. modbusopcuabledummy,都支持通过 MQTT 代理同步设备状态。
  2. 如果设备支持 MQTT,则可以将MQTT 适配器用作首选。

这篇文章主要概述了第一种方法的细节,如果您想了解更多关于 MQTT 适配器的信息,请查看MQTT 适配器。 如果以上开箱即用的方式无法满足您的要求,则可以按照CONTRIBUTING提出您的想法,或开发新的适配器

说明: MQTT 集成目前仅支持write - [publish]的模板主题。

尽管 MQTT 的最新版本为 v5.0,但目前 Octopus 暂时不支持该修订版,主要原因是相应的开发库尚不支持paho.mqtt.golang/issues#347

目前 Octopus 支持的 MQTT 版本如下:

设备与 MQTT 集成后,可以显示设备状态、赋予设备使用 MQTT 的能力,或扩展设备的使用场景,例如设备交互和设备监视。

MQTT#

MQTT 是机器对机器(M2M)物联网连接协议。 它被设计为一种非常轻量级的消息传输协议。 对于与需要较小代码占用和网络带宽非常宝贵的远程位置的连接很有用。

尽管 MQTT 的名称包含MQ,但它不是用于定义消息队列的协议,实际上,MQ是指 IBM 的 MQseries 产品,与消息队列无关。。 MQTT 是一种轻量级的二进制协议,并且由于其最小的数据包开销,与 HTTP 之类的协议相比,MQTT 在通过网络传输数据时表现出色。 MQTT 提供了一种可以像消息队列一样发布和订阅的通信方式,同时,提供了许多功能来丰富通信场景,例如 QoS,最后遗嘱和遗嘱,保留的消息等。 要了解有关 MQTT 的更多信息,强烈推荐一系列文章:MQTT Essentials

mqtt-tcp-ip-stack

惯例#

MQTT 使用基于主题的消息过滤每封邮件都包含一个主题(主题),代理可以使用该主题来确定订阅客户端是否收到该邮件。

在 MQTT 中,topic是可用于过滤和路由消息的层次结构字符串, 而payload数据不可知,这意味着发布者可以发送二进制数据,文本数据甚至是 完整的 XML 或 JSON,因此设计主题树和有效负载架构是任何 MQTT 部署的重要工作。

Octopus 建议您参考MQTT Essentials 中 MQTT 主题的最佳实践来构造 topic名称,并将 payload 数据编组为 JSON。

配置选项#

octopus 重组了github.com/eclipse/paho.mqtt.golang的客户端参数,然后提供了一组配置选项。

目前官方的适配器如BLEModbusopcua都支持 MQTT 协议扩展,使用相同的配置(参考以下spec.template.spec.extension.mqtt)。

apiVersion: edge.cattle.io/v1alpha1
kind: DeviceLink
metadata:
name: living-room-fan
spec:
adaptor:
node: edge-worker
name: adaptors.edge.cattle.io/dummy
model:
apiVersion: "devices.edge.cattle.io/v1alpha1"
kind: "DummySpecialDevice"
template:
metadata:
labels:
device: living-room-fan
spec:
extension:
mqtt:
client:
server: tcp://test.mosquitto.org:1883
maxReconnectInterval: 20s
message:
topic: cattle.io/octopus/:namespace/:name
qos: 1
protocol:
location: "living_room"
gear: slow
"on": true

规范#

MQTT 选项的规范在所有 MQTT 扩展适配器中均有效,它们用于连接 MQTT 代理,指导连接,指示要发布/订阅的主题以及有效载 payload 的编码。

MQTTOptions#

参数描述类型是否必填
client指定客户端的设置MQTTClientOptions
message指定信息的设置MQTTMessageOptions
MQTTClientOptions#
参数描述类型是否必填
server指定 MQTT broker 的服务器 URI,格式为 "schema://host:port"。schema的可选值为:"ws"、"wss"、"tcp"、"unix"、"ssl"、"tls "或 "tcps "。string
protocolVersion指定集群连接到 broker 时使用的 MQTT 协议版本。可选值是3--MQTT 3.1 和4--MQTT 3.1.1,默认值是0*uint
basicAuth指定客户端连接到 MQTT broker 的用户名和密码*MQTTClientBasicAuth
tlsConfig指定客户端连接到 MQTT broker 的 TLS 配置*MQTTClientTLS
cleanSession指定在连接消息中设置 "clean session "标志,MQTT broker 不应该,默认为true*bool
store指定在 QoS 级别为 1 或 2 的情况下提供消息持久性,默认存储为 "Memory"*MQTTClientStore
resumeSubs指定在连接但未重新连接时恢复存储的(未)订阅信息。只有当cleanSessionfalse时才有效。默认值是falsebool
connectTimeout指定客户端在超时和出错前尝试打开与 MQTT 代理的连接的时间。持续时间为 0,则不会超时。默认值是30s*metav1.Duration
keepAlive指定客户端在向代理发送 PING 请求之前应该等待的时间。默认的 keep alive 是10s*metav1.Duration
pingTimeout指定客户端向 broker 发送 PING 请求后应该等待的时间长度默认值是10s*metav1.Duration
order指定消息路由以保证每个 QoS 级别内的顺序。默认值为 "true"*bool
writeTimeout指定客户端成功发布消息后出现超时错误的时间,默认为 30s*metav1.Duration
waitTimeout指定客户端订阅/发布消息后应超时的时间,持续时间为0永远不会超时。*metav1.Duration
disconnectQuiesce指定客户端断开连接时的静止时间,默认为 "5s"*metav1.Duration
autoReconnect配置使用自动重连逻辑,默认为 "true"bool
maxReconnectInterval指定客户在重新连接到经纪商之前应该等待的时间,默认为10m*metav1.Duration
messageChannelDepth指定客户端暂时离线时保存消息的内部队列大小,默认为100*uint
httpHeaders指定客户端在 WebSocket 开启握手时发送的附加 HTTP 头信息。string
MQTTClientBasicAuth#
参数描述类型是否必填
username指定基本认证的用户名string
usernameRef指定 DeviceLink 的引用关系,将该值作为用户名引用*edgev1alpha1.DeviceLinkReferenceRelationship
password指定基本认证的密码string
passwordRef指定 DeviceLink 的引用关系,将该值作为密码引用*edgev1alpha1.DeviceLinkReferenceRelationship
MQTTClientTLS#
参数描述类型是否必填
caFilePEMCA 证书的 PEM 格式内容,用于验证服务器证书string
caFilePEMRef指定 DeviceLink 的引用关系,以引用值作为 CA 文件的 PEM 内容*edgev1alpha1.DeviceLinkReferenceRelationship
certFilePEM证书的 PEM 格式内容,用于客户端对服务器的认证string
certFilePEMRef指定 DeviceLink 的引用关系,以引用值作为客户端证书文件的 PEM 内容*edgev1alpha1.DeviceLinkReferenceRelationship
keyFilePEM密钥的 PEM 格式内容,用于客户端对服务器的认证。string
keyFilePEMRef指定 DeviceLink 的引用关系,将该值作为客户端密钥文件 PEM 内容引用。*edgev1alpha1.DeviceLinkReferenceRelationship
serverName表示服务器的名称,参考http://tools.ietf.org/html/rfc4366#section-3.1string
insecureSkipVerify不验证服务器证书,默认值为falsebool
MQTTClientStore#
参数描述类型是否必填
type指定存储的类型,可选值为"memory"和 "file",默认值是 "memory"。string
direcotryPrefix如果使用 "文件 "存储,则指定存储的目录前缀,默认值是/var/run/octopus/mqtt。默认值是/var/run/octopus/mqttstring

MQTTMessageOptions#

FieldDescriptionSchemaRequired
topic指定主题true
will指定遗嘱信息*MQTTWillMessage
qos指定消息的 QoS,默认值为1*MQTTMessageQoSLevel
retained指定是否保留最后发布的消息,默认为 "true"bool
path指定渲染 topic 的:path关键字的路径。string
operator指定用于渲染主题的:operator关键字的操作符。*MQTTMessageTopicOperator
MQTTWillMessage#
参数描述类型是否必填
topic指定遗嘱消息的主题,如果没有设置,主题将在父字段指定的主题名称后附加$will作为主题名称。如果没有设置,主题将在父字段中指定的主题名称后附加"\$will "作为主题名称。string
content指定 will 消息的内容。内容的序列化形式是 Base64 编码的字符串,在这里表示任意(可能是非字符串)的内容值string
MQTTMessageQoSLevel#
参数描述类型
0最多发送一次byte
1最少发送一次byte
2只发送一次byte
MQTTMessageTopicOperator#
参数描述类型是否必填
read指定在订阅时渲染主题的:operator关键字的操作符stringfalse
write指定在发布时渲染主题的:operator关键字的操作符stringfalse

YAML 配置文件示例#

MQTT 选项的规范在所有 MQTT 扩展适配器中均有效,它们用于连接 MQTT 代理服务器,引导连接,指示要发布/订阅的主题以及有效 Payload 的编码等。REQUIRED 是必填字段。

# Specifies the client settings.
client:
# Specifies the server URI of MQTT broker, the format should be `schema://host:port`.
# The "schema" is one of the "ws", "wss", "tcp", "unix", "ssl", "tls" or "tcps".
# REQUIRED
server: <string>
# Specifies the MQTT protocol version that the cluster uses to connect to broker.
# Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1.
# The default value is 0, which means MQTT v3.1.1 identification is preferred.
protocolVersion: <int, 0|3|4>
# Specifies the username and password that the client connects
# to the MQTT broker. Without the use of `tlsConfig`,
# the account information will be sent in plaintext across the wire.
basicAuth:
# Specifies the username for basic authentication.
username: <string>
# Specifies the relationship of DeviceLink's references to
# refer to the value as the username.
usernameRef:
# Specifies the name of reference.
# REQUIRED
name: <string>
# Specifies the item name of the referred reference.
# REQUIRED
item: <string>
# Specifies the relationship of DeviceLink's references to refer to the value as the username.
passsword: <string>
# Specifies the relationship of DeviceLink's references to
# refer to the value as the password.
passwordRef:
# Specifies the name of reference.
# REQUIRED
name: <string>
# Specifies the item name of the referred reference.
# REQUIRED
item: <string>
# Specifies the TLS configuration that the client connects to the MQTT broker.
tlsConfig:
# The PEM format content of the CA certificate,
# which is used for validate the server certificate with.
caFilePEM: <string>
# Specifies the relationship of DeviceLink's references to
# refer to the value as the CA file PEM content.
caFilePEMRef:
# Specifies the name of reference.
# REQUIRED
name: <string>
# Specifies the item name of the referred reference.
# REQUIRED
item: <string>
# The PEM format content of the certificate and key,
# which is used for client authenticate to the server.
certFilePEM: <string>
# Specifies the relationship of DeviceLink's references to
# refer to the value as the client certificate file PEM content.
certFilePEMRef:
# Specifies the name of reference.
# REQUIRED
name: <string>
# Specifies the item name of the referred reference.
# REQUIRED
item: <string>
# Specifies the PEM format content of the key(private key),
# which is used for client authenticate to the server.
keyFilePEM: <string>
# Specifies the relationship of DeviceLink's references to
# refer to the value as the client key file PEM content.
keyFilePEMRef:
# Specifies the name of reference.
# REQUIRED
name: <string>
# Specifies the item name of the referred reference.
# REQUIRED
item: <string>
# Indicates the name of the server, ref to http://tools.ietf.org/html/rfc4366#section-3.1.
serverName: <string>
# Doesn't validate the server certificate.
insecureSkipVerify: <bool>
# Specifies setting the "clean session" flag in the connect message that the MQTT broker should not
# save it. Any messages that were going to be sent by this client before disconnecting previously but didn't send upon connecting to the broker.
# The default value is "true".
cleanSession: <bool>
# Specifies to provide message persistence in cases where QoS level is 1 or 2.
# The default store is "memory".
store:
# Specifies the type of storage.
# The default store is "Memory".
type: <string, Memory|File>
# Specifies the directory prefix of the storage, if using file store.
# The default value is "/var/run/octopus/mqtt".
direcotryPrefix: <string>
# Specifies to enable resuming of stored (un)subscribe messages when connecting but not reconnecting.
# This is only valid if `cleanSession` is false.
# The default value is "false".
resumeSubs: <bool>
# Specifies the amount of time that the client try to open a connection
# to an MQTT broker before timing out and getting error.
# A duration of 0 never times out.
# The default value is "30s".
connectionTime: <string>
# Specifies the amount of time that the client should wait
# before sending a PING request to the broker. This will
# allow the client to know that the connection has not been lost
# with the server.
# A duration of 0 never keeps alive.
# The default keep alive is "30s".
keepAlive: <string>
# Specifies the amount of time that the client should wait
# after sending a PING request to the broker. This will
# allow the client to know that the connection has been lost
# with the server.
# A duration of 0 may cause unnecessary timeout error.
# The default value is "10s".
pingTimeout: <string>
# Specifies the message routing to guarantee order within each QoS level. If set to false,
# the message can be delivered asynchronously from the client to the application and
# possibly arrive out of order.
# The default value is "true".
order: <bool>
# Specifies the amount of time that the client publish a message successfully before
# getting a timeout error.
# A duration of 0 never times out.
# The default value is "30s".
writeTimeout: <string>
# Specifies the amount of time that the client should timeout
# after subscribed/published a message.
# A duration of 0 never times out.
waitTimeout: <string>
# Specifies the quiesce when the client disconnects.
# The default value is "5s".
disconnectQuiesce: <string>
# Configures using the automatic reconnection logic.
# The default value is "true".
autoReconnect: <bool>
# Specifies the amount of time that the client should wait
# before reconnecting to the broker. The first reconnect interval is 1 second,
# and then the interval is incremented by *2 until `MaxReconnectInterval` is reached.
# This is only valid if `AutoReconnect` is true.
# A duration of 0 may trigger the reconnection immediately.
# The default value is "10m".
maxReconnectInterval: <string>
# Specifies the size of the internal queue that holds messages
# while the client is temporarily offline, allowing the application to publish
# when the client is reconnected.
# This is only valid if `autoReconnect` is true.
# The default value is "100".
messageChannelDepth: <int>
# Specifies the additional HTTP headers that the client sends in the WebSocket opening handshake.
httpHeaders: <map[string][]string>
# Specifies the message settings.
message:
# Specifies the topic.
# REQUIRED
topic: <string>
# Specifies the will message that the client gives it to the broker,
# which can be published to any clients that are subscribed the provided topic.
will:
# Specifies the topic of will message.
# if not set, the topic will append "$will" to the topic name specified
# in parent field as its topic name.
topic: <string>
# Specifies the content of will message. The serialized form of the content is a
# base64 encoded string, representing the arbitrary (possibly non-string) content value here.
content: <string, base64-encoded>
# Specifies the QoS of the will message.
# 0: Send at most once.
# 1: Send at least once.
# 2: Send exactly once.
# The default value is "1".
qos: <int, 0|1|2>
# Specifies the will message to be retained.
# The default value is "true".
retained: <bool>
# Specifies the path for rendering the `:path` keyword of topic.
path: <string>
# Specifies the operator for rendering the `:operator` keyword of topic.
operator:
# Specifies the operator for rendering the `:operator` keyword of topic during subscribing.
read: <string>
# Specifies the operator for rendering the `:operator` keyword of topic during publishing.
write: <string>

Templated Topic#

Octopus 提供了一个templated topic,以适应不同的 MQTT 发布和订阅场景。templated topic 支持的关键词有五个。

  • :namespace,替换 DeviceLink 的命名空间
  • :name,替换为 DeviceLink 的名称
  • :uid,替换为 DeviceLink 的UID
  • :path,替换为自定义路径。
  • :operator,基于操作(read - subscribe, write - publish)进行替换。值得注意的是,read操作在 MQTT 扩展中不支持,但在MQTT 适配器中运行良好。

模板化主题有以下两个特点:

  • 容错的额外分隔符,path. "a///b//c"将被视为path."。"a///b///c"将作为path: "a/b/c"
  • 自动忽略没有内容的关键词。

使用案例#

  1. 给定主题cattle.io/octopus/:namespace/device/:name,当 DeviceLink 命名为default/case1

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
    namespace: default
    name: case1
    uid: fcd1eb1b-ea42-4cb9-afb0-0ec2d0830583
    spec:
    ...
    template:
    ...
    spec:
    extension:
    mqtt:
    ...
    message:
    topic: "cattle.io/octopus/:namespace/device/:name"
    • Publish Topic: cattle.io/octopus/default/device/case1
    • Subscribe Topic: cattle.io/octopus/default/device/case1
  2. 给定主题cattle.io/octopus/device/:uid,当 DeviceLink 命名为default/case2时:

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
    namespace: default
    name: case2
    uid: 41478d1e-c3f8-46e3-a3b5-ba251f285277
    spec:
    ...
    template:
    ...
    spec:
    extension:
    mqtt:
    ...
    message:
    topic: "cattle.io/octopus/device/:uid"
    • Publish Topic: cattle.io/octopus/device/41478d1e-c3f8-46e3-a3b5-ba251f285277
    • Subscribe Topic: cattle.io/octopus/device/41478d1e-c3f8-46e3-a3b5-ba251f285277

    UID 是 Kubernetes 提供的代表资源的唯一标识,对外没有太多的解读意义。因此,一般情况下不建议使用这个关键字。

  3. 给定主题cattle.io/octopus/:operator/device/:namespace/:name,当 DeviceLink 命名为default/case3时:

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
    namespace: default
    name: case3
    uid: 835aea2e-5f80-4d14-88f5-40c4bda41aa3
    spec:
    ...
    template:
    ...
    spec:
    extension:
    mqtt:
    ...
    message:
    topic: "cattle.io/octopus/:operator/device/:namespace/:name"
    operator:
    write: "set"
    • Publish Topic: cattle.io/octopus/set/device/default/case3
    • Subscribe Topic: cattle.io/octopus/device/default/case3
  4. 给定主题cattle.io/octopus/:operator/device/:path/:uid,当 DeviceLink 命名为default/case4时。

    apiVersion: edge.cattle.io/v1alpha1
    kind: DeviceLink
    metadata:
    namespace: default
    name: case4
    uid: 014997f5-1f12-498b-8631-d2f22920e20a
    spec:
    ...
    template:
    ...
    spec:
    extension:
    mqtt:
    ...
    message:
    topic: "cattle.io/octopus/:operator/device/:path/:uid"
    operator:
    read: "status"
    path: "region/ap"
    • Publish Topic: cattle.io/octopus/device/region/ap/014997f5-1f12-498b-8631-d2f22920e20a
    • Subscribe Topic: cattle.io/octopus/status/device/region/ap/014997f5-1f12-498b-8631-d2f22920e20a

可用适配器列表#

Last updated on by yzeng25