0 前言

最近在學習 MQTT,發現 MQTT還是挺好用的,於是花了點時間做了一個簡單的應用示例,希望能給需要做這方面的人一些參考。
相關背景知識:http://www.embed-net.com/thread-224-1-1.html
具體功能為:
1,STM32F405 為主控芯片,它通過傳感器采集環境數據,比如溫度,濕度,光照度,大氣壓強等;
2,主控芯片通過 W5500 模塊將測量的數據通過 MQTT 協議方式發布到 MQTT 服務器(服務器域名和IP見固件程序);
3,主控訂閱 LED 燈控制的消息,當接收到對應的控制指令後點亮或者熄滅對應的 LED 燈;
4,安卓手機端訂閱傳感器數據的消息,當接收到消息後將傳感器數據在界面顯示;
5,安卓手機可發送點亮或者熄滅 LED 燈的指令到服務器,然後服務器會將該指令轉發給 STM32 主控,然後 STM32 主控解析該指令並執行指令。

1 單片機端實現

MQTT 協議是基於 TCP 的協議,所以我們只需要在單片機端實現TCP客戶端代碼之後就很容易移植 MQTT 了,STM32F4+W5500 實現 TCP 客戶端的代碼我們以前已經實現過,代碼下載地址為:

embed-net.com/thread-87-1-1.html

當然,如果你想在代碼裡面直接使用服務器域名方式進行連接,我們還得在 TCP客戶端代碼裡面集成 DNS 的代碼,當然在上面這個連接裡面也有相關的代碼。

MQTT 代碼源碼下載地址:

eclipse.org/paho

在 STM32 這邊我們使用的是 C/C++ MQTT Embedded clients代碼。

硬件連接如下圖所示:

image

1.1 MQTT的移植

MQTT 的移植非常簡單,將C/C++ MQTT Embedded clients 的代碼添加到工程中,然後我們只需要再次封裝4個函數即可:

int transport_sendPacketBuffer(unsigned char* buf, int buflen);
int transport_getdata(unsigned char* buf, int count);
int transport_open(void);
int transport_close(void);

transport_sendPacketBuffer:通過網絡以TCP的方式發送數據;
transport_getdata:TCP方式從服務器端讀取數據,該函數目前屬於阻塞函數;
transport_open:打開一個網絡接口,其實就是和服務器建立一個TCP連接;
transport_close:關閉網絡接口。
如果已經移植好了socket方式的TCP客戶端的程序,那麼這幾個函數的封裝也是非常簡單的,程序代碼如下所示:

下面我們看下主函數的代碼,思路也比較清晰:

/**
 *@brief  通過TCP方式發送數據到TCP服務器
 *@param  buf 數據首地址
 *@param  buflen 數據長度
 *@retval 小於0表示發送失敗
 */
int transport_sendPacketBuffer(unsigned char *buf, int buflen)
{
	return send(SOCK_TCPS, buf, buflen);
}
/**
 *@brief  阻塞方式接收TCP服務器發送的數據
 *@param  buf 數據存儲首地址
 *@param  count 數據緩沖區長度
 *@retval 小於0表示接收數據失敗
 */
int transport_getdata(unsigned char *buf, int count)
{
	return recv(SOCK_TCPS, buf, count);
}

/**
 *@brief  打開一個socket並連接到服務器
 *@param  無
 *@retval 小於0表示打開失敗
 */
int transport_open(void)
{
	int32_t ret;
	//新建一個Socket並綁定本地端口5000
	ret = socket(SOCK_TCPS, Sn_MR_TCP, 5000, 0× 00);
	if (ret != SOCK_TCPS)
	{
		printf(「 % d: Socket Errorrn」, SOCK_TCPS);
		while (1);
	}
	else
	{
		printf(「 % d: Openedrn」, SOCK_TCPS);
	}

	//連接TCP服務器
	ret = connect(SOCK_TCPS, domain_ip, 1883);	//端口必須為1883
	if (ret != SOCK_OK)
	{
		printf(「 % d: Socket Connect Errorrn」, SOCK_TCPS);
		while (1);
	}
	else
	{
		printf(「 % d: Connectedrn」, SOCK_TCPS);
	}
	return 0;
}
/**
 *@brief  關閉socket
 *@param  無
 *@retval 小於0表示關閉失敗
 */
int transport_close(void)
{
	close(SOCK_TCPS);
	return 0;
}

