File size: 8,518 Bytes
0aee47a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
"""

bilibili_api.utils.captcha



人机测试

"""
import os
import copy
import json
import time

from .utils import get_api
from .network import Api

validate = None
seccode = None
gt = None
challenge = None
key = None
server = None
thread = None

API = get_api("login")


def _geetest_urlhandler(url: str, content_type: str):
    """

    极验验证服务器 html 源获取函数

    """
    global gt, challenge, key
    url = url[1:]
    if url[:7] == "result/":
        global validate, seccode
        datas = url[7:]
        datas = datas.split("&")
        for data in datas:
            if data[:8] == "validate":
                validate = data[9:]
            elif data[:7] == "seccode":
                seccode = data[8:].replace("%7C", "|")
        with open(
            os.path.abspath(
                os.path.join(
                    os.path.dirname(__file__), "..", "data", "geetest", "done.html"
                )
            ),
            encoding="utf8",
        ) as f:
            html_source_bytes = f.read()
        return html_source_bytes
    elif url[:7] == "":
        api = API["password"]["captcha"]
        json_data = Api(**api).result_sync
        gt = json_data["geetest"]["gt"]
        challenge = json_data["geetest"]["challenge"]
        key = json_data["token"]
        with open(
            os.path.abspath(
                os.path.join(
                    os.path.dirname(__file__), "..", "data", "geetest", "captcha.html"
                )
            ),
            encoding="utf8",
        ) as f:
            html_source_bytes = (
                f.read()
                .replace("{ Python_Interface: GT }", f'"{gt}"')
                .replace("{ Python_Interface: CHALLENGE }", f'"{challenge}"')
            )
        return html_source_bytes
    else:
        return ""


def _start_server(urlhandler, hostname, port):
    """Start an HTTP server thread on a specific port.



    Start an HTML/text server thread, so HTML or text documents can be

    browsed dynamically and interactively with a web browser.  Example use:



        >>> import time

        >>> import pydoc



        Define a URL handler.  To determine what the client is asking

        for, check the URL and content_type.



        Then get or generate some text or HTML code and return it.



        >>> def my_url_handler(url, content_type):

        ...     text = 'the URL sent was: (%s, %s)' % (url, content_type)

        ...     return text



        Start server thread on port 0.

        If you use port 0, the server will pick a random port number.

        You can then use serverthread.port to get the port number.



        >>> port = 0

        >>> serverthread = pydoc._start_server(my_url_handler, port)



        Check that the server is really started.  If it is, open browser

        and get first page.  Use serverthread.url as the starting page.



        >>> if serverthread.serving:

        ...    import webbrowser



        The next two lines are commented out so a browser doesn't open if

        doctest is run on this module.



        #...    webbrowser.open(serverthread.url)

        #True



        Let the server do its thing. We just need to monitor its status.

        Use time.sleep so the loop doesn't hog the CPU.



        >>> starttime = time.monotonic()

        >>> timeout = 1                    #seconds



        This is a short timeout for testing purposes.



        >>> while serverthread.serving:

        ...     time.sleep(.01)

        ...     if serverthread.serving and time.monotonic() - starttime > timeout:

        ...          serverthread.stop()

        ...          break



        Print any errors that may have occurred.



        >>> print(serverthread.error)

        None

    """
    import select
    import threading
    import http.server
    import email.message

    class DocHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            """Process a request from an HTML browser.



            The URL received is in self.path.

            Get an HTML page from self.urlhandler and send it.

            """
            if self.path.endswith(".css"):
                content_type = "text/css"
            else:
                content_type = "text/html"
            self.send_response(200)
            self.send_header("Content-Type", "%s; charset=UTF-8" % content_type)
            self.end_headers()
            self.wfile.write(self.urlhandler(self.path, content_type).encode("utf-8"))  # type: ignore

        def log_message(self, *args):
            # Don't log messages.
            pass

    class DocServer(http.server.HTTPServer):
        def __init__(self, host, port, callback):
            self.host = host
            self.address = (self.host, port)
            self.callback = callback
            self.base.__init__(self, self.address, self.handler)  # type: ignore
            self.quit = False

        def serve_until_quit(self):
            while not self.quit:
                rd, wr, ex = select.select([self.socket.fileno()], [], [], 1)
                if rd:
                    self.handle_request()
            self.server_close()

        def server_activate(self):
            self.base.server_activate(self)  # type: ignore
            if self.callback:
                self.callback(self)

    class ServerThread(threading.Thread):
        def __init__(self, urlhandler, host, port):
            self.urlhandler = urlhandler
            self.host = host
            self.port = int(port)
            threading.Thread.__init__(self)
            self.serving = False
            self.error = None

        def run(self):
            """Start the server."""
            try:
                DocServer.base = http.server.HTTPServer  # type: ignore
                DocServer.handler = DocHandler  # type: ignore
                DocHandler.MessageClass = email.message.Message  # type: ignore
                DocHandler.urlhandler = staticmethod(self.urlhandler)  # type: ignore
                docsvr = DocServer(self.host, self.port, self.ready)
                self.docserver = docsvr
                docsvr.serve_until_quit()
            except Exception as e:
                self.error = e

        def ready(self, server):
            self.serving = True
            self.host = server.host
            self.port = server.server_port
            self.url = "http://%s:%d/" % (self.host, self.port)

        def stop(self):
            """Stop the server and this thread nicely"""
            if self.docserver != None:
                self.docserver.quit = True
                self.join()
                # explicitly break a reference cycle: DocServer.callback
                # has indirectly a reference to ServerThread.
                self.docserver = None
                self.serving = False
                self.url = None

    thread = ServerThread(urlhandler, hostname, port)
    thread.start()
    # Wait until thread.serving is True to make sure we are
    # really up before returning.
    while not thread.error and not thread.serving:
        time.sleep(0.01)
    return thread


def start_server():
    """

    验证码服务打开服务器



    Returns:

        ServerThread: 服务进程



    返回值内函数及属性:

        - url   (str)     : 验证码服务地址

        - start (Callable): 开启进程

        - stop  (Callable): 结束进程

    """
    global thread
    thread = _start_server(_geetest_urlhandler, "127.0.0.1", 0)
    print("请打开 " + thread.url + " 进行验证。")  # type: ignore
    return thread


def close_server():
    """

    关闭服务器

    """
    global thread
    thread.stop()  # type: ignore


def get_result():
    """

    获取结果



    Returns:

        dict: 验证结果

    """
    global validate, seccode, challenge, gt, key
    if (
        validate is None
        or seccode is None
        or gt is None
        or challenge is None
        or key is None
    ):
        return -1
    else:
        dct = {
            "gt": copy.copy(gt),
            "challenge": copy.copy(challenge),
            "validate": copy.copy(validate),
            "seccode": copy.copy(seccode),
            "token": copy.copy(key),
        }
        return dct