]>
Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
  13 # Allow direct execution 
  14 sys
.path
.append(os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
)))) 
  16 import youtube_dl
.YoutubeDL
 
  17 from youtube_dl
.utils 
import * 
  19 PARAMETERS_FILE 
= os
.path
.join(os
.path
.dirname(os
.path
.abspath(__file__
)), "parameters.json") 
  23 # General configuration (from __init__, not very elegant...) 
  24 jar 
= compat_cookiejar
.CookieJar() 
  25 cookie_processor 
= compat_urllib_request
.HTTPCookieProcessor(jar
) 
  26 proxy_handler 
= compat_urllib_request
.ProxyHandler() 
  27 opener 
= compat_urllib_request
.build_opener(proxy_handler
, cookie_processor
, YoutubeDLHandler()) 
  28 compat_urllib_request
.install_opener(opener
) 
  29 socket
.setdefaulttimeout(10) 
  31 def _try_rm(filename
): 
  32     """ Remove a file if it exists """ 
  35     except OSError as ose
: 
  36         if ose
.errno 
!= errno
.ENOENT
: 
  39 md5 
= lambda s
: hashlib
.md5(s
.encode('utf-8')).hexdigest() 
  41 class YoutubeDL(youtube_dl
.YoutubeDL
): 
  42     def __init__(self
, *args
, **kwargs
): 
  43         self
.to_stderr 
= self
.to_screen
 
  44         self
.processed_info_dicts 
= [] 
  45         super(YoutubeDL
, self
).__init
__(*args
, **kwargs
) 
  46     def report_warning(self
, message
): 
  47         # Don't accept warnings during tests 
  48         raise ExtractorError(message
) 
  49     def process_info(self
, info_dict
): 
  50         self
.processed_info_dicts
.append(info_dict
) 
  51         return super(YoutubeDL
, self
).process_info(info_dict
) 
  54     with open(fn
, 'rb') as f
: 
  55         return hashlib
.md5(f
.read()).hexdigest() 
  57 from helper 
import get_testcases
 
  58 defs 
= get_testcases() 
  60 with io
.open(PARAMETERS_FILE
, encoding
='utf-8') as pf
: 
  61     parameters 
= json
.load(pf
) 
  64 class TestDownload(unittest
.TestCase
): 
  67         self
.parameters 
= parameters
 
  70 ### Dynamically generate tests 
  71 def generator(test_case
): 
  73     def test_template(self
): 
  74         ie 
= youtube_dl
.extractor
.get_info_extractor(test_case
['name']) 
  75         def print_skipping(reason
): 
  76             print('Skipping %s: %s' % (test_case
['name'], reason
)) 
  78             print_skipping('IE marked as not _WORKING') 
  80         if 'playlist' not in test_case 
and not test_case
['file']: 
  81             print_skipping('No output file specified') 
  83         if 'skip' in test_case
: 
  84             print_skipping(test_case
['skip']) 
  87         params 
= self
.parameters
.copy() 
  88         params
.update(test_case
.get('params', {})) 
  90         ydl 
= YoutubeDL(params
) 
  91         ydl
.add_default_info_extractors() 
  92         finished_hook_called 
= set() 
  94             if status
['status'] == 'finished': 
  95                 finished_hook_called
.add(status
['filename']) 
  96         ydl
.fd
.add_progress_hook(_hook
) 
  98         test_cases 
= test_case
.get('playlist', [test_case
]) 
 101             _try_rm(tc
['file'] + '.part') 
 102             _try_rm(tc
['file'] + '.info.json') 
 104             for retry 
in range(1, RETRIES 
+ 1): 
 106                     ydl
.download([test_case
['url']]) 
 107                 except (DownloadError
, ExtractorError
) as err
: 
 108                     if retry 
== RETRIES
: raise 
 110                     # Check if the exception is not a network related one 
 111                     if not err
.exc_info
[0] in (compat_urllib_error
.URLError
, socket
.timeout
, UnavailableVideoError
): 
 114                     print('Retrying: {0} failed tries\n\n##########\n\n'.format(retry
)) 
 118             for tc 
in test_cases
: 
 119                 if not test_case
.get('params', {}).get('skip_download', False): 
 120                     self
.assertTrue(os
.path
.exists(tc
['file']), msg
='Missing file ' + tc
['file']) 
 121                     self
.assertTrue(tc
['file'] in finished_hook_called
) 
 122                 self
.assertTrue(os
.path
.exists(tc
['file'] + '.info.json')) 
 124                     md5_for_file 
= _file_md5(tc
['file']) 
 125                     self
.assertEqual(md5_for_file
, tc
['md5']) 
 126                 with io
.open(tc
['file'] + '.info.json', encoding
='utf-8') as infof
: 
 127                     info_dict 
= json
.load(infof
) 
 128                 for (info_field
, expected
) in tc
.get('info_dict', {}).items(): 
 129                     if isinstance(expected
, compat_str
) and expected
.startswith('md5:'): 
 130                         got 
= 'md5:' + md5(info_dict
.get(info_field
)) 
 132                         got 
= info_dict
.get(info_field
) 
 133                     self
.assertEqual(expected
, got
, 
 134                         u
'invalid value for field %s, expected %r, got %r' % (info_field
, expected
, got
)) 
 136                 # If checkable fields are missing from the test case, print the info_dict 
 137                 test_info_dict 
= dict((key
, value 
if not isinstance(value
, compat_str
) or len(value
) < 250 else 'md5:' + md5(value
)) 
 138                     for key
, value 
in info_dict
.items() 
 139                     if value 
and key 
in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location')) 
 140                 if not all(key 
in tc
.get('info_dict', {}).keys() for key 
in test_info_dict
.keys()): 
 141                     sys
.stderr
.write(u
'\n"info_dict": ' + json
.dumps(test_info_dict
, ensure_ascii
=False, indent
=2) + u
'\n') 
 143                 # Check for the presence of mandatory fields 
 144                 for key 
in ('id', 'url', 'title', 'ext'): 
 145                     self
.assertTrue(key 
in info_dict
.keys() and info_dict
[key
]) 
 147             for tc 
in test_cases
: 
 149                 _try_rm(tc
['file'] + '.part') 
 150                 _try_rm(tc
['file'] + '.info.json') 
 154 ### And add them to TestDownload 
 155 for n
, test_case 
in enumerate(defs
): 
 156     test_method 
= generator(test_case
) 
 157     tname 
= 'test_' + str(test_case
['name']) 
 159     while hasattr(TestDownload
, tname
): 
 160         tname 
= 'test_'  + str(test_case
['name']) + '_' + str(i
) 
 162     test_method
.__name
__ = tname
 
 163     setattr(TestDownload
, test_method
.__name
__, test_method
) 
 167 if __name__ 
== '__main__':