]>
Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
   3 # Allow direct execution 
   7 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
)))) 
   9 from test
.helper 
import get_params
, get_testcases
, global_setup
, try_rm
, md5
 
  18 import youtube_dl
.YoutubeDL
 
  19 from youtube_dl
.utils 
import ( 
  24     UnavailableVideoError
, 
  29 class YoutubeDL(youtube_dl
.YoutubeDL
): 
  30     def __init__(self
, *args
, **kwargs
): 
  31         self
.to_stderr 
= self
.to_screen
 
  32         self
.processed_info_dicts 
= [] 
  33         super(YoutubeDL
, self
).__init
__(*args
, **kwargs
) 
  34     def report_warning(self
, message
): 
  35         # Don't accept warnings during tests 
  36         raise ExtractorError(message
) 
  37     def process_info(self
, info_dict
): 
  38         self
.processed_info_dicts
.append(info_dict
) 
  39         return super(YoutubeDL
, self
).process_info(info_dict
) 
  42     with open(fn
, 'rb') as f
: 
  43         return hashlib
.md5(f
.read()).hexdigest() 
  45 defs 
= get_testcases() 
  48 class TestDownload(unittest
.TestCase
): 
  53 ### Dynamically generate tests 
  54 def generator(test_case
): 
  56     def test_template(self
): 
  57         ie 
= youtube_dl
.extractor
.get_info_extractor(test_case
['name']) 
  58         def print_skipping(reason
): 
  59             print('Skipping %s: %s' % (test_case
['name'], reason
)) 
  61             print_skipping('IE marked as not _WORKING') 
  63         if 'playlist' not in test_case 
and not test_case
['file']: 
  64             print_skipping('No output file specified') 
  66         if 'skip' in test_case
: 
  67             print_skipping(test_case
['skip']) 
  70         params 
= get_params(test_case
.get('params', {})) 
  72         ydl 
= YoutubeDL(params
) 
  73         ydl
.add_default_info_extractors() 
  74         finished_hook_called 
= set() 
  76             if status
['status'] == 'finished': 
  77                 finished_hook_called
.add(status
['filename']) 
  78         ydl
.fd
.add_progress_hook(_hook
) 
  80         test_cases 
= test_case
.get('playlist', [test_case
]) 
  83             try_rm(tc
['file'] + '.part') 
  84             try_rm(tc
['file'] + '.info.json') 
  86             for retry 
in range(1, RETRIES 
+ 1): 
  88                     ydl
.download([test_case
['url']]) 
  89                 except (DownloadError
, ExtractorError
) as err
: 
  90                     if retry 
== RETRIES
: raise 
  92                     # Check if the exception is not a network related one 
  93                     if not err
.exc_info
[0] in (compat_urllib_error
.URLError
, socket
.timeout
, UnavailableVideoError
): 
  96                     print('Retrying: {0} failed tries\n\n##########\n\n'.format(retry
)) 
 100             for tc 
in test_cases
: 
 101                 if not test_case
.get('params', {}).get('skip_download', False): 
 102                     self
.assertTrue(os
.path
.exists(tc
['file']), msg
='Missing file ' + tc
['file']) 
 103                     self
.assertTrue(tc
['file'] in finished_hook_called
) 
 104                 self
.assertTrue(os
.path
.exists(tc
['file'] + '.info.json')) 
 106                     md5_for_file 
= _file_md5(tc
['file']) 
 107                     self
.assertEqual(md5_for_file
, tc
['md5']) 
 108                 with io
.open(tc
['file'] + '.info.json', encoding
='utf-8') as infof
: 
 109                     info_dict 
= json
.load(infof
) 
 110                 for (info_field
, expected
) in tc
.get('info_dict', {}).items(): 
 111                     if isinstance(expected
, compat_str
) and expected
.startswith('md5:'): 
 112                         got 
= 'md5:' + md5(info_dict
.get(info_field
)) 
 114                         got 
= info_dict
.get(info_field
) 
 115                     self
.assertEqual(expected
, got
, 
 116                         u
'invalid value for field %s, expected %r, got %r' % (info_field
, expected
, got
)) 
 118                 # If checkable fields are missing from the test case, print the info_dict 
 119                 test_info_dict 
= dict((key
, value 
if not isinstance(value
, compat_str
) or len(value
) < 250 else 'md5:' + md5(value
)) 
 120                     for key
, value 
in info_dict
.items() 
 121                     if value 
and key 
in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location')) 
 122                 if not all(key 
in tc
.get('info_dict', {}).keys() for key 
in test_info_dict
.keys()): 
 123                     sys
.stderr
.write(u
'\n"info_dict": ' + json
.dumps(test_info_dict
, ensure_ascii
=False, indent
=2) + u
'\n') 
 125                 # Check for the presence of mandatory fields 
 126                 for key 
in ('id', 'url', 'title', 'ext'): 
 127                     self
.assertTrue(key 
in info_dict
.keys() and info_dict
[key
]) 
 129             for tc 
in test_cases
: 
 131                 try_rm(tc
['file'] + '.part') 
 132                 try_rm(tc
['file'] + '.info.json') 
 136 ### And add them to TestDownload 
 137 for n
, test_case 
in enumerate(defs
): 
 138     test_method 
= generator(test_case
) 
 139     tname 
= 'test_' + str(test_case
['name']) 
 141     while hasattr(TestDownload
, tname
): 
 142         tname 
= 'test_'  + str(test_case
['name']) + '_' + str(i
) 
 144     test_method
.__name
__ = tname
 
 145     setattr(TestDownload
, test_method
.__name
__, test_method
) 
 149 if __name__ 
== '__main__':