activemq推数据给前端的方式

思静语 2024-08-28 15:33:02 阅读 100

文章目录

消费者程序接收消息并通过 WebSocket 将消息传递给前端

消费者程序接收消息并通过 WebSocket 将消息传递给前端

ActiveMQ 是一个开源的消息代理服务,可以用来实现各种消息传递模式,包括点对点和发布/订阅模型。要将数据从 ActiveMQ 推送到前端,通常可以通过以下步骤实现:

配置 ActiveMQ 服务器:首先确保 ActiveMQ 服务器已经正确配置和运行。生产者发送消息:生产者程序将数据发送到 ActiveMQ 队列或主题中。消费者接收消息:消费者程序从 ActiveMQ 队列或主题中接收消息,并将其传递给前端。前端接收消息:前端通过 WebSocket 或者其他方式接收从消费者程序传递过来的消息。

下面是一个具体的实现方案:

步骤 1:配置 ActiveMQ 服务器

确保 ActiveMQ 服务器已经正确配置和运行。可以下载 ActiveMQ 的二进制文件并启动服务。

./bin/activemq start

步骤 2:生产者发送消息

使用 Java 作为示例,生产者代码如下:

<code>import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {

public static void main(String[] args) throws JMSException {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("TEST.QUEUE");

MessageProducer producer = session.createProducer(destination);

TextMessage message = session.createTextMessage("Hello, World!");

producer.send(message);

session.close();

connection.close();

}

}

步骤 3:消费者接收消息

消费者程序接收消息并通过 WebSocket 将消息传递给前端。

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

import java.io.IOException;

import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.*;

import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/websocket")

public class Consumer {

private static CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<>();

@OnOpen

public void onOpen(Session session) {

sessions.add(session);

}

@OnClose

public void onClose(Session session) {

sessions.remove(session);

}

public static void main(String[] args) throws JMSException {

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination = session.createQueue("TEST.QUEUE");

MessageConsumer consumer = session.createConsumer(destination);

consumer.setMessageListener(new MessageListener() {

@Override

public void onMessage(Message message) {

if (message instanceof TextMessage) {

try {

String text = ((TextMessage) message).getText();

for (Session s : sessions) {

s.getBasicRemote().sendText(text);

}

} catch (JMSException | IOException e) {

e.printStackTrace();

}

}

}

});

}

}

步骤 4:前端接收消息

前端代码(例如,HTML + JavaScript):

ActiveMQ WebSocket Demo

Messages

<script>

var socket = new WebSocket("ws://localhost:8080/websocket");

socket.onmessage = function(event) {

var messages = document.getElementById('messages');

var message = document.createElement('li');

message.textContent = event.data;

messages.appendChild(message);

};

</script>

通过以上步骤,数据可以从 ActiveMQ 推送到前端。生产者将消息发送到 ActiveMQ 队列,消费者从队列中接收消息并通过 WebSocket 将消息传递给前端,前端通过 WebSocket 接收并显示消息。 ## 使用 MQTT 协议将数据推送到前端 ActiveMQ 也支持 MQTT 协议,可以使用 MQTT 协议将数据推送到前端。以下是一个使用 ActiveMQ 和 MQTT 将数据推送到前端的示例。 步骤 1:配置 ActiveMQ 以支持 MQTT 首先,确保 ActiveMQ 服务器已经正确配置和运行。ActiveMQ 默认支持 MQTT,但需要确保相关的 MQTT 连接器已启用。在 conf/activemq.xml 文件中,确认以下配置已存在: 启动 ActiveMQ 服务器: ./bin/activemq start 步骤 2:生产者发送消息(使用 MQTT 协议) 生产者代码可以使用任意支持 MQTT 的库,例如 Eclipse Paho MQTT 客户端库。以下是一个使用 Java 的示例:

import org.eclipse.paho.client.mqttv3.MqttClient;

import org.eclipse.paho.client.mqttv3.MqttException;

import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttProducer {

public static void main(String[] args) {

String broker = "tcp://localhost:1883";

String topic = "test/topic";

String content = "Hello, MQTT!";

int qos = 2;

String clientId = "JavaProducer";

try {

MqttClient client = new MqttClient(broker, clientId);

client.connect();

MqttMessage message = new MqttMessage(content.getBytes());

message.setQos(qos);

client.publish(topic, message);

client.disconnect();

} catch (MqttException e) {

e.printStackTrace();

}

}

}

步骤 3:前端接收消息(使用 MQTT 协议)

前端可以使用 MQTT.js 库,通过 WebSocket 连接到 ActiveMQ 服务器并接收消息。需要确保 ActiveMQ 服务器支持 MQTT over WebSocket。可以在 conf/jetty.xml 文件中添加以下配置:

<bean id="mqttWS" class="org.eclipse.jetty.server.nio.SelectChannelConnector">code>

<property name="host" value="0.0.0.0"/>code>

<property name="port" value="1884"/>code>

<property name="acceptors" value="2"/>code>

<property name="maxIdleTime" value="30000"/>code>

<property name="lowResourcesMaxIdleTime" value="1500"/>code>

<property name="lowResourcesConnections" value="50"/>code>

<property name="statsOn" value="false"/>code>

<property name="confidential" value="false"/>code>

<property name="lowResourcesMaxIdleTime" value="30000"/>code>

<property name="acceptQueueSize" value="100"/>code>

<property name="maxBuffers" value="2048"/>code>

<property name="requestHeaderSize" value="8192"/>code>

<property name="responseHeaderSize" value="8192"/>code>

</bean>

在前端 HTML 中使用 MQTT.js 库:

<!DOCTYPE html>

<html>

<head>

<title>ActiveMQ MQTT WebSocket Demo</title>

<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js"></script>code>

</head>

<body>

<h1>Messages</h1>

<ul id="messages"></ul>code>

<script>

var client = new Paho.MQTT.Client("localhost", 1884, "webClient");

client.onMessageArrived = function(message) {

var messages = document.getElementById('messages');

var msg = document.createElement('li');

msg.textContent = message.payloadString;

messages.appendChild(msg);

};

client.connect({onSuccess: function() {

client.subscribe("test/topic");

}});

</script>

</body>

</html>

通过以上步骤,数据可以通过 MQTT 协议从 ActiveMQ 推送到前端。生产者将消息发布到 MQTT 主题,前端通过 MQTT over WebSocket 连接到 ActiveMQ 并订阅相应的主题,以接收并显示消息。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。