Skip to content

AMQP Client Integration Guide

AMQP is mainly used for SaaS system integration, enabling SaaS applications to receive device uplink data in real-time.

Protocol Version Information

For detailed information about the AMQP protocol standard, please refer to the AMQP Protocol Standard. The IoT platform AMQP subscription only supports the AMQP 1.0 protocol standard.

For creating client connections, you can refer to the documentation Client Documentation - RabbitMQ.

Prerequisites

  1. Before using the AMQP service, you need to create a SaaS application under the SaaS Development -> SaaS Management menu.

  2. Create or use an existing queue in the SaaS Details -> Message Subscription -> Queue List page, and obtain the queue name, which will be needed as a parameter for connection.

  3. Create a product-level subscription in the SaaS Details -> Message Subscription -> Subscription List page, and configure the event types that need to be pushed as needed. (Note: Different data levels can subscribe to different event types.)

  4. If you need to manage message subscription features through the SaaS application, please activate and authorize the message subscription service package in the SaaS Details -> Service page.

  5. Obtain the AccessKey and AccessSecret, which will be needed as parameters for connection.

Connection Authentication Process

  1. First, the AMQP client and the Developer Center establish a TCP connection through a three-way handshake, then perform a TLS handshake verification.

For security purposes, the receiver must use TLS encryption; unencrypted TCP transmission is not supported.

  1. The client requests to establish a Connection. The connection authentication method is PLAIN-SASL, which can be understood as username and password authentication.

After the IoT platform authenticates the username and password, the Connection is established.

In addition, according to the AMQP protocol, when establishing a connection, the client also needs to include the heartbeat time in the Open frame, which is the idle-time-out parameter of the AMQP protocol. The heartbeat time unit is milliseconds, with a value range of 30,000 to 60,000. If there is no frame communication on the Connection beyond the heartbeat time, the platform will close the connection.

  1. The client initiates a request to the IoT platform to establish a Receiver Link (a one-way channel through which the IoT platform pushes data to the client).

After successfully establishing a Connection, the client needs to complete the Receiver Link establishment within 15 seconds, otherwise the platform will close the connection.

After establishing the Receiver Link, the client successfully connects to the Developer Center.

Connection Configuration Guide

The connection address and authentication parameters for AMQP client access to the Developer Center are as follows:

NameDescription
connectionUrlClient connection address.
For China region, enter: amqps://iot-amqp.quectelcn.com:5671/quec-open
For Europe region, enter: amqps://iot-amqp.acceleronix.io:5671/quec-open
usernameUsername used when creating the connection, see Username Generation Guide below for generation method.
passwordPassword used when creating the connection, generated from username and AccessSecret, see Password Generation Guide below for generation method.
queueNameQueue name for connection, can be obtained from the SaaS Details -> Message Subscription -> Queue List page.

Username and Password Generation Methods

Username Generation Parameters

Parameter NameParameter DescriptionParameter TypeParameter Description
verVersion NumberStringFixed value "1"
auth_modeTypeStringFixed value "accessKey"
sign_methodSignature MethodStringFixed value "sha256"
access_keyAccessKeyStringThe AccessKey created by the user, can be obtained from the SaaS Details page.
timestampTimestampLongPrecision in milliseconds, note that the time difference with standard time must be within 10 minutes

Username Generation Guide

The username fields are composed in a key=value format, with fields joined using the "&" character. There is no specific order requirement for the fields.

Example:

username

java
ver=1&auth_mode=accessKey&sign_method=sha256&access_key=${AccessKey}&timestamp=${timestamp}

Password Generation Guide

The password is generated by appending the credential key ${AccessSecret} to the username ${username}, then using the sha256 hash algorithm.

Example:

password

java
sha256(${username}${AccessSecret})

Java Integration Example

Development Environment

This example uses JDK 1.8 as the development environment.

Add Maven Dependencies

java
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.15</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

Example Code

java
package amqp.rabbit.client.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;
import org.apache.commons.codec.digest.DigestUtils;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Calendar;
import java.util.concurrent.CountDownLatch;


public class RabbitmqClientDemo {

    public static void main(String[] args) {
        String accessKey = "${accessKey}";
        String accessSecret = "${accessSecret}";
        String url = "${connectionUrl}", queueName = "${queueName}";
        // Timestamp used for connection authentication, needs to be refreshed to current time when reconnecting
        long timestamp = System.currentTimeMillis();
        String username = String.format("ver=1&auth_mode=accessKey&sign_method=sha256&access_key=%s&timestamp=%s", accessKey, timestamp);
        String password = DigestUtils.sha256Hex(username + accessSecret).toLowerCase();
        // AccessCredentials refresh cycle, cannot be greater than 10 minutes
        int RefreshTime = 4;

        try {
            CountDownLatch latch = new CountDownLatch(1);
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(url);
            factory.setUsername(username);
            factory.setPassword(password);
            factory.useSslProtocol();
            factory.setAutomaticRecoveryEnabled(true);
            factory.setTopologyRecoveryEnabled(true);
            factory.setCredentialsProvider(new AccessCredentialsProvider(accessKey, accessSecret, Duration.ofMinutes(RefreshTime)));
            factory.setCredentialsRefreshService(new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder().build());

            try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) {
                channel.basicQos(1); // DeliverCallback
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    try {
                        String message = new String(delivery.getBody(), StandardCharsets.UTF_8); // handle
                        System.out.println("Received message: '" + message + "', timestamp: " + System.currentTimeMillis());
                    } finally {
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
                }; // acutoAck true/false

                channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {
                    System.out.println("The consumer is cancelled");
                    latch.countDown();
                });
                latch.await();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

/**
 * Used to refresh AccessCredentials to prevent expiration during reconnection
 */
class AccessCredentialsProvider implements CredentialsProvider {

    private final String accessKey;
    private final String accessSecret;
    private final Duration expireDuration;

    private String username;
    private String password;

    public AccessCredentialsProvider(String accessKey, String accessSecret, Duration expireDuration) {
        this.accessKey = accessKey;
        this.accessSecret = accessSecret;
        this.expireDuration = expireDuration;
        refresh();
    }

    @Override
    public void refresh() {
        long timestamp= Calendar.getInstance().getTimeInMillis();
        this.username = String.format("ver=1&auth_mode=accessKey&sign_method=sha256&access_key=%s&timestamp=%s", accessKey, timestamp);
        this.password = DigestUtils.sha256Hex(username + accessSecret).toLowerCase();
    }

    @Override
    public String getUsername() {
        return this.username;
    }

    @Override
    public String getPassword() {
        return this.password;
    }

    @Override
    public Duration getTimeBeforeExpiration() {
        return this.expireDuration;
    }
}