JavaScript

超轻量级php框架startmvc

node.js的http.createServer过程深入解析

更新时间:2020-08-31 00:12:01 作者:startmvc
下面是nodejs创建一个服务器的代码。接下来我们一起分析这个过程。varhttp=require('http');http.c

下面是nodejs创建一个服务器的代码。接下来我们一起分析这个过程。


var http = require('http');
http.createServer(function (request, response) {
 response.end('Hello World
');
}).listen(9297);

首先我们去到lib/http.js模块看一下这个函数的代码。


function createServer(requestListener) {
 return new Server(requestListener);
}

只是对_http_server.js做了些封装。我们继续往下看。


function Server(requestListener) {
 if (!(this instanceof Server)) return new Server(requestListener);
 net.Server.call(this, { allowHalfOpen: true });
 // 收到http请求时执行的回调
 if (requestListener) {
 this.on('request', requestListener);
 }

 this.httpAllowHalfOpen = false;
 // 建立tcp连接的回调
 this.on('connection', connectionListener);

 this.timeout = 2 * 60 * 1000;
 this.keepAliveTimeout = 5000;
 this._pendingResponseData = 0;
 this.maxHeadersCount = null;
}
util.inherits(Server, net.Server);

发现_http_server.js也没有太多逻辑,继续看lib/net.js下的代码。


function Server(options, connectionListener) {
 if (!(this instanceof Server))
 return new Server(options, connectionListener);

 EventEmitter.call(this);
 // connectionListener在http.js处理过了
 if (typeof options === 'function') {
 connectionListener = options;
 options = {};
 this.on('connection', connectionListener);
 } else if (options == null || typeof options === 'object') {
 options = options || {};

 if (typeof connectionListener === 'function') {
 this.on('connection', connectionListener);
 }
 } else {
 throw new errors.TypeError('ERR_INVALID_ARG_TYPE',
 'options',
 'Object',
 options);
 }

 this._connections = 0;
 ......
 this[async_id_symbol] = -1;
 this._handle = null;
 this._usingWorkers = false;
 this._workers = [];
 this._unref = false;

 this.allowHalfOpen = options.allowHalfOpen || false;
 this.pauseOnConnect = !!options.pauseOnConnect;
}

至此http.createServer就执行结束了,我们发现这个过程还没有涉及到很多逻辑,并且还是停留到js层面。接下来我们继续分析listen函数的过程。该函数是net模块提供的。我们只看关键的代码。


Server.prototype.listen = function(...args) {
 // 处理入参,根据文档我们知道listen可以接收好几个参数,我们这里是只传了端口号9297
 var normalized = normalizeArgs(args);
 // normalized = [{port: 9297}, null];
 var options = normalized[0];
 var cb = normalized[1];
 // 第一次listen的时候会创建,如果非空说明已经listen过
 if (this._handle) {
 throw new errors.Error('ERR_SERVER_ALREADY_LISTEN');
 }
 ......
 listenInCluster(this, null, options.port | 0, 4,
 backlog, undefined, options.exclusive);
}
function listenInCluster() {
 ...
 server._listen2(address, port, addressType, backlog, fd);
}

_listen2 = setupListenHandle = function() {
 ......
 this._handle = createServerHandle(...);
 this._handle.listen(backlog || 511);
}
function createServerHandle() {
 handle = new TCP(TCPConstants.SERVER);
 handle.bind(address, port);
}

到这我们终于看到了tcp连接的内容,每一个服务器新建一个handle并且保存他,该handle是一个TCP对象。然后执行bind和listen函数。接下来我们就看一下TCP类的代码。TCP是C++提供的类。对应的文件是tcp_wrap.cc。我们看看new TCP的时候发生了什么。


void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
 // This constructor should not be exposed to public javascript.
 // Therefore we assert that we are not trying to call this as a
 // normal function.
 CHECK(args.IsConstructCall());
 CHECK(args[0]->IsInt32());
 Environment* env = Environment::GetCurrent(args);

 int type_value = args[0].As<Int32>()->Value();
 TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);

 ProviderType provider;
 switch (type) {
 case SOCKET:
 provider = PROVIDER_TCPWRAP;
 break;
 case SERVER:
 provider = PROVIDER_TCPSERVERWRAP;
 break;
 default:
 UNREACHABLE();
 }

 new TCPWrap(env, args.This(), provider);
}


TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
 : ConnectionWrap(env, object, provider) {
 int r = uv_tcp_init(env->event_loop(), &handle_);
 CHECK_EQ(r, 0); 
}

