在 Java Servlet 3.0 標準推出之前,如果想要實作 asynchronous 的 servlet 必須使用像 Comet 這樣的架構,而現在 Servlet 3.0 API 直接支援 asynchronous 與 synchronous 兩種模式,而因為這是公開的標準,所以寫好的 Servlet 可以很方便的移植到各種符合 Servlet 3.0 的 app server 中(例如 Tomcat 7 或是 GlassFish 3.x)。
在使用 synchronous servlets 時,處理 client HTTP request 的執行序(thread)會跟處理 request 的過程綁在一起,而在處理工作量比較大、等待時間比較久的 request 時(通常在等待外部的 IO 或是其他耗時的動作),該執行序就只能等待工作完成,才能執行下一步,這樣的狀況如果在 request 數量很多的時候,就有可能會造成伺服器的執行序不足的問題,進而影響伺服器整體的效能。
這種情況主要是由於伺服器上的執行序都在等待外部的工作完成(其實這時候它們都在休息),但是從 client 來的 request 數量又太多時,就會很容易把有限的執行序都佔滿,造成伺服器滿載的狀況,但是其實伺服器的執行序都在休息!
Asynchronous servlets 基本上就是為了處理這樣的狀況而設計的,它讓處理 request 的執行序要等待外部工作的時候,可以去處理其他的 request,等到外部工作完成之後,伺服器會找另外一個執行序來把處理完成的外部工作結果送回 client,在這種架構下,client 不會感覺有任何不同,因為這些執行序與工作的分配都是在伺服器上面發生的,也只跟伺服器本身有關,所有的 request 資料還是跟以前一樣。
要達到這樣的工作處理流程,最關鍵的部分就是在外部工作完成時,要如何通知伺服器來領回處理完成的結果,這裡是它所使用的方式是透過 callback 函數的方式(這個 callback 函數是由伺服器本身提供)。
以下是伺服器處理 request 的 pseudo code:
service()
函數來處理 request。service()
函數必須建立一個 AsyncContext
物件(使用 startAsync()
函數)。service()
函數必須建立一個 Runnable
物件,並傳遞給另一個執行序的 Executor
以執行真正要做的工作。Runnable
物件中會包含一個 AsyncContext
的 reference(因為在工作執行完成時,要通知伺服器領回)。service()
函數執行結束。(這看起來不太直覺,但是在 asynchronous 的架構就是這樣)在這個時候 service()
函數雖然執行結束了,但是因為真正的工作還在另外一個執行序中(Executor
物件)處理,所以 client 事實上還在等待,等到真正的工作完成時,伺服器就會接著下面的動作:
Executor
的工作完成時,會通知 AsyncContext
:它會將結果寫進 HttpResponse
中,然後呼叫 AsyncContext
的 complete()
函數,這樣就會促使伺服器把處理完成的結果送回 client。如果在 Runnable
中所執行的工作出錯時,這時候 Runnable
還沒把結果傳回給 Executor
,client 在這種情況下只會收到某種一般的網路錯誤訊息,如果想要自己處理可能發生的錯誤,提供詳細的錯誤訊息,可以用下面的方式:
Runnable
在設定的 timeout 時間內沒有正常完成工作的話,就會發出 timeout 的錯誤。這樣的話,如果 Runnable
在處理工作時出現錯誤,超過我們指定的 timeout 時間沒有完成時,則我們自己撰寫的 listener 就會被呼叫,通知你發生 timeout 錯誤了,這時候你就可以在 listener 中產生要給使用者看的錯誤訊息。如果這時候 Runnable
還繼續嘗試寫入 HttpResponse
物件時或是呼叫 AsyncContext.complete()
函數,都會產生例外(exception),基本上在這裡應該要把之前所產生的輸出全部丟棄,只輸出錯誤訊息。
以下是兩個 asynchronous servlets 範例程式,一個是比較簡單的範例,示範最基本的 asynchronous servlet 的撰寫方式。而另一個則是比較複雜的範例,發生 timeout 錯誤時,該怎麼處理。
基本上只要符合 Java Servlet 3.0 規格的伺服器都應該可以執行這裡的範例程式,而這裡的伺服器環境是使用 Tomcat 7。
這個範例是示範基本的 asynchronous servlets 與 web.xml 內容(這也是 Servlet 3.0 的內容之一)。以下是伺服器端的 servlet 程式碼:
@javax.servlet.annotation.WebServlet( // servlet name name = "simple", // servlet url pattern value = {"/simple"}, // async support needed asyncSupported = true ) public class SimpleAsyncServlet extends HttpServlet { /** * Simply spawn a new thread (from the app server's pool) for every new async request. * Will consume a lot more threads for many concurrent requests. */ public void service(ServletRequest req, final ServletResponse res) throws ServletException, IOException { // create the async context, otherwise getAsyncContext() will be null final AsyncContext ctx = req.startAsync(); // set the timeout ctx.setTimeout(30000); // attach listener to respond to lifecycle events of this AsyncContext ctx.addListener(new AsyncListener() { public void onComplete(AsyncEvent event) throws IOException { log("onComplete called"); } public void onTimeout(AsyncEvent event) throws IOException { log("onTimeout called"); } public void onError(AsyncEvent event) throws IOException { log("onError called"); } public void onStartAsync(AsyncEvent event) throws IOException { log("onStartAsync called"); } }); // spawn some task in a background thread ctx.start(new Runnable() { public void run() { try { ctx.getResponse().getWriter().write( MessageFormat.format("<h1>Processing task in bgt_id:[{0}]</h1>", Thread.currentThread().getId())); } catch (IOException e) { log("Problem processing task", e); } ctx.complete(); } }); } }
這段程式碼有一些需要注意的地方:
asyncSupported=true
告訴伺服器這個 servlet 需要 asynchronous 模式。service()
函數中,timeout 的時間設定為 30 秒,所以如果 Runnable
在 30 秒之內做完所有的工作並且呼叫 complete()
,就不會有 timeout 錯誤產生。Runnable
物件將真正要做的主要工作(也是最耗時的部分)包裝起來,交給另外一個執行序執行。Runnable
所負責的工作是取得目前執行序的 ID,然後交給伺服器傳回到 client 端。以下是 client 端的 Java 測試程式碼:
public class LoadTester { public static final AtomicInteger counter = new AtomicInteger(0); public static final int maxThreadCount = 100; public static void main(String[] args) throws InterruptedException { new LoadTester(); } public LoadTester() throws InterruptedException { // call simple servlet ExecutorService exec1 = Executors.newCachedThreadPool(); for (int i = 0; i < maxThreadCount; i++) { exec1.submit(new UrlReaderTask("http://localhost:8080/test/simple")); } exec1.shutdown(); Thread.currentThread().sleep(5000); System.out.println("....NEXT...."); // call complex servlet counter.set(0); ExecutorService exec2 = Executors.newCachedThreadPool(); for (int i = 0; i < maxThreadCount; i++) { exec2.submit(new UrlReaderTask("http://localhost:8080/test/complex")); } exec2.awaitTermination(1, TimeUnit.DAYS); } public class UrlReaderTask implements Runnable { private String endpoint; public UrlReaderTask(String s) { endpoint = s; } public void run() { try { actuallyrun(); } catch (Exception e) { System.err.println(e.toString()); } } public void actuallyrun() throws Exception { int count = counter.addAndGet(1); BufferedReader in = new BufferedReader( new InputStreamReader(new URL(endpoint).openStream())); String inputLine; while ((inputLine = in.readLine()) != null) { System.out.println(MessageFormat.format("thread[{0}] : {1} : {2}", count, inputLine, endpoint)); } in.close(); } } } //end class ComplexLoadTester
這個簡單的 Java 程式會產生 100 個執行序,同時建立 HTTP 的連線連到我們寫的 asynchronous servlets,然後輸出從伺服器上取得的執行序 ID 資訊。
在這個範例中,我們取得的執行序 ID 會比較雜亂,因為這些執行序是來自於 Tomcat 7 伺服器的 thread pool。
這個比較複雜的範例是將上面的範例再做一些改變:
service()
中產生例外(exception)。sleep()
函數讓程式休息 0 到 5 秒(隨機產生),而 AsyncContext
的 timeout 的時間上限則設為 60 秒,所以大約最後的 20 筆 requests 會出現 timeout 的錯誤,因為我們總共只有 3 個執行序可以同時處理 100 筆 requests,排在後面的 requests 就會來不及在 60 秒之內處理完。AsyncContext
的 listener 必須要自己呼叫 AsyncContext.complete()
函數。 AsyncContext
中的 HttpRequest
與 HttpResponse
物件設為 invalid,透過這樣的方式通知正在執行的工作說 AsyncContext
已經是 invalid 的狀態了,這也是為什麼在 Runnable
中執行完成後,在寫入 response 之前,要先檢查它是不是 null
,這個動作請記得一定要做,因為在 timeout 錯誤發生時,Runnable
這邊正在執行的工作可能無法得知 timeout 的錯誤,只能靠檢查 request 與 response 是否為 null
的方式來判斷,如果檢查出來是 null
,則所有正在執行的工作都可以終止了,因為這個時候 AsyncContext
的 listener 或伺服器已經將 timeout 的錯誤訊息傳給 client 端了,縱使你把所有的工作算完也沒有用了。以下是這個範例的程式碼:
@javax.servlet.annotation.WebServlet( // servlet name name = "complex", // servlet url pattern value = {"/complex"}, // async support needed asyncSupported = true, // servlet init params initParams = { @WebInitParam(name = "threadpoolsize", value = "3") } ) public class ComplexAsyncServlet extends HttpServlet { public static final AtomicInteger counter = new AtomicInteger(0); public static final int CALLBACK_TIMEOUT = 60000; public static final int MAX_SIMULATED_TASK_LENGTH_MS = 5000; /** executor svc */ private ExecutorService exec; /** create the executor */ public void init() throws ServletException { int size = Integer.parseInt( getInitParameter("threadpoolsize")); exec = Executors.newFixedThreadPool(size); } /** destroy the executor */ public void destroy() { exec.shutdown(); } /** * Spawn the task on the provided {@link #exec} object. * This limits the max number of threads in the * pool that can be spawned and puts a ceiling on * the max number of threads that can be used to * the init param "threadpoolsize". */ public void service(final ServletRequest req, final ServletResponse res) throws ServletException, IOException { // create the async context, otherwise getAsyncContext() will be null final AsyncContext ctx = req.startAsync(); // set the timeout ctx.setTimeout(CALLBACK_TIMEOUT); // attach listener to respond to lifecycle events of this AsyncContext ctx.addListener(new AsyncListener() { /** complete() has already been called on the async context, nothing to do */ public void onComplete(AsyncEvent event) throws IOException { } /** timeout has occured in async task... handle it */ public void onTimeout(AsyncEvent event) throws IOException { log("onTimeout called"); log(event.toString()); ctx.getResponse().getWriter().write("TIMEOUT"); ctx.complete(); } /** THIS NEVER GETS CALLED - error has occured in async task... handle it */ public void onError(AsyncEvent event) throws IOException { log("onError called"); log(event.toString()); ctx.getResponse().getWriter().write("ERROR"); ctx.complete(); } /** async context has started, nothing to do */ public void onStartAsync(AsyncEvent event) throws IOException { } }); // simulate error - this does not cause onError - causes network error on client side if (counter.addAndGet(1) < 5) { throw new IndexOutOfBoundsException("Simulated error"); } else { // spawn some task to be run in executor enqueLongRunningTask(ctx); } } /** * if something goes wrong in the task, it simply causes timeout condition * that causes the async context listener to be invoked (after the fact) * <p/> * if the {@link AsyncContext#getResponse()} is null, that means this * context has already timedout (and context listener has been invoked). */ private void enqueLongRunningTask(final AsyncContext ctx) { exec.execute(new Runnable() { public void run() { try { // simulate random delay int delay = new Random().nextInt(MAX_SIMULATED_TASK_LENGTH_MS); Thread.currentThread().sleep(delay); // response is null if the context has already timedout // (at this point the app server has called the listener already) ServletResponse response = ctx.getResponse(); if (response != null) { response.getWriter().write( MessageFormat.format( "<h1>Processing task in bgt_id:[{0}], delay:{1}</h1>", Thread.currentThread().getId(), delay) ); ctx.complete(); } else { throw new IllegalStateException( "Response object from context is null!"); } } catch (Exception e) { log("Problem processing task", e); e.printStackTrace(); } } }); } }
這個範例的 client 程式是與上一個範例共用的,也就是說我們是使用一個 client 程式同時測試兩個不同的 servlets。
當你使用 client 程式同時產生 100 筆 requests 來測試這個比較複雜的範例的時候,有幾個地方是需要注意的:
asynchronous servlet 所帶給伺服器的效能提昇是顯而易見的,而且基本上只要你對於 asynchronous 的處理流程夠了解,Servlet API 3.0 所提供的 asynchronous API 應該是很容易使用的,所以如果你的伺服器端會常常處理耗時的工作,就可以改用這樣的架構。
但如果你沒有搞清楚 asynchronous 的處理方式,則面對這些不太直覺的 callback 使用方式,可能會讓你更頭痛,縱使使用 asynchronous servlet 可以讓伺服器的效能提昇,但是程式開發者還是要很了解這個架構的運行方式,以免為了提昇效能反而造成程式出錯。
除了 asynchronous 之外,Tomcat 7 與 Servlet API 3.0 也使用一些 annotations 可以讓設定 servlet 更容易,並且還可以動態載入 servlet(loading servlets programmatically),有興趣的人可以自己查詢相關的資料。