Node.js 之 RPC 调用

什么是 RPC 调用

RPC 全称为 Remote Procedure Call(远程过程调用)。

  • 是指两个计算机之间的网络通信。
  • 客户端与服务端需约定一个数据格式。
  • 基于 TCP 或 UDP 协议。

TCP 通信方式

单工通信

Client 端和 Server 端,只能单方向传输数据,不能双向传输。

Client --> Server或者Client <-- Server

单工通信

半双工通信

Client 端和 Server 端,可以双向交替传输数据。在同一时间段内,只能由其中一端向另外一端传输数据。

半双工通信

全双工通信

Client 端和 Server 端,可以双向同时传输数据。在同一时间段内,可双向交叉传输数据。

全双工通信

Node.js Buffer 模块

使用 Node.js Buffer 模块,编码/解码二进制数据包。

官方文档:Buffer | Node.js v14.20.0 Documentation

知识扩展:

Node.js Net 模块

使用 Node.js Net 模块,搭建多路复用的 RPC 通道。

示例:单工通信

由客户端向服务端传送数据。

客户端 Client

  1. const net = require('net');
  2. const socket = new net.Socket({});
  3. socket.connect({
  4. host: '127.0.0.1',
  5. port: 5000,
  6. });
  7. socket.write('Hello World!');

服务端 Server

  1. const net = require('net');
  2. const server = net.createServer((socket) => {
  3. // 监听 data 事件
  4. socket.on('data', (data) => {
  5. // 参数 data 为 Buffer 类型
  6. console.log(data, data.toString());
  7. });
  8. });
  9. server.listen(5000);

示例:半双工通信

客户端 Client

  1. const net = require('net');
  2. const socket = new net.Socket({});
  3. // 建立连接
  4. socket.connect({
  5. host: '127.0.0.1',
  6. port: 5000,
  7. });
  8. // id 集合
  9. const ids = [
  10. 10001,
  11. 10002,
  12. 10003,
  13. 10004,
  14. 10005,
  15. 10006,
  16. 10007,
  17. 10008,
  18. 10009,
  19. 10010,
  20. ];
  21. // id 随机索引
  22. let index = Math.floor(Math.random() * ids.length);
  23. // 传送第一条数据
  24. socket.write(encode(index));
  25. // 监听从服务端返回的数据
  26. socket.on('data', (buffer) => {
  27. console.log(buffer.toString());
  28. // 接收到数据之后,按照半双工通信逻辑,开始下一次请求。
  29. index = Math.floor(Math.random() * ids.length);
  30. socket.write(encode(index));
  31. });
  32. // 编码请求包
  33. function encode(index) {
  34. buffer = Buffer.alloc(4);
  35. buffer.writeInt32BE(
  36. ids[index]
  37. );
  38. return buffer;
  39. }

服务端 Server

  1. const net = require('net');
  2. const dataList = {
  3. 10001: '第 1 条数据',
  4. 10002: '第 2 条数据',
  5. 10003: '第 3 条数据',
  6. 10004: '第 4 条数据',
  7. 10005: '第 5 条数据',
  8. 10006: '第 6 条数据',
  9. 10007: '第 7 条数据',
  10. 10008: '第 8 条数据',
  11. 10009: '第 9 条数据',
  12. 10010: '第 10 条数据',
  13. };
  14. const server = net.createServer((socket) => {
  15. socket.on('data', (buffer) => {
  16. const id = buffer.readInt32BE();
  17. // 伪造延迟 100 毫秒
  18. setTimeout(() => {
  19. socket.write(
  20. Buffer.from(dataList[id])
  21. );
  22. }, 100);
  23. })
  24. });
  25. server.listen(5000);

示例:全双工通信

