# Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
# Allow direct execution
# The grandparent directory of this file is put first on sys.path so that
# the absolute imports below resolve when the file is run as a script.
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
9 from test
.helper
import (
23 import youtube_dl
.YoutubeDL
24 from youtube_dl
.utils
import (
31 UnavailableVideoError
,
33 from youtube_dl
.extractor
import get_info_extractor
class YoutubeDL(youtube_dl.YoutubeDL):
    """``youtube_dl.YoutubeDL`` subclass used by the download tests.

    Differences from the parent class:

    * ``to_stderr`` is aliased to ``to_screen``;
    * every info dict handed to ``process_info`` is recorded in
      ``processed_info_dicts`` before being forwarded to the parent;
    * ``report_warning`` raises ``ExtractorError`` instead of reporting.
    """

    def __init__(self, *args, **kwargs):
        """Set up the test-specific attributes, then run the parent initializer.

        All positional and keyword arguments are forwarded unchanged.
        """
        # Both attributes are assigned *before* the parent initializer runs;
        # presumably so they are already in place if it emits any output.
        self.to_stderr = self.to_screen
        self.processed_info_dicts = []
        super(YoutubeDL, self).__init__(*args, **kwargs)

    def report_warning(self, message):
        """Raise ``ExtractorError(message)`` instead of printing a warning."""
        # Don't accept warnings during tests
        raise ExtractorError(message)

    def process_info(self, info_dict):
        """Record *info_dict*, then return the parent's ``process_info`` result."""
        self.processed_info_dicts.append(info_dict)
        return super(YoutubeDL, self).process_info(info_dict)
def _file_md5(fn):
    """Return the hexadecimal MD5 digest of the file at path *fn*.

    The file is opened in binary mode and hashed in fixed-size chunks, so a
    large file is never held in memory in one piece.
    """
    digest = hashlib.md5()
    with open(fn, 'rb') as f:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: f.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
# Test case definitions; one test method is generated per entry further down.
defs = get_testcases()
class TestDownload(unittest.TestCase):
    """Container for the download tests.

    The test methods are not written out here: they are built by
    ``generator`` and attached to this class with ``setattr`` at import time.
    """
### Dynamically generate tests
def generator(test_case):
    """Return a test method that downloads *test_case* and verifies the result.

    *test_case* is a dict.  The keys read here are ``name``, ``url`` and the
    optional ``add_ie``, ``playlist``, ``info_dict``, ``file``, ``skip``,
    ``params`` and ``md5``.  The returned function takes ``self`` (a
    ``unittest.TestCase`` instance) and is meant to be attached to a test
    class.

    NOTE(review): the text this block was recovered from was missing lines
    (guards, ``return`` statements, the retry loop header, the ``_hook``
    definition, the cleanup calls).  They are reconstructed from the names
    the surviving code uses -- confirm against the upstream file.
    """

    def test_template(self):
        ie = youtube_dl.extractor.get_info_extractor(test_case['name'])
        other_ies = [get_info_extractor(ie_key) for ie_key in test_case.get('add_ie', [])]

        def print_skipping(reason):
            print('Skipping %s: %s' % (test_case['name'], reason))

        # Each skip condition reports its reason and abandons the test early.
        if not ie.working():
            print_skipping('IE marked as not _WORKING')
            return

        if 'playlist' not in test_case:
            info_dict = test_case.get('info_dict', {})
            if not test_case.get('file') and not (info_dict.get('id') and info_dict.get('ext')):
                print_skipping('The output file cannot be know, the "file" '
                               'key is missing or the info_dict is incomplete')
                return

        if 'skip' in test_case:
            print_skipping(test_case['skip'])
            return

        for other_ie in other_ies:
            if not other_ie.working():
                print_skipping(u'test depends on %sIE, marked as not WORKING' % other_ie.ie_key())
                return

        params = get_params(test_case.get('params', {}))

        ydl = YoutubeDL(params)
        ydl.add_default_info_extractors()

        # Filenames for which a progress hook reported status 'finished'.
        finished_hook_called = set()

        def _hook(status):
            if status['status'] == 'finished':
                finished_hook_called.add(status['filename'])
        ydl.add_progress_hook(_hook)

        def get_tc_filename(tc):
            return tc.get('file') or ydl.prepare_filename(tc.get('info_dict', {}))

        # A playlist test carries its own list of cases; otherwise the test
        # case itself is the only one.
        test_cases = test_case.get('playlist', [test_case])

        def try_rm_tcs_files():
            for tc in test_cases:
                tc_filename = get_tc_filename(tc)
                # NOTE(review): removal of the output file itself is
                # reconstructed; only the two removals below survived.
                try_rm(tc_filename)
                try_rm(tc_filename + '.part')
                try_rm(os.path.splitext(tc_filename)[0] + '.info.json')

        # NOTE(review): cleaning up before and after the run is reconstructed;
        # the helper was defined but its call sites were missing.
        try_rm_tcs_files()
        try:
            try_num = 1
            while True:
                try:
                    ydl.download([test_case['url']])
                except (DownloadError, ExtractorError) as err:
                    # Check if the exception is not a network related one
                    exc_type = err.exc_info[0]
                    # The whole disjunction is negated below.  Written as
                    # ``not a in (...) or (...)`` the ``not`` binds to the
                    # membership test only, so an HTTP 503 would be re-raised
                    # instead of retried.
                    network_related = (
                        exc_type in (compat_urllib_error.URLError, socket.timeout,
                                     UnavailableVideoError, compat_http_client.BadStatusLine)
                        or (exc_type == compat_HTTPError and err.exc_info[1].code == 503))
                    if not network_related:
                        raise

                    if try_num == RETRIES:
                        report_warning(u'Failed due to network errors, skipping...')
                        return

                    print('Retrying: {0} failed tries\n\n##########\n\n'.format(try_num))

                    try_num += 1
                else:
                    break

            for tc in test_cases:
                tc_filename = get_tc_filename(tc)
                if not test_case.get('params', {}).get('skip_download', False):
                    self.assertTrue(os.path.exists(tc_filename), msg='Missing file ' + tc_filename)
                    self.assertTrue(tc_filename in finished_hook_called)
                info_json_fn = os.path.splitext(tc_filename)[0] + '.info.json'
                self.assertTrue(os.path.exists(info_json_fn))
                # Only compare checksums when the test case provides one;
                # indexing tc['md5'] unconditionally raises KeyError otherwise.
                if 'md5' in tc:
                    md5_for_file = _file_md5(tc_filename)
                    self.assertEqual(md5_for_file, tc['md5'])
                with io.open(info_json_fn, encoding='utf-8') as infof:
                    info_dict = json.load(infof)
                for (info_field, expected) in tc.get('info_dict', {}).items():
                    # An expected value of the form 'md5:<hex>' is compared
                    # against the md5 of the actual field value.
                    if isinstance(expected, compat_str) and expected.startswith('md5:'):
                        got = 'md5:' + md5(info_dict.get(info_field))
                    else:
                        got = info_dict.get(info_field)
                    self.assertEqual(
                        expected, got,
                        u'invalid value for field %s, expected %r, got %r' % (info_field, expected, got))

                # If checkable fields are missing from the test case, print the info_dict
                test_info_dict = dict(
                    (key, value if not isinstance(value, compat_str) or len(value) < 250 else 'md5:' + md5(value))
                    for key, value in info_dict.items()
                    if value and key in ('title', 'description', 'uploader', 'upload_date', 'uploader_id', 'location'))
                if not all(key in tc.get('info_dict', {}).keys() for key in test_info_dict.keys()):
                    sys.stderr.write(u'\n"info_dict": ' + json.dumps(test_info_dict, ensure_ascii=False, indent=4) + u'\n')

                # Check for the presence of mandatory fields
                for key in ('id', 'url', 'title', 'ext'):
                    self.assertTrue(key in info_dict.keys() and info_dict[key])
                # Check for mandatory fields that are automatically set by YoutubeDL
                for key in ['webpage_url', 'extractor', 'extractor_key']:
                    self.assertTrue(info_dict.get(key), u'Missing field: %s' % key)
        finally:
            try_rm_tcs_files()

    # The caller renames this closure and attaches it to the test class.
    return test_template
### And add them to TestDownload
for n, test_case in enumerate(defs):
    test_method = generator(test_case)
    tname = 'test_' + str(test_case['name'])
    # Several test cases may share a name: append _1, _2, ... until the
    # attribute name is free.  The counter must start fresh for every test
    # case and advance on every collision, otherwise the loop cannot end.
    i = 1
    while hasattr(TestDownload, tname):
        tname = 'test_' + str(test_case['name']) + '_' + str(i)
        i += 1
    test_method.__name__ = tname
    setattr(TestDownload, test_method.__name__, test_method)
    # Drop the module-level reference so the function is reachable only
    # through TestDownload and name-based test collectors do not pick it up
    # a second time as a free-standing test.
    del test_method
178 if __name__
== '__main__':