新聞中心
本篇內(nèi)容主要講解“怎么在 Rust 中使用 MQTT”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“怎么在 Rust 中使用 MQTT”吧!
創(chuàng)新互聯(lián)公司主營(yíng)玉環(huán)網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,APP應(yīng)用開(kāi)發(fā),玉環(huán)h5小程序設(shè)計(jì)搭建,玉環(huán)網(wǎng)站營(yíng)銷(xiāo)推廣歡迎玉環(huán)等地區(qū)企業(yè)咨詢(xún)
Rust 是由 Mozilla 主導(dǎo)開(kāi)發(fā)的通用、編譯型編程語(yǔ)言。該語(yǔ)言的設(shè)計(jì)準(zhǔn)則為:安全、并發(fā)、實(shí)用,支持 函數(shù)式、并發(fā)式、過(guò)程式以及面向?qū)ο蟮木幊田L(fēng)格。Rust 速度驚人且內(nèi)存利用率極高。由于沒(méi)有運(yùn)行時(shí)和垃圾回收,它能夠勝任對(duì)性能要求特別高的服務(wù),可以在嵌入式設(shè)備上運(yùn)行,還能輕松和其他語(yǔ)言集成。Rust 豐富的類(lèi)型系統(tǒng)和所有權(quán)模型保證了內(nèi)存安全和線(xiàn)程安全,讓您在編譯期就能夠消除各種各樣的錯(cuò)誤。
MQTT 是一種基于發(fā)布/訂閱模式的 輕量級(jí)物聯(lián)網(wǎng)消息傳輸協(xié)議,可以用極少的代碼和帶寬為聯(lián)網(wǎng)設(shè)備提供實(shí)時(shí)可靠的消息服務(wù),它廣泛應(yīng)用于物聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)、智能硬件、車(chē)聯(lián)網(wǎng)、電力能源等行業(yè)。
本文主要介紹如何在 Rust 項(xiàng)目中使用 paho-mqtt客戶(hù)端庫(kù) ,實(shí)現(xiàn)客戶(hù)端與 MQTT 服務(wù)器的連接、訂閱、取消訂閱、收發(fā)消息等功能。
項(xiàng)目初始化
本項(xiàng)目使用 Rust 1.44.0 進(jìn)行開(kāi)發(fā)測(cè)試,并使用 Cargo 1.44.0 包管理工具進(jìn)行項(xiàng)目管理,讀者可用如下命令查看當(dāng)前的 Rust 版本。
~ rustc --version rustc 1.44.0 (49cae5576 2020-06-01)
選擇 MQTT 客戶(hù)端庫(kù)
paho-mqtt 是目前 Rust 中,功能完善且使用較多的 MQTT 客戶(hù)端,最新的 0.7.1
版本支持 MQTT v5、3.1.1、3.1,支持通過(guò)標(biāo)準(zhǔn) TCP、SSL / TLS、WebSockets 傳輸數(shù)據(jù),QoS 支持 0、1、2 等。
初始化項(xiàng)目
執(zhí)行以下命令創(chuàng)建名為 mqtt-example
的 Rust 新項(xiàng)目。
~ cargo new mqtt-example Created binary (application) `mqtt-example` package
編輯項(xiàng)目中的 Cargo.toml
文件,在 dependencies
中添加 paho-mqtt
庫(kù)的地址,以及指定訂閱、發(fā)布代碼文件對(duì)應(yīng)的二進(jìn)制文件。
[dependencies] paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" } [[bin]] name = "sub" path = "src/sub/main.rs" [[bin]] name = "pub" path = "src/pub/main.rs"
Rust MQTT 的使用
創(chuàng)建客戶(hù)端連接
本文將使用 EMQ X 提供的 免費(fèi)公共 MQTT 服務(wù)器 作為測(cè)試連接的 MQTT 服務(wù)器,該服務(wù)基于 EMQ X 的 MQTT 物聯(lián)網(wǎng)云平臺(tái) 創(chuàng)建。服務(wù)器接入信息如下:
Broker: broker.emqx.io
TCP Port: 1883
Websocket Port: 8083
配置 MQTT Broker 連接參數(shù)
配置 MQTT Broker 連接地址(包括端口)、topic (這里我們配置了兩個(gè) topic ),以及客戶(hù)端 id。
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883"; const DFLT_CLIENT:&str = "rust_publish"; const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
編寫(xiě) MQTT 連接代碼
編寫(xiě) MQTT 連接代碼,為了提升使用體驗(yàn),可在執(zhí)行二進(jìn)制文件時(shí)通過(guò)命令行參數(shù)的形式傳入連接地址。通常我們需要先創(chuàng)建一個(gè)客戶(hù)端,然后將該客戶(hù)端連接到 broker.emqx.io
。
let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string() ); // Define the set of options for the create. // Use an ID for a persistent session. let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize(); // Create a client. let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); }); // Define the set of options for the connection. let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(true) .finalize(); // Connect and wait for it to complete or fail. if let Err(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); }
發(fā)布消息
這里我們總共發(fā)布五條消息,根據(jù)循環(huán)的奇偶性,分別向 rust/mqtt
、 rust/test
這兩個(gè)主題發(fā)布。
for num in 0..5 { let content = "Hello world! ".to_string() + &num.to_string(); let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS); if num % 2 == 0 { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]); msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS); } else { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]); } let tok = cli.publish(msg); if let Err(e) = tok { println!("Error sending message: {:?}", e); break; } }
訂閱消息
在客戶(hù)端連接之前,需要先初始化消費(fèi)者。這里我們會(huì)循環(huán)處理消費(fèi)者中的消息隊(duì)列,并打印出訂閱的 topic 名稱(chēng)及接收到的消息內(nèi)容。
fn subscribe_topics(cli: &mqtt::Client) { if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) { println!("Error subscribes topics: {:?}", e); process::exit(1); } } fn main() { ... // Initialize the consumer before connecting. let rx = cli.start_consuming(); ... // Subscribe topics. subscribe_topics(&cli); println!("Processing requests..."); for msg in rx.iter() { if let Some(msg) = msg { println!("{}", msg); } else if !cli.is_connected() { if try_reconnect(&cli) { println!("Resubscribe topics..."); subscribe_topics(&cli); } else { break; } } } ... }
完整代碼
消息發(fā)布代碼
use std::{ env, process, time::Duration }; extern crate paho_mqtt as mqtt; const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883"; const DFLT_CLIENT:&str = "rust_publish"; const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"]; // Define the qos. const QOS:i32 = 1; fn main() { let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string() ); // Define the set of options for the create. // Use an ID for a persistent session. let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize(); // Create a client. let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); }); // Define the set of options for the connection. let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(true) .finalize(); // Connect and wait for it to complete or fail. if let Err(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); } // Create a message and publish it. // Publish message to 'test' and 'hello' topics. for num in 0..5 { let content = "Hello world! ".to_string() + &num.to_string(); let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS); if num % 2 == 0 { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]); msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS); } else { println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]); } let tok = cli.publish(msg); if let Err(e) = tok { println!("Error sending message: {:?}", e); break; } } // Disconnect from the broker. let tok = cli.disconnect(None); println!("Disconnect from the broker"); tok.unwrap(); }
消息訂閱代碼
為了提升使用體驗(yàn),消息訂閱做了斷開(kāi)重連的處理,并在重新建立連接后對(duì)主題進(jìn)行重新訂閱。
use std::{ env, process, thread, time::Duration }; extern crate paho_mqtt as mqtt; const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883"; const DFLT_CLIENT:&str = "rust_subscribe"; const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"]; // The qos list that match topics above. const DFLT_QOS:&[i32] = &[0, 1]; // Reconnect to the broker when connection is lost. fn try_reconnect(cli: &mqtt::Client) -> bool { println!("Connection lost. Waiting to retry connection"); for _ in 0..12 { thread::sleep(Duration::from_millis(5000)); if cli.reconnect().is_ok() { println!("Successfully reconnected"); return true; } } println!("Unable to reconnect after several attempts."); false } // Subscribes to multiple topics. fn subscribe_topics(cli: &mqtt::Client) { if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) { println!("Error subscribes topics: {:?}", e); process::exit(1); } } fn main() { let host = env::args().nth(1).unwrap_or_else(|| DFLT_BROKER.to_string() ); // Define the set of options for the create. // Use an ID for a persistent session. let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(host) .client_id(DFLT_CLIENT.to_string()) .finalize(); // Create a client. let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| { println!("Error creating the client: {:?}", err); process::exit(1); }); // Initialize the consumer before connecting. let rx = cli.start_consuming(); // Define the set of options for the connection. let lwt = mqtt::MessageBuilder::new() .topic("test") .payload("Consumer lost connection") .finalize(); let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(20)) .clean_session(false) .will_message(lwt) .finalize(); // Connect and wait for it to complete or fail. if let Err(e) = cli.connect(conn_opts) { println!("Unable to connect:\n\t{:?}", e); process::exit(1); } // Subscribe topics. subscribe_topics(&cli); println!("Processing requests..."); for msg in rx.iter() { if let Some(msg) = msg { println!("{}", msg); } else if !cli.is_connected() { if try_reconnect(&cli) { println!("Resubscribe topics..."); subscribe_topics(&cli); } else { break; } } } // If still connected, then disconnect now. if cli.is_connected() { println!("Disconnecting"); cli.unsubscribe_many(DFLT_TOPICS).unwrap(); cli.disconnect(None).unwrap(); } println!("Exiting"); }
運(yùn)行與測(cè)試
編譯二進(jìn)制文件
執(zhí)行以下命令,會(huì)在 mqtt-example/target/debug
目錄下生成消息訂閱、發(fā)布對(duì)應(yīng)的 sub
、pub
二進(jìn)制文件。
cargo build
執(zhí)行 sub
二進(jìn)制文件,等待消費(fèi)發(fā)布。
消息發(fā)布
執(zhí)行 pub
二進(jìn)制文件,可以看到分別往 rust/test
、rust/mqtt
這兩個(gè)主題發(fā)布了消息。
同時(shí)在消息訂閱中可看到發(fā)布的消息
至此,我們完成了使用 paho-mqtt客戶(hù)端連接到 公共 MQTT 服務(wù)器,并實(shí)現(xiàn)了測(cè)試客戶(hù)端與 MQTT 服務(wù)器的連接、消息發(fā)布和訂閱。
到此,相信大家對(duì)“怎么在 Rust 中使用 MQTT”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!
文章題目:怎么在Rust中使用MQTT
網(wǎng)頁(yè)路徑:http://www.ef60e0e.cn/article/gphiej.html