DistributedJobManager will schedule tasks and balance workload in the cluster.
distributed job manager

Concept

DistributedJobManager (DJM) stores its state in ZooKeeper
DJM lives within JVM application and maintain its own thread pool.
DJM regularly launches user defined jobs in separate threads based on schedule.
User defined Job is a class that implements DistributedJob interface.
DJM will restart Job in case of any Job failure.
DJM balance workload between Jobs

DJM consists of two parts: Manager and Worker. Each DJM instance has active Worker. But only one DJM instance has active Manager.
Manager orchestrate Job assignment between Workers in the cluster.

djm zk

User defined Job should implement DistributedJob interface and provide information about work-items.
Work-item is a smallest indivisible peace of work.
Job WorkPool is a list of work-items that DJM will split between active Workers.

class MyJob implements DistributedJob{
    //...
    WorkPool getWorkPool(){
        return WorkPool.of(new HashSet<>(Arrays.asList("workItem1", "workItem2")));
    }
}

Job informs DJM about WorkPool.
DJM split work-items from WorkPool among all Workers according to assignment strategy.
If Job define WorkPool with single work-item then it means that such job will be launched by DJM only within single application in the cluster.

When DJM launches Job it passes information about work-items that Job should process.

class MyJob implements DistributedJob{
    //...
void run(DistributedJobContext context) {
    //...
    Set<String> workShare = context.getWorkShare();
    for (String workItem : workShare) {
        //process workItem
    }
}

Assignment Strategies

Number of active worker changes over time due to server reboot or crash. Jobs change number and composition of their work items. This events trigger DJM reassignment process. During reassignment DJM Manager uses an Assignment Strategy to decide to which worker particular work item will be assigned.
DJM provides several implementations for the most common cases of work item distribution among workers. You can implement custom AssigmentStrategy for your case and register it within DJM.

  • EvenlySpreadAssignmentStrategy
    Intended for evenly distribution work items for each available job on all available workers. It sets up all the work item from the previous distribution and then makes balancing by moving the work item from the more loaded to the less loaded

  • RendezvousHashAssignmentStrategy
    It minimize job reassignment within living workers when new worker added or one of workers removed using Rendezvous hash

  • EvenlyRendezvousAssignmentStrategy
    Modification of RendezvousHashAssignmentStrategy. It gets the worker with the maximum hash from the list of workers who are not yet filled

djm

Getting started

Add dependency into your project.

Java dependencies
  • distributed-job-manager distributed job manager

Launch DJM

DJM requires list of ZooKeeper hosts and path within ZooKeeper that will be used to store DJM state.
DJM will use provided nodeId to identify application instance within cluster.

//During application startup
DistributedJobManager djm = new DistributedJobManager(
    "zooKeeperHost1,zooKeeperHost2,zooKeeperHost3"
    "/zooKeeperPath/for/djm",
    "applicaiotId#3",
    Arrays.asList(new MyJob1(), new MyJob2()));

//During applicatoin shutdown
djm.close();

Monitoring and metrics

DJM expose Jobs state through profiler metrics. Every DJM metric starts with "djm_" prefix and contains tag djmNodeId, which is equal to "nodeId" arg passed to DJM’s constructor.

  • djm_init profiles DJM’s initialization

  • djm_close profiles DJM’s shutdown

  • djm_job_{jobId} profiles every run of every job

  • djm_pool_rebalance_* metrics of Manager’s ProfiledThreadPoolExecutor for performing rebalance

  • djm_scheduled_pool_cleaning_task_* and djm_pool_cleaning_task_* - metrics of Manager’s ReschedulableScheduler for performing cleaning ZK

  • djm_pool_update_assignment_* - metrics of Worker’s ProfiledThreadPoolExecutor for stopping/starting jobs and updating ZK according to updates from manager

  • djm_scheduled_pool_check_work_pool_* and djm_pool_check_work_pool_* - metrics of Worker’s ReschedulableScheduler for performing periodic checks for changes in total work-pool of each job

  • djm_scheduled_pool_job_scheduler_* and djm_pool_job_scheduler_* - metrics of Worker’s ReschedulableScheduler for running jobs

Reassignment

Reassignment is a process when a Master changes Job and work items assignment between application instances in the cluster.

Reassignment is triggered by:

