+
+
# Cross-platform file locking: fcntl.lockf() where available, the Win32
# LockFileEx/UnlockFileEx calls (through ctypes) on Windows.
if sys.platform != 'win32':
    import fcntl

    def _lock_file(f, exclusive):
        """Lock the open file f: exclusively if exclusive is true, shared otherwise."""
        if exclusive:
            operation = fcntl.LOCK_EX
        else:
            operation = fcntl.LOCK_SH
        fcntl.lockf(f, operation)

    def _unlock_file(f):
        """Release the lock held on the open file f."""
        fcntl.lockf(f, fcntl.LOCK_UN)

else:
    import ctypes.wintypes
    import msvcrt

    class OVERLAPPED(ctypes.Structure):
        # ctypes declaration of the structure passed as the last argument
        # of LockFileEx / UnlockFileEx.
        _fields_ = [
            ('Internal', ctypes.wintypes.LPVOID),
            ('InternalHigh', ctypes.wintypes.LPVOID),
            ('Offset', ctypes.wintypes.DWORD),
            ('OffsetHigh', ctypes.wintypes.DWORD),
            ('hEvent', ctypes.wintypes.HANDLE),
        ]

    kernel32 = ctypes.windll.kernel32

    LockFileEx = kernel32.LockFileEx
    LockFileEx.argtypes = [
        ctypes.wintypes.HANDLE,     # hFile
        ctypes.wintypes.DWORD,      # dwFlags
        ctypes.wintypes.DWORD,      # dwReserved
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockLow
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockHigh
        ctypes.POINTER(OVERLAPPED)  # Overlapped
    ]
    LockFileEx.restype = ctypes.wintypes.BOOL

    UnlockFileEx = kernel32.UnlockFileEx
    UnlockFileEx.argtypes = [
        ctypes.wintypes.HANDLE,     # hFile
        ctypes.wintypes.DWORD,      # dwReserved
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockLow
        ctypes.wintypes.DWORD,      # nNumberOfBytesToLockHigh
        ctypes.POINTER(OVERLAPPED)  # Overlapped
    ]
    UnlockFileEx.restype = ctypes.wintypes.BOOL

    # Low and high halves of the byte count handed to both calls.
    whole_low = 0xffffffff
    whole_high = 0x7fffffff

    def _lock_file(f, exclusive):
        """Lock the open file f: exclusively if exclusive is true, shared otherwise.

        Raises OSError when LockFileEx reports failure.
        """
        region = OVERLAPPED()
        region.Offset = 0
        region.OffsetHigh = 0
        region.hEvent = 0
        # Stored on the file object so that _unlock_file can pass the very
        # same structure to UnlockFileEx.
        f._lock_file_overlapped_p = ctypes.pointer(region)
        os_handle = msvcrt.get_osfhandle(f.fileno())
        # 0x2 is LOCKFILE_EXCLUSIVE_LOCK in the Win32 API
        flags = 0x2 if exclusive else 0x0
        succeeded = LockFileEx(
            os_handle, flags, 0,
            whole_low, whole_high, f._lock_file_overlapped_p)
        if not succeeded:
            raise OSError('Locking file failed: %r' % ctypes.FormatError())

    def _unlock_file(f):
        """Release the lock taken on f by _lock_file.

        Raises OSError when UnlockFileEx reports failure.
        """
        assert f._lock_file_overlapped_p
        os_handle = msvcrt.get_osfhandle(f.fileno())
        succeeded = UnlockFileEx(
            os_handle, 0,
            whole_low, whole_high, f._lock_file_overlapped_p)
        if not succeeded:
            raise OSError('Unlocking file failed: %r' % ctypes.FormatError())
