cluster

当我们去了解Cluster的时候,我们会在node的官方文档的一开始就会看到一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 这个例子是根据cpu的核数来生成对应的子进程数量,
// 然后每个子进程都运行一个绑定到8000端口的http server
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);

// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer((req, res) => {
res.writeHead(200);
res.end('hello world\n');
}).listen(8000);

console.log(`Worker ${process.pid} started`);
}

我们可以看到创建子进程的方式就是很简单的一句cluster.fork(),接下来我们就来看一下这个函数做了什么样的事情。

首先我们打印了一下这个函数,可以看到函数如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
function (env) {
cluster.setupMaster();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
const worker = new Worker({
id: id,
process: workerProcess
});

worker.on('message', function(message, handle) {
cluster.emit('message', this, message, handle);
});

worker.process.once('exit', function(exitCode, signalCode) {
/*
* Remove the worker from the workers list only
* if it has disconnected, otherwise we might
* still want to access it.
*/
if (!worker.isConnected()) {
removeHandlesForWorker(worker);
removeWorker(worker);
}

worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = 'dead';
worker.emit('exit', exitCode, signalCode);
cluster.emit('exit', worker, exitCode, signalCode);
});

worker.process.once('disconnect', function() {
/*
* Now is a good time to remove the handles
* associated with this worker because it is
* not connected to the master anymore.
*/
removeHandlesForWorker(worker);

/*
* Remove the worker from the workers list only
* if its process has exited. Otherwise, we might
* still want to access it.
*/
if (worker.isDead()) removeWorker(worker);

worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = 'disconnected';
worker.emit('disconnect');
cluster.emit('disconnect', worker);
});

worker.process.on('internalMessage', internal(worker, onmessage));
process.nextTick(emitForkNT, worker);
cluster.workers[worker.id] = worker;
return worker;
}

我们看到一开始就调用了一个函数cluster.setupMaster()

我们在官方文档中可以查到这个函数, 可以看到这个函数的第一个参数exec默认为process.argv[1],在主线程中打印process.argv[1]可以看到process.argv[1]指向了入口文件,所以cluster.fork()的时候,是重新运行了入口文件。这也对应了文档中的下面这句话。

Unlike the fork(2) POSIX system call, child_process.fork() does not clone the current process.

同样我们通过查看源码可以了解到下面这行代码,是根据当前的环境变量和cluster配置,传递给child_process.fork()workerProcess就是fork后的返回值,通过查看文档,我们知道child_process.fork()返回了一个<ChildProcess>,关于这里面的细节,可以看child_process.fork()的细节

1
const workerProcess = createWorkerProcess(id, env);

再下面,Worker对象对ChildProcess包裹了一层,提供了一些API,具体可以看文档

其他的就是一个信息的传输