我们看到,new TCP的时候其实是执行libuv的uv_tcp_init函数,初始化一个uv_tcp_t的结构体。首先我们先看一下uv_tcp_t结构体的结构。


uv_tcp_t
uv_tcp_t
// 初始化一个tcp流的结构体
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
 // 未指定未指定协议
 return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
 int domain;

 /* Use the lower 8 bits for the domain */
 // 低八位是domain
 domain = flags & 0xFF;
 if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
 return UV_EINVAL;
 // 除了第八位的其他位是flags
 if (flags & ~0xFF)
 return UV_EINVAL;

 uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);

 /* If anything fails beyond this point we need to remove the handle from
 * the handle queue, since it was added by uv__handle_init in uv_stream_init.
 */

 if (domain != AF_UNSPEC) {
 int err = maybe_new_socket(tcp, domain, 0);
 if (err) {
 // 出错则把该handle移除loop队列
 QUEUE_REMOVE(&tcp->handle_queue);
 return err;
 }
 }

 return 0;
}

我们接着看uv__stream_init做了什么事情。


void uv__stream_init(uv_loop_t* loop,
 uv_stream_t* stream,
 uv_handle_type type) {
 int err;

 uv__handle_init(loop, (uv_handle_t*)stream, type);
 stream->read_cb = NULL;
 stream->alloc_cb = NULL;
 stream->close_cb = NULL;
 stream->connection_cb = NULL;
 stream->connect_req = NULL;
 stream->shutdown_req = NULL;
 stream->accepted_fd = -1;
 stream->queued_fds = NULL;
 stream->delayed_error = 0;
 QUEUE_INIT(&stream->write_queue);
 QUEUE_INIT(&stream->write_completed_queue);
 stream->write_queue_size = 0;

 if (loop->emfile_fd == -1) {
 err = uv__open_cloexec("/dev/null", O_RDONLY);
 if (err < 0)
 /* In the rare case that "/dev/null" isn't mounted open "/"
 * instead.
 */
 err = uv__open_cloexec("/", O_RDONLY);
 if (err >= 0)
 loop->emfile_fd = err;
 }

#if defined(__APPLE__)
 stream->select = NULL;
#endif /* defined(__APPLE_) */
 // 初始化io观察者
 uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
 assert(cb != NULL);
 assert(fd >= -1);
 // 初始化队列,回调,需要监听的fd
 QUEUE_INIT(&w->pending_queue);
 QUEUE_INIT(&w->watcher_queue);
 w->cb = cb;
 w->fd = fd;
 w->events = 0;
 w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)
 w->rcount = 0;
 w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}

从代码可以知道,只是对uv_tcp_t结构体做了一些初始化操作。到这,new TCP的逻辑就执行完毕了。接下来就是继续分类nodejs里调用bind和listen的逻辑。nodejs的bind对应libuv的函数是uv__tcp_bind,listen对应的是uv_tcp_listen。 先看一个bind的核心代码。


/* Cannot set IPv6-only mode on non-IPv6 socket. */
 if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
 return UV_EINVAL;
 // 获取一个socket并且设置某些标记
 err = maybe_new_socket(tcp, addr->sa_family, 0);
 if (err)
 return err;

 on = 1;
 // 设置在端口可重用
 if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
 return UV__ERR(errno);
 bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE
static int maybe_new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
 struct sockaddr_storage saddr;
 socklen_t slen;

 if (domain == AF_UNSPEC) {
 handle->flags |= flags;
 return 0;
 }
 return new_socket(handle, domain, flags);
}
static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
 struct sockaddr_storage saddr;
 socklen_t slen;
 int sockfd;
 int err;
 // 获取一个socket
 err = uv__socket(domain, SOCK_STREAM, 0);
 if (err < 0)
 return err;
 sockfd = err;
 // 设置选项和保存socket的文件描述符到io观察者中
 err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
 if (err) {
 uv__close(sockfd);
 return err;
 }
 ...
 return 0;
}

int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
 return UV_EBUSY;

 assert(fd >= 0);
 stream->flags |= flags;

 if (stream->type == UV_TCP) {
 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
 return UV__ERR(errno);

 /* TODO Use delay the user passed in. */
 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
 uv__tcp_keepalive(fd, 1, 60)) {
 return UV__ERR(errno);
 }
 }
 ...
 // 保存socket对应的文件描述符到io观察者中,libuv会在io poll阶段监听该文件描述符
 stream->io_watcher.fd = fd;

 return 0;
}