客户端 Client

  1. const net = require('net');
  2. const socket = new net.Socket({});
  3. socket.connect({
  4. host: '127.0.0.1',
  5. port: 4000
  6. });
  7. const LESSON_IDS = [
  8. 10001,
  9. 10002,
  10. 10003,
  11. 10004,
  12. 10005,
  13. 10006,
  14. 10007,
  15. 10008,
  16. 10009,
  17. 10010,
  18. ]
  19. let id = Math.floor(Math.random() * LESSON_IDS.length);
  20. let oldBuffer = null;
  21. socket.on('data', (buffer) => {
  22. // 把上一次data事件使用残余的buffer接上来
  23. if (oldBuffer) {
  24. buffer = Buffer.concat([oldBuffer, buffer]);
  25. }
  26. let completeLength = 0;
  27. // 只要还存在可以解成完整包的包长
  28. while (completeLength = checkComplete(buffer)) {
  29. const package = buffer.slice(0, completeLength);
  30. buffer = buffer.slice(completeLength);
  31. // 把这个包解成数据和seq
  32. const result = decode(package);
  33. console.log(`包${result.seq},返回值是${result.data}`);
  34. }
  35. // 把残余的buffer记下来
  36. oldBuffer = buffer;
  37. })
  38. let seq = 0;
  39. /**
  40. * 二进制包编码函数
  41. * 在一段rpc调用里,客户端需要经常编码rpc调用时,业务数据的请求包
  42. */
  43. function encode(data) {
  44. // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包
  45. // 为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送
  46. const body = Buffer.alloc(4);
  47. body.writeInt32BE(LESSON_IDS[data.id]);
  48. // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
  49. // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
  50. const header = Buffer.alloc(6);
  51. header.writeInt16BE(seq)
  52. header.writeInt32BE(body.length, 2);
  53. // 包头和包体拼起来发送
  54. const buffer = Buffer.concat([header, body])
  55. console.log(`包${seq}传输的课程id${LESSON_IDS[data.id]}`);
  56. seq++;
  57. return buffer;
  58. }
  59. /**
  60. * 二进制包解码函数
  61. * 在一段rpc调用里,客户端需要经常解码rpc调用时,业务数据的返回包
  62. */
  63. function decode(buffer) {
  64. const header = buffer.slice(0, 6);
  65. const seq = header.readInt16BE();
  66. const body = buffer.slice(6)
  67. return {
  68. seq,
  69. data: body.toString()
  70. }
  71. }
  72. /**
  73. * 检查一段buffer是不是一个完整的数据包。
  74. * 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长
  75. * 如果是,则返回这个包长,意味着这个请求包是完整的。
  76. * 如果不是,则返回0,意味着包还没接收完
  77. * @param {} buffer
  78. */
  79. function checkComplete(buffer) {
  80. if (buffer.length < 6) {
  81. return 0;
  82. }
  83. const bodyLength = buffer.readInt32BE(2);
  84. return 6 + bodyLength
  85. }
  86. for (let k = 0; k < 100; k++) {
  87. id = Math.floor(Math.random() * LESSON_IDS.length);
  88. socket.write(encode({ id }));
  89. }

服务端 Server

  1. const net = require('net');
  2. const server = net.createServer((socket) => {
  3. let oldBuffer = null;
  4. socket.on('data', function (buffer) {
  5. // 把上一次data事件使用残余的buffer接上来
  6. if (oldBuffer) {
  7. buffer = Buffer.concat([oldBuffer, buffer]);
  8. }
  9. let packageLength = 0;
  10. // 只要还存在可以解成完整包的包长
  11. while (packageLength = checkComplete(buffer)) {
  12. const package = buffer.slice(0, packageLength);
  13. buffer = buffer.slice(packageLength);
  14. // 把这个包解成数据和seq
  15. const result = decode(package);
  16. // 计算得到要返回的结果,并write返回
  17. socket.write(
  18. encode(LESSON_DATA[result.data], result.seq)
  19. );
  20. }
  21. // 把残余的buffer记下来
  22. oldBuffer = buffer;
  23. })
  24. });
  25. server.listen(4000);
  26. /**
  27. * 二进制包编码函数
  28. * 在一段rpc调用里,服务端需要经常编码rpc调用时,业务数据的返回包
  29. */
  30. function encode(data, seq) {
  31. // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包
  32. // 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回
  33. const body = Buffer.from(data)
  34. // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
  35. // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
  36. const header = Buffer.alloc(6);
  37. header.writeInt16BE(seq)
  38. header.writeInt32BE(body.length, 2);
  39. const buffer = Buffer.concat([header, body])
  40. return buffer;
  41. }
  42. /**
  43. * 二进制包解码函数
  44. * 在一段rpc调用里,服务端需要经常解码rpc调用时,业务数据的请求包
  45. */
  46. function decode(buffer) {
  47. const header = buffer.slice(0, 6);
  48. const seq = header.readInt16BE();
  49. // 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包
  50. // 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可
  51. const body = buffer.slice(6).readInt32BE()
  52. // 这里把seq和数据返回出去
  53. return {
  54. seq,
  55. data: body
  56. }
  57. }
  58. /**
  59. * 检查一段buffer是不是一个完整的数据包。
  60. * 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长
  61. * 如果是,则返回这个包长,意味着这个请求包是完整的。
  62. * 如果不是,则返回0,意味着包还没接收完
  63. * @param {} buffer
  64. */
  65. function checkComplete(buffer) {
  66. if (buffer.length < 6) {
  67. return 0;
  68. }
  69. const bodyLength = buffer.readInt32BE(2);
  70. return 6 + bodyLength
  71. }
  72. // 假数据
  73. const LESSON_DATA = {
  74. 10001: '第 1 条数据',
  75. 10002: '第 2 条数据',
  76. 10003: '第 3 条数据',
  77. 10004: '第 4 条数据',
  78. 10005: '第 5 条数据',
  79. 10006: '第 6 条数据',
  80. 10007: '第 7 条数据',
  81. 10008: '第 8 条数据',
  82. 10009: '第 9 条数据',
  83. 10010: '第 10 条数据',
  84. }

(完)