]>
Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
dd5818dba91c166936e45f1c7d8779c752fa3b86
3 # Allow direct execution
7 sys
. path
. insert ( 0 , os
. path
. dirname ( os
. path
. dirname ( os
. path
. abspath ( __file__
))))
9 from test
. helper
import (
23 import youtube_dl
. YoutubeDL
24 from youtube_dl
. utils
import (
30 UnavailableVideoError
,
32 from youtube_dl
. extractor
import get_info_extractor
class YoutubeDL(youtube_dl.YoutubeDL):
    """Downloader subclass used by the download tests.

    Compared with its parent it:

    * sends ``to_stderr`` output through ``to_screen``,
    * keeps every info dict passed to ``process_info`` in
      ``processed_info_dicts``,
    * raises ``ExtractorError`` whenever a warning is reported.
    """

    def __init__(self, *args, **kwargs):
        """Set up the test attributes, then delegate to the parent class.

        All positional and keyword arguments are forwarded unchanged.
        """
        # Both attributes are assigned before the parent initialiser runs so
        # that they already exist for anything it calls.
        self.to_stderr = self.to_screen
        self.processed_info_dicts = []
        super(YoutubeDL, self).__init__(*args, **kwargs)

    def report_warning(self, message):
        """Raise ``ExtractorError(message)`` instead of emitting a warning."""
        # Don't accept warnings during tests
        raise ExtractorError(message)

    def process_info(self, info_dict):
        """Record *info_dict*, then return the parent's ``process_info`` result."""
        self.processed_info_dicts.append(info_dict)
        return super(YoutubeDL, self).process_info(info_dict)
49 with open ( fn
, 'rb' ) as f
:
50 return hashlib
. md5 ( f
. read ()). hexdigest ()
# Test-case definitions; one test method is generated for each entry and
# attached to TestDownload further down in this file.
defs = get_testcases()
# Test case class that receives the dynamically generated test_* methods
# (they are attached with setattr further down in this file).
# NOTE(review): no class body is visible in this listing (the leading numbers
# jump from 55 to 60) -- confirm the body against the original file.
55 class TestDownload ( unittest
. TestCase
):
60 ### Dynamically generate tests
# Build the test method for one test-case dict. The inner test_template closes
# over test_case and reads its 'name' and 'url' keys plus the optional
# 'add_ie', 'playlist', 'info_dict', 'file', 'skip' and 'params' keys.
# NOTE(review): each statement below starts with what looks like its line
# number in the original file, and several numbers are skipped (e.g. 68, 70,
# 90, 102, 105-109, 114-115, 133, 141, 159-163). Some block headers and the
# function's return statement are therefore not visible here.
61 def generator ( test_case
):
63 def test_template ( self
):
# Extractor under test, plus any extra extractors listed under 'add_ie'.
64 ie
= youtube_dl
. extractor
. get_info_extractor ( test_case
[ 'name' ])
65 other_ies
= [ get_info_extractor ( ie_key
) for ie_key
in test_case
. get ( 'add_ie' , [])]
# Helper: print the name of the case together with the reason for skipping it.
66 def print_skipping ( reason
):
67 print ( 'Skipping %s : %s ' % ( test_case
[ 'name' ], reason
))
# NOTE(review): the condition guarding this call is not in the listing (number
# 68 is skipped); presumably it tests ie.working(), as the other_ies loop
# below does -- confirm.
69 print_skipping ( 'IE marked as not _WORKING' )
# A non-playlist case must make its output file name known: either an explicit
# 'file' entry or an info_dict containing both 'id' and 'ext'.
71 if 'playlist' not in test_case
:
72 info_dict
= test_case
. get ( 'info_dict' , {})
73 if not test_case
. get ( 'file' ) and not ( info_dict
. get ( 'id' ) and info_dict
. get ( 'ext' )):
74 print_skipping ( 'The output file cannot be know, the "file" '
75 'key is missing or the info_dict is incomplete' )
# An explicit 'skip' entry supplies the reason that is printed.
77 if 'skip' in test_case
:
78 print_skipping ( test_case
[ 'skip' ])
# A case is also reported as skipped when an extractor it depends on says it
# is not working.
80 for other_ie
in other_ies
:
81 if not other_ie
. working ():
82 print_skipping ( u
'test depends on %s IE, marked as not WORKING' % other_ie
. ie_key ())
# Pass the case's 'params' through get_params and create the test downloader.
85 params
= get_params ( test_case
. get ( 'params' , {}))
87 ydl
= YoutubeDL ( params
)
88 ydl
. add_default_info_extractors ()
# File names for which the progress hook saw a 'finished' status.
89 finished_hook_called
= set ()
# NOTE(review): the header of the _hook(status) callback registered below is
# not in the listing (number 90 is skipped); the next statement appears to be
# its body.
91 if status
[ 'status' ] == 'finished' :
92 finished_hook_called
. add ( status
[ 'filename' ])
93 ydl
. fd
. add_progress_hook ( _hook
)
# Expected output path of a case: its explicit 'file', otherwise whatever
# ydl.prepare_filename returns for its 'info_dict'.
95 def get_tc_filename ( tc
):
96 return tc
. get ( 'file' ) or ydl
. prepare_filename ( tc
. get ( 'info_dict' , {}))
# A 'playlist' entry supplies the list of sub-cases; otherwise the case itself
# is the only element.
98 test_cases
= test_case
. get ( 'playlist' , [ test_case
])
# For every case, pass its '.part' file and its '.info.json' file to try_rm.
# NOTE(review): number 102 is skipped between the statements below -- possibly
# a try_rm of tc_filename itself; confirm.
99 def try_rm_tcs_files ():
100 for tc
in test_cases
:
101 tc_filename
= get_tc_filename ( tc
)
103 try_rm ( tc_filename
+ '.part' )
104 try_rm ( os
. path
. splitext ( tc_filename
)[ 0 ] + '.info.json' )
# NOTE(review): the try / loop headers that enclose this download call and
# that define try_num are not in the listing (numbers 105-109 are skipped).
110 ydl
. download ([ test_case
[ 'url' ]])
111 except ( DownloadError
, ExtractorError
) as err
:
112 # Check if the exception is not a network related one
# NOTE(review): as written this parses as
#   (not (type in tuple)) or (type == compat_HTTPError and code == 503)
# so the parenthesised 503 clause is NOT covered by the 'not'; confirm that
# this grouping is intended. The statement executed when the condition holds
# is not in the listing (numbers 114-115 are skipped).
113 if not err
. exc_info
[ 0 ] in ( compat_urllib_error
. URLError
, socket
. timeout
, UnavailableVideoError
) or ( err
. exc_info
[ 0 ] == compat_HTTPError
and err
. exc_info
[ 1 ]. code
== 503 ):
# When try_num has reached RETRIES, the failure is handed to report_warning.
116 if try_num
== RETRIES
:
117 report_warning ( u
'Failed due to network errors, skipping...' )
# Message printed with the number of tries that have failed so far.
120 print ( 'Retrying: {0} failed tries \n\n ########## \n\n ' . format ( try_num
))
# Check the results for every case.
126 for tc
in test_cases
:
127 tc_filename
= get_tc_filename ( tc
)
# File checks apply unless the case's params set 'skip_download'.
128 if not test_case
. get ( 'params' , {}). get ( 'skip_download' , False ):
129 self
. assertTrue ( os
. path
. exists ( tc_filename
), msg
= 'Missing file ' + tc_filename
)
# The progress hook must have recorded this file name as finished.
130 self
. assertTrue ( tc_filename
in finished_hook_called
)
# The '.info.json' file that shares the output file's base name must exist.
131 info_json_fn
= os
. path
. splitext ( tc_filename
)[ 0 ] + '.info.json'
132 self
. assertTrue ( os
. path
. exists ( info_json_fn
))
# NOTE(review): the condition guarding this md5 comparison is not in the
# listing (number 133 is skipped); tc['md5'] is read below, so presumably it
# checks for that key -- confirm.
134 md5_for_file
= _file_md5 ( tc_filename
)
135 self
. assertEqual ( md5_for_file
, tc
[ 'md5' ])
# Load the info JSON (UTF-8) and compare each field the case expects.
136 with io
. open ( info_json_fn
, encoding
= 'utf-8' ) as infof
:
137 info_dict
= json
. load ( infof
)
138 for ( info_field
, expected
) in tc
. get ( 'info_dict' , {}). items ():
# Expected strings starting with 'md5:' are compared against
# 'md5:' + md5(actual value) rather than against the raw value.
139 if isinstance ( expected
, compat_str
) and expected
. startswith ( 'md5:' ):
140 got
= 'md5:' + md5 ( info_dict
. get ( info_field
))
# NOTE(review): number 141 is skipped; presumably an else header precedes this
# plain lookup -- confirm.
142 got
= info_dict
. get ( info_field
)
143 self
. assertEqual ( expected
, got
,
144 u
'invalid value for field %s , expected %r , got %r ' % ( info_field
, expected
, got
))
146 # If checkable fields are missing from the test case, print the info_dict
# Only truthy values for the listed keys are kept; compat_str values of 250 or
# more characters are replaced by 'md5:' + md5(value).
147 test_info_dict
= dict (( key
, value
if not isinstance ( value
, compat_str
) or len ( value
) < 250 else 'md5:' + md5 ( value
))
148 for key
, value
in info_dict
. items ()
149 if value
and key
in ( 'title' , 'description' , 'uploader' , 'upload_date' , 'uploader_id' , 'location' ))
# The suggested block is written to stderr as indented, non-ASCII-escaped JSON.
150 if not all ( key
in tc
. get ( 'info_dict' , {}). keys () for key
in test_info_dict
. keys ()):
151 sys
. stderr
. write ( u
' \n "info_dict": ' + json
. dumps ( test_info_dict
, ensure_ascii
= False , indent
= 2 ) + u
' \n ' )
153 # Check for the presence of mandatory fields
154 for key
in ( 'id' , 'url' , 'title' , 'ext' ):
155 self
. assertTrue ( key
in info_dict
. keys () and info_dict
[ key
])
156 # Check for mandatory fields that are automatically set by YoutubeDL
157 for key
in [ 'webpage_url' , 'extractor' , 'extractor_key' ]:
158 self
. assertTrue ( info_dict
. get ( key
), u
'Missing field: %s ' % key
)
### And add them to TestDownload
for n, test_case in enumerate(defs):
    test_method = generator(test_case)
    tname = 'test_' + str(test_case['name'])
    # Several definitions may share a name: append _1, _2, ... until the
    # attribute name is not yet taken on TestDownload.
    i = 1
    while hasattr(TestDownload, tname):
        tname = 'test_' + str(test_case['name']) + '_' + str(i)
        i += 1
    test_method.__name__ = tname
    setattr(TestDownload, test_method.__name__, test_method)
    # Drop the module-level reference so that test collectors which pick up
    # module-level callables do not treat the last generated method as a
    # stand-alone test function.
    del test_method
177 if __name__
== '__main__' :