import email.utils
import errno
import gzip
+import itertools
import io
import json
import locale
import re
import ssl
import socket
+import struct
import subprocess
import sys
import traceback
def find_xpath_attr(node, xpath, key, val):
    """Find the first element selected by xpath[@key=val].

    The candidates returned by ``node.findall(xpath)`` are filtered on the
    attribute value instead of appending an attribute predicate to the path:
    the ElementTree bundled with Python 2.6 predates predicate support and
    rejects such expressions.

    @param node   Element (or tree) to search from
    @param xpath  ElementTree path selecting the candidate elements
    @param key    Attribute name (ASCII letters only)
    @param val    Attribute value the element must carry
    @return       The first matching element in document order, or None
    """
    # Kept from the predicate-based implementation so callers still get an
    # AssertionError for keys/values outside the previously accepted set.
    assert re.match(r'^[a-zA-Z]+$', key)
    assert re.match(r'^[a-zA-Z0-9@\s:._]*$', val)
    for candidate in node.findall(xpath):
        if candidate.attrib.get(key) == val:
            return candidate
    return None
else:
result = re.sub(u'(?u)&(.+?);', htmlentity_transform, s)
return result
-def encodeFilename(s):
+
+def encodeFilename(s, for_subprocess=False):
"""
@param s The name of the file
"""
- assert type(s) == type(u'')
+ assert type(s) == compat_str
# Python 3 has a Unicode API
if sys.version_info >= (3, 0):
# Pass u'' directly to use Unicode APIs on Windows 2000 and up
# (Detecting Windows NT 4 is tricky because 'major >= 4' would
# match Windows 9x series as well. Besides, NT 4 is obsolete.)
- return s
+ if not for_subprocess:
+ return s
+ else:
+ # For subprocess calls, encode with locale encoding
+ # Refer to http://stackoverflow.com/a/9951851/35070
+ encoding = preferredencoding()
else:
encoding = sys.getfilesystemencoding()
- if encoding is None:
- encoding = 'utf-8'
- return s.encode(encoding, 'ignore')
+ if encoding is None:
+ encoding = 'utf-8'
+ return s.encode(encoding, 'ignore')
+
def decodeOption(optval):
if optval is None:
else:
return '%d' % secs
-def make_HTTPS_handler(opts_no_check_certificate):
+
+def make_HTTPS_handler(opts_no_check_certificate, **kwargs):
if sys.version_info < (3, 2):
import httplib
class HTTPSHandlerV3(compat_urllib_request.HTTPSHandler):
def https_open(self, req):
return self.do_open(HTTPSConnectionV3, req)
- return HTTPSHandlerV3()
+ return HTTPSHandlerV3(**kwargs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv3)
context.verify_mode = (ssl.CERT_NONE
context.load_default_certs()
except AttributeError:
pass # Python < 3.4
- return compat_urllib_request.HTTPSHandler(context=context)
+ return compat_urllib_request.HTTPSHandler(context=context, **kwargs)
class ExtractorError(Exception):
"""Error during info extraction."""
https_request = http_request
https_response = http_response
+
def unified_strdate(date_str):
"""Return a string with the date in the format YYYYMMDD"""
upload_date = None
#Replace commas
- date_str = date_str.replace(',',' ')
+ date_str = date_str.replace(',', ' ')
# %z (UTC offset) is only supported in python>=3.2
- date_str = re.sub(r' (\+|-)[\d]*$', '', date_str)
+ date_str = re.sub(r' ?(\+|-)[0-9]{2}:?[0-9]{2}$', '', date_str)
format_expressions = [
'%d %B %Y',
+ '%d %b %Y',
'%B %d %Y',
'%b %d %Y',
'%Y-%m-%d',
'%d/%m/%Y',
'%Y/%m/%d %H:%M:%S',
+ '%Y-%m-%d %H:%M:%S',
'%d.%m.%Y %H:%M',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%dT%H:%M:%S.%f0Z',
'%Y-%m-%dT%H:%M:%S',
+ '%Y-%m-%dT%H:%M',
]
for expression in format_expressions:
try:
upload_date = datetime.datetime.strptime(date_str, expression).strftime('%Y%m%d')
- except:
+ except ValueError:
pass
if upload_date is None:
timetuple = email.utils.parsedate_tz(date_str)
return today + delta
return datetime.datetime.strptime(date_str, "%Y%m%d").date()
def hyphenate_date(date_str):
    """Convert a date in 'YYYYMMDD' format to 'YYYY-MM-DD' format.

    Values that do not match the eight-digit pattern are returned unchanged;
    this includes None, which would otherwise make re.match raise TypeError.
    """
    if date_str is None:
        return None
    match = re.match(r'^(\d\d\d\d)(\d\d)(\d\d)$', date_str)
    if match is None:
        return date_str
    return '-'.join(match.groups())
+
class DateRange(object):
"""Represents a time interval between two dates"""
def __init__(self, start=None, end=None):
def write_string(s, out=None):
    """Write the text s to out and flush it.

    @param s    The text to write (must be a compat_str)
    @param out  Stream to write to; sys.stderr when None

    On Python 2, or when the stream's mode contains 'b', the text is first
    encoded with the preferred encoding (unencodable characters dropped).
    If the write raises UnicodeEncodeError on Windows and the stream reports
    an encoding, the text is reduced to what that encoding can represent and
    written again; in every other case the error propagates.
    """
    if out is None:
        out = sys.stderr
    assert isinstance(s, compat_str)

    if ('b' in getattr(out, 'mode', '') or
            sys.version_info[0] < 3):  # Python 2 lies about mode of sys.stderr
        s = s.encode(preferredencoding(), 'ignore')
    try:
        out.write(s)
    except UnicodeEncodeError:
        # In Windows shells, this can fail even when the codec is just charmap!?
        # See https://wiki.python.org/moin/PrintFails#Issue
        # The attribute may exist but be None; encoding with None would
        # replace the real error with a TypeError, so require a usable value.
        out_encoding = getattr(out, 'encoding', None)
        if sys.platform == 'win32' and out_encoding:
            s = s.encode(out_encoding, 'ignore').decode(out_encoding)
            out.write(s)
        else:
            raise

    out.flush()
return url + u'#' + sdata
-def unsmuggle_url(smug_url):
+def unsmuggle_url(smug_url, default=None):
if not '#__youtubedl_smuggle' in smug_url:
- return smug_url, None
+ return smug_url, default
url, _, sdata = smug_url.rpartition(u'#')
jsond = compat_parse_qs(sdata)[u'__youtubedl_smuggle'][0]
data = json.loads(jsond)
return None
def fix_xml_ampersands(xml_str):
    """Replace every bare '&' in XML by '&amp;'.

    An ampersand is left alone when it already starts one of the five
    predefined entities (amp, lt, gt, apos, quot) or a decimal/hexadecimal
    character reference.  References need at least one digit and may have
    any number of them, so '&#128512;' is kept while '&#;' is escaped.
    """
    return re.sub(
        r'&(?!amp;|lt;|gt;|apos;|quot;|#x[0-9a-fA-F]+;|#[0-9]+;)',
        u'&amp;',
        xml_str)
def setproctitle(title):
- assert isinstance(title, type(u''))
+ assert isinstance(title, compat_str)
try:
libc = ctypes.cdll.LoadLibrary("libc.so.6")
except OSError:
class HEADRequest(compat_urllib_request.Request):
    """Request whose reported HTTP method is always HEAD."""

    def get_method(self):
        """Return the HTTP method name used for this request."""
        method = "HEAD"
        return method
+
+
def int_or_none(v, scale=1, default=None):
    """Convert v to an integer, floor-divided by scale.

    @param v        Value to convert; None or an empty string count as missing
    @param scale    Divisor applied after the conversion (e.g. 1000 for ms -> s)
    @param default  Returned when the value is missing
    @return         int(v) // scale, or default when v is missing
    """
    # Scraped attributes are often '' rather than absent; int('') would raise.
    if v is None or v == '':
        return default
    return int(v) // scale
+
+
def parse_duration(s):
    """Parse a duration such as '1:02:03', '3m4s' or '90' into seconds.

    Hours and minutes are optional and may be terminated by ':' or by 'h' /
    'm' respectively; the seconds part is mandatory and may end in 's'.
    Leading and trailing whitespace is ignored.

    @param s  The duration text, or None
    @return   The duration in seconds as an int, or None when s is None or
              is not in a recognized format
    """
    if s is None:
        return None

    # Scraped values frequently carry stray spaces or newlines; the pattern
    # is anchored at both ends, so they must be removed before matching.
    m = re.match(
        r'(?:(?:(?P<hours>[0-9]+)[:h])?(?P<mins>[0-9]+)[:m])?(?P<secs>[0-9]+)s?$',
        s.strip())
    if not m:
        return None
    res = int(m.group('secs'))
    if m.group('mins'):
        res += int(m.group('mins')) * 60
    if m.group('hours'):
        res += int(m.group('hours')) * 60 * 60
    return res
+
+
def prepend_extension(filename, ext):
    """Insert ext in front of the real extension of filename.

    For example ('video.mp4', 'temp') gives 'video.temp.mp4'; a name without
    an extension simply gets '.' + ext appended.
    """
    parts = os.path.splitext(filename)
    stem = parts[0]
    original_ext = parts[1]
    return u'{0}.{1}{2}'.format(stem, ext, original_ext)
+
+
def check_executable(exe, args=None):
    """Check whether the given binary can be started.

    The binary is launched with its output captured and discarded, and the
    call waits for it to exit.

    @param exe   Name or path of the executable
    @param args  Optional sequence of arguments that make the program
                 produce a short output (like -version)
    @return      exe if the program could be started, False if starting it
                 raised OSError (e.g. it is not installed)
    """
    # Copy into a fresh list: avoids a shared mutable default and lets
    # callers pass any sequence (a tuple used to raise TypeError).
    cmd = [exe] + list(args or [])
    try:
        subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    except OSError:
        return False
    return exe
+
+
class PagedList(object):
    """List-like view over results that are fetched one page at a time.

    pagefunc(pagenum) must return an iterable with the entries of the
    zero-based page pagenum.  A page holding fewer than pagesize entries is
    taken to be the last one.
    """

    def __init__(self, pagefunc, pagesize):
        """
        @param pagefunc  Callable mapping a page number to that page's entries
        @param pagesize  Number of entries on a full page
        """
        self._pagefunc = pagefunc
        self._pagesize = pagesize

    def __len__(self):
        # This is only useful for tests
        return len(self.getslice())

    def getslice(self, start=0, end=None):
        """Return the entries with index in [start, end) as a list.

        Only the pages overlapping the requested range are fetched.

        @param start  Index of the first entry wanted (non-negative)
        @param end    Index one past the last entry wanted, or None for
                      everything up to the last page
        """
        res = []
        # An empty range needs no page at all.  Without this guard an end
        # lying on the first id of the starting page wraps around to a
        # whole-page slice and none of the stop conditions below can fire.
        if end is not None and end <= start:
            return res

        for pagenum in itertools.count(start // self._pagesize):
            firstid = pagenum * self._pagesize
            nextfirstid = pagenum * self._pagesize + self._pagesize
            if start >= nextfirstid:
                continue

            page_results = list(self._pagefunc(pagenum))

            # Offset of the first wanted entry inside this page
            startv = (
                start % self._pagesize
                if firstid <= start < nextfirstid
                else 0)

            # Offset one past the last wanted entry inside this page
            endv = (
                ((end - 1) % self._pagesize) + 1
                if (end is not None and firstid <= end <= nextfirstid)
                else None)

            if startv != 0 or endv is not None:
                page_results = page_results[startv:endv]
            res.extend(page_results)

            # A little optimization - if current page is not "full", ie. does
            # not contain page_size videos then we can assume that this page
            # is the last one - there are no more ids on further pages -
            # i.e. no need to query again.
            if len(page_results) + startv < self._pagesize:
                break

            # If we got the whole page, but the next page is not interesting,
            # break out early as well
            if end == nextfirstid:
                break
        return res
+
+
def uppercase_escape(s):
    """Replace every \\UXXXXXXXX escape (eight hex digits) in s by its character.

    Decoding goes through the unicode_escape codec rather than a chr() call
    so that code points above U+FFFF also work on narrow Python 2 builds,
    where they have to be produced as a surrogate pair.

    @param s  Text possibly containing literal \\UXXXXXXXX sequences
    @return   The text with those sequences decoded; all else is untouched
    """
    import codecs

    unicode_escape = codecs.getdecoder('unicode_escape')
    return re.sub(
        r'\\U[0-9a-fA-F]{8}',
        lambda m: unicode_escape(m.group(0))[0],
        s)
+
# Provide struct_pack / struct_unpack that accept text format strings on
# every supported interpreter.
try:
    struct.pack(u'!I', 0)
except TypeError:
    # In Python 2.6 (and some 2.7 versions), struct requires a bytes argument
    def struct_pack(spec, *args):
        """struct.pack() that also takes a text format string."""
        fmt = spec.encode('ascii') if isinstance(spec, compat_str) else spec
        return struct.pack(fmt, *args)

    def struct_unpack(spec, *args):
        """struct.unpack() that also takes a text format string."""
        fmt = spec.encode('ascii') if isinstance(spec, compat_str) else spec
        return struct.unpack(fmt, *args)
else:
    # Text format strings work natively; use the library functions directly.
    struct_pack, struct_unpack = struct.pack, struct.unpack