]>
Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
# Allow direct execution: prepend the directory two levels above this
# file's absolute path to the module search path.
sys.path.insert(
    0,
    os.path.dirname(
        os.path.dirname(
            os.path.abspath(__file__))))
9 from test
.helper
import (
23 import youtube_dl
.YoutubeDL
24 from youtube_dl
.utils
import (
30 UnavailableVideoError
,
32 from youtube_dl
.extractor
import get_info_extractor
class YoutubeDL(youtube_dl.YoutubeDL):
    """YoutubeDL subclass used by the download tests.

    It keeps every info dict passed to ``process_info`` in
    ``processed_info_dicts`` and raises ``ExtractorError`` whenever a
    warning is reported.
    """

    def __init__(self, *args, **kwargs):
        # Both attributes are set before the base initialiser runs.
        # Info dicts seen by process_info, in call order.
        self.processed_info_dicts = []
        # Route to_stderr through to_screen on this instance.
        self.to_stderr = self.to_screen
        super(YoutubeDL, self).__init__(*args, **kwargs)

    def report_warning(self, message):
        # Don't accept warnings during tests
        raise ExtractorError(message)

    def process_info(self, info_dict):
        # Record the dict first, then defer to the base implementation.
        self.processed_info_dicts.append(info_dict)
        return super(YoutubeDL, self).process_info(info_dict)
49 with open(fn
, 'rb') as f
:
50 return hashlib
.md5(f
.read()).hexdigest()
55 class TestDownload(unittest
.TestCase
):
60 ### Dynamically generate tests
61 def generator(test_case
):
63 def test_template(self
):
64 ie
= youtube_dl
.extractor
.get_info_extractor(test_case
['name'])
65 other_ies
= [get_info_extractor(ie_key
) for ie_key
in test_case
.get('add_ie', [])]
66 def print_skipping(reason
):
67 print('Skipping %s: %s' % (test_case
['name'], reason
))
69 print_skipping('IE marked as not _WORKING')
71 if 'playlist' not in test_case
:
72 info_dict
= test_case
.get('info_dict', {})
73 if not test_case
.get('file') and not (info_dict
.get('id') and info_dict
.get('ext')):
74 raise Exception('Test definition incorrect. The output file cannot be known. Are both \'id\' and \'ext\' keys present?')
75 if 'skip' in test_case
:
76 print_skipping(test_case
['skip'])
78 for other_ie
in other_ies
:
79 if not other_ie
.working():
80 print_skipping(u
'test depends on %sIE, marked as not WORKING' % other_ie
.ie_key())
83 params
= get_params(test_case
.get('params', {}))
85 ydl
= YoutubeDL(params
)
86 ydl
.add_default_info_extractors()
87 finished_hook_called
= set()
89 if status
['status'] == 'finished':
90 finished_hook_called
.add(status
['filename'])
91 ydl
.add_progress_hook(_hook
)
93 def get_tc_filename(tc
):
94 return tc
.get('file') or ydl
.prepare_filename(tc
.get('info_dict', {}))
96 test_cases
= test_case
.get('playlist', [test_case
])
97 def try_rm_tcs_files():
99 tc_filename
= get_tc_filename(tc
)
101 try_rm(tc_filename
+ '.part')
102 try_rm(os
.path
.splitext(tc_filename
)[0] + '.info.json')
108 ydl
.download([test_case
['url']])
109 except (DownloadError
, ExtractorError
) as err
:
110 # Check if the exception is not a network related one
111 if not err
.exc_info
[0] in (compat_urllib_error
.URLError
, socket
.timeout
, UnavailableVideoError
, compat_http_client
.BadStatusLine
) or (err
.exc_info
[0] == compat_HTTPError
and err
.exc_info
[1].code
== 503):
114 if try_num
== RETRIES
:
115 report_warning(u
'Failed due to network errors, skipping...')
118 print('Retrying: {0} failed tries\n\n##########\n\n'.format(try_num
))
124 for tc
in test_cases
:
125 tc_filename
= get_tc_filename(tc
)
126 if not test_case
.get('params', {}).get('skip_download', False):
127 self
.assertTrue(os
.path
.exists(tc_filename
), msg
='Missing file ' + tc_filename
)
128 self
.assertTrue(tc_filename
in finished_hook_called
)
129 info_json_fn
= os
.path
.splitext(tc_filename
)[0] + '.info.json'
130 self
.assertTrue(os
.path
.exists(info_json_fn
))
132 md5_for_file
= _file_md5(tc_filename
)
133 self
.assertEqual(md5_for_file
, tc
['md5'])
134 with io
.open(info_json_fn
, encoding
='utf-8') as infof
:
135 info_dict
= json
.load(infof
)
137 expect_info_dict(self
, tc
.get('info_dict', {}), info_dict
)
143 ### And add them to TestDownload
144 for n
, test_case
in enumerate(defs
):
145 test_method
= generator(test_case
)
146 tname
= 'test_' + str(test_case
['name'])
148 while hasattr(TestDownload
, tname
):
149 tname
= 'test_' + str(test_case
['name']) + '_' + str(i
)
151 test_method
.__name
__ = tname
152 setattr(TestDownload
, test_method
.__name
__, test_method
)
156 if __name__
== '__main__':