SpringBoot整合ActiveMQ
生产者:
引入 maven依赖
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
引入 application.yml配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8080
创建QueueConfig
@Configuration
public class QueueConfig {
@Value(“${queue}”)
private String queue;
@Bean
public Queue logQueue() {
return new ActiveMQQueue(queue);
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDeliveryMode(2);// 进行持久化配置 1表示非持久化,2表示持久化</span>
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列
jmsTemplate.setSessionAcknowledgeMode(4);// 客户端签收模式</span>
return jmsTemplate;
}
// 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
// 设置连接数
factory.setConcurrency("1-10");
// 重连间隔时间
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
创建Producer
@Component
@EnableScheduling
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 5000)
public void send() {
jmsMessagingTemplate.convertAndSend(queue, "测试消息队列" + System.currentTimeMillis());
}
}
启动
@SpringBootApplication
@EnableScheduling
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
消费者:
引入 maven依赖
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
引入 YML配置
application.yml
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8081
创建Consumer
@JmsListener(destination = “${queue}”)
public void receive(TextMessage text, Session session) throws JMSException {
try {
System.out.println(“生产者第” + (++count) + “次向消费者发送消息..”);
// int id = 1 / 0;
String value = text.getText();
System.out.println(“消费者收到消息:” + value);
//手动签收
text.acknowledge();
} catch (Exception e) {
// 如果代码发生异常,需要发布版本才可以解决的问题,不要使用重试机制,采用日志记录方式,定时Job进行补偿。
// 如果不需要发布版本解决的问题,可以采用重试机制进行补偿。
// session.recover();// 继续重试
e.printStackTrace();
}
}
public static void main(String[] args) {
SpringApplication.run(Consumer.class, args);
}
启动
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
使用消息中间注意事项
1.消费者代码不要抛出异常,否则activqmq默认有重试机制。
2.如果代码发生异常,需要发布版本才可以解决的问题,不要使用重试机制,采用日志记录方式,定时Job进行补偿。
3.如果不需要发布版本解决的问题,可以采用重试机制进行补偿。
消费者如果保证消息幂等性,不被重复消费。
产生原因:网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
解决办法:
1.使用全局MessageID 判断消费方使用同一个,解决幂等性。
2.使用JMS可靠消息机制
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 591235401@qq.com
文章标题:SpringBoot整合ActiveMQ
本文作者:阿杜同学
发布时间:2019-05-15, 20:15:35
最后更新:2019-05-15, 20:15:35
原始链接:http://yoursite.com/2019/05/15/SpringBoot%E6%95%B4%E5%90%88ActiveMQ/版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。