《深入浅出node.js》读书笔记3-1异步编程

函数式编程

函数式编程是Javascript异步编程的基础。

高阶函数

通常的语言中,函数的参数只接受基本的数据类型或是对象引用,返回值也只是基本数据类型和对象引用。
高阶函数在Javascript中比比皆是,一些复杂业务逻辑的解耦,受益于高阶函数。

偏函数用法

偏函数: 指定部分参数来产生一个新的定制函数的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//例子
var toString = Object.prototype.toString();

var isString = function() {
return toString.call(obj) == '[object String]';
};

var isFunction = function() {
return toStrinf.call(obj) == '[object Function]';
}

//使用偏函数后
var isType = function(type) {
return function(obj) {
return Object.prototype.toString.call(obj) == '[object ' + type + ']';
}
}

var isString = isType('String');
var isFunction = isType('Function');

异步编程的优势与难点

优势

1、基于事件驱动的非阻塞I/O模型,在分布式和云服务上的高并行使得各个单点之间能够更有效的组织起来。
2、Node的V8引擎能够调用C/C++扩展模块,性能可以逼近C语言。
3、合理利用Node的异步模型与V8的高性能,就可以充分发挥CPU和I/O资源的优势。

难点

1、难点1:异常处理

tray/catch失效:
异步I/O实现主要包含两个阶段:提交请求和处理结果。这两个阶段中间有事件循环的调度,两者彼此不关联。异步方法则通常在第一个阶段提价请求后立即返回,因为异常并不一定发生在这个阶段,tray/catch功效在此处不会发挥任何作用。
Node在处理异常上形成一种约定:将异常作为回调函数的第一个实参传回,如果为空值,则表明异步调用没有异常抛出

1
2
3
async(function(err, result){
//TODO
})

我们自行编写的异步方法上,也需要去遵循一些原则:

  • 原则一: 必须执行调用者传入的回调函数;
  • 原则二:正确传递回异常供调用者判断。
    示例代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    var async = function(callback){
    process.nextTick(function(){
    var result = something;
    if(error) {
    return callback(error);
    }
    callback(null, result);
    })
    }

2、难点2:函数嵌套过深

Node中多个异步调用的场景比比皆是。

3、难点3:多线程编程

child_process是多线程编程的基础API,cluster模式是更深层次的应用,在第九章详细展开。

4、难点4:异步转同步

Node中的同步式编程,需要借助库或者编译等手段来实现。

异步编程解决方案

异步编程的主要解决方案有3种:

  • 事件发布/订阅模式
  • Promise/Deferred模式
  • 流程控制库

事件发布/订阅模式

事件监听器模式是一种广泛用于异步编程的模式,是回调函数的事件化,又称发布/订阅模式。

1
2
3
4
5
6
//订阅
emitter.on('event1', function(message){
console.log(message)
});
// 发布
emitter.emit('event1', 'i am message');

事件发布/订阅模式可以实现一个事件与多个回调函数的关联,这些回调函数又称为事件侦听器
Node对事件发布/订阅的机制做了健壮性处理:

  • 如果对一个事件添加了超过10个侦听器,将会得到一个警告。调用emitter.setMaxListeners(0)可以去掉限制。
  • EvenEmitter对象对error事件进行了特殊对待.如果没有为 ‘error’ 事件注册监听器,则当 ‘error’ 事件触发时,会抛出错误、打印堆栈跟踪、并退出 Node.js 进程。

利用事件队列解决雪崩问题

雪崩问题:
在高访问量、大并发量大情况下缓存失效大情景,此时大量大请求同时涌入数据库中,数据库无法同时承受如此大的查询请求,进而往前影响到网站整体的响应速度。
以下是一条数据库查询语句的调用:

1
2
3
4
5
var select = function(callback) {
db.select('SQL', function(results) {
callback(results)
});
};

如果站点刚好启动,这时缓存中是不存在数据的,而如果访问量巨大,同一句SQL会被发送到数据库中反复查询,会影响服务的整体性能。 一种改进方案是添加一个状态锁,代码如下:

1
2
3
4
5
6
7
8
9
10
var status = 'ready';
var select = function(callback) {
if(status === 'ready') {
status = 'pending';
db.select('SQL', function(results){
status = 'ready';
callback(results);
})
}
}

在上面状态锁的情景中,连续多次调用select(),只有第一次调用是生效的,后续的select()是没有数据服务的,这个时候可以引入事件队列,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
var proxy = new events.EventEmitter();
var status = 'ready';
var select = function(callback) {
proxy.once('selected', callback);
if(status === 'ready') {
status = 'pendimg';
db.select('SQL', function(results){
proxy.emit('selected', results);
status = 'ready';
})
}
}

利用once()方法,将所有请求的回调都压入事件队列中,利用其执行一次就会将监视器移除的特点,保证每一个回调只执行一次。
此处可能因为存在侦听器过多引发的警告,需要调用setMaxListeners(0)移除掉警告,或者设置更大警告阀值。

多异步之间的协作方案

一般而言,事件与侦听器是一对多,但在异步编程中,也会出现事件与侦听器的关系是多对一的情况,也就是说一个业务逻辑可能依赖两个通过回调或事件传递的结果。这也是难点2——嵌套过深的原因。
我们尝试通过原生代码实现将串行执行转变为并行调用。这里以渲染页面所需要的模版读取、数据读取和本地化资源读取为例,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var count = 0;
var results = {};
var done = function(key, value){
results[key] = value;
count++;
if(count === 3) {
// 渲染页面
render(results);
}
}

