Synchronizing a @Schedule method that runs on multiple JBoss servers

Recently we faced the problem of how to synchronize a method with the @Schedule annotation (javax.ejb.Schedule) that runs on multiple JBoss servers. A method with this annotation is used as a timeout callback: it is simply a method that is invoked when a timer fires. An analogy is an alarm clock that starts ringing at a certain time. So if you want to send an e-mail every day at midnight with statistics of how many comments were written on your forum during the previous day, this is an ideal solution. I highly recommend that everyone who isn’t familiar with this API explore it (http://docs.oracle.com/javaee/6/tutorial/doc/bnboy.html or EJB 3 in Action, Second Edition, Chapter 7). It supports cron-like expressions, timer creation at runtime, …

We use scheduling quite a lot in our projects, mostly for periodic e-mails with statistics and alerts. We also use it for more sophisticated things such as regular consistency checks of stored files.

Everything is fine as long as your program runs on only one server (in our case JBoss AS 7.x). When you have two or more servers, things get complicated. You have to provide some kind of synchronization. If you don’t, you will, for example, send multiple identical e-mails instead of one. Why? Each server stores its timer locally and doesn’t share or synchronize it with the other servers, so each timer runs independently of the others.

There are multiple solutions:

  1. Define one server as master and the others as slaves. Only the master executes schedule methods. This is very easy to implement: you place a property file on every server, and during the execution of a schedule method the program checks this file. If it finds that it is a slave, it simply does nothing. The problem is that you now have a single point of failure. If the master crashes, none of your schedule methods will be executed.
  2. Custom synchronization via database.
  3. Clustered Singleton (HA Singleton). This is a service that is deployed on multiple nodes in a cluster but provides its service on only one of them.
  4. A third-party framework like Quartz Scheduler.
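The property check from the first option could look roughly like the sketch below. It is only an illustration in plain Java: the class name NodeRole, the property key scheduler.role, and its values are assumptions, not part of our project.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: the key "scheduler.role" and its values are assumptions.
class NodeRole {

    static final String ROLE_KEY = "scheduler.role";

    // Returns true only when the node is explicitly configured as master;
    // a missing property is treated as "slave".
    static boolean isMaster(Properties props) {
        return "master".equalsIgnoreCase(props.getProperty(ROLE_KEY, "slave").trim());
    }

    // Loads properties from any Reader; on a real server this would wrap the property file.
    static Properties load(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties master = load(new StringReader("scheduler.role=master\n"));
        Properties slave = load(new StringReader("scheduler.role=slave\n"));
        System.out.println(isMaster(master)); // true
        System.out.println(isMaster(slave));  // false
    }
}
```

A schedule method would call isMaster at the start and return immediately on a slave, which is exactly why a crashed master leaves nobody to run the jobs.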

We decided between the second and third solutions and finally picked the second. HA Singleton is available for JBoss 7.x, but it suffers from a lack of documentation, we had no experience with it, and we were under time pressure.

Why didn’t we use the Quartz Scheduler? At the beginning we only needed to synchronize the sending of e-mails, and for that Quartz Scheduler looked like overkill.

So here is our implementation. First you need a synchronization point in a database (we run on Oracle 11g):

import java.io.Serializable;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.PrePersist;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;

@Entity
@NamedQueries({
              @NamedQuery(name = "JobScheduler.findAll", query = "select o from JobScheduler o"),
              @NamedQuery(name = "JobScheduler.setJob",
                          query =
                          "update JobScheduler o set o.status = :newStatus where o.jobName = :jobName and o.status = :status"),
    })
@Table(name = "JOB_SCHEDULER")
public class JobScheduler implements Serializable {

    public enum Status {
        IDLE("IDLE"),
        RUNNING("RUNNING");

        private String value;

        Status(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    public enum JobName {
        SEND_STATISTICS("SEND_STATISTICS");

        private String jobName;

        JobName(String jobName) {
            this.jobName = jobName;
        }

        public String getJobName() {
            return jobName;
        }
    }

    @Id
    @Column(name = "JOB_NAME", nullable = false)
    private String jobName;

    @Column(name = "STATUS", nullable = false)
    private String status;

    @Temporal(TemporalType.TIMESTAMP)
    @Column(name = "CREATED_DATE")
    private Date createdDate;

    @PrePersist
    protected void onCreate() {
        createdDate = new Date();
    }

    public JobScheduler() {
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getJobName() {
        return jobName;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getStatus() {
        return status;
    }

    public Date getCreatedDate() {
        return createdDate;
    }

}

The entity is very simple:

  • JOB_NAME: the purpose of the given schedule method and also the primary key.
  • STATUS: either IDLE (no server is performing this job) or RUNNING (one of the servers is performing this job).
  • CREATED_DATE: for informational purposes only.

The named query JobScheduler.setJob is very important. It atomically sets the job status and returns the number of modified rows. If the number is 1, you know that you switched the job status successfully. That is all you need to know. And yes, this is a typical lock.
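To make the semantics of that conditional update concrete, here is a small in-memory analogy in plain Java. It is not what runs on our servers: the class InMemoryJobLock is hypothetical and a ConcurrentHashMap stands in for the JOB_SCHEDULER table, but replace(key, oldValue, newValue) behaves like "update ... where status = :status" in that it succeeds only if the current value is still the expected one.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the JOB_SCHEDULER table: job name -> status.
class InMemoryJobLock {

    private final ConcurrentMap<String, String> statusByJob = new ConcurrentHashMap<>();

    InMemoryJobLock(String jobName) {
        statusByJob.put(jobName, "IDLE");
    }

    // Succeeds only if the status is still IDLE, like the conditional update.
    boolean runJob(String jobName) {
        return statusByJob.replace(jobName, "IDLE", "RUNNING");
    }

    // Succeeds only if the status is currently RUNNING.
    boolean releaseJob(String jobName) {
        return statusByJob.replace(jobName, "RUNNING", "IDLE");
    }

    public static void main(String[] args) {
        InMemoryJobLock lock = new InMemoryJobLock("SEND_STATISTICS");
        System.out.println("server A acquires: " + lock.runJob("SEND_STATISTICS"));     // true
        System.out.println("server B acquires: " + lock.runJob("SEND_STATISTICS"));     // false
        System.out.println("server A releases: " + lock.releaseJob("SEND_STATISTICS")); // true
    }
}
```

Only the first caller sees a successful replacement, which corresponds to exactly one server getting an updated row count of 1.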

Now you need an EJB that implements the logic of this locking mechanism:

import javax.annotation.Resource;

import javax.ejb.EJB;
import javax.ejb.SessionContext;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;

@Stateless
public class JobSchedulerSessionEJBBean implements JobSchedulerSessionEJBLocal {

    @PersistenceContext(unitName = "DB")
    private EntityManager em;

    public JobSchedulerSessionEJBBean() {
    }

    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public boolean runJob(JobScheduler.JobName jobName) {
        Query query = em.createNamedQuery("JobScheduler.setJob");
        query.setParameter("newStatus", JobScheduler.Status.RUNNING.getValue());
        query.setParameter("jobName", jobName.getJobName());
        query.setParameter("status", JobScheduler.Status.IDLE.getValue());
        int howManyUpdatedRows = query.executeUpdate();

        return (howManyUpdatedRows == 1);
    }

    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public boolean releaseJob(JobScheduler.JobName jobName) {
        Query query = em.createNamedQuery("JobScheduler.setJob");
        query.setParameter("newStatus", JobScheduler.Status.IDLE.getValue());
        query.setParameter("jobName", jobName.getJobName());
        query.setParameter("status", JobScheduler.Status.RUNNING.getValue());
        int howManyUpdatedRows = query.executeUpdate();

        return (howManyUpdatedRows == 1);
    }

}

The corresponding local interface:

import javax.ejb.Local;

@Local
public interface JobSchedulerSessionEJBLocal {
    
    public boolean runJob(JobScheduler.JobName jobName);
    public boolean releaseJob(JobScheduler.JobName jobName);
    
}

The method runJob(JobScheduler.JobName jobName) has only one parameter: the job you want to obtain. It returns true if you obtained the lock for the job and can run it, and false if some other server has already obtained the lock for this job.

Now there are two potential problems:

  1. What if an exception occurs between the runJob and releaseJob methods, or the JVM even crashes? The first problem is quite simple: always place the releaseJob call inside a finally block. For the second problem, use an EJB startup service that checks during application server startup whether some job is stuck in the RUNNING state. If it is, just update it to IDLE. You can even have a job for this, so that only one server updates the value to IDLE (multiple updates that do the same thing are unnecessary and waste resources).
  2. If your servers don’t have exactly the same time, this can happen:
    • We have a job that is timed to fire at 18:00:00.
    • At 18:00:00, Server A runs the runJob method, obtains the lock, and starts executing its logic.
    • Server B is delayed by five seconds, so when it thinks it is 18:00:00 it is actually 18:00:05. By this time Server A has already finished its releaseJob method, so the status is IDLE.
    • Server B obtains the job and executes the same logic again.
    • If the logic is, for example, sending an e-mail, you have now sent two identical e-mails.

How to prevent this? Add a LAST_RUN column to the JobScheduler entity and extend the JobName enum with an intervalInMinutes attribute. During the runJob method you then fill this column with the current time and also check, using the intervalInMinutes attribute, whether the job should run again. Use the database time because it is the same for every server.

If you know that your job should run every hour, you can easily check whether the last run was during the previous hour or not.
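As a rough illustration of that check, here is the rule "last run + interval <= now" in plain Java, applied to the Server A / Server B scenario above. The class JobDueCheck and the concrete date are assumptions made for the example; in our solution the comparison happens inside the database.

```java
import java.time.LocalDateTime;

// Hypothetical helper illustrating the LAST_RUN check; both timestamps are
// assumed to come from a single clock (the database).
class JobDueCheck {

    // The job is due when lastRun + interval is not after the current time.
    static boolean isDue(LocalDateTime lastRun, LocalDateTime now, int intervalInMinutes) {
        return !lastRun.plusMinutes(intervalInMinutes).isAfter(now);
    }

    public static void main(String[] args) {
        // Server A ran the job at 18:00 (the date itself is arbitrary).
        LocalDateTime lastRun = LocalDateTime.of(2024, 1, 1, 18, 0);

        // Server B asks five seconds later: the job must not run again.
        System.out.println(isDue(lastRun, lastRun.plusSeconds(5), 60)); // false

        // One hour later the job is due again.
        System.out.println(isDue(lastRun, lastRun.plusHours(1), 60));   // true
    }
}
```

The updated entity and the SQL version of this check follow.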

import java.io.Serializable;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.PrePersist;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;

@Entity
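@NamedQueries({
              @NamedQuery(name = "JobScheduler.findAll", query = "select o from JobScheduler o"),
              // Still required: releaseJob looks up this named query.
              @NamedQuery(name = "JobScheduler.setJob",
                          query =
                          "update JobScheduler o set o.status = :newStatus where o.jobName = :jobName and o.status = :status"),
    })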
@Table(name = "JOB_SCHEDULER")
public class JobScheduler implements Serializable {

    public enum Status {
        IDLE("IDLE"),
        RUNNING("RUNNING");

        private String value;

        Status(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }
    }

    public enum JobName {
        SEND_STATISTICS("SEND_STATISTICS", 60);

        private String jobName;
        private int intervalInMinutes;

        JobName(String jobName, int intervalInMinutes) {
            this.jobName = jobName;
            this.intervalInMinutes = intervalInMinutes;
        }

        public String getJobName() {
            return jobName;
        }

        public int getIntervalInMinutes() {
            return intervalInMinutes;
        }
    }

    @Id
    @Column(name = "JOB_NAME", nullable = false)
    private String jobName;

    @Column(name = "STATUS", nullable = false)
    private String status;

    @Temporal(TemporalType.TIMESTAMP)
    @Column(name = "CREATED_DATE")
    private Date createdDate;
    
    @Temporal(TemporalType.TIMESTAMP)
    @Column(name = "LAST_RUN")
    private Date lastRun;

    @PrePersist
    protected void onCreate() {
        createdDate = new Date();
    }

    public JobScheduler() {
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getJobName() {
        return jobName;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getStatus() {
        return status;
    }

    public Date getCreatedDate() {
        return createdDate;
    }
    
    public void setLastRun(Date lastRun) {
        this.lastRun = lastRun;
    }

    public Date getLastRun() {
        return lastRun;
    }

}

And here is the updated logic:

import javax.annotation.Resource;

import javax.ejb.EJB;
import javax.ejb.SessionContext;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;

@Stateless
public class JobSchedulerSessionEJBBean implements JobSchedulerSessionEJBLocal {

    @Resource
    SessionContext sessionContext;

    @PersistenceContext(unitName = "DB")
    private EntityManager em;

    public JobSchedulerSessionEJBBean() {
    }

    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public boolean runJob(JobScheduler.JobName jobName) {
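        // Note: LAST_RUN must be initialized to a non-null value; with NULL the
        // comparison below never matches and the job is never picked up.
        Query query =
            em.createNativeQuery("update job_scheduler set status = :newStatus , last_run = trunc(sysdate, 'MI') where job_name = :jobName and status = :status and (last_run + (1/1440 * :intervalInMinutes)) <= trunc(sysdate, 'MI')");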
        query.setParameter("newStatus", JobScheduler.Status.RUNNING.getValue());
        query.setParameter("jobName", jobName.getJobName());
        query.setParameter("status", JobScheduler.Status.IDLE.getValue());
        query.setParameter("intervalInMinutes", jobName.getIntervalInMinutes());
        int howManyUpdatedRows = query.executeUpdate();

        return (howManyUpdatedRows == 1);
    }

    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public boolean releaseJob(JobScheduler.JobName jobName) {
        Query query = em.createNamedQuery("JobScheduler.setJob");
        query.setParameter("newStatus", JobScheduler.Status.IDLE.getValue());
        query.setParameter("jobName", jobName.getJobName());
        query.setParameter("status", JobScheduler.Status.RUNNING.getValue());
        int howManyUpdatedRows = query.executeUpdate();

        return (howManyUpdatedRows == 1);
    }

}

You can see that we’re now using a new query in the runJob method. We check whether the job should run again, and if it should, we fill the LAST_RUN column with the database time truncated to minutes. If we didn’t truncate the seconds and milliseconds, it might not work properly. Why?

  1. The server fires the timer for the sendStatistics method at 18:00:00.000.
  2. The query is executed at 18:00:00.500 and this time is saved to LAST_RUN.
  3. One hour later the server fires the timer for sendStatistics again at 19:00:00.000.
  4. The server’s CPU may be a little less busy than during the previous timeout, so the runJob method gets more processor time and the query is executed faster, at 19:00:00.400.
  5. Now the query updates no rows, because the required interval is exactly 60 minutes and only 59 minutes, 59 seconds and 900 milliseconds have elapsed.

This is the reason why we must truncate the time.
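The same timestamps can be replayed in plain Java to see the effect. This is only a sketch: the class TruncationDemo and the date are made up for the example, and truncatedTo(ChronoUnit.MINUTES) plays the role that trunc(sysdate, 'MI') plays in the query.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical demo of why the timestamps are truncated to whole minutes.
class TruncationDemo {

    // Compares full-precision timestamps, as an untruncated LAST_RUN would.
    static boolean isDueRaw(LocalDateTime lastRun, LocalDateTime now, int intervalInMinutes) {
        return !lastRun.plusMinutes(intervalInMinutes).isAfter(now);
    }

    // Truncates both timestamps to whole minutes before comparing.
    static boolean isDueTruncated(LocalDateTime lastRun, LocalDateTime now, int intervalInMinutes) {
        return isDueRaw(lastRun.truncatedTo(ChronoUnit.MINUTES),
                        now.truncatedTo(ChronoUnit.MINUTES),
                        intervalInMinutes);
    }

    public static void main(String[] args) {
        // First query ran at 18:00:00.500, the next one at 19:00:00.400 (arbitrary date).
        LocalDateTime first = LocalDateTime.of(2024, 1, 1, 18, 0, 0, 500_000_000);
        LocalDateTime second = LocalDateTime.of(2024, 1, 1, 19, 0, 0, 400_000_000);

        System.out.println(isDueRaw(first, second, 60));       // false: 100 ms short of 60 minutes
        System.out.println(isDueTruncated(first, second, 60)); // true: 18:00 + 60 min <= 19:00
    }
}
```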

If your servers’ clocks are precisely synchronized, one server will set STATUS to RUNNING, and the other servers will find the job in this status and give up. But if your job has high priority, I strongly recommend implementing LAST_RUN and always checking it against the current time. You should not depend on the assumption that your servers are synchronized to the millisecond.

You can see that both methods have an annotation that runs them in a new transaction. Why? A server obtains the lock by updating the value from IDLE to RUNNING. The database locks the row, so no one else can modify it as long as this server’s transaction is in progress, and the other servers would have to wait to execute their queries. Without the annotation, the transaction of the server that obtained the lock would end only after it had executed its logic and called releaseJob, so the status would be IDLE again by the time the other servers’ queries ran, and they would see the job as idle and execute it again. Yes, this is already solved by the LAST_RUN column, but there is no reason to hold the transaction longer than necessary. With this annotation, other servers can check the status right after the runJob method has finished. If you choose the solution without the LAST_RUN column, this annotation is absolutely necessary; without it the synchronization won’t work.

Finally, an example of usage:

    @Inject
    private JobSchedulerSessionEJBLocal jobSchedulerEJB;
    
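    @Schedule(hour="*", minute = "00")
    public void sendStatistics() {
        // Release only if this server actually obtained the lock; otherwise a
        // server that lost the race would reset another server's RUNNING status.
        if (jobSchedulerEJB.runJob(JobScheduler.JobName.SEND_STATISTICS)) {
            try {
                // logic goes here
            } finally {
                jobSchedulerEJB.releaseJob(JobScheduler.JobName.SEND_STATISTICS);
            }
        }
    }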

So now you know how to synchronize servers in a cluster. This solution is meant for simple CRON expressions, and your @Schedule annotation must match the value of the intervalInMinutes attribute. If you need more complicated CRON expressions, or features like load balancing or synchronization without a database, definitely use a third-party solution like Quartz Scheduler. Don’t reinvent the wheel.

For simple use cases, our solution is easy and fast to implement.

Happy clustering.
