SpringBoot整合ActiveMQ

生产者:
引入 maven依赖

org.springframework.boot
spring-boot-starter-parent
1.5.4.RELEASE



<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>



org.springframework.boot
spring-boot-starter



org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-test
test


org.springframework.boot
spring-boot-starter-activemq





org.springframework.boot
spring-boot-maven-plugin


引入 application.yml配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8080

创建QueueConfig
@Configuration
public class QueueConfig {
@Value(“${queue}”)
private String queue;

@Bean
public Queue logQueue() {
    return new ActiveMQQueue(queue);
}

@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setDeliveryMode(2);// 进行持久化配置 1表示非持久化,2表示持久化</span>
    jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
    jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列
    jmsTemplate.setSessionAcknowledgeMode(4);// 客户端签收模式</span>
    return jmsTemplate;
}

// 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
        ActiveMQConnectionFactory activeMQConnectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(activeMQConnectionFactory);
    // 设置连接数
    factory.setConcurrency("1-10");
    // 重连间隔时间
    factory.setRecoveryInterval(1000L);
    factory.setSessionAcknowledgeMode(4);
    return factory;
}

}

创建Producer
@Component
@EnableScheduling
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;

@Scheduled(fixedDelay = 5000)
public void send() {
    jmsMessagingTemplate.convertAndSend(queue, "测试消息队列" + System.currentTimeMillis());
}

}

启动
@SpringBootApplication
@EnableScheduling
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}

消费者:
引入 maven依赖

org.springframework.boot
spring-boot-starter-parent
1.5.4.RELEASE



<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>



org.springframework.boot
spring-boot-starter



org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-test
test


org.springframework.boot
spring-boot-starter-activemq





org.springframework.boot
spring-boot-maven-plugin


引入 YML配置
application.yml
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8081

创建Consumer
@JmsListener(destination = “${queue}”)
public void receive(TextMessage text, Session session) throws JMSException {
try {
System.out.println(“生产者第” + (++count) + “次向消费者发送消息..”);
// int id = 1 / 0;
String value = text.getText();
System.out.println(“消费者收到消息:” + value);
//手动签收
text.acknowledge();
} catch (Exception e) {
// 如果代码发生异常,需要发布版本才可以解决的问题,不要使用重试机制,采用日志记录方式,定时Job进行补偿。
// 如果不需要发布版本解决的问题,可以采用重试机制进行补偿。
// session.recover();// 继续重试
e.printStackTrace();
}
}

public static void main(String[] args) {
    SpringApplication.run(Consumer.class, args);
}

启动
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}

使用消息中间注意事项
1.消费者代码不要抛出异常,否则activqmq默认有重试机制。
2.如果代码发生异常,需要发布版本才可以解决的问题,不要使用重试机制,采用日志记录方式,定时Job进行补偿。
3.如果不需要发布版本解决的问题,可以采用重试机制进行补偿。

消费者如果保证消息幂等性,不被重复消费。
产生原因:网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
解决办法:
1.使用全局MessageID 判断消费方使用同一个,解决幂等性。
2.使用JMS可靠消息机制


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 591235401@qq.com

文章标题:SpringBoot整合ActiveMQ

本文作者:阿杜同学

发布时间:2019-05-15, 20:15:35

最后更新:2019-05-15, 20:15:35

原始链接:http://yoursite.com/2019/05/15/SpringBoot%E6%95%B4%E5%90%88ActiveMQ/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

目录