+# coding: utf-8
+from __future__ import unicode_literals
+
import re
-import socket
from .common import InfoExtractor
+from ..compat import compat_str
from ..utils import (
- compat_http_client,
- compat_str,
- compat_urllib_error,
- compat_urllib_parse,
- compat_urllib_request,
-
ExtractorError,
+ int_or_none,
+ qualities,
+ try_get,
unified_strdate,
)
-class ArteTvIE(InfoExtractor):
- """arte.tv information extractor."""
+# arte.tv serves video from several different sources, and the extraction
+# process is different for each one. The videos usually expire in 7 days,
+# so we can't add tests.
+
+
+class ArteTVBaseIE(InfoExtractor):
+    """Shared extraction logic for extractors fed by an arte.tv player JSON."""
+
+    def _extract_from_json_url(self, json_url, video_id, lang, title=None):
+        """Download a player configuration JSON and build an info dict.
+
+        json_url -- URL of the player configuration JSON
+        video_id -- id used for the download and as the fallback id in the
+                    "not available" error message
+        lang     -- requested language; codes listed in LANGS are mapped to
+                    their version-code suffix, anything else is used as-is
+        title    -- fallback title, used when the JSON has no 'VTI'
+
+        Returns a dict with 'id', 'title', 'description', 'upload_date',
+        'thumbnail' and 'formats'.
+        Raises ExtractorError (expected=True) when the JSON carries no
+        'VSR' dict of streams.
+        """
+        info = self._download_json(json_url, video_id)
+        player_info = info['videoJsonPlayer']
+
+        vsr = try_get(player_info, lambda x: x['VSR'], dict)
+        if not vsr:
+            error = None
+            if try_get(player_info, lambda x: x['custom_msg']['type']) == 'error':
+                error = try_get(
+                    player_info, lambda x: x['custom_msg']['msg'], compat_str)
+            if not error:
+                # The parentheses matter: '%' binds tighter than 'or', so
+                # without them the video_id fallback would never be used.
+                error = 'Video %s is not available' % (player_info.get('VID') or video_id)
+            raise ExtractorError(error, expected=True)
+
+        upload_date_str = player_info.get('shootingDate')
+        if not upload_date_str:
+            # Keep only the part before the first space of 'VRA'/'VDA'
+            upload_date_str = (player_info.get('VRA') or player_info.get('VDA') or '').split(' ')[0]
+
+        title = (player_info.get('VTI') or title or player_info['VID']).strip()
+        # 'or' guards against an explicit null in the JSON, which .get()'s
+        # default argument would not catch
+        subtitle = (player_info.get('VSU') or '').strip()
+        if subtitle:
+            title += ' - %s' % subtitle
+
+        info_dict = {
+            'id': player_info['VID'],
+            'title': title,
+            'description': player_info.get('VDE'),
+            'upload_date': unified_strdate(upload_date_str),
+            'thumbnail': player_info.get('programImage') or (player_info.get('VTU') or {}).get('IUR'),
+        }
+        qfunc = qualities(['MQ', 'HQ', 'EQ', 'SQ'])
+
+        LANGS = {
+            'fr': 'F',
+            'de': 'A',
+            'en': 'E[ANG]',
+            'es': 'E[ESP]',
+            'it': 'E[ITA]',
+            'pl': 'E[POL]',
+        }
- _VALID_URL = r'(?:http://)?videos\.arte\.tv/(?:fr|de)/videos/.*'
- _LIVE_URL = r'index-[0-9]+\.html$'
+        langcode = LANGS.get(lang, lang)
+        # Escaped because codes such as 'E[ANG]' contain regex metacharacters
+        lang_re = re.escape(langcode)
+
+        # Language preference from most to least priority
+        # Reference: section 6.8 of
+        # https://www.arte.tv/sites/en/corporate/files/complete-technical-guidelines-arte-geie-v1-07-1.pdf
+        # Built once per call: the patterns depend only on the requested
+        # language, not on the individual format.
+        PREFERENCES = (
+            # original version in requested language, without subtitles
+            r'VO{0}$'.format(lang_re),
+            # original version in requested language, with partial subtitles in requested language
+            r'VO{0}-ST{0}$'.format(lang_re),
+            # original version in requested language, with subtitles for the deaf and hard-of-hearing in requested language
+            r'VO{0}-STM{0}$'.format(lang_re),
+            # non-original (dubbed) version in requested language, without subtitles
+            r'V{0}$'.format(lang_re),
+            # non-original (dubbed) version in requested language, with partial subtitles in requested language
+            r'V{0}-ST{0}$'.format(lang_re),
+            # non-original (dubbed) version in requested language, with subtitles for the deaf and hard-of-hearing in requested language
+            r'V{0}-STM{0}$'.format(lang_re),
+            # original version in requested language, with partial subtitles in different language
+            r'VO{0}-ST(?!{0}).+?$'.format(lang_re),
+            # original version in requested language, with subtitles for the deaf and hard-of-hearing in different language
+            r'VO{0}-STM(?!{0}).+?$'.format(lang_re),
+            # original version in different language, with partial subtitles in requested language
+            r'VO(?:(?!{0}).+?)?-ST{0}$'.format(lang_re),
+            # original version in different language, with subtitles for the deaf and hard-of-hearing in requested language
+            r'VO(?:(?!{0}).+?)?-STM{0}$'.format(lang_re),
+            # original version in different language, without subtitles
+            r'VO(?:(?!{0}))?$'.format(lang_re),
+            # original version in different language, with partial subtitles in different language
+            r'VO(?:(?!{0}).+?)?-ST(?!{0}).+?$'.format(lang_re),
+            # original version in different language, with subtitles for the deaf and hard-of-hearing in different language
+            r'VO(?:(?!{0}).+?)?-STM(?!{0}).+?$'.format(lang_re),
+        )
+
+        formats = []
+        for format_id, format_dict in vsr.items():
+            f = dict(format_dict)
+            # A missing versionCode must not crash re.match(); the empty
+            # string matches none of the patterns and ends up with -1.
+            version_code = f.get('versionCode') or ''
+
+            # Earlier patterns rank higher; no match at all ranks lowest
+            for pref, p in enumerate(PREFERENCES):
+                if re.match(p, version_code):
+                    lang_pref = len(PREFERENCES) - pref
+                    break
+            else:
+                lang_pref = -1
+
+            fmt = {
+                'format_id': format_id,
+                'preference': -10 if f.get('videoFormat') == 'M3U8' else None,
+                'language_preference': lang_pref,
+                'format_note': '%s, %s' % (f.get('versionCode'), f.get('versionLibelle')),
+                'width': int_or_none(f.get('width')),
+                'height': int_or_none(f.get('height')),
+                'tbr': int_or_none(f.get('bitrate')),
+                'quality': qfunc(f.get('quality')),
+            }
+
+            if f.get('mediaType') == 'rtmp':
+                # For RTMP the 'streamer' field is the URL and the 'url'
+                # field becomes the play path
+                fmt['url'] = f['streamer']
+                fmt['play_path'] = 'mp4:' + f['url']
+                fmt['ext'] = 'flv'
+            else:
+                fmt['url'] = f['url']
- IE_NAME = u'arte.tv'
+            formats.append(fmt)
- def fetch_webpage(self, url):
- request = compat_urllib_request.Request(url)
- try:
- self.report_download_webpage(url)
- webpage = compat_urllib_request.urlopen(request).read()
- except (compat_urllib_error.URLError, compat_http_client.HTTPException, socket.error) as err:
- raise ExtractorError(u'Unable to retrieve video webpage: %s' % compat_str(err))
- except ValueError as err:
- raise ExtractorError(u'Invalid URL: %s' % url)
- return webpage
+        self._check_formats(formats, video_id)
+        self._sort_formats(formats)
- def grep_webpage(self, url, regex, regexFlags, matchTuples):
- page = self.fetch_webpage(url)
- mobj = re.search(regex, page, regexFlags)
- info = {}
+        info_dict['formats'] = formats
+        return info_dict
- if mobj is None:
- raise ExtractorError(u'Invalid URL: %s' % url)
- for (i, key, err) in matchTuples:
- if mobj.group(i) is None:
- raise ExtractorError(err)
- else:
- info[key] = mobj.group(i)
-
- return info
-
- # TODO implement Live Stream
- # def extractLiveStream(self, url):
- # video_lang = url.split('/')[-4]
- # info = self.grep_webpage(
- # url,
- # r'src="(.*?/videothek_js.*?\.js)',
- # 0,
- # [
- # (1, 'url', u'Invalid URL: %s' % url)
- # ]
- # )
- # http_host = url.split('/')[2]
- # next_url = 'http://%s%s' % (http_host, compat_urllib_parse.unquote(info.get('url')))
- # info = self.grep_webpage(
- # next_url,
- # r'(s_artestras_scst_geoFRDE_' + video_lang + '.*?)\'.*?' +
- # '(http://.*?\.swf).*?' +
- # '(rtmp://.*?)\'',
- # re.DOTALL,
- # [
- # (1, 'path', u'could not extract video path: %s' % url),
- # (2, 'player', u'could not extract video player: %s' % url),
- # (3, 'url', u'could not extract video url: %s' % url)
- # ]
- # )
- # video_url = u'%s/%s' % (info.get('url'), info.get('path'))
-
- def extractPlus7Stream(self, url):
- video_lang = url.split('/')[-3]
- info = self.grep_webpage(
- url,
- r'param name="movie".*?videorefFileUrl=(http[^\'"&]*)',
- 0,
- [
- (1, 'url', u'Invalid URL: %s' % url)
- ]
- )
- next_url = compat_urllib_parse.unquote(info.get('url'))
- info = self.grep_webpage(
- next_url,
- r'<video lang="%s" ref="(http[^\'"&]*)' % video_lang,
- 0,
- [
- (1, 'url', u'Could not find <video> tag: %s' % url)
- ]
- )
- next_url = compat_urllib_parse.unquote(info.get('url'))
-
- info = self.grep_webpage(
- next_url,
- r'<video id="(.*?)".*?>.*?' +
- '<name>(.*?)</name>.*?' +
- '<dateVideo>(.*?)</dateVideo>.*?' +
- '<url quality="hd">(.*?)</url>',
- re.DOTALL,
- [
- (1, 'id', u'could not extract video id: %s' % url),
- (2, 'title', u'could not extract video title: %s' % url),
- (3, 'date', u'could not extract video date: %s' % url),
- (4, 'url', u'could not extract video url: %s' % url)
- ]
+class ArteTVPlus7IE(ArteTVBaseIE):
+    """Extractor for single-video pages matching ``/<lang>/videos/<id>``."""
+
+    IE_NAME = 'arte.tv:+7'
+    _VALID_URL = r'https?://(?:www\.)?arte\.tv/(?P<lang>fr|de|en|es|it|pl)/videos/(?P<id>\d{6}-\d{3}-[AF])'
+
+    _TESTS = [{
+        'url': 'https://www.arte.tv/en/videos/088501-000-A/mexico-stealing-petrol-to-survive/',
+        'info_dict': {
+            'id': '088501-000-A',
+            'ext': 'mp4',
+            'title': 'Mexico: Stealing Petrol to Survive',
+            'upload_date': '20190628',
+        },
+    }]
+
+    def _real_extract(self, url):
+        """Map the page URL onto the player config API URL and extract from it."""
+        mobj = re.match(self._VALID_URL, url)
+        lang = mobj.group('lang')
+        video_id = mobj.group('id')
+        json_url = 'https://api.arte.tv/api/player/v1/config/%s/%s' % (lang, video_id)
+        return self._extract_from_json_url(json_url, video_id, lang)
+
+
+# Extractor for player embed URLs that carry the player config JSON URL
+# in their json_url query parameter.
+class ArteTVEmbedIE(ArteTVPlus7IE):
+ IE_NAME = 'arte.tv:embed'
+ # Captures three groups, in order: json_url, lang, id.
+ # Only https:// player URLs on www.arte.tv match this pattern.
+ # NOTE(review): json_url must appear literally (not percent-encoded) for
+ # the pattern to match - confirm embed URLs never encode it.
+ _VALID_URL = r'''(?x)
+ https://www\.arte\.tv
+ /player/v3/index\.php\?json_url=
+ (?P<json_url>
+ https?://api\.arte\.tv/api/player/v1/config/
+ (?P<lang>[^/]+)/(?P<id>\d{6}-\d{3}-[AF])
)
+ '''
- return {
- 'id': info.get('id'),
- 'url': compat_urllib_parse.unquote(info.get('url')),
- 'uploader': u'arte.tv',
- 'upload_date': unified_strdate(info.get('date')),
- 'title': info.get('title').decode('utf-8'),
- 'ext': u'mp4',
- 'format': u'NA',
- 'player_url': None,
- }
+ # Overrides the test list of the parent class with an empty one.
+ _TESTS = []
def _real_extract(self, url):
- video_id = url.split('/')[-1]
- self.report_extraction(video_id)
+ # The captured json_url is passed on exactly as matched; lang and id
+ # come from the path of that same URL.
+ json_url, lang, video_id = re.match(self._VALID_URL, url).groups()
+ return self._extract_from_json_url(json_url, video_id, lang)
+
- if re.search(self._LIVE_URL, video_id) is not None:
- raise ExtractorError(u'Arte live streams are not yet supported, sorry')
- # self.extractLiveStream(url)
- # return
- else:
- info = self.extractPlus7Stream(url)
+class ArteTVPlaylistIE(ArteTVBaseIE):
+    """Extractor for collection pages matching ``/<lang>/videos/RC-<digits>``."""
+
+    IE_NAME = 'arte.tv:playlist'
+    _VALID_URL = r'https?://(?:www\.)?arte\.tv/(?P<lang>fr|de|en|es|it|pl)/videos/(?P<id>RC-\d{6})'
- return [info]
+    _TESTS = [{
+        'url': 'https://www.arte.tv/en/videos/RC-016954/earn-a-living/',
+        'info_dict': {
+            'id': 'RC-016954',
+            'title': 'Earn a Living',
+            'description': 'md5:d322c55011514b3a7241f7fb80d494c2',
+        },
+        'playlist_mincount': 6,
+    }]
+
+    def _real_extract(self, url):
+        """Download the collection JSON and extract every video that has a jsonUrl."""
+        mobj = re.match(self._VALID_URL, url)
+        lang, playlist_id = mobj.group('lang'), mobj.group('id')
+
+        collection_url = (
+            'https://api.arte.tv/api/player/v1/collectionData/%s/%s?source=videos'
+            % (lang, playlist_id))
+        collection = self._download_json(collection_url, playlist_id)
+
+        title = collection.get('title')
+        description = collection.get('shortDescription') or collection.get('teaserText')
+
+        entries = []
+        for video in collection['videos']:
+            json_url = video.get('jsonUrl')
+            if not json_url:
+                # Entries without a player config URL are skipped
+                continue
+            entry_id = video.get('programId') or playlist_id
+            entries.append(self._extract_from_json_url(json_url, entry_id, lang))
+
+        return self.playlist_result(entries, playlist_id, title, description)