具有MQTT,NiFi和InfluxDB的物聯網數據管道
1.簡介
在本教程中,我們將學習為物聯網應用程序創建數據管道時需要做什麼。
在此過程中,我們將了解IoT架構的特徵,並了解如何利用MQTT Broker,NiFi和InfluxDB等不同工具來為IoT應用程序構建高度可擴展的數據管道。
2.物聯網及其架構
首先,讓我們研究一些基本概念並了解IoT應用程序的一般體系結構。
2.1。什麼是物聯網?
物聯網(IoT)泛指物理對象的網絡,稱為“事物”。例如,事物可能包括從普通家用物品(例如燈泡)到復雜的工業設備的任何事物。通過這個網絡,我們可以將各種各樣的傳感器和執行器連接到互聯網上以交換數據:
現在,我們可以在非常不同的環境中部署事物-例如,該環境可以是我們的家,也可以是完全不同的事物,例如移動的貨運卡車。但是,我們實際上無法對可用於這些設備的電源和網絡的質量做出任何假設。因此,這對物聯網應用提出了獨特的要求。
2.2。物聯網架構簡介
典型的物聯網架構通常將自身構造為四個不同的層。讓我們了解數據實際上是如何流過這些層的:
首先,傳感層主要由收集來自環境的測量值的傳感器組成。然後,網絡層幫助匯總原始數據並通過Internet發送以進行處理。此外,數據處理層過濾原始數據並生成早期分析。最後,應用程序層使用強大的數據處理功能來執行更深入的數據分析和管理。
3. MQTT,NiFi和InfluxDB簡介
現在,讓我們檢查一下我們今天在IoT設置中廣泛使用的一些產品。這些都提供了一些獨特的功能,使其適合於IoT應用程序的數據需求。
3.1。 MQTT
消息隊列遙測傳輸(MQTT)是一種輕量級的發布-訂閱網絡協議。現在是OASIS和ISO標準。 IBM最初開發它是為了在設備之間傳輸消息。 MQTT適用於內存,網絡帶寬和電源稀缺的受限環境。
MQTT遵循客戶機-服務器模型,其中不同的組件可以充當客戶機並通過TCP連接到服務器。我們知道該服務器是MQTT代理。客戶端可以將消息發佈到稱為主題的地址。他們還可以訂閱主題並接收發佈到該主題的所有消息。
在典型的物聯網設置中,傳感器可以將溫度等測量結果發佈到MQTT代理,而上游數據處理系統可以訂閱以下主題以接收數據:
如我們所見,MQTT中的主題是分層的。系統可以使用通配符輕鬆訂閱整個主題層次結構。
MQTT支持三個級別的服務質量(QoS) 。這些是“最多交付一次”,“至少交付一次”和“恰好交付一次”。 QoS定義了客戶端和服務器之間的協議級別。每個客戶可以選擇適合其環境的服務級別。
客戶端還可以請求代理在發佈時保留消息。在某些設置中,MQTT代理可能要求客戶端提供用戶名和密碼身份驗證才能進行連接。此外,出於隱私考慮,可以使用SSL / TLS對TCP連接進行加密。
有幾種可用的MQTT代理實現和客戶端庫可供使用-例如, HiveMQ , Mosquitto和Paho MQTT 。在本教程的示例中,我們將使用Mosquitto。 Mosquitto是Eclipse Foundation的一部分,我們可以輕鬆地將其安裝在Raspberry Pi或Arduino之類的板上。
3.2。Apache NiFi
Apache NiFi最初是由NSA開發為NiagaraFiles。它促進了系統之間數據流的自動化和管理,並且基於基於流的編程模型,該模型將應用程序定義為黑匣子流程網絡。
讓我們首先了解一些基本概念。在NiFi中通過系統移動的對象稱為FlowFile。 FlowFile處理器實際上執行有用的工作,例如FlowFiles的路由,轉換和中介。 FlowFile處理器與Connections連接。
進程組是一種將組件分組在一起以在NiFi中組織數據流的機制。進程組可以通過輸入端口接收數據,並可以通過輸出端口發送數據。遠程進程組(RPG)提供了一種向NiFi遠程實例發送數據或從其接收數據的機制。
現在,有了這些知識,讓我們看一下NiFi架構:
NiFi是基於Java的程序,可在JVM中運行多個組件。 Web服務器是承載命令和控制API的組件。 Flow Controller是NiFi的核心組件,它管理擴展何時接收執行資源的時間表。擴展允許NiFi進行擴展,並支持與不同系統的集成。
NiFi會在FlowFile信息庫中跟踪FlowFile的狀態。 FlowFile的實際內容字節位於內容存儲庫中。最後,與FlowFile相關的出處事件數據位於出處庫中。
由於從源頭收集數據可能需要較小的佔地面積和較低的資源消耗,因此NiFi擁有一個名為MiNiFi的子項目。 MiNiFi為NiFi提供了一種補充性的數據收集方法,並通過站點到站點(S2S)協議輕鬆與NiFi集成:
此外,它還可以通過MiNiFi命令和控制(C2)協議對代理進行集中管理。此外,它通過生成完整的監管信息鏈來幫助建立數據出處。
3.3。 InfluxDB
InfluxDB是一個用Go編寫的時間序列數據庫,由InfluxData開發。它設計用於快速和高可用性的時間序列數據存儲和檢索。這特別適合處理應用程序指標,IoT傳感器數據和實時分析。
首先,InfluxDB中的數據是按時間序列組織的。時間序列可以包含零個或多個點。點代表具有四個組成部分的單個數據記錄-測量,標籤集,字段集和時間戳:
首先,時間戳顯示與特定點關聯的UTC日期和時間。字段集由一個或多個字段關鍵字和字段值對組成。他們使用點標記捕獲實際數據。同樣,標籤集由標籤鍵和標籤值對組成,但它們是可選的。它們基本上充當點的元數據,並且可以為更快的查詢響應建立索引。
該度量充當標籤集,字段集和時間戳的容器。此外,InfluxDB中的每個點都可以具有與其關聯的保留策略。保留策略描述了InfluxDB將保留數據多長時間,以及它將通過複製創建多少個副本。
最後,數據庫充當用戶,保留策略,連續查詢和時間序列數據的邏輯容器。我們可以理解InfluxDB中的數據庫與傳統的關係數據庫大致相似。
此外,InfluxDB是InfluxData平台的一部分,該平台提供了其他幾種產品來有效地處理時間序列數據。 InfluxData現在提供它作為開源平台InfluxDB OSS 2.0和商業產品InfluxDB Cloud:
除了InfluxDB之外,該平台還包括Chronograf ,它為InfluxData平台提供了完整的接口。此外,它還包括Telegraf ,它是用於收集和報告指標和事件的代理。最後,還有一個實時流數據處理引擎Kapacitor。
4.物聯網數據管道的動手實踐
現在,我們已經覆蓋了足夠的基礎知識來一起使用這些產品來為我們的物聯網應用程序創建數據管道。我們假定本教程從多個城市的多個觀測站收集與空氣質量相關的測量值。例如,測量包括地面臭氧,一氧化碳,二氧化硫,二氧化氮和氣溶膠。
4.1。設置基礎架構
首先,我們假設城市中的每個氣象站都配備了所有傳感設備。此外,這些傳感器還連接到Raspberry Pi之類的板上,以收集模擬數據並將其數字化。該評估板已連接至無線設備,以將原始測量結果發送到上游:
區域控制站從城市中的所有氣象站收集數據。我們可以將這些數據匯總並提供給一些本地分析引擎,以更快地獲得洞察。來自所有區域控制中心的已過濾數據被發送到中央命令中心,該中心主要託管在雲中。
4.2。創建物聯網架構
現在,我們準備為簡單的空氣質量應用程序設計IoT架構。我們將在此處使用MQTT代理,MiNiFi Java代理,NiFi和InfluxDB:
如我們所見,我們正在氣象站站點上使用Mosquitto MQTT代理和MiNiFi Java代理。在區域控制中心,我們正在使用NiFi服務器來聚合和路由數據。最後,我們使用InfluxDB在命令中心級別存儲度量。
4.3。執行安裝
在Raspberry Pi之類的板上安裝Mosquitto MQTT代理和MiNiFi Java代理非常容易。但是,對於本教程,我們將它們安裝在本地計算機上。
Eclipse Mosquito的官方下載頁面提供了多個平台的二進製文件。安裝後,從安裝目錄啟動Mosquitto非常簡單:
net start mosquitto
此外, NiFi二進製文件也可以從其官方網站下載。我們必須將下載的存檔提取到合適的目錄中。由於MiNiFi將使用站點間協議連接到NiFi,因此我們必須在<NIFI_HOME> /conf/nifi.properties中指定站點間輸入套接字端口:
# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.port=1026
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
然後,我們可以啟動NiFi:
<NIFI_HOME>/bin/run-nifi.bat
同樣,可以從官方網站下載Java或C ++ MiNiFi代理和工具包二進製文件。同樣,我們必須將存檔提取到合適的目錄中。
默認情況下,MiNiFi配備的處理器數量很少。由於我們將使用MQTT中的數據,因此必須將MQTT處理器複製到<MINIFI_HOME> / lib目錄中。這些文件打包為NiFi存檔(NAR)文件,可以位於<NIFI_HOME> / lib目錄中:
COPY <NIFI_HOME>/lib/nifi-mqtt-nar-xxxnar <MINIFI_HOME>/lib/nifi-mqtt-nar-xxxnar
然後,我們可以啟動MiNiFi代理:
<MINIFI_HOME>/bin/run-minifi.bat
最後,我們可以從InfluxDB的官方站點下載開源版本。和以前一樣,我們可以提取存檔並使用簡單的命令啟動InfluxDB:
<INFLUXDB_HOME>/influxd.exe
我們應該保留所有其他配置(包括端口)作為本教程的默認配置。至此,我們本地計算機上的安裝和設置結束了。
4.4。定義NiFi數據流
現在,我們準備定義數據流。 NiFi提供了一個易於使用的界面來創建和監視數據流。可通過URL http:// localhost:8080 / nifi訪問。
首先,我們將定義將在NiFi服務器上運行的主要數據流:
如我們所見,在這裡,我們定義了一個輸入端口,它將接收來自MiNiFi代理的數據。它還通過連接發送數據到負責將數據存儲在InfluxDB中PutInfluxDB
處理器。在此處理器的配置中,我們定義了InfluxDB的連接URL和要在其中發送數據的數據庫名稱。
4.5。定義MiNiFi數據流
接下來,我們將定義將在MiNiFi代理上運行的數據流。我們將使用NiFi相同的用戶界面,並將數據流導出為模板,以便在MiNiFi代理中進行配置。讓我們為MiNiFi代理定義數據流:
在這裡,我們定義了ConsumeMQTT
處理器,該處理器負責從MQTT代理獲取數據。我們在屬性中提供了代理URI以及主題過濾器。 air-quality
等級下定義的所有主題中提取數據。
我們還定義了一個遠程進程組,並將其連接到ConcumeMQTT處理器。遠程進程組負責通過站點到站點協議將數據推送到NiFi。
我們可以將此數據流另存為模板,然後將其下載為XML文件。讓我們將此文件命名為config.xml
。現在,我們可以使用轉換器工具包將此模板從XML轉換為YAML,MiNiFi代理使用該模板:
<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml
這將為我們提供config.yml
文件,我們必須在其中手動添加NiFi服務器的主機和端口:
Input Ports:
- id: 19442f9d-aead-3569-b94c-1ad397e8291c
name: From MiNiFi
comment: ''
max concurrent tasks: 1
use compression: false
Properties: # Deviates from spec and will later be removed when this is autonegotiated
Port: 1026
Host Name: localhost
現在,我們可以將該文件放置在目錄<MINIFI_HOME> / conf中,替換那裡可能已經存在的文件。此後,我們將不得不重新啟動MiNiFi代理。
在這裡,我們正在做很多手動工作來創建數據流並在MiNiFi代理中進行配置。對於在遠程位置可能存在數百個代理的現實生活場景,這是不切實際的。但是,正如我們前面所看到的,我們可以使用MiNiFi C2服務器來實現此自動化。但這不在本教程的範圍之內。
4.6。測試數據管道
最後,我們準備測試數據管道!由於我們沒有使用真實傳感器的自由,因此我們將創建一個小型仿真。我們將使用一個小的Java程序生成傳感器數據:
class Sensor implements Callable<Boolean> {
String city;
String station;
String pollutant;
String topic;
Sensor(String city, String station, String pollutant, String topic) {
this.city = city;
this.station = station;
this.pollutant = pollutant;
this.topic = topic;
}
@Override
public Boolean call() throws Exception {
MqttClient publisher = new MqttClient(
"tcp://localhost:1883", UUID.randomUUID().toString());
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);
IntStream.range(0, 10).forEach(i -> {
String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",
pollutant,
city,
station,
ThreadLocalRandom.current().nextDouble(0, 100));
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(0);
message.setRetained(true);
try {
publisher.publish(topic, message);
Thread.sleep(1000);
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
});
return true;
}
}
在這裡,我們使用Eclipse Paho Java客戶端來生成到MQTT代理的消息。我們可以根據需要添加任意數量的傳感器來創建仿真:
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Boolean>> sensors = Arrays.asList(
new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"),
new Simulation.Sensor("london", "central", "co", "air-quality/co"),
new Simulation.Sensor("london", "central", "so2", "air-quality/so2"),
new Simulation.Sensor("london", "central", "no2", "air-quality/no2"),
new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));
List<Future<Boolean>> futures = executorService.invokeAll(sensors);
如果一切正常,我們將能夠在InfluxDB數據庫中查詢數據:
例如,我們可以在數據庫“ airquality”中看到屬於測量“臭氧”的所有點。
5.結論
綜上所述,我們在本教程中介紹了一個基本的IoT用例。我們還了解瞭如何使用MQTT,NiFi和InfluxDB之類的工具來構建可擴展的數據管道。當然,這並不涵蓋物聯網應用程序的全部範圍,並且擴展數據分析管道的可能性是無限的。
此外,我們在本教程中選擇的示例僅用於演示目的。物聯網應用程序的實際基礎架構和架構可能千差萬別。此外,我們可以通過將可操作的見解作為命令向後推來完成反饋週期。