上面的一系列操作主要是新建一个socket文件描述符,设置一些flag,然后把文件描述符保存到IO观察者中,libuv在poll IO阶段会监听该文件描述符,如果有事件到来,会执行设置的回调函数,该函数是在uv__stream_init里设置的uv__stream_io。最后执行bind函数进行绑定操作。最后我们来分析一下listen函数。首先看下tcp_wrapper.cc的代码。


void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {
 TCPWrap* wrap;
 ASSIGN_OR_RETURN_UNWRAP(&wrap,
 args.Holder(),
 args.GetReturnValue().Set(UV_EBADF));
 int backlog = args[0]->Int32Value();
 int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
 backlog,
 OnConnection);
 args.GetReturnValue().Set(err);
}

代码中有个很重要的地方就是OnConnection函数,nodejs给listen函数设置了一个回调函数OnConnection,该函数在IO观察者里保存的文件描述符有连接到来时会被调用。OnConnection函数是在connection_wrap.cc定义的,tcp_wrapper继承了connection_wrap。下面我们先看一下uv_listen。该函数调用了uv_tcp_listen。该函数的核心代码如下。


 if (listen(tcp->io_watcher.fd, backlog))
 return UV__ERR(errno);
 // cb即OnConnection
 tcp->connection_cb = cb;
 tcp->flags |= UV_HANDLE_BOUND;

 // 有连接到来时的libuv层回调,覆盖了uv_stream_init时设置的值
 tcp->io_watcher.cb = uv__server_io;
 // 注册事件
 uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);

在libuv的poll IO阶段,epoll_wait会监听到到来的连接,然后调用uv__server_io。下面是该函数的核心代码。


// 继续注册事件,等待连接
 uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
 err = uv__accept(uv__stream_fd(stream));
 // 保存连接对应的socket
 stream->accepted_fd = err;
 // 执行nodejs层回调
 stream->connection_cb(stream, 0);

libuv会摘下一个连接,得到对应的socket。然后执行nodejs层的回调,这时候我们来看一下OnConnection的代码。


OnConnection(uv_stream_t* handle,int status)
 if (status == 0) {
 // 新建一个uv_tcp_t结构体
 Local<Object> client_obj = WrapType::Instantiate(env, wrap_data, WrapType::SOCKET);
 WrapType* wrap;
 ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj);
 uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_);
 // uv_accept返回0表示成功
 if (uv_accept(handle, client_handle))
 return;
 argv[1] = client_obj;
 }
 // 执行上层的回调,该回调是net.js设置的onconnection
 wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);

OnConnection新建了一个uv_tcp_t结构体。代表这个连接。然后调用uv_accept。


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
 ...
 // 新建的uv_tcp_t结构体关联accept_fd,注册读写事件
 uv__stream_open(client, server->accepted_fd, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
 ...
}

最后执行nodejs的回调。


function onconnection(err, clientHandle) {
 var handle = this;
 var self = handle.owner;
 if (err) {
 self.emit('error', errnoException(err, 'accept'));
 return;
 }
 if (self.maxConnections && self._connections >= self.maxConnections) {
 clientHandle.close();
 return;
 }
 var socket = new Socket({
 handle: clientHandle,
 allowHalfOpen: self.allowHalfOpen,
 pauseOnCreate: self.pauseOnConnect
 });
 socket.readable = socket.writable = true;
 self._connections++;
 socket.server = self;
 socket._server = self;
 DTRACE_NET_SERVER_CONNECTION(socket);
 LTTNG_NET_SERVER_CONNECTION(socket);
 COUNTER_NET_SERVER_CONNECTION(socket);
 // 触发_http_server.js里设置的connectionListener回调
 self.emit('connection', socket);
}

listen函数总体的逻辑就是把socket设置为可监听,然后注册事件,等待连接的到来,连接到来的时候,调用accept获取新建立的连接,tcp_wrapper.cc的回调新建一个uv_tcp_t结构体,代表新的连接,然后设置可读写事件,并且设置回调为uv__stream_io,等待数据的到来。最后执行_http_server.js设置的回调connectionListener。至此,服务器启动并且接收连接的过程就完成了。接下来就是对用户数据的读写。当用户传来数据时,处理数据的函数是uv__stream_io。后面继续解析数据的读写。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。

nodejs http.createserver过程 nodejs实例