import json
import locale
import math
+import operator
import os
import pipes
import platform
import zlib
from .compat import (
+ compat_basestring,
compat_chr,
- compat_getenv,
compat_html_entities,
compat_http_client,
+ compat_kwargs,
compat_parse_qs,
compat_socket_create_connection,
compat_str,
compiled_regex_type = type(re.compile(''))
std_headers = {
- 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0 (Chrome)',
+ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20150101 Firefox/20.0 (Chrome)',
'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate',
}
# English month names in calendar order, spelled out here so that lookups do
# not depend on the process locale.
ENGLISH_MONTH_NAMES = [
    'January', 'February', 'March', 'April', 'May', 'June',
    'July', 'August', 'September', 'October', 'November', 'December']
+
+
def preferredencoding():
"""Get preferred encoding.
try:
pref = locale.getpreferredencoding()
'TEST'.encode(pref)
- except:
+ except Exception:
pref = 'UTF-8'
return pref
'encoding': 'utf-8',
})
- tf = tempfile.NamedTemporaryFile(**args)
+ tf = tempfile.NamedTemporaryFile(**compat_kwargs(args))
try:
with tf:
except OSError:
pass
os.rename(tf.name, fn)
- except:
+ except Exception:
try:
os.remove(tf.name)
except OSError:
def find_xpath_attr(node, xpath, key, val):
# Here comes the crazy part: In 2.6, if the xpath is a unicode,
# .//node does not match if a node is a direct child of . !
- if isinstance(xpath, unicode):
+ if isinstance(xpath, compat_str):
xpath = xpath.encode('ascii')
for f in node.findall(xpath):
raise
# In case of error, try to remove win32 forbidden chars
- alt_filename = os.path.join(
- re.sub('[/<>:"\\|\\\\?\\*]', '#', path_part)
- for path_part in os.path.split(filename)
- )
+ alt_filename = sanitize_path(filename)
if alt_filename == filename:
raise
else:
# An exception here should be caught in the caller
- stream = open(encodeFilename(filename), open_mode)
+ stream = open(encodeFilename(alt_filename), open_mode)
return (stream, alt_filename)
# Common case of "Foreign band name - English song title"
if restricted and result.startswith('-_'):
result = result[2:]
+ if result.startswith('-'):
+ result = '_' + result[len('-'):]
+ result = result.lstrip('.')
if not result:
result = '_'
return result
def sanitize_path(s):
    """Sanitizes and normalizes path on Windows.

    On every other platform ``s`` is returned unchanged.  On Windows the
    drive/UNC prefix is kept as is, the remainder is normalized with
    ``os.path.normpath`` and every path component other than ``.`` and ``..``
    has its forbidden characters (and a trailing dot) replaced with ``#``.
    """
    if sys.platform != 'win32':
        return s

    drive_or_unc, _ = os.path.splitdrive(s)
    if sys.version_info < (2, 7) and not drive_or_unc:
        # NOTE(review): presumably splitdrive() does not detect UNC prefixes
        # on these old interpreters, hence the explicit splitunc() retry.
        drive_or_unc, _ = os.path.splitunc(s)

    norm_path = os.path.normpath(remove_start(s, drive_or_unc)).split(os.path.sep)
    if drive_or_unc:
        # The prefix is re-attached below, so the first component of the
        # remainder is discarded here.
        norm_path.pop(0)

    # Raw string: the former non-raw literal relied on the invalid escape
    # sequence '\.' being passed through unchanged.
    forbidden_re = r'(?:[/<>:"\|\\?\*]|\.$)'
    sanitized_path = [
        path_part if path_part in ('.', '..') else re.sub(forbidden_re, '#', path_part)
        for path_part in norm_path]
    if drive_or_unc:
        sanitized_path.insert(0, drive_or_unc + os.path.sep)
    return os.path.join(*sanitized_path)
+
+
def orderedSet(iterable):
""" Remove all duplicates from the input iterable """
res = []
if entity in compat_html_entities.name2codepoint:
return compat_chr(compat_html_entities.name2codepoint[entity])
- mobj = re.match(r'#(x?[0-9]+)', entity)
+ mobj = re.match(r'#(x[0-9a-fA-F]+|[0-9]+)', entity)
if mobj is not None:
numstr = mobj.group(1)
if numstr.startswith('x'):
r'&([^;]+);', lambda m: _htmlentity_transform(m.group(1)), s)
def get_subprocess_encoding():
    """Return the name of the encoding used for subprocess arguments/file names.

    On Windows (major version 5 and up) this is the result of
    ``preferredencoding()``; everywhere else it is the file system encoding,
    with ``'utf-8'`` as the fallback when the interpreter reports none.
    """
    if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
        # For subprocess calls, encode with locale encoding
        # Refer to http://stackoverflow.com/a/9951851/35070
        return preferredencoding()

    encoding = sys.getfilesystemencoding()
    if encoding is None:
        encoding = 'utf-8'
    return encoding
+
+
def encodeFilename(s, for_subprocess=False):
"""
@param s The name of the file
if sys.version_info >= (3, 0):
return s
- if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
- # Pass '' directly to use Unicode APIs on Windows 2000 and up
- # (Detecting Windows NT 4 is tricky because 'major >= 4' would
- # match Windows 9x series as well. Besides, NT 4 is obsolete.)
- if not for_subprocess:
- return s
- else:
- # For subprocess calls, encode with locale encoding
- # Refer to http://stackoverflow.com/a/9951851/35070
- encoding = preferredencoding()
- else:
- encoding = sys.getfilesystemencoding()
- if encoding is None:
- encoding = 'utf-8'
- return s.encode(encoding, 'ignore')
+ # Pass '' directly to use Unicode APIs on Windows 2000 and up
+ # (Detecting Windows NT 4 is tricky because 'major >= 4' would
+ # match Windows 9x series as well. Besides, NT 4 is obsolete.)
+ if not for_subprocess and sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5:
+ return s
+
+ return s.encode(get_subprocess_encoding(), 'ignore')
+
+
def decodeFilename(b, for_subprocess=False):
    """Decode a byte-string file name into text (Python 2 only).

    @param b The file name; returned unchanged on Python 3 or when it is
             not a ``bytes`` instance
    @param for_subprocess Accepted for symmetry with encodeFilename; the
             decoding below does not depend on it
    """
    if sys.version_info >= (3, 0) or not isinstance(b, bytes):
        return b

    # Undecodable bytes are dropped rather than raising
    return b.decode(get_subprocess_encoding(), 'ignore')
def encodeArgument(s):
return encodeFilename(s, True)
def decodeArgument(b):
    """Decode a subprocess argument; shorthand for decodeFilename(b, True)."""
    return decodeFilename(b, True)
+
+
def decodeOption(optval):
if optval is None:
return optval
pass
if sys.version_info < (3, 2):
- import httplib
-
- class HTTPSConnectionV3(httplib.HTTPSConnection):
- def __init__(self, *args, **kwargs):
- httplib.HTTPSConnection.__init__(self, *args, **kwargs)
-
- def connect(self):
- sock = socket.create_connection((self.host, self.port), self.timeout)
- if getattr(self, '_tunnel_host', False):
- self.sock = sock
- self._tunnel()
- try:
- self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_TLSv1)
- except ssl.SSLError:
- self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_SSLv23)
-
- return YoutubeDLHTTPSHandler(params, https_conn_class=HTTPSConnectionV3, **kwargs)
+ return YoutubeDLHTTPSHandler(params, **kwargs)
else: # Python < 3.4
- context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode = (ssl.CERT_NONE
if opts_no_check_certificate
else ssl.CERT_REQUIRED)
return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def bug_reports_message():
    """Return the text appended to error messages that asks for a bug report.

    The update hint depends on whether ytdl_is_updateable() reports that
    this installation can update itself.
    """
    if ytdl_is_updateable():
        update_cmd = 'type youtube-dl -U to update'
    else:
        update_cmd = 'see https://yt-dl.org/update on how to update'

    return ''.join((
        '; please report this issue on https://yt-dl.org/bug .',
        ' Make sure you are using the latest version; %s.' % update_cmd,
        ' Be sure to call youtube-dl with the --verbose flag and include its complete output.',
    ))
