首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >用cloudhopper发送异步PDU响应

用cloudhopper发送异步PDU响应
EN

Stack Overflow用户
提问于 2014-07-11 16:24:50
回答 1查看 3.9K关注 0票数 2

我有一个使用CloudHopper5.0.6库来保存SMPP连接(3.4版本)和发送或接收PDU的项目。我需要修改默认的PDUResopnse,以便通过扩展DefaultSmppSessionHandler来组织定制的PDU处理:

代码语言:javascript
运行
复制
 public class SmppSessionHandlerController extends DefaultSmppSessionHandler {

    @Override
    public PduResponse firePduRequestReceived(PduRequest pduRequest) {
        PduRequestHandler pduReqHandler = pduRequestHandler;
        PduResponse resultPduResponse = pduRequest.createResponse();
        return processDefaultPduResponse(resultPduResponse);
    }

    private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
        //do some transformations here on pduResponse...
        return pduResponse;
    }
 }

它只适用于以下目的:

  1. 更改结果命令状态或某些pdu字段/ tlv参数
  2. 不要为当前的PDU请求发送任何响应。为此,firePduRequestReceived方法必须返回

现在我需要添加延迟的PDU响应发送,这里的问题开始了。我的第一次尝试是这样的:

代码语言:javascript
运行
复制
    @Override
    public PduResponse firePduRequestReceived(PduRequest pduRequest) {
        PduRequestHandler pduReqHandler = pduRequestHandler;
        PduResponse resultPduResponse = pduRequest.createResponse();
        return processDefaultPduResponse(resultPduResponse);
    }

    private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
        try {
                Thread.sleep(responseDelay);
            } catch (InterruptedException e) {
                throw new RuntimeException("Response delay interrupted", e);
            }
        return pduResponse;
    }

添加了当前线程的睡眠以延迟发送响应,因此调用线程被保留了responseDelay毫秒。如果没有更多的请求同时出现在这个会话中,这是很好的。在同一个会话中添加一些submit_sm负载会导致错误:

代码语言:javascript
运行
复制
com.cloudhopper.smpp.type.SmppTimeoutException: Unable to get response within [10000 ms]
    at com.cloudhopper.smpp.impl.DefaultSmppSession.sendRequestAndGetResponse(DefaultSmppSession.java:471) ~[ch-smpp-5.0.6.jar:5.0.6]
    at com.cloudhopper.smpp.impl.DefaultSmppSession.enquireLink(DefaultSmppSession.java:439) ~[ch-smpp-5.0.6.jar:5.0.6] 

在coudhopper源中搜索之后,我发现了这个问题,它是DefaultSmppSession类中任何操作的执行窗口锁:

代码语言:javascript
运行
复制
 future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutMillis, configuration.getRequestExpiryTimeout(), synchronous);

问题发生在com.cloudhopper.commons.util.windowing.Window类中,该类使用排他锁执行任何操作,因此不可能在一个线程中返回PRUResponse并从另一个线程发出请求之前等待。

接下来,尝试返回null作为请求处理(在不发送任何响应的情况下删除请求),并使用方法手动发送PDUResponse。这种方法工作了一段时间,但最后总是出现以下异常:

代码语言:javascript
运行
复制
com.cloudhopper.smpp.type.SmppChannelException: null
    at com.cloudhopper.smpp.impl.DefaultSmppSession.sendResponsePdu(DefaultSmppSession.java:581) ~[ch-smpp-5.0.6.jar:5.0.6]
    at com.svzn.autotest.smppclient.impl.cloudhopper.SmppSendingManager.sendPduResponse(SmppSendingManager.java:84) ~[smpp-client-1.0.1.jar:na]
    at com.svzn.autotest.smppclient.impl.cloudhopper.util.SendPduCommand.sendPduResponse(SendPduCommand.java:80) [smpp-client-1.0.1.jar:na]
    at com.svzn.autotest.smppclient.impl.cloudhopper.SmppClientImpl.sendPduResponse(SmppClientImpl.java:91) [smpp-client-1.0.1.jar:na]
    at com.svzn.autotest.example.testng_aggr.lib.smpp.event.BaseEventProcessor$1.run(BaseEventProcessor.java:62) [test-classes/:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_37]
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_37]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) [na:1.6.0_37]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
    at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
Caused by: org.jboss.netty.handler.timeout.WriteTimeoutException: null
    at org.jboss.netty.handler.timeout.WriteTimeoutHandler.<clinit>(WriteTimeoutHandler.java:79) ~[netty-3.9.0.Final.jar:na]
    at com.cloudhopper.smpp.impl.DefaultSmppClient.createSession(DefaultSmppClient.java:259) ~[ch-smpp-5.0.6.jar:5.0.6]
    at com.cloudhopper.smpp.impl.DefaultSmppClient.doOpen(DefaultSmppClient.java:226) ~[ch-smpp-5.0.6.jar:5.0.6]
    at com.cloudhopper.smpp.impl.DefaultSmppClient.bind(DefaultSmppClient.java:193) ~[ch-smpp-5.0.6.jar:5.0.6]
    at com.svzn.autotest.smppclient.impl.cloudhopper.tasks.RebindTask.run(RebindTask.java:37) ~[smpp-client-1.0.1.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) [na:1.6.0_37]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) [na:1.6.0_37]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) [na:1.6.0_37]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoo

lExecutor.java:204) [na:1.6.0_37

]
    ... 3 common frames omitted

