+ def test_postprocessors(self):
+ filename = 'post-processor-testfile.mp4'
+ audiofile = filename + '.mp3'
+
+ class SimplePP(PostProcessor):
+ def run(self, info):
+ with open(audiofile, 'wt') as f:
+ f.write('EXAMPLE')
+ return [info['filepath']], info
+
+ def run_pp(params, PP):
+ with open(filename, 'wt') as f:
+ f.write('EXAMPLE')
+ ydl = YoutubeDL(params)
+ ydl.add_post_processor(PP())
+ ydl.post_process(filename, {'filepath': filename})
+
+ run_pp({'keepvideo': True}, SimplePP)
+ self.assertTrue(os.path.exists(filename), '%s doesn\'t exist' % filename)
+ self.assertTrue(os.path.exists(audiofile), '%s doesn\'t exist' % audiofile)
+ os.unlink(filename)
+ os.unlink(audiofile)
+
+ run_pp({'keepvideo': False}, SimplePP)
+ self.assertFalse(os.path.exists(filename), '%s exists' % filename)
+ self.assertTrue(os.path.exists(audiofile), '%s doesn\'t exist' % audiofile)
+ os.unlink(audiofile)
+
+ class ModifierPP(PostProcessor):
+ def run(self, info):
+ with open(info['filepath'], 'wt') as f:
+ f.write('MODIFIED')
+ return [], info
+
+ run_pp({'keepvideo': False}, ModifierPP)
+ self.assertTrue(os.path.exists(filename), '%s doesn\'t exist' % filename)
+ os.unlink(filename)
+
+ def test_match_filter(self):
+ class FilterYDL(YDL):
+ def __init__(self, *args, **kwargs):
+ super(FilterYDL, self).__init__(*args, **kwargs)
+ self.params['simulate'] = True
+
+ def process_info(self, info_dict):
+ super(YDL, self).process_info(info_dict)
+
+ def _match_entry(self, info_dict, incomplete):
+ res = super(FilterYDL, self)._match_entry(info_dict, incomplete)
+ if res is None:
+ self.downloaded_info_dicts.append(info_dict)
+ return res
+
+ first = {
+ 'id': '1',
+ 'url': TEST_URL,
+ 'title': 'one',
+ 'extractor': 'TEST',
+ 'duration': 30,
+ 'filesize': 10 * 1024,
+ }
+ second = {
+ 'id': '2',
+ 'url': TEST_URL,
+ 'title': 'two',
+ 'extractor': 'TEST',
+ 'duration': 10,
+ 'description': 'foo',
+ 'filesize': 5 * 1024,
+ }
+ videos = [first, second]
+
+ def get_videos(filter_=None):
+ ydl = FilterYDL({'match_filter': filter_})
+ for v in videos:
+ ydl.process_ie_result(v, download=True)
+ return [v['id'] for v in ydl.downloaded_info_dicts]
+
+ res = get_videos()
+ self.assertEqual(res, ['1', '2'])
+
+ def f(v):
+ if v['id'] == '1':
+ return None
+ else:
+ return 'Video id is not 1'
+ res = get_videos(f)
+ self.assertEqual(res, ['1'])
+
+ f = match_filter_func('duration < 30')
+ res = get_videos(f)
+ self.assertEqual(res, ['2'])
+
+ f = match_filter_func('description = foo')
+ res = get_videos(f)
+ self.assertEqual(res, ['2'])
+
+ f = match_filter_func('description =? foo')
+ res = get_videos(f)
+ self.assertEqual(res, ['1', '2'])
+
+ f = match_filter_func('filesize > 5KiB')
+ res = get_videos(f)
+ self.assertEqual(res, ['1'])
+
+ def test_playlist_items_selection(self):
+ entries = [{
+ 'id': compat_str(i),
+ 'title': compat_str(i),
+ 'url': TEST_URL,
+ } for i in range(1, 5)]
+ playlist = {
+ '_type': 'playlist',
+ 'id': 'test',
+ 'entries': entries,
+ 'extractor': 'test:playlist',
+ 'extractor_key': 'test:playlist',
+ 'webpage_url': 'http://example.com',
+ }
+
+ def get_ids(params):
+ ydl = YDL(params)
+ # make a copy because the dictionary can be modified
+ ydl.process_ie_result(playlist.copy())
+ return [int(v['id']) for v in ydl.downloaded_info_dicts]
+
+ result = get_ids({})
+ self.assertEqual(result, [1, 2, 3, 4])
+
+ result = get_ids({'playlistend': 10})
+ self.assertEqual(result, [1, 2, 3, 4])
+
+ result = get_ids({'playlistend': 2})
+ self.assertEqual(result, [1, 2])
+
+ result = get_ids({'playliststart': 10})
+ self.assertEqual(result, [])
+
+ result = get_ids({'playliststart': 2})
+ self.assertEqual(result, [2, 3, 4])
+
+ result = get_ids({'playlist_items': '2-4'})
+ self.assertEqual(result, [2, 3, 4])
+
+ result = get_ids({'playlist_items': '2,4'})
+ self.assertEqual(result, [2, 4])
+
+ result = get_ids({'playlist_items': '10'})
+ self.assertEqual(result, [])
+
+