shadowsocks源码分析:ssserver

shadowsocks服务端源分析。

shadowsocks的原理

关于shadowsocks[1]的原理,这张图解释的清清楚楚。

../../../_images/whats-shadowsocks-041.png

其中:

  • PC是需要利用shadowsocks代理的应用;
  • SS Local为shadowsocks客户端,通常运行在PC/手机上(也可以运行在任务PC可 以到达的位置),用于与shadowsocks服务端建立连接。
  • GFW无需赘述了;
  • ss servershadowsocks服务端,与ss local通讯,完成ss local请 求的访问,并将返回数据加密返回给ss local

数据流向[2]

  • 图中1) Request即本地应用通过sock5代理向ss local发起访 问请求
  • 图中2) Encrypt Requestss local将PC的请求经加密交给ss server
  • ss server收到ss local的数据后解密,得到目的地址和数据,再向目的地 址(Google/Twitter/Facebook)发起请求,即图中3) Request
  • 目的服务器(Google/Twitter/Facebook)响应ss server的请求,即4) Response
  • ss server收到响应数据后,将其加密发送给ss local,即:5) Encrypt Response
  • ss local收到ss server发回的经加密的响应数据后,解密交给请求发起 方PC。即:6) Response

源码结构

这里以shadowsocks-2.8.2的python源代码为例来分析ssserver的设计。 解开源码可以看到shadowsocks目录结构如下(crypto略去)

|...
├── shadowsocks
│   ├── asyncdns.py
│   ├── common.py
│   ├── crypto
│   │   ├── ....
│   ├── daemon.py
│   ├── encrypt.py
│   ├── eventloop.py
│   ├── __init__.py
│   ├── local.py
│   ├── lru_cache.py
│   ├── manager.py
│   ├── server.py
│   ├── shell.py
│   ├── tcprelay.py
│   └── udprelay.py
|...

其中:

  • server.py是服务端入口ssserver;
  • local.py是客户端入口sslocal;
  • tcprelay.py提供了类TCPRelay, TCPRelayHandler来处理TCP连接, UDP相关由udprelay.py提供
  • eventloop.py提供了类EventLoop对epoll, kqueue, select方法的包装,提 供统一的IO复用接口
  • encrypt.py提供加密解密相关接口
  • common.py包含一些通用接口

服务端数据处理流程

shadowsocks服务端的启动

首先看一下server.py中的入口函数main中的代码段:

  1. 创建TCPRelay侦听相应的端口,等待shadowsocks客户端连接。从这里可以看 到shadowsocks的客户端同时侦听TCP和UDP端口
1
2
3
4
5
6
7
8
9
# file: server.py
for port, password in port_password.items():
    a_config = config.copy()
    a_config['server_port'] = int(port)
    a_config['password'] = password
    logging.info("starting server at %s:%d" %
                 (a_config['server'], int(port)))
    tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False))
    udp_servers.append(udprelay.UDPRelay(a_config, dns_resolver, False))
  1. 接下来一看TCPRelay的初始化
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# file tcprelay.py

