使用activeMQ做消息异步传输, 消息从Agent端(Python)发送, springboot 端接受异步消息
需求:
- 需要支持创建多个Consumer 同时消费 多个Queue的场景.
本来考虑直接把consumer做成单例模式, 但对上面需求支持会比较困难;
- 有效的资源利用: 不能太频繁的做线程创建和 销毁操作,影响系统性能
使用全局线程池就比 局部线程池 更节约资源
综上因素, 最后决定 使用全局的线程池, 把线程池做成单例, consumer可以是多个实例共享全局线程池的的方法实现…
Spring中Controller 默认是单例,通过外部请求生成新的 MQ consumer 实例

1. 配置 全局线程池
@Configuration public class ThreadPoolConfig { @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ ThreadPoolTaskExecutor myThreadPoolTaskExecutor = new ThreadPoolTaskExecutor(); myThreadPoolTaskExecutor.setCorePoolSize(20); myThreadPoolTaskExecutor.setMaxPoolSize(100); myThreadPoolTaskExecutor.setKeepAliveSeconds(60); myThreadPoolTaskExecutor.setQueueCapacity(10000); myThreadPoolTaskExecutor.setThreadNamePrefix("TASK_EXECUTOR"); myThreadPoolTaskExecutor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); return myThreadPoolTaskExecutor; } }
2. 配置ActiveMQ异步 Consumer
MQ 的地址,Queue信息从配置文件中获取, 也可以考虑从Controller 传输进来, 目前暂时用前者;
使用Controller 接受远端控制接口消息,生成异步consumer实例:
@GetMapping("/v1/XX/on")
String XXOn(HttpServletRequest request) throws JMSException {
QueueConsumerAsyn consumer =new QueueConsumerAsyn("web server");
consumer.recive();
return "ok";
}
定义异步Consumer类:
public class QueueConsumerAsyn{
private String name = "";
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private String BROKER_URL=null;// "tcp://xx.xx.xx.xx:61616";
private String TARGET=null;// = "prediction.mq.queue";
public QueueConsumerAsyn(String name){
this.name=name;
}
public void init() throws JMSException
{
BROKER_URL= PropertyUtil.getAppProperty("activemq.brokerUrl");
TARGET=PropertyUtil.getAppProperty("activemq.agentQueue");
ConnectionFactory connectFactory = new ActiveMQConnectionFactory(
BROKER_URL);
Connection connection = connectFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//自动确认
Destination destination = session.createQueue(TARGET);
consumer = session.createConsumer(destination);
connection.start();
}
public void recive()
{
try {
init();
System.out.println("Consumer("+name+"):->Begin listening...");
// 开始监听
consumer.setMessageListener(new MsgListener()); //(异步接收)
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void submit() throws JMSException
{
session.commit();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
3. 定义MQ 的异步监听器
/** * 消息监听(多线程接收) * */ public class MsgListener implements MessageListener{ ThreadPoolConfig threadPoolConfig= SpringUtil.getBean(ThreadPoolConfig.class); private ThreadPoolTaskExecutor threadPool=threadPoolConfig.threadPoolTaskExecutor(); @Override public void onMessage(final Message msg) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(2)*500); if (msg != null) { if (msg instanceof TextMessage) { //... }else if (msg instanceof MapMessage) { //... }else if (msg instanceof BytesMessage) { //... }else if (msg instanceof StreamMessage) { //... }else if (msg instanceof ObjectMessage) { //... } } } catch (InterruptedException e) { e.printStackTrace(); }catch (JMSException e) { e.printStackTrace(); } } }); } }
没有评论