JMS学习二(简单的ActiveMQ实例)

前一篇文章我们整体上学习了JMS,这篇文章我们来写写小demo实践一下

在写之前我们要下载安装ActiveMQ服务,下载地址当然可以去官网下载,但我下载下来的有linux 和win两个版本但是win的只有32位的,所以这里给一个win32、64的下载地址

下载ActiveMQ5.9

ActiveMQ安装很简单,下载解压后到bin目录就有win32 和win64两个目录按照自己的系统进入后就有activemq.bat来启动ActiveMQ服务

一、点对点消息模型实例

使用queue作为目的之

1、消息发送端

package mqtest1;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
	public static void main(String[] args) {
		int i =0;
		//链接工厂
		ActiveMQConnectionFactory connectionFactory = null;
		//链接对象
		Connection connection = null;
		//会话
		Session session = null;
		//队列(目的地、生产者发送消息的目的地)
		Queue  queue = null;
		//消息生产者
		MessageProducer producer = null;
		connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			//第一个参数是否开启事务 true开启 ,false不开启事务,如果开启记得手动提交
			//参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		    queue  = session.createQueue("test_queue");
			//为队列创建消息生产者
		     producer =  session.createProducer(queue);
			//消息是否为持久性的,这个不设置也是可以的,默认是持久的
			//producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息设置为持久的发送后及时服务关闭了再次开启消息也不会丢失。
			//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送后如果服务关闭再次开启则消息会丢失。
			while (true){
				//创建消息
				TextMessage message = session.createTextMessage();
				message.setText("测试队列消息"+i);
				//发送消息到目的地
				producer.send(message);
				i++;
				if(i>10) {
					break;
				}
			}
			session.commit();
			System.out.println("呵呵消息发送结束");
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally  {
			//释放资源
				//producer.close();
				//session.close();
				//connection.close();
		}
	}
}
2、消息消费端

