summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Hagander2025-12-12 17:07:17 +0000
committerMagnus Hagander2025-12-12 17:07:17 +0000
commit10dc982c1ff423b6eab488b7c2b7fb85ed015acc (patch)
tree556745fef0c456ab1ea21525bb9d0e1431e38a3b
parentac561c65087018ad52f08b4cc3f6c57d75f428d0 (diff)
Move token handling for youtube into a separate file
-rwxr-xr-xtools/videos/sync_videos.py70
-rw-r--r--tools/videos/videoutil.py73
2 files changed, 74 insertions, 69 deletions
diff --git a/tools/videos/sync_videos.py b/tools/videos/sync_videos.py
index 24f25d6c..80cc02f6 100755
--- a/tools/videos/sync_videos.py
+++ b/tools/videos/sync_videos.py
@@ -7,76 +7,8 @@ import json
import os
import requests
import sys
-import time
-import urllib
-
-def get_current_token():
- if not os.path.isfile('.google_keys.json'):
- print("You need to set up a Google cloud app and store the keys in a file called .google-keys.json in the current directory")
- print("Do NOT commit this file to a git repo, of course!")
- sys.exit(1)
-
- with open('.google_keys.json') as f:
- g = json.load(f)
-
- tokenchanged = False
- if os.path.isfile('.google_tokens.json'):
- with open('.google_tokens.json') as f:
- tokens = json.load(f)
- tokenexpires = int(tokens.get('access_token_expires_at', '0'))
- if tokenexpires < time.time():
- print("Token expired, refreshing...")
- r = requests.post(g['installed']['token_uri'], params={
- 'client_id': g['installed']['client_id'],
- 'client_secret': g['installed']['client_secret'],
- 'grant_type': 'refresh_token',
- 'refresh_token': tokens['refresh_token'],
- })
- r.raise_for_status()
- tokens['access_token'] = r.json()['access_token']
- tokens['expires_in'] = r.json()['expires_in']
- tokenchanged = True
- else:
- # No tokens yet, so initiate login
- url = '{}?{}'.format(
- g['installed']['auth_uri'],
- urllib.parse.urlencode({
- 'scope': 'https://www.googleapis.com/auth/youtube',
- 'response_type': 'code',
- 'client_id': g['installed']['client_id'],
- 'nonce': 'abc123def',
- 'state': 'def456abc',
- 'redirect_uri': 'http://localhost:1/oauth_receive',
- }))
- print("Login is needed! Please go to the following URL and grant access!")
- print(url)
- print("Once redirected back, please paste the URL you received here:")
- while True:
- url = input('URL: ')
- if not url:
- continue
- try:
- code = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)['code'][0]
- break
- except Exception as e:
- print("Failed to parse: {}".format(e))
- r = requests.post(g['installed']['token_uri'], params={
- 'client_id': g['installed']['client_id'],
- 'client_secret': g['installed']['client_secret'],
- 'grant_type': 'authorization_code',
- 'code': code,
- 'redirect_uri': 'http://localhost:1/oauth_receive',
- })
- r.raise_for_status()
- tokens = r.json()
- tokenchanged = True
- if tokenchanged:
- tokens['access_token_expires_at'] = time.time() + int(tokens['expires_in'])
- with open('.google_tokens.json', 'w') as f:
- json.dump(tokens, f)
-
- return tokens['access_token']
+from videoutil import get_current_token
def get_videos_in_playlist(args, sess):
diff --git a/tools/videos/videoutil.py b/tools/videos/videoutil.py
new file mode 100644
index 00000000..5a65a8d3
--- /dev/null
+++ b/tools/videos/videoutil.py
@@ -0,0 +1,73 @@
+import json
+import os
+import requests
+import sys
+import time
+
+
+def get_current_token():
+ if not os.path.isfile('.google_keys.json'):
+ print("You need to set up a Google cloud app and store the keys in a file called .google-keys.json in the current directory")
+ print("Do NOT commit this file to a git repo, of course!")
+ sys.exit(1)
+
+ with open('.google_keys.json') as f:
+ g = json.load(f)
+
+ tokenchanged = False
+ if os.path.isfile('.google_tokens.json'):
+ with open('.google_tokens.json') as f:
+ tokens = json.load(f)
+ tokenexpires = int(tokens.get('access_token_expires_at', '0'))
+ if tokenexpires < time.time():
+ print("Token expired, refreshing...")
+ r = requests.post(g['installed']['token_uri'], params={
+ 'client_id': g['installed']['client_id'],
+ 'client_secret': g['installed']['client_secret'],
+ 'grant_type': 'refresh_token',
+ 'refresh_token': tokens['refresh_token'],
+ })
+ r.raise_for_status()
+ tokens['access_token'] = r.json()['access_token']
+ tokens['expires_in'] = r.json()['expires_in']
+ tokenchanged = True
+ else:
+ # No tokens yet, so initiate login
+ url = '{}?{}'.format(
+ g['installed']['auth_uri'],
+ urllib.parse.urlencode({
+ 'scope': 'https://www.googleapis.com/auth/youtube',
+ 'response_type': 'code',
+ 'client_id': g['installed']['client_id'],
+ 'nonce': 'abc123def',
+ 'state': 'def456abc',
+ 'redirect_uri': 'http://localhost:1/oauth_receive',
+ }))
+ print("Login is needed! Please go to the following URL and grant access!")
+ print(url)
+ print("Once redirected back, please paste the URL you received here:")
+ while True:
+ url = input('URL: ')
+ if not url:
+ continue
+ try:
+ code = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)['code'][0]
+ break
+ except Exception as e:
+ print("Failed to parse: {}".format(e))
+ r = requests.post(g['installed']['token_uri'], params={
+ 'client_id': g['installed']['client_id'],
+ 'client_secret': g['installed']['client_secret'],
+ 'grant_type': 'authorization_code',
+ 'code': code,
+ 'redirect_uri': 'http://localhost:1/oauth_receive',
+ })
+ r.raise_for_status()
+ tokens = r.json()
+ tokenchanged = True
+ if tokenchanged:
+ tokens['access_token_expires_at'] = time.time() + int(tokens['expires_in'])
+ with open('.google_tokens.json', 'w') as f:
+ json.dump(tokens, f)
+
+ return tokens['access_token']