我是MQ programmimg的新手。根据我的要求,我试图将示例XML消息放入队列中,并期望从响应队列返回响应。 我可以看到关联的频道在短时间内打开,几秒钟后关闭。请在下面找到我用于将消息放入队列的代码。要求解决此问题的宝贵意见。通过java代码关闭MQ通道
错误:
Process(12908.13579) User(abc) Program(amqrmppa)
Host(hostname)
AMQ9208: Error on receive from host 10 (10.0.0.1).
EXPLANATION:
An error occurred receiving data from 10 (10.0.0.1) over TCP/IP. This may
be due to a communications failure.
代码中使用:
package com.company.mq;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class MQConnection {
private static final String CORR_ID = "CORRELID";
String qMgrStr = "";
String hostName = "hostname";
String password ="xxxx";
String userName ="username";
String putqueueName = "putqueuename";
String getqueuename = "getqueuename ";
String channel = "channel";
String replyToQueue = "replyToQueue";
String replyToQueueManager = "";
static String content = "";
int port =10000;
MQQueue readQueue = null;
MQQueue writeQueue = null;
MQQueueManager qManager;
@SuppressWarnings("unchecked")
public void init(){
MQEnvironment.hostname =hostName;
MQEnvironment.channel = channel;
MQEnvironment.port = port;
MQEnvironment.userID = userName;
MQEnvironment.password = password;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
try {
qManager = new MQQueueManager("");
System.out.println("qManager====>"+qManager);
}catch(Exception e){
e.printStackTrace();
}
try {
System.out.println("qManager==> hhh"+qManager);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public String putAndGetMessage() throws InterruptedException, IOException{
int openOptions = MQC.MQOO_OUTPUT | MQC.MQPMO_SET_ALL_CONTEXT | MQC.MQOO_FAIL_IF_QUIESCING;
String msgString = content.toString();
System.out.println("msgString=="+msgString);
int expiryTime =60000;
MQMessage getmessage = null;
int waitInterval =4000;
try {
System.out.println("qManager Desc==>"+qManager.getDescription());
writeQueue =openWriteQueue(qManager,putqueueName);
MQMessage message = myPut(writeQueue,msgString,expiryTime,getqueuename);
// qManager.accessQueue(putqueueName, openOptions,null,null,null);
readQueue =openReadQueue(qManager,getqueuename);
getmessage =mqGet(readQueue,waitInterval,message.messageId);
/*MQMessage msg = new MQMessage();
msg.messageType = MQC.MQMT_REQUEST;
msg.format = "MQSTR";
// msg.characterSet = 500;
msg.persistence = MQC.MQPER_NOT_PERSISTENT;
msg.correlationId = CORR_ID.getBytes();
// msg.messageId = CORR_ID.getBytes();
msg.expiry= 10000;*/
/*System.out.println("before");
Thread.sleep(10000);
System.out.println("after");*/
/*MQGetMessageOptions gmo = new MQGetMessageOptions();
int openOptions1 = MQC.MQGMO_WAIT| MQC.MQGMO_CONVERT| MQC.MQGMO_FAIL_IF_QUIESCING;
System.out.println("in getqManager==>"+qManager);
readQueue = qManager.accessQueue(getqueuename, openOptions1);
System.out.println("deafaultQueue======>"+readQueue);
readQueue.get(getmessage,gmo);
System.out.println(getmessage.readInt());
String retriveMsg = getmessage.readUTF();
System.out.println("read===>"+retriveMsg);*/
} catch(MQException e){
e.printStackTrace();
}
return getmessage.readString(getmessage.getMessageLength());
}
private MQMessage mqGet(MQQueue readQueue2, int waitInterval,
byte[] corrID) throws MQException {
MQMessage responseMessage = new MQMessage();
responseMessage.correlationId =corrID;
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options =MQC.MQGMO_WAIT| MQC.MQGMO_CONVERT| MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = waitInterval;
// TODO Auto-generated method stub
readQueue2.get(responseMessage,gmo);
return responseMessage;
}
private MQQueue openReadQueue(MQQueueManager manager, String getqueuename2) throws MQException {
// TODO Auto-generated method stub
return openQueue(manager,getqueuename2,MQC.MQOO_INPUT_SHARED | MQC.MQOO_INQUIRE |MQC.MQOO_FAIL_IF_QUIESCING);
}
private MQMessage myPut(MQQueue writeQueue2, String msgString,
int expiryTime, String getqueuename2) {
// TODO Auto-generated method stub
MQPutMessageOptions mpo =new MQPutMessageOptions();
mpo.options = MQC.MQPMO_NEW_MSG_ID | MQC.MQMO_MATCH_CORREL_ID;
MQMessage putmessage = new MQMessage();
putmessage.format = MQC.MQFMT_STRING;
putmessage.messageFlags = MQC.MQMT_REQUEST;
putmessage.replyToQueueName =replyToQueue;
putmessage.replyToQueueManagerName = qMgrStr;
putmessage.userId="userId";
putmessage.expiry =expiryTime;
try {
putmessage.write(msgString.getBytes());
try {
writeQueue2.put(putmessage,mpo);
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return putmessage;
}
private MQQueue openWriteQueue(MQQueueManager manager, String queueName) throws MQException{
// TODO Auto-generated method stub
return openQueue(manager,queueName,MQC.MQOO_OUTPUT | MQC.MQPMO_SET_ALL_CONTEXT | MQC.MQOO_FAIL_IF_QUIESCING);
}
private MQQueue openQueue(MQQueueManager manager, String queueName, int options) throws MQException{
// TODO Auto-generated method stub
return manager.accessQueue(queueName, options,null,null,null);
}
/**
* @param args
* @throws IOException
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException, InterruptedException {
// TODO Auto-generated method stub
MQConnection conn = new MQConnection();
DataInputStream dis = new DataInputStream (new FileInputStream ("c://request//Request.xml"));
byte[] datainBytes = new byte[dis.available()];
dis.readFully(datainBytes);
dis.close();
content = new String(datainBytes, 0, datainBytes.length);
//System.out.println("content===>"+content);
conn.init();
System.out.println("connected");
conn.putAndGetMessage();
}
}
您没有提及您使用的是哪个版本。你是否从应用程序中收到错误?我希望抛出某种异常...... –
是否有指定为错误的一部分的MQ原因码?我猜你的问题中的错误消息来自队列管理器错误日志。如果是这样,这是完整的错误信息,因为它似乎比平常短很多? –