Spaces:
Sleeping
Sleeping
import base64
import json
import os.path
import pickle

from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# OAuth scopes requested during authorization (read-only Gmail access).
# If modifying these SCOPES, delete the file token.json so that a new token
# is issued for the new scope set.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
def authenticate_gmail():
    """Authorize the user and return a Gmail API service client.

    Cached credentials are loaded from ``token.json`` when that file exists.
    If they are missing or invalid they are refreshed when possible;
    otherwise the interactive OAuth flow is run using the client secrets in
    ``Credentials.json``. Newly obtained or refreshed credentials are written
    back to ``token.json`` for the next run.

    Returns:
        The object returned by ``build('gmail', 'v1', ...)``, or ``None`` if
        building the client raised ``HttpError``.
    """
    creds = None
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the
    # first time.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)

    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError as error:
                # A revoked or expired refresh token cannot be recovered
                # silently; fall back to the interactive flow instead of
                # crashing.
                print(f'Token refresh failed, re-authorizing: {error}')
        if not refreshed:
            flow = InstalledAppFlow.from_client_secrets_file(
                'Credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run.
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    try:
        # Build the Gmail API client.
        service = build('gmail', 'v1', credentials=creds)
        return service
    except HttpError as error:
        print(f'An error occurred: {error}')
        return None
def _decode_body_data(data):
    """Decode a base64url-encoded ``body.data`` string into text."""
    # Re-add any missing '=' padding so the decoder accepts the payload.
    padded = data + '=' * (-len(data) % 4)
    # The part is not guaranteed to be UTF-8; substitute undecodable bytes
    # rather than aborting the whole printout.
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _iter_plain_text_bodies(part):
    """Yield the decoded text of each text/plain body at or below *part*."""
    # Parts that carry a filename are attachments, not the message body.
    if part.get('mimeType') == 'text/plain' and not part.get('filename'):
        data = part.get('body', {}).get('data')
        if data:
            yield _decode_body_data(data)
    # Multipart containers nest their children under 'parts'.
    for child in part.get('parts', []):
        yield from _iter_plain_text_bodies(child)


def get_latest_email(service):
    """Print sender, subject and plain-text body of the latest inbox email.

    The first message returned by ``users.messages.list`` for the ``INBOX``
    label is fetched in ``full`` format. Its ``From`` and ``Subject`` headers
    are printed in the order they appear, followed by one ``Body:`` line per
    text/plain part found anywhere in the MIME tree (including single-part
    messages whose body sits directly on the payload).

    Args:
        service: Gmail API client exposing ``users().messages()``.

    Returns:
        None. All output goes to stdout; ``HttpError`` is caught and
        reported rather than propagated.
    """
    try:
        # Restrict the listing to the inbox so sent or archived mail is not
        # picked up as the "latest" message.
        results = service.users().messages().list(
            userId='me', labelIds=['INBOX'], maxResults=1).execute()
        messages = results.get('messages', [])
        if not messages:
            print('No messages found.')
            return

        message_id = messages[0]['id']
        # Fetch the full message (headers and MIME parts) by ID.
        message = service.users().messages().get(
            userId='me', id=message_id, format='full').execute()
        payload = message.get('payload', {})

        # Header names are case-insensitive, so compare in lower case.
        for header in payload.get('headers', []):
            name = header.get('name', '').lower()
            if name == 'from':
                print(f"From: {header.get('value', '')}")
            elif name == 'subject':
                print(f"Subject: {header.get('value', '')}")

        for body_text in _iter_plain_text_bodies(payload):
            print(f"Body: {body_text}")
    except HttpError as error:
        print(f'An error occurred: {error}')
if __name__ == '__main__':
    # Script entry point: authenticate, show the client object, and print
    # the latest email only when a client was actually built.
    service = authenticate_gmail()
    print(service)
    if service:
        get_latest_email(service)