]>
Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
f171c10bad84a876a9fe4caba2b71a984c3169ec
3 # Allow direct execution
7 sys
.path
.insert(0, os
.path
.dirname(os
.path
.dirname(os
.path
.abspath(__file__
))))
9 from test
.helper
import (
24 import youtube_dl
.YoutubeDL
25 from youtube_dl
.utils
import (
32 UnavailableVideoError
,
34 from youtube_dl
.extractor
import get_info_extractor
class YoutubeDL(youtube_dl.YoutubeDL):
    """YoutubeDL subclass used by the download tests.

    It aliases ``to_stderr`` to ``to_screen``, keeps every info dict handed
    to ``process_info`` in ``processed_info_dicts``, and turns any reported
    warning into an ``ExtractorError``.
    """

    def __init__(self, *args, **kwargs):
        # Both attributes are assigned before the base initialiser runs.
        self.processed_info_dicts = []
        self.to_stderr = self.to_screen
        super(YoutubeDL, self).__init__(*args, **kwargs)

    def report_warning(self, message):
        # Don't accept warnings during tests
        raise ExtractorError(message)

    def process_info(self, info_dict):
        """Record *info_dict*, then defer to the base implementation."""
        self.processed_info_dicts.append(info_dict)
        base = super(YoutubeDL, self)
        return base.process_info(info_dict)
51 with open(fn
, 'rb') as f
:
52 return hashlib
.md5(f
.read()).hexdigest()
57 class TestDownload(unittest
.TestCase
):
62 ### Dynamically generate tests
63 def generator(test_case
):
65 def test_template(self
):
66 ie
= youtube_dl
.extractor
.get_info_extractor(test_case
['name'])
67 other_ies
= [get_info_extractor(ie_key
) for ie_key
in test_case
.get('add_ie', [])]
68 def print_skipping(reason
):
69 print('Skipping %s: %s' % (test_case
['name'], reason
))
71 print_skipping('IE marked as not _WORKING')
73 if 'playlist' not in test_case
:
74 info_dict
= test_case
.get('info_dict', {})
75 if not test_case
.get('file') and not (info_dict
.get('id') and info_dict
.get('ext')):
76 raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?')
77 if 'skip' in test_case
:
78 print_skipping(test_case
['skip'])
80 for other_ie
in other_ies
:
81 if not other_ie
.working():
82 print_skipping(u
'test depends on %sIE, marked as not WORKING' % other_ie
.ie_key())
85 params
= get_params(test_case
.get('params', {}))
87 ydl
= YoutubeDL(params
)
88 ydl
.add_default_info_extractors()
89 finished_hook_called
= set()
91 if status
['status'] == 'finished':
92 finished_hook_called
.add(status
['filename'])
93 ydl
.add_progress_hook(_hook
)
95 def get_tc_filename(tc
):
96 return tc
.get('file') or ydl
.prepare_filename(tc
.get('info_dict', {}))
98 test_cases
= test_case
.get('playlist', [test_case
])
99 def try_rm_tcs_files():
100 for tc
in test_cases
:
101 tc_filename
= get_tc_filename(tc
)
103 try_rm(tc_filename
+ '.part')
104 try_rm(os
.path
.splitext(tc_filename
)[0] + '.info.json')
110 ydl
.download([test_case
['url']])
111 except (DownloadError
, ExtractorError
) as err
:
112 # Check if the exception is not a network related one
113 if not err
.exc_info
[0] in (compat_urllib_error
.URLError
, socket
.timeout
, UnavailableVideoError
, compat_http_client
.BadStatusLine
) or (err
.exc_info
[0] == compat_HTTPError
and err
.exc_info
[1].code
== 503):
116 if try_num
== RETRIES
:
117 report_warning(u
'Failed due to network errors, skipping...')
120 print('Retrying: {0} failed tries\n\n##########\n\n'.format(try_num
))
126 for tc
in test_cases
:
127 tc_filename
= get_tc_filename(tc
)
128 if not test_case
.get('params', {}).get('skip_download', False):
129 self
.assertTrue(os
.path
.exists(tc_filename
), msg
='Missing file ' + tc_filename
)
130 self
.assertTrue(tc_filename
in finished_hook_called
)
131 info_json_fn
= os
.path
.splitext(tc_filename
)[0] + '.info.json'
132 self
.assertTrue(os
.path
.exists(info_json_fn
))
134 md5_for_file
= _file_md5(tc_filename
)
135 self
.assertEqual(md5_for_file
, tc
['md5'])
136 with io
.open(info_json_fn
, encoding
='utf-8') as infof
:
137 info_dict
= json
.load(infof
)
139 expect_info_dict(self
, tc
.get('info_dict', {}), info_dict
)
145 ### And add them to TestDownload
146 for n
, test_case
in enumerate(defs
):
147 test_method
= generator(test_case
)
148 tname
= 'test_' + str(test_case
['name'])
150 while hasattr(TestDownload
, tname
):
151 tname
= 'test_' + str(test_case
['name']) + '_' + str(i
)
153 test_method
.__name
__ = tname
154 setattr(TestDownload
, test_method
.__name
__, test_method
)
158 if __name__
== '__main__':