  • One of nodes shutdown or restart

  • Network disconnect

  • Job WorkPool change

Rebalance steps:

djm reassignment
  • Manager reads from zookeeper availability state - list of jobs, where they can be launched and list of each job work items.

  • Manager reads from zookeeper assignment state - on which nodes jobs and work-items are launched or scheduled right now.

  • Manager calculate new assignment state

  • Manager writes new assignment state to zookeeper

  • Workers receive notification from zookeeper about assignment state update

  • Workers stop old Jobs and launch new Jobs according to the new assignment state.

Main goal of rebalance process is to minimize unnecessary job restarts and reassignments.

State

DJM keeps cluster state as a tree of zookeeper nodes.
Once created, DJM initializes zookeeper paths, if needed:

job-manager
  └ alive
  └ locks
  └ leader-latch
  └ workers
  └ work-pool
  └ work-pool-version
  └ worker-assignment-version
  └ worker-version

StateWorker life-cycle:

Every time worker (re)connected to cluster, it process few step in single transaction, using work-pool-version and worker-version locks:

  • (re)creates ephemeral node in alive subtree, to inform manager about (re)connecting

  └ alive
    └ worker_1
  • (re)creates its subtree with empty assigned subtree (this needs worker-version lock)

  └ workers
    └ worker_1
      └ available
        └ async.report.building.job
        └ elasticsearch.upload.job
      └ assigned
  • registers (or updates) its jobs in common work-pool subtree (this needs work-pool-version lock)

  └ work-pool
    └ async.report.building.job
      └ workItemA
      └ workItemB
    └ elasticsearch.upload.job
      └ workItemC

Worker listens for updates in workers/worker/assigned subtree and stops/launches jobs accordingly.
Worker also periodically updates its jobs in work-pool subtree using work-pool-version lock

Manager life-cycle:

Manager listens for updates in alive and work-pool subtrees and invokes rebalance accordingly. During rebalance manager updates workers assigned subtree using worker-assignment-version lock

Manager also periodically removes not relevant (which isn’t in any workers/*/available subtree) jobs from work-pool subtree using work-pool-version lock (clearing process)

Full ZK tree you can see below:

distributed-job-manager
  └ alive //alive workers that can run jobs
    └ 20 //worker with id 20
    └ 3  //worker with id 3
    ...
  └ locks //locks guard work-items: ony one Job can access work-item in the same time
    └ async.report.building.job //job id
      └ workItemA.lock  //list of job locks so only one node could run job with same work-item
    └ elasticsearch.upload.job
      └ workItemC.lock
      ...
  └ leader-latch // used for Manager election
    └ ...
  └ workers //list of workers
    └ 20
     ...
    └ 3
      └ available //list of jobs that worker with id `3` can run
        └ async.report.building.job
        └ elasticsearch.upload.job
         ...
      └ assigned  //List of assigned jobs to worker '3'
        └ async.report.building.job
          └ workItemA  //report.building job with `workItemA` and `workItemB` is assigned to worker `3`
          └ workItemB
        └ elasticsearch.upload.job
          └ workItemC
         ...
  └ work-pool //common work-pool for all jobs
    └ async.report.building.job
      └ workItemA //Work pool of report.building Job
      └ workItemB
    └ elasticsearch.upload.job
      └ workItemC
       ...
  └ work-pool-version //part of transaction, allows atomicaly update available work-pool
  └ worker-assignment-version //part of transaction, allows atomicaly assign jobs
  └ worker-version //part of transaction, allows atomicaly register workier

Guarantees

In case of network failure one one can be temporary detached from other part of the cluster.
In order to improve stability of the cluster and tolerate short connectivity problems DJM allows detached Worker to continue it’s work for configured amount of time.
To enable that DJM uses persistent locks and keep information when which worker started to process particular job in ZooKeeper.
Other Workers will not be able to process same work-items during this lock timeout even if owner of lock gone offline.

djm zk disconnect

Source guidebook

Implementation details

Generate documentation

Documentation source located at

asciidoc/*

Compiled documentation stored at

docs/*

Compiled documentation stored in git and served as a static content. To compile documentation run

gradle asciidoctor