使用activeMQ做消息异步传输, 消息从Agent端(Python)发送, springboot 端接受异步消息
需求:
- 需要支持创建多个Consumer 同时消费 多个Queue的场景.
本来考虑直接把consumer做成单例模式, 但对上面需求支持会比较困难;
- 有效的资源利用: 不能太频繁的做线程创建和 销毁操作,影响系统性能
使用全局线程池就比 局部线程池 更节约资源
综上因素, 最后决定 使用全局的线程池, 把线程池做成单例, consumer可以是多个实例共享全局线程池的的方法实现…
Spring中Controller 默认是单例,通过外部请求生成新的 MQ consumer 实例
1. 配置 全局线程池
@Configuration public class ThreadPoolConfig { @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ ThreadPoolTaskExecutor myThreadPoolTaskExecutor = new ThreadPoolTaskExecutor(); myThreadPoolTaskExecutor.setCorePoolSize(20); myThreadPoolTaskExecutor.setMaxPoolSize(100); myThreadPoolTaskExecutor.setKeepAliveSeconds(60); myThreadPoolTaskExecutor.setQueueCapacity(10000); myThreadPoolTaskExecutor.setThreadNamePrefix("TASK_EXECUTOR"); myThreadPoolTaskExecutor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); return myThreadPoolTaskExecutor; } }
2. 配置ActiveMQ异步 Consumer
MQ 的地址,Queue信息从配置文件中获取, 也可以考虑从Controller 传输进来, 目前暂时用前者;
使用Controller 接受远端控制接口消息,生成异步consumer实例:
@GetMapping("/v1/XX/on")
String XXOn(HttpServletRequest request) throws JMSException {
QueueConsumerAsyn consumer =new QueueConsumerAsyn("web server");
consumer.recive();
return "ok";
}
定义异步Consumer类:
public class QueueConsumerAsyn{ private String name = ""; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; private String BROKER_URL=null;// "tcp://xx.xx.xx.xx:61616"; private String TARGET=null;// = "prediction.mq.queue"; public QueueConsumerAsyn(String name){ this.name=name; } public void init() throws JMSException { BROKER_URL= PropertyUtil.getAppProperty("activemq.brokerUrl"); TARGET=PropertyUtil.getAppProperty("activemq.agentQueue"); ConnectionFactory connectFactory = new ActiveMQConnectionFactory( BROKER_URL); Connection connection = connectFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//自动确认 Destination destination = session.createQueue(TARGET); consumer = session.createConsumer(destination); connection.start(); } public void recive() { try { init(); System.out.println("Consumer("+name+"):->Begin listening..."); // 开始监听 consumer.setMessageListener(new MsgListener()); //(异步接收) } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void submit() throws JMSException { session.commit(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
3. 定义MQ 的异步监听器
/** * 消息监听(多线程接收) * */ public class MsgListener implements MessageListener{ ThreadPoolConfig threadPoolConfig= SpringUtil.getBean(ThreadPoolConfig.class); private ThreadPoolTaskExecutor threadPool=threadPoolConfig.threadPoolTaskExecutor(); @Override public void onMessage(final Message msg) { threadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(2)*500); if (msg != null) { if (msg instanceof TextMessage) { //... }else if (msg instanceof MapMessage) { //... }else if (msg instanceof BytesMessage) { //... }else if (msg instanceof StreamMessage) { //... }else if (msg instanceof ObjectMessage) { //... } } } catch (InterruptedException e) { e.printStackTrace(); }catch (JMSException e) { e.printStackTrace(); } } }); } }
没有评论