Master Worker Pattern
Calculate a matrix multiplication with the Master-Worker pattern.
About Master Worker
Solution 1:
When a worker finishes its current task, it immediately takes the next task, without any rest, until the master’s taskQueue is empty (all tasks are done).
Point.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package master_worker; public class Point { private int row; private int col; public Point(int row, int col) { this.row = row; this.col = col; } public int getRow() { return row; } public void setRow(int row) { this.row = row; } public int getCol() { return col; } public void setCol(int col) { this.col = col; } @Override public String toString() { return "Point{" + "row=" + row + ", col=" + col + '}'; } } |
Task.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package master_worker;

import java.util.Arrays;

/**
 * One unit of work: the dot product of a row of matrix A and a column of
 * matrix B, identified by the cell ({@link Point}) it belongs to.
 *
 * <p>Typical use: {@link #setId(Point)}, {@link #setCondition(int[], int[])},
 * then {@link #calculateDimension()} and finally {@link #getDimension()}.
 */
public class Task {
    private Point id;
    private int dimension;
    private int[] rowA;
    private int[] colB;

    public Point getId() {
        return id;
    }

    public void setId(Point id) {
        this.id = id;
    }

    /**
     * @return the value computed by the last {@link #calculateDimension()} call,
     *         or 0 if it has not been called yet
     */
    public int getDimension() {
        return this.dimension;
    }

    /**
     * Sets the two vectors to multiply.
     *
     * @param rowA a row of the left matrix
     * @param colB a column of the right matrix; must have the same length as {@code rowA}
     * @throws IllegalArgumentException if both arrays are given and their lengths differ
     */
    public void setCondition(int[] rowA, int[] colB) {
        // Fail at submission time instead of producing a partial sum or an
        // ArrayIndexOutOfBoundsException later inside a worker thread.
        if (rowA != null && colB != null && rowA.length != colB.length) {
            throw new IllegalArgumentException(
                    "rowA.length (" + rowA.length + ") must equal colB.length (" + colB.length + ")");
        }
        this.rowA = rowA;
        this.colB = colB;
    }

    /**
     * Computes the dot product of the configured row and column and stores it
     * so that {@link #getDimension()} returns it.
     *
     * @throws IllegalStateException if the row or column has not been set
     */
    public void calculateDimension() {
        if (rowA == null || colB == null) {
            throw new IllegalStateException("setCondition must be called before calculateDimension");
        }
        int sum = 0;
        for (int i = 0; i < rowA.length; i++) {
            sum += rowA[i] * colB[i];
        }
        this.dimension = sum;
    }

    @Override
    public String toString() {
        // Plain concatenation of id (not id.toString()) so an unset id prints as "null".
        return "Task{" +
                "id=" + id +
                ", dimension=" + dimension +
                ", rowA=" + Arrays.toString(rowA) +
                ", colB=" + Arrays.toString(colB) +
                '}';
    }
}
Master.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
package master_worker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Coordinator of the Master-Worker pattern: owns the shared task queue, the
 * worker threads and the map into which workers publish their results.
 */
public class Master {
    // Shared queue holding every submitted task
    private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();
    // Worker threads, keyed by worker id
    private HashMap<String, Thread> workers = new HashMap<>();
    // Results published by the workers, keyed by the cell they belong to
    private ConcurrentHashMap<Point, Object> resultMap = new ConcurrentHashMap<>();
    private int workerCount;
    private int[][] matrixResult;
    // Queue size captured at the moment execute() was called
    private int lastTaskQueueCount;

    /**
     * Creates the worker threads (without starting them). All workers share
     * this master's task queue and result map.
     *
     * @param workerCount  number of worker threads to create
     * @param matrixResult matrix that {@link #getResult()} fills in and returns
     */
    Master(int workerCount, int[][] matrixResult) {
        this.workerCount = workerCount;
        this.matrixResult = matrixResult;
        int index = 0;
        while (index < this.workerCount) {
            String workerId = Integer.toString(index);
            Worker worker = new Worker();
            worker.setId(workerId);
            worker.setTaskQueue(taskQueue);
            worker.setResultMap(resultMap);
            workers.put(workerId, new Thread(worker));
            index++;
        }
    }

    /** @return the number of queued tasks recorded when {@link #execute()} was called */
    public int getTaskQueueCount() {
        return lastTaskQueueCount;
    }

    /** @return the number of worker threads held by this master */
    public int getWorkerCount() {
        return workers.size();
    }

    /** Appends a task to the shared queue. */
    public void submit(Task task) {
        taskQueue.add(task);
    }

    /** Records the current queue size, then starts every worker thread. */
    public void execute() {
        lastTaskQueueCount = taskQueue.size();
        for (Thread workerThread : workers.values()) {
            workerThread.start();
        }
    }

    /** @return {@code true} only when every worker thread has terminated */
    public boolean isComplete() {
        for (Thread workerThread : workers.values()) {
            if (workerThread.getState() == Thread.State.TERMINATED) {
                continue;
            }
            return false;
        }
        return true;
    }