package mqtest1;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receive {
	public static void main(String[] args) {
		// 链接工厂
		ActiveMQConnectionFactory connectionFactory = null;
		// 链接对象
		Connection connection = null;
		// 会话
		Session session = null;
		// 队列(目的地,消费者消费消息的地方)
		Queue queue = null;
		// 消息消费者
		MessageConsumer consumer = null;
		connectionFactory = new ActiveMQConnectionFactory("admin", "admin",
				"tcp://192.168.1.120:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			// 创建session是的true 和false
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			queue = session.createQueue("test_queue"); // 队列(目的地,消费者消费消息的地方)
			consumer = session.createConsumer(queue); // 消息消费者
			// Message message = consumer.receive(); //同步方式接收
			consumer.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message message) {
					TextMessage textMessage = (TextMessage) message;
					try {
						String value = textMessage.getText();
						System.out.println("value: " + value);
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

点对点模型Destination作为目的地

1、消息发送端

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestMQ {
	public static void main(String[] args) {
		int i =0;
		//链接工厂
		ConnectionFactory connectionFactory = null;
		// 链接对象
		Connection connection = null;
		// 会话对象
		Session session = null;
		// 目的地
		Destination destination = null;
		// 消息生产者
		MessageProducer producer = null;
		connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			//第一个参数是否开启事务 true开启 ,false不开启事务,如果开启记得手动提交
			//参数二,表示的是签收模式,一般使用的有自动签收和客户端自己确认签收
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("test-queue");
			//为目的地创建消息生产者
			producer = session.createProducer(destination);
			//消息是否为持久性的,这个不设置也是可以的,默认是持久的
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			while(true) {
				TestBean tbean = new TestBean();
				tbean.setAge(25);
				tbean.setName("hellojava" +i);
				producer.send(session.createObjectMessage(tbean));
				i++;
				if( i>10) {
					break;
				}
			}
			System.out.println("呵呵消息已发送");
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			try {
				producer.close();
				session.close();
				connection.close();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

2、消息消费端

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AcceptMq {
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	// Connection :JMS 客户端到JMS Provider 的连接  
	Connection connection = null;
	// Session: 一个发送或接收消息的线程  
	Session session = null;
	// Destination :消息的目的地;消息发送给谁.  
	Destination destination = null;
	// 消费者,消息接收者  
	//MessageConsumer consumer = null;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.120:61616");
	try {
		//通过工厂创建链接
		connection = connectionFactory.createConnection();
		//启动链接
		connection.start();
		//创建会话
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//消息目的地
		destination = session.createQueue("test-queue");
		//消息消费者
		MessageConsumer consumer = session.createConsumer(destination);
		//同步方式接受信息,如果还没有获取到则会阻塞直到接收到信息
		/*Message messages = consumer.receive();
		TestBean  value  =(TestBean)((ObjectMessage)messages).getObject();
                 String name = value.getName();*/
		consumer.setMessageListener(new MessageListener(){
			@Override
			public void onMessage(Message message){
				try {
				TestBean  tbean =(TestBean)((ObjectMessage)message).getObject();
				System.out.println("tbean: "+tbean);
				if(null != message) {
					 System.out.println("收到信息1: "+tbean.getName());
				}
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
	} catch (JMSException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	} 
}
}

3、bean 类

package mq;

import java.io.Serializable;

public class TestBean implements Serializable{
private int age;
private String name;
public TestBean() {};
public TestBean(int age, String name) {
	this.age = age;
	this.name = name;
}
public int getAge() {
	return age;
}
public void setAge(int age) {
	this.age = age;
}
public String getName() {
	return name;
}
public void setName(String name) {
	this.name = name;
}
}

二、发布/订阅消息模型实例

1、消息发布端

package mq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PSMQ {
    public static void main(String[] args) throws JMSException {  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
        //创建话题
        Topic topic = session.createTopic("myTopic.messages");  
        //为话题创建消息生产者
        MessageProducer producer = session.createProducer(topic);  
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
        while(true) {  
            TextMessage message = session.createTextMessage();  
            message.setText("message_" + System.currentTimeMillis());  
            producer.send(message);  
            System.out.println("Sent message: " + message.getText());  
        }  
    }  
}

2、消息订阅端

package mq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PSAccept {
	 public static void main(String[] args) throws JMSException {  
	        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616");  
	        Connection connection = factory.createConnection();  
	        connection.start();  
	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
	        //创建话题
	        Topic topic = session.createTopic("myTopic.messages"); 
	        //为话题创建消费者
	        MessageConsumer consumer = session.createConsumer(topic); 
	        consumer.setMessageListener(new MessageListener() { 
				@Override
				public void onMessage(Message message) {
	                TextMessage tm = (TextMessage) message;  
	                try {  
	                    System.out.println("Received message: " + tm.getText());  
	                } catch (JMSException e) {  
	                    e.printStackTrace();  
	                }  
				}  
	        });  
	    } 
}

ok 三个demo很简单,在项目中导入jms相关的jar包,代码复制过去就能跑了,在看看第一篇文章就大体上能明白各个类是干什么用的。

点对点消息模型和发布/订阅消息模型两种方式其实不同的就是使用队列、还是使用话题创建目的地不同其他的都一样。

connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");

其中第一个admin是用户名第二个是密码而第三个参数就是协议+ip+port(端口),这几个参数两个客户端都是一样的不然消费端就获取不到了……

在消息消费者中我们接收消息有两种方式即同步接收和异步接收,同步接受就是使用receive()方法来接受而异步就是设置一个监听对象。


说到密码我们顺便来看看ActiveMQ访问密码的设置


三、ActiveMQ访问密码设置

在ActiveMQ的conf目录的activemq.xml中添加账号密码

 <plugins>  
            <simpleAuthenticationPlugin>  
                <users>  
                    <authenticationUser username="whd" password="123" groups="users,admins"/>  
                </users>  
            </simpleAuthenticationPlugin>  
        </plugins> 
activemq.xml中添加位置:


ok这样我们对这个ActiveMQ设置了一个用户名密码,所以在创建链接的时候要修改admin这个默认的用户名密码为修改后的用户名密码。

connectionFactory = new ActiveMQConnectionFactory("whd", "123","tcp://192.168.0.104:61616");
这样我们就能正常的向服务器发送消息而消费端也能从服务商消费消息了……

差点忘了,还有一个ActiveMQ管理页面地址:http://127.0.0.1:8161/admin/ 访问这个地址登陆管理页面,默认用户名密码都是admin

好了先就小高兴一下吧,之后接着来深入学习……


评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值