DistributedJobManager (DJM) schedules tasks and balances workload in the cluster.
DJM stores its state in ZooKeeper.
DJM lives within a JVM application and maintains its own thread pool.
DJM regularly launches user-defined Jobs in separate threads according to a schedule.
A user-defined Job is a class that implements DistributedJob.
DJM restarts a Job if the Job fails.
DJM balances workload between Jobs.
DJM consists of two parts: Manager and Worker.
Each DJM instance has an active Worker,
but only one DJM instance has an active Manager.
The Manager orchestrates Job assignment between Workers in the cluster.
A user-defined Job should implement the DistributedJob interface and provide information about its work-items.
A work-item is the smallest indivisible piece of work.
A Job's WorkPool is the set of work-items that DJM splits between active Workers.
class MyJob implements DistributedJob {
    //...
    WorkPool getWorkPool() {
        return WorkPool.of(new HashSet<>(Arrays.asList("workItem1", "workItem2")));
    }
}
The Job informs DJM about its WorkPool.
DJM splits the work-items of the WorkPool among all Workers according to the assignment strategy.
If a Job defines a WorkPool with a single work-item, DJM launches that Job within only one application in the cluster.
When DJM launches a Job, it passes information about the work-items that the Job should process.
class MyJob implements DistributedJob {
    //...
    void run(DistributedJobContext context) {
        //...
        Set<String> workShare = context.getWorkShare();
        for (String workItem : workShare) {
            //process workItem
        }
    }
}
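To make the relationship between a WorkPool and the work share each Worker receives more concrete, here is a minimal sketch that deals work-items to workers in round-robin order. The class WorkShareSketch and its split method are hypothetical names introduced for illustration; they are not part of DJM, and the round-robin rule is an assumed, simplistic stand-in for a real assignment strategy.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of DJM: splits a work pool into per-worker work shares.
class WorkShareSketch {

    // Deals work-items to workers in round-robin order (an assumed, simplistic rule).
    static Map<String, Set<String>> split(Set<String> workPool, List<String> workers) {
        Map<String, Set<String>> shares = new LinkedHashMap<>();
        for (String worker : workers) {
            shares.put(worker, new LinkedHashSet<>());
        }
        if (workers.isEmpty()) {
            return shares;
        }
        int next = 0;
        // Sort the items so the result does not depend on set iteration order.
        for (String item : new TreeSet<>(workPool)) {
            shares.get(workers.get(next % workers.size())).add(item);
            next++;
        }
        return shares;
    }

    public static void main(String[] args) {
        List<String> workers = List.of("worker-1", "worker-2");
        System.out.println(split(Set.of("workItem1", "workItem2", "workItem3"), workers));
        // A pool with a single work-item ends up on exactly one worker.
        System.out.println(split(Set.of("onlyItem"), workers));
    }
}
```

With a single work-item in the pool, only one worker receives a non-empty share, which matches the rule that such a Job runs within a single application.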
Assignment Strategies
The number of active workers changes over time due to server reboots or crashes.
Jobs change the number and composition of their work items.
These events trigger the DJM reassignment process.
During reassignment, the DJM Manager uses an Assignment Strategy to decide which worker a particular work item is assigned to.
DJM provides several implementations for the most common cases of work item distribution among workers.
You can implement a custom AssignmentStrategy for your case and register it within DJM.
Distributes the work items of each available job evenly across all available workers. It starts from the previous distribution and then rebalances by moving work items from more loaded workers to less loaded ones.
Minimizes job reassignment among living workers when a worker is added or removed, using Rendezvous hashing.
A modification of RendezvousHashAssignmentStrategy. It picks the worker with the maximum hash from the list of workers that are not yet full.
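The idea behind Rendezvous hashing can be sketched in a few lines: every (worker, work item) pair gets a score, and each work item goes to the worker with the highest score. The sketch below is an illustration only, not the code of DJM's strategy; the class RendezvousSketch, the choice of CRC32 as the hash, and the tie-breaking rule are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical illustration of Rendezvous (highest random weight) hashing.
class RendezvousSketch {

    // Score of a (worker, work item) pair; CRC32 is an arbitrary stand-in hash.
    static long score(String worker, String workItem) {
        CRC32 crc = new CRC32();
        crc.update((worker + "|" + workItem).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Each work item goes to the worker with the highest score for that item.
    static Map<String, String> assign(List<String> workers, List<String> workItems) {
        Map<String, String> itemToWorker = new TreeMap<>();
        for (String item : workItems) {
            String best = null;
            long bestScore = -1; // CRC32 values are never negative
            for (String worker : workers) {
                long s = score(worker, item);
                // Ties are broken by worker id so the choice does not depend on list order.
                if (s > bestScore || (s == bestScore && worker.compareTo(best) > 0)) {
                    best = worker;
                    bestScore = s;
                }
            }
            if (best != null) {
                itemToWorker.put(item, best);
            }
        }
        return itemToWorker;
    }

    public static void main(String[] args) {
        List<String> items = List.of("workItemA", "workItemB", "workItemC", "workItemD");
        System.out.println(assign(List.of("worker-1", "worker-2", "worker-3"), items));
        // After worker-3 leaves, only the items it owned move to another worker.
        System.out.println(assign(List.of("worker-1", "worker-2"), items));
    }
}
```

Because each item's winner depends only on the scores of the workers that are present, removing a worker moves only the items that worker owned, and adding a worker moves only the items the newcomer wins. This is what keeps reassignment among living workers small.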
Getting started
Launch DJM
DJM requires a list of ZooKeeper hosts and a path within ZooKeeper that will be used to store DJM state.
DJM uses the provided nodeId to identify the application instance within the cluster.
//During application startup
DistributedJobManager djm = new DistributedJobManager(
        "zooKeeperHost1,zooKeeperHost2,zooKeeperHost3",
        "/zooKeeperPath/for/djm",
        "applicationId#3",
        Arrays.asList(new MyJob1(), new MyJob2()));

//During application shutdown
djm.close();
Monitoring and metrics
DJM exposes the state of Jobs through profiler metrics. Every DJM metric starts with the "djm_" prefix and contains the tag djmNodeId, which is equal to the "nodeId" argument passed to DJM’s constructor.
profiles DJM’s initialization
profiles DJM’s shutdown
profiles every run of every job
metrics of the Manager’s ProfiledThreadPoolExecutor that performs rebalance
metrics of the Manager’s ReschedulableScheduler that cleans up ZK
metrics of the Worker’s ProfiledThreadPoolExecutor that stops and starts jobs and updates ZK according to updates from the Manager
metrics of the Worker’s ReschedulableScheduler that periodically checks for changes in the total work-pool of each job
metrics of the Worker’s ReschedulableScheduler that runs jobs
Reassignment is the process in which the Manager changes the assignment of Jobs and work items between application instances in the cluster.
Reassignment is triggered by:
Shutdown or restart of one of the nodes
Network disconnect
Job WorkPool change
Rebalance steps:
The Manager reads the availability state from ZooKeeper: the list of jobs, where they can be launched, and the list of work items of each job.
The Manager reads the assignment state from ZooKeeper: on which nodes jobs and work-items are launched or scheduled right now.
The Manager calculates the new assignment state.
The Manager writes the new assignment state to ZooKeeper.
Workers receive a notification from ZooKeeper about the assignment state update.
Workers stop old Jobs and launch new Jobs according to the new assignment state.
The main goal of the rebalance process is to minimize unnecessary job restarts and reassignments.
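The "calculate new assignment state" step can be illustrated with a small in-memory sketch that keeps every assignment that is still valid and only places the work items that lost their worker. This is not DJM's implementation: the class RebalanceSketch, the plain maps standing in for the ZooKeeper state, and the "least loaded worker" rule are assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a "sticky" rebalance over in-memory state.
class RebalanceSketch {

    // currentAssignment maps work item -> worker; the result is the new mapping.
    static Map<String, String> rebalance(List<String> aliveWorkers,
                                         List<String> workPool,
                                         Map<String, String> currentAssignment) {
        Map<String, String> next = new TreeMap<>();
        Map<String, Integer> load = new TreeMap<>();
        for (String worker : aliveWorkers) {
            load.put(worker, 0);
        }
        if (load.isEmpty()) {
            return next; // nobody can run anything
        }
        // Step 1: keep assignments whose worker is alive and whose item is still in the pool.
        for (String item : workPool) {
            String owner = currentAssignment.get(item);
            if (owner != null && load.containsKey(owner)) {
                next.put(item, owner);
                load.merge(owner, 1, Integer::sum);
            }
        }
        // Step 2: give each remaining item to the currently least loaded worker.
        for (String item : workPool) {
            if (next.containsKey(item)) {
                continue;
            }
            String target = null;
            for (String worker : load.keySet()) {
                if (target == null || load.get(worker) < load.get(target)) {
                    target = worker;
                }
            }
            next.put(item, target);
            load.merge(target, 1, Integer::sum);
        }
        return next;
    }

    public static void main(String[] args) {
        // Worker w3 is gone; only its items b and d need a new owner.
        Map<String, String> current = Map.of("a", "w1", "b", "w3", "c", "w2", "d", "w3");
        System.out.println(rebalance(List.of("w1", "w2"), List.of("a", "b", "c", "d"), current));
    }
}
```

The sketch never moves an item away from a living worker, so it does not even out an existing imbalance. A real strategy has to trade that stickiness against even load.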
DJM keeps the cluster state as a tree of ZooKeeper nodes.
Once created, DJM initializes the following ZooKeeper paths if needed:
└ alive
└ locks
└ leader-latch
└ workers
└ work-pool
└ work-pool-version
└ worker-assignment-version
└ worker-version
Worker life-cycle:
Every time a worker (re)connects to the cluster, it performs the following steps in a single transaction, using work-pool-version and worker-version:
(re)creates an ephemeral node in the alive subtree, to inform the Manager about the (re)connection
└ alive
  └ worker_1
(re)creates its subtree with an empty assigned subtree (this needs worker-version)
└ workers
  └ worker_1
    └ available
      └ async.report.building.job
      └ elasticsearch.upload.job
    └ assigned
registers (or updates) its jobs in the common work-pool subtree (this needs work-pool-version)
└ work-pool
  └ async.report.building.job
    └ workItemA
    └ workItemB
  └ elasticsearch.upload.job
    └ workItemC
The Worker listens for updates in the workers/worker/assigned subtree and stops or launches jobs accordingly.
The Worker also periodically updates its jobs in the work-pool subtree, using work-pool-version.
Manager life-cycle:
The Manager listens for updates in the alive and work-pool subtrees and invokes rebalance accordingly.
During rebalance, the Manager updates the assigned subtrees of the workers, using worker-assignment-version.
The Manager also periodically removes irrelevant jobs (those not present in any workers/*/available subtree) from the work-pool subtree, using the work-pool-version lock (the cleaning process).
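The version nodes act as optimistic guards: an update is applied only if the version has not changed since it was read. In ZooKeeper this kind of guard is typically expressed as a multi-operation transaction that includes a version check on the guard node. The sketch below is only an in-memory analogy of that pattern, not DJM's code; the class VersionGuardSketch and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a subtree guarded by a version node.
class VersionGuardSketch {
    private int version = 0; // plays the role of, for example, work-pool-version
    private final Map<String, String> nodes = new HashMap<>();

    synchronized int readVersion() {
        return version;
    }

    // Applies all changes atomically, but only if the version is still the expected one.
    synchronized boolean commit(int expectedVersion, Map<String, String> changes) {
        if (version != expectedVersion) {
            return false; // someone else updated the subtree: re-read and retry
        }
        nodes.putAll(changes);
        version++;
        return true;
    }

    synchronized String get(String path) {
        return nodes.get(path);
    }

    public static void main(String[] args) {
        VersionGuardSketch tree = new VersionGuardSketch();
        int seenByWorker = tree.readVersion();
        int seenByManager = tree.readVersion();
        // The worker commits first, so the manager's view becomes stale.
        System.out.println(tree.commit(seenByWorker, Map.of("work-pool/jobA/item1", "")));
        System.out.println(tree.commit(seenByManager, Map.of("work-pool/jobA/item2", "")));
    }
}
```

A caller whose commit is rejected re-reads the state and the version and then retries, so two writers never overwrite each other's updates silently.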
The full ZK tree is shown below:
└ alive //alive workers that can run jobs
  └ 20 //worker with id 20
  └ 3 //worker with id 3
└ locks //locks guard work-items: only one Job can access a work-item at a time
  └ async.report.building.job //job id
    └ workItemA.lock //job locks, so that only one node can run the job with the same work-item
  └ elasticsearch.upload.job
    └ workItemC.lock
└ leader-latch //used for Manager election
  └ ...
└ workers //list of workers
  └ 20
  └ 3
    └ available //list of jobs that worker with id `3` can run
      └ async.report.building.job
      └ elasticsearch.upload.job
    └ assigned //list of jobs assigned to worker `3`
      └ async.report.building.job
        └ workItemA //report.building job with `workItemA` and `workItemB` is assigned to worker `3`
        └ workItemB
      └ elasticsearch.upload.job
        └ workItemC
└ work-pool //common work-pool for all jobs
  └ async.report.building.job
    └ workItemA //work pool of report.building Job
    └ workItemB
  └ elasticsearch.upload.job
    └ workItemC
└ work-pool-version //part of transaction, allows atomically updating the available work-pool
└ worker-assignment-version //part of transaction, allows atomically assigning jobs
└ worker-version //part of transaction, allows atomically registering a worker
In case of a network failure, one node can be temporarily detached from the rest of the cluster.
To improve the stability of the cluster and tolerate short connectivity problems, DJM allows a detached Worker to continue its work for a configured amount of time.
To enable this, DJM uses persistent locks and keeps in ZooKeeper a record of which worker started processing a particular job and when.
Other Workers cannot process the same work-items during this lock timeout, even if the owner of the lock has gone offline.
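The behaviour of a lock with a timeout can be sketched in memory as follows. This is a simplified illustration, not DJM's ZooKeeper-based implementation: the class WorkItemLockSketch, its tryAcquire method, and the explicit nowMillis parameter (used instead of a real clock to keep the example deterministic) are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a persistent work-item lock with a timeout.
class WorkItemLockSketch {

    private static final class Lock {
        final String owner;
        final long expiresAtMillis;

        Lock(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Lock> locks = new HashMap<>();
    private final long timeoutMillis;

    WorkItemLockSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Acquires or prolongs the lock; fails while another owner's lock has not expired.
    synchronized boolean tryAcquire(String workItem, String worker, long nowMillis) {
        Lock current = locks.get(workItem);
        boolean free = current == null || current.expiresAtMillis <= nowMillis;
        if (!free && !current.owner.equals(worker)) {
            return false;
        }
        locks.put(workItem, new Lock(worker, nowMillis + timeoutMillis));
        return true;
    }

    public static void main(String[] args) {
        WorkItemLockSketch locks = new WorkItemLockSketch(1000);
        System.out.println(locks.tryAcquire("workItemA", "worker-1", 0));
        // worker-1 may be offline now, but its lock still blocks worker-2 until it expires.
        System.out.println(locks.tryAcquire("workItemA", "worker-2", 500));
        System.out.println(locks.tryAcquire("workItemA", "worker-2", 1000));
    }
}
```

The owner keeps its claim by calling tryAcquire again before the timeout elapses, which is how a briefly detached worker can keep processing its work-items without another worker starting on them.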
Source guidebook
Implementation details
Generate documentation
Documentation source located at
Compiled documentation stored at
Compiled documentation is stored in git and served as static content. To compile the documentation, run
gradle asciidoctor