forked from microsoft/rushstack
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTaskRunner.ts
More file actions
379 lines (338 loc) · 13.4 KB
/
TaskRunner.ts
File metadata and controls
379 lines (338 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
// See LICENSE in the project root for license information.
import * as os from 'os';
import { Interleaver } from '@rushstack/stream-collator';
import {
Terminal,
ConsoleTerminalProvider,
Colors,
IColorableSequence
} from '@rushstack/node-core-library';
import { Stopwatch } from '../../utilities/Stopwatch';
import { ITask } from './ITask';
import { TaskStatus } from './TaskStatus';
import { TaskError } from './TaskError';
import { AlreadyReportedError } from '../../utilities/AlreadyReportedError';
export interface ITaskRunnerOptions {
quietMode: boolean;
parallelism: string | undefined;
changedProjectsOnly: boolean;
allowWarningsInSuccessfulBuild: boolean;
terminal?: Terminal;
}
/**
* A class which manages the execution of a set of tasks with interdependencies.
* Initially, and at the end of each task execution, all unblocked tasks
* are added to a ready queue which is then executed. This is done continually until all
* tasks are complete, or prematurely fails if any of the tasks fail.
*/
export class TaskRunner {
private _tasks: ITask[];
private _changedProjectsOnly: boolean;
private _allowWarningsInSuccessfulBuild: boolean;
private _buildQueue: ITask[];
private _quietMode: boolean;
private _hasAnyFailures: boolean;
private _hasAnyWarnings: boolean;
private _parallelism: number;
private _currentActiveTasks: number;
private _totalTasks: number;
private _completedTasks: number;
private _terminal: Terminal;
public constructor(orderedTasks: ITask[], options: ITaskRunnerOptions) {
const {
quietMode,
parallelism,
changedProjectsOnly,
allowWarningsInSuccessfulBuild,
terminal = new Terminal(new ConsoleTerminalProvider())
} = options;
this._tasks = orderedTasks;
this._buildQueue = orderedTasks.slice(0);
this._quietMode = quietMode;
this._hasAnyFailures = false;
this._hasAnyWarnings = false;
this._changedProjectsOnly = changedProjectsOnly;
this._allowWarningsInSuccessfulBuild = allowWarningsInSuccessfulBuild;
this._terminal = terminal;
const numberOfCores: number = os.cpus().length;
if (parallelism) {
if (parallelism === 'max') {
this._parallelism = numberOfCores;
} else {
const parallelismInt: number = parseInt(parallelism, 10);
if (isNaN(parallelismInt)) {
throw new Error(`Invalid parallelism value of '${parallelism}', expected a number or 'max'`);
}
this._parallelism = parallelismInt;
}
} else {
// If an explicit parallelism number wasn't provided, then choose a sensible
// default.
if (os.platform() === 'win32') {
// On desktop Windows, some people have complained that their system becomes
// sluggish if Rush is using all the CPU cores. Leave one thread for
// other operations. For CI environments, you can use the "max" argument to use all available cores.
this._parallelism = Math.max(numberOfCores - 1, 1);
} else {
// Unix-like operating systems have more balanced scheduling, so default
// to the number of CPU cores
this._parallelism = numberOfCores;
}
}
}
/**
* Executes all tasks which have been registered, returning a promise which is resolved when all the
* tasks are completed successfully, or rejects when any task fails.
*/
public execute(): Promise<void> {
this._currentActiveTasks = 0;
this._completedTasks = 0;
this._totalTasks = this._buildQueue.length;
this._terminal.writeLine(`Executing a maximum of ${this._parallelism} simultaneous processes...${os.EOL}`);
return this._startAvailableTasks().then(() => {
this._printTaskStatus();
if (this._hasAnyFailures) {
return Promise.reject(new Error('Project(s) failed'));
} else if (this._hasAnyWarnings && !this._allowWarningsInSuccessfulBuild) {
this._terminal.writeWarningLine('Project(s) succeeded with warnings');
return Promise.reject(new AlreadyReportedError());
} else {
return Promise.resolve();
}
});
}
/**
* Pulls the next task with no dependencies off the build queue
* Removes any non-ready tasks from the build queue (this should only be blocked tasks)
*/
private _getNextTask(): ITask | undefined {
for (let i: number = 0; i < this._buildQueue.length; i++) {
const task: ITask = this._buildQueue[i];
if (task.status !== TaskStatus.Ready) {
// It shouldn't be on the queue, remove it
this._buildQueue.splice(i, 1);
// Decrement since we modified the array
i--;
} else if (task.dependencies.size === 0 && task.status === TaskStatus.Ready) {
// this is a task which is ready to go. remove it and return it
return this._buildQueue.splice(i, 1)[0];
}
// Otherwise task is still waiting
}
return undefined; // There are no tasks ready to go at this time
}
/**
* Helper function which finds any tasks which are available to run and begins executing them.
* It calls the complete callback when all tasks are completed, or rejects if any task fails.
*/
private _startAvailableTasks(): Promise<void> {
const taskPromises: Promise<void>[] = [];
let ctask: ITask | undefined;
while (this._currentActiveTasks < this._parallelism && (ctask = this._getNextTask())) {
this._currentActiveTasks++;
const task: ITask = ctask;
task.status = TaskStatus.Executing;
this._terminal.writeLine(Colors.white(`[${task.name}] started`));
task.stopwatch = Stopwatch.start();
task.writer = Interleaver.registerTask(task.name, this._quietMode);
taskPromises.push(task.execute(task.writer)
.then((result: TaskStatus) => {
task.stopwatch.stop();
task.writer.close();
this._currentActiveTasks--;
this._completedTasks++;
switch (result) {
case TaskStatus.Success:
this._markTaskAsSuccess(task);
break;
case TaskStatus.SuccessWithWarning:
this._hasAnyWarnings = true;
this._markTaskAsSuccessWithWarning(task);
break;
case TaskStatus.Skipped:
this._markTaskAsSkipped(task);
break;
case TaskStatus.Failure:
this._hasAnyFailures = true;
this._markTaskAsFailed(task);
break;
}
}).catch((error: TaskError) => {
task.writer.close();
this._currentActiveTasks--;
this._hasAnyFailures = true;
task.error = error;
this._markTaskAsFailed(task);
}
).then(() => this._startAvailableTasks()));
}
return Promise.all(taskPromises).then(() => { /* collapse void[] to void */ });
}
/**
* Marks a task as having failed and marks each of its dependents as blocked
*/
private _markTaskAsFailed(task: ITask): void {
this._terminal.writeErrorLine(`${os.EOL}${this._getCurrentCompletedTaskString()}[${task.name}] failed!`);
task.status = TaskStatus.Failure;
task.dependents.forEach((dependent: ITask) => {
this._markTaskAsBlocked(dependent, task);
});
}
/**
* Marks a task and all its dependents as blocked
*/
private _markTaskAsBlocked(task: ITask, failedTask: ITask): void {
if (task.status === TaskStatus.Ready) {
this._completedTasks++;
this._terminal.writeErrorLine(`${this._getCurrentCompletedTaskString()}`
+ `[${task.name}] blocked by [${failedTask.name}]!`);
task.status = TaskStatus.Blocked;
task.dependents.forEach((dependent: ITask) => {
this._markTaskAsBlocked(dependent, failedTask);
});
}
}
/**
* Marks a task as being completed, and removes it from the dependencies list of all its dependents
*/
private _markTaskAsSuccess(task: ITask): void {
if (task.hadEmptyScript) {
this._terminal.writeLine(Colors.green(`${this._getCurrentCompletedTaskString()}`
+ `[${task.name}] had an empty script`));
} else {
this._terminal.writeLine(Colors.green(`${this._getCurrentCompletedTaskString()}`
+ `[${task.name}] completed successfully in ${task.stopwatch.toString()}`));
}
task.status = TaskStatus.Success;
task.dependents.forEach((dependent: ITask) => {
if (!this._changedProjectsOnly) {
dependent.isIncrementalBuildAllowed = false;
}
dependent.dependencies.delete(task);
});
}
/**
* Marks a task as being completed, but with warnings written to stderr, and removes it from the dependencies
* list of all its dependents
*/
private _markTaskAsSuccessWithWarning(task: ITask): void {
this._terminal.writeWarningLine(`${this._getCurrentCompletedTaskString()}`
+ `[${task.name}] completed with warnings in ${task.stopwatch.toString()}`);
task.status = TaskStatus.SuccessWithWarning;
task.dependents.forEach((dependent: ITask) => {
if (!this._changedProjectsOnly) {
dependent.isIncrementalBuildAllowed = false;
}
dependent.dependencies.delete(task);
});
}
/**
* Marks a task as skipped.
*/
private _markTaskAsSkipped(task: ITask): void {
this._terminal.writeLine(Colors.green(`${this._getCurrentCompletedTaskString()}[${task.name}] skipped`));
task.status = TaskStatus.Skipped;
task.dependents.forEach((dependent: ITask) => {
dependent.dependencies.delete(task);
});
}
private _getCurrentCompletedTaskString(): string {
return `${this._completedTasks} of ${this._totalTasks}: `;
}
/**
* Prints out a report of the status of each project
*/
private _printTaskStatus(): void {
const tasksByStatus: { [status: number]: ITask[] } = {};
this._tasks.forEach((task: ITask) => {
if (tasksByStatus[task.status]) {
tasksByStatus[task.status].push(task);
} else {
tasksByStatus[task.status] = [task];
}
});
this._terminal.writeLine('');
this._printStatus(TaskStatus.Executing, tasksByStatus, Colors.yellow);
this._printStatus(TaskStatus.Ready, tasksByStatus, Colors.white);
this._printStatus(TaskStatus.Skipped, tasksByStatus, Colors.gray);
this._printStatus(TaskStatus.Success, tasksByStatus, Colors.green);
this._printStatus(
TaskStatus.SuccessWithWarning,
tasksByStatus,
(text: string) => Colors.yellow(text),
(text: string) => Colors.yellow(Colors.underline(text))
);
this._printStatus(TaskStatus.Blocked, tasksByStatus, Colors.red);
this._printStatus(TaskStatus.Failure, tasksByStatus, Colors.red);
const tasksWithErrors: ITask[] = tasksByStatus[TaskStatus.Failure];
if (tasksWithErrors) {
tasksWithErrors.forEach((task: ITask) => {
if (task.error) {
this._terminal.writeErrorLine(`[${task.name}] ${task.error.message}`);
}
});
}
this._terminal.writeLine('');
}
private _printStatus(
status: TaskStatus,
tasksByStatus: { [status: number]: ITask[] },
color: (text: string) => IColorableSequence,
headingColor: (text: string) => IColorableSequence = color
): void {
const tasks: ITask[] = tasksByStatus[status];
if (tasks && tasks.length) {
this._terminal.writeLine(headingColor(`${status} (${tasks.length})`));
this._terminal.writeLine(color('================================'));
for (let i: number = 0; i < tasks.length; i++) {
const task: ITask = tasks[i];
switch (status) {
case TaskStatus.Executing:
case TaskStatus.Ready:
case TaskStatus.Skipped:
this._terminal.writeLine(color(task.name));
break;
case TaskStatus.Success:
case TaskStatus.SuccessWithWarning:
case TaskStatus.Blocked:
case TaskStatus.Failure:
if (task.stopwatch && !task.hadEmptyScript) {
const time: string = task.stopwatch.toString();
this._terminal.writeLine(headingColor(`${task.name} (${time})`));
} else {
this._terminal.writeLine(headingColor(`${task.name}`));
}
break;
}
if (task.writer) {
const stderr: string = task.writer.getStdError();
const shouldPrintDetails: boolean =
task.status === TaskStatus.Failure || task.status === TaskStatus.SuccessWithWarning;
let details: string = stderr ? stderr : task.writer.getStdOutput();
if (details && shouldPrintDetails) {
details = this._abridgeTaskReport(details);
this._terminal.writeLine(details + (i !== tasks.length - 1 ? os.EOL : ''));
}
}
}
this._terminal.writeLine(color('================================' + os.EOL));
}
}
/**
* Remove trailing blanks, and all middle lines if text is large
*/
private _abridgeTaskReport(text: string): string {
const headSize: number = 10;
const tailSize: number = 20;
const margin: number = 10;
const lines: string[] = text.split(/\s*\r?\n/).filter(line => line);
if (lines.length < headSize + tailSize + margin) {
return lines.join(os.EOL);
}
const amountRemoved: number = lines.length - headSize - tailSize;
const head: string = lines.splice(0, headSize).join(os.EOL);
const tail: string = lines.splice(-tailSize).join(os.EOL);
return `${head}${os.EOL}[...${amountRemoved} lines omitted...]${os.EOL}${tail}`;
}
}