AMQP Client Integration Guide
AMQP is mainly used for SaaS system integration: it lets SaaS applications receive device uplink data in real time.
Protocol Version Information
For detailed information about the AMQP protocol standard, please refer to the AMQP Protocol Standard. The IoT platform AMQP subscription only supports the AMQP 1.0 protocol standard.
To create client connections, refer to the RabbitMQ client documentation (Client Documentation - RabbitMQ).
Prerequisites
Before using the AMQP service, you need to create a SaaS application under the SaaS Development -> SaaS Management menu.
Create or use an existing queue in the SaaS Details -> Message Subscription -> Queue List page, and obtain the queue name, which will be needed as a parameter for connection.
Create a product-level subscription in the SaaS Details -> Message Subscription -> Subscription List page, and configure the event types that need to be pushed as needed. (Note: Different data levels can subscribe to different event types.)
If you need to manage message subscription features through the SaaS application, please activate and authorize the message subscription service package in the SaaS Details -> Service page.
Obtain the AccessKey and AccessSecret, which will be needed as parameters for connection.
Connection Authentication Process
- First, the AMQP client and the Developer Center establish a TCP connection through a three-way handshake, then perform a TLS handshake verification.
For security purposes, the receiver must use TLS encryption; unencrypted TCP transmission is not supported.
- The client requests to establish a Connection. Authentication uses the SASL PLAIN mechanism, that is, username and password authentication.
After the IoT platform authenticates the username and password, the Connection is established.
In addition, according to the AMQP protocol, when establishing a connection, the client also needs to include the heartbeat time in the Open frame, which is the idle-time-out parameter of the AMQP protocol. The heartbeat time unit is milliseconds, with a value range of 30,000 to 60,000. If there is no frame communication on the Connection beyond the heartbeat time, the platform will close the connection.
- The client initiates a request to the IoT platform to establish a Receiver Link (a one-way channel through which the IoT platform pushes data to the client).
After successfully establishing a Connection, the client needs to complete the Receiver Link establishment within 15 seconds, otherwise the platform will close the connection.
After establishing the Receiver Link, the client successfully connects to the Developer Center.
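The timing limits in the steps above can be checked on the client side before a connection attempt. The following is a minimal sketch, not part of any platform SDK: the class name `HeartbeatSettings` and its methods are hypothetical, and only the 30,000 to 60,000 ms range and the 15-second Receiver Link deadline come from the text above. The conversion to whole seconds is included because some client libraries (for example, the RabbitMQ Java client's `setRequestedHeartbeat`) take the heartbeat in seconds; whether that setting corresponds to the platform's idle-time-out is an assumption to verify.

```java
import java.time.Duration;

// Hypothetical helper that encodes the timing limits described above.
final class HeartbeatSettings {
    // Allowed idle-time-out range, in milliseconds, as stated in this guide.
    static final long MIN_IDLE_TIMEOUT_MS = 30_000L;
    static final long MAX_IDLE_TIMEOUT_MS = 60_000L;
    // Time allowed to establish the Receiver Link after the Connection opens.
    static final Duration RECEIVER_LINK_DEADLINE = Duration.ofSeconds(15);

    private HeartbeatSettings() {}

    // Returns true when the value lies inside the documented range (bounds treated as inclusive).
    static boolean isValidIdleTimeout(long idleTimeoutMs) {
        return idleTimeoutMs >= MIN_IDLE_TIMEOUT_MS && idleTimeoutMs <= MAX_IDLE_TIMEOUT_MS;
    }

    // Converts a valid idle-time-out to whole seconds, rounding down.
    static int toWholeSeconds(long idleTimeoutMs) {
        if (!isValidIdleTimeout(idleTimeoutMs)) {
            throw new IllegalArgumentException(
                    "idle-time-out must be between 30000 and 60000 ms, got " + idleTimeoutMs);
        }
        return (int) (idleTimeoutMs / 1000L);
    }
}
```

For example, `HeartbeatSettings.toWholeSeconds(45_000L)` yields 45, while a value of 10,000 ms is rejected with an `IllegalArgumentException`.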
Connection Configuration Guide
The connection address and authentication parameters for AMQP client access to the Developer Center are as follows:
Name | Description |
---|---|
connectionUrl | Client connection address. For China region, enter: amqps://iot-amqp.quectelcn.com:5671/quec-open For Europe region, enter: amqps://iot-amqp.acceleronix.io:5671/quec-open |
username | Username used when creating the connection, see Username Generation Guide below for generation method. |
password | Password used when creating the connection, generated from username and AccessSecret, see Password Generation Guide below for generation method. |
queueName | Queue name for connection, can be obtained from the SaaS Details -> Message Subscription -> Queue List page. |
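To see which parts of the connectionUrl matter, it can help to split it with the JDK's `java.net.URI`. This is an illustrative sketch only: the class name `ConnectionUrlParts` is hypothetical, and treating the path segment (`quec-open`) as a virtual host follows the RabbitMQ URI convention, which is an assumption about how the platform interprets it.

```java
import java.net.URI;

// Hypothetical value class holding the components of a connectionUrl.
final class ConnectionUrlParts {
    final String scheme;
    final String host;
    final int port;
    final String virtualHost;

    private ConnectionUrlParts(String scheme, String host, int port, String virtualHost) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
        this.virtualHost = virtualHost;
    }

    // Splits a URL such as amqps://host:5671/quec-open into its components.
    static ConnectionUrlParts parse(String connectionUrl) {
        URI uri = URI.create(connectionUrl);
        String path = uri.getPath();
        // Drop the leading "/" so that "/quec-open" becomes "quec-open".
        String vhost = (path == null || path.length() <= 1) ? "" : path.substring(1);
        return new ConnectionUrlParts(uri.getScheme(), uri.getHost(), uri.getPort(), vhost);
    }

    // The platform only accepts TLS connections, which the "amqps" scheme indicates.
    boolean usesTls() {
        return "amqps".equalsIgnoreCase(scheme);
    }
}
```

Parsing the China region address gives the host `iot-amqp.quectelcn.com`, port `5671`, and path segment `quec-open`, with `usesTls()` returning true.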
Username and Password Generation Methods
Username Generation Parameters
Parameter Name | Parameter Description | Parameter Type | Remarks |
---|---|---|---|
ver | Version Number | String | Fixed value "1" |
auth_mode | Type | String | Fixed value "accessKey" |
sign_method | Signature Method | String | Fixed value "sha256" |
access_key | AccessKey | String | The AccessKey created by the user, can be obtained from the SaaS Details page. |
timestamp | Timestamp | Long | Millisecond precision. The value must be within 10 minutes of standard time. |
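Because the timestamp must stay within 10 minutes of standard time, a client may want to sanity-check its own clock before connecting. The sketch below is one way to express that rule; the class name `TimestampWindow` is hypothetical, the reference time would have to come from a trusted source (for example an NTP-synchronized clock), and treating the 10-minute bound as inclusive is an assumption.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for the 10-minute timestamp tolerance described in the table above.
final class TimestampWindow {
    // Maximum allowed difference between the timestamp and the reference time.
    static final long MAX_SKEW_MS = TimeUnit.MINUTES.toMillis(10);

    private TimestampWindow() {}

    // Returns true when timestampMs is no more than 10 minutes before or after referenceMs.
    static boolean isWithinWindow(long timestampMs, long referenceMs) {
        return Math.abs(referenceMs - timestampMs) <= MAX_SKEW_MS;
    }
}
```

A timestamp that is 5 minutes behind the reference passes this check, while one that is 11 minutes ahead does not.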
Username Generation Guide
The username fields are composed in a key=value format, with fields joined using the "&" character. There is no specific order requirement for the fields.
Example:
username
java
ver=1&auth_mode=accessKey&sign_method=sha256&access_key=${AccessKey}&timestamp=${timestamp}
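The username format can be assembled with a single helper. This is a minimal sketch using only the JDK; the class name `AmqpUsername` is hypothetical, and the AccessKey is inserted as-is on the assumption that it contains no characters that would need escaping.

```java
// Hypothetical helper that builds the username string from the parameters in the table above.
final class AmqpUsername {
    private AmqpUsername() {}

    // Joins the fixed fields, the AccessKey, and a millisecond timestamp with "&".
    static String build(String accessKey, long timestampMs) {
        return String.format(
                "ver=1&auth_mode=accessKey&sign_method=sha256&access_key=%s&timestamp=%d",
                accessKey, timestampMs);
    }

    // Convenience overload that uses the current system time.
    static String build(String accessKey) {
        return build(accessKey, System.currentTimeMillis());
    }
}
```

Calling `AmqpUsername.build("myKey", 1700000000000L)` produces `ver=1&auth_mode=accessKey&sign_method=sha256&access_key=myKey&timestamp=1700000000000`. The timestamp should be regenerated for every new connection attempt so that it stays inside the 10-minute window.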
Password Generation Guide
The password is the SHA-256 hash, written as a lowercase hexadecimal string, of the username ${username} with the credential key ${AccessSecret} appended to it.
Example:
password
java
sha256(${username}${AccessSecret})
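The same calculation can be written with the JDK's `MessageDigest`, without extra dependencies. This is a sketch under stated assumptions: the class name `AmqpPassword` is hypothetical, and the input is hashed as UTF-8 bytes with lowercase hex output, which matches what `DigestUtils.sha256Hex` from commons-codec does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; uses only the JDK, so commons-codec is not required.
final class AmqpPassword {
    private AmqpPassword() {}

    // Returns the SHA-256 digest of the UTF-8 bytes of the input as lowercase hex.
    static String sha256Hex(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every compliant JDK provides SHA-256, so this is not expected to happen.
            throw new IllegalStateException("SHA-256 is unavailable", e);
        }
    }

    // password = sha256(username + AccessSecret), as described above.
    static String generate(String username, String accessSecret) {
        return sha256Hex(username + accessSecret);
    }
}
```

The result is always a 64-character hexadecimal string. Because the username embeds the timestamp, the password has to be recomputed whenever the username is regenerated.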
Java Integration Example
Development Environment
This example uses JDK 1.8 as the development environment.
Add Maven Dependencies
xml
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
Example Code
java
package amqp.rabbit.client.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;
import org.apache.commons.codec.digest.DigestUtils;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Calendar;
import java.util.concurrent.CountDownLatch;

public class RabbitmqClientDemo {
    public static void main(String[] args) {
        String accessKey = "${accessKey}";
        String accessSecret = "${accessSecret}";
        String url = "${connectionUrl}";
        String queueName = "${queueName}";
        // Timestamp used for connection authentication; it must be refreshed to the current time on every reconnect
        long timestamp = System.currentTimeMillis();
        String username = String.format("ver=1&auth_mode=accessKey&sign_method=sha256&access_key=%s&timestamp=%s", accessKey, timestamp);
        String password = DigestUtils.sha256Hex(username + accessSecret).toLowerCase();
        // Credential refresh interval in minutes; must not exceed 10 minutes
        int refreshMinutes = 4;
        try {
            CountDownLatch latch = new CountDownLatch(1);
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(url);
            factory.setUsername(username);
            factory.setPassword(password);
            // Note: the no-argument useSslProtocol() trusts any server certificate;
            // consider supplying a configured SSLContext in production
            factory.useSslProtocol();
            // Reconnect automatically and restore consumers after a network failure
            factory.setAutomaticRecoveryEnabled(true);
            factory.setTopologyRecoveryEnabled(true);
            // Regenerate the username and password periodically so that reconnects use a fresh timestamp
            factory.setCredentialsProvider(new AccessCredentialsProvider(accessKey, accessSecret, Duration.ofMinutes(refreshMinutes)));
            factory.setCredentialsRefreshService(new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder().build());
            try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) {
                // Allow at most one unacknowledged message at a time
                channel.basicQos(1);
                // Callback invoked for every delivered message
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    try {
                        // Handle the message here
                        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                        System.out.println("Received message: '" + message + "', timestamp: " + System.currentTimeMillis());
                    } finally {
                        // Acknowledge manually, because autoAck is false below
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
                };
                // Second argument is autoAck: false means messages are acknowledged manually
                channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
                    System.out.println("The consumer is cancelled");
                    latch.countDown();
                });
                // Keep the connection open until the consumer is cancelled
                latch.await();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Refreshes the access credentials so that they do not expire before a reconnection.
 */
class AccessCredentialsProvider implements CredentialsProvider {
    private final String accessKey;
    private final String accessSecret;
    private final Duration expireDuration;
    private String username;
    private String password;

    public AccessCredentialsProvider(String accessKey, String accessSecret, Duration expireDuration) {
        this.accessKey = accessKey;
        this.accessSecret = accessSecret;
        this.expireDuration = expireDuration;
        refresh();
    }

    // Rebuilds the username with the current timestamp and recomputes the password from it
    @Override
    public void refresh() {
        long timestamp = Calendar.getInstance().getTimeInMillis();
        this.username = String.format("ver=1&auth_mode=accessKey&sign_method=sha256&access_key=%s&timestamp=%s", accessKey, timestamp);
        this.password = DigestUtils.sha256Hex(username + accessSecret).toLowerCase();
    }

    @Override
    public String getUsername() {
        return this.username;
    }

    @Override
    public String getPassword() {
        return this.password;
    }

    // Tells the refresh service how long the current credentials remain valid
    @Override
    public Duration getTimeBeforeExpiration() {
        return this.expireDuration;
    }
}