from django.http import HttpResponse, Http404, HttpResponseRedirect from django import forms from django.conf import settings from django.shortcuts import get_object_or_404 from django.utils.timesince import timeuntil from django.utils import timezone from datetime import datetime, timedelta import re import requests import requests_oauthlib import time from postgresqleu.util.forms import SubmitButtonField from postgresqleu.util.oauthapps import get_oauth_client, get_oauth_secret from postgresqleu.util.random import generate_random_token from postgresqleu.util.time import datetime_string from postgresqleu.util.widgets import StaticTextWidget from postgresqleu.confreg.models import MessagingProvider from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm # Scopes to request when fetching token LINKEDIN_SCOPE = 'r_organization_social,w_organization_social' class LinkedinBackendForm(BackendSeriesMessagingForm): linkedininfo = forms.CharField(widget=StaticTextWidget, label="Account information", required=False) pageid = forms.IntegerField(required=True, help_text="The id number of the page to post to. Can be retrieved from the URL of the page.", label='Page ID') exclude_fields_from_validation = ['linkedininfo', ] @property def config_fields(self): f = ['linkedininfo', ] if self.instance.config.get('token', None): return f + ['pageid', ] else: return f @property def config_fieldsets(self): return [ {'id': 'linkedin', 'legend': 'Linkedin', 'fields': self.config_fields}, ] @property def config_readonly_fields(self): return ['linkedininfo', ] def fix_fields(self): super().fix_fields() auth_url, state = requests_oauthlib.OAuth2Session( get_oauth_client('https://api.linkedin.com'), redirect_uri='{}/oauth_return/messaging/'.format(settings.SITEBASE), state='{}_{}'.format(self.instance.id, generate_random_token()), scope=LINKEDIN_SCOPE, ).authorization_url('https://www.linkedin.com/oauth/v2/authorization') if self.instance.config.get('token', None): tokenexpires = timezone.make_aware(datetime.fromtimestamp(self.instance.config.get('token_expires'))) refreshtokenexpires = timezone.make_aware(datetime.fromtimestamp(self.instance.config.get('refresh_token_expires'))) self.initial.update({ 'linkedininfo': 'Connected to Linkedin account.
Current access token will expire in {} (on {}), and will be automatically renewed until {}, after which it must be manually re-authenticated.
To re-athenticate, click this link and log in with a Linkedin account with the appropriate permissions.'.format( timeuntil(tokenexpires), datetime_string(tokenexpires), datetime_string(refreshtokenexpires), auth_url, ), }) else: self.remove_field('pageid') if not self.instance.id: self.initial.update({ 'linkedininfo': 'Not connected. Save the provider first, and then return here to configure.', }) else: # Create an oauth URL for access # (XXX: we don't store the state for verification here, which maybe we should, but we don't care that much here) self.initial.update({ 'linkedininfo': 'Not connected. Please follow this link and authorize access to Linkedin.'.format(auth_url), }) def clean(self): d = super().clean() if d.get('active', False): if not d.get('pageid', 0): self.add_error('active', 'Cannot activate without a pageid') # Unfortunately, the linkedin api gives us no good way to validate that we have access to a page. If we could # work without a rate limit, we could create an unpublished post and then delete it, but with the default rate limiting that # is very wasteful so we skip it for now. return d class Linkedin(object): provider_form_class = LinkedinBackendForm can_process_incoming = False can_broadcast = True can_notification = False direct_message_max_length = None typename = 'Linkedin' max_post_length = 3000 re_adminpage_url = re.compile(r'https://www.linkedin.com/company/(\d+)/.*') re_publicpage_url = re.compile(r'https://www.linkedin.com/company/([^/]+)/.*') @classmethod def can_track_users_for(self, whatfor): return False @classmethod def validate_baseurl(self, baseurl): return None @classmethod def clean_identifier_form_value(self, whatfor, value): raise Exception("Not implemented") @classmethod def get_link_from_identifier(self, value): return 'https://linkedin.com/company/{}'.format(value) def __init__(self, id, config): self.providerid = id self.providerconfig = config self._sess = None if 'pageid' in self.providerconfig: self.urn = 'urn:li:organization:{}'.format(self.providerconfig['pageid']) else: self.urn = None @property def sess(self): if self._sess is None: self._sess = requests.Session() self._sess.headers.update({ 'Authorization': 'Bearer {}'.format(self.providerconfig['token']), 'LinkedIn-Version': '202505', }) return self._sess def _api_url(self, url): return 'https://api.linkedin.com/{}'.format(url) def _get(self, url, *args, **kwargs): return self.sess.get(self._api_url(url), timeout=30, *args, **kwargs) def _post(self, url, *args, **kwargs): return self.sess.post(self._api_url(url), timeout=30, *args, **kwargs) def oauth_return(self, request): try: tokens = requests_oauthlib.OAuth2Session( get_oauth_client('https://api.linkedin.com'), redirect_uri='{}/oauth_return/messaging/'.format(settings.SITEBASE), state='{}_{}'.format(self.providerid, generate_random_token()), scope=LINKEDIN_SCOPE, ).fetch_token( 'https://www.linkedin.com/oauth/v2/accessToken', code=request.GET['code'], include_client_id=True, client_secret=get_oauth_secret('https://api.linkedin.com'), scopes=LINKEDIN_SCOPE, ) except Exception as e: return 'Could not fetch token: {}'.format(e) m = get_object_or_404(MessagingProvider, pk=self.providerid) m.config.update({ 'token': tokens['access_token'], 'refresh_token': tokens['refresh_token'], 'token_expires': int(tokens['expires_in'] + time.time()), 'refresh_token_expires': int(tokens['refresh_token_expires_in'] + time.time()), }) m.save(update_fields=['config']) def post(self, text, image=None, replytotweetid=None): if not self.urn: # Can't post without an URN return d = { 'author': self.urn, 'commentary': text, 'visibility': 'PUBLIC', 'distribution': { 'feedDistribution': 'MAIN_FEED', }, 'lifecycleState': 'PUBLISHED', } if image: # Initiate multi-step image upload r = self._post('rest/images', params={'action': 'initializeUpload'}, json={ 'initializeUploadRequest': { 'owner': self.urn, } }) if r.status_code != 200: return (None, 'Failed to initialize image upload: {}'.format(r.text)) ir = self.sess.put(r.json()['value']['uploadUrl'], bytearray(image), timeout=60) if ir.status_code != 201: return (None, 'Failed to upload image: {}'.format(ir.text)) # Upload complete, we can use it! d['content'] = { 'media': { 'id': r.json()['value']['image'], }, } r = self._post('rest/posts', json=d) if r.status_code != 201: return (None, r.text) # Format of id is urn:li:share:7196857142283268096 return (r.headers['x-linkedin-id'], None) def repost(self, tweetid): raise Exception("Not implemented") def send_direct_message(self, recipient_config, msg): raise Exception("Not implemented") def poll_public_posts(self, lastpoll, checkpoint): raise Exception("Not implemented") def poll_incoming_private_messages(self, lastpoll, checkpoint): raise Exception("Not implemented") def get_regconfig_from_dm(self, dm): raise Exception("Not implemented") def get_regdisplayname_from_config(self, config): raise Exception("Not implemented") def get_public_url(self, post): return 'https://www.linkedin.com/feed/update/urn:li:share:{}/'.format(post.statusid) def get_attendee_string(self, token, messaging, attendeeconfig): raise Exception("Not implemented") def refresh_access_token(self): r = requests.post('https://www.linkedin.com/oauth/v2/accessToken', data={ 'grant_type': 'refresh_token', 'refresh_token': self.providerconfig['refresh_token'], 'client_id': get_oauth_client('https://api.linkedin.com'), 'client_secret': get_oauth_secret('https://api.linkedin.com'), }, timeout=30) if r.status_code == 200: tokens = r.json() provider = MessagingProvider.objects.get(pk=self.providerid) provider.config.update({ 'token': tokens['access_token'], 'refresh_token': tokens['refresh_token'], 'token_expires': int(tokens['expires_in'] + time.time()), 'refresh_token_expires': int(tokens['refresh_token_expires_in'] + time.time()), }) provider.save(update_fields=['config']) self.providerconfig = provider.config return True, None return False, r.text def check_messaging_config(self, state): tokenexpires = timezone.make_aware(datetime.fromtimestamp(self.providerconfig.get('token_expires'))) refreshtokenexpires = timezone.make_aware(datetime.fromtimestamp(self.providerconfig.get('refresh_token_expires'))) if tokenexpires < timezone.now() + timedelta(days=10): # We start trying to refresh when there are 10 days to go if refreshtokenexpires < timezone.now() + timedelta(days=1): # We add one day margin here return False, "Refresh token has expired, re-authentication needed." # Attempt to refresh ok, err = self.refresh_access_token() if ok: return True, "Access token refreshed, new token valid until {}.".format(timezone.make_aware(datetime.fromtimestamp(self.providerconfig.get('token_expires')))) else: return False, "Access token refresh failed: {}".format(err) if refreshtokenexpires < timezone.now() + timedelta(days=10): return True, "Refresh token will expire in {} (on {}), manual re-authentication needed!".format( timeuntil(refresh_token_expires), refresh_token_expires, ) # Token not expired or about to, so verify that what we have works. r = self._get( 'rest/posts', params={ 'author': self.urn, 'q': 'author', 'count': 1, }, ) if r.status_code != 200: return False, "Failed to get post (status {}): {}".format(r.status_code, r.text) # We can't really check that there is at least one post returned, because there might # be no posts available. But at least this way we have verified that the token is OK. return True, '' def get_link(self, id): return [ 'linkedin', 'https://www.linkedin.com/feed/update/{}/'.format(id), ]