+
+
class ExtractorError(Exception):
"""Error during info extraction."""
if cause:
msg += ' (caused by %r)' % cause
if not expected:
- if ytdl_is_updateable():
- update_cmd = 'type youtube-dl -U to update'
- else:
- update_cmd = 'see https://yt-dl.org/update on how to update'
- msg += '; please report this issue on https://yt-dl.org/bug .'
- msg += ' Make sure you are using the latest version; %s.' % update_cmd
- msg += ' Be sure to call youtube-dl with the --verbose flag and include its complete output.'
+ msg += bug_reports_message()
super(ExtractorError, self).__init__(msg)
self.traceback = tb
sock = compat_socket_create_connection(
(self.host, self.port), self.timeout, sa)
if is_https:
- self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file)
+ self.sock = ssl.wrap_socket(
+ sock, self.key_file, self.cert_file,
+ ssl_version=ssl.PROTOCOL_TLSv1)
else:
self.sock = sock
hc.connect = functools.partial(_hc_connect, hc)
def http_request(self, req):
for h, v in std_headers.items():
- if h not in req.headers:
+ # Capitalize is needed because of Python bug 2275: http://bugs.python.org/issue2275
+ # The dict keys are capitalized because of this bug by urllib
+ if h.capitalize() not in req.headers:
req.add_header(h, v)
if 'Youtubedl-no-compression' in req.headers:
if 'Accept-encoding' in req.headers:
del req.headers['Accept-encoding']
del req.headers['Youtubedl-no-compression']
- if 'Youtubedl-user-agent' in req.headers:
- if 'User-agent' in req.headers:
- del req.headers['User-agent']
- req.headers['User-agent'] = req.headers['Youtubedl-user-agent']
- del req.headers['Youtubedl-user-agent']
if sys.version_info < (2, 7) and '#' in req.get_full_url():
# Python 2.6 is brain-dead when it comes to fragments
self._params = params
def https_open(self, req):
+ kwargs = {}
+ if hasattr(self, '_context'): # python > 2.6
+ kwargs['context'] = self._context
+ if hasattr(self, '_check_hostname'): # python 3.x
+ kwargs['check_hostname'] = self._check_hostname
return self.do_open(functools.partial(
_create_http_connection, self, self._https_conn_class, True),
- req)
+ req, **kwargs)
-def parse_iso8601(date_str, delimiter='T'):
def parse_iso8601(date_str, delimiter='T', timezone=None):
    """ Return a UNIX timestamp from the given date

    @param date_str   Date of the form YYYY-MM-DD<delimiter>HH:MM:SS; None
                      yields None
    @param delimiter  Separator between the date and the time part
    @param timezone   UTC offset as a datetime.timedelta.  When None, a
                      trailing "Z" or "+HH:MM"/"-HHMM" designator (and any
                      fractional seconds in front of it) is stripped from
                      date_str and used; without one, UTC is assumed.

    strptime() raises ValueError if the remaining string does not match.
    """
    if date_str is None:
        return None

    if timezone is None:
        timezone = datetime.timedelta()
        m = re.search(
            r'(\.[0-9]+)?(?:Z$| ?(?P<sign>\+|-)(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})$)',
            date_str)
        if m:
            # Cut off fractional seconds and the timezone designator
            date_str = date_str[:-len(m.group(0))]
            if m.group('sign'):
                sign = 1 if m.group('sign') == '+' else -1
                timezone = datetime.timedelta(
                    hours=sign * int(m.group('hours')),
                    minutes=sign * int(m.group('minutes')))

    date_format = '%Y-%m-%d{0}%H:%M:%S'.format(delimiter)
    # Subtracting the offset converts the local time to UTC
    dt = datetime.datetime.strptime(date_str, date_format) - timezone
    return calendar.timegm(dt.timetuple())