+
+
class locked_file(object):
    """Context manager around a file that stays locked inside the with block.

    The file is opened immediately by the constructor (mode must be one of
    'r', 'a' or 'w'). Entering the context takes a shared lock for 'r' and
    an exclusive lock for the writing modes; leaving it unlocks and closes
    the file.
    """

    def __init__(self, filename, mode, encoding=None):
        assert mode in ['r', 'a', 'w']
        self.f = io.open(filename, mode, encoding=encoding)
        self.mode = mode

    def __enter__(self):
        exclusive = self.mode != 'r'
        try:
            _lock_file(self.f, exclusive)
        except (IOError, OSError):
            # The Windows implementation of _lock_file raises OSError, which
            # is not an IOError on Python 2; catch both so the file handle is
            # never leaked when locking fails.
            self.f.close()
            raise
        return self

    def __exit__(self, etype, value, traceback):
        try:
            _unlock_file(self.f)
        finally:
            # Close the file even if unlocking raised.
            self.f.close()

    def __iter__(self):
        """Iterate over the lines of the underlying file."""
        return iter(self.f)

    def write(self, *args):
        """Forward to the underlying file's write()."""
        return self.f.write(*args)

    def read(self, *args):
        """Forward to the underlying file's read()."""
        return self.f.read(*args)
+
+
def shell_quote(args):
    """Return args as a single shell-escaped command line string.

    Byte strings (for instance filenames produced by 'encodeFilename') are
    decoded with the filesystem encoding, falling back to UTF-8 when the
    interpreter reports none.
    """
    # pipes is deprecated and removed in Python 3.13; shlex.quote is the
    # same function under its current name. Python 2 only has pipes.quote.
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote

    encoding = sys.getfilesystemencoding()
    if encoding is None:
        encoding = 'utf-8'

    quoted_args = []
    for a in args:
        if isinstance(a, bytes):
            # We may get a filename encoded with 'encodeFilename'
            a = a.decode(encoding)
        quoted_args.append(quote(a))
    return u' '.join(quoted_args)
+
+
def takewhile_inclusive(pred, seq):
    """ Like itertools.takewhile, but also yield the last evaluated element
    (the first element e for which pred(e) is false) """
    for item in seq:
        # Yield first, test afterwards: the failing element is still emitted.
        yield item
        if pred(item):
            continue
        break
+
+
def smuggle_url(url, data):
    """ Pass additional data in a URL for internal use.

    data is JSON-serialized, URL-encoded under the key
    '__youtubedl_smuggle' and appended to url after a '#'.
    """
    payload = {u'__youtubedl_smuggle': json.dumps(data)}
    fragment = compat_urllib_parse.urlencode(payload)
    return url + u'#' + fragment
+
+
def unsmuggle_url(smug_url, default=None):
    """ Split a URL built by smuggle_url into (url, data).

    URLs that carry no smuggled data are returned unchanged together
    with default.
    """
    if '#__youtubedl_smuggle' in smug_url:
        # Everything after the last '#' is the urlencoded payload.
        url, _, fragment = smug_url.rpartition(u'#')
        serialized = compat_parse_qs(fragment)[u'__youtubedl_smuggle'][0]
        return url, json.loads(serialized)
    return smug_url, default
+
+
def format_bytes(bytes):
    """Format a byte count as a human-readable string with a binary suffix.

    bytes may be a number, a numeric string, or None (rendered as u'N/A').
    The result has two decimals, e.g. u'1.50KiB'.
    """
    if bytes is None:
        return u'N/A'
    if isinstance(bytes, str):
        bytes = float(bytes)

    suffixes = [u'B', u'KiB', u'MiB', u'GiB', u'TiB', u'PiB', u'EiB', u'ZiB', u'YiB']
    if bytes == 0.0:
        exponent = 0
    else:
        exponent = int(math.log(bytes, 1024.0))
        # Clamp to the available suffixes: tiny fractions would otherwise
        # produce a negative index (picking a suffix from the end of the
        # list) and huge values would raise IndexError.
        exponent = max(0, min(exponent, len(suffixes) - 1))
    suffix = suffixes[exponent]
    converted = float(bytes) / float(1024 ** exponent)
    return u'%.2f%s' % (converted, suffix)
+
+
def str_to_int(int_str):
    """ Parse an integer written with ',' or '.' as digit group separators """
    return int(re.sub(r'[,\.]', u'', int_str))
