简易智能识别服务4–使用MQ+线程池实现异步消息监听

2018年11月1日

使用activeMQ做消息异步传输,  消息从Agent端(Python)发送, springboot 端接受异步消息

需求:

  •   需要支持创建多个Consumer  同时消费 多个Queue的场景.

本来考虑直接把consumer做成单例模式, 但对上面需求支持会比较困难;

  •   有效的资源利用: 不能太频繁的做线程创建和 销毁操作,影响系统性能

使用全局线程池就比 局部线程池 更节约资源

综上因素, 最后决定 使用全局的线程池, 把线程池做成单例, consumer可以是多个实例共享全局线程池的的方法实现…

Spring中Controller 默认是单例,通过外部请求生成新的 MQ consumer 实例

 

 

1. 配置 全局线程池

@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
        ThreadPoolTaskExecutor myThreadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        myThreadPoolTaskExecutor.setCorePoolSize(20);
        myThreadPoolTaskExecutor.setMaxPoolSize(100);
        myThreadPoolTaskExecutor.setKeepAliveSeconds(60);
        myThreadPoolTaskExecutor.setQueueCapacity(10000);
        myThreadPoolTaskExecutor.setThreadNamePrefix("TASK_EXECUTOR");
        myThreadPoolTaskExecutor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
        return myThreadPoolTaskExecutor;
    }
}

2. 配置ActiveMQ异步 Consumer

MQ 的地址,Queue信息从配置文件中获取, 也可以考虑从Controller 传输进来, 目前暂时用前者;

使用Controller 接受远端控制接口消息,生成异步consumer实例:

 @GetMapping("/v1/XX/on")
    String XXOn(HttpServletRequest request) throws JMSException {       
        QueueConsumerAsyn consumer =new QueueConsumerAsyn("web server");
        consumer.recive();
        return "ok";
    }

定义异步Consumer类:

public class QueueConsumerAsyn{
    private String name = "";
    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;

    private  String BROKER_URL=null;// "tcp://xx.xx.xx.xx:61616";
    private String TARGET=null;// = "prediction.mq.queue";

    public QueueConsumerAsyn(String name){
        this.name=name;
    }
    public  void init() throws JMSException
    {
        BROKER_URL= PropertyUtil.getAppProperty("activemq.brokerUrl");
        TARGET=PropertyUtil.getAppProperty("activemq.agentQueue");
        ConnectionFactory connectFactory = new ActiveMQConnectionFactory(
            BROKER_URL);
        Connection connection = connectFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//自动确认
        Destination destination =  session.createQueue(TARGET);
        consumer = session.createConsumer(destination);
        connection.start();

    }

    public void recive()
    {
        try {
            init();
            System.out.println("Consumer("+name+"):->Begin listening...");
            // 开始监听
            consumer.setMessageListener(new MsgListener());  //(异步接收)

        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void submit() throws JMSException
    {
        session.commit();
    }
    // 关闭连接
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        if (consumer != null)
            consumer.close();
        if (session != null)
            session.close();
        if (connection != null)
            connection.close();
    }

}

3. 定义MQ 的异步监听器

/**
 * 消息监听(多线程接收)
 *
 */
public class MsgListener implements MessageListener{
    ThreadPoolConfig threadPoolConfig= SpringUtil.getBean(ThreadPoolConfig.class);

    private ThreadPoolTaskExecutor threadPool=threadPoolConfig.threadPoolTaskExecutor();

    @Override
    public void onMessage(final Message msg) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(new Random().nextInt(2)*500);  
                    if (msg != null) {
                       if (msg instanceof TextMessage)
                       {
                          //...
                       }else if (msg  instanceof MapMessage)
                       {
                          //...
                       }else if (msg  instanceof BytesMessage)
                       {
                          //...
                       }else if (msg  instanceof StreamMessage)
                       {
                          //...
                       }else if (msg  instanceof ObjectMessage)
                       {
                          //...
                       }
                   }
                } catch (InterruptedException e) {
                       e.printStackTrace(); 
                }catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

 

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注