Tuesday, December 21, 2010

using POST with a TaskQueue on AppEngine (Java)

The AppEngine docs for Task Queues (in Java) lack a sufficient example using a POST method.

It is simple to see how using a GET method will simply encode all information into the URL itself, and the servlet that accepts the task must simply deconstruct the URL (and maybe some query parameters) in order to perform its duty.

But how about receiving a POST? How does this match up with how you filled the payload? Yes it is straightforward, but the docs never explicitly tell you to pair your call to payload() with a call to getReader() in the receiving servlet. (Or you might have a need for getInputStream() instead.)

So here’s my example: I need to query for objects in the DataStore that each need processing. In the same query I need to mark those objects so I know they have been queued for processing.

A transaction is needed to make this atomic. But you can only add up to 5 things to a queue during a transaction. So the strategy is to only add 1 thing to a queue. Accumulate those object keys into a StringBuilder, and POST them all at once to a task queue.

  static public int queueObjectsNeedingProcessing() {
    int count = 0;
    Queue queue = QueueFactory.getQueue("queue-for-doing-processing");
    PersistenceManager pm = PMF.getForTransactions().getPersistenceManager();
    try {
      pm.currentTransaction().begin();
      
      Query query = pm.newQuery(Foo.class);
      // query.setFilter( ... );
      // and other query setup...
      StringBuilder buf = new StringBuilder();
      List<Foo> results = (List<Foo>) query.execute();
      for ( Foo f : results ) {
        String key = KeyFactory.keyToString(f.getId());
        buf.append(key);
        buf.append("\n");
        f.markAsQueued();
        count++;
      }
      TaskOptions taskOptions = TaskOptions.Builder.withUrl("/tasks/process-group").method(Method.POST);
      taskOptions.payload(buf.toString());
      queue.add(taskOptions);
      pm.currentTransaction().commit();
      return count;
    }
    finally {
      if (pm.currentTransaction().isActive()) {
        pm.currentTransaction().rollback();
      }
      pm.close();
    }
  }

 

On the other end, the servlet that receives the POST at /context/tasks/process-group simply recovers the posted String from the InputStream found via the HTTP request object. But rather than using the actual low level InputStream, life is better using the BufferedReader provided by request.getReader().

public class TaskWorkerServlet extends HttpServlet {
  
  final static private int SC_INTERNAL_SERVER_ERROR = 500;
  final static private String PATH_GROUP = "/process-group";
  
  @Override
  protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    String pathInfo = req.getPathInfo();
    if ( pathInfo.equals(PATH_GROUP) ) {
      BufferedReader buf = req.getReader();
      String line = buf.readLine();
      while ( line != null ) {
        handleOneLine(line);
        // if you want this task to "fail" and retry the whole thing,
        // then return a HTTP error not in 2xx range. I am glossing over this.
        // resp.setStatus(SC_INTERNAL_SERVER_ERROR);
        line = buf.readLine();        
      }
      
    }
  }
}