#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import datetime
+import email.utils
import errno
import gzip
import io
import json
import locale
+import math
import os
+import pipes
+import platform
import re
+import ssl
+import socket
import sys
import traceback
+import xml.etree.ElementTree
import zlib
-import email.utils
-import socket
-import datetime
try:
import urllib.request as compat_urllib_request
except ImportError: # Python 2
import httplib as compat_http_client
+try:
+ from urllib.error import HTTPError as compat_HTTPError
+except ImportError: # Python 2
+ from urllib2 import HTTPError as compat_HTTPError
+
+try:
+ from urllib.request import urlretrieve as compat_urlretrieve
+except ImportError: # Python 2
+ from urllib import urlretrieve as compat_urlretrieve
+
+
try:
from subprocess import DEVNULL
compat_subprocess_get_DEVNULL = lambda: DEVNULL
compiled_regex_type = type(re.compile(''))
std_headers = {
- 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0',
+ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0 (Chrome)',
'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate',
def find_xpath_attr(node, xpath, key, val):
""" Find the xpath xpath[@key=val] """
assert re.match(r'^[a-zA-Z]+$', key)
- assert re.match(r'^[a-zA-Z@\s]*$', val)
+ assert re.match(r'^[a-zA-Z0-9@\s]*$', val)
expr = xpath + u"[@%s='%s']" % (key, val)
return node.find(expr)
else:
return f
return None
+# On python2.6 the xml.etree.ElementTree.Element methods don't support
+# the namespace parameter
+def xpath_with_ns(path, ns_map):
+ components = [c.split(':') for c in path.split('/')]
+ replaced = []
+ for c in components:
+ if len(c) == 1:
+ replaced.append(c[0])
+ else:
+ ns, tag = c
+ replaced.append('{%s}%s' % (ns_map[ns], tag))
+ return '/'.join(replaced)
+
def htmlentity_transform(matchobj):
"""Transforms an HTML entity to a character.
return (u'&%s;' % entity)
compat_html_parser.locatestarttagend = re.compile(r"""<[a-zA-Z][-.a-zA-Z0-9:_]*(?:\s+(?:(?<=['"\s])[^\s/>][^\s/=>]*(?:\s*=+\s*(?:'[^']*'|"[^"]*"|(?!['"])[^>\s]*))?\s*)*)?\s*""", re.VERBOSE) # backport bugfix
-class AttrParser(compat_html_parser.HTMLParser):
+class BaseHTMLParser(compat_html_parser.HTMLParser):
+ def __init(self):
+ compat_html_parser.HTMLParser.__init__(self)
+ self.html = None
+
+ def loads(self, html):
+ self.html = html
+ self.feed(html)
+ self.close()
+
+class AttrParser(BaseHTMLParser):
"""Modified HTMLParser that isolates a tag with the specified attribute"""
def __init__(self, attribute, value):
self.attribute = attribute
self.result = None
self.started = False
self.depth = {}
- self.html = None
self.watch_startpos = False
self.error_count = 0
- compat_html_parser.HTMLParser.__init__(self)
+ BaseHTMLParser.__init__(self)
def error(self, message):
if self.error_count > 10 or self.started:
self.error_count += 1
self.goahead(1)
- def loads(self, html):
- self.html = html
- self.feed(html)
- self.close()
-
def handle_starttag(self, tag, attrs):
attrs = dict(attrs)
if self.started:
pass
return parser.get_result()
+class MetaParser(BaseHTMLParser):
+ """
+ Modified HTMLParser that isolates a meta tag with the specified name
+ attribute.
+ """
+ def __init__(self, name):
+ BaseHTMLParser.__init__(self)
+ self.name = name
+ self.content = None
+ self.result = None
+
+ def handle_starttag(self, tag, attrs):
+ if tag != 'meta':
+ return
+ attrs = dict(attrs)
+ if attrs.get('name') == self.name:
+ self.result = attrs.get('content')
+
+ def get_result(self):
+ return self.result
+
+def get_meta_content(name, html):
+ """
+ Return the content attribute from the meta tag with the given name attribute.
+ """
+ parser = MetaParser(name)
+ try:
+ parser.loads(html)
+ except compat_html_parser.HTMLParseError:
+ pass
+ return parser.get_result()
+
def clean_html(html):
"""Clean an HTML snippet into a readable string"""
else:
return '%d' % secs
-def make_HTTPS_handler(opts):
- if sys.version_info < (3,2):
- # Python's 2.x handler is very simplistic
- return compat_urllib_request.HTTPSHandler()
+def make_HTTPS_handler(opts_no_check_certificate):
+ if sys.version_info < (3, 2):
+ import httplib
+
+ class HTTPSConnectionV3(httplib.HTTPSConnection):
+ def __init__(self, *args, **kwargs):
+ httplib.HTTPSConnection.__init__(self, *args, **kwargs)
+
+ def connect(self):
+ sock = socket.create_connection((self.host, self.port), self.timeout)
+ if self._tunnel_host:
+ self.sock = sock
+ self._tunnel()
+ try:
+ self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_SSLv3)
+ except ssl.SSLError:
+ self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_SSLv23)
+
+ class HTTPSHandlerV3(compat_urllib_request.HTTPSHandler):
+ def https_open(self, req):
+ return self.do_open(HTTPSConnectionV3, req)
+ return HTTPSHandlerV3()
else:
- import ssl
- context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv3)
context.set_default_verify_paths()
context.verify_mode = (ssl.CERT_NONE
- if opts.no_check_certificate
+ if opts_no_check_certificate
else ssl.CERT_REQUIRED)
return compat_urllib_request.HTTPSHandler(context=context)
class ExtractorError(Exception):
"""Error during info extraction."""
- def __init__(self, msg, tb=None, expected=False):
+ def __init__(self, msg, tb=None, expected=False, cause=None):
""" tb, if given, is the original traceback (so that it can be printed out).
If expected is set, this is a normal error message and most likely not a bug in youtube-dl.
"""
self.traceback = tb
self.exc_info = sys.exc_info() # preserve original exception
+ self.cause = cause
def format_traceback(self):
if self.traceback is None:
return u''.join(traceback.format_tb(self.traceback))
+class RegexNotFoundError(ExtractorError):
+ """Error when a regex didn't match"""
+ pass
+
+
class DownloadError(Exception):
"""Download Error exception.
old_resp = resp
# gzip
if resp.headers.get('Content-encoding', '') == 'gzip':
- gz = gzip.GzipFile(fileobj=io.BytesIO(resp.read()), mode='r')
- resp = self.addinfourl_wrapper(gz, old_resp.headers, old_resp.url, old_resp.code)
+ content = resp.read()
+ gz = gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb')
+ try:
+ uncompressed = io.BytesIO(gz.read())
+ except IOError as original_ioerror:
+ # There may be junk add the end of the file
+ # See http://stackoverflow.com/q/4928560/35070 for details
+ for i in range(1, 1024):
+ try:
+ gz = gzip.GzipFile(fileobj=io.BytesIO(content[:-i]), mode='rb')
+ uncompressed = io.BytesIO(gz.read())
+ except IOError:
+ continue
+ break
+ else:
+ raise original_ioerror
+ resp = self.addinfourl_wrapper(uncompressed, old_resp.headers, old_resp.url, old_resp.code)
resp.msg = old_resp.msg
# deflate
if resp.headers.get('Content-encoding', '') == 'deflate':
date_str = date_str.replace(',',' ')
# %z (UTC offset) is only supported in python>=3.2
date_str = re.sub(r' (\+|-)[\d]*$', '', date_str)
- format_expressions = ['%d %B %Y', '%B %d %Y', '%b %d %Y', '%Y-%m-%d', '%d/%m/%Y', '%Y/%m/%d %H:%M:%S', '%d.%m.%Y %H:%M']
+ format_expressions = [
+ '%d %B %Y',
+ '%B %d %Y',
+ '%b %d %Y',
+ '%Y-%m-%d',
+ '%d/%m/%Y',
+ '%Y/%m/%d %H:%M:%S',
+ '%d.%m.%Y %H:%M',
+ '%Y-%m-%dT%H:%M:%SZ',
+ '%Y-%m-%dT%H:%M:%S.%fZ',
+ '%Y-%m-%dT%H:%M:%S.%f0Z',
+ '%Y-%m-%dT%H:%M:%S',
+ ]
for expression in format_expressions:
try:
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
else:
return default_ext
+def subtitles_filename(filename, sub_lang, sub_format):
+ return filename.rsplit('.', 1)[0] + u'.' + sub_lang + u'.' + sub_format
+
def date_from_str(date_str):
"""
Return a datetime object from a string in the format YYYYMMDD or
return self.start <= date <= self.end
def __str__(self):
return '%s - %s' % ( self.start.isoformat(), self.end.isoformat())
+
+
+def platform_name():
+ """ Returns the platform name as a compat_str """
+ res = platform.platform()
+ if isinstance(res, bytes):
+ res = res.decode(preferredencoding())
+
+ assert isinstance(res, compat_str)
+ return res
+
+
+def write_string(s, out=None):
+ if out is None:
+ out = sys.stderr
+ assert type(s) == type(u'')
+
+ if ('b' in getattr(out, 'mode', '') or
+ sys.version_info[0] < 3): # Python 2 lies about mode of sys.stderr
+ s = s.encode(preferredencoding(), 'ignore')
+ out.write(s)
+ out.flush()
+
+
+def bytes_to_intlist(bs):
+ if not bs:
+ return []
+ if isinstance(bs[0], int): # Python 3
+ return list(bs)
+ else:
+ return [ord(c) for c in bs]
+
+
+def intlist_to_bytes(xs):
+ if not xs:
+ return b''
+ if isinstance(chr(0), bytes): # Python 2
+ return ''.join([chr(x) for x in xs])
+ else:
+ return bytes(xs)
+
+
+def get_cachedir(params={}):
+ cache_root = os.environ.get('XDG_CACHE_HOME',
+ os.path.expanduser('~/.cache'))
+ return params.get('cachedir', os.path.join(cache_root, 'youtube-dl'))
+
+
+# Cross-platform file locking
+if sys.platform == 'win32':
+ import ctypes.wintypes
+ import msvcrt
+
+ class OVERLAPPED(ctypes.Structure):
+ _fields_ = [
+ ('Internal', ctypes.wintypes.LPVOID),
+ ('InternalHigh', ctypes.wintypes.LPVOID),
+ ('Offset', ctypes.wintypes.DWORD),
+ ('OffsetHigh', ctypes.wintypes.DWORD),
+ ('hEvent', ctypes.wintypes.HANDLE),
+ ]
+
+ kernel32 = ctypes.windll.kernel32
+ LockFileEx = kernel32.LockFileEx
+ LockFileEx.argtypes = [
+ ctypes.wintypes.HANDLE, # hFile
+ ctypes.wintypes.DWORD, # dwFlags
+ ctypes.wintypes.DWORD, # dwReserved
+ ctypes.wintypes.DWORD, # nNumberOfBytesToLockLow
+ ctypes.wintypes.DWORD, # nNumberOfBytesToLockHigh
+ ctypes.POINTER(OVERLAPPED) # Overlapped
+ ]
+ LockFileEx.restype = ctypes.wintypes.BOOL
+ UnlockFileEx = kernel32.UnlockFileEx
+ UnlockFileEx.argtypes = [
+ ctypes.wintypes.HANDLE, # hFile
+ ctypes.wintypes.DWORD, # dwReserved
+ ctypes.wintypes.DWORD, # nNumberOfBytesToLockLow
+ ctypes.wintypes.DWORD, # nNumberOfBytesToLockHigh
+ ctypes.POINTER(OVERLAPPED) # Overlapped
+ ]
+ UnlockFileEx.restype = ctypes.wintypes.BOOL
+ whole_low = 0xffffffff
+ whole_high = 0x7fffffff
+
+ def _lock_file(f, exclusive):
+ overlapped = OVERLAPPED()
+ overlapped.Offset = 0
+ overlapped.OffsetHigh = 0
+ overlapped.hEvent = 0
+ f._lock_file_overlapped_p = ctypes.pointer(overlapped)
+ handle = msvcrt.get_osfhandle(f.fileno())
+ if not LockFileEx(handle, 0x2 if exclusive else 0x0, 0,
+ whole_low, whole_high, f._lock_file_overlapped_p):
+ raise OSError('Locking file failed: %r' % ctypes.FormatError())
+
+ def _unlock_file(f):
+ assert f._lock_file_overlapped_p
+ handle = msvcrt.get_osfhandle(f.fileno())
+ if not UnlockFileEx(handle, 0,
+ whole_low, whole_high, f._lock_file_overlapped_p):
+ raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
+
+else:
+ import fcntl
+
+ def _lock_file(f, exclusive):
+ fcntl.lockf(f, fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
+
+ def _unlock_file(f):
+ fcntl.lockf(f, fcntl.LOCK_UN)
+
+
+class locked_file(object):
+ def __init__(self, filename, mode, encoding=None):
+ assert mode in ['r', 'a', 'w']
+ self.f = io.open(filename, mode, encoding=encoding)
+ self.mode = mode
+
+ def __enter__(self):
+ exclusive = self.mode != 'r'
+ try:
+ _lock_file(self.f, exclusive)
+ except IOError:
+ self.f.close()
+ raise
+ return self
+
+ def __exit__(self, etype, value, traceback):
+ try:
+ _unlock_file(self.f)
+ finally:
+ self.f.close()
+
+ def __iter__(self):
+ return iter(self.f)
+
+ def write(self, *args):
+ return self.f.write(*args)
+
+ def read(self, *args):
+ return self.f.read(*args)
+
+
+def shell_quote(args):
+ quoted_args = []
+ encoding = sys.getfilesystemencoding()
+ if encoding is None:
+ encoding = 'utf-8'
+ for a in args:
+ if isinstance(a, bytes):
+ # We may get a filename encoded with 'encodeFilename'
+ a = a.decode(encoding)
+ quoted_args.append(pipes.quote(a))
+ return u' '.join(quoted_args)
+
+
+def takewhile_inclusive(pred, seq):
+ """ Like itertools.takewhile, but include the latest evaluated element
+ (the first element so that Not pred(e)) """
+ for e in seq:
+ yield e
+ if not pred(e):
+ return
+
+
+def smuggle_url(url, data):
+ """ Pass additional data in a URL for internal use. """
+
+ sdata = compat_urllib_parse.urlencode(
+ {u'__youtubedl_smuggle': json.dumps(data)})
+ return url + u'#' + sdata
+
+
+def unsmuggle_url(smug_url):
+ if not '#__youtubedl_smuggle' in smug_url:
+ return smug_url, None
+ url, _, sdata = smug_url.rpartition(u'#')
+ jsond = compat_parse_qs(sdata)[u'__youtubedl_smuggle'][0]
+ data = json.loads(jsond)
+ return url, data
+
+
+def format_bytes(bytes):
+ if bytes is None:
+ return u'N/A'
+ if type(bytes) is str:
+ bytes = float(bytes)
+ if bytes == 0.0:
+ exponent = 0
+ else:
+ exponent = int(math.log(bytes, 1024.0))
+ suffix = [u'B', u'KiB', u'MiB', u'GiB', u'TiB', u'PiB', u'EiB', u'ZiB', u'YiB'][exponent]
+ converted = float(bytes) / float(1024 ** exponent)
+ return u'%.2f%s' % (converted, suffix)