Thursday, September 30, 2010

CXF Continuation API Enhancement

With Web 2.0, clients increasingly hold long-lived connections to back-end systems, and these connections consume a lot of threads if the server uses a thread-per-request model. Jetty 6 introduced the Continuation API to solve this problem.

A few years ago CXF added the same kind of continuation API, which lets the user release the request thread when the response is not ready yet. This feature is very important if you want your application to perform and scale well.


As part of the housekeeping we have been doing since Camel 2.4.0 to make sure the Camel components make the best use of the Camel asynchronous routing engine, I did some work on camel-cxf recently. I found that the current CXF continuation implements suspend by throwing a runtime exception. The shortcoming of this implementation is that, unlike with Jetty 7, you cannot call another framework's async API after continuation suspend is called, because the exception leaves the calling method immediately. The async call therefore has to be made first, which means that continuation resume can be called before continuation suspend.
To avoid this bad situation, in which the suspended continuation is never resumed, I had to write my Camel code like this:

private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
    synchronized (continuation) {
        if (continuation.isNew()) {
            final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);

            // use the asynchronous API to process the exchange
            boolean sync = getAsyncProcessor().process(camelExchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                    // make sure the continuation resume will not be called before the suspend method in other thread
                    synchronized (continuation) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Resuming continuation of exchangeId: "
                                      + camelExchange.getExchangeId());
                        }
                        // resume processing after both, sync and async callbacks
                        continuation.setObject(camelExchange);
                        continuation.resume();
                    }
                }
            });
            // just need to avoid the continuation.resume is called
            // before the continuation.suspend is called
            if (continuation.getObject() != camelExchange && !sync) {
                // Now we don't set up the timeout value
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Suspending continuation of exchangeId: "
                              + camelExchange.getExchangeId());
                }
                // The continuation could be called before the suspend
                // is called
                continuation.suspend(0);
            } else {
                // just set the response back, as the invoking thread is
                // not changed
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processed the Exchange : " + camelExchange.getExchangeId());
                }
                setResponseBack(cxfExchange, camelExchange);
            }

        }
        if (continuation.isResumed()) {
            org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
                .getObject();
            setResponseBack(cxfExchange, camelExchange);

        }
    }
    return null;
}
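
To make the ordering problem concrete, here is a minimal toy model of an exception-based suspend. It is only a sketch: SuspendSignal, ThrowingContinuation and ThrowingSuspendDemo are hypothetical names made up for this illustration and are not the CXF API. It shows that code placed after suspend never runs, and that starting the work first lets a fast callback resume before the suspend has happened.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT the CXF API.
class SuspendSignal extends RuntimeException {
}

class ThrowingContinuation {
    final List<String> events = new ArrayList<>();
    private boolean suspended;

    // Exception-based suspend: control never returns to the caller.
    void suspend() {
        suspended = true;
        events.add("suspend");
        throw new SuspendSignal();
    }

    // Records whether the resume arrived too early.
    void resume() {
        events.add(suspended ? "resume" : "resume-before-suspend");
    }
}

class ThrowingSuspendDemo {
    // Attempt 1: suspend first, then start the async work.
    static List<String> suspendFirst() {
        ThrowingContinuation c = new ThrowingContinuation();
        try {
            c.suspend();
            c.events.add("async-started"); // never reached: suspend() threw
        } catch (SuspendSignal s) {
            // a framework would catch the signal here and release the thread
        }
        return c.events;
    }

    // Attempt 2: start the async work first, then suspend.
    static List<String> asyncFirst() {
        ThrowingContinuation c = new ThrowingContinuation();
        try {
            Runnable callback = c::resume;
            callback.run(); // stands in for a callback that completes immediately
            c.suspend();
        } catch (SuspendSignal s) {
            // thread released, but the resume has already been spent
        }
        return c.events;
    }

    public static void main(String[] args) {
        System.out.println(suspendFirst());
        System.out.println(asyncFirst());
    }
}
```

In the second attempt the resume is recorded before the suspend, which is the case the synchronized block and the continuation.getObject() check in the Camel code above guard against.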



As you know, CXF has a lot of interceptors that can be chained together as a SOAP stack to do the WS-* work. If we can suspend the interceptor chain, it is easy to resume it from another thread. You can find a handy example in OneWayProcessorInterceptor.
My enhancement to the CXF continuation API introduces a suspend status into the CXF PhaseInterceptorChain. The implementation of the CXF continuation API sets this status on the PhaseInterceptorChain, and the thread that is calling the interceptors breaks out of the chain after the interceptor's handle method returns. Now my Camel code is much easier to understand:

private Object asyncInvoke(Exchange cxfExchange, final Continuation continuation) {
    if (continuation.isNew()) {
        final org.apache.camel.Exchange camelExchange = perpareCamelExchange(cxfExchange);
        // Now we don't set up the timeout value
        if (LOG.isTraceEnabled()) {
            LOG.trace("Suspending continuation of exchangeId: "
                      + camelExchange.getExchangeId());
        }
        // now we could call the suspend here
        continuation.suspend(0);

        // use the asynchronous API to process the exchange
        getAsyncProcessor().process(camelExchange, new AsyncCallback() {
            public void done(boolean doneSync) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Resuming continuation of exchangeId: "
                              + camelExchange.getExchangeId());
                }
                // resume processing after both, sync and async callbacks
                continuation.setObject(camelExchange);
                continuation.resume();
            }
        });

    }
    if (continuation.isResumed()) {
        org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation
            .getObject();
        setResponseBack(cxfExchange, camelExchange);

    }
    return null;
}
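
The idea of a suspend status that the chain checks after each interceptor returns can be sketched with a toy chain. Again this is only an illustration under my own assumptions: ToyInterceptor, ToyChain and ToyChainDemo are hypothetical names, and the real PhaseInterceptorChain is more involved. In this sketch the interceptor that suspended is invoked again on resume, mirroring the isNew/isResumed branches in the Camel code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: hypothetical names, not CXF's PhaseInterceptorChain.
interface ToyInterceptor {
    void handle(ToyChain chain);
}

class ToyChain {
    private final List<ToyInterceptor> interceptors;
    private int next;           // index of the interceptor to run next
    private boolean suspended;  // the "suspend status" flag
    private boolean resumed;    // true while the suspending interceptor is re-run

    ToyChain(List<ToyInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Suspend only records a flag and returns normally, so the caller
    // can keep going and start its async work afterwards.
    synchronized void suspend() {
        suspended = true;
    }

    synchronized boolean isResumed() {
        return resumed;
    }

    // Resume clears the flag and continues the chain on the calling thread.
    synchronized void resume() {
        suspended = false;
        resumed = true;
        doIntercept();
    }

    synchronized void doIntercept() {
        while (next < interceptors.size()) {
            interceptors.get(next).handle(this);
            if (suspended) {
                return; // break out; the same interceptor runs again on resume
            }
            resumed = false;
            next++;
        }
    }
}

class ToyChainDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyInterceptor first = chain -> log.add("first");
        ToyInterceptor invoker = chain -> {
            if (!chain.isResumed()) {
                chain.suspend();
                log.add("invoker suspended"); // code after suspend still runs
            } else {
                log.add("invoker resumed");
            }
        };
        ToyInterceptor last = chain -> log.add("last");

        ToyChain chain = new ToyChain(Arrays.asList(first, invoker, last));
        chain.doIntercept();          // returns once the chain is suspended
        log.add("thread released");
        chain.resume();               // in real use this comes from another thread
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because suspend returns normally, the interceptor is free to start its async work after suspending, and the chain stops only once the handle method has returned.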

The enhanced CXF continuation API patch has been committed to CXF trunk, and it is now part of Fuse Service Framework 2.3.0.

1 comment:

  1. I just found an issue [1] with the CXF JMSContinuation. It doesn't break out of the interceptor chain if the continuation is resumed right after suspend is called in the same thread.

    [1] https://issues.apache.org/jira/browse/CAMEL-6084