完成了這幾個函數,然後我們就可以根據官方提供的示例代碼實現我們自己的代碼了,比如我們向代理服務器發送一個消息的代碼如下所示:

/**
 *@brief  向代理(服務器)發送一個消息
 *@param  pTopic 消息主題
 *@param  pMessage 消息內容
 *@retval 小於0表示發送失敗
 */
int mqtt_publish(char *pTopic, char *pMessage)
{
	int32_t len, rc;
	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
	unsigned char buf[200];
	MQTTString topicString = MQTTString_initializer;
	int msglen = strlen(pMessage);
	int buflen = sizeof(buf);

	data.clientID.cstring = 「me」;
	data.keepAliveInterval = 5;
	data.cleansession = 1;
	len = MQTTSerialize_connect(buf, buflen, &data); /*1 */

	topicString.cstring = pTopic;
	len += MQTTSerialize_publish(buf + len, buflen– len, 0, 0, 0, 0, topicString, (unsigned char *) pMessage, msglen); /*2 */

	len += MQTTSerialize_disconnect(buf + len, buflen– len); /*3 */
	transport_open();
	rc = transport_sendPacketBuffer(buf, len);
	transport_close();
	if (rc == len)
		printf(「Successfully publishednr」);
	else
		printf(「Publish failednr」);
	return 0;
}

下面我們看下主函數的代碼,思路也比較清晰:

int main(void)
{
	static char meassage[200];
	int rc;
	char *led;
	char led_value;
	float temperature, humidity, light, pressure;
	srand(0);
	//配置LED燈引腳
	LED_Config();
	//初始化配置網絡
	network_init();
	while (1)
	{
		memset(meassage, 0, sizeof(meassage));
		//訂閱消息
		rc = mqtt_subscrib(「pyboard_led」, meassage);
		printf(「rc = % dnr」, rc);
		if (rc >= 0)
		{
			printf(「meassage = % snr」, meassage);
			//解析JSON格式字符串並點亮相應的LED燈
			cJSON *root = cJSON_Parse(meassage);
			if (root != NULL)
			{
				led = cJSON_GetObjectItem(root, 」led」)->valuestring;
				printf(「led = % snr」, led);
				led_value = cJSON_GetObjectItem(root, 」value」)->valueint;
				if (!strcmp(led, 」red」))
				{
					if (led_value)
					{
						LED_On(LED_RED);
					}
					else
					{
						LED_Off(LED_RED);
					}
				}
				else if (!strcmp(led, 」green」))
				{
					if (led_value)
					{
						LED_On(LED_GREEN);
					}
					else
					{
						LED_Off(LED_GREEN);
					}
				}
				else if (!strcmp(led, 」blue」))
				{
					if (led_value)
					{
						LED_On(LED_BLUE);
					}
					else
					{
						LED_Off(LED_BLUE);
					}
				}
				else if (!strcmp(led, 」yellow」))
				{
					if (led_value)
					{
						LED_On(LED_YELLOW);
						printf(「Yellow Onnr」);
					}
					else
					{
						LED_Off(LED_YELLOW);
						printf(「Yellow Offnr」);
					}
				}
				// 釋放內存空間
				cJSON_Delete(root);
			}
			else
			{
				printf(「Error before:[ % s] nr」, cJSON_GetErrorPtr());
			}
		}
		delay_ms(500);
		//獲取傳感器測量數據,該示例使用隨機數
		temperature = rand() % 50;
		humidity = rand() % 100;
		light = rand() % 1000;
		pressure = rand() % 1000;
		//將數據合成為JSON格式數據
		sprintf(meassage, 」
		{」
			temperature」: % .1 f, 」humidity」: % .1 f, 」light」: % .1 f, 」pressure」: % .1 f
		}」, temperature, humidity, light, pressure);
		//將數據發送出去
		mqtt_publish(「pyboard_value」, meassage);
	}
}

完整工程代碼可在後面的附件下載。

2 手機端代碼實現

手機端我們也使用官方提供的Java庫Java client and utilities,下載地址:

http://www.eclipse.org/paho/

將jar文件添加到工程中即可,程序界面如下所示:

image

上面 4 個條目分別顯示 STM32 單片機通過 W5500 發送到服務器端的傳感器測量數據;
下面 4 個圖片分別控制板子上的 4 個 LED 燈;
消息發送我們采用線程的方式發送,接收采用回調函數方式接收消息。

2.1 實現消息發送