class TCPRelay(object):
    # TCPRelay侦听并处理来自ss client连接请求
    def __init__(self, config, dns_resolver, is_local, stat_callback=None):
        self._config = config
        self._is_local = is_local       # 是否为ss local。False
        self._dns_resolver = dns_resolver
        self._closed = False
        self._eventloop = None
        self._fd_to_handlers = {}   # {socketfd: socket_handle}

        self._timeout = config['timeout']
        self._timeouts = []  # a list for all the handlers
        # we trim the timeouts once a while
        self._timeout_offset = 0   # last checked position for timeout
        self._handler_to_timeouts = {}  # key: handler value: index in timeouts

        if is_local:
            listen_addr = config['local_address']
            listen_port = config['local_port']
        else:
            listen_addr = config['server']
            listen_port = config['server_port']
        self._listen_port = listen_port

        # 打开一个socket,并设置相关参数,然后bind, listen
        addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                   socket.SOCK_STREAM, socket.SOL_TCP)
        if len(addrs) == 0:
            raise Exception("can't get addrinfo for %s:%d" %
                            (listen_addr, listen_port))
        af, socktype, proto, canonname, sa = addrs[0]
        server_socket = socket.socket(af, socktype, proto)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(sa)
        server_socket.setblocking(False)
        if config['fast_open']:
            try:
                server_socket.setsockopt(socket.SOL_TCP, 23, 5)
            except socket.error:
                logging.error('warning: fast open is not available')
                self._config['fast_open'] = False
        server_socket.listen(1024)
        self._server_socket = server_socket
        self._stat_callback = stat_callback
  1. TCPRelay初始化完成后,返回回server.py的main函数。接下来创建了一个 EventLoop对象,并将打开的socket注册到EventLoop
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# file: server.py
def main():
    # ...
    def run_server():
        # 注册signal处理函数
        # ...
        try:
            # 创建EventLoop
            loop = eventloop.EventLoop()
            dns_resolver.add_to_loop(loop)

            # 观察者模式
            # epoll/kqueue/select观察着socket的状态,当socket状态发生变化时
            # 调用消息处理函数
            # 将已经打开的socket注册到EventLoop侦听相应的事件
            list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers))

            daemon.set_user(config.get('user', None))
            # 启动事件循环,等待shadowsocks客户端的连接
            loop.run()
        except Exception as e:
            shell.print_exception(e)
            sys.exit(1)

    if int(config['workers']) > 1:
        #...
    run_server()
  1. 看一下EventLoop的初始化和TCPRelay.add_to_loop的代码:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# file: eventloop.py

class EventLoop(object):
    def __init__(self):
        # 选择IO复用模式,初始化一些参数
        if hasattr(select, 'epoll'):
            self._impl = select.epoll()
            model = 'epoll'
        elif hasattr(select, 'kqueue'):
            self._impl = KqueueLoop()
            model = 'kqueue'
        elif hasattr(select, 'select'):
            self._impl = SelectLoop()
            model = 'select'
        else:
            raise Exception('can not find any available functions in select '
                            'package')
        self._fdmap = {}  # (f, handler) 此结构非重要
        self._last_time = time.time()
        self._periodic_callbacks = []
        self._stopping = False
        logging.debug('using event model: %s', model)

    def add(self, f, mode, handler):
        # 参数:
        #   f:  socket
        #   mode: 所侦听的事件
        #   handler: 事件处理对象,当socket注册的事件mode发生时,会调用
        #           handler.handle_event(...)
        fd = f.fileno()
        # 此处将(socket, handler)加入到一个字典, key为socket的文件描述符
        self._fdmap[fd] = (f, handler)
        self._impl.register(fd, mode)

    def run(self):
        events = []
        while not self._stopping:
            # ...
            asap = False
            try:
                # 等待事件触发,返回触发的事件
                events = self.poll(TIMEOUT_PRECISION)
            except (OSError, IOError) as e:
                if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                # ...

            # 遍历被激活的事件
            for sock, fd, event in events:
                # 根据文件描述述查找对应的handle。在self.add函数加注册的
                handler = self._fdmap.get(fd, None)
                if handler is not None:
                    handler = handler[1]
                    try:
                        # 调用相关_handle_event方法,处理事件
                        handler.handle_event(sock, fd, event)
                    # ...
  1. 再来看看TCPRelay.add_to_loop做了什么
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# file: tcprelay.py
class TCPRelay(object):
    # ...
    def add_to_loop(self, loop):
        if self._eventloop:
            raise Exception('already add to loop')
        if self._closed:
            raise Exception('already closed')
        self._eventloop = loop
        # 调用EventLoop的add方法注册自身打开的socket
        self._eventloop.add(self._server_socket,
                            eventloop.POLL_IN | eventloop.POLL_ERR, self)
        # 调用EventLoop的add_periodic方法注册一个周期处理函数,
        # 清理失效的socket
        self._eventloop.add_periodic(self.handle_periodic)