fs.readFile(template_path, 'utf8', function(err, template){
done('template',template);
});
db.query(sql, function(err, data) {
done('data', data);
});
l10n.get(function(err, resources){
done('resources', resources);
})

回调函数不能保证顺序,且回调函数之间没有交集,需要借助一个第三方函数和第三方变量来处理异步协作的结果。通常,我们把检测次数的变量叫做哨兵变量。同时,还需利用偏函数来处理哨兵函数和第三方函数的关系,如下:

1
2
3
4
5
6
7
8
9
10
11
12
var after = function(times, callback){
var count = 0, results = {};
return function(kes, value) {
results[key] = value;
count++;
if(count === times) {
callback(results)
}
}
}

var done = after(times, render)

另一个方案是来自笔者的EventProxy模版。

Promise/Deferred模式

Promise/Deferred模式,先执行异步调用,延迟传递处理的方式。

Promises/A

Promises/A提议对单个异步操作做出抽象的定义:

  • Promise操作只会处在3种状态的一种: 未完成态、完成态和失败态。
  • Promise的状态只会出现从未完成态向完成态或失败态的转化,不能逆反。完成态和失败态不能互相转化。
  • Promise的状态一旦转化,将不能更改。

Promises/A提议只定义一个API–一个promise对象只具备then()即可。但有如下要求:

  • 接受完成态、错误态的回调方法。在操作完成或出现错误时,将会调用对应方法。
  • 可选地支持progress事件回调作为第三个方法。
  • then()方法只接受function对象,其余对象将被忽略。
  • then()方法继续返回promise对象,以实现链式调用。

Promises/A+提议扩展Promises/A提议,浏览器端和node端的promise函数遵照Promises/A+提议。

流程控制库

  1. 尾触发与Next
    除了事件与Promise外,还有一类是需要手动调用才能持续执行后续调用的,我们将此类方法叫做尾触发,常见关键词是next,应用最多地方在connect中间件。也就是洋葱模型。

  2. async
    async流程控制模块.ES6规范已经加入async/await.

异步并发控制

bagpipe的解决方案

作者写的bagpipe模块的解决思路:

  • 通过一个队列来控制并发量
  • 如果当前活跃(指调用发起但未执行回调)的异步调用量小于限定值,从队列中取出执行
  • 如果活跃调用达到限定值,调用暂时存放的队列中
  • 每个异步调用结束时,从队列取出新的异步调用执行

bagpipe的API主要暴露了一个push()方法和full事件,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
var Bagpipe = require('bagpipe');
// 设定最大并发数为10
var bagpipe = new Bagpipe(10);
for(var i=0; i < 100; i++) {
bagpipe.push(async, function(){
// 异步回调执行
});
}
bagpipe.on('full', function(length){
console.warn('底层系统处理不能及时完成,队列拥堵,目前队列长度为:' + length);
});

push()方法通过函数变换的方式实现,假设第一个参数是方法,最后一个参数是回调参数,其余为其他参数,其核心实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 推入方法,参数。最后一个参数为回调函数
* @param {Function} method 异步方法
* @param {Mix} args 参数列表,最后一个参数为回调函数
*/
Bagpipe.prototype.push = function(method) {
var aegs = [].slice.call(arguments, 1);
var callback = args[args.length - 1];
if(typeof callback !== 'function') {
args.push(function({}));
}
if(this.options.disabled || this.limit < 1) {
method.apply(null, args);
return this;
}

// 队列长度也超过限制值时
if(this.queue.length > this.queueLength || !this.options.refuse) { // refuse为拒绝模式
this.queue.push({
method: method,
args: args
});
} else {
var err = new Error('Too much async call in queue');
err.name = 'TooMuchAsyncCallError';
callback(err);
}

if(this.queue.length > 1) {
this.emit('full', this.queue.length);
}

this.next();
return this;
};

将调用推入队列后,调用一次next()方法尝试触发。next()方法定义如下:

1
2
3
4
5
6
7
8
9
10
/*
* 继续执行队列中的后续动作
*/
Bagpipe.prototype.next = function() {
var that = this;
if(that.active < that.limit && that.queue.length) {
var req = that.queue.shift();
that.run(req.method, req.args);
}
};

next()方法主要判断活跃调用的数量,如果正常,将调用内部方法run()来执行真正的调用。这里为了判断回调函数是否执行,采用了一个注入代码的技巧,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
* 执行队列中的方法
*/
Bagpipe.prototype.run = function(method, args) {
var that = this;
that.active++;
var callback = args[args.length - 1];
var timer = null;
var called = false;

// inject logic
args[args.length -1] = function(err) {
// anyway, clear the timer
if(timer) {
clearTimer(timer);
timer = null;
}
// if timeout,don't excute
if(!called) {
that._next();
callback.apply(null, arguments);
} else {
// pass the outdated error
if(err) {
that.emit('outdated', err);
}
}
};

var timeout = that.options.timeout; // 超时控制
if(timeout) {
timer = setTimeout(function(){
// set called as true
called = true;
that._next();
// pass the exception
var err = new Error(timeout + 'ms timeout');
err.name = 'BagpipeTimeoutError';
err.data = {
name: method.name,
method: method.toString(),
args: args.slice(0, -1)
};
callback(err);
}, timeout);
}
method.apply(null, args);
}

用户传入的回调函数被真正执行前,被封装替换过。这个封装的回调函数内部的逻辑将活跃值的计数器减1后,主动调用next()执行后续等待的异步调用。

小结

本章中介绍的几种异步编程解决方案,是当时的主要使用方案。随着ECMAScript5的完善,目前的异步编程解决方案已有所变化。

liborn wechat
欢迎您扫一扫上面的微信二维码,订阅我的公众号!