Sunday, June 19, 2011

Unique Constraint in AppEngine Datastore

When you come to AppEngine with a relational database background, it is natural to want a unique constraint on a set of entities. You have a field whose value must be unique across all entities of that type, and you want exactly one entity per value to be created, even in the face of race conditions.

The AppEngine Datastore has no unique constraint feature per se, but you can enforce one manually as you add entities to the datastore. Doing so requires the following:

  • You must use a transaction. (You probably expected this.)
  • You must use an ancestor query (within the transaction) to test whether the entity already exists.
  • You must have a parent entity (to form an entity group).
  • You must use your unique field as the entity key as well. (Yes, that data will “exist twice” in the entity.)

A Demonstration

I don’t know about you, but when it comes to fundamental behavior such as this, I find it critical to:

  1. understand how it behaves on the system (e.g. with Datastore, Objectify, etc.),
  2. code it correctly, and
  3. prove that the coded implementation behaves correctly.

So I’ve built an isolated test case to demonstrate “unique constraint” behavior with Datastore (and with Objectify), and ultimately produced a canonical example of how to implement a “unique constraint” for Datastore on AppEngine. The live demo is at http://gaetestjig.appspot.com/ on the “Unique Constraint” tab. This demo shows two users “Alice” and “Bobby” trying to reserve the same seat in a reservation system. The “winning” request will write the entity to the Datastore while the other will fail.

In the demo, alternately click the “Advance Alice” and “Advance Bobby” buttons to move each user one step closer to reserving the same seat. Once a seat has been reserved, click “Reset Test” to start over and experiment further.

What you will see is that both parties find no pre-existing entity and each creates its own new entity (with the same key). Not until commit time does one of them discover that the other has modified the entity group, at which point its commit fails with a java.util.ConcurrentModificationException. Since a ConcurrentModificationException is a “normal” outcome, you retry the seat reservation; on the retry you discover the pre-existing entity and cannot perform your insert.
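The race is easier to follow with the Datastore taken out of the picture. Below is a toy, in-memory model; every class in it is hypothetical and not part of the AppEngine API. It assumes only what the demo shows: a transaction reads a snapshot of its entity group, and its commit fails if any other transaction committed to that group after it began. Replaying the Alice/Bobby interleaving against it reproduces the sequence described above.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of ONE entity group with optimistic concurrency (not the AppEngine API).
public class SeatRaceDemo {

    static class EntityGroup {
        long version = 0;                                  // bumped on every successful commit
        final Map<String, String> seats = new HashMap<>(); // seatId -> ownerName
    }

    static class Txn {
        private final EntityGroup group;
        private final long startVersion;            // group version seen when the txn began
        private final Map<String, String> snapshot; // reads see the group as of txn start
        private final Map<String, String> pending = new HashMap<>();

        Txn(EntityGroup group) {
            this.group = group;
            this.startVersion = group.version;
            this.snapshot = new HashMap<>(group.seats);
        }

        boolean exists(String seatId) { return snapshot.containsKey(seatId); }

        void put(String seatId, String owner) { pending.put(seatId, owner); }

        // Fails if any other transaction committed to this group after we began.
        void commit() {
            if (group.version != startVersion) {
                throw new ConcurrentModificationException("entity group was modified");
            }
            group.seats.putAll(pending);
            group.version++;
        }
    }

    // Replays the Alice/Bobby interleaving from the demo and records what happens.
    static List<String> runDemo(EntityGroup seatsRoot) {
        List<String> events = new ArrayList<>();
        Txn alice = new Txn(seatsRoot);
        Txn bobby = new Txn(seatsRoot);

        // Both check before either commits, so neither sees an existing seat.
        events.add("alice sees seat: " + alice.exists("A1"));
        events.add("bobby sees seat: " + bobby.exists("A1"));

        alice.put("A1", "Alice");
        bobby.put("A1", "Bobby");

        alice.commit();
        events.add("alice committed");
        try {
            bobby.commit();
            events.add("bobby committed");
        } catch (ConcurrentModificationException e) {
            events.add("bobby: ConcurrentModificationException");
        }

        // Bobby retries in a fresh transaction and now finds Alice's seat.
        Txn retry = new Txn(seatsRoot);
        events.add("bobby retry sees seat: " + retry.exists("A1"));
        return events;
    }

    public static void main(String[] args) {
        EntityGroup seatsRoot = new EntityGroup();
        runDemo(seatsRoot).forEach(System.out::println);
        System.out.println("owner of A1: " + seatsRoot.seats.get("A1"));
    }
}
```

Running main shows both transactions finding no seat, Alice committing, Bobby's commit throwing ConcurrentModificationException, and Bobby's retry then finding the seat, which is owned by Alice.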

Note about the demo app: on AppEngine the underlying transaction times out after roughly 30 seconds, so you need to step through all your “Advance” clicks within that window, or you’ll lose the transaction and get an IllegalStateException. Furthermore, because this app is not visited frequently, your visit will probably trigger a “cold start”, so you could easily hit this exception on your first attempt. Just try again right away and it will work.

How to implement the “Unique Constraint”

One thing is guaranteed to be unique per entity: its key. So we use our “seatId” as the entity key and we’re basically done: there can only be one entity with that key. However, anyone can overwrite that entity by calling put() with the same key.

So we want to isolate our “inserts” in a transaction and, inside that transaction, query for an entity that may already exist from a previous claim; if we find one, we tell the user they were too late. (The fine print of “What Can Be Done In a Transaction” says that the only queries you may perform in a transaction are ancestor queries, i.e. queries that have an ancestor filter.)

This means our Seat entity must have a parent, so that all Seat entities live in the same entity group. Remember that we’re trying to stay unique across *all* Seat entities, so we need a common parent to place them all in one entity group. Datastore transactions need to be told the scope of what they can “lock”, and entity groups are that scope. (There’s no real locking, just a check at commit time of whether the entity group’s modification timestamp changed since the transaction began.)

So we create a single SeatsRoot entity, which acts as the parent of all our Seat entities. (If another resource could have millions of “Seats”, you could split that single root into multiple SeatsRoot entities so that each entity group has a reasonable number of members, as long as a given seatId always maps to the same root.)
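If you do split the root, the choice of root has to be a pure function of the seatId; otherwise two requests for the same seat could land in different entity groups and would never conflict at commit time. A minimal sketch, assuming a fixed number of roots; the NUM_ROOTS constant and the rootKeyNameFor helper are hypothetical, not part of the demo code:

```java
// Hypothetical helper: choose which SeatsRoot a given seat belongs to.
public class SeatsRootSharding {

    // Assumed number of root entities. Never change it once data exists,
    // or existing seats would map to a different entity group.
    static final int NUM_ROOTS = 16;

    // Deterministic: the same seatId always maps to the same root key name, so two
    // transactions racing for one seat still share an entity group and still conflict.
    static String rootKeyNameFor(String seatId) {
        int shard = Math.floorMod(seatId.hashCode(), NUM_ROOTS);
        return "seats_root_key_name_" + shard;
    }

    public static void main(String[] args) {
        for (String seatId : new String[] {"A1", "A2", "B7"}) {
            System.out.println(seatId + " -> " + rootKeyNameFor(seatId));
        }
    }
}
```

With the low-level API, the parent key for a seat could then be built with KeyFactory.createKey("SeatsRoot", rootKeyNameFor(seatId)) and used both for the ancestor query and as the new Seat entity’s parent. String.hashCode() is specified by the Java language, so the mapping is stable across JVMs.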

Here’s what the seat reservation code looks like:

package gae.testjig.server;
import java.util.ConcurrentModificationException;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Transaction;
public class ConstraintExample {
    static private Key seatsRootKey;
    static {
        DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
        Entity rootEntity = new Entity("SeatsRoot", "seats_root_key_name"); // we want only one of these entities to ever exist
        ds.put(rootEntity);
        seatsRootKey = rootEntity.getKey();
    }
    
    public void reserveSeat(String ownerName, String seatId) throws DuplicateException {
        for (int i=0;i<10;i++) {
            try {
                reserveSeatAttempt(ownerName, seatId);
                return; // we get here if reservation succeeds
            }
            catch (ConcurrentModificationException cme) {
                // stay in the loop and try again.
            }
            // you could use another backoff algorithm here rather than 100ms each time.
            try { Thread.sleep(100); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } // stop retrying if interrupted
        }
        throw new ConcurrentModificationException("failed to reserve seat "+seatId);
    }
    private void reserveSeatAttempt(String ownerName, String seatId) throws DuplicateException, ConcurrentModificationException {
        DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
        Transaction txn = datastore.beginTransaction();
        try {
            Query testExistsQuery = new Query("Seat");
            testExistsQuery.setAncestor(seatsRootKey);
            testExistsQuery.addFilter("seatId", Query.FilterOperator.EQUAL, seatId);
            Entity exists = datastore.prepare(txn, testExistsQuery).asSingleEntity();
            if ( exists != null ) {
                throw new DuplicateException("seatId "+seatId+" already exists.");
            } else {
                Entity seatEntity = new Entity("Seat", seatId, seatsRootKey);
                seatEntity.setProperty("ownerName", ownerName);
                seatEntity.setProperty("seatId", seatId);
                seatEntity.setProperty("timeStamp", System.currentTimeMillis());
                datastore.put(txn, seatEntity);
                txn.commit(); // throws java.util.ConcurrentModificationException if the entity group was modified by another transaction
            }
        }
        finally {
            if (txn.isActive()) {
                txn.rollback();
            }
        }
    }
}
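The retry loop above sleeps a fixed 100ms between attempts, and its comment suggests using another backoff algorithm. One common choice is exponential backoff with random jitter, so that two clients that just collided do not retry in lockstep. Here is a sketch of that as a reusable helper; RetryWithBackoff and its Attempt interface are hypothetical names, and the code uses Java 8 lambdas, which postdate this post (on older runtimes an anonymous inner class does the same job).

```java
import java.util.ConcurrentModificationException;
import java.util.Random;

// Hypothetical retry helper: exponential backoff with "full jitter".
public class RetryWithBackoff {

    // One transactional attempt. E lets a checked exception such as DuplicateException
    // pass straight through to the caller instead of being retried.
    interface Attempt<T, E extends Exception> {
        T call() throws E;
    }

    // Retries only on ConcurrentModificationException. Before attempt i+1 it sleeps a
    // random time in [0, baseDelayMs * 2^i) so colliding clients do not retry in lockstep.
    static <T, E extends Exception> T run(Attempt<T, E> attempt, int maxAttempts,
                                          long baseDelayMs, Random random) throws E {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int i = 0; ; i++) {
            try {
                return attempt.call();
            } catch (ConcurrentModificationException cme) {
                if (i + 1 >= maxAttempts) {
                    throw cme; // out of attempts: report the contention to the caller
                }
                long window = baseDelayMs << Math.min(i, 20); // cap the shift to avoid overflow
                long sleepMs = (long) (random.nextDouble() * window);
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    throw cme;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated attempt: the first two "commits" collide, the third succeeds.
        String result = run(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new ConcurrentModificationException("simulated commit conflict");
            }
            return "reserved";
        }, 10, 10, new Random());
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In reserveSeat, the loop could then shrink to a single call such as RetryWithBackoff.run(() -> { reserveSeatAttempt(ownerName, seatId); return null; }, 10, 100, new Random()). Because Attempt declares throws E, a DuplicateException from the attempt propagates to the caller immediately rather than being retried.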

And here is what it looks like with Objectify:

package gae.testjig.ofy.dto;
import javax.persistence.Id;
import com.googlecode.objectify.annotation.Unindexed;
@Unindexed
public class OSeatsRoot {
    
    @Id private String entityId;
    
    public OSeatsRoot() {}
    
    // we want just one root, with entityId = "seats_root_key_name"
    public OSeatsRoot(String entityId) {
        this.entityId = entityId;
    }
    public void setEntityId(String entityId) {
        this.entityId = entityId;
    }
    public String getEntityId() {
        return entityId;
    }
}

package gae.testjig.ofy.dto;
import javax.persistence.Id;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.Indexed;
import com.googlecode.objectify.annotation.Parent;
import com.googlecode.objectify.annotation.Unindexed;
@Unindexed
public class OSeat {
    
    @Id private String entityId;
    @Parent private Key<OSeatsRoot> seatRootKey;
    
    @Indexed private String seatId;
    private String ownerName;
    private long timeStamp;
    public OSeat() {}
    
    public OSeat(Key<OSeatsRoot> seatRootKey, String seatId, String ownerName) {
        this.entityId = seatId;
        this.seatRootKey = seatRootKey;
        this.seatId = seatId;
        this.ownerName = ownerName;
        this.timeStamp = System.currentTimeMillis();
    }
    
    public String getEntityId() {
        return entityId;
    }
    public void setEntityId(String entityId) {
        this.entityId = entityId;
    }
    public Key<OSeatsRoot> getSeatRootKey() {
        return seatRootKey;
    }
    public void setSeatRootKey(Key<OSeatsRoot> seatRootKey) {
        this.seatRootKey = seatRootKey;
    }
    public String getSeatId() {
        return seatId;
    }
    public void setSeatId(String seatId) {
        this.seatId = seatId;
    }
    public String getOwnerName() {
        return ownerName;
    }
    public void setOwnerName(String ownerName) {
        this.ownerName = ownerName;
    }
    public long getTimeStamp() {
        return timeStamp;
    }
    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }
}

package gae.testjig.ofy.dao;
import gae.testjig.ofy.dto.OSeat;
import gae.testjig.ofy.dto.OSeatsRoot;
import gae.testjig.server.ChannelLogger;
import gae.testjig.server.DuplicateException;
import java.util.ConcurrentModificationException;
import com.google.appengine.api.datastore.KeyFactory;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.Objectify;
import com.googlecode.objectify.ObjectifyService;
import com.googlecode.objectify.Query;
public class ODaoSeats {
    
    static private Key<OSeatsRoot> seatsRootKey;
    static {
        ObjectifyService.register(OSeatsRoot.class);
        ObjectifyService.register(OSeat.class);
        
        Objectify ofy = ObjectifyService.begin();
        OSeatsRoot rootEntity = new OSeatsRoot("seats_root_key_name");
        ofy.put(rootEntity);
        seatsRootKey = new Key<OSeatsRoot>(OSeatsRoot.class, rootEntity.getEntityId());
    }
    
    static public void reserveSeat(ChannelLogger logger, String ownerName, String seatId) throws DuplicateException {
        logger.info(ownerName+": BEGIN");
        for (int i=0;i<10;i++) {
            try {
                reserveSeatAttempt(logger, ownerName, seatId);
                logger.info(ownerName+": END");
                return; // we get here if reservation succeeds
            }
            catch (ConcurrentModificationException cme) {
                logger.info(ownerName+": EXCEPTION java.util.ConcurrentModificationException");
                // stay in the loop and try again.
            }
            // you could use another backoff algorithm here rather than 100ms each time.
            try { Thread.sleep(100); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } // stop retrying if interrupted
        }
        logger.info(ownerName+": ABORT");
        throw new ConcurrentModificationException("failed to reserve seat "+seatId);
    }
    static private void reserveSeatAttempt(ChannelLogger logger, String ownerName, String seatId) throws DuplicateException, ConcurrentModificationException {
        logger.info(ownerName+": beginning transaction.");
        Objectify ofy = ObjectifyService.beginTransaction();
        logger.info(ownerName+": transactionId="+ofy.getTxn().getId());
        try {
            logger.info(ownerName+": test for existence of entity with seatId=" + seatId);
            Query<OSeat> testExistsQuery = ofy.query(OSeat.class).ancestor(seatsRootKey).filter("seatId =", seatId);
            OSeat exists = testExistsQuery.get();
            if ( exists != null ) {
                logger.info(ownerName+": sorry, that seat has already been taken.");
                throw new DuplicateException("seatId "+seatId+" already exists.");
            } else {
                logger.info(ownerName+": that seat has not been taken yet.");
                logger.info(ownerName+": create the seat entity with seatId=" + seatId);
                OSeat seat = new OSeat(seatsRootKey, seatId, ownerName);
                Key<OSeat> key = ofy.put(seat);
                logger.info(ownerName+": created seat. entity key = " + KeyFactory.keyToString(key.getRaw()));
                ofy.getTxn().commit(); // throws java.util.ConcurrentModificationException if the entity group was modified by another transaction
                logger.info(ownerName+": transaction committed.");
            }
        }
        finally {
            if (ofy.getTxn().isActive()) {
                ofy.getTxn().rollback();
            }
        }
    }
    
    static public String fetchSeatInfo(String seatId) {
        Objectify ofy = ObjectifyService.begin();
        Query<OSeat> testExistsQuery = ofy.query(OSeat.class).filter("seatId =", seatId);
        OSeat exists = testExistsQuery.get();
        if ( exists != null ) {
            return "Found seat entity: seatId="+exists.getSeatId()+", ownerName="+exists.getOwnerName()+", timestamp="+exists.getTimeStamp();
        } else {
            return "There is no entity in the datastore with seatId="+seatId;
        }
    }
    
    static public void deleteSeatEntity(String seatId) {
        Objectify ofy = ObjectifyService.begin();
        Key<OSeat> key = new Key<OSeat>(seatsRootKey, OSeat.class, seatId);
        ofy.delete(key);
    }
    
}

You can find all this source code here.

Limitations: You can only have a single “unique constraint” on your entity. The pre-exists check is an ancestor query, which forces us to search by entity properties rather than simply look up the entity by its key. Those properties must correspond directly to the entity’s unique key: it is ultimately the key that is unique, and an entity has only one key. This is also why we end up with duplicated data in our entity: the seatId value is stored both as the key name (entityId) and as the separate, indexed seatId property.
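Since the key is the only thing the Datastore keeps unique, a constraint that spans several fields has to be folded into that one key name. A sketch of one way to do that, using a hypothetical CompositeKeyName helper: each value is length-prefixed, because plain concatenation would give the tuples ("ab", "c") and ("a", "bc") the same name. This still leaves a single constraint per entity, just over a combination of fields.

```java
// Hypothetical helper: build ONE key name from several field values so that the
// key (the only thing the Datastore keeps unique) covers the whole combination.
public class CompositeKeyName {

    // Each part is written as <length>:<value>. Because every length is explicit,
    // two different tuples can never produce the same string. Parts must be non-null.
    static String of(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            sb.append(part.length()).append(':').append(part);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(of("row7", "seat12"));               // prints 4:row76:seat12
        System.out.println(of("ab", "c").equals(of("a", "bc"))); // prints false
    }
}
```

As with seatId above, the resulting string would be stored both as the key name and as an indexed property, so the ancestor query can filter on it.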