到这里,shadowsocks服务端就算启动完成了,就等shadowsocks客户端来连接了。回顾一下做了些什么:

  • 根据配置文件打开指定的TCP/UDP端口
  • 建立一个IO复用对象EventLoop,并注册socket的读事件(eventloop.POLL_IN
  • 启动EventLoop,等待shadowsocks连接。

shadowsocks客户端连接服务端

当shadowsocks客户端连接到服务端时,EventLoop中就会产生相关的事件,并调用对应的 TCPRelay.handle_event方法来处理事件。

  1. shadowsocks客户端向服务端发起连接,将触发TCPRelay的socket的 EventLoop.POLL_IN事件。即会调用TCPRelay.handle_event方法。 shodowsocks服务端接受连接,并创建一个TCPRelayHandler来管理, 即由TCPRelayHandler与shadowsocks客户端通讯。初始连接事件处理完,Eventloop又 进入待机状态,
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# file: tcprelay.py
class TCPRelay(object):
    def handle_event(self, sock, fd, event):
        # handle events and dispatch to handlers
        if sock:
            logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd,
                        eventloop.EVENT_NAMES.get(event, event))
        # 如果事件来自self._server_socket(服务端侦听socket)
        if sock == self._server_socket:
            if event & eventloop.POLL_ERR:
                # TODO
                raise Exception('server_socket error')
            try:
                logging.debug('accept')
                # 接受新的客户端连接
                conn = self._server_socket.accept()
                # 建立TCPRelayHandler来管理客户端
                TCPRelayHandler(self, self._fd_to_handlers,
                                self._eventloop, conn[0], self._config,
                                self._dns_resolver, self._is_local)
            except (OSError, IOError) as e:
            # ...
        else:
        # 如果事件是由其它socket触发的,
        # 且sock是有效的
            if sock:
                # 根据fd查找到对应的handler
                handler = self._fd_to_handlers.get(fd, None)
                if handler:
                    # 调用handler.handle_event来处理事件
                    handler.handle_event(sock, event)
        # ...
  1. 第一步中,当一个客户端连接上服务端后,建立一个新的TCPRelayHandler来 管理客户端,看看TCPRelayHandler的初始化做了什么:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# file: tcprelay.py
class TCPRelayHandler(object):
    def __init__(self, server, fd_to_handlers, loop, local_sock, config,
                 dns_resolver, is_local):
        # 创建当前对象的TCPRelay对象
        self._server = server
        self._fd_to_handlers = fd_to_handlers
        self._loop = loop
        # 与ss client的socket连接
        self._local_sock = local_sock
        # ...
        # 请注意初始状态值
        self._stage = STAGE_INIT
        self._upstream_status = WAIT_STATUS_READING
        self._downstream_status = WAIT_STATUS_INIT
        # ...
        # 将socket(与ss client连接)加入到TCPRelay的fd_to_handlers的字典中
        # 此时字典里只有一项(假定当前是第一个也是唯一一个ss client的连接)
        # 即ss client与ss server的连接socket
        fd_to_handlers[local_sock.fileno()] = self
        local_sock.setblocking(False)
        local_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        # 将与ss client连接的socket注册至EventLoop。首先注册的是POLL_IN事件
        # socket被添加到EventLoop的字典_fdmap中
        loop.add(local_sock, eventloop.POLL_IN | eventloop.POLL_ERR,
                 self._server)
        self.last_activity = 0
        # 更新活动信息,以免被TCPRelay清理
        self._update_activity()

这样,ss client就与ss server建立了连接,EventLoop开始侦听与ss client连接的socket。

shadowsocks客户端向服务端发送数据

ss localss server建立好连接之后,socket即处理EvetLoop的监听下。 ss local开始向ss server发送数据。EventLoop会调用TCPRelay.handle_event并通过查找TCPRelay._fd_to_handles,最终调用 TCPRelayHandler.handle_event方法来处理。

  1. ss localss server发送请求数据。调用TCPRelayHandler.handle_event来处理。因为是ss local -> ss server 所以socket是TCPRelayHandler._local_sock, 最后会调用 TCPRelayHandler._on_local_read()来处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# file: tcprelay.py