+
+
def get_term_width():
    """Return the terminal width in columns, or None if it cannot be determined.

    The COLUMNS environment variable takes precedence; otherwise the width
    is taken from the output of `stty size`.
    """
    columns = os.environ.get('COLUMNS', None)
    if columns:
        try:
            return int(columns)
        except ValueError:
            # Unusable COLUMNS value: fall back to asking the terminal.
            pass

    try:
        sp = subprocess.Popen(
            ['stty', 'size'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = sp.communicate()
        # `stty size` prints "<rows> <columns>"
        return int(out.split()[1])
    except Exception:
        # Best effort only (stty missing, no tty, unexpected output, ...),
        # but KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
+
+
def month_by_name(name):
    """ Return the number (1-12) of the month with the given
    (locale-independent) English name, or None if name is not one """

    ENGLISH_NAMES = [
        u'January', u'February', u'March', u'April', u'May', u'June',
        u'July', u'August', u'September', u'October', u'November', u'December']
    try:
        position = ENGLISH_NAMES.index(name)
    except ValueError:
        return None
    return position + 1
+
+
def fix_xml_ampersands(xml_str):
    """Replace all the bare '&' by '&amp;' in XML

    Ampersands that already start one of the predefined entities
    (amp, lt, gt, apos, quot) or a numeric character reference are
    left untouched.
    """
    return re.sub(
        r'&(?!amp;|lt;|gt;|apos;|quot;|#x[0-9a-fA-F]{,4};|#[0-9]{,4};)',
        u'&amp;',
        xml_str)
+
+
def setproctitle(title):
    """Set the process name through libc's prctl(); a no-op if unavailable.

    title must be a text string. Nothing happens when libc.so.6 cannot be
    loaded or does not provide prctl.
    """
    assert isinstance(title, compat_str)
    try:
        libc = ctypes.cdll.LoadLibrary("libc.so.6")
    except OSError:
        return
    title_bytes = title.encode('utf-8')
    # Initialising the buffer from the bytes makes it one byte longer than
    # the title, so it is NUL-terminated. A buffer of exactly
    # len(title_bytes) has no terminator and prctl would read past its end.
    buf = ctypes.create_string_buffer(title_bytes)
    try:
        # 15 is PR_SET_NAME (see linux/prctl.h)
        libc.prctl(15, buf, 0, 0, 0)
    except AttributeError:
        return  # Strange libc, just skip this
+
+
def remove_start(s, start):
    """ Return s without the prefix start; s unchanged if it lacks that prefix """
    if not s.startswith(start):
        return s
    return s[len(start):]
+
+
def url_basename(url):
    """ Return the last segment of the path component of url """
    parsed = compat_urlparse.urlparse(url)
    # Strip slashes at both ends so a trailing '/' does not give u''
    segments = parsed.path.strip(u'/').split(u'/')
    return segments[-1]
+
+
class HEADRequest(compat_urllib_request.Request):
    """ Request whose HTTP method is always HEAD """

    def get_method(self):
        return "HEAD"
+
+
def int_or_none(v, scale=1, default=None, get_attr=None):
    """ Return int(v) // scale, or default when there is no value.

    When get_attr is given, the value is first looked up as attribute
    get_attr of v (a missing attribute counts as no value).
    """
    if get_attr and v is not None:
        v = getattr(v, get_attr, None)
    if v is None:
        return default
    return int(v) // scale
+
+
def float_or_none(v, scale=1, default=None):
    """ Return float(v) / scale, or default when v is None """
    if v is None:
        return default
    return float(v) / scale
+
+
def parse_duration(s):
    """ Parse a duration such as '1:02:03', '2:30', '90' or '1h2m3s'
    into a number of seconds; return None if s is None or not understood """
    if s is None:
        return None

    m = re.match(
        r'(?:(?:(?P<hours>[0-9]+)[:h])?(?P<mins>[0-9]+)[:m])?(?P<secs>[0-9]+)s?(?::[0-9]+)?$', s)
    if not m:
        return None

    total = int(m.group('secs'))
    # Minutes and hours are optional groups
    for group_name, seconds_per_unit in (('mins', 60), ('hours', 60 * 60)):
        digits = m.group(group_name)
        if digits:
            total += int(digits) * seconds_per_unit
    return total
+
+
def prepend_extension(filename, ext):
    """ Insert ext in front of the extension of filename
    ('video.mp4', 'temp' -> 'video.temp.mp4') """
    parts = os.path.splitext(filename)
    return u'{0}.{1}{2}'.format(parts[0], ext, parts[1])
+
+
def check_executable(exe, args=None):
    """ Checks if the given binary is installed somewhere in PATH, and returns its name.
    args can be a list of arguments for a short output (like -version).
    Returns False when the binary cannot be started. """
    # None instead of a shared mutable [] default; any iterable is accepted.
    cmd = [exe] + list(args or [])
    try:
        subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    except OSError:
        return False
    return exe
+
+
class PagedList(object):
    """List-like view over results that are fetched one page at a time.

    pagefunc(pagenum) is called with a zero-based page number and must
    return an iterable with the entries of that page; pagesize is the
    number of entries on a full page.
    """

    def __init__(self, pagefunc, pagesize):
        self._pagefunc = pagefunc
        self._pagesize = pagesize

    def __len__(self):
        # This is only useful for tests
        return len(self.getslice())

    def getslice(self, start=0, end=None):
        """Return the entries from index start (inclusive) to end (exclusive).

        end=None means "up to the last entry". Only the pages overlapping
        the requested range are fetched.
        """
        res = []
        # An empty or inverted range selects nothing. Without this guard the
        # end-of-range checks below never fire and every entry from start
        # onwards would be returned.
        if end is not None and start >= end:
            return res

        for pagenum in itertools.count(start // self._pagesize):
            firstid = pagenum * self._pagesize
            nextfirstid = firstid + self._pagesize
            if start >= nextfirstid:
                continue

            page_results = list(self._pagefunc(pagenum))

            # Offset of the first wanted entry inside this page
            startv = (
                start % self._pagesize
                if firstid <= start < nextfirstid
                else 0)

            # Offset just past the last wanted entry inside this page, or
            # None if the range extends beyond this page
            endv = (
                ((end - 1) % self._pagesize) + 1
                if (end is not None and firstid <= end <= nextfirstid)
                else None)

            if startv != 0 or endv is not None:
                page_results = page_results[startv:endv]
            res.extend(page_results)

            # A little optimization - if current page is not "full", ie. does
            # not contain page_size videos then we can assume that this page
            # is the last one - there are no more ids on further pages -
            # i.e. no need to query again.
            if len(page_results) + startv < self._pagesize:
                break

            # If we got the whole page, but the next page is not interesting,
            # break out early as well
            if end == nextfirstid:
                break
        return res
+
+
def uppercase_escape(s):
    """ Expand every literal \\UXXXXXXXX escape (8 hex digits) in s
    into the character it denotes """
    decode_escape = codecs.getdecoder('unicode_escape')

    def expand(match):
        # The decoder returns (decoded_text, consumed_length)
        decoded, _ = decode_escape(match.group(0))
        return decoded

    return re.sub(r'\\U[0-9a-fA-F]{8}', expand, s)
+
# struct_pack / struct_unpack: like struct.pack / struct.unpack, but always
# usable with a text format string.
try:
    struct.pack(u'!I', 0)
except TypeError:
    # In Python 2.6 (and some 2.7 versions), struct requires a bytes argument
    def struct_pack(spec, *args):
        bytes_spec = spec.encode('ascii') if isinstance(spec, compat_str) else spec
        return struct.pack(bytes_spec, *args)

    def struct_unpack(spec, *args):
        bytes_spec = spec.encode('ascii') if isinstance(spec, compat_str) else spec
        return struct.unpack(bytes_spec, *args)
else:
    # Text format strings work natively; use the stdlib functions directly
    struct_pack = struct.pack
    struct_unpack = struct.unpack
+
+
def read_batch_urls(batch_fd):
    """Read the URLs listed in batch_fd, one per line, and close it.

    Lines may be bytes (decoded as UTF-8) or text. A leading byte order
    mark and surrounding whitespace are stripped; empty lines and lines
    starting with '#', ';' or ']' are skipped.
    """
    # The UTF-8 BOM shows up as U+FEFF when decoded correctly, and as the
    # three characters \xef\xbb\xbf when the file was read with a
    # single-byte encoding such as latin-1.
    BOMS = (u'\xef\xbb\xbf', u'\ufeff')

    def fixup(url):
        if isinstance(url, bytes):
            url = url.decode('utf-8', 'replace')
        for bom in BOMS:
            if url.startswith(bom):
                url = url[len(bom):]
                break
        url = url.strip()
        if url.startswith(('#', ';', ']')):
            return False
        return url

    with contextlib.closing(batch_fd) as fd:
        return [url for url in map(fixup, fd) if url]
+
+
def urlencode_postdata(*args, **kargs):
    """ URL-encode the arguments (as urlencode does) and return ASCII bytes """
    encoded = compat_urllib_parse.urlencode(*args, **kargs)
    return encoded.encode('ascii')
+
+
def parse_xml(s):
    """ Parse the text string s as XML and return the root element;
    doctype declarations are ignored """

    class _NoDoctypeTreeBuilder(xml.etree.ElementTree.TreeBuilder):
        def doctype(self, name, pubid, system):
            pass  # Ignore doctypes

    parser = xml.etree.ElementTree.XMLParser(target=_NoDoctypeTreeBuilder())
    if sys.version_info >= (2, 7):
        kwargs = {'parser': parser}
    else:
        # The custom parser is only passed on from Python 2.7 on
        kwargs = {}
    return xml.etree.ElementTree.XML(s.encode('utf-8'), **kwargs)
+
+
# compat_getpass: getpass.getpass, except on Python 2 under Windows, where a
# text prompt is first encoded with the preferred encoding.
if sys.version_info >= (3, 0) or sys.platform != 'win32':
    compat_getpass = getpass.getpass
else:
    def compat_getpass(prompt, *args, **kwargs):
        if isinstance(prompt, compat_str):
            prompt = prompt.encode(preferredencoding())
        return getpass.getpass(prompt, *args, **kwargs)
+
+
# Numeric value associated with each US rating label
# (presumably the minimum viewer age - confirm against the callers).
US_RATINGS = {
    'G': 0,
    'PG': 10,
    'PG-13': 13,
    'R': 16,
    'NC': 18,
}
+
+
def strip_jsonp(code):
    """ Strip a JSONP wrapper such as callback(...); and return what is
    inside the parentheses; code is returned unchanged if it has none """
    jsonp_wrapper = r'(?s)^[a-zA-Z0-9_]+\s*\(\s*(.*)\);?\s*?\s*$'
    return re.sub(jsonp_wrapper, r'\1', code)
+
+
def qualities(quality_ids):
    """ Get a numeric quality value out of a list of possible values.

    The returned function maps a quality id to its position in
    quality_ids, and any unknown id to -1.
    """
    def q(qid):
        try:
            position = quality_ids.index(qid)
        except ValueError:
            position = -1
        return position
    return q
+
+
# %-style template with the named fields title, id and ext
# (presumably the default output filename template - confirm against the callers).
DEFAULT_OUTTMPL = '%(title)s-%(id)s.%(ext)s'
+
# subprocess_check_output: subprocess.check_output where it exists, otherwise
# a minimal re-implementation on top of subprocess.Popen.
try:
    subprocess_check_output = subprocess.check_output
except AttributeError:
    def subprocess_check_output(*args, **kwargs):
        """Run a command and return its stdout.

        Raises subprocess.CalledProcessError (with the captured output in
        its .output attribute) when the command exits with a non-zero
        status.
        """
        assert 'input' not in kwargs
        p = subprocess.Popen(*args, stdout=subprocess.PIPE, **kwargs)
        output, _ = p.communicate()
        ret = p.poll()
        if ret:
            # This fallback only runs on interpreters without check_output,
            # which also lack Popen.args and the output= keyword of
            # CalledProcessError; recover the command from our own
            # arguments and attach the output manually.
            cmd = kwargs.get('args')
            if cmd is None:
                cmd = args[0]
            error = subprocess.CalledProcessError(ret, cmd)
            error.output = output
            raise error
        return output