+
+
+def int_or_none(v, scale=1, default=None, get_attr=None, invscale=1):
+ if get_attr:
+ if v is not None:
+ v = getattr(v, get_attr, None)
+ if v == '':
+ v = None
+ return default if v is None else (int(v) * invscale // scale)
+
+
+def str_or_none(v, default=None):
+ return default if v is None else compat_str(v)
+
+
+def str_to_int(int_str):
+ """ A more relaxed version of int_or_none """
+ if int_str is None:
+ return None
+ int_str = re.sub(r'[,\.\+]', '', int_str)
+ return int(int_str)
+
+
+def float_or_none(v, scale=1, invscale=1, default=None):
+ return default if v is None else (float(v) * invscale / scale)
+
+
+def parse_duration(s):
+ if not isinstance(s, compat_basestring):
+ return None
+
+ s = s.strip()
+
+ m = re.match(
+ r'''(?ix)(?:P?T)?
+ (?:
+ (?P<only_mins>[0-9.]+)\s*(?:mins?|minutes?)\s*|
+ (?P<only_hours>[0-9.]+)\s*(?:hours?)|
+
+ \s*(?P<hours_reversed>[0-9]+)\s*(?:[:h]|hours?)\s*(?P<mins_reversed>[0-9]+)\s*(?:[:m]|mins?|minutes?)\s*|
+ (?:
+ (?:
+ (?:(?P<days>[0-9]+)\s*(?:[:d]|days?)\s*)?
+ (?P<hours>[0-9]+)\s*(?:[:h]|hours?)\s*
+ )?
+ (?P<mins>[0-9]+)\s*(?:[:m]|mins?|minutes?)\s*
+ )?
+ (?P<secs>[0-9]+)(?P<ms>\.[0-9]+)?\s*(?:s|secs?|seconds?)?
+ )$''', s)
+ if not m:
+ return None
+ res = 0
+ if m.group('only_mins'):
+ return float_or_none(m.group('only_mins'), invscale=60)
+ if m.group('only_hours'):
+ return float_or_none(m.group('only_hours'), invscale=60 * 60)
+ if m.group('secs'):
+ res += int(m.group('secs'))
+ if m.group('mins_reversed'):
+ res += int(m.group('mins_reversed')) * 60
+ if m.group('mins'):
+ res += int(m.group('mins')) * 60
+ if m.group('hours'):
+ res += int(m.group('hours')) * 60 * 60
+ if m.group('hours_reversed'):
+ res += int(m.group('hours_reversed')) * 60 * 60
+ if m.group('days'):
+ res += int(m.group('days')) * 24 * 60 * 60
+ if m.group('ms'):
+ res += float(m.group('ms'))
+ return res
+
+
+def prepend_extension(filename, ext, expected_real_ext=None):
+ name, real_ext = os.path.splitext(filename)
+ return (
+ '{0}.{1}{2}'.format(name, ext, real_ext)
+ if not expected_real_ext or real_ext[1:] == expected_real_ext
+ else '{0}.{1}'.format(filename, ext))
+
+
+def replace_extension(filename, ext, expected_real_ext=None):
+ name, real_ext = os.path.splitext(filename)
+ return '{0}.{1}'.format(
+ name if not expected_real_ext or real_ext[1:] == expected_real_ext else filename,
+ ext)
+
+
+def check_executable(exe, args=[]):
+ """ Checks if the given binary is installed somewhere in PATH, and returns its name.
+ args can be a list of arguments for a short output (like -version) """
+ try:
+ subprocess.Popen([exe] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
+ except OSError:
+ return False
+ return exe
+
+
+def get_exe_version(exe, args=['--version'],
+ version_re=None, unrecognized='present'):
+ """ Returns the version of the specified executable,
+ or False if the executable is not present """
+ try:
+ out, _ = subprocess.Popen(
+ [encodeArgument(exe)] + args,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()
+ except OSError:
+ return False
+ if isinstance(out, bytes): # Python 2.x
+ out = out.decode('ascii', 'ignore')
+ return detect_exe_version(out, version_re, unrecognized)
+
+
+def detect_exe_version(output, version_re=None, unrecognized='present'):
+ assert isinstance(output, compat_str)
+ if version_re is None:
+ version_re = r'version\s+([-0-9._a-zA-Z]+)'
+ m = re.search(version_re, output)
+ if m:
+ return m.group(1)
+ else:
+ return unrecognized
+
+
+class PagedList(object):
+ def __len__(self):
+ # This is only useful for tests
+ return len(self.getslice())
+
+
+class OnDemandPagedList(PagedList):
+ def __init__(self, pagefunc, pagesize):
+ self._pagefunc = pagefunc
+ self._pagesize = pagesize
+
+ def getslice(self, start=0, end=None):
+ res = []
+ for pagenum in itertools.count(start // self._pagesize):
+ firstid = pagenum * self._pagesize
+ nextfirstid = pagenum * self._pagesize + self._pagesize
+ if start >= nextfirstid:
+ continue
+
+ page_results = list(self._pagefunc(pagenum))
+
+ startv = (
+ start % self._pagesize
+ if firstid <= start < nextfirstid
+ else 0)
+
+ endv = (
+ ((end - 1) % self._pagesize) + 1
+ if (end is not None and firstid <= end <= nextfirstid)
+ else None)
+
+ if startv != 0 or endv is not None:
+ page_results = page_results[startv:endv]
+ res.extend(page_results)
+
+ # A little optimization - if current page is not "full", ie. does
+ # not contain page_size videos then we can assume that this page
+ # is the last one - there are no more ids on further pages -
+ # i.e. no need to query again.
+ if len(page_results) + startv < self._pagesize:
+ break
+
+ # If we got the whole page, but the next page is not interesting,
+ # break out early as well
+ if end == nextfirstid:
+ break
+ return res
+
+
+class InAdvancePagedList(PagedList):
+ def __init__(self, pagefunc, pagecount, pagesize):
+ self._pagefunc = pagefunc
+ self._pagecount = pagecount
+ self._pagesize = pagesize
+
+ def getslice(self, start=0, end=None):
+ res = []
+ start_page = start // self._pagesize
+ end_page = (
+ self._pagecount if end is None else (end // self._pagesize + 1))
+ skip_elems = start - start_page * self._pagesize
+ only_more = None if end is None else end - start
+ for pagenum in range(start_page, end_page):
+ page = list(self._pagefunc(pagenum))
+ if skip_elems:
+ page = page[skip_elems:]
+ skip_elems = None
+ if only_more is not None:
+ if len(page) < only_more:
+ only_more -= len(page)
+ else:
+ page = page[:only_more]
+ res.extend(page)
+ break
+ res.extend(page)
+ return res
+
+
+def uppercase_escape(s):
+ unicode_escape = codecs.getdecoder('unicode_escape')
+ return re.sub(
+ r'\\U[0-9a-fA-F]{8}',
+ lambda m: unicode_escape(m.group(0))[0],
+ s)
+
+
+def lowercase_escape(s):
+ unicode_escape = codecs.getdecoder('unicode_escape')
+ return re.sub(
+ r'\\u[0-9a-fA-F]{4}',
+ lambda m: unicode_escape(m.group(0))[0],
+ s)
+
+
+def escape_rfc3986(s):
+ """Escape non-ASCII characters as suggested by RFC 3986"""
+ if sys.version_info < (3, 0) and isinstance(s, compat_str):
+ s = s.encode('utf-8')
+ return compat_urllib_parse.quote(s, b"%/;:@&=+$,!~*'()?#[]")
+
+
+def escape_url(url):
+ """Escape URL as suggested by RFC 3986"""
+ url_parsed = compat_urllib_parse_urlparse(url)
+ return url_parsed._replace(
+ path=escape_rfc3986(url_parsed.path),
+ params=escape_rfc3986(url_parsed.params),
+ query=escape_rfc3986(url_parsed.query),
+ fragment=escape_rfc3986(url_parsed.fragment)
+ ).geturl()
+
+try:
+ struct.pack('!I', 0)
+except TypeError:
+ # In Python 2.6 (and some 2.7 versions), struct requires a bytes argument
+ def struct_pack(spec, *args):
+ if isinstance(spec, compat_str):
+ spec = spec.encode('ascii')
+ return struct.pack(spec, *args)
+
+ def struct_unpack(spec, *args):
+ if isinstance(spec, compat_str):
+ spec = spec.encode('ascii')
+ return struct.unpack(spec, *args)
+else:
+ struct_pack = struct.pack
+ struct_unpack = struct.unpack
+
+
+def read_batch_urls(batch_fd):
+ def fixup(url):
+ if not isinstance(url, compat_str):
+ url = url.decode('utf-8', 'replace')
+ BOM_UTF8 = '\xef\xbb\xbf'
+ if url.startswith(BOM_UTF8):
+ url = url[len(BOM_UTF8):]
+ url = url.strip()
+ if url.startswith(('#', ';', ']')):
+ return False
+ return url
+
+ with contextlib.closing(batch_fd) as fd:
+ return [url for url in map(fixup, fd) if url]
+
+
+def urlencode_postdata(*args, **kargs):
+ return compat_urllib_parse.urlencode(*args, **kargs).encode('ascii')
+
+
+try:
+ etree_iter = xml.etree.ElementTree.Element.iter
+except AttributeError: # Python <=2.6
+ etree_iter = lambda n: n.findall('.//*')
+
+
+def parse_xml(s):
+ class TreeBuilder(xml.etree.ElementTree.TreeBuilder):
+ def doctype(self, name, pubid, system):
+ pass # Ignore doctypes
+
+ parser = xml.etree.ElementTree.XMLParser(target=TreeBuilder())
+ kwargs = {'parser': parser} if sys.version_info >= (2, 7) else {}
+ tree = xml.etree.ElementTree.XML(s.encode('utf-8'), **kwargs)
+ # Fix up XML parser in Python 2.x
+ if sys.version_info < (3, 0):
+ for n in etree_iter(tree):
+ if n.text is not None:
+ if not isinstance(n.text, compat_str):
+ n.text = n.text.decode('utf-8')
+ return tree
+
+
+US_RATINGS = {
+ 'G': 0,
+ 'PG': 10,
+ 'PG-13': 13,
+ 'R': 16,
+ 'NC': 18,
+}
+
+
+def parse_age_limit(s):
+ if s is None:
+ return None
+ m = re.match(r'^(?P<age>\d{1,2})\+?$', s)
+ return int(m.group('age')) if m else US_RATINGS.get(s, None)
+
+
+def strip_jsonp(code):
+ return re.sub(
+ r'(?s)^[a-zA-Z0-9_]+\s*\(\s*(.*)\);?\s*?(?://[^\n]*)*$', r'\1', code)
+
+
+def js_to_json(code):
+ def fix_kv(m):
+ v = m.group(0)
+ if v in ('true', 'false', 'null'):
+ return v
+ if v.startswith('"'):
+ return v
+ if v.startswith("'"):
+ v = v[1:-1]
+ v = re.sub(r"\\\\|\\'|\"", lambda m: {
+ '\\\\': '\\\\',
+ "\\'": "'",
+ '"': '\\"',
+ }[m.group(0)], v)
+ return '"%s"' % v
+
+ res = re.sub(r'''(?x)
+ "(?:[^"\\]*(?:\\\\|\\['"nu]))*[^"\\]*"|
+ '(?:[^'\\]*(?:\\\\|\\['"nu]))*[^'\\]*'|
+ [a-zA-Z_][.a-zA-Z_0-9]*
+ ''', fix_kv, code)
+ res = re.sub(r',(\s*[\]}])', lambda m: m.group(1), res)
+ return res
+
+
+def qualities(quality_ids):
+ """ Get a numeric quality value out of a list of possible values """
+ def q(qid):
+ try:
+ return quality_ids.index(qid)
+ except ValueError:
+ return -1
+ return q
+
+
+DEFAULT_OUTTMPL = '%(title)s-%(id)s.%(ext)s'
+
+
+def limit_length(s, length):
+ """ Add ellipses to overly long strings """
+ if s is None:
+ return None
+ ELLIPSES = '...'
+ if len(s) > length:
+ return s[:length - len(ELLIPSES)] + ELLIPSES
+ return s
+
+
+def version_tuple(v):
+ return tuple(int(e) for e in re.split(r'[-.]', v))
+
+
+def is_outdated_version(version, limit, assume_new=True):
+ if not version:
+ return not assume_new
+ try:
+ return version_tuple(version) < version_tuple(limit)
+ except ValueError:
+ return not assume_new
+
+
+def ytdl_is_updateable():
+ """ Returns if youtube-dl can be updated with -U """
+ from zipimport import zipimporter
+
+ return isinstance(globals().get('__loader__'), zipimporter) or hasattr(sys, 'frozen')
+
+
+def args_to_str(args):
+ # Get a short string representation for a subprocess command
+ return ' '.join(shlex_quote(a) for a in args)
+
+
+def mimetype2ext(mt):
+ _, _, res = mt.rpartition('/')
+
+ return {
+ 'x-ms-wmv': 'wmv',
+ 'x-mp4-fragmented': 'mp4',
+ 'ttml+xml': 'ttml',
+ }.get(res, res)
+
+
+def urlhandle_detect_ext(url_handle):
+ try:
+ url_handle.headers
+ getheader = lambda h: url_handle.headers[h]
+ except AttributeError: # Python < 3
+ getheader = url_handle.info().getheader
+
+ cd = getheader('Content-Disposition')
+ if cd:
+ m = re.match(r'attachment;\s*filename="(?P<filename>[^"]+)"', cd)
+ if m:
+ e = determine_ext(m.group('filename'), default_ext=None)
+ if e:
+ return e
+
+ return mimetype2ext(getheader('Content-Type'))
+
+
+def age_restricted(content_limit, age_limit):
+ """ Returns True iff the content should be blocked """
+
+ if age_limit is None: # No limit set
+ return False
+ if content_limit is None:
+ return False # Content available for everyone
+ return age_limit < content_limit
+
+
+def is_html(first_bytes):
+ """ Detect whether a file contains HTML by examining its first bytes. """
+
+ BOMS = [
+ (b'\xef\xbb\xbf', 'utf-8'),
+ (b'\x00\x00\xfe\xff', 'utf-32-be'),
+ (b'\xff\xfe\x00\x00', 'utf-32-le'),
+ (b'\xff\xfe', 'utf-16-le'),
+ (b'\xfe\xff', 'utf-16-be'),
+ ]
+ for bom, enc in BOMS:
+ if first_bytes.startswith(bom):
+ s = first_bytes[len(bom):].decode(enc, 'replace')
+ break
+ else:
+ s = first_bytes.decode('utf-8', 'replace')
+
+ return re.match(r'^\s*<', s)
+
+
+def determine_protocol(info_dict):
+ protocol = info_dict.get('protocol')
+ if protocol is not None:
+ return protocol
+
+ url = info_dict['url']
+ if url.startswith('rtmp'):
+ return 'rtmp'
+ elif url.startswith('mms'):
+ return 'mms'
+ elif url.startswith('rtsp'):
+ return 'rtsp'
+
+ ext = determine_ext(url)
+ if ext == 'm3u8':
+ return 'm3u8'
+ elif ext == 'f4m':
+ return 'f4m'
+
+ return compat_urllib_parse_urlparse(url).scheme
+
+
+def render_table(header_row, data):
+ """ Render a list of rows, each as a list of values """
+ table = [header_row] + data
+ max_lens = [max(len(compat_str(v)) for v in col) for col in zip(*table)]
+ format_str = ' '.join('%-' + compat_str(ml + 1) + 's' for ml in max_lens[:-1]) + '%s'
+ return '\n'.join(format_str % tuple(row) for row in table)
+
+
+def _match_one(filter_part, dct):
+ COMPARISON_OPERATORS = {
+ '<': operator.lt,
+ '<=': operator.le,
+ '>': operator.gt,
+ '>=': operator.ge,
+ '=': operator.eq,
+ '!=': operator.ne,
+ }
+ operator_rex = re.compile(r'''(?x)\s*
+ (?P<key>[a-z_]+)
+ \s*(?P<op>%s)(?P<none_inclusive>\s*\?)?\s*
+ (?:
+ (?P<intval>[0-9.]+(?:[kKmMgGtTpPeEzZyY]i?[Bb]?)?)|
+ (?P<strval>(?![0-9.])[a-z0-9A-Z]*)
+ )
+ \s*$
+ ''' % '|'.join(map(re.escape, COMPARISON_OPERATORS.keys())))
+ m = operator_rex.search(filter_part)
+ if m:
+ op = COMPARISON_OPERATORS[m.group('op')]
+ if m.group('strval') is not None:
+ if m.group('op') not in ('=', '!='):
+ raise ValueError(
+ 'Operator %s does not support string values!' % m.group('op'))
+ comparison_value = m.group('strval')
+ else:
+ try:
+ comparison_value = int(m.group('intval'))
+ except ValueError:
+ comparison_value = parse_filesize(m.group('intval'))
+ if comparison_value is None:
+ comparison_value = parse_filesize(m.group('intval') + 'B')
+ if comparison_value is None:
+ raise ValueError(
+ 'Invalid integer value %r in filter part %r' % (
+ m.group('intval'), filter_part))
+ actual_value = dct.get(m.group('key'))
+ if actual_value is None:
+ return m.group('none_inclusive')
+ return op(actual_value, comparison_value)
+
+ UNARY_OPERATORS = {
+ '': lambda v: v is not None,
+ '!': lambda v: v is None,
+ }
+ operator_rex = re.compile(r'''(?x)\s*
+ (?P<op>%s)\s*(?P<key>[a-z_]+)
+ \s*$
+ ''' % '|'.join(map(re.escape, UNARY_OPERATORS.keys())))
+ m = operator_rex.search(filter_part)
+ if m:
+ op = UNARY_OPERATORS[m.group('op')]
+ actual_value = dct.get(m.group('key'))
+ return op(actual_value)
+
+ raise ValueError('Invalid filter part %r' % filter_part)
+
+
+def match_str(filter_str, dct):
+ """ Filter a dictionary with a simple string syntax. Returns True (=passes filter) or false """
+
+ return all(
+ _match_one(filter_part, dct) for filter_part in filter_str.split('&'))
+
+
+def match_filter_func(filter_str):
+ def _match_func(info_dict):
+ if match_str(filter_str, info_dict):
+ return None
+ else:
+ video_title = info_dict.get('title', info_dict.get('id', 'video'))
+ return '%s does not pass filter %s, skipping ..' % (video_title, filter_str)
+ return _match_func
+
+
+def parse_dfxp_time_expr(time_expr):
+ if not time_expr:
+ return 0.0
+
+ mobj = re.match(r'^(?P<time_offset>\d+(?:\.\d+)?)s?$', time_expr)
+ if mobj:
+ return float(mobj.group('time_offset'))
+
+ mobj = re.match(r'^(\d+):(\d\d):(\d\d(?:\.\d+)?)$', time_expr)
+ if mobj:
+ return 3600 * int(mobj.group(1)) + 60 * int(mobj.group(2)) + float(mobj.group(3))
+
+
+def srt_subtitles_timecode(seconds):
+ return '%02d:%02d:%02d,%03d' % (seconds / 3600, (seconds % 3600) / 60, seconds % 60, (seconds % 1) * 1000)
+
+
+def dfxp2srt(dfxp_data):
+ _x = functools.partial(xpath_with_ns, ns_map={'ttml': 'http://www.w3.org/ns/ttml'})
+
+ def parse_node(node):
+ str_or_empty = functools.partial(str_or_none, default='')
+
+ out = str_or_empty(node.text)
+
+ for child in node:
+ if child.tag in (_x('ttml:br'), 'br'):
+ out += '\n' + str_or_empty(child.tail)
+ elif child.tag in (_x('ttml:span'), 'span'):
+ out += str_or_empty(parse_node(child))
+ else:
+ out += str_or_empty(xml.etree.ElementTree.tostring(child))
+
+ return out
+
+ dfxp = xml.etree.ElementTree.fromstring(dfxp_data.encode('utf-8'))
+ out = []
+ paras = dfxp.findall(_x('.//ttml:p')) or dfxp.findall('.//p')
+
+ if not paras:
+ raise ValueError('Invalid dfxp/TTML subtitle')
+
+ for para, index in zip(paras, itertools.count(1)):
+ begin_time = parse_dfxp_time_expr(para.attrib['begin'])
+ end_time = parse_dfxp_time_expr(para.attrib.get('end'))
+ if not end_time:
+ end_time = begin_time + parse_dfxp_time_expr(para.attrib['dur'])
+ out.append('%d\n%s --> %s\n%s\n\n' % (
+ index,
+ srt_subtitles_timecode(begin_time),
+ srt_subtitles_timecode(end_time),
+ parse_node(para)))
+
+ return ''.join(out)
+
+
+class PerRequestProxyHandler(compat_urllib_request.ProxyHandler):
+ def __init__(self, proxies=None):
+ # Set default handlers
+ for type in ('http', 'https'):
+ setattr(self, '%s_open' % type,
+ lambda r, proxy='__noproxy__', type=type, meth=self.proxy_open:
+ meth(r, proxy, type))
+ return compat_urllib_request.ProxyHandler.__init__(self, proxies)
+
+ def proxy_open(self, req, proxy, type):
+ req_proxy = req.headers.get('Ytdl-request-proxy')
+ if req_proxy is not None:
+ proxy = req_proxy
+ del req.headers['Ytdl-request-proxy']
+
+ if proxy == '__noproxy__':
+ return None # No Proxy
+ return compat_urllib_request.ProxyHandler.proxy_open(
+ self, req, proxy, type)