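I have a project that uses the CloudHopper 5.0.6 library to hold SMPP connections (version 3.4) and to send and receive PDUs. I need to modify the default PduResponse, so I set up custom PDU handling by extending DefaultSmppSessionHandler: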
public class SmppSessionHandlerController extends DefaultSmppSessionHandler {

    @Override
    public PduResponse firePduRequestReceived(PduRequest pduRequest) {
        PduRequestHandler pduReqHandler = pduRequestHandler;
        PduResponse resultPduResponse = pduRequest.createResponse();
        return processDefaultPduResponse(resultPduResponse);
    }

    private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
        //do some transformations here on pduResponse...
        return pduResponse;
    }
}
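This works fine for that purpose.
Now I need to add delayed sending of the PDU response, and this is where the problems start. My first attempt was this: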
@Override
public PduResponse firePduRequestReceived(PduRequest pduRequest) {
    PduRequestHandler pduReqHandler = pduRequestHandler;
    PduResponse resultPduResponse = pduRequest.createResponse();
    return processDefaultPduResponse(resultPduResponse);
}

private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
    try {
        Thread.sleep(responseDelay);
    } catch (InterruptedException e) {
        throw new RuntimeException("Response delay interrupted", e);
    }
    return pduResponse;
}
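I added a sleep of the current thread to delay sending the response, so the calling thread is held for responseDelay milliseconds. This works fine as long as no other requests arrive in the same session at the same time. Adding some submit_sm load in the same session leads to this error: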
com.cloudhopper.smpp.type.SmppTimeoutException: Unable to get response within [10000 ms]
at com.cloudhopper.smpp.impl.DefaultSmppSession.sendRequestAndGetResponse(DefaultSmppSession.java:471) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.cloudhopper.smpp.impl.DefaultSmppSession.enquireLink(DefaultSmppSession.java:439) ~[ch-smpp-5.0.6.jar:5.0.6]
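One way to picture why a sleeping handler can end in a timeout like the one above is a simplified model that uses only the JDK. It assumes that all inbound events of a session are delivered on a single thread; the source does not confirm this, and the class and method names below are hypothetical, not CloudHopper API. In this model, the response the requesting side is waiting for sits in the queue behind the sleeping handler:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingHandlerDemo {

    /**
     * Models one session whose inbound events are all delivered on a single thread
     * (an assumption of this sketch). Returns true if the response queued behind
     * a slow request handler is seen before the timeout expires.
     */
    static boolean responseArrivesInTime(long handlerDelayMs, long timeoutMs)
            throws InterruptedException {
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        CountDownLatch responseSeen = new CountDownLatch(1);
        try {
            // Event 1: an inbound request whose handler sleeps before returning.
            deliveryThread.execute(() -> {
                try {
                    Thread.sleep(handlerDelayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Event 2: the response to our own request (for example enquire_link_resp),
            // which cannot be delivered until event 1 has finished.
            deliveryThread.execute(responseSeen::countDown);
            // The requesting side waits with a timeout, like a synchronous send.
            return responseSeen.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            deliveryThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fast handler: " + responseArrivesInTime(0, 1000));
        System.out.println("slow handler: " + responseArrivesInTime(500, 100));
    }
}
```

With a handler delay longer than the wait timeout, the second call reports that the response did not arrive in time, even though nothing is wrong with the connection itself.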
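After digging through the CloudHopper sources I found what I believe is the problem: every operation in the DefaultSmppSession class goes through a lock on the send window: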
future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutMillis, configuration.getRequestExpiryTimeout(), synchronous);
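The problem lies in the com.cloudhopper.commons.util.windowing.Window class, which uses an exclusive lock for every operation, so it is not possible to wait in one thread before returning a PduResponse while another thread sends requests.
Next, I tried returning null from the request handler (which drops the request without sending any response) and sending the PduResponse manually through the session (sendResponsePdu, as the stack trace below shows). This approach worked for a while, but it always ended with the following exception: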
com.cloudhopper.smpp.type.SmppChannelException: null
at com.cloudhopper.smpp.impl.DefaultSmppSession.sendResponsePdu(DefaultSmppSession.java:581) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.svzn.autotest.smppclient.impl.cloudhopper.SmppSendingManager.sendPduResponse(SmppSendingManager.java:84) ~[smpp-client-1.0.1.jar:na]
at com.svzn.autotest.smppclient.impl.cloudhopper.util.SendPduCommand.sendPduResponse(SendPduCommand.java:80) [smpp-client-1.0.1.jar:na]
at com.svzn.autotest.smppclient.impl.cloudhopper.SmppClientImpl.sendPduResponse(SmppClientImpl.java:91) [smpp-client-1.0.1.jar:na]
at com.svzn.autotest.example.testng_aggr.lib.smpp.event.BaseEventProcessor$1.run(BaseEventProcessor.java:62) [test-classes/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_37]
at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) [na:1.6.0_37]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
Caused by: org.jboss.netty.handler.timeout.WriteTimeoutException: null
at org.jboss.netty.handler.timeout.WriteTimeoutHandler.<clinit>(WriteTimeoutHandler.java:79) ~[netty-3.9.0.Final.jar:na]
at com.cloudhopper.smpp.impl.DefaultSmppClient.createSession(DefaultSmppClient.java:259) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.cloudhopper.smpp.impl.DefaultSmppClient.doOpen(DefaultSmppClient.java:226) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.cloudhopper.smpp.impl.DefaultSmppClient.bind(DefaultSmppClient.java:193) ~[ch-smpp-5.0.6.jar:5.0.6]
at com.svzn.autotest.smppclient.impl.cloudhopper.tasks.RebindTask.run(RebindTask.java:37) ~[smpp-client-1.0.1.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) [na:1.6.0_37]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) [na:1.6.0_37]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) [na:1.6.0_37]
... 3 common frames omitted
I don't know how to fix this error in another way, or how to send an asynchronous PduResponse in the same session. Do you have any ideas?
Answered 2014-07-14 11:10:03
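I finally found the issue. The problem was an incorrect synchronized block that prevented asynchronous events (sending PDU responses) from being processed in parallel with the handling of requests and responses, which is not the usual way to handle them.
It is perfectly fine to call com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse) in one thread while requests and responses are handled in another thread by the class that extends DefaultSmppSessionHandler. Everything is processed within the same session.
Update: here is the implementation I use to handle PDU requests: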
public class SmppSessionHandlerController extends DefaultSmppSessionHandler {

    private static final Logger log = LoggerFactory.getLogger(SmppSessionHandlerController.class);

    private volatile PduHandler pduHandler;
    private PduResponseHandler pduResponseHandler;
    private PduRequestHandler pduRequestHandler;

    public SmppSessionHandlerController() {
        super(log);
    }

    public PduHandler getPduHandler() {
        return pduHandler;
    }

    public void setPduHandler(PduHandler pduHandler) {
        this.pduHandler = pduHandler;
    }

    public PduResponseHandler getPduResponseHandler() {
        return pduResponseHandler;
    }

    public void setPduResponseHandler(PduResponseHandler pduResponseHandler) {
        this.pduResponseHandler = pduResponseHandler;
    }

    public PduRequestHandler getPduRequestHandler() {
        return pduRequestHandler;
    }

    public void setPduRequestHandler(PduRequestHandler pduRequestHandler) {
        this.pduRequestHandler = pduRequestHandler;
    }

    @Override
    public void fireExpectedPduResponseReceived(PduAsyncResponse pduAsyncResponse) {
        log.trace("Handling response PDU: {}", pduAsyncResponse);
        pduAsyncResponse.getResponse().setReferenceObject(pduAsyncResponse.getRequest().getReferenceObject());
        processPduResponse(pduAsyncResponse.getResponse());
    }

    @Override
    public void fireUnexpectedPduResponseReceived(PduResponse pduResponse) {
        log.warn("Handling unexpected response PDU: {}", pduResponse);
        processPduResponse(pduResponse);
    }

    @Override
    public boolean firePduReceived(Pdu pdu) {
        PduHandler currPduHandler = pduHandler;
        if (currPduHandler != null) {
            SmppPdu smppPdu = PduToApiConverter.convertToApiObject(pdu);
            currPduHandler.handlePduReceived(smppPdu);
        }
        // default handling is to accept pdu for processing up chain
        return true;
    }

    @Override
    public void firePduRequestExpired(PduRequest pduRequest) {
        super.firePduRequestExpired(pduRequest);
    }

    private void processPduResponse(PduResponse pduResponse) {
        HandlersContextHelper referenceObj = (HandlersContextHelper) pduResponse.getReferenceObject();
        if (referenceObj != null) {
            referenceObj.getSequenceIdHolder().addReceivedSequenceId(pduResponse.getSequenceNumber());
        }
        PduResponseHandler pduRespHandler = pduResponseHandler;
        if (pduRespHandler != null) {
            SmppPduResponse smppPduResponse = PduToApiConverter.convertToApiResponse(pduResponse);
            if (smppPduResponse != null) {
                pduRespHandler.handlePduResponse(smppPduResponse);
            }
        }
        if (referenceObj != null) {
            referenceObj.getSequenceIdHolder().checkSentAndReceivedClosed();
        }
    }

    @Override
    public PduResponse firePduRequestReceived(PduRequest pduRequest) {
        PduRequestHandler pduReqHandler = pduRequestHandler;
        PduResponse resultPduResponse = pduRequest.createResponse();
        if (pduReqHandler == null) {
            return resultPduResponse;
        }
        PduResponse defaultPduResponse = pduRequest.createResponse();
        SmppPduRequest smppPduRequest = PduToApiConverter.convertToApiRequest(pduRequest);
        SmppPduResponse defaultSmppPduResponse = PduToApiConverter.convertToApiResponse(defaultPduResponse);
        if (smppPduRequest == null || defaultSmppPduResponse == null) {
            return resultPduResponse;
        }
        SmppPduResponse resultSmppPduResponse = pduReqHandler.handlePduRequest(smppPduRequest, defaultSmppPduResponse);
        if (resultSmppPduResponse == null) {
            return null;
        }
        PduResponse convertedPduResponse = ApiToPduConverter.convertToPduResponse(resultSmppPduResponse);
        if (convertedPduResponse == null) {
            return resultPduResponse;
        }
        if (!resultPduResponse.getClass().isAssignableFrom(convertedPduResponse.getClass())) {
            return resultPduResponse;
        }
        return convertedPduResponse;
    }
}
It is registered with the CloudHopper SMPP client like this:
SmppSession session = smppClient.bind(SmppSessionConfiguration_instance, SmppSessionHandlerController_instance );
I defined custom interfaces PduHandler, PduRequestHandler and PduResponseHandler that handle the specific cases of SMPP events. As you can see, SmppSessionHandlerController just delegates each call to one of them.
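The answer does not show the custom interfaces themselves, so here is a minimal sketch of what the request side of that delegation might look like. The method signature of PduRequestHandler is inferred from how the controller above calls it. The placeholder types SmppPduRequest and SmppPduResponse, the Controller class and the use of a volatile field are assumptions of this sketch, not the author's code.

```java
public class HandlerDelegationSketch {

    /** Hypothetical placeholder standing in for the author's API request object. */
    static final class SmppPduRequest {
        final String name;
        SmppPduRequest(String name) { this.name = name; }
    }

    /** Hypothetical placeholder standing in for the author's API response object. */
    static final class SmppPduResponse {
        final String name;
        SmppPduResponse(String name) { this.name = name; }
    }

    /** Signature inferred from the call pduReqHandler.handlePduRequest(request, defaultResponse). */
    interface PduRequestHandler {
        /** Returns the response to send, or null to send nothing synchronously. */
        SmppPduResponse handlePduRequest(SmppPduRequest request, SmppPduResponse defaultResponse);
    }

    /** Reduced stand-in for the controller: it only delegates request handling. */
    static final class Controller {
        // volatile so a handler set from another thread is visible here (a choice of this sketch)
        private volatile PduRequestHandler pduRequestHandler;

        void setPduRequestHandler(PduRequestHandler handler) {
            this.pduRequestHandler = handler;
        }

        SmppPduResponse onRequest(SmppPduRequest request) {
            SmppPduResponse defaultResponse = new SmppPduResponse(request.name + "_resp");
            // Read the field once so the null check and the call use the same handler.
            PduRequestHandler handler = pduRequestHandler;
            if (handler == null) {
                return defaultResponse;
            }
            return handler.handlePduRequest(request, defaultResponse);
        }
    }

    public static void main(String[] args) {
        Controller controller = new Controller();
        SmppPduRequest request = new SmppPduRequest("deliver_sm");

        // No handler registered: the default response is returned.
        System.out.println(controller.onRequest(request).name);

        // A handler that returns null suppresses the synchronous response.
        controller.setPduRequestHandler((req, def) -> null);
        System.out.println(controller.onRequest(request));
    }
}
```

Copying the handler field into a local variable before the null check mirrors what the controller above does, so a concurrent setter call cannot swap the handler between the check and the call.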
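Using the method
public PduResponse firePduRequestReceived(PduRequest pduRequest)
of SmppSessionHandler, you can send any response you like in synchronous mode. To respond asynchronously instead, return null from it and call SmppSession.sendResponsePdu(PduResponse) from the current thread or any other thread.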
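The asynchronous variant can be sketched with a scheduler instead of Thread.sleep, so the thread that delivers the request is released immediately. To keep the example runnable without the library, the session is replaced by a hypothetical ResponseSender interface; in a real handler its implementation would presumably call SmppSession.sendResponsePdu. DelayedResponder and all other names below are illustrative assumptions, not CloudHopper API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedResponseSketch {

    /** Hypothetical stand-in for the single session call used here (sendResponsePdu). */
    interface ResponseSender<R> {
        void sendResponse(R response) throws Exception;
    }

    /** Sends responses later on its own thread, so the handler thread never sleeps. */
    static final class DelayedResponder<R> {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private final ResponseSender<R> sender;
        private final long delayMillis;

        DelayedResponder(ResponseSender<R> sender, long delayMillis) {
            this.sender = sender;
            this.delayMillis = delayMillis;
        }

        /** Call from the request handler, then return null so nothing is sent synchronously. */
        ScheduledFuture<?> respondLater(R response) {
            Runnable task = () -> {
                try {
                    sender.sendResponse(response);
                } catch (Exception e) {
                    // Real code would log this and decide whether to retry or drop the response.
                    System.err.println("Delayed response failed: " + e);
                }
            };
            return scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        }

        void shutdown() {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        DelayedResponder<String> responder =
                new DelayedResponder<>(r -> System.out.println("sent " + r), 200);
        // The handler would return null right after this call; get() is only for the demo.
        responder.respondLater("submit_sm_resp").get();
        responder.shutdown();
    }
}
```

A single scheduler thread keeps delayed responses in the order they were scheduled; whether that ordering matters depends on the peer, and a larger pool is an equally valid choice.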
https://stackoverflow.com/questions/24702356