我需要在Java中动态创建异步消息队列。我的用例是通过多个SMTP服务器发送电子邮件:我需要强制执行到同一SMTP服务器的电子邮件是顺序进程,但可以同时处理发送到不同SMTP服务器的电子邮件。我过去使用过JMS,但据我所见,它只允许创建编译时队列,而我需要在运行时创建队列(每个SMTP服务器一个队列)。在Java中动态创建异步消息队列
我是否错过了关于JMS的一些问题,或者有其他一些我应该看看的工具/建议吗?
我需要在Java中动态创建异步消息队列。我的用例是通过多个SMTP服务器发送电子邮件:我需要强制执行到同一SMTP服务器的电子邮件是顺序进程,但可以同时处理发送到不同SMTP服务器的电子邮件。我过去使用过JMS,但据我所见,它只允许创建编译时队列,而我需要在运行时创建队列(每个SMTP服务器一个队列)。在Java中动态创建异步消息队列
我是否错过了关于JMS的一些问题,或者有其他一些我应该看看的工具/建议吗?
我同意亚当,用例听起来像JMS是开销。 Java内置功能已足够:
package de.mhaller;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import org.junit.Assert;
import org.junit.Test;
public class Mailer {
@Test
public void testMailer() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
ArrayList<Mail> log = new ArrayList<Mail>();
LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>();
// TODO: Put mails to be sent into the incoming queue
incoming.offer(new Mail("[email protected]", "localhost"));
incoming.offer(new Mail("[email protected]", "otherhost"));
incoming.offer(new Mail("[email protected]", "otherhost"));
incoming.offer(new Mail("[email protected]", "localhost"));
Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>();
while (!incoming.isEmpty()) {
Mail mail = incoming.pollFirst();
Mailserver mailserver = findMailserver(mail);
if (!queues.containsKey(mailserver)) {
ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>();
queues.put(mailserver, serverQueue);
executor.execute(new SendMail(mailserver, serverQueue));
}
Queue<Mail> slot = queues.get(mailserver);
slot.offer(mail);
}
assertMailSentWithCorrectServer(log);
}
private void assertMailSentWithCorrectServer(ArrayList<Mail> log) {
for (Mail mail : log) {
if (!mail.server.equals(mail.sentBy.mailserver)) {
Assert.fail("Mail sent by wrong server: " + mail);
}
}
}
private Mailserver findMailserver(Mail mail) {
// TODO: Your lookup logic which server to use
return new Mailserver(mail.server);
}
private static class Mail {
String recipient;
String server;
SendMail sentBy;
public Mail(String recipient, String server) {
this.recipient = recipient;
this.server = server;
}
@Override
public String toString() {
return "mail for " + recipient;
}
}
public static class SendMail implements Runnable {
private final Deque<Mail> queue;
private final Mailserver mailserver;
public SendMail(Mailserver mailserver, Deque<Mail> queue) {
this.mailserver = mailserver;
this.queue = queue;
}
@Override
public void run() {
while (!queue.isEmpty()) {
Mail mail = queue.pollFirst();
// TODO: Use SMTP to send the mail via mailserver
System.out.println(this + " sent " + mail + " via " + mailserver);
mail.sentBy = this;
}
}
}
public static class Mailserver {
String hostname;
public Mailserver(String hostname) {
this.hostname = hostname;
}
@Override
public String toString() {
return hostname;
}
@Override
public int hashCode() {
return hostname.hashCode();
}
@Override
public boolean equals(Object obj) {
return hostname.equals(((Mailserver) obj).hostname);
}
}
}
作为规范的JMS本身在这个问题上相当沉默。大多数实现允许你这样做,而不是通过JMS本身,而是使用他们自己的API。但是你将无法将一些像MDB这样的形式连接到动态队列。相反,你需要管理你自己的连接和听众。
我们最后一次在WebSphere环境中查看这个问题时,动态地创建队列是非常困难/不可能的(我认为临时队列过于短暂)。尽管用于创建队列的API存在,但他们需要以后重新启动服务器才能激活。然后就是MDB问题了。
基于格言的肮脏解决方法如何解决所有问题都可以通过额外的间接级别来解决,该间接级别假定可用打印机的集合相对较小。
创建队列Printer01到Printer99(或更小的数字)。有一个将数据库映射到真实打印机的“数据库”。随着打印机请求的出现,您可以添加到映射表中。你可能会有一些MDB的开销,看着永远不会使用的队列,但除非你的打印机数量庞大,否则你可以负担得起?
为您的每个SMTP服务器,并限制消费者队列(MDB或消息监听器),以1
我做这个和ActiveMQ队列 - 我其实张贴在这个问题的时候,作为我也有类似的担忧(当时的JMS文档声称不支持这一点),并确信它得到了支持。
你有链接到你的问题或描述如何实现这一目的的文档? – Zecrates 2009-11-02 06:34:15
您是专门使用JMS还是可以使用java.util.concurrent和它的ExecutorServices来执行此操作? – 2009-10-30 23:34:30
我没有专门使用JMS,所以我会看看ExecutorServices,谢谢。 – Zecrates 2009-11-02 06:30:23