Coder Thoughts on software, technology and programming.

Piotr Mionskowski

  • Multi tenancy task scheduler

    04 January 2018

    Last time I showed how to extend Spring's default request handler adapter so that we are able to schedule or reject incoming requests. The goal of the TenantTaskCoordinator is to:

    • queue requests for processing
    • limit the maximum number of concurrently processed requests
    • reject requests after the maximum queue size is reached
    • interrupt processing of a request upon an upstream subscription disposal

    Assigning resources

    Tenant task coordinator execute method

    Our entry point into TenantTaskCoordinator is a single method fun <T : Any> execute(job: Callable<T>): Mono<T>:

        fun <T : Any> execute(job: Callable<T>): Mono<T> {
            return Mono.create({ outsideSink ->
                val _workInProgressWasDecremented = AtomicBoolean(false)
                fun decrementOnce() {
                    if (_workInProgressWasDecremented.compareAndSet(false, true)) {
                        currentWorkInProgressCounter.decrementAndGet()
                    }
                }
    
                val workInProgress = currentWorkInProgressCounter.incrementAndGet()
                if (workInProgress > maximumWorkInProgress) {
                    outsideSink.error(TooManyTasks("Work in progress $workInProgress exceeds $maximumWorkInProgress jobs in $name"))
                    decrementOnce()
                } else {
                    val singleJob = Mono.fromCallable(job).doAfterTerminate {
                        decrementOnce()
                    }
    
                    val delayedTask = Task(name, singleJob as Mono<Any>, outsideSink as MonoSink<Any>)
    
                    outsideSink.onCancel {
                        delayedTask.outsideCancel()
                        decrementOnce()
                    }
                    
                    taskSink.next(delayedTask)
                }
            })
        }
    

    The first step is to return Mono<T>, which is easily done with Mono.create. The sink we get passed is used to control the outcome observed from the outside. It also allows registering an onCancel callback invoked when the upstream cancels its subscription.

    The _workInProgressWasDecremented flag guards the currentWorkInProgressCounter so that it is decremented exactly once, in a thread-safe fashion. We first check whether we have immediately exceeded the maximum number of queued jobs. If the threshold is reached, we notify the observer about the error with outsideSink.error.
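    The guard can be illustrated in isolation. The following is a simplified, stand-alone sketch of the decrement-once pattern (the class name is ours, not the coordinator's):

    ```kotlin
    import java.util.concurrent.atomic.AtomicBoolean
    import java.util.concurrent.atomic.AtomicInteger

    // Simplified, stand-alone sketch of the decrement-once guard: no matter how
    // many completion paths fire (terminate, cancel, reject), the work-in-progress
    // counter drops exactly once.
    class WorkInProgressGuard(private val counter: AtomicInteger) {
        private val decremented = AtomicBoolean(false)

        fun decrementOnce() {
            // compareAndSet succeeds for exactly one caller, even under races
            if (decremented.compareAndSet(false, true)) {
                counter.decrementAndGet()
            }
        }
    }

    fun main() {
        val counter = AtomicInteger(1)   // one job admitted
        val guard = WorkInProgressGuard(counter)
        guard.decrementOnce()            // e.g. doAfterTerminate fires
        guard.decrementOnce()            // e.g. onCancel fires as well
        check(counter.get() == 0)        // decremented once, not twice
    }
    ```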

    If we have enough capacity to perform a job, we convert it to the reactive world with Mono.fromCallable and attach a doAfterTerminate callback that decrements the work in progress counter. The Task class links singleJob and outsideSink so that they are both accessible while processing. Finally, we schedule the task through taskSink.next(delayedTask).

    Task coordinator state

    Let’s have a look at the task coordinator state variables and how they are initialized:

    class TenantTaskCoordinator(private val scheduler: Scheduler,
                                val maximumConcurrency: Int = 1,
                                val maximumQueueSize: Int = 50,
                                val name: String = "") : AutoCloseable {
    
        private val maximumWorkInProgress = maximumQueueSize + maximumConcurrency
    
        private val maxBufferSize = maximumWorkInProgress * 2
    
        val currentWorkInProgressCounter = AtomicInteger()
    
        private lateinit var taskSink: FluxSink<Task>
    
        private val taskSource = Flux.create<Task>({ taskSink = it }, FluxSink.OverflowStrategy.BUFFER)
    
        private val processSinkOnErrorResume = processSinkWithLimitedConcurrency()
            .onErrorResume { error: Throwable? ->
                LOG.warn("name={} Error processing sink with limited concurrency", name, error)
                processSinkWithLimitedConcurrency()
            }
    

    The first interesting part is how we set up taskSink using Flux.create. For clarity, we explicitly pass FluxSink.OverflowStrategy.BUFFER so that tasks are buffered in case they outpace the processor. The name is used to get better log messages. Finally, we call processSinkWithLimitedConcurrency to start task processing on the given scheduler. Interestingly, the onErrorResume restarts the processing in case we have a bug.

    Task coordinator concurrent processing

    The most important, and trickiest, part is processing jobs correctly. It took me several back and forth steps until I got the order of reactive API calls right.
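    The key tool is the flatMap overload with a concurrency argument, which behaves much like a semaphore guarding a pool of workers. A rough, stdlib-only analogy (this is an illustration, not Reactor code):

    ```kotlin
    import java.util.concurrent.Executors
    import java.util.concurrent.Semaphore
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicInteger

    // Rough analogy of flatMap(mapper, maximumConcurrency, maxBufferSize):
    // at most `maximumConcurrency` jobs run at once, the rest wait their turn.
    fun runWithLimitedConcurrency(jobs: List<Runnable>, maximumConcurrency: Int) {
        val permits = Semaphore(maximumConcurrency)
        val pool = Executors.newCachedThreadPool()
        jobs.forEach { job ->
            permits.acquire()            // blocks submission when the limit is reached
            pool.execute {
                try {
                    job.run()
                } finally {
                    permits.release()
                }
            }
        }
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)
    }

    fun main() {
        val running = AtomicInteger()
        val observedMax = AtomicInteger()
        val jobs = List(20) {
            Runnable {
                val now = running.incrementAndGet()
                observedMax.accumulateAndGet(now) { a, b -> maxOf(a, b) }
                Thread.sleep(5)
                running.decrementAndGet()
            }
        }
        runWithLimitedConcurrency(jobs, maximumConcurrency = 3)
        check(observedMax.get() in 1..3)   // never more than 3 jobs at once
    }
    ```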

        private fun processSinkWithLimitedConcurrency(): Flux<Any> {
            return taskSource
                .filter { !it.isCancelled }
                .flatMap({ task ->
                    task.work
                        .doOnError(task::onError)
                        .doOnSuccess(task::onSuccess)
                        .subscribeOn(scheduler)
                        .timeout(task.outsideTimeout)
                        .onErrorReturn(task)
                }, maximumConcurrency, maxBufferSize)
        }
    

    First, we filter out tasks that are already cancelled. Then, we use a flatMap overload to process tasks with the given maximum concurrency. The flatMap callback delegates most of the work to the aforementioned Task instance. The onErrorReturn effectively suppresses any errors that might occur during task execution. Let's see what the inner Task class looks like:

    private data class Task(val name: String,
                            private val job: Mono<Any>,
                            val outsideSink: MonoSink<Any>,
                            @field:Volatile var isCancelled: Boolean = false) {
    
        val work: Mono<Any> get() = if (isCancelled) Mono.empty() else job
    
        lateinit var outsideTimeoutSink: MonoSink<Task>
        val outsideTimeout = Mono.create<Task> { outsideTimeoutSink = it }
    
        fun outsideCancel() {
            isCancelled = true
            outsideTimeoutSink.success(this)
        }
    
        fun onSuccess(result: Any?) {
            outsideSink.success(result)
        }
    
        fun onError(error: Throwable) {
            LOG.warn("Task.onError {}", this, error)
            outsideSink.error(error)
        }
    }
    

    The job argument is the Callable passed to the execute method. The outsideTimeout signals when the task's subscription is cancelled. The signal is propagated inside processSinkWithLimitedConcurrency with the timeout operator and breaks the task processing. Last but not least, onSuccess and onError simply push the result or error to the outsideSink, effectively notifying the observer of the outcome.
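    The outsideTimeout trick, a Mono completed only when the caller cancels and raced against the work via the timeout operator, has a rough analogue in plain java.util.concurrent: racing the work against a cancellation future. This is a simplified sketch with names of our own choosing, not the post's code:

    ```kotlin
    import java.util.concurrent.CancellationException
    import java.util.concurrent.CompletableFuture

    // Rough analogy of `work.timeout(task.outsideTimeout)`: the work races a
    // cancellation signal; whichever completes first decides the outcome.
    class CancellableWork<T>(work: CompletableFuture<T>) {
        private val cancelSignal = CompletableFuture<T>()

        // completes with the work's result, or exceptionally on outside cancel
        val outcome: CompletableFuture<Any?> = CompletableFuture.anyOf(work, cancelSignal)

        // called when the outside subscription goes away
        fun outsideCancel() {
            cancelSignal.completeExceptionally(CancellationException("cancelled from outside"))
        }
    }

    fun main() {
        val work = CompletableFuture<String>()      // work that never finishes
        val task = CancellableWork(work)
        check(!task.outcome.isDone)
        task.outsideCancel()                        // upstream disposed its subscription
        check(task.outcome.isCompletedExceptionally)
    }
    ```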

    The TenantTaskCoordinator was not simple to figure out given the requirements mentioned at the beginning of the post. I’m pleased with the final result, although I must say it was not intuitive how to combine all the nuts and bolts of the Reactor library to achieve the desired outcome.

  • Multi tenancy in Spring MVC

    12 December 2017

    One of our clients aimed to replace old, often DOS based, point of sale systems with a cloud based, SaaS modeled solution. At Bright Inventions we have developed all required components including an AWS based back-end processing requests originating from multiple clients. Each business that uses the SaaS point of sale can be considered a tenant in a multi-tenant environment. There are many aspects involved in developing a multi-tenant application, with data isolation and partitioning being the most discussed topics. However, today I would like to focus on the computational and resource isolation aspect.

    Multiple consumers

    Controlled resource usage

    In the discussed case each tenant would have multiple iOS based API clients. The exact number varies from one to a couple of dozen. Each iOS application would be open constantly throughout a sales day and execute frequent requests against the back-end API. In the iOS application there was code that polled the server for data changes at frequent, regular intervals. Unfortunately a bug slipped through code review and caused the app to ask the server for changes around 50 times per second instead of once every half a minute. The bug caused an explosion of API requests issued by a single API client, with a throughput 10 to 100 times higher than expected. To make things worse, the rate at which the bug increased the polling frequency outpaced the back-end scaling-out policy. Soon all request processing threads were busy serving requests issued by only a small percentage of API clients.

    In a multi-tenant application one needs to take special care to prevent tenant “A” from affecting, even indirectly, tenant “B” operations. We failed that requirement at the CPU/thread pool level, and that caused the support lines to run hot.

    Reverse proxy request rate limit

    The first solution that comes to mind is to apply per API client request rate limiting. In fact this solution is so common that it is available as a configuration opt-in in many servers. For instance in NGINX you could do:

    limit_req_zone $binary_remote_addr zone=mylimit:10m rate=10r/s;
    
    server {
        location /login/ {
            limit_req zone=mylimit burst=20;
    
            proxy_pass http://my_upstream;
        }
    }
    

    The above would only allow up to 10 requests per second from the same IP address. Any request that comes in at a higher rate is queued up to the specified capacity (burst=20). Any request beyond the burst capacity gets rejected with a 503 status code.

    The nginx approach is battle tested and fairly easy to apply. Instead of using the IP address it would be better to group requests by a tenant identifier. However, it may not be easy to determine exactly which tenant is making the request unless the information is readily available in the request headers. For that reason it is worth considering sending the API client identification in a custom HTTP header. For instance if the API client provides X-Tenant-Id: tenant.1 you can use it as limit_req_zone $http_x_tenant_id zone=mylimit:10m rate=10r/s;. When using JWT, you can often determine who is making the request by parsing the Authorization header value.
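    Assuming the clients do send such a header, the per-tenant variant could look like the sketch below. Note that nginx does not apply limiting to requests where the key is empty, so requests missing the header would go unthrottled; the zone name, sizes and paths here are illustrative:

    ```nginx
    # Key the limit by the custom tenant header instead of the client IP.
    limit_req_zone $http_x_tenant_id zone=pertenant:10m rate=10r/s;

    server {
        location /api/ {
            # each tenant gets its own 10 r/s budget plus a burst queue of 20
            limit_req zone=pertenant burst=20;

            proxy_pass http://my_upstream;
        }
    }
    ```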

    Spring MVC request rate limit

    It is often not feasible to apply the request rate limit at the reverse proxy level. In such a scenario we can apply the limit inside the Spring MVC application. For a start one can try using a Servlet Filter. There are several solutions available, including the DoSFilter that is part of the Jetty project.

    Using a ready-made Servlet Filter is often sufficient, especially when the available customization options suit our needs. In the case of our client, however, we wanted the limits to depend on the size of the client. In other words, the more service you buy, the more resources are available to you. Moreover, I wanted to have fine-grained control at the controller action level. To my surprise such behavior was not easy to accomplish using AsyncHandlerInterceptor. Fortunately I did find a way to achieve the desired result using a mix of extensibility points and hacks.

    The first step is to customize the RequestMappingHandlerAdapter used by Spring MVC to turn @RequestMapping annotated methods into request handlers. The following configuration class in Kotlin achieves just that:

    @Configuration
    class WebMvcConfiguration : DelegatingWebMvcConfiguration() {
    
        @Autowired(required = false)
        private val mvcProperties: WebMvcProperties? = null
    
        @Inject lateinit var reactiveRequestCommandFactory: ReactiveRequestCommandFactory
        @Inject lateinit var reactiveRequestsProperties: ReactiveRequestsConfiguration.RequestsProperties
    
        @Bean
        override fun requestMappingHandlerAdapter(): RequestMappingHandlerAdapter {
            //copy pasted from WebMvcConfigurationSupport
            val argumentResolvers = ArrayList<HandlerMethodArgumentResolver>()
            addArgumentResolvers(argumentResolvers)
    
            val returnValueHandlers = ArrayList<HandlerMethodReturnValueHandler>()
            addReturnValueHandlers(returnValueHandlers)
    
            val adapter = RateLimitingRequestMappingHandlerAdapter(reactiveRequestCommandFactory, reactiveRequestsProperties)
    
            adapter.setContentNegotiationManager(mvcContentNegotiationManager())
            adapter.messageConverters = messageConverters
            adapter.webBindingInitializer = configurableWebBindingInitializer
            adapter.customArgumentResolvers = argumentResolvers
    
            adapter.customReturnValueHandlers = returnValueHandlers
    
            val requestBodyAdvices = ArrayList<RequestBodyAdvice>()
            requestBodyAdvices.add(JsonViewRequestBodyAdvice())
            adapter.setRequestBodyAdvice(requestBodyAdvices)
    
            val responseBodyAdvices = ArrayList<ResponseBodyAdvice<*>>()
            responseBodyAdvices.add(JsonViewResponseBodyAdvice())
            adapter.setResponseBodyAdvice(responseBodyAdvices)
    
            configureAsync(adapter)
    
    
            adapter.setIgnoreDefaultModelOnRedirect(mvcProperties?.isIgnoreDefaultModelOnRedirect != false)
            return adapter
        }
    
        private fun configureAsync(adapter: RequestMappingHandlerAdapter) {
            //expose field publicly
            val configurer = object : AsyncSupportConfigurer() {
                public override fun getTaskExecutor() = super.getTaskExecutor()
                public override fun getTimeout() = super.getTimeout()
                public override fun getCallableInterceptors() = super.getCallableInterceptors()
                public override fun getDeferredResultInterceptors() = super.getDeferredResultInterceptors()
            }
    
            configureAsyncSupport(configurer)
    
            if (configurer.taskExecutor != null) {
                adapter.setTaskExecutor(configurer.taskExecutor)
            }
    
            if (configurer.timeout != null) {
                adapter.setAsyncRequestTimeout(configurer.timeout!!)
            }
    
            adapter.setCallableInterceptors(configurer.callableInterceptors)
            adapter.setDeferredResultInterceptors(configurer.deferredResultInterceptors)
        }
    }
    

    Note that we inject reactiveRequestCommandFactory and reactiveRequestsProperties and pass them into our core RateLimitingRequestMappingHandlerAdapter. All the other code is mostly a copy-paste from the DelegatingWebMvcConfiguration base class.

    @Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION)
    annotation class RequestCommand(
        val enabled: Boolean = true,
        val timeoutInMillis: Int = 60_000
    )
    
    class RateLimitingRequestMappingHandlerAdapter(private val reactiveRequestCommandFactory: ReactiveRequestCommandFactory,
                                                   private val reactiveRequestProperties: ReactiveRequestsConfiguration.RequestsProperties) : RequestMappingHandlerAdapter() {
        private val handlerMethodConfigurationsCache = ConcurrentHashMap<HandlerMethod, RequestCommandConfiguration>()
    
        override fun createInvocableHandlerMethod(handlerMethod: HandlerMethod): ServletInvocableHandlerMethod? {
            val configuration = requestCommandConfigurationFor(handlerMethod)
    
            return when {
                configuration.enabled && reactiveRequestProperties.enabled -> CommandInvocableHandlerMethod(handlerMethod, reactiveRequestCommandFactory, configuration)
                else -> super.createInvocableHandlerMethod(handlerMethod)
            }
        }
    
        private fun requestCommandConfigurationFor(handlerMethod: HandlerMethod): RequestCommandConfiguration {
            return handlerMethodConfigurationsCache.getOrPut(handlerMethod) {
                val method = handlerMethod.getMethodAnnotation(RequestCommand::class.java)
                val methodOrController = method ?: AnnotatedElementUtils.findMergedAnnotation(handlerMethod.beanType, RequestCommand::class.java)
                methodOrController?.let { RequestCommandConfiguration(it) } ?: RequestCommandConfiguration.Default
            }
        }
    }
    

    Inside createInvocableHandlerMethod we get the configuration for the handlerMethod determined by Spring MVC. The handlerMethod denotes a controller action. Then we decide whether to use the rate limiting handler or fall back to the default one. In case we need to apply rate limiting we switch the invocation to use the custom CommandInvocableHandlerMethod:

    class CommandInvocableHandlerMethod(private val handlerMethod: HandlerMethod,
                                        private val requestCommandFactory: RequestCommandFactory,
                                        private val configuration: RequestCommandConfiguration) : ServletInvocableHandlerMethod(handlerMethod) {
        private lateinit var returnValueHandlers: HandlerMethodReturnValueHandlerComposite
    
        override fun invokeForRequest(request: NativeWebRequest?, mavContainer: ModelAndViewContainer?, vararg providedArgs: Any?): Any {
            // same as super.invokeForRequest(request, mavContainer, *providedArgs)
            // but with request passed to do invoke
            val args = this.getMethodArgumentValuesCallable.invoke(request, mavContainer, providedArgs)
            val result = doInvokeWithRequest(request, args)
            return result
        }
    
        private fun doInvokeWithRequest(request: NativeWebRequest?, args: Array<out Any?>?): Any {
            val nativeRequest = request?.getNativeRequest(HttpServletRequest::class.java)
    
            // If the response has already set error status code tomcat will not wait for async result
            return if (nativeRequest != null && nativeRequest.dispatcherType == DispatcherType.REQUEST) {
                val callSuper = Callable {
                    super.doInvoke(*(args ?: emptyArray()))
                }
    
                val job = callSuper
    
                val context = RequestCommandContext(configuration, handlerMethod, SecurityContextHolder.getContext(), job)
    
                val result = requestCommandFactory.createSingle(context)
    
                MonoDeferredResult(result)
            } else {
                super.doInvoke(*(args ?: emptyArray()))
            }
        }
    
        override fun setHandlerMethodReturnValueHandlers(returnValueHandlers: HandlerMethodReturnValueHandlerComposite?) {
            this.returnValueHandlers = returnValueHandlers!!
            super.setHandlerMethodReturnValueHandlers(returnValueHandlers)
        }
    
        override fun wrapConcurrentResult(result: Any?): ServletInvocableHandlerMethod {
            return ConcurrentResultHandlerMethod(result, ConcurrentResultMethodParameter(result))
        }
    
    ...
    
    

    The above code uses the private getMethodArgumentValues API to achieve the desired behavior‼ The doInvokeWithRequest checks whether an asynchronous dispatch should be performed and if so creates a Mono that denotes the result of the controller action method invocation. RequestCommandContext stores the information about the target controller action method and the current security context. The security context needs to be preserved when invoking the controller action on a different thread. The ConcurrentResultHandlerMethod extends ServletInvocableHandlerMethod to add support for using Mono in a regular, synchronous controller action. The core logic of rate limiting is delegated to ReactiveRequestCommandFactory:

    interface ReactiveRequestCommandFactory {
        fun createSingle(context: RequestCommandContext): Mono<Optional<Any>>
    }
    

    The factory's responsibility is to convert a request context into an async result. Spring MVC 5 has built-in support for Reactor, hence we decided to use this implementation of the Reactive Streams specification. The Reactor-based implementation looks as follows:

    @Component
    class ReactorRequestCommandFactory(
        threadPoolPropertiesCalculator: ThreadPoolPropertiesCalculator,
        @param:Named("reactiveRequestsScheduler")
        private val reactiveRequestsScheduler: Scheduler
    ) : ReactiveRequestCommandFactory {
        private val threadPoolPropertiesCalculator = HystrixConfigurationAwarePropertiesCalculator(threadPoolPropertiesCalculator)
        private val tenants = ConcurrentHashMap<String, TenantTaskCoordinator>()
    
        override fun createSingle(context: RequestCommandContext): Mono<Optional<Any>> {
            val properties = threadPoolPropertiesCalculator.newThreadPoolProperties(context)
    
            val taskCoordinator = tenants.computeIfAbsent(properties.threadPoolName) {
                TenantTaskCoordinator(reactiveRequestsScheduler,
                    maximumConcurrency = properties.maximumThreads,
                    maximumQueueSize = properties.maximumQueueSize,
                    name = properties.threadPoolName
                )
            }
    
            val optionalCallable = OptionalCallable(context.job)
            val configureRequestAttributes = SpringServletRequestAttributesCallable(optionalCallable)
            val configureLocale = SpringLocaleContextCallable(configureRequestAttributes)
            val securityCallable = DelegatingSecurityContextCallable(configureLocale, context.securityContext)
    
            return taskCoordinator.execute(securityCallable)
                .timeout(Duration.ofMillis(context.configuration.timeoutInMillis.toLong()))
        }
    }
    
    class OptionalCallable(private val inner: RequestHandlerJob) : Callable<Optional<Any>> {
        override fun call(): Optional<Any> = Optional.ofNullable(inner.call())
    }
    

    The ThreadPoolPropertiesCalculator calculates how many concurrent threads and how big a request queue a particular tenant or tenant group should get. Then for each tenant group, possibly a single tenant, we create a TenantTaskCoordinator responsible for calculating and enforcing limits on concurrently handled requests. Further down we decorate the Callable representing the actual request handling with security delegation, locale configuration and request attributes setup. Finally, we ask the TenantTaskCoordinator to execute the decorated job with a configured timeout.
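    The decoration idea boils down to capturing a thread-bound value where the request arrives and restoring it around the delegate on whatever thread runs the job. A simplified, stand-alone sketch in the spirit of DelegatingSecurityContextCallable (the names below are ours, not Spring Security's):

    ```kotlin
    import java.util.concurrent.Callable
    import java.util.concurrent.Executors

    // Simplified sketch of a context-preserving decorator: capture a ThreadLocal
    // value on the request thread, restore it around the delegate on the worker.
    object CurrentTenant {
        val holder = ThreadLocal<String?>()
    }

    class TenantPreservingCallable<T>(
        private val delegate: Callable<T>,
        // captured on the thread that constructs the decorator
        private val captured: String? = CurrentTenant.holder.get()
    ) : Callable<T> {
        override fun call(): T {
            val previous = CurrentTenant.holder.get()
            CurrentTenant.holder.set(captured)
            try {
                return delegate.call()
            } finally {
                CurrentTenant.holder.set(previous)   // leave the worker thread clean
            }
        }
    }

    fun main() {
        CurrentTenant.holder.set("tenant.1")                 // set on the "request" thread
        val job = TenantPreservingCallable(Callable { CurrentTenant.holder.get() })
        val pool = Executors.newSingleThreadExecutor()
        val seenOnWorker = pool.submit(job).get()            // runs on a different thread
        pool.shutdown()
        check(seenOnWorker == "tenant.1")
    }
    ```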

    The last piece of the puzzle, namely TenantTaskCoordinator requires a separate blog post so stay tuned.

  • Request timeouts in Spring MVC

    28 November 2017

    As we saw previously, we only have limited options to configure maximum time a request processing can take in Spring MVC. In this post I will show how to enforce such timeout through a custom Servlet Filter.

    Late request

    Request timeout Servlet Filter

    Without further ado let us dive right into a sample filter implementation in Kotlin:

    @Component
    @Order(Ordered.HIGHEST_PRECEDENCE)
    class TimeoutFilter : OncePerRequestFilter() {
        override fun doFilterInternal(request: HttpServletRequest, response: HttpServletResponse, filterChain: FilterChain) {
            val completed = AtomicBoolean(false)
            val requestHandlingThread = Thread.currentThread()
            val timeout = timeoutsPool.schedule({
                if (completed.compareAndSet(false, true)) {
                    requestHandlingThread.interrupt()
                }
            }, 5, TimeUnit.SECONDS)
    
            try {
                filterChain.doFilter(request, response)
                timeout.cancel(false)
            } finally {
                completed.set(true)
            }
        }
    
        companion object {
            private val timeoutsPool = Executors.newScheduledThreadPool(10)
        }
    
        @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
        class TimeoutException(message: String) : java.util.concurrent.TimeoutException(message)
    }
    

    The above code declares a Servlet Filter that will interrupt the thread processing a request after 5 seconds. There are a couple of interesting points about how it works.

    • @Order(Ordered.HIGHEST_PRECEDENCE) puts the Filter at the beginning of the filter chain
    • val completed = AtomicBoolean(false) denotes whether the request processing has completed.
    • val timeoutsPool = Executors.newScheduledThreadPool(10) creates a thread pool responsible for running timeouts. The newScheduledThreadPool creates a thread pool that is efficient at running delayed tasks.
    • timeoutsPool.schedule({ ... }) schedules code that will interrupt requestHandlingThread after 5 seconds
    • completed.compareAndSet(false, true) updates the completed flag in a thread-safe fashion
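    The schedule-then-cancel dance is worth seeing in isolation. Below is a stand-alone sketch of the same pattern, with millisecond timeouts for demonstration (ours, not the filter itself):

    ```kotlin
    import java.util.concurrent.Executors
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicBoolean

    // The filter's pattern in isolation: schedule an interrupt, run the work,
    // cancel the interrupt if the work finished first. Returns true when the
    // work completed without being interrupted.
    fun runWithTimeout(work: () -> Unit, timeoutMillis: Long): Boolean {
        val pool = Executors.newSingleThreadScheduledExecutor()
        val completed = AtomicBoolean(false)
        val worker = Thread.currentThread()
        val timeout = pool.schedule({
            if (completed.compareAndSet(false, true)) {
                worker.interrupt()
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS)

        return try {
            work()
            timeout.cancel(false)
            !Thread.interrupted()        // also clears a late interrupt flag
        } catch (e: InterruptedException) {
            false                        // the scheduled interrupt fired mid-work
        } finally {
            completed.set(true)
            pool.shutdown()
        }
    }

    fun main() {
        check(runWithTimeout({ Thread.sleep(10) }, timeoutMillis = 1_000))   // finishes in time
        check(!runWithTimeout({ Thread.sleep(500) }, timeoutMillis = 50))    // gets interrupted
    }
    ```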

    Testing request timeout Servlet Filter

    For the test purposes let us create a simple Spring Boot MVC application written in Kotlin:

    @SpringBootApplication
    @EnableWebMvc
    class Application {
        companion object {
            @JvmStatic
            fun main(args: Array<String>) {
                SpringApplication.run(Application::class.java)
            }
        }
    }
    
    @RestController
    class TimeoutController {
        @GetMapping("/timeout")
        fun timeout(@RequestParam(required = false) timeoutInMillis: Long?): ResponseEntity<*> {
            Thread.sleep(timeoutInMillis ?: 1000)
            return ResponseEntity.ok("completed")
        }
    }
    

    The TimeoutController will sleep for the amount of time given in a parameter. Let’s simulate a short request with httpie:

    http :8080/timeout timeoutInMillis==2000
    
    HTTP/1.1 200 
    Content-Length: 9
    Content-Type: text/plain;charset=ISO-8859-1
    Date: Mon, 27 Nov 2017 12:19:03 GMT
    
    completed
    
    

    This was the happy path. Now let’s try a timeout path:

    http  :8080/timeout timeoutInMillis==6000       
    
    HTTP/1.1 500 
    Connection: close
    Content-Type: application/json;charset=UTF-8
    Date: Mon, 27 Nov 2017 12:21:30 GMT
    Transfer-Encoding: chunked
    
    {
        "error": "Internal Server Error",
        "exception": "java.lang.InterruptedException",
        "message": "sleep interrupted",
        "path": "/timeout",
        "status": 500,
        "timestamp": 1511785290518
    }
    

    As the exception message shows, the Thread.sleep in the controller action has been interrupted 🎉

    A word of warning

    The above Servlet Filter will not work if we use Async Servlet Filters. When using an Async Servlet Filter there is typically more than one thread handling a request, hence the above approach will not work. Having said that, if you use an Async Servlet Filter there already is a way to apply a timeout defined by the API. Another important point is to check how the request processing thread pool handles interrupted threads. As we have discussed earlier, the concrete implementation of the thread pool used to process requests depends on the servlet container and the configuration used in the application. We should make sure that an interrupted thread is eventually replaced with a new thread by the pool so that timeouts do not change the effective thread pool size.

  • Request timeouts in Spring MVC

    21 November 2017

    Last time we reviewed how to configure HTTP client timeouts. This time let us focus on the other side of the HTTP request, i.e. the server. There is pretty much always a thread pool involved when we write a Spring MVC application. The thread pool configuration will vary depending on the particular servlet container (Tomcat, Undertow, Jetty) so we have to watch out for subtle differences. However, most if not all of them will use a thread pool with a fixed maximum size. As we already know, a pool of resources might get exhausted. In particular, a thread pool is more likely to get exhausted if we do not control timeouts diligently.

    Threads involved in a Spring MVC request handling

    A typical servlet container will use one or more thread pools to handle a request. In particular one of the thread pools is used to execute the Spring MVC part of request handling. Let us call this thread pool the request worker thread pool. The request worker thread pool will have a default maximum size:

    • Tomcat: server.tomcat.max-threads controlling maxThreads with a default of 200
    • Undertow: server.undertow.worker-threads controlling WORKER_TASK_CORE_THREADS with a default of availableProcessors() * 8
    • Jetty: There is no Spring configuration property available currently. One can customize the Jetty Thread Pool through code and jetty specific configuration though. The default maximum number of worker threads is 200.

    Thread pool

    What happens when the request processing thread pool is exhausted?

    Once the request processing thread pool is exhausted, the servlet container will typically queue incoming requests. The queue is drained by the request processing thread pool. Queued requests consume server memory and sockets, thus there typically is a limit after which a new incoming request is immediately rejected.

    • Tomcat: server.tomcat.accept-count Maximum queue length for incoming connection requests when all possible request processing threads are in use. The default value is 100.
    • Undertow: As far as I can tell by default the requests will be queued and the only bound is system capacity. There is Request Limiting Handler available though that allows configuring maximum concurrent requests as well as maximum queue size.
    • Jetty: By default requests are queued in an unbounded queue. You can configure a bounded one though, as documented:
    <Configure id="Server" class="org.eclipse.jetty.server.Server">
        <Set name="ThreadPool">
          <New class="org.eclipse.jetty.util.thread.QueuedThreadPool">
            <!-- specify a bounded queue -->
            <Arg>
               <New class="java.util.concurrent.ArrayBlockingQueue">
                  <Arg type="int">6000</Arg>
               </New>
            </Arg>
            <Set name="minThreads">10</Set>
            <Set name="maxThreads">200</Set>
            <Set name="detailedDump">false</Set>
          </New>
        </Set>
    </Configure>
    

    Queuing requests is necessary in the most common scenarios to handle temporary spikes in load. For example, if your application can handle 100 requests per second, and you can allow it to recover from one minute of excessively high load, you can set the queue capacity to 60*100=6000.

    Let us assume that the thread pool (max) size is 100 and that on average a request takes 1 second to process. Such a server can thus handle 100 requests per second (rps). Any request over the rps limit is going to be queued. Now imagine we have a single type of request that for some reason takes much longer to process than usual, e.g. 120 seconds due to some dependent service issue. When such requests are processed, they tie up request processing threads one by one until all of them are busy waiting. Depending on the queue size limit and system capacity our server will soon start rejecting all new requests. It is worth noting that the slow requests are also going to be put in the queue once the thread pool capacity is reached.
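    The workers-plus-bounded-queue arithmetic can be reproduced with a plain ThreadPoolExecutor. A toy sketch with tiny numbers (ours, not any servlet container's actual code): 2 workers and a queue of 3 can absorb 5 in-flight requests, and everything beyond that is rejected.

    ```kotlin
    import java.util.concurrent.ArrayBlockingQueue
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.RejectedExecutionException
    import java.util.concurrent.ThreadPoolExecutor
    import java.util.concurrent.TimeUnit

    // Toy model of a servlet container: a fixed worker pool plus a bounded
    // request queue; submissions beyond workers + queue capacity are rejected.
    fun countRejected(totalRequests: Int, workers: Int, queueCapacity: Int): Int {
        val pool = ThreadPoolExecutor(
            workers, workers, 0L, TimeUnit.MILLISECONDS,
            ArrayBlockingQueue<Runnable>(queueCapacity)
        )
        val release = CountDownLatch(1)   // keeps every accepted request "in progress"
        var rejected = 0
        repeat(totalRequests) {
            try {
                pool.execute { release.await() }
            } catch (e: RejectedExecutionException) {
                rejected++
            }
        }
        release.countDown()               // let the accepted requests finish
        pool.shutdown()
        pool.awaitTermination(5, TimeUnit.SECONDS)
        return rejected
    }

    fun main() {
        // 2 run immediately, 3 wait in the queue, the remaining 5 are rejected
        check(countRejected(totalRequests = 10, workers = 2, queueCapacity = 3) == 5)
    }
    ```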

    One of the ways to mitigate the issue and speed up system recovery is to apply timeouts. When the timeout for a particular request elapses, ideally a few things should happen:

    • the client should be notified about the error (503, 504 or 408 depending on the use case)
    • the request should be removed from the processing queue
    • the thread processing the requests should be interrupted

    Let’s review what options are available.

    • Tomcat has Stuck Thread Detection Valve:

      This valve allows to detect requests that take a long time to process, which might indicate that the thread that is processing it is stuck. Additionally it can optionally interrupt such threads to try and unblock them. The valve has 2 configuration options:

    • threshold: Minimum duration in seconds after which a thread is considered stuck. Default is 600 seconds.
    • interruptThreadThreshold: Minimum duration in seconds after which a stuck thread should be interrupted to attempt to “free” it.

    AFAIK the valve only applies to requests whose processing has already been started by a thread pool thread.
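    For reference, the valve is enabled in Tomcat's server.xml; a minimal sketch with illustrative threshold values (in seconds):

    ```xml
    <!-- inside the <Host> element of server.xml -->
    <Valve className="org.apache.catalina.valves.StuckThreadDetectionValve"
           threshold="60"
           interruptThreadThreshold="120" />
    ```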

    • Undertow and Jetty do not allow setting a request timeout directly. Both do have idle connection detection and can time out idle connections accordingly. Unfortunately, since HTTP/2 multiplexes several requests over a single connection, these timeout options may not be suitable for timing out a single request.

    • In Spring MVC there is no way to configure a timeout unless you use asynchronous request processing. With async methods one can use spring.mvc.async.request-timeout= to set the amount of time (in milliseconds) before asynchronous request handling times out. However, using the Async Servlet API with Spring MVC requires changing the controller methods' return types.
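    The async approach boils down to one property plus an async return type. A minimal Spring Boot sketch (the 5-second value is illustrative):

    ```properties
    # application.properties: fail asynchronous request handling after 5 seconds
    spring.mvc.async.request-timeout=5000
    ```

    For the timeout to apply, the controller method has to opt into async processing, e.g. by returning Callable<String> or DeferredResult<String> instead of a plain String.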

    There is no standard request timeout configuration

    There are only a couple of options available to wrap request handling with a timeout. This is partially due to historical reasons: the servlet container specification did not consider timeouts until the Async Servlet was defined. Another reason is that there is no way to safely stop a thread from the outside; the application code needs to cooperate to safely terminate the request handling execution. In the next post we will show how to add a request timeout to a Spring MVC application.

  • HTTP client timeouts

    05 November 2017

    We have already touched upon the importance of timeouts and described the most important JDBC knobs. The next aspect of timeouts I would like to focus on is API clients, specifically HTTP clients, which are by far the most popular. We will review a couple of popular HTTP client libraries and their timeout configuration.

    Waiting

    HttpURLConnection timeouts

    HttpURLConnection, available since JDK 1.1, gained the ability to timeout its network communication in JDK 5. The 2 available timeouts, setConnectTimeout and setReadTimeout, control how long to wait until a connection is established and how long to wait for data from the server, respectively. The default values are infinite ‼️.
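    A minimal Kotlin sketch (the URL is illustrative; note that nothing is sent over the network until connect() is called):

    ```kotlin
    import java.net.HttpURLConnection
    import java.net.URL

    fun openWithTimeouts(url: String): HttpURLConnection {
        val connection = URL(url).openConnection() as HttpURLConnection
        // how long to wait for the connection to be established (ms); default 0 = infinite
        connection.connectTimeout = 5_000
        // how long to wait for data from the server (ms); default 0 = infinite
        connection.readTimeout = 10_000
        return connection
    }
    ```
    
    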

    Apache HttpClient timeouts

    HttpClient from the Apache HttpComponents suite has been a standard choice for HTTP communication. It is a mature project with a rich API that fills many HttpURLConnection shortcomings, e.g. connection pooling. Many of its APIs have been deprecated, e.g. DefaultHttpClient and org.apache.http.params.CoreConnectionPNames, hence one needs to be careful when setting the timeouts; when left unconfigured they fall back to system-defined socket-level defaults.

    There are 3 timeouts settings available:

    val requestConfig = RequestConfig.custom()
        // Determines the timeout in milliseconds until a connection is established.
        .setConnectTimeout(5_000) 
        // Defines the socket timeout in milliseconds,
        // which is the timeout for waiting for data or, put differently,
        // the maximum period of inactivity between two consecutive data packets.
        .setSocketTimeout(5_000)
        // Returns the timeout in milliseconds used when requesting a connection
        // from the connection manager.
        .setConnectionRequestTimeout(2_000)
        .build()
    

    The requestConfig can be further used as a default for an HttpClient instance:

    val httpClient = HttpClients.custom()
        .setDefaultRequestConfig(requestConfig)
        .build()
    

    It is also possible to configure each request separately:

    val get = HttpGet("http://httpbin.org/get").apply { 
        config = requestConfig
    }
    httpClient.execute(get)
    

    OkHttp

    OkHttp is my favorite HTTP & HTTP/2 client for Android and Java applications. It is efficient and has good configuration defaults. There are 3 timeout settings available:

    val client = OkHttpClient.Builder()
        // Sets the default connect timeout for new connections.
        .connectTimeout(5, TimeUnit.SECONDS)
        // Sets the default read timeout for new connections.
        .readTimeout(10, TimeUnit.SECONDS)
        // Sets the default write timeout for new connections.
        .writeTimeout(20, TimeUnit.SECONDS)
        .build()
    

    connectTimeout, readTimeout and writeTimeout all default to 10 seconds 👍.

    XMLHttpRequest and Fetch API timeouts

    XMLHttpRequest has been the standard foundation of network communication for Web applications for over 10 years now. Nowadays it is being replaced with the Fetch API, but it still is, and will continue to be, the most popular for a couple of years. There is only a single timeout configuration available in XMLHttpRequest:

    The XMLHttpRequest.timeout property is an unsigned long representing the number of milliseconds a request can take before automatically being terminated. The default value is 0, which means there is no timeout.

    Default is infinite ‼️

    Since the default value is not configured, we should diligently set the timeout in our code! It may be tempting to think that a client-side timeout is not as important as the one on the server. This is a questionable attitude, to say the least. We need to keep in mind that there is a hard limit on the number of connections a browser will make to a single domain, which is very important if we use HTTP 1.*. When we reach the maximum number of concurrently opened connections, any new XMLHttpRequest is going to be queued indefinitely. The limit value varies between browsers and the recent RFC relaxes it. HTTP/2 alleviates the issue with connection multiplexing; nonetheless, its adoption is still low: according to w3techs it is about 20% as of today. The timeout value used in XMLHttpRequest is even more important in Single Page Applications. In an SPA, an XMLHttpRequest without a timeout can live for as long as the server and intermediate network parties allow, effectively blocking all subsequent network calls.

    Fetch API is meant to replace XMLHttpRequest. It is thus sad that the ability to timeout a request has not yet made it into the standard; currently there is no standard way to enforce a timeout. There are a couple of active GitHub issues that go over potential solutions: Add timeout option, Add option to reject the fetch promise automatically after a certain time elapsed. There was a proposal for cancelable promises which was withdrawn after lots of discussion and lack of consensus. A brand new way, recently implemented by Edge and Firefox, allows one to timeout a Fetch API call 🎉 through the DOM-standardized AbortController. Hopefully it will get into the Fetch API standard soon.

    const controller = new AbortController();
    const signal = controller.signal;
    
    setTimeout(() => controller.abort(), 5000);
    
    fetch(url, { signal }).then(response => {
      return response.text();
    }).then(text => {
      console.log(text);
    }).catch(error => {
      // an aborted request rejects the promise with an AbortError
      console.error(error.name);
    });
    

    URLSession timeouts

    URLSession is the successor to NSURLConnection that underlies most if not all iOS HTTP clients, e.g. Alamofire. There are 2 main timeout values to configure, both of which have default values available via URLSessionConfiguration.default:

    let sessionConfig = URLSessionConfiguration.default
    sessionConfig.timeoutIntervalForRequest = 20.0
    sessionConfig.timeoutIntervalForResource = 40.0
    
    let session = URLSession(configuration: sessionConfig)
    

    Fortunately there are default values configured:

    • timeoutIntervalForRequest:

      This property determines the request timeout interval for all tasks within sessions based on this configuration. The request timeout interval controls how long (in seconds) a task should wait for additional data to arrive before giving up. The timer associated with this value is reset whenever new data arrives. The default value is 60.

    • timeoutIntervalForResource:

      This property determines the resource timeout interval for all tasks within sessions based on this configuration. The resource timeout interval controls how long (in seconds) to wait for an entire resource to transfer before giving up. The default value is 7 days.

    Note that timeoutIntervalForResource is a higher level timeout than the ones we have considered in other HTTP clients. It encompasses retries and/or per-request timeouts, hence its large default.

    Summary

    Many HTTP clients do not have a good default timeout configuration. Hence, if you care about your application's resource usage and system stability, you have to carefully review and configure timeouts where applicable. It is reassuring to see that modern HTTP clients, e.g. OkHttp and URLSession, have short but sane defaults.