X-Git-Url: https://git.rapsys.eu/youtubedl/blobdiff_plain/54477c858fa2504360964e675faf953d78ac57e6..deb336935f1c0705845cf0fec03f36bf9e6e9166:/youtube_dl/utils.py?ds=sidebyside diff --git a/youtube_dl/utils.py b/youtube_dl/utils.py index bfb8f6b..057cd20 100644 --- a/youtube_dl/utils.py +++ b/youtube_dl/utils.py @@ -1,18 +1,24 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import ctypes import datetime import email.utils import errno import gzip +import itertools import io import json import locale +import math import os import pipes import platform import re +import ssl import socket +import struct +import subprocess import sys import traceback import zlib @@ -220,7 +226,7 @@ if sys.version_info >= (2,7): def find_xpath_attr(node, xpath, key, val): """ Find the xpath xpath[@key=val] """ assert re.match(r'^[a-zA-Z]+$', key) - assert re.match(r'^[a-zA-Z0-9@\s]*$', val) + assert re.match(r'^[a-zA-Z0-9@\s:._]*$', val) expr = xpath + u"[@%s='%s']" % (key, val) return node.find(expr) else: @@ -496,12 +502,13 @@ def unescapeHTML(s): result = re.sub(u'(?u)&(.+?);', htmlentity_transform, s) return result -def encodeFilename(s): + +def encodeFilename(s, for_subprocess=False): """ @param s The name of the file """ - assert type(s) == type(u'') + assert type(s) == compat_str # Python 3 has a Unicode API if sys.version_info >= (3, 0): @@ -511,12 +518,18 @@ def encodeFilename(s): # Pass u'' directly to use Unicode APIs on Windows 2000 and up # (Detecting Windows NT 4 is tricky because 'major >= 4' would # match Windows 9x series as well. Besides, NT 4 is obsolete.) 
- return s + if not for_subprocess: + return s + else: + # For subprocess calls, encode with locale encoding + # Refer to http://stackoverflow.com/a/9951851/35070 + encoding = preferredencoding() else: encoding = sys.getfilesystemencoding() - if encoding is None: - encoding = 'utf-8' - return s.encode(encoding, 'ignore') + if encoding is None: + encoding = 'utf-8' + return s.encode(encoding, 'ignore') + def decodeOption(optval): if optval is None: @@ -535,19 +548,40 @@ def formatSeconds(secs): else: return '%d' % secs -def make_HTTPS_handler(opts): - if sys.version_info < (3,2): - # Python's 2.x handler is very simplistic - return compat_urllib_request.HTTPSHandler() + +def make_HTTPS_handler(opts_no_check_certificate, **kwargs): + if sys.version_info < (3, 2): + import httplib + + class HTTPSConnectionV3(httplib.HTTPSConnection): + def __init__(self, *args, **kwargs): + httplib.HTTPSConnection.__init__(self, *args, **kwargs) + + def connect(self): + sock = socket.create_connection((self.host, self.port), self.timeout) + if getattr(self, '_tunnel_host', False): + self.sock = sock + self._tunnel() + try: + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_SSLv3) + except ssl.SSLError: + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_SSLv23) + + class HTTPSHandlerV3(compat_urllib_request.HTTPSHandler): + def https_open(self, req): + return self.do_open(HTTPSConnectionV3, req) + return HTTPSHandlerV3(**kwargs) else: - import ssl - context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - context.set_default_verify_paths() - + context = ssl.SSLContext(ssl.PROTOCOL_SSLv3) context.verify_mode = (ssl.CERT_NONE - if opts.no_check_certificate + if opts_no_check_certificate else ssl.CERT_REQUIRED) - return compat_urllib_request.HTTPSHandler(context=context) + context.set_default_verify_paths() + try: + context.load_default_certs() + except AttributeError: + pass # Python < 3.4 + return 
compat_urllib_request.HTTPSHandler(context=context, **kwargs) class ExtractorError(Exception): """Error during info extraction.""" @@ -572,6 +606,11 @@ class ExtractorError(Exception): return u''.join(traceback.format_tb(self.traceback)) +class RegexNotFoundError(ExtractorError): + """Error when a regex didn't match""" + pass + + class DownloadError(Exception): """Download Error exception. @@ -713,29 +752,39 @@ class YoutubeDLHandler(compat_urllib_request.HTTPHandler): https_request = http_request https_response = http_response + def unified_strdate(date_str): """Return a string with the date in the format YYYYMMDD""" upload_date = None #Replace commas - date_str = date_str.replace(',',' ') + date_str = date_str.replace(',', ' ') # %z (UTC offset) is only supported in python>=3.2 - date_str = re.sub(r' (\+|-)[\d]*$', '', date_str) + date_str = re.sub(r' ?(\+|-)[0-9]{2}:?[0-9]{2}$', '', date_str) format_expressions = [ '%d %B %Y', + '%d %b %Y', '%B %d %Y', '%b %d %Y', '%Y-%m-%d', '%d/%m/%Y', '%Y/%m/%d %H:%M:%S', + '%Y-%m-%d %H:%M:%S', '%d.%m.%Y %H:%M', '%Y-%m-%dT%H:%M:%SZ', + '%Y-%m-%dT%H:%M:%S.%fZ', + '%Y-%m-%dT%H:%M:%S.%f0Z', '%Y-%m-%dT%H:%M:%S', + '%Y-%m-%dT%H:%M', ] for expression in format_expressions: try: upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d') - except: + except ValueError: pass + if upload_date is None: + timetuple = email.utils.parsedate_tz(date_str) + if timetuple: + upload_date = datetime.datetime(*timetuple[:6]).strftime('%Y%m%d') return upload_date def determine_ext(url, default_ext=u'unknown_video'): @@ -774,6 +823,15 @@ def date_from_str(date_str): return today + delta return datetime.datetime.strptime(date_str, "%Y%m%d").date() +def hyphenate_date(date_str): + """ + Convert a date in 'YYYYMMDD' format to 'YYYY-MM-DD' format""" + match = re.match(r'^(\d\d\d\d)(\d\d)(\d\d)$', date_str) + if match is not None: + return '-'.join(match.groups()) + else: + return date_str + class DateRange(object): """Represents a 
time interval between two dates""" def __init__(self, start=None, end=None): @@ -814,12 +872,22 @@ def platform_name(): def write_string(s, out=None): if out is None: out = sys.stderr - assert type(s) == type(u'') + assert type(s) == compat_str if ('b' in getattr(out, 'mode', '') or sys.version_info[0] < 3): # Python 2 lies about mode of sys.stderr s = s.encode(preferredencoding(), 'ignore') - out.write(s) + try: + out.write(s) + except UnicodeEncodeError: + # In Windows shells, this can fail even when the codec is just charmap!? + # See https://wiki.python.org/moin/PrintFails#Issue + if sys.platform == 'win32' and hasattr(out, 'encoding'): + s = s.encode(out.encoding, 'ignore').decode(out.encoding) + out.write(s) + else: + raise + out.flush() @@ -944,7 +1012,16 @@ class locked_file(object): def shell_quote(args): - return ' '.join(map(pipes.quote, args)) + quoted_args = [] + encoding = sys.getfilesystemencoding() + if encoding is None: + encoding = 'utf-8' + for a in args: + if isinstance(a, bytes): + # We may get a filename encoded with 'encodeFilename' + a = a.decode(encoding) + quoted_args.append(pipes.quote(a)) + return u' '.join(quoted_args) def takewhile_inclusive(pred, seq): @@ -964,10 +1041,201 @@ def smuggle_url(url, data): return url + u'#' + sdata -def unsmuggle_url(smug_url): +def unsmuggle_url(smug_url, default=None): if not '#__youtubedl_smuggle' in smug_url: - return smug_url, None + return smug_url, default url, _, sdata = smug_url.rpartition(u'#') jsond = compat_parse_qs(sdata)[u'__youtubedl_smuggle'][0] data = json.loads(jsond) return url, data + + +def format_bytes(bytes): + if bytes is None: + return u'N/A' + if type(bytes) is str: + bytes = float(bytes) + if bytes == 0.0: + exponent = 0 + else: + exponent = int(math.log(bytes, 1024.0)) + suffix = [u'B', u'KiB', u'MiB', u'GiB', u'TiB', u'PiB', u'EiB', u'ZiB', u'YiB'][exponent] + converted = float(bytes) / float(1024 ** exponent) + return u'%.2f%s' % (converted, suffix) + + +def 
str_to_int(int_str): + int_str = re.sub(r'[,\.]', u'', int_str) + return int(int_str) + + +def get_term_width(): + columns = os.environ.get('COLUMNS', None) + if columns: + return int(columns) + + try: + sp = subprocess.Popen( + ['stty', 'size'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = sp.communicate() + return int(out.split()[1]) + except: + pass + return None + + +def month_by_name(name): + """ Return the number of a month by (locale-independently) English name """ + + ENGLISH_NAMES = [ + u'January', u'February', u'March', u'April', u'May', u'June', + u'July', u'August', u'September', u'October', u'November', u'December'] + try: + return ENGLISH_NAMES.index(name) + 1 + except ValueError: + return None + + +def fix_xml_ampersands(xml_str): + """Replace all the '&' by '&amp;' in XML""" + return re.sub( + r'&(?!amp;|lt;|gt;|apos;|quot;|#x[0-9a-fA-F]{,4};|#[0-9]{,4};)', + u'&amp;', + xml_str) + + +def setproctitle(title): + assert isinstance(title, compat_str) + try: + libc = ctypes.cdll.LoadLibrary("libc.so.6") + except OSError: + return + title = title + buf = ctypes.create_string_buffer(len(title) + 1) + buf.value = title.encode('utf-8') + try: + libc.prctl(15, ctypes.byref(buf), 0, 0, 0) + except AttributeError: + return # Strange libc, just skip this + + +def remove_start(s, start): + if s.startswith(start): + return s[len(start):] + return s + + +def url_basename(url): + path = compat_urlparse.urlparse(url).path + return path.strip(u'/').split(u'/')[-1] + + +class HEADRequest(compat_urllib_request.Request): + def get_method(self): + return "HEAD" + + +def int_or_none(v, scale=1): + return v if v is None else (int(v) // scale) + + +def parse_duration(s): + if s is None: + return None + + m = re.match( + r'(?:(?:(?P<hours>[0-9]+)[:h])?(?P<mins>[0-9]+)[:m])?(?P<secs>[0-9]+)s?$', s) + if not m: + return None + res = int(m.group('secs')) + if m.group('mins'): + res += int(m.group('mins')) * 60 + if m.group('hours'): + res += int(m.group('hours')) * 60 * 60 + return res
def prepend_extension(filename, ext):
    """Insert ``ext`` in front of the real extension of ``filename``.

    The name is split with ``os.path.splitext`` and re-assembled as
    ``<name>.<ext><real_ext>``, e.g. ``video.mp4`` + ``temp`` gives
    ``video.temp.mp4``.
    """
    name, real_ext = os.path.splitext(filename)
    return u'{0}.{1}{2}'.format(name, ext, real_ext)


def check_executable(exe, args=None):
    """Check whether the binary ``exe`` can be started.

    ``args`` can be a list of arguments for a short output (like -version).
    Returns ``exe`` when the process could be spawned and ``False`` when
    spawning raised ``OSError`` (e.g. the binary is not in PATH).
    """
    # A None default avoids sharing one mutable list between calls.
    if args is None:
        args = []
    try:
        subprocess.Popen(
            [exe] + args,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    except OSError:
        return False
    return exe


class PagedList(object):
    """List-like view over items that are fetched one page at a time.

    ``pagefunc(pagenum)`` must return an iterable with the items of the
    zero-based page ``pagenum``; every page except the last one is expected
    to hold exactly ``pagesize`` items.
    """

    def __init__(self, pagefunc, pagesize):
        self._pagefunc = pagefunc
        self._pagesize = pagesize

    def __len__(self):
        # This is only useful for tests
        return len(self.getslice())

    def getslice(self, start=0, end=None):
        """Return the items with index in ``[start, end)`` as a list.

        ``end=None`` means "up to the last item".  Indices are expected to
        be non-negative.  Only the pages overlapping the requested range
        are fetched.
        """
        # An empty range needs no page at all.  Without this guard the
        # per-page end offset below evaluates to a full page (or is never
        # applied), so every remaining item would be returned.
        if end is not None and end <= start:
            return []

        res = []
        # Start directly at the page that contains ``start``.
        for pagenum in itertools.count(start // self._pagesize):
            firstid = pagenum * self._pagesize
            nextfirstid = firstid + self._pagesize

            page_results = list(self._pagefunc(pagenum))

            # Offset of the first wanted item inside this page.
            startv = (
                start % self._pagesize
                if firstid <= start < nextfirstid
                else 0)

            # Offset one past the last wanted item inside this page.
            endv = (
                ((end - 1) % self._pagesize) + 1
                if (end is not None and firstid <= end <= nextfirstid)
                else None)

            if startv != 0 or endv is not None:
                page_results = page_results[startv:endv]
            res.extend(page_results)

            # A little optimization - if current page is not "full", ie. does
            # not contain page_size videos then we can assume that this page
            # is the last one - there are no more ids on further pages -
            # i.e. no need to query again.
            if len(page_results) + startv < self._pagesize:
                break

            # If we got the whole page, but the next page is not interesting,
            # break out early as well
            if end == nextfirstid:
                break
        return res


def uppercase_escape(s):
    """Decode literal backslash-U escapes (8 hex digits) in ``s`` into characters."""
    return re.sub(
        r'\\U([0-9a-fA-F]{8})',
        lambda m: compat_chr(int(m.group(1), base=16)), s)


# Probe once at import time whether struct accepts a unicode format string.
try:
    struct.pack(u'!I', 0)
except TypeError:
    # In Python 2.6 (and some 2.7 versions), struct requires a bytes argument
    def struct_pack(spec, *args):
        """``struct.pack`` wrapper that encodes a unicode ``spec`` to ASCII bytes."""
        if isinstance(spec, compat_str):
            spec = spec.encode('ascii')
        return struct.pack(spec, *args)

    def struct_unpack(spec, *args):
        """``struct.unpack`` wrapper that encodes a unicode ``spec`` to ASCII bytes."""
        if isinstance(spec, compat_str):
            spec = spec.encode('ascii')
        return struct.unpack(spec, *args)
else:
    struct_pack = struct.pack
    struct_unpack = struct.unpack