]> Raphaƫl G. Git Repositories - youtubedl/blobdiff - test/test_download.py
Merge tag 'upstream/2014.11.23'
[youtubedl] / test / test_download.py
index f171c10bad84a876a9fe4caba2b71a984c3169ec..12cfb5cbe49dde5468b8b773f85cfbbb331d8eb8 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
 
 #!/usr/bin/env python
 
+from __future__ import unicode_literals
+
 # Allow direct execution
 import os
 import sys
 # Allow direct execution
 import os
 import sys
@@ -7,10 +9,11 @@ import unittest
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
 from test.helper import (
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
 from test.helper import (
+    assertGreaterEqual,
+    expect_warnings,
     get_params,
     gettestcases,
     expect_info_dict,
     get_params,
     gettestcases,
     expect_info_dict,
-    md5,
     try_rm,
     report_warning,
 )
     try_rm,
     report_warning,
 )
@@ -22,13 +25,15 @@ import json
 import socket
 
 import youtube_dl.YoutubeDL
 import socket
 
 import youtube_dl.YoutubeDL
-from youtube_dl.utils import (
+from youtube_dl.compat import (
     compat_http_client,
     compat_http_client,
-    compat_str,
     compat_urllib_error,
     compat_HTTPError,
     compat_urllib_error,
     compat_HTTPError,
+)
+from youtube_dl.utils import (
     DownloadError,
     ExtractorError,
     DownloadError,
     ExtractorError,
+    format_bytes,
     UnavailableVideoError,
 )
 from youtube_dl.extractor import get_info_extractor
     UnavailableVideoError,
 )
 from youtube_dl.extractor import get_info_extractor
@@ -65,15 +70,21 @@ def generator(test_case):
     def test_template(self):
         ie = youtube_dl.extractor.get_info_extractor(test_case['name'])
         other_ies = [get_info_extractor(ie_key) for ie_key in test_case.get('add_ie', [])]
     def test_template(self):
         ie = youtube_dl.extractor.get_info_extractor(test_case['name'])
         other_ies = [get_info_extractor(ie_key) for ie_key in test_case.get('add_ie', [])]
+        is_playlist = any(k.startswith('playlist') for k in test_case)
+        test_cases = test_case.get(
+            'playlist', [] if is_playlist else [test_case])
+
         def print_skipping(reason):
             print('Skipping %s: %s' % (test_case['name'], reason))
         if not ie.working():
             print_skipping('IE marked as not _WORKING')
             return
         def print_skipping(reason):
             print('Skipping %s: %s' % (test_case['name'], reason))
         if not ie.working():
             print_skipping('IE marked as not _WORKING')
             return
-        if 'playlist' not in test_case:
-            info_dict = test_case.get('info_dict', {})
-            if not test_case.get('file') and not (info_dict.get('id') and info_dict.get('ext')):
+
+        for tc in test_cases:
+            info_dict = tc.get('info_dict', {})
+            if not tc.get('file') and not (info_dict.get('id') and info_dict.get('ext')):
                 raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?')
                 raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?')
+
         if 'skip' in test_case:
             print_skipping(test_case['skip'])
             return
         if 'skip' in test_case:
             print_skipping(test_case['skip'])
             return
@@ -83,21 +94,27 @@ def generator(test_case):
                 return
 
         params = get_params(test_case.get('params', {}))
                 return
 
         params = get_params(test_case.get('params', {}))
+        if is_playlist and 'playlist' not in test_case:
+            params.setdefault('extract_flat', True)
+            params.setdefault('skip_download', True)
 
 
-        ydl = YoutubeDL(params)
+        ydl = YoutubeDL(params, auto_init=False)
         ydl.add_default_info_extractors()
         finished_hook_called = set()
         def _hook(status):
             if status['status'] == 'finished':
                 finished_hook_called.add(status['filename'])
         ydl.add_progress_hook(_hook)
         ydl.add_default_info_extractors()
         finished_hook_called = set()
         def _hook(status):
             if status['status'] == 'finished':
                 finished_hook_called.add(status['filename'])
         ydl.add_progress_hook(_hook)
+        expect_warnings(ydl, test_case.get('expected_warnings', []))
 
         def get_tc_filename(tc):
             return tc.get('file') or ydl.prepare_filename(tc.get('info_dict', {}))
 
 
         def get_tc_filename(tc):
             return tc.get('file') or ydl.prepare_filename(tc.get('info_dict', {}))
 
-        test_cases = test_case.get('playlist', [test_case])
-        def try_rm_tcs_files():
-            for tc in test_cases:
+        res_dict = None
+        def try_rm_tcs_files(tcs=None):
+            if tcs is None:
+                tcs = test_cases
+            for tc in tcs:
                 tc_filename = get_tc_filename(tc)
                 try_rm(tc_filename)
                 try_rm(tc_filename + '.part')
                 tc_filename = get_tc_filename(tc)
                 try_rm(tc_filename)
                 try_rm(tc_filename + '.part')
