]>
Raphaël G. Git Repositories - youtubedl/blob - test/test_download.py
3 # Allow direct execution
7 sys
. path
. insert ( 0 , os
. path
. dirname ( os
. path
. dirname ( os
. path
. abspath ( __file__
))))
9 from test
. helper
import (
25 import youtube_dl
. YoutubeDL
26 from youtube_dl
. utils
import (
32 UnavailableVideoError
,
34 from youtube_dl
. extractor
import get_info_extractor
class YoutubeDL(youtube_dl.YoutubeDL):
    """Test-oriented subclass of ``youtube_dl.YoutubeDL``.

    Differences from the parent that are visible in this class:

    * ``to_stderr`` is aliased to ``to_screen``.
    * every info dict handed to ``process_info`` is remembered in
      ``processed_info_dicts``.
    * ``report_warning`` raises ``ExtractorError`` instead of reporting.
    """

    def __init__(self, *args, **kwargs):
        """Set up the test-specific attributes, then run the parent initializer.

        All positional and keyword arguments are forwarded unchanged.
        """
        # Both attributes are assigned before delegating to the parent.
        self.to_stderr = self.to_screen
        self.processed_info_dicts = []
        parent = super(YoutubeDL, self)
        parent.__init__(*args, **kwargs)

    def report_warning(self, message):
        """Raise ``ExtractorError(message)``.

        Don't accept warnings during tests.
        """
        failure = ExtractorError(message)
        raise failure

    def process_info(self, info_dict):
        """Record ``info_dict`` and return the parent's ``process_info`` result."""
        seen = self.processed_info_dicts
        seen.append(info_dict)
        parent = super(YoutubeDL, self)
        return parent.process_info(info_dict)
51 with open ( fn
, 'rb' ) as f
:
52 return hashlib
. md5 ( f
. read ()). hexdigest ()
54 defs
= get_testcases ()
57 class TestDownload ( unittest
. TestCase
):
62 ### Dynamically generate tests
63 def generator ( test_case
):
65 def test_template ( self
):
66 ie
= youtube_dl
. extractor
. get_info_extractor ( test_case
[ 'name' ])
67 other_ies
= [ get_info_extractor ( ie_key
) for ie_key
in test_case
. get ( 'add_ie' , [])]
68 def print_skipping ( reason
):
69 print ( 'Skipping %s : %s ' % ( test_case
[ 'name' ], reason
))
71 print_skipping ( 'IE marked as not _WORKING' )
73 if 'playlist' not in test_case
:
74 info_dict
= test_case
. get ( 'info_dict' , {})
75 if not test_case
. get ( 'file' ) and not ( info_dict
. get ( 'id' ) and info_dict
. get ( 'ext' )):
76 print_skipping ( 'The output file cannot be know, the "file" '
77 'key is missing or the info_dict is incomplete' )
79 if 'skip' in test_case
:
80 print_skipping ( test_case
[ 'skip' ])
82 for other_ie
in other_ies
:
83 if not other_ie
. working ():
84 print_skipping ( u
'test depends on %s IE, marked as not WORKING' % other_ie
. ie_key ())
87 params
= get_params ( test_case
. get ( 'params' , {}))
89 ydl
= YoutubeDL ( params
)
90 ydl
. add_default_info_extractors ()
91 finished_hook_called
= set ()
93 if status
[ 'status' ] == 'finished' :
94 finished_hook_called
. add ( status
[ 'filename' ])
95 ydl
. fd
. add_progress_hook ( _hook
)
97 def get_tc_filename ( tc
):
98 return tc
. get ( 'file' ) or ydl
. prepare_filename ( tc
. get ( 'info_dict' , {}))
100 test_cases
= test_case
. get ( 'playlist' , [ test_case
])
101 def try_rm_tcs_files ():
102 for tc
in test_cases
:
103 tc_filename
= get_tc_filename ( tc
)
105 try_rm ( tc_filename
+ '.part' )
106 try_rm ( tc_filename
+ '.info.json' )
112 ydl
. download ([ test_case
[ 'url' ]])
113 except ( DownloadError
, ExtractorError
) as err
:
114 # Check if the exception is not a network related one
115 if not err
. exc_info
[ 0 ] in ( compat_urllib_error
. URLError
, socket
. timeout
, UnavailableVideoError
) or ( err
. exc_info
[ 0 ] == compat_HTTPError
and err
. exc_info
[ 1 ]. code
== 503 ):
118 if try_num
== RETRIES
:
119 report_warning ( u
'Failed due to network errors, skipping...' )
122 print ( 'Retrying: {0} failed tries \n\n ########## \n\n ' . format ( try_num
))
128 for tc
in test_cases
:
129 tc_filename
= get_tc_filename ( tc
)
130 if not test_case
. get ( 'params' , {}). get ( 'skip_download' , False ):
131 self
. assertTrue ( os
. path
. exists ( tc_filename
), msg
= 'Missing file ' + tc_filename
)
132 self
. assertTrue ( tc_filename
in finished_hook_called
)
133 self
. assertTrue ( os
. path
. exists ( tc_filename
+ '.info.json' ))
135 md5_for_file
= _file_md5 ( tc_filename
)
136 self
. assertEqual ( md5_for_file
, tc
[ 'md5' ])
137 with io
. open ( tc_filename
+ '.info.json' , encoding
= 'utf-8' ) as infof
:
138 info_dict
= json
. load ( infof
)
139 for ( info_field
, expected
) in tc
. get ( 'info_dict' , {}). items ():
140 if isinstance ( expected
, compat_str
) and expected
. startswith ( 'md5:' ):
141 got
= 'md5:' + md5 ( info_dict
. get ( info_field
))
143 got
= info_dict
. get ( info_field
)
144 self
. assertEqual ( expected
, got
,
145 u
'invalid value for field %s , expected %r , got %r ' % ( info_field
, expected
, got
))
147 # If checkable fields are missing from the test case, print the info_dict
148 test_info_dict
= dict (( key
, value
if not isinstance ( value
, compat_str
) or len ( value
) < 250 else 'md5:' + md5 ( value
))
149 for key
, value
in info_dict
. items ()
150 if value
and key
in ( 'title' , 'description' , 'uploader' , 'upload_date' , 'uploader_id' , 'location' ))
151 if not all ( key
in tc
. get ( 'info_dict' , {}). keys () for key
in test_info_dict
. keys ()):
152 sys
. stderr
. write ( u
' \n "info_dict": ' + json
. dumps ( test_info_dict
, ensure_ascii
= False , indent
= 2 ) + u
' \n ' )
154 # Check for the presence of mandatory fields
155 for key
in ( 'id' , 'url' , 'title' , 'ext' ):
156 self
. assertTrue ( key
in info_dict
. keys () and info_dict
[ key
])
157 # Check for mandatory fields that are automatically set by YoutubeDL
158 for key
in [ 'webpage_url' , 'extractor' , 'extractor_key' ]:
159 self
. assertTrue ( info_dict
. get ( key
), u
'Missing field: %s ' % key
)
165 ### And add them to TestDownload
166 for n
, test_case
in enumerate ( defs
):
167 test_method
= generator ( test_case
)
168 tname
= 'test_' + str ( test_case
[ 'name' ])
170 while hasattr ( TestDownload
, tname
):
171 tname
= 'test_' + str ( test_case
[ 'name' ]) + '_' + str ( i
)
173 test_method
.__ name
__ = tname
174 setattr ( TestDownload
, test_method
.__ name
__ , test_method
)
178 if __name__
== '__main__' :