發送消息的代碼如下所示:

/**
 *send message
 */
class PublishThread extends Thread
{
	String topic;
	MqttMessage message;
	int qos = 0;
	MemoryPersistence persistence = new MemoryPersistence();
	PublishThread(String topic, String message)
	{
		this.topic = topic;
		this.message = new MqttMessage(message.getBytes());
	}
	public void sendMessage(String topic, String message)
	{
		this.topic = topic;
		this.message = new MqttMessage(message.getBytes());
		run();
	}
	@Override
	public void run()
	{
		try
		{
			MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
			MqttConnectOptions connOpts = new MqttConnectOptions();
			connOpts.setCleanSession(true);
			connOpts.setKeepAliveInterval(1);
			System.out.println(「Connecting to broker: 」+broker);
			sampleClient.connect(connOpts);
			System.out.println(「Connected」);
			System.out.println(「Publishing message: 」+message.toString());
			message.setQos(qos);
			sampleClient.publish(topic, message);
			System.out.println(「Message published」);
			sampleClient.disconnect();
			System.out.println(「Disconnected」);
		}
		catch (MqttException me)
		{
			System.out.println(「reason「 + me.getReasonCode());
			System.out.println(「msg「 + me.getMessage());
			System.out.println(「loc「 + me.getLocalizedMessage());
			System.out.println(「cause「 + me.getCause());
			System.out.println(「excep「 + me);
			me.printStackTrace();
		}
	}
}

2.2 實現消息接收
接收消息的代碼如下所示:

/**
 *receive message
 */
class SubscribeThread extends Thread
{
	final String topic;
	MemoryPersistence persistence = new MemoryPersistence();
	SubscribeThread(String topic)
	{
		this.topic = topic;
	}
	@Override
	public void run()
	{
		try
		{
			final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
			final MqttConnectOptions connOpts = new MqttConnectOptions();
			connOpts.setCleanSession(true);
			System.out.println(「Connecting to broker: 」+broker);
			connOpts.setKeepAliveInterval(5);
			sampleClient.setCallback(new MqttCallback()
			{ 	@Override
				public void connectionLost(Throwable throwable)
				{
					System.out.println(「connectionLost」);
					try
					{
						sampleClient.connect(connOpts);
						sampleClient.subscribe(topic);
					}
					catch (MqttException e)
					{
						e.printStackTrace();
					}
				}

				@Override
				public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception
				{
					System.out.println(「messageArrived: 」+mqttMessage.toString());
					System.out.println(topic);
					System.out.println(mqttMessage.toString());
					try
					{
						JSONTokener jsonParser = new JSONTokener(mqttMessage.toString());
						JSONObject person = (JSONObject) jsonParser.nextValue();
						temperature = person.getDouble(「temperature」);
						humidity = person.getDouble(「humidity」);
						light = person.getDouble(「light」);
						pressure = person.getDouble(「pressure」);
						System.out.println(「temperature = 」+temperature);
						System.out.println(「humidity = 」+humidity);
						runOnUiThread(new Runnable()
						{ 				@Override
							public void run()
							{
								temperatureTextView.setText(String.format(「 % .1 f」, temperature));
								humidityTextView.setText(String.format(「 % .1 f」, humidity));
								lightTextView.setText(String.format(「 % .1 f」, light));
								pressureTextView.setText(String.format(「 % .1 f」, pressure));
							} });
					}
					catch (JSONException ex)
					{
						ex.printStackTrace();
					}
				}

				@Override
				public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)
				{
					System.out.println(「deliveryComplete」);
				} });
			sampleClient.connect(connOpts);
			sampleClient.subscribe(topic);
		}
		catch (MqttException me)
		{
			System.out.println(「reason「 + me.getReasonCode());
			System.out.println(「msg「 + me.getMessage());
			System.out.println(「loc「 + me.getLocalizedMessage());
			System.out.println(「cause「 + me.getCause());
			System.out.println(「excep「 + me);
			me.printStackTrace();
		}
	}
}

3 實測效果

1,單片機端定時更新傳感器數據,手機端也會同步更新;

2,手機端點擊 4 個 LED 控制的按鈕,板子上也會點亮或者熄滅對應的 LED;

4 源碼下載

4.1 STM32端源碼下載
MQTT_STM32_W5500.rar
4.2 手機端源碼下載
MQTT_Android.rar
4.3 手機端apk下載
stm32_w5500_mqtt_app.rar