@@ -107,7 +124,10 @@ def generator(test_case):
             try_num = 1
             while True:
                 try:
             try_num = 1
             while True:
                 try:
-                    ydl.download([test_case['url']])
+                    # We're not using .download here sine that is just a shim
+                    # for outside error handling, and returns the exit code
+                    # instead of the result dict.
+                    res_dict = ydl.extract_info(test_case['url'])
                 except (DownloadError, ExtractorError) as err:
                     # Check if the exception is not a network related one
                     if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError, compat_http_client.BadStatusLine) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503):
                 except (DownloadError, ExtractorError) as err:
                     # Check if the exception is not a network related one
                     if not err.exc_info[0] in (compat_urllib_error.URLError, socket.timeout, UnavailableVideoError, compat_http_client.BadStatusLine) or (err.exc_info[0] == compat_HTTPError and err.exc_info[1].code == 503):
@@ -123,22 +143,66 @@ def generator(test_case):
                 else:
                     break
 
                 else:
                     break
 
+            if is_playlist:
+                self.assertEqual(res_dict['_type'], 'playlist')
+                self.assertTrue('entries' in res_dict)
+                expect_info_dict(self, test_case.get('info_dict', {}), res_dict)
+
+            if 'playlist_mincount' in test_case:
+                assertGreaterEqual(
+                    self,
+                    len(res_dict['entries']),
+                    test_case['playlist_mincount'],
+                    'Expected at least %d in playlist %s, but got only %d' % (
+                        test_case['playlist_mincount'], test_case['url'],
+                        len(res_dict['entries'])))
+            if 'playlist_count' in test_case:
+                self.assertEqual(
+                    len(res_dict['entries']),
+                    test_case['playlist_count'],
+                    'Expected %d entries in playlist %s, but got %d.' % (
+                        test_case['playlist_count'],
+                        test_case['url'],
+                        len(res_dict['entries']),
+                    ))
+            if 'playlist_duration_sum' in test_case:
+                got_duration = sum(e['duration'] for e in res_dict['entries'])
+                self.assertEqual(
+                    test_case['playlist_duration_sum'], got_duration)
+
             for tc in test_cases:
                 tc_filename = get_tc_filename(tc)
                 if not test_case.get('params', {}).get('skip_download', False):
                     self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                     self.assertTrue(tc_filename in finished_hook_called)
             for tc in test_cases:
                 tc_filename = get_tc_filename(tc)
                 if not test_case.get('params', {}).get('skip_download', False):
                     self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                     self.assertTrue(tc_filename in finished_hook_called)
+                    expected_minsize = tc.get('file_minsize', 10000)
+                    if expected_minsize is not None:
+                        if params.get('test'):
+                            expected_minsize = max(expected_minsize, 10000)
+                        got_fsize = os.path.getsize(tc_filename)
+                        assertGreaterEqual(
+                            self, got_fsize, expected_minsize,
+                            'Expected %s to be at least %s, but it\'s only %s ' %
+                            (tc_filename, format_bytes(expected_minsize),
+                                format_bytes(got_fsize)))
+                    if 'md5' in tc:
+                        md5_for_file = _file_md5(tc_filename)
+                        self.assertEqual(md5_for_file, tc['md5'])
                 info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
                 info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
-                self.assertTrue(os.path.exists(info_json_fn))
-                if 'md5' in tc:
-                    md5_for_file = _file_md5(tc_filename)
-                    self.assertEqual(md5_for_file, tc['md5'])
+                self.assertTrue(
+                    os.path.exists(info_json_fn),
+                    'Missing info file %s' % info_json_fn)
                 with io.open(info_json_fn, encoding='utf-8') as infof:
                     info_dict = json.load(infof)
 
                 expect_info_dict(self, tc.get('info_dict', {}), info_dict)
         finally:
             try_rm_tcs_files()
                 with io.open(info_json_fn, encoding='utf-8') as infof:
                     info_dict = json.load(infof)
 
                 expect_info_dict(self, tc.get('info_dict', {}), info_dict)
         finally:
             try_rm_tcs_files()
+            if is_playlist and res_dict is not None and res_dict.get('entries'):
+                # Remove all other files that may have been extracted if the
+                # extractor returns full results even with extract_flat
+                res_tcs = [{'info_dict': e} for e in res_dict['entries']]
+                try_rm_tcs_files(res_tcs)
 
     return test_template
 
 
     return test_template
 
@@ -148,9 +212,9 @@ for n, test_case in enumerate(defs):
     tname = 'test_' + str(test_case['name'])
     i = 1
     while hasattr(TestDownload, tname):
     tname = 'test_' + str(test_case['name'])
     i = 1
     while hasattr(TestDownload, tname):
-        tname = 'test_'  + str(test_case['name']) + '_' + str(i)
+        tname = 'test_%s_%d' % (test_case['name'], i)
         i += 1
         i += 1
-    test_method.__name__ = tname
+    test_method.__name__ = str(tname)
     setattr(TestDownload, test_method.__name__, test_method)
     del test_method
 
     setattr(TestDownload, test_method.__name__, test_method)
     del test_method