class TCPRelayHandler(object):
    # ...
    def handle_event(self, sock, event):
        # handle all events in this handler and dispatch them to methods
        if self._stage == STAGE_DESTROYED:
            logging.debug('ignore handle_event: destroyed')
            return
        # order is important
        if sock == self._remote_sock:
            if event & eventloop.POLL_ERR:
                self._on_remote_error()
                if self._stage == STAGE_DESTROYED:
                    return
            if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
                self._on_remote_read()
                if self._stage == STAGE_DESTROYED:
                    return
            if event & eventloop.POLL_OUT:
                self._on_remote_write()
        # 是ss local与ss server间的socket触发了事件
        elif sock == self._local_sock:
            if event & eventloop.POLL_ERR:
                self._on_local_error()
                if self._stage == STAGE_DESTROYED:
                    return
            # 如果为读事件,即ss local向ss server发送请求
            if event & (eventloop.POLL_IN | eventloop.POLL_HUP):
                self._on_local_read()
                if self._stage == STAGE_DESTROYED:
                    return
            # 如果为可写事件
            if event & eventloop.POLL_OUT:
                self._on_local_write()
        else:
            logging.warn('unknown socket')
    # ...
  1. 接下来看看TCPRelayHandler.on_local_read()如何处理ss local的 请求。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# file: tcprelay.py