    /**
     * Copies every published result into the result matrix at the cell given
     * by its key.
     *
     * @return the matrix passed to the constructor, with the results written in
     */
    public int[][] getResult() {
        for (Map.Entry<Point, Object> entry : resultMap.entrySet()) {
            Point cell = entry.getKey();
            matrixResult[cell.getRow()][cell.getCol()] = (Integer) entry.getValue();
        }
        return matrixResult;
    }
}
Worker.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
package master_worker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * A worker of the Master-Worker pattern. It repeatedly polls tasks from its
 * task queue, computes each one and publishes the value in the result map
 * under the task's id.
 */
public class Worker implements Runnable {
    private String id;
    private ConcurrentLinkedQueue<Task> taskQueue;
    private ConcurrentHashMap<Point, Object> resultMap;

    public void setTaskQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.taskQueue = workQueue;
    }

    public void setResultMap(ConcurrentHashMap<Point, Object> resultMap) {
        this.resultMap = resultMap;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /**
     * Processes tasks until the queue is empty or this thread is interrupted.
     * A task whose handling failed publishes no result; the worker moves on
     * to the next task instead of dying.
     */
    @Override
    public void run() {
        System.out.println("=============worker "+this.getId()+" begins to calculate matrix unit====================");
        // When a worker finishes a task it takes the next one until the queue is empty
        while (!Thread.currentThread().isInterrupted()) {
            Task taskInput = this.taskQueue.poll();
            if (taskInput == null) {
                break;
            }
            Object output = handle(taskInput);
            // ConcurrentHashMap rejects null values, so a failed task must not be stored.
            if (output != null) {
                this.resultMap.put(taskInput.getId(), output);
            }
        }
    }

    /**
     * Calculates one cell of the new matrix.
     *
     * @param taskInput the task to compute
     * @return the computed value, or {@code null} if the computation failed or
     *         the thread was interrupted before it completed
     */
    private Object handle(Task taskInput) {
        Object outputDimension = null;
        try {
            taskInput.calculateDimension();
            Thread.sleep(500); //simulate complex process here
            outputDimension = taskInput.getDimension();
            System.out.println("Worker " + this.getId() +" works for "+ taskInput.toString());
        } catch (InterruptedException e) {
            // Restore the flag so run() sees the interruption and stops polling.
            Thread.currentThread().interrupt();
            System.err.println("Worker " + this.getId() + " was interrupted while handling " + taskInput);
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
        return outputDimension;
    }
}
Main.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
package master_worker; /** * Master-Worker Pattern * */ public class Main { static int[][] matrixA = { {1, -2, 3, 4, -1}, {-2, 3, 0, 1, 2}, {4, -1, 2, 1, -2}, {-2, 1, 3, -1, 3}, {0, 2, -1, 2, 4} }; static int[][] matrixB = { {2, -4, -1, 1, -2}, {-1, 1, -2, 2, 1}, {5, 0, 3, -2, -4}, {1, -2, 1, 0, 2}, {2, 3, -3, 0, 0} }; static int row = matrixA.length; static int col = matrixB[0].length; //dirty static int[][] matrixResult = new int[row][col]; static int workerCount = 5; //Here you can change the workers' number // Get one row array from matrix private static int[] getRowData(int[][] matrix, int row) { return matrix[row]; } // get one colum array from matrix private static int[] getColData(int[][] matrix, int col) { int m = matrix.length; int[] colData = new int[m]; for (int i = 0; i < m; i++) { colData[i] = matrix[i][col]; } return colData; } private static String printMatrix(int[][] matrix) { System.out.println("{"); for (int i = 0; i < row; i++) { System.out.print("[ "); for (int j = 0; j < col; j++) { System.out.print(matrix[i][j] + " "); } System.out.println("]"); } System.out.println("}"); return ""; } public static void main(String[] args) { Master master = new Master(workerCount, matrixResult); for (int i = 0; i < row; i++) { for (int j = 0; j < col; j++) { Task task = new Task(); task.setId(new Point(i, j)); task.setCondition(getRowData(matrixA, i), getColData(matrixB, j)); master.submit(task); System.out.println(task.toString()); } } System.out.println("=============Tasks have been arranged===================="); master.execute(); long start = System.currentTimeMillis(); while (true) { if (master.isComplete()) { long end = System.currentTimeMillis() - start; matrixResult = master.getResult(); System.out.println("=============================="); System.out.println("Calculating Task amount:" + master.getTaskQueueCount()); System.out.println("worker amount:" + master.getWorkerCount()); System.out.println("Final result:"); 
System.out.println(printMatrix(matrixResult)); System.out.println("Executed time:" + end); break; } } } } |
Solution 2:
Each worker receives 1 to N tasks (roughly tasks/workers) assigned by the master. Once a worker finishes its own tasks, it stops working and waits for the others until the master’s taskQueue is empty.
For example, with 25 tasks and 10 workers, workers 1-5 each receive 3 tasks and workers 6-10 each receive 2 tasks.
In Master.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Master(int workerCount, int[][] matrixResult) { this.workerCount = workerCount; this.matrixResult = matrixResult; for (int i = 0; i < this.workerCount; i++) { ConcurrentLinkedQueue<Task> workerOwnTaskQueue = new ConcurrentLinkedQueue<>(); //here you can assign personal tasks to each worker Worker worker = new Worker(); worker.setId(Integer.toString(i)); worker.setTaskQueue(workerOwnTaskQueue); worker.setResultMap(this.resultMap); this.workers.put(Integer.toString(i), new Thread(worker)); } } |
[Reference] Master-Worker Pattern