Compare commits

...

8 Commits

Author SHA1 Message Date
bashonly
bd0a668169
[ie/pinterest] Fix extractor (#12538)
Closes #12529
Authored by: mikf

Co-authored-by: =?UTF-8?q?Mike=20F=C3=A4hrmann?= <mike_faehrmann@web.de>
2025-03-05 06:38:23 +00:00
bashonly
b8b4754704
[ie/twitter] Fix syndication token generation (#12537)
Fix 14cd7f3443c6da4d49edaefcc12da9dee86e243e

Authored by: bashonly
2025-03-05 06:22:52 +00:00
u-spec-png
9d70abe4de
[ie/N1] Fix extraction of newer articles (#12514)
Authored by: u-spec-png
2025-03-04 01:51:23 +01:00
sepro
8eb9c1bf3b
[ie/RTP] Rework extractor (#11638)
Closes #4661, Closes #10393, Closes #11244
Authored by: seproDev, vallovic, red-acid, pferreir, somini

Co-authored-by: vallovic <vallovic@gmail.com>
Co-authored-by: red-acid <161967284+red-acid@users.noreply.github.com>
Co-authored-by: Pedro Ferreira <pedro@dete.st>
Co-authored-by: somini <dev@somini.xyz>
2025-03-04 00:46:18 +01:00
fries1234
42b7440963
[ie/tvw] Add extractor (#12271)
Authored by: fries1234
2025-03-03 23:25:30 +01:00
sepro
172d5fcd77
[ie/MagellanTV] Fix extractor (#12505)
Closes #12498
Authored by: seproDev
2025-03-03 22:55:03 +01:00
Simon Sawicki
7d18fed8f1
[networking] Add keep_header_casing extension (#11652)
Authored by: coletdjnz, Grub4K

Co-authored-by: coletdjnz <coletdjnz@protonmail.com>
2025-03-03 00:10:01 +01:00
coletdjnz
79ec2fdff7
[ie/youtube] Warn on missing formats due to SSAP (#12483)
See https://github.com/yt-dlp/yt-dlp/issues/12482

Authored by: coletdjnz
2025-02-28 19:33:31 +13:00
17 changed files with 590 additions and 143 deletions

View File

@ -720,6 +720,15 @@ class TestHTTPRequestHandler(TestRequestHandlerBase):
rh, Request(
f'http://127.0.0.1:{self.http_port}/headers', proxies={'all': 'http://10.255.255.255'})).close()
@pytest.mark.skip_handlers_if(lambda _, handler: handler not in ['Urllib', 'CurlCFFI'], 'handler does not support keep_header_casing')
def test_keep_header_casing(self, handler):
with handler() as rh:
res = validate_and_send(
rh, Request(
f'http://127.0.0.1:{self.http_port}/headers', headers={'X-test-heaDer': 'test'}, extensions={'keep_header_casing': True})).read().decode()
assert 'X-test-heaDer: test' in res
@pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
class TestClientCertificate:
@ -1289,6 +1298,7 @@ class TestRequestHandlerValidation:
({'legacy_ssl': False}, False),
({'legacy_ssl': True}, False),
({'legacy_ssl': 'notabool'}, AssertionError),
({'keep_header_casing': True}, UnsupportedRequest),
]),
('Requests', 'http', [
({'cookiejar': 'notacookiejar'}, AssertionError),
@ -1299,6 +1309,9 @@ class TestRequestHandlerValidation:
({'legacy_ssl': False}, False),
({'legacy_ssl': True}, False),
({'legacy_ssl': 'notabool'}, AssertionError),
({'keep_header_casing': False}, False),
({'keep_header_casing': True}, False),
({'keep_header_casing': 'notabool'}, AssertionError),
]),
('CurlCFFI', 'http', [
({'cookiejar': 'notacookiejar'}, AssertionError),

View File

@ -3,19 +3,20 @@
# Allow direct execution
import os
import sys
import unittest
import unittest.mock
import warnings
import datetime as dt
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import contextlib
import datetime as dt
import io
import itertools
import json
import pickle
import subprocess
import unittest
import unittest.mock
import warnings
import xml.etree.ElementTree
from yt_dlp.compat import (
@ -2087,21 +2088,26 @@ Line 1
headers = HTTPHeaderDict()
headers['ytdl-test'] = b'0'
self.assertEqual(list(headers.items()), [('Ytdl-Test', '0')])
self.assertEqual(list(headers.sensitive().items()), [('ytdl-test', '0')])
headers['ytdl-test'] = 1
self.assertEqual(list(headers.items()), [('Ytdl-Test', '1')])
self.assertEqual(list(headers.sensitive().items()), [('ytdl-test', '1')])
headers['Ytdl-test'] = '2'
self.assertEqual(list(headers.items()), [('Ytdl-Test', '2')])
self.assertEqual(list(headers.sensitive().items()), [('Ytdl-test', '2')])
self.assertTrue('ytDl-Test' in headers)
self.assertEqual(str(headers), str(dict(headers)))
self.assertEqual(repr(headers), str(dict(headers)))
headers.update({'X-dlp': 'data'})
self.assertEqual(set(headers.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data')})
self.assertEqual(set(headers.sensitive().items()), {('Ytdl-test', '2'), ('X-dlp', 'data')})
self.assertEqual(dict(headers), {'Ytdl-Test': '2', 'X-Dlp': 'data'})
self.assertEqual(len(headers), 2)
self.assertEqual(headers.copy(), headers)
headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, **headers, **{'X-dlp': 'data2'})
headers2 = HTTPHeaderDict({'X-dlp': 'data3'}, headers, **{'X-dlP': 'data2'})
self.assertEqual(set(headers2.items()), {('Ytdl-Test', '2'), ('X-Dlp', 'data2')})
self.assertEqual(set(headers2.sensitive().items()), {('Ytdl-test', '2'), ('X-dlP', 'data2')})
self.assertEqual(len(headers2), 2)
headers2.clear()
self.assertEqual(len(headers2), 0)
@ -2109,16 +2115,23 @@ Line 1
# ensure we prefer latter headers
headers3 = HTTPHeaderDict({'Ytdl-TeSt': 1}, {'Ytdl-test': 2})
self.assertEqual(set(headers3.items()), {('Ytdl-Test', '2')})
self.assertEqual(set(headers3.sensitive().items()), {('Ytdl-test', '2')})
del headers3['ytdl-tesT']
self.assertEqual(dict(headers3), {})
headers4 = HTTPHeaderDict({'ytdl-test': 'data;'})
self.assertEqual(set(headers4.items()), {('Ytdl-Test', 'data;')})
self.assertEqual(set(headers4.sensitive().items()), {('ytdl-test', 'data;')})
# common mistake: strip whitespace from values
# https://github.com/yt-dlp/yt-dlp/issues/8729
headers5 = HTTPHeaderDict({'ytdl-test': ' data; '})
self.assertEqual(set(headers5.items()), {('Ytdl-Test', 'data;')})
self.assertEqual(set(headers5.sensitive().items()), {('ytdl-test', 'data;')})
# test if picklable
headers6 = HTTPHeaderDict(a=1, b=2)
self.assertEqual(pickle.loads(pickle.dumps(headers6)), headers6)
def test_extract_basic_auth(self):
assert extract_basic_auth('http://:foo.bar') == ('http://:foo.bar', None)

View File

@ -44,7 +44,7 @@ def websocket_handler(websocket):
return websocket.send('2')
elif isinstance(message, str):
if message == 'headers':
return websocket.send(json.dumps(dict(websocket.request.headers)))
return websocket.send(json.dumps(dict(websocket.request.headers.raw_items())))
elif message == 'path':
return websocket.send(websocket.request.path)
elif message == 'source_address':
@ -266,18 +266,18 @@ class TestWebsSocketRequestHandlerConformance:
with handler(cookiejar=cookiejar) as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
assert HTTPHeaderDict(json.loads(ws.recv()))['cookie'] == 'test=ytdlp'
ws.close()
with handler() as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
assert 'cookie' not in HTTPHeaderDict(json.loads(ws.recv()))
ws.close()
ws = ws_validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': cookiejar}))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
assert HTTPHeaderDict(json.loads(ws.recv()))['cookie'] == 'test=ytdlp'
ws.close()
@pytest.mark.skip_handler('Websockets', 'Set-Cookie not supported by websockets')
@ -287,7 +287,7 @@ class TestWebsSocketRequestHandlerConformance:
ws_validate_and_send(rh, Request(f'{self.ws_base_url}/get_cookie', extensions={'cookiejar': YoutubeDLCookieJar()}))
ws = ws_validate_and_send(rh, Request(self.ws_base_url, extensions={'cookiejar': YoutubeDLCookieJar()}))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
assert 'cookie' not in HTTPHeaderDict(json.loads(ws.recv()))
ws.close()
@pytest.mark.skip_handler('Websockets', 'Set-Cookie not supported by websockets')
@ -298,12 +298,12 @@ class TestWebsSocketRequestHandlerConformance:
ws_validate_and_send(rh, Request(f'{self.ws_base_url}/get_cookie'))
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert json.loads(ws.recv())['cookie'] == 'test=ytdlp'
assert HTTPHeaderDict(json.loads(ws.recv()))['cookie'] == 'test=ytdlp'
ws.close()
cookiejar.clear_session_cookies()
ws = ws_validate_and_send(rh, Request(self.ws_base_url))
ws.send('headers')
assert 'cookie' not in json.loads(ws.recv())
assert 'cookie' not in HTTPHeaderDict(json.loads(ws.recv()))
ws.close()
def test_source_address(self, handler):
@ -341,6 +341,14 @@ class TestWebsSocketRequestHandlerConformance:
assert headers['test3'] == 'test3'
ws.close()
def test_keep_header_casing(self, handler):
with handler(headers=HTTPHeaderDict({'x-TeSt1': 'test'})) as rh:
ws = ws_validate_and_send(rh, Request(self.ws_base_url, headers={'x-TeSt2': 'test'}, extensions={'keep_header_casing': True}))
ws.send('headers')
headers = json.loads(ws.recv())
assert 'x-TeSt1' in headers
assert 'x-TeSt2' in headers
@pytest.mark.parametrize('client_cert', (
{'client_certificate': os.path.join(MTLS_CERT_DIR, 'clientwithkey.crt')},
{

View File

@ -2224,6 +2224,7 @@ from .tvplay import (
TVPlayIE,
)
from .tvplayer import TVPlayerIE
from .tvw import TvwIE
from .tweakers import TweakersIE
from .twentymin import TwentyMinutenIE
from .twentythreevideo import TwentyThreeVideoIE

View File

@ -1,35 +1,36 @@
from .common import InfoExtractor
from ..utils import parse_age_limit, parse_duration, traverse_obj
from ..utils import parse_age_limit, parse_duration, url_or_none
from ..utils.traversal import traverse_obj
class MagellanTVIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?magellantv\.com/(?:watch|video)/(?P<id>[\w-]+)'
_TESTS = [{
'url': 'https://www.magellantv.com/watch/my-dads-on-death-row?type=v',
'url': 'https://www.magellantv.com/watch/incas-the-new-story?type=v',
'info_dict': {
'id': 'my-dads-on-death-row',
'id': 'incas-the-new-story',
'ext': 'mp4',
'title': 'My Dad\'s On Death Row',
'description': 'md5:33ba23b9f0651fc4537ed19b1d5b0d7a',
'duration': 3780.0,
'title': 'Incas: The New Story',
'description': 'md5:936c7f6d711c02dfb9db22a067b586fe',
'age_limit': 14,
'tags': ['Justice', 'Reality', 'United States', 'True Crime'],
'duration': 3060.0,
'tags': ['Ancient History', 'Archaeology', 'Anthropology'],
},
'params': {'skip_download': 'm3u8'},
}, {
'url': 'https://www.magellantv.com/video/james-bulger-the-new-revelations',
'url': 'https://www.magellantv.com/video/tortured-to-death-murdering-the-nanny',
'info_dict': {
'id': 'james-bulger-the-new-revelations',
'id': 'tortured-to-death-murdering-the-nanny',
'ext': 'mp4',
'title': 'James Bulger: The New Revelations',
'description': 'md5:7b97922038bad1d0fe8d0470d8a189f2',
'title': 'Tortured to Death: Murdering the Nanny',
'description': 'md5:d87033594fa218af2b1a8b49f52511e5',
'age_limit': 14,
'duration': 2640.0,
'age_limit': 0,
'tags': ['Investigation', 'True Crime', 'Justice', 'Europe'],
'tags': ['True Crime', 'Murder'],
},
'params': {'skip_download': 'm3u8'},
}, {
'url': 'https://www.magellantv.com/watch/celebration-nation',
'url': 'https://www.magellantv.com/watch/celebration-nation?type=s',
'info_dict': {
'id': 'celebration-nation',
'ext': 'mp4',
@ -43,10 +44,19 @@ class MagellanTVIE(InfoExtractor):
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
data = traverse_obj(self._search_nextjs_data(webpage, video_id), (
'props', 'pageProps', 'reactContext',
(('video', 'detail'), ('series', 'currentEpisode')), {dict}), get_all=False)
formats, subtitles = self._extract_m3u8_formats_and_subtitles(data['jwpVideoUrl'], video_id)
context = self._search_nextjs_data(webpage, video_id)['props']['pageProps']['reactContext']
data = traverse_obj(context, ((('video', 'detail'), ('series', 'currentEpisode')), {dict}, any))
formats, subtitles = [], {}
for m3u8_url in set(traverse_obj(data, ((('manifests', ..., 'hls'), 'jwp_video_url'), {url_or_none}))):
fmts, subs = self._extract_m3u8_formats_and_subtitles(
m3u8_url, video_id, 'mp4', m3u8_id='hls', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
if not formats and (error := traverse_obj(context, ('errorDetailPage', 'errorMessage', {str}))):
if 'available in your country' in error:
self.raise_geo_restricted(msg=error)
self.raise_no_formats(f'{self.IE_NAME} said: {error}', expected=True)
return {
'id': video_id,

View File

@ -4,7 +4,9 @@ from .common import InfoExtractor
from ..utils import (
extract_attributes,
unified_timestamp,
url_or_none,
)
from ..utils.traversal import traverse_obj
class N1InfoAssetIE(InfoExtractor):
@ -35,9 +37,9 @@ class N1InfoIIE(InfoExtractor):
IE_NAME = 'N1Info:article'
_VALID_URL = r'https?://(?:(?:\w+\.)?n1info\.\w+|nova\.rs)/(?:[^/?#]+/){1,2}(?P<id>[^/?#]+)'
_TESTS = [{
# Youtube embedded
# YouTube embedded
'url': 'https://rs.n1info.com/sport-klub/tenis/kako-je-djokovic-propustio-istorijsku-priliku-video/',
'md5': '01ddb6646d0fd9c4c7d990aa77fe1c5a',
'md5': '987ce6fd72acfecc453281e066b87973',
'info_dict': {
'id': 'L5Hd4hQVUpk',
'ext': 'mp4',
@ -45,7 +47,26 @@ class N1InfoIIE(InfoExtractor):
'title': 'Ozmo i USO21, ep. 13: Novak Đoković Danil Medvedev | Ključevi Poraza, Budućnost | SPORT KLUB TENIS',
'description': 'md5:467f330af1effedd2e290f10dc31bb8e',
'uploader': 'Sport Klub',
'uploader_id': 'sportklub',
'uploader_id': '@sportklub',
'uploader_url': 'https://www.youtube.com/@sportklub',
'channel': 'Sport Klub',
'channel_id': 'UChpzBje9Ro6CComXe3BgNaw',
'channel_url': 'https://www.youtube.com/channel/UChpzBje9Ro6CComXe3BgNaw',
'channel_is_verified': True,
'channel_follower_count': int,
'comment_count': int,
'view_count': int,
'like_count': int,
'age_limit': 0,
'duration': 1049,
'thumbnail': 'https://i.ytimg.com/vi/L5Hd4hQVUpk/maxresdefault.jpg',
'chapters': 'count:9',
'categories': ['Sports'],
'tags': 'count:10',
'timestamp': 1631522787,
'playable_in_embed': True,
'availability': 'public',
'live_status': 'not_live',
},
}, {
'url': 'https://rs.n1info.com/vesti/djilas-los-plan-za-metro-nece-resiti-nijedan-saobracajni-problem/',
@ -55,6 +76,7 @@ class N1InfoIIE(InfoExtractor):
'title': 'Đilas: Predlog izgradnje metroa besmislen; SNS odbacuje navode',
'upload_date': '20210924',
'timestamp': 1632481347,
'thumbnail': 'http://n1info.rs/wp-content/themes/ucnewsportal-n1/dist/assets/images/placeholder-image-video.jpg',
},
'params': {
'skip_download': True,
@ -67,6 +89,7 @@ class N1InfoIIE(InfoExtractor):
'title': 'Zadnji dnevi na kopališču Ilirija: “Ilirija ni umrla, ubili so jo”',
'timestamp': 1632567630,
'upload_date': '20210925',
'thumbnail': 'https://n1info.si/wp-content/uploads/2021/09/06/1630945843-tomaz3.png',
},
'params': {
'skip_download': True,
@ -81,6 +104,14 @@ class N1InfoIIE(InfoExtractor):
'upload_date': '20210924',
'timestamp': 1632448649.0,
'uploader': 'YouLotWhatDontStop',
'display_id': 'pu9wbx',
'channel_id': 'serbia',
'comment_count': int,
'like_count': int,
'dislike_count': int,
'age_limit': 0,
'duration': 134,
'thumbnail': 'https://external-preview.redd.it/5nmmawSeGx60miQM3Iq-ueC9oyCLTLjjqX-qqY8uRsc.png?format=pjpg&auto=webp&s=2f973400b04d23f871b608b178e47fc01f9b8f1d',
},
'params': {
'skip_download': True,
@ -93,6 +124,7 @@ class N1InfoIIE(InfoExtractor):
'title': 'Žaklina Tatalović Ani Brnabić: Pričate laži (VIDEO)',
'upload_date': '20211102',
'timestamp': 1635861677,
'thumbnail': 'https://nova.rs/wp-content/uploads/2021/11/02/1635860298-TNJG_Ana_Brnabic_i_Zaklina_Tatalovic_100_dana_Vlade_GP.jpg',
},
}, {
'url': 'https://n1info.rs/vesti/cuta-biti-u-kosovskoj-mitrovici-znaci-da-te-docekaju-eksplozivnim-napravama/',
@ -104,6 +136,16 @@ class N1InfoIIE(InfoExtractor):
'timestamp': 1687290536,
'thumbnail': 'https://cdn.brid.tv/live/partners/26827/snapshot/1332368_th_6492013a8356f_1687290170.jpg',
},
}, {
'url': 'https://n1info.rs/vesti/vuciceva-turneja-po-srbiji-najavljuje-kontrarevoluciju-preti-svom-narodu-vredja-novinare/',
'info_dict': {
'id': '2025974',
'ext': 'mp4',
'title': 'Vučićeva turneja po Srbiji: Najavljuje kontrarevoluciju, preti svom narodu, vređa novinare',
'thumbnail': 'https://cdn-uc.brid.tv/live/partners/26827/snapshot/2025974_fhd_67c4a23280a81_1740939826.jpg',
'timestamp': 1740939936,
'upload_date': '20250302',
},
}, {
'url': 'https://hr.n1info.com/vijesti/pravobraniteljica-o-ubojstvu-u-zagrebu-radi-se-o-doista-nezapamcenoj-situaciji/',
'only_matching': True,
@ -115,11 +157,11 @@ class N1InfoIIE(InfoExtractor):
title = self._html_search_regex(r'<h1[^>]+>(.+?)</h1>', webpage, 'title')
timestamp = unified_timestamp(self._html_search_meta('article:published_time', webpage))
plugin_data = self._html_search_meta('BridPlugin', webpage)
plugin_data = re.findall(r'\$bp\("(?:Brid|TargetVideo)_\d+",\s(.+)\);', webpage)
entries = []
if plugin_data:
site_id = self._html_search_regex(r'site:(\d+)', webpage, 'site id')
for video_data in re.findall(r'\$bp\("Brid_\d+", (.+)\);', webpage):
for video_data in plugin_data:
video_id = self._parse_json(video_data, title)['video']
entries.append({
'id': video_id,
@ -140,7 +182,7 @@ class N1InfoIIE(InfoExtractor):
'url': video_data.get('data-url'),
'id': video_data.get('id'),
'title': title,
'thumbnail': video_data.get('data-thumbnail'),
'thumbnail': traverse_obj(video_data, (('data-thumbnail', 'data-default_thumbnail'), {url_or_none}, any)),
'timestamp': timestamp,
'ie_key': 'N1InfoAsset',
})
@ -152,7 +194,7 @@ class N1InfoIIE(InfoExtractor):
if url.startswith('https://www.youtube.com'):
entries.append(self.url_result(url, ie='Youtube'))
elif url.startswith('https://www.redditmedia.com'):
entries.append(self.url_result(url, ie='RedditR'))
entries.append(self.url_result(url, ie='Reddit'))
return {
'_type': 'playlist',

View File

@ -23,9 +23,9 @@ class PinterestBaseIE(InfoExtractor):
def _call_api(self, resource, video_id, options):
return self._download_json(
f'https://www.pinterest.com/resource/{resource}Resource/get/',
video_id, f'Download {resource} JSON metadata', query={
'data': json.dumps({'options': options}),
})['resource_response']
video_id, f'Download {resource} JSON metadata',
query={'data': json.dumps({'options': options})},
headers={'X-Pinterest-PWS-Handler': 'www/[username].js'})['resource_response']
def _extract_video(self, data, extract_formats=True):
video_id = data['id']

View File

@ -3,12 +3,20 @@ import json
import re
import urllib.parse
from .common import InfoExtractor
from ..utils import js_to_json
from .common import InfoExtractor, Request
from ..utils import (
determine_ext,
int_or_none,
js_to_json,
parse_duration,
parse_iso8601,
url_or_none,
)
from ..utils.traversal import traverse_obj
class RTPIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?rtp\.pt/play/(?:(?:estudoemcasa|palco|zigzag)/)?p(?P<program_id>[0-9]+)/(?P<id>[^/?#]+)'
_VALID_URL = r'https?://(?:www\.)?rtp\.pt/play/(?:[^/#?]+/)?p(?P<program_id>\d+)/(?P<id>e\d+)'
_TESTS = [{
'url': 'http://www.rtp.pt/play/p405/e174042/paixoes-cruzadas',
'md5': 'e736ce0c665e459ddb818546220b4ef8',
@ -16,99 +24,173 @@ class RTPIE(InfoExtractor):
'id': 'e174042',
'ext': 'mp3',
'title': 'Paixões Cruzadas',
'description': 'As paixões musicais de António Cartaxo e António Macedo',
'description': 'md5:af979e58ba0ab73f78435fc943fdb070',
'thumbnail': r're:^https?://.*\.jpg',
'series': 'Paixões Cruzadas',
'duration': 2950.0,
'modified_timestamp': 1553693464,
'modified_date': '20190327',
'timestamp': 1417219200,
'upload_date': '20141129',
},
}, {
'url': 'https://www.rtp.pt/play/zigzag/p13166/e757904/25-curiosidades-25-de-abril',
'md5': '9a81ed53f2b2197cfa7ed455b12f8ade',
'md5': '5b4859940e3adef61247a77dfb76046a',
'info_dict': {
'id': 'e757904',
'ext': 'mp4',
'title': '25 Curiosidades, 25 de Abril',
'description': 'Estudar ou não estudar - Em cada um dos episódios descobrimos uma curiosidade acerca de como era viver em Portugal antes da revolução do 25 de abr',
'title': 'Estudar ou não estudar',
'description': 'md5:3bfd7eb8bebfd5711a08df69c9c14c35',
'thumbnail': r're:^https?://.*\.jpg',
'timestamp': 1711958401,
'duration': 146.0,
'upload_date': '20240401',
'modified_timestamp': 1712242991,
'series': '25 Curiosidades, 25 de Abril',
'episode_number': 2,
'episode': 'Estudar ou não estudar',
'modified_date': '20240404',
},
}, {
'url': 'http://www.rtp.pt/play/p831/a-quimica-das-coisas',
'only_matching': True,
}, {
'url': 'https://www.rtp.pt/play/estudoemcasa/p7776/portugues-1-ano',
'only_matching': True,
}, {
'url': 'https://www.rtp.pt/play/palco/p13785/l7nnon',
'only_matching': True,
# Episode not accessible through API
'url': 'https://www.rtp.pt/play/estudoemcasa/p7776/e500050/portugues-1-ano',
'md5': '57660c0b46db9f22118c52cbd65975e4',
'info_dict': {
'id': 'e500050',
'ext': 'mp4',
'title': 'Português - 1.º ano',
'duration': 1669.0,
'description': 'md5:be68925c81269f8c6886589f25fe83ea',
'upload_date': '20201020',
'timestamp': 1603180799,
'thumbnail': 'https://cdn-images.rtp.pt/EPG/imagens/39482_59449_64850.png?v=3&w=860',
},
}]
_USER_AGENT = 'rtpplay/2.0.66 (pt.rtp.rtpplay; build:2066; iOS 15.8.3) Alamofire/5.9.1'
_AUTH_TOKEN = None
def _fetch_auth_token(self):
if self._AUTH_TOKEN:
return self._AUTH_TOKEN
self._AUTH_TOKEN = traverse_obj(self._download_json(Request(
'https://rtpplayapi.rtp.pt/play/api/2/token-manager',
headers={
'Accept': '*/*',
'rtp-play-auth': 'RTPPLAY_MOBILE_IOS',
'rtp-play-auth-hash': 'fac9c328b2f27e26e03d7f8942d66c05b3e59371e16c2a079f5c83cc801bd3ee',
'rtp-play-auth-timestamp': '2145973229682',
'User-Agent': self._USER_AGENT,
}, extensions={'keep_header_casing': True}), None,
note='Fetching guest auth token', errnote='Could not fetch guest auth token',
fatal=False), ('token', 'token', {str}))
return self._AUTH_TOKEN
@staticmethod
def _cleanup_media_url(url):
if urllib.parse.urlparse(url).netloc == 'streaming-ondemand.rtp.pt':
return None
return url.replace('/drm-fps/', '/hls/').replace('/drm-dash/', '/dash/')
def _extract_formats(self, media_urls, episode_id):
formats = []
subtitles = {}
for media_url in set(traverse_obj(media_urls, (..., {url_or_none}, {self._cleanup_media_url}))):
ext = determine_ext(media_url)
if ext == 'm3u8':
fmts, subs = self._extract_m3u8_formats_and_subtitles(
media_url, episode_id, m3u8_id='hls', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
elif ext == 'mpd':
fmts, subs = self._extract_mpd_formats_and_subtitles(
media_url, episode_id, mpd_id='dash', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
else:
formats.append({
'url': media_url,
'format_id': 'http',
})
return formats, subtitles
def _extract_from_api(self, program_id, episode_id):
auth_token = self._fetch_auth_token()
if not auth_token:
return
episode_data = traverse_obj(self._download_json(
f'https://www.rtp.pt/play/api/1/get-episode/{program_id}/{episode_id[1:]}', episode_id,
query={'include_assets': 'true', 'include_webparams': 'true'},
headers={
'Accept': '*/*',
'Authorization': f'Bearer {auth_token}',
'User-Agent': self._USER_AGENT,
}, fatal=False), 'result', {dict})
if not episode_data:
return
asset_urls = traverse_obj(episode_data, ('assets', 0, 'asset_url', {dict}))
media_urls = traverse_obj(asset_urls, (
((('hls', 'dash'), 'stream_url'), ('multibitrate', ('url_hls', 'url_dash'))),))
formats, subtitles = self._extract_formats(media_urls, episode_id)
for sub_data in traverse_obj(asset_urls, ('subtitles', 'vtt_list', lambda _, v: url_or_none(v['file']))):
subtitles.setdefault(sub_data.get('code') or 'pt', []).append({
'url': sub_data['file'],
'name': sub_data.get('language'),
})
return {
'id': episode_id,
'formats': formats,
'subtitles': subtitles,
'thumbnail': traverse_obj(episode_data, ('assets', 0, 'asset_thumbnail', {url_or_none})),
**traverse_obj(episode_data, ('episode', {
'title': (('episode_title', 'program_title'), {str}, filter, any),
'alt_title': ('episode_subtitle', {str}, filter),
'description': (('episode_description', 'episode_summary'), {str}, filter, any),
'timestamp': ('episode_air_date', {parse_iso8601(delimiter=' ')}),
'modified_timestamp': ('episode_lastchanged', {parse_iso8601(delimiter=' ')}),
'duration': ('episode_duration_complete', {parse_duration}),
'episode': ('episode_title', {str}, filter),
'episode_number': ('episode_number', {int_or_none}),
'season': ('program_season', {str}, filter),
'series': ('program_title', {str}, filter),
})),
}
_RX_OBFUSCATION = re.compile(r'''(?xs)
atob\s*\(\s*decodeURIComponent\s*\(\s*
(\[[0-9A-Za-z%,'"]*\])
\s*\.\s*join\(\s*(?:""|'')\s*\)\s*\)\s*\)
''')
def __unobfuscate(self, data, *, video_id):
if data.startswith('{'):
data = self._RX_OBFUSCATION.sub(
lambda m: json.dumps(
base64.b64decode(urllib.parse.unquote(
''.join(self._parse_json(m.group(1), video_id)),
)).decode('iso-8859-1')),
data)
return js_to_json(data)
def __unobfuscate(self, data):
return self._RX_OBFUSCATION.sub(
lambda m: json.dumps(
base64.b64decode(urllib.parse.unquote(
''.join(json.loads(m.group(1))),
)).decode('iso-8859-1')),
data)
def _real_extract(self, url):
video_id = self._match_id(url)
webpage = self._download_webpage(url, video_id)
title = self._html_search_meta(
'twitter:title', webpage, display_name='title', fatal=True)
f, config = self._search_regex(
r'''(?sx)
(?:var\s+f\s*=\s*(?P<f>".*?"|{[^;]+?});\s*)?
var\s+player1\s+=\s+new\s+RTPPlayer\s*\((?P<config>{(?:(?!\*/).)+?})\);(?!\s*\*/)
''', webpage,
'player config', group=('f', 'config'))
config = self._parse_json(
config, video_id,
lambda data: self.__unobfuscate(data, video_id=video_id))
f = config['file'] if not f else self._parse_json(
f, video_id,
lambda data: self.__unobfuscate(data, video_id=video_id))
def _extract_from_html(self, url, episode_id):
webpage = self._download_webpage(url, episode_id)
formats = []
if isinstance(f, dict):
f_hls = f.get('hls')
if f_hls is not None:
formats.extend(self._extract_m3u8_formats(
f_hls, video_id, 'mp4', 'm3u8_native', m3u8_id='hls'))
f_dash = f.get('dash')
if f_dash is not None:
formats.extend(self._extract_mpd_formats(f_dash, video_id, mpd_id='dash'))
else:
formats.append({
'format_id': 'f',
'url': f,
'vcodec': 'none' if config.get('mediaType') == 'audio' else None,
})
subtitles = {}
vtt = config.get('vtt')
if vtt is not None:
for lcode, lname, url in vtt:
subtitles.setdefault(lcode, []).append({
'name': lname,
'url': url,
})
media_urls = traverse_obj(re.findall(r'(?:var\s+f\s*=|RTPPlayer\({[^}]+file:)\s*({[^}]+}|"[^"]+")', webpage), (
-1, (({self.__unobfuscate}, {js_to_json}, {json.loads}, {dict.values}, ...), {json.loads})))
formats, subtitles = self._extract_formats(media_urls, episode_id)
return {
'id': video_id,
'title': title,
'id': episode_id,
'formats': formats,
'description': self._html_search_meta(['description', 'twitter:description'], webpage),
'thumbnail': config.get('poster') or self._og_search_thumbnail(webpage),
'subtitles': subtitles,
'description': self._html_search_meta(['og:description', 'twitter:description'], webpage, default=None),
'thumbnail': self._html_search_meta(['og:image', 'twitter:image'], webpage, default=None),
**self._search_json_ld(webpage, episode_id, default={}),
'title': self._html_search_meta(['og:title', 'twitter:title'], webpage, default=None),
}
def _real_extract(self, url):
program_id, episode_id = self._match_valid_url(url).group('program_id', 'id')
return self._extract_from_api(program_id, episode_id) or self._extract_from_html(url, episode_id)

117
yt_dlp/extractor/tvw.py Normal file
View File

@ -0,0 +1,117 @@
import json
from .common import InfoExtractor
from ..utils import clean_html, remove_end, unified_timestamp, url_or_none
from ..utils.traversal import traverse_obj
class TvwIE(InfoExtractor):
_VALID_URL = r'https?://(?:www\.)?tvw\.org/video/(?P<id>[^/?#]+)'
_TESTS = [{
'url': 'https://tvw.org/video/billy-frank-jr-statue-maquette-unveiling-ceremony-2024011211/',
'md5': '9ceb94fe2bb7fd726f74f16356825703',
'info_dict': {
'id': '2024011211',
'ext': 'mp4',
'title': 'Billy Frank Jr. Statue Maquette Unveiling Ceremony',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:58a8150017d985b4f377e11ee8f6f36e',
'timestamp': 1704902400,
'upload_date': '20240110',
'location': 'Legislative Building',
'display_id': 'billy-frank-jr-statue-maquette-unveiling-ceremony-2024011211',
'categories': ['General Interest'],
},
}, {
'url': 'https://tvw.org/video/ebeys-landing-state-park-2024081007/',
'md5': '71e87dae3deafd65d75ff3137b9a32fc',
'info_dict': {
'id': '2024081007',
'ext': 'mp4',
'title': 'Ebey\'s Landing State Park',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:50c5bd73bde32fa6286a008dbc853386',
'timestamp': 1724310900,
'upload_date': '20240822',
'location': 'Ebeys Landing State Park',
'display_id': 'ebeys-landing-state-park-2024081007',
'categories': ['Washington State Parks'],
},
}, {
'url': 'https://tvw.org/video/home-warranties-workgroup-2',
'md5': 'f678789bf94d07da89809f213cf37150',
'info_dict': {
'id': '1999121000',
'ext': 'mp4',
'title': 'Home Warranties Workgroup',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:861396cc523c9641d0dce690bc5c35f3',
'timestamp': 946389600,
'upload_date': '19991228',
'display_id': 'home-warranties-workgroup-2',
'categories': ['Legislative'],
},
}, {
'url': 'https://tvw.org/video/washington-to-washington-a-new-space-race-2022041111/?eventID=2022041111',
'md5': '6f5551090b351aba10c0d08a881b4f30',
'info_dict': {
'id': '2022041111',
'ext': 'mp4',
'title': 'Washington to Washington - A New Space Race',
'thumbnail': r're:^https?://.*\.(?:jpe?g|png)$',
'description': 'md5:f65a24eec56107afbcebb3aa5cd26341',
'timestamp': 1650394800,
'upload_date': '20220419',
'location': 'Hayner Media Center',
'display_id': 'washington-to-washington-a-new-space-race-2022041111',
'categories': ['Washington to Washington', 'General Interest'],
},
}]
def _real_extract(self, url):
display_id = self._match_id(url)
webpage = self._download_webpage(url, display_id)
client_id = self._html_search_meta('clientID', webpage, fatal=True)
video_id = self._html_search_meta('eventID', webpage, fatal=True)
video_data = self._download_json(
'https://api.v3.invintus.com/v2/Event/getDetailed', video_id,
headers={
'authorization': 'embedder',
'wsc-api-key': '7WhiEBzijpritypp8bqcU7pfU9uicDR',
},
data=json.dumps({
'clientID': client_id,
'eventID': video_id,
'showStreams': True,
}).encode())['data']
formats = []
subtitles = {}
for stream_url in traverse_obj(video_data, ('streamingURIs', ..., {url_or_none})):
fmts, subs = self._extract_m3u8_formats_and_subtitles(
stream_url, video_id, 'mp4', m3u8_id='hls', fatal=False)
formats.extend(fmts)
self._merge_subtitles(subs, target=subtitles)
if caption_url := traverse_obj(video_data, ('captionPath', {url_or_none})):
subtitles.setdefault('en', []).append({'url': caption_url, 'ext': 'vtt'})
return {
'id': video_id,
'display_id': display_id,
'formats': formats,
'subtitles': subtitles,
'title': remove_end(self._og_search_title(webpage, default=None), ' - TVW'),
'description': self._og_search_description(webpage, default=None),
**traverse_obj(video_data, {
'title': ('title', {str}),
'description': ('description', {clean_html}),
'categories': ('categories', ..., {str}),
'thumbnail': ('videoThumbnail', {url_or_none}),
'timestamp': ('startDateTime', {unified_timestamp}),
'location': ('locationName', {str}),
'is_live': ('eventStatus', {lambda x: x == 'live'}),
}),
}

View File

@ -1334,7 +1334,7 @@ class TwitterIE(TwitterBaseIE):
def _generate_syndication_token(self, twid):
# ((Number(twid) / 1e15) * Math.PI).toString(36).replace(/(0+|\.)/g, '')
translation = str.maketrans(dict.fromkeys('0.'))
return js_number_to_string((int(twid) / 1e15) * math.PI, 36).translate(translation)
return js_number_to_string((int(twid) / 1e15) * math.pi, 36).translate(translation)
def _call_syndication_api(self, twid):
self.report_warning(

View File

@ -4266,6 +4266,7 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
} for range_start in range(0, f['filesize'], CHUNK_SIZE))
for fmt in streaming_formats:
client_name = fmt[STREAMING_DATA_CLIENT_NAME]
if fmt.get('targetDurationSec'):
continue
@ -4310,6 +4311,12 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
fmt_url = url_or_none(try_get(sc, lambda x: x['url'][0]))
encrypted_sig = try_get(sc, lambda x: x['s'][0])
if not all((sc, fmt_url, player_url, encrypted_sig)):
self.report_warning(
f'Some {client_name} client formats have been skipped as they are missing a url. '
f'{"Your account" if self.is_authenticated else "The current session"} may have '
f'the SSAP (server-side ads) experiment which may be interfering with yt-dlp. '
f'Please see https://github.com/yt-dlp/yt-dlp/issues/12482 for more details.',
only_once=True)
continue
try:
fmt_url += '&{}={}'.format(
@ -4356,7 +4363,6 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
self.report_warning(
f'{video_id}: Some formats are possibly damaged. They will be deprioritized', only_once=True)
client_name = fmt[STREAMING_DATA_CLIENT_NAME]
po_token = fmt.get(STREAMING_DATA_INITIAL_PO_TOKEN)
if po_token:

View File

@ -296,6 +296,7 @@ class RequestsRH(RequestHandler, InstanceStoreMixin):
extensions.pop('cookiejar', None)
extensions.pop('timeout', None)
extensions.pop('legacy_ssl', None)
extensions.pop('keep_header_casing', None)
def _create_instance(self, cookiejar, legacy_ssl_support=None):
session = RequestsSession()
@ -312,11 +313,12 @@ class RequestsRH(RequestHandler, InstanceStoreMixin):
session.trust_env = False # no need, we already load proxies from env
return session
def _send(self, request):
headers = self._merge_headers(request.headers)
def _prepare_headers(self, _, headers):
add_accept_encoding_header(headers, SUPPORTED_ENCODINGS)
def _send(self, request):
headers = self._get_headers(request)
max_redirects_exceeded = False
session = self._get_instance(

View File

@ -379,13 +379,15 @@ class UrllibRH(RequestHandler, InstanceStoreMixin):
opener.addheaders = []
return opener
def _send(self, request):
headers = self._merge_headers(request.headers)
def _prepare_headers(self, _, headers):
add_accept_encoding_header(headers, SUPPORTED_ENCODINGS)
def _send(self, request):
headers = self._get_headers(request)
urllib_req = urllib.request.Request(
url=request.url,
data=request.data,
headers=dict(headers),
headers=headers,
method=request.method,
)

View File

@ -116,6 +116,7 @@ class WebsocketsRH(WebSocketRequestHandler):
extensions.pop('timeout', None)
extensions.pop('cookiejar', None)
extensions.pop('legacy_ssl', None)
extensions.pop('keep_header_casing', None)
def close(self):
# Remove the logging handler that contains a reference to our logger
@ -123,15 +124,16 @@ class WebsocketsRH(WebSocketRequestHandler):
for name, handler in self.__logging_handlers.items():
logging.getLogger(name).removeHandler(handler)
def _send(self, request):
timeout = self._calculate_timeout(request)
headers = self._merge_headers(request.headers)
def _prepare_headers(self, request, headers):
if 'cookie' not in headers:
cookiejar = self._get_cookiejar(request)
cookie_header = cookiejar.get_cookie_header(request.url)
if cookie_header:
headers['cookie'] = cookie_header
def _send(self, request):
timeout = self._calculate_timeout(request)
headers = self._get_headers(request)
wsuri = parse_uri(request.url)
create_conn_kwargs = {
'source_address': (self.source_address, 0) if self.source_address else None,

View File

@ -206,6 +206,7 @@ class RequestHandler(abc.ABC):
- `cookiejar`: Cookiejar to use for this request.
- `timeout`: socket timeout to use for this request.
- `legacy_ssl`: Enable legacy SSL options for this request. See legacy_ssl_support.
- `keep_header_casing`: Keep the casing of headers when sending the request.
To enable these, add extensions.pop('<extension>', None) to _check_extensions
Apart from the url protocol, proxies dict may contain the following keys:
@ -259,6 +260,23 @@ class RequestHandler(abc.ABC):
def _merge_headers(self, request_headers):
return HTTPHeaderDict(self.headers, request_headers)
def _prepare_headers(self, request: Request, headers: HTTPHeaderDict) -> None: # noqa: B027
"""Additional operations to prepare headers before building. To be extended by subclasses.
@param request: Request object
@param headers: Merged headers to prepare
"""
def _get_headers(self, request: Request) -> dict[str, str]:
"""
Get headers for external use.
Subclasses may define a _prepare_headers method to modify headers after merge but before building.
"""
headers = self._merge_headers(request.headers)
self._prepare_headers(request, headers)
if request.extensions.get('keep_header_casing'):
return headers.sensitive()
return dict(headers)
def _calculate_timeout(self, request):
return float(request.extensions.get('timeout') or self.timeout)
@ -317,6 +335,7 @@ class RequestHandler(abc.ABC):
assert isinstance(extensions.get('cookiejar'), (YoutubeDLCookieJar, NoneType))
assert isinstance(extensions.get('timeout'), (float, int, NoneType))
assert isinstance(extensions.get('legacy_ssl'), (bool, NoneType))
assert isinstance(extensions.get('keep_header_casing'), (bool, NoneType))
def _validate(self, request):
self._check_url_scheme(request)

View File

@ -5,11 +5,11 @@ from abc import ABC
from dataclasses import dataclass
from typing import Any
from .common import RequestHandler, register_preference
from .common import RequestHandler, register_preference, Request
from .exceptions import UnsupportedRequest
from ..compat.types import NoneType
from ..utils import classproperty, join_nonempty
from ..utils.networking import std_headers
from ..utils.networking import std_headers, HTTPHeaderDict
@dataclass(order=True, frozen=True)
@ -123,7 +123,17 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
"""Get the requested target for the request"""
return self._resolve_target(request.extensions.get('impersonate') or self.impersonate)
def _get_impersonate_headers(self, request):
def _prepare_impersonate_headers(self, request: Request, headers: HTTPHeaderDict) -> None: # noqa: B027
"""Additional operations to prepare headers before building. To be extended by subclasses.
@param request: Request object
@param headers: Merged headers to prepare
"""
def _get_impersonate_headers(self, request: Request) -> dict[str, str]:
"""
Get headers for external impersonation use.
Subclasses may define a _prepare_impersonate_headers method to modify headers after merge but before building.
"""
headers = self._merge_headers(request.headers)
if self._get_request_target(request) is not None:
# remove all headers present in std_headers
@ -131,7 +141,11 @@ class ImpersonateRequestHandler(RequestHandler, ABC):
for k, v in std_headers.items():
if headers.get(k) == v:
headers.pop(k)
return headers
self._prepare_impersonate_headers(request, headers)
if request.extensions.get('keep_header_casing'):
return headers.sensitive()
return dict(headers)
@register_preference(ImpersonateRequestHandler)

View File

@ -1,9 +1,16 @@
from __future__ import annotations
import collections
import collections.abc
import random
import typing
import urllib.parse
import urllib.request
from ._utils import remove_start
if typing.TYPE_CHECKING:
T = typing.TypeVar('T')
from ._utils import NO_DEFAULT, remove_start
def random_user_agent():
@ -51,32 +58,141 @@ def random_user_agent():
return _USER_AGENT_TPL % random.choice(_CHROME_VERSIONS)
class HTTPHeaderDict(collections.UserDict, dict):
class HTTPHeaderDict(dict):
"""
Store and access keys case-insensitively.
The constructor can take multiple dicts, in which keys in the latter are prioritised.
Retains a case sensitive mapping of the headers, which can be accessed via `.sensitive()`.
"""
def __new__(cls, *args: typing.Any, **kwargs: typing.Any) -> typing.Self:
obj = dict.__new__(cls, *args, **kwargs)
obj.__sensitive_map = {}
return obj
def __init__(self, *args, **kwargs):
def __init__(self, /, *args, **kwargs):
super().__init__()
for dct in args:
if dct is not None:
self.update(dct)
self.update(kwargs)
self.__sensitive_map = {}
def __setitem__(self, key, value):
if isinstance(value, bytes):
value = value.decode('latin-1')
super().__setitem__(key.title(), str(value).strip())
for dct in filter(None, args):
self.update(dct)
if kwargs:
self.update(kwargs)
def __getitem__(self, key):
def sensitive(self, /) -> dict[str, str]:
return {
self.__sensitive_map[key]: value
for key, value in self.items()
}
def __contains__(self, key: str, /) -> bool:
return super().__contains__(key.title() if isinstance(key, str) else key)
def __delitem__(self, key: str, /) -> None:
key = key.title()
del self.__sensitive_map[key]
super().__delitem__(key)
def __getitem__(self, key, /) -> str:
return super().__getitem__(key.title())
def __delitem__(self, key):
super().__delitem__(key.title())
def __ior__(self, other, /):
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
self.update(other)
return
return NotImplemented
def __contains__(self, key):
return super().__contains__(key.title() if isinstance(key, str) else key)
def __or__(self, other, /) -> typing.Self:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
return type(self)(self.sensitive(), other)
return NotImplemented
def __ror__(self, other, /) -> typing.Self:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, dict):
return type(self)(other, self.sensitive())
return NotImplemented
def __setitem__(self, key: str, value, /) -> None:
if isinstance(value, bytes):
value = value.decode('latin-1')
key_title = key.title()
self.__sensitive_map[key_title] = key
super().__setitem__(key_title, str(value).strip())
def clear(self, /) -> None:
self.__sensitive_map.clear()
super().clear()
def copy(self, /) -> typing.Self:
return type(self)(self.sensitive())
@typing.overload
def get(self, key: str, /) -> str | None: ...
@typing.overload
def get(self, key: str, /, default: T) -> str | T: ...
def get(self, key, /, default=NO_DEFAULT):
key = key.title()
if default is NO_DEFAULT:
return super().get(key)
return super().get(key, default)
@typing.overload
def pop(self, key: str, /) -> str: ...
@typing.overload
def pop(self, key: str, /, default: T) -> str | T: ...
def pop(self, key, /, default=NO_DEFAULT):
key = key.title()
if default is NO_DEFAULT:
self.__sensitive_map.pop(key)
return super().pop(key)
self.__sensitive_map.pop(key, default)
return super().pop(key, default)
def popitem(self) -> tuple[str, str]:
self.__sensitive_map.popitem()
return super().popitem()
@typing.overload
def setdefault(self, key: str, /) -> str: ...
@typing.overload
def setdefault(self, key: str, /, default) -> str: ...
def setdefault(self, key, /, default=None) -> str:
key = key.title()
if key in self.__sensitive_map:
return super().__getitem__(key)
self[key] = default or ''
return self[key]
def update(self, other, /, **kwargs) -> None:
if isinstance(other, type(self)):
other = other.sensitive()
if isinstance(other, collections.abc.Mapping):
for key, value in other.items():
self[key] = value
elif hasattr(other, 'keys'):
for key in other.keys(): # noqa: SIM118
self[key] = other[key]
else:
for key, value in other:
self[key] = value
for key, value in kwargs.items():
self[key] = value
std_headers = HTTPHeaderDict({