Calling an async service that returns DeferredResult multiple times without increasing execution time

My application should have two core endpoints, push and pull, for sending and fetching data.

The pull operation should work asynchronously and return a DeferredResult. When a user calls the pull service over REST, a new DeferredResult is created and stored in Map<Long, DeferredResult> results = new ConcurrentHashMap<>(), where it waits for new data or until the timeout expires.

The push operation is also called by the user over REST. It checks the results map for the recipient of the pushed data. If the map contains a result for the recipient, the data is set on that DeferredResult, which is then returned to the waiting pull request.

Here is the base code:

@Service
public class FooServiceImpl {
    Map<Long, DeferredResult<String>> results = new ConcurrentHashMap<>();

    @Transactional
    @Override
    public DeferredResult<String> pull(Long userId) {
        // Database call goes here: String data = fooRepository.getNewData();
        // I check whether there is new data in the database. If there is, just return it;
        // if not, add the deferred result to the collection to wait for it.
        DeferredResult<String> newResult = new DeferredResult<>(5000L);
        results.putIfAbsent(userId, newResult);
        newResult.onCompletion(() -> results.remove(userId));

        // if (data != null)
        //      newResult.setResult(data);

        return newResult;
    }

    @Transactional
    @Override
    public void push(String data, Long recipientId) {
        // fooRepository.save(data, recipientId);
        if (results.containsKey(recipientId)) {
            results.get(recipientId).setResult(data);
        }
    }
}

The code works as I expected; the problem is that it should also work for multiple users. I expect at most about 1000 active users calling the pull operation. Every call to pull should therefore take at most 5 seconds, as I set in the DeferredResult, but it doesn't.

As you can see in the image, if I call the pull endpoint from my JavaScript client several times in quick succession, the tasks are executed sequentially instead of simultaneously. The tasks I fired last take about 25 seconds, but when 1000 users execute the pull operation at the same time, I need it to take at most 5 seconds plus latency.

How can I configure my app to execute these tasks simultaneously and ensure each task takes about 5 seconds or less (when another user sends something to the waiting user)? I tried adding this configuration to the properties file:

server.tomcat.max-threads=1000 

and also this configuration:

@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {

    @Override
    protected AsyncTaskExecutor getTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1000);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

But it didn't help; I still get the same result. Can you help me configure it, please? Thanks in advance.

EDIT:

This is how I am calling this service from Angular:

this.http.get<any>(this.url, {params})
  .subscribe((data) => {
    console.log('s', data);
  }, (error) => {
    console.log('e', error);
  });

When I tried calling it multiple times with plain JS code like this:

function httpGet() {
    var xmlHttp = new XMLHttpRequest();
    xmlHttp.open("GET", 'http://localhost:8080/api/pull?id=1', true);
    xmlHttp.send(null);
    return xmlHttp.responseText;
}
setInterval(httpGet, 500);

each request completes much faster (in about 7 seconds). I expected the increase to be caused by the database call in the service, but it is still better than 25 seconds. Is something wrong with how I call this service from Angular?

EDIT 2:

I tried another form of testing and used JMeter instead of the browser. I executed 100 requests in 100 threads, and here is the result:

As you can see, the requests are processed in batches of 10, and after about 50 requests the application throws an exception:

java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
    at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause

I also commented out the code where I use repositories to make sure the database is not involved, and got the same result. I also set a unique userId for each request using the AtomicLong class.

EDIT 3:

I found out that when I also comment out @Transactional, everything works fine! So can you tell me how to configure Spring's transactions for a large number of operations without increasing the delay?

I added spring.datasource.maximumPoolSize=1000 to increase the pool size, which I guess should be enough, so the only remaining problem is how to speed up methods annotated with @Transactional.

Every call to the pull method is annotated with @Transactional because I first need to load data from the database and check whether there is new data; if there is, I do not have to create a waiting deferred result. The push method has to be annotated with @Transactional as well, because there I first need to store the received data in the database and then set that value on the waiting results. I am using Postgres for my data.

 


It seems the problem here is that you're running out of connections in the database pool.

You have your method annotated with @Transactional, but your controller also expects the result of the method, i.e. the DeferredResult, to be delivered as soon as possible so that the request thread is set free.

Now, this is what happens when you run a request:

  • The @Transactional functionality is implemented in a Spring proxy, which must open a connection, call your target method, and then commit or roll back the transaction.
  • Therefore, when your controller invokes the fooService.pull method, it is in fact calling a proxy.
  • The proxy must first request a connection from the pool; then it invokes your service method, which performs some database operation within that transaction. Finally, it must commit or roll back the transaction and return the connection to the pool.
  • After all this, your method returns a DeferredResult, which is then passed to the controller for it to return.
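To make the sequence above concrete, here is a minimal, JDK-only sketch of what such a proxy does around the target method. It is not Spring's implementation: the class name TransactionalProxySketch, the Semaphore standing in for the connection pool, and the recorded event names are all assumptions made for illustration. The point it shows is that the calling thread blocks while acquiring a connection, before the method body (and therefore the DeferredResult) even exists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical stand-in for a transactional proxy; NOT Spring's actual code.
class TransactionalProxySketch {
    // Each permit models one pooled database connection (assumption).
    private final Semaphore pool;
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    TransactionalProxySketch(int poolSize) {
        this.pool = new Semaphore(poolSize);
    }

    <T> T inTransaction(Supplier<T> targetMethod) {
        // The request thread blocks here while the pool is exhausted.
        pool.acquireUninterruptibly();
        events.add("acquire");
        try {
            T result = targetMethod.get(); // the service method body runs only now
            events.add("commit");
            return result;
        } catch (RuntimeException e) {
            events.add("rollback");
            throw e;
        } finally {
            pool.release(); // the connection goes back to the pool
            events.add("release");
        }
    }

    List<String> events() {
        return new ArrayList<>(events);
    }

    int freeConnections() {
        return pool.availablePermits();
    }

    public static void main(String[] args) {
        TransactionalProxySketch proxy = new TransactionalProxySketch(1);
        proxy.inTransaction(() -> "deferred result placeholder");
        System.out.println(proxy.events());
    }
}
```

With a pool of size 1, a second caller would sit in acquireUninterruptibly() until the first caller reaches the finally block, which is the waiting that the rest of this answer is about.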

Now, the problem is that DeferredResult is designed in such a way that it should be used asynchronously. In other words, the promise is expected to be resolved later in some other thread, and we are supposed to free the request thread as soon as possible.

In fact, the Spring documentation on DeferredResult gives this example:

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = new DeferredResult<String>();
    // Save the deferredResult somewhere..
    return deferredResult;
}

// From some other thread...
deferredResult.setResult(data);

The problem in your code is precisely that the DeferredResult is being resolved in the same request thread.

So when the Spring proxy requests a connection from the database pool during your heavy load tests, many requests will find that the pool is exhausted and has no connections available. The request is put on hold, but at that point your DeferredResult has not been created yet, so its timeout does not apply.

Your request is basically waiting for a connection from the database pool to become available. So, let's say 5 seconds pass, then the request gets a connection, and now you get the DeferredResult, which the controller uses to handle the response. Eventually, 5 seconds later, it times out. So the total time is the time spent waiting for a connection from the pool plus the time spent waiting for the DeferredResult to be resolved.

That's why you probably see that, when you test with JMeter, the request time gradually increases as connections get depleted from the database pool.
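As a rough back-of-the-envelope model of that effect, the sketch below adds the two waiting times together. It rests on assumptions that the question does not confirm: that each request keeps its pooled connection for the full 5-second timeout, that all requests arrive at once, and that the pool holds 10 connections (HikariCP's default maximum pool size). The class and method names are made up for this illustration.

```java
// Hypothetical model, not a measurement: requests are served in "waves" of poolSize.
class PoolWaitModel {

    /**
     * Seconds until the request with the given 0-based arrival index completes,
     * assuming every request holds a connection for holdSeconds.
     */
    static long completionSeconds(int index, int poolSize, long holdSeconds) {
        long wavesAhead = index / poolSize;            // full waves that must finish first
        long waitForConnection = wavesAhead * holdSeconds;
        return waitForConnection + holdSeconds;        // pool wait + DeferredResult timeout
    }

    public static void main(String[] args) {
        int poolSize = 10;      // assumed pool size
        long holdSeconds = 5;   // DeferredResult timeout from the question
        for (int index : new int[] {0, 9, 10, 49}) {
            System.out.println("request " + (index + 1) + " completes after "
                    + completionSeconds(index, poolSize, holdSeconds) + " s");
        }
    }
}
```

Under these assumptions the first ten requests finish after 5 seconds and the fiftieth after 25 seconds, which is in the same ballpark as the batches of 10 and the roughly 25-second delays described in the question. Treat it as an illustration of why the delays add up, not as a diagnosis.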

You can enable some logging for the connection pool by adding the following to your application.properties file:

logging.level.com.zaxxer.hikari=DEBUG 

You could also configure the size of your database pool and even add some JMX support such that you can monitor it from Java Mission Control:

spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true

Using JMX support you will be able to see how the database pool gets depleted.

The trick here consists in moving the logic that resolves the promise to another thread:

@Override
public DeferredResult pull(Long previousId, String username) {

    DeferredResult result = createPollingResult(previousId, username);

    CompletableFuture.runAsync(() -> {
        // this is where you encapsulate your db transaction
        List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effectively final
        if (messages.isEmpty()) {
            pollingResults.putIfAbsent(username, result);
        } else {
            result.setResult(messages);
        }
    });

    return result;
}

By doing this, your DeferredResult is returned immediately and Spring can do its magic of asynchronous request handling while it sets free that precious Tomcat thread.
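One design detail worth considering: CompletableFuture.runAsync(Runnable) without an executor argument runs on the shared ForkJoinPool.commonPool(), which is small and not meant for blocking calls. Below is a JDK-only sketch of the same idea with a dedicated executor. CompletableFuture<String> stands in for DeferredResult here, and the class name AsyncPullSketch, the blockingLookup parameter, and the worker count are assumptions for illustration, not part of your code or of Spring.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch: CompletableFuture<String> plays the role of DeferredResult<String>.
class AsyncPullSketch {
    // Dedicated pool for blocking database work; sizing it no larger than the
    // connection pool is an assumption, not a rule.
    private final ExecutorService dbExecutor;

    AsyncPullSketch(int workers) {
        this.dbExecutor = Executors.newFixedThreadPool(workers);
    }

    CompletableFuture<String> pull(Supplier<String> blockingLookup) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                String data = blockingLookup.get(); // the transactional lookup would go here
                if (data != null) {
                    result.complete(data);
                }
                // If data is null, the result stays pending until a later push completes it.
            } catch (RuntimeException e) {
                // Without this, a failed lookup would leave the caller waiting for the timeout.
                result.completeExceptionally(e);
            }
        }, dbExecutor);
        return result; // returned immediately; the request thread is not blocked
    }

    void shutdown() {
        dbExecutor.shutdown();
    }

    public static void main(String[] args) {
        AsyncPullSketch sketch = new AsyncPullSketch(2);
        System.out.println(sketch.pull(() -> "new data").join());
        sketch.shutdown();
    }
}
```

In a real Spring application the lookup would call a separate transactional bean so the proxy still applies, and the executor would normally be a managed bean rather than one created by hand.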
