+
+def _build_proxy_handler(name):
+ class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
+ proxy_name = name
+
+ def log_message(self, format, *args):
+ pass
+
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/plain; charset=utf-8')
+ self.end_headers()
+ self.wfile.write('{self.proxy_name}: {self.path}'.format(self=self).encode('utf-8'))
+ return HTTPTestRequestHandler
+
+
+class TestProxy(unittest.TestCase):
+ def setUp(self):
+ self.proxy = compat_http_server.HTTPServer(
+ ('localhost', 0), _build_proxy_handler('normal'))
+ self.port = http_server_port(self.proxy)
+ self.proxy_thread = threading.Thread(target=self.proxy.serve_forever)
+ self.proxy_thread.daemon = True
+ self.proxy_thread.start()
+
+ self.geo_proxy = compat_http_server.HTTPServer(
+ ('localhost', 0), _build_proxy_handler('geo'))
+ self.geo_port = http_server_port(self.geo_proxy)
+ self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever)
+ self.geo_proxy_thread.daemon = True
+ self.geo_proxy_thread.start()
+
+ def test_proxy(self):
+ geo_proxy = 'localhost:{0}'.format(self.geo_port)
+ ydl = YoutubeDL({
+ 'proxy': 'localhost:{0}'.format(self.port),
+ 'geo_verification_proxy': geo_proxy,
+ })
+ url = 'http://foo.com/bar'
+ response = ydl.urlopen(url).read().decode('utf-8')
+ self.assertEqual(response, 'normal: {0}'.format(url))
+
+ req = compat_urllib_request.Request(url)
+ req.add_header('Ytdl-request-proxy', geo_proxy)
+ response = ydl.urlopen(req).read().decode('utf-8')
+ self.assertEqual(response, 'geo: {0}'.format(url))
+
+ def test_proxy_with_idn(self):
+ ydl = YoutubeDL({
+ 'proxy': 'localhost:{0}'.format(self.port),
+ })
+ url = 'http://中文.tw/'
+ response = ydl.urlopen(url).read().decode('utf-8')
+ # b'xn--fiq228c' is '中文'.encode('idna')
+ self.assertEqual(response, 'normal: http://xn--fiq228c.tw/')
+