# Replace commas
date_str = date_str.replace(',', ' ')
# %z (UTC offset) is only supported in python>=3.2
- date_str = re.sub(r' ?(\+|-)[0-9]{2}:?[0-9]{2}$', '', date_str)
+ if not re.match(r'^[0-9]{1,2}-[0-9]{1,2}-[0-9]{4}$', date_str):
+ date_str = re.sub(r' ?(\+|-)[0-9]{2}:?[0-9]{2}$', '', date_str)
# Remove AM/PM + timezone
- date_str = re.sub(r'(?i)\s*(?:AM|PM)\s+[A-Z]+', '', date_str)
+ date_str = re.sub(r'(?i)\s*(?:AM|PM)(?:\s+[A-Z]+)?', '', date_str)
format_expressions = [
'%d %B %Y',
]
if day_first:
format_expressions.extend([
+ '%d-%m-%Y',
'%d.%m.%Y',
'%d/%m/%Y',
'%d/%m/%y',
])
else:
format_expressions.extend([
+ '%m-%d-%Y',
'%m.%d.%Y',
'%m/%d/%Y',
'%m/%d/%y',
except AttributeError:
# If the output stream doesn't have a fileno, it's virtual
return False
+ except io.UnsupportedOperation:
+ # Some strange Windows pseudo files?
+ return False
if fileno not in WIN_OUTPUT_IDS:
return False
def not_a_console(handle):
if handle == INVALID_HANDLE_VALUE or handle is None:
return True
- return ((GetFileType(handle) & ~FILE_TYPE_REMOTE) != FILE_TYPE_CHAR
- or GetConsoleMode(handle, ctypes.byref(ctypes.wintypes.DWORD())) == 0)
+ return ((GetFileType(handle) & ~FILE_TYPE_REMOTE) != FILE_TYPE_CHAR or
+ GetConsoleMode(handle, ctypes.byref(ctypes.wintypes.DWORD())) == 0)
if not_a_console(h):
return False
return ' '.join(quoted_args)
-def takewhile_inclusive(pred, seq):
- """ Like itertools.takewhile, but include the latest evaluated element
- (the first element so that Not pred(e)) """
- for e in seq:
- yield e
- if not pred(e):
- return
-
-
def smuggle_url(url, data):
""" Pass additional data in a URL for internal use. """
return int(float(num_str) * mult)
-def get_term_width():
- columns = compat_getenv('COLUMNS', None)
- if columns:
- return int(columns)
def month_by_name(name):
    """ Return the number of a month by (locale-independently) English name

    Numbers start at 1 for 'January'; None is returned for any name that is
    not in ENGLISH_MONTH_NAMES (the comparison is case-sensitive).
    """
    try:
        return ENGLISH_MONTH_NAMES.index(name) + 1
    except ValueError:
        return None
-def month_by_name(name):
- """ Return the number of a month by (locale-independently) English name """
def month_by_abbreviation(abbrev):
    """ Return the number of a month by (locale-independently) English
        abbreviations

    An abbreviation is the first three letters of the corresponding entry of
    ENGLISH_MONTH_NAMES (e.g. 'Jan'); None is returned for anything else.
    """
    abbreviations = [month[:3] for month in ENGLISH_MONTH_NAMES]
    try:
        return abbreviations.index(abbrev) + 1
    except ValueError:
        return None
def parse_duration(s):
- if not isinstance(s, basestring if sys.version_info < (3, 0) else compat_str):
+ if not isinstance(s, compat_basestring):
return None
s = s.strip()
(?P<only_mins>[0-9.]+)\s*(?:mins?|minutes?)\s*|
(?P<only_hours>[0-9.]+)\s*(?:hours?)|
+ \s*(?P<hours_reversed>[0-9]+)\s*(?:[:h]|hours?)\s*(?P<mins_reversed>[0-9]+)\s*(?:[:m]|mins?|minutes?)\s*|
(?:
- (?:(?P<hours>[0-9]+)\s*(?:[:h]|hours?)\s*)?
+ (?:
+ (?:(?P<days>[0-9]+)\s*(?:[:d]|days?)\s*)?
+ (?P<hours>[0-9]+)\s*(?:[:h]|hours?)\s*
+ )?
(?P<mins>[0-9]+)\s*(?:[:m]|mins?|minutes?)\s*
)?
(?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*(?:s|secs?|seconds?)?
return float_or_none(m.group('only_hours'), invscale=60 * 60)
if m.group('secs'):
res += int(m.group('secs'))
+ if m.group('mins_reversed'):
+ res += int(m.group('mins_reversed')) * 60
if m.group('mins'):
res += int(m.group('mins')) * 60
if m.group('hours'):
res += int(m.group('hours')) * 60 * 60
+ if m.group('hours_reversed'):
+ res += int(m.group('hours_reversed')) * 60 * 60
+ if m.group('days'):
+ res += int(m.group('days')) * 24 * 60 * 60
if m.group('ms'):
res += float(m.group('ms'))
return res
-def prepend_extension(filename, ext):
def prepend_extension(filename, ext, expected_real_ext=None):
    """Insert ``ext`` in front of the existing extension of ``filename``.

    If ``expected_real_ext`` is given and the actual extension differs from
    it, ``ext`` is appended to the complete file name instead.
    """
    name, real_ext = os.path.splitext(filename)
    # real_ext carries its leading dot; strip it for the comparison
    if expected_real_ext and real_ext[1:] != expected_real_ext:
        return '{0}.{1}'.format(filename, ext)
    return '{0}.{1}{2}'.format(name, ext, real_ext)
+
+
def replace_extension(filename, ext, expected_real_ext=None):
    """Replace the extension of ``filename`` with ``ext``.

    If ``expected_real_ext`` is given and the actual extension differs from
    it, nothing is removed and ``ext`` is appended to the complete file name.
    """
    name, real_ext = os.path.splitext(filename)
    # real_ext carries its leading dot; strip it for the comparison
    if expected_real_ext and real_ext[1:] != expected_real_ext:
        base = filename
    else:
        base = name
    return '{0}.{1}'.format(base, ext)
def check_executable(exe, args=[]):
or False if the executable is not present """
try:
out, _ = subprocess.Popen(
- [exe] + args,
+ [encodeArgument(exe)] + args,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()
except OSError:
return False
s)
def lowercase_escape(s):
    """Decode four-digit, lowercase backslash-u escapes contained in ``s``.

    Only sequences made of a backslash, a lowercase "u" and exactly four hex
    digits are replaced by the character they denote; everything else is
    left untouched.
    """
    unicode_escape = codecs.getdecoder('unicode_escape')
    return re.sub(
        r'\\u[0-9a-fA-F]{4}',
        # The decoder returns (text, consumed_length); only the text is needed
        lambda m: unicode_escape(m.group(0))[0],
        s)
+
+
def escape_rfc3986(s):
    """Escape non-ASCII characters as suggested by RFC 3986"""
    if sys.version_info < (3, 0) and isinstance(s, compat_str):
        # On Python 2 the text is encoded to UTF-8 bytes before quoting
        s = s.encode('utf-8')
    # The second argument lists the characters that are left unquoted
    return compat_urllib_parse.quote(s, b"%/;:@&=+$,!~*'()?#[]")
return '"%s"' % v
res = re.sub(r'''(?x)
- "(?:[^"\\]*(?:\\\\|\\")?)*"|
- '(?:[^'\\]*(?:\\\\|\\')?)*'|
- [a-zA-Z_][a-zA-Z_0-9]*
+ "(?:[^"\\]*(?:\\\\|\\['"nu]))*[^"\\]*"|
+ '(?:[^'\\]*(?:\\\\|\\['"nu]))*[^'\\]*'|
+ [a-zA-Z_][.a-zA-Z_0-9]*
''', fix_kv, code)
- res = re.sub(r',(\s*\])', lambda m: m.group(1), res)
+ res = re.sub(r',(\s*[\]}])', lambda m: m.group(1), res)
return res
return ' '.join(shlex_quote(a) for a in args)
def mimetype2ext(mt):
    """Map a MIME type (e.g. a Content-Type header value) to a file extension.

    The subtype (the part after the last "/") is used as the extension
    unless it is one of the special cases listed below.  Parameters such as
    "; charset=utf-8" are ignored.  Returns None when ``mt`` is None, e.g.
    because the header was missing.
    """
    if mt is None:
        return None

    # Strip parameters first so that they do not end up in the extension
    subtype = mt.split(';')[0].strip().rpartition('/')[2]

    return {
        'x-ms-wmv': 'wmv',
        'x-mp4-fragmented': 'mp4',
        'ttml+xml': 'ttml',
    }.get(subtype, subtype)
+
+
def urlhandle_detect_ext(url_handle):
try:
url_handle.headers
except AttributeError: # Python < 3
getheader = url_handle.info().getheader
- return getheader('Content-Type').split("/")[1]
+ cd = getheader('Content-Disposition')
+ if cd:
+ m = re.match(r'attachment;\s*filename="(?P<filename>[^"]+)"', cd)
+ if m:
+ e = determine_ext(m.group('filename'), default_ext=None)
+ if e:
+ return e
+
+ return mimetype2ext(getheader('Content-Type'))
def age_restricted(content_limit, age_limit):
if content_limit is None:
return False # Content available for everyone
return age_limit < content_limit
+
+
def is_html(first_bytes):
    """ Detect whether a file contains HTML by examining its first bytes.

    The bytes are decoded according to a leading byte order mark (UTF-8
    without one) and the result is truthy if, after optional whitespace, the
    text starts with "<".  The return value is the regex match object or
    None, so use it in a boolean context only.
    """

    # The UTF-32 marks must be tested before the UTF-16 ones: the UTF-32-LE
    # mark begins with the UTF-16-LE mark.
    BOMS = [
        (b'\xef\xbb\xbf', 'utf-8'),
        (b'\x00\x00\xfe\xff', 'utf-32-be'),
        (b'\xff\xfe\x00\x00', 'utf-32-le'),
        (b'\xff\xfe', 'utf-16-le'),
        (b'\xfe\xff', 'utf-16-be'),
    ]
    for bom, enc in BOMS:
        if first_bytes.startswith(bom):
            s = first_bytes[len(bom):].decode(enc, 'replace')
            break
    else:
        # No byte order mark found
        s = first_bytes.decode('utf-8', 'replace')

    return re.match(r'^\s*<', s)
+
+
def determine_protocol(info_dict):
    """Return the download protocol for ``info_dict``.

    An explicit 'protocol' entry wins.  Otherwise the protocol is derived
    from info_dict['url']: first from a known URL prefix, then from a known
    file extension, and finally from the URL scheme.
    """
    protocol = info_dict.get('protocol')
    if protocol is not None:
        return protocol

    url = info_dict['url']
    for prefix in ('rtmp', 'mms', 'rtsp'):
        if url.startswith(prefix):
            return prefix

    ext = determine_ext(url)
    if ext in ('m3u8', 'f4m'):
        # These manifest formats are reported as protocols of their own
        return ext

    return compat_urllib_parse_urlparse(url).scheme
+
+
def render_table(header_row, data):
    """ Render a list of rows, each as a list of values

    Returns one line per row (header first).  Every column except the last
    is left-aligned and padded to the width of its longest value plus one.
    """
    table = [header_row] + data
    # zip(*table) transposes the rows into columns
    max_lens = [max(len(compat_str(v)) for v in col) for col in zip(*table)]
    # The last column is not padded, which avoids trailing whitespace
    format_str = ' '.join('%-' + compat_str(ml + 1) + 's' for ml in max_lens[:-1]) + '%s'
    return '\n'.join(format_str % tuple(row) for row in table)
+
+
def _match_one(filter_part, dct):
    """Evaluate one filter expression against the dictionary ``dct``.

    Two forms are understood:

    * ``KEY OP VALUE`` with OP being one of < <= > >= = != .  VALUE is
      either a number (optionally with a size suffix, which is handed to
      parse_filesize) or a bare alphanumeric string; strings may only be
      combined with = and != .  A "?" right after the operator makes the
      expression pass when KEY is missing or None.
    * ``KEY`` or ``!KEY``, which test whether the value is (not) None.

    Returns a truthy value if the expression holds and a falsy one
    otherwise (for a missing key this is the matched "?" text or None).
    Raises ValueError for expressions that cannot be parsed, for string
    values used with an ordering operator and for invalid numbers.
    """
    COMPARISON_OPERATORS = {
        '<': operator.lt,
        '<=': operator.le,
        '>': operator.gt,
        '>=': operator.ge,
        '=': operator.eq,
        '!=': operator.ne,
    }
    operator_rex = re.compile(r'''(?x)\s*
        (?P<key>[a-z_]+)
        \s*(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
        (?:
            (?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
            (?P<strval>(?![0-9.])[a-z0-9A-Z]*)
        )
        \s*$
        ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
    m = operator_rex.search(filter_part)
    if m:
        op = COMPARISON_OPERATORS[m.group('op')]
        if m.group('strval') is not None:
            if m.group('op') not in ('=', '!='):
                raise ValueError(
                    'Operator %s does not support string values!' % m.group('op'))
            comparison_value = m.group('strval')
        else:
            try:
                comparison_value = int(m.group('intval'))
            except ValueError:
                # Not a plain integer: try it as a file size, first as
                # written and then with a "B" unit appended
                comparison_value = parse_filesize(m.group('intval'))
                if comparison_value is None:
                    comparison_value = parse_filesize(m.group('intval') + 'B')
                if comparison_value is None:
                    raise ValueError(
                        'Invalid integer value %r in filter part %r' % (
                            m.group('intval'), filter_part))
        actual_value = dct.get(m.group('key'))
        if actual_value is None:
            # Missing values only pass when the "?" marker was given
            return m.group('none_inclusive')
        return op(actual_value, comparison_value)

    UNARY_OPERATORS = {
        '': lambda v: v is not None,
        '!': lambda v: v is None,
    }
    operator_rex = re.compile(r'''(?x)\s*
        (?P<op>%s)\s*(?P<key>[a-z_]+)
        \s*$
        ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
    m = operator_rex.search(filter_part)
    if m:
        op = UNARY_OPERATORS[m.group('op')]
        actual_value = dct.get(m.group('key'))
        return op(actual_value)

    raise ValueError('Invalid filter part %r' % filter_part)
+
+
def match_str(filter_str, dct):
    """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false

    filter_str consists of one or more expressions separated by "&"; the
    dictionary passes only if every expression is satisfied.
    """
    filter_parts = filter_str.split('&')
    return all(_match_one(filter_part, dct) for filter_part in filter_parts)
+
+
def match_filter_func(filter_str):
    """Build a filter callable from the filter expression ``filter_str``.

    The returned function takes an info dict and returns None if the dict
    passes the filter, or a message explaining why the video is skipped.
    """
    def _match_func(info_dict):
        if match_str(filter_str, info_dict):
            return None
        # Name the video by its title, falling back to its id
        video_title = info_dict.get('title', info_dict.get('id', 'video'))
        return '%s does not pass filter %s, skipping ..' % (video_title, filter_str)
    return _match_func
+
+
def parse_dfxp_time_expr(time_expr):
    """Convert a DFXP/TTML time expression to seconds (float).

    Supported are offsets in seconds ("12", "12.5", "12.5s") and clock
    times ("HH:MM:SS" with optional fractional seconds).  A missing or
    empty expression yields 0.0; an unsupported one yields None.
    """
    if not time_expr:
        return 0.0

    mobj = re.match(r'^(?P<time_offset>\d+(?:\.\d+)?)s?$', time_expr)
    if mobj:
        return float(mobj.group('time_offset'))

    mobj = re.match(r'^(\d+):(\d\d):(\d\d(?:\.\d+)?)$', time_expr)
    if mobj:
        hours, minutes, seconds = mobj.groups()
        return 3600 * int(hours) + 60 * int(minutes) + float(seconds)

    return None
+
+
def srt_subtitles_timecode(seconds):
    """Format a non-negative number of seconds as an SRT timecode (HH:MM:SS,mmm)."""
    # Work on whole milliseconds: formatting the float fraction directly
    # truncates it, so that e.g. 1.001 s used to come out as ",000".
    total_ms = int(round(seconds * 1000))
    total_secs, millis = divmod(total_ms, 1000)
    total_mins, secs = divmod(total_secs, 60)
    hours, mins = divmod(total_mins, 60)
    return '%02d:%02d:%02d,%03d' % (hours, mins, secs, millis)
+
+
def dfxp2srt(dfxp_data):
    """Convert a DFXP/TTML subtitle document (text) into SRT (text).

    Every <p> element (with or without the TTML namespace) becomes one
    numbered SRT cue.  The start time is read from the "begin" attribute
    and the end time from "end" or, if that is missing or zero, from
    "begin" plus "dur".  Raises ValueError if the document contains no
    paragraphs.
    """
    _x = functools.partial(xpath_with_ns, ns_map={'ttml': 'http://www.w3.org/ns/ttml'})
    str_or_empty = functools.partial(str_or_none, default='')

    def parse_node(node):
        """Return the text content of node; line breaks become newlines."""
        out = str_or_empty(node.text)

        for child in node:
            if child.tag in (_x('ttml:br'), 'br'):
                out += '\n' + str_or_empty(child.tail)
            elif child.tag in (_x('ttml:span'), 'span'):
                # ElementTree stores the text following </span> in the
                # span's tail; it belongs to this node and must be kept.
                out += str_or_empty(parse_node(child)) + str_or_empty(child.tail)
            else:
                # Unknown elements are passed through as markup.
                # NOTE(review): tostring() returns bytes on Python 3 - confirm
                # that str_or_none converts them as intended.
                out += str_or_empty(xml.etree.ElementTree.tostring(child))

        return out

    dfxp = xml.etree.ElementTree.fromstring(dfxp_data.encode('utf-8'))
    paras = dfxp.findall(_x('.//ttml:p')) or dfxp.findall('.//p')

    if not paras:
        raise ValueError('Invalid dfxp/TTML subtitle')

    out = []
    for index, para in enumerate(paras, 1):
        begin_time = parse_dfxp_time_expr(para.attrib['begin'])
        end_time = parse_dfxp_time_expr(para.attrib.get('end'))
        if not end_time:
            end_time = begin_time + parse_dfxp_time_expr(para.attrib['dur'])
        out.append('%d\n%s --> %s\n%s\n\n' % (
            index,
            srt_subtitles_timecode(begin_time),
            srt_subtitles_timecode(end_time),
            parse_node(para)))

    return ''.join(out)
+
+
class PerRequestProxyHandler(compat_urllib_request.ProxyHandler):
    """ProxyHandler that lets a single request choose its own proxy.

    A request selects a proxy through the 'Ytdl-request-proxy' header, which
    is removed before the request is passed on.  The value '__noproxy__'
    means that no proxy is used for the request.
    """

    def __init__(self, proxies=None):
        # Set default handlers
        for scheme in ('http', 'https'):
            # Defaults bind the current scheme and the bound method at
            # definition time, so each handler keeps its own scheme.
            setattr(self, '%s_open' % scheme,
                    lambda r, proxy='__noproxy__', type=scheme, meth=self.proxy_open:
                        meth(r, proxy, type))
        compat_urllib_request.ProxyHandler.__init__(self, proxies)

    def proxy_open(self, req, proxy, type):
        # A per-request proxy takes precedence over the configured one
        req_proxy = req.headers.get('Ytdl-request-proxy')
        if req_proxy is not None:
            proxy = req_proxy
            del req.headers['Ytdl-request-proxy']

        if proxy == '__noproxy__':
            return None  # No Proxy
        return compat_urllib_request.ProxyHandler.proxy_open(
            self, req, proxy, type)