class TCPRelayHandler(object):
    def _on_local_read(self):
        # handle all local read events and dispatch them to methods for
        # each stage
        if not self._local_sock:
            return
        is_local = self._is_local
        data = None
        try:
            # 从socket读取数据
            data = self._local_sock.recv(BUF_SIZE)
        # ...
        if not is_local:
            # 由于当前是 ss server,所以走向此分支
            # 将数据解密
            data = self._encryptor.decrypt(data)
            if not data:
                return
        # 检查状态
        if self._stage == STAGE_STREAM:
            if self._is_local:
                data = self._encryptor.encrypt(data)
            self._write_to_sock(data, self._remote_sock)
            return
        # 当前是ss server
        elif is_local and self._stage == STAGE_INIT:
            # TODO check auth method
            self._write_to_sock(b'\x05\00', self._local_sock)
            self._stage = STAGE_ADDR
            return
        elif self._stage == STAGE_CONNECTING:
            self._handle_stage_connecting(data)
        elif (is_local and self._stage == STAGE_ADDR) or \
                (not is_local and self._stage == STAGE_INIT):
            # 第一个数据包的处理来到这一步
            self._handle_stage_addr(data)

    def _handle_stage_addr(self, data):
        try:
            if self._is_local:
                # ...
            # 解析ss local的数据包
            header_result = parse_header(data)
            # ...
            # 得到ss local想要连接到remote主机和数据
            addrtype, remote_addr, remote_port, header_length = header_result
            logging.info('connecting %s:%d from %s:%d' %
                         (common.to_str(remote_addr), remote_port,
                          self._client_address[0], self._client_address[1]))
            self._remote_address = (common.to_str(remote_addr), remote_port)
            # pause reading
            # 改变与ss local连接socket的侦听事件?
            self._update_stream(STREAM_UP, WAIT_STATUS_WRITING)
            # 进入到STAGE_DNS态
            self._stage = STAGE_DNS
            if self._is_local:
                # ...
            else:
                if len(data) > header_length:
                    # 将ss local的请求数据写入值缓冲队列
                    self._data_to_write_to_remote.append(data[header_length:])
                # notice here may go into _handle_dns_resolved directly
                # 解析ss local请求的域名
                # 完成后调用TCPRelayHandler._handle_dns_resolved()
                self._dns_resolver.resolve(remote_addr,
                                           self._handle_dns_resolved)
        except Exception as e:
            # ...
    # ...

    # 此函数有点让人晕
    def _update_stream(self, stream, status):
        # update a stream to a new waiting status

        # check if status is changed
        # only update if dirty
        dirty = False
        if stream == STREAM_DOWN:
            if self._downstream_status != status:
                self._downstream_status = status
                dirty = True
        elif stream == STREAM_UP:
            if self._upstream_status != status:
                self._upstream_status = status
                dirty = True
        if dirty:
            if self._local_sock:
                event = eventloop.POLL_ERR
                if self._downstream_status & WAIT_STATUS_WRITING:
                    event |= eventloop.POLL_OUT
                if self._upstream_status & WAIT_STATUS_READING:
                    event |= eventloop.POLL_IN
                self._loop.modify(self._local_sock, event)
            if self._remote_sock:
                event = eventloop.POLL_ERR
                if self._downstream_status & WAIT_STATUS_READING:
                    event |= eventloop.POLL_IN
                if self._upstream_status & WAIT_STATUS_WRITING:
                    event |= eventloop.POLL_OUT
                self._loop.modify(self._remote_sock, event)

    # 客户端请求的域名解析完成
    def _handle_dns_resolved(self, result, error):
        if error:
            self._log_error(error)
            self.destroy()
            return
        if result:
            ip = result[1]
            if ip:

                try:
                    self._stage = STAGE_CONNECTING
                    remote_addr = ip
                    if self._is_local:
                        remote_port = self._chosen_server[1]
                    else:
                        remote_port = self._remote_address[1]

                    if self._is_local and self._config['fast_open']:
                        # ...
                    else:
                        # else do connect
                        # 非常重要!!
                        # 创建一个到remote主机的socket,并将socket的文件描述
                        # 符和socket添加到TCPRelay的字典
                        # TCPRelay._fd_to_handler中
                        remote_sock = self._create_remote_socket(remote_addr,
                                                                 remote_port)
                        try:
                            # 连接remote主机(ss local请求的地址)
                            remote_sock.connect((remote_addr, remote_port))
                        except (OSError, IOError) as e:
                            if eventloop.errno_from_exception(e) == \
                                    errno.EINPROGRESS:
                                pass
                        # 注册可写事件。是不是只要可写立即就会触发?
                        # 如果是,马上会调用TCPRelayHandler._on_remote_wirte()
                        # 将ss local请求的数据发送到remote主机
                        self._loop.add(remote_sock,
                                       eventloop.POLL_ERR | eventloop.POLL_OUT,
                                       self._server)
                        # TCPRelayHandler进入STAGE_CONNECTING态
                        self._stage = STAGE_CONNECTING
                        # 更新stream状态
                        self._update_stream(STREAM_UP, WAIT_STATUS_READWRITING)
                        self._update_stream(STREAM_DOWN, WAIT_STATUS_READING)
                    return
                except Exception as e:
                    shell.print_exception(e)
                    if self._config['verbose']:
                        traceback.print_exc()
        self.destroy()
  1. 此时TCPRelayHandler已经将ss local的请求数据发送给的remote主机,当远程主 机发回响应数据给ss server时,会触发TCPRelayHandler._remote_socket的读事 件。最终会调用TCPRelayHandler._on_remote_read方法:

即此时,ss local完成了一次与ss server的代理访问:

  1. ss local将想要访问的地址和数据加密将给ss server
  2. ss serverss local的请求数据解密,并向remote主机发送请求
  3. remote主机将响应数据发送给ss serverss server将响应数据加密发送 给ss local

注意:

  1. TCPRelay对应着一个ss server的侦听端口,主要负责处理与ss local 的连接
  2. TCPRelayHandler对应着一条ss localss server间的TCP链接, 同时管理着一条ss server与remote主机的连接。

不解

  1. 侦听与ss local连接socket事件时,为什么在侦听读和侦听写之间切换?同时侦 听两个事件有什么问题?
  2. 由于TCPRelayHandler只能管理一条ss server与remote的连接,且除第 一个数据包之外,后续数据,ss server都是收到之后直接交给remote。如此则 要求ss local针对每一个不同域名/IP,需要与ss server建立一条TCP链 接?