不知道如何以另一种方式修复此错误或在同一会话中发送异步PDUResponse。你对此有什么想法吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-07-14 11:10:03

最后,我发现了这个问题。问题发生在不正确的同步块中,它阻止并行异步事件处理(发送pdu响应)和处理请求和响应,而不以通常的方式处理。

在一个线程中调用com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse)方法并通过从另一个线程扩展DefaultSmppSessionHandler来调用请求和响应是完全可以的。所有的事情都会在同一次会议中处理。

更新:这里是我用来处理pdu请求的实现:

代码语言:javascript
运行
复制
public class SmppSessionHandlerController extends DefaultSmppSessionHandler {
    private static final Logger log = LoggerFactory.getLogger(SmppSessionHandlerController.class);

    private volatile PduHandler pduHandler;
private PduResponseHandler pduResponseHandler;
private PduRequestHandler pduRequestHandler;

public SmppSessionHandlerController() {
    super(log);
}

public PduHandler getPduHandler() {
    return pduHandler;
}

public void setPduHandler(PduHandler pduHandler) {
    this.pduHandler = pduHandler;
}

public PduResponseHandler getPduResponseHandler() {
    return pduResponseHandler;
}

public void setPduResponseHandler(PduResponseHandler pduResponseHandler) {
    this.pduResponseHandler = pduResponseHandler;
}

public PduRequestHandler getPduRequestHandler() {
    return pduRequestHandler;
}

public void setPduRequestHandler(PduRequestHandler pduRequestHandler) {
    this.pduRequestHandler = pduRequestHandler;
}

@Override
public void fireExpectedPduResponseReceived(PduAsyncResponse pduAsyncResponse) {
    log.trace("Handling response PDU: {}", pduAsyncResponse);
    pduAsyncResponse.getResponse().setReferenceObject(pduAsyncResponse.getRequest().getReferenceObject());
    processPduResponse(pduAsyncResponse.getResponse());
}


@Override
public void fireUnexpectedPduResponseReceived(PduResponse pduResponse) {
    log.warn("Handling unexpected response PDU: {}", pduResponse);
    processPduResponse(pduResponse);
}

@Override
public boolean firePduReceived(Pdu pdu) {
    PduHandler currPduHandler = pduHandler;
    if (currPduHandler != null) {
        SmppPdu smppPdu = PduToApiConverter.convertToApiObject(pdu);
        currPduHandler.handlePduReceived(smppPdu);
    }
    // default handling is to accept pdu for processing up chain
    return true;
}

public void firePduRequestExpired(PduRequest pduRequest) {
    super.firePduRequestExpired(pduRequest);
}

private void processPduResponse(PduResponse pduResponse) {
    HandlersContextHelper referenceObj = (HandlersContextHelper) pduResponse.getReferenceObject();
    if (referenceObj != null) {
        referenceObj.getSequenceIdHolder().addReceivedSequenceId(pduResponse.getSequenceNumber());
    }
    PduResponseHandler pduRespHandler = pduResponseHandler;
    if (pduRespHandler != null) {
        SmppPduResponse smppPduResponse = PduToApiConverter.convertToApiResponse(pduResponse);
        if (smppPduResponse != null) {
            pduRespHandler.handlePduResponse(smppPduResponse);
        }
    }
    if (referenceObj != null) {
        referenceObj.getSequenceIdHolder().checkSentAndReceivedClosed();
    }
}

@Override
public PduResponse firePduRequestReceived(PduRequest pduRequest) {
    PduRequestHandler pduReqHandler = pduRequestHandler;
    PduResponse resultPduResponse = pduRequest.createResponse();
    if (pduReqHandler == null) {
        return resultPduResponse;
    }
    PduResponse defaultPduResponse = pduRequest.createResponse();
    SmppPduRequest smppPduRequest = PduToApiConverter.convertToApiRequest(pduRequest);
    SmppPduResponse defaultSmppPduResponse = PduToApiConverter.convertToApiResponse(defaultPduResponse);
    if (smppPduRequest == null || defaultSmppPduResponse == null) {
        return resultPduResponse;
    }
    SmppPduResponse resultSmppPduResponse = pduReqHandler.handlePduRequest(smppPduRequest, defaultSmppPduResponse);
    if (resultSmppPduResponse == null) {
        return null;
    }
    PduResponse convertedPduResponse = ApiToPduConverter.convertToPduResponse(resultSmppPduResponse);
    if (convertedPduResponse == null) {
        return resultPduResponse;
    }
    if (!resultPduResponse.getClass().isAssignableFrom(convertedPduResponse.getClass())) {
        return resultPduResponse;
    }
    return convertedPduResponse;
}
}

Wich被添加到像这样的小丑client smpp客户端

代码语言:javascript
运行
复制
SmppSession session = smppClient.bind(SmppSessionConfiguration_instance, SmppSessionHandlerController_instance );

我为PduHandler PduRequestHandler和PduResponseHandler定义了自定义接口,它们处理smpp事件的特殊情况,您可以看到SmppSessionHandlerController只是将调用委托给其中之一。

使用方法

代码语言:javascript
运行
复制
public PduResponse firePduRequestReceived(PduRequest pduRequest) 

在SmppSessionHandler中,您可以在同步模式下发送任何您想要的响应。如果要在异步模式下执行此操作,则返回空pduResponse并从当前线程或任何其他线程中使用SmppSession.sendResponsePdu(Pdu)。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/24702356

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档