diff options
Diffstat (limited to 'postgresqleu')
42 files changed, 2983 insertions, 523 deletions
diff --git a/postgresqleu/confreg/backendforms.py b/postgresqleu/confreg/backendforms.py index dfb5a8e9..717317bb 100644 --- a/postgresqleu/confreg/backendforms.py +++ b/postgresqleu/confreg/backendforms.py @@ -2,6 +2,7 @@ from django.core.exceptions import ValidationError from django.core.validators import MinValueValidator, MaxValueValidator from django.db.models import Q from django.db.models.expressions import F +from django.contrib import messages import django.forms import django.forms.widgets from django.utils.safestring import mark_safe @@ -16,11 +17,13 @@ from decimal import Decimal import pytz from postgresqleu.util.db import exec_to_single_list, exec_to_scalar -from postgresqleu.util.forms import ConcurrentProtectedModelForm +from postgresqleu.util.forms import ConcurrentProtectedModelForm, SelectSetValueField from postgresqleu.util.widgets import StaticTextWidget, EmailTextWidget, MonospaceTextarea from postgresqleu.util.widgets import TagOptionsTextWidget from postgresqleu.util.random import generate_random_token from postgresqleu.util.backendforms import BackendForm +from postgresqleu.util.messaging import messaging_implementation_choices, get_messaging, get_messaging_class +from postgresqleu.util.messaging.util import get_shortened_post_length import postgresqleu.accounting.models @@ -35,6 +38,8 @@ from postgresqleu.confreg.models import ConferenceNews from postgresqleu.confreg.models import ConferenceTweetQueue, ConferenceHashtag from postgresqleu.confreg.models import ShirtSize from postgresqleu.confreg.models import RefundPattern +from postgresqleu.confreg.models import ConferenceMessaging +from postgresqleu.confreg.models import MessagingProvider from postgresqleu.newsevents.models import NewsPosterProfile from postgresqleu.confreg.models import valid_status_transitions, get_status_string @@ -948,6 +953,165 @@ class BackendNewsForm(BackendForm): self.fields['title'].help_text = 'Note! Title will be prefixed with "{0} - " on shared frontpage and RSS!'.format(self.conference.conferencename) +class BackendMessagingForm(BackendForm): + helplink = 'integrations#messaging' + list_fields = ['providername', 'broadcast', 'privatebcast', 'notification', 'orgnotification'] + verbose_field_names = { + 'providername': 'Provider name', + } + queryset_extra_fields = { + 'providername': '(SELECT internalname FROM confreg_messagingprovider WHERE id=confreg_conferencemessaging.provider_id)', + } + + @property + def fieldsets(self): + fs = [ + {'id': 'provider', 'legend': 'Provider', 'fields': ['provider', ]}, + {'id': 'actions', 'legend': 'Actions', 'fields': ['broadcast', 'privatebcast', 'notification', 'orgnotification']}, + ] + cf = list(self._channel_fields()) + if cf: + fs.append( + {'id': 'channels', 'legend': 'Channels/Groups', 'fields': list(self._channel_fields())} + ) + return fs + + class Meta: + model = ConferenceMessaging + fields = ['provider', 'broadcast', 'privatebcast', 'notification', 'orgnotification', ] + + def _channel_fields(self): + for fld in ('privatebcast', 'orgnotification'): + if getattr(self.impl, 'can_{}'.format(fld), False): + yield '{}channel'.format(fld) + + @property + def readonly_fields(self): + yield 'provider' + for fld in ('broadcast', 'privatebcast', 'notification', 'orgnotification'): + if not getattr(self.impl, 'can_{}'.format(fld), False): + yield fld + yield from self._channel_fields() + + def __init__(self, *args, **kwargs): + self.impl = get_messaging(kwargs['instance'].provider) + if hasattr(self.impl, 'refresh_messaging_config'): + if self.impl.refresh_messaging_config(kwargs['instance'].config): + kwargs['instance'].save(update_fields=['config']) + super().__init__(*args, **kwargs) + + _channel_fieldnames = { + 'privatebcast': 'Private broadcast channel', + 'orgnotification': 'Organisation notification channel', + } + + def fix_fields(self): + super().fix_fields() + self.fields['provider'].widget.attrs['disabled'] = True + self.fields['provider'].required = False + + # Update the different types of supported fields + for fld in ('broadcast', 'privatebcast', 'notification', 'orgnotification'): + if not getattr(self.impl, 'can_{}'.format(fld), False): + self.fields[fld].widget.attrs['disabled'] = True + self.fields[fld].help_text = 'Action is not supported by this provider' + + for fld in ('privatebcast', 'orgnotification'): + if getattr(self.impl, 'can_{}'.format(fld), False): + self.fields['{}channel'.format(fld)] = self.impl.get_channel_field(self.instance, fld) + self.fields['{}channel'.format(fld)].label = self._channel_fieldnames[fld] + self.fields['{}channel'.format(fld)].required = False + + +class BackendSeriesMessagingNewForm(django.forms.Form): + helplink = 'integrations#provider' + classname = SelectSetValueField(choices=messaging_implementation_choices(), + setvaluefield='baseurl', label='Implementation class') + baseurl = django.forms.URLField(label='Base URL') + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_newform_data(self): + return '{}:{}'.format(self.cleaned_data['classname'], self.cleaned_data['baseurl']) + + def clean_baseurl(self): + return self.cleaned_data['baseurl'].rstrip('/') + + def clean(self): + d = super().clean() + if 'classname' in d and 'baseurl' in d: + # Both fields specified, so verify they're an allowed combination + r = get_messaging_class(d['classname']).validate_baseurl(d['baseurl']) + if r: + self.add_error('baseurl', r) + return d + + +class BackendSeriesMessagingForm(BackendForm): + helplink = 'integrations#provider' + list_fields = ['internalname', 'publicname', 'active', 'route_incoming', 'classname_short', ] + queryset_select_related = ['route_incoming', ] + form_before_new = BackendSeriesMessagingNewForm + verbose_field_names = { + 'classname_short': 'Implementation', + } + queryset_extra_fields = { + 'classname_short': r"substring(classname, '[^\.]+$')", + } + auto_cascade_delete_to = ['conferencemessaging', ] + + config_fields = [] + config_fieldsets = [] + config_readonly_fields = [] + + process_incoming = False + no_incoming_processing = False + + class Meta: + model = MessagingProvider + fields = ['internalname', 'publicname', 'active', 'classname', 'route_incoming'] + + @property + def readonly_fields(self): + return ['classname', ] + self.config_readonly_fields + + @property + def json_form_fields(self): + return { + 'config': self.config_fields, + } + + @property + def fieldsets(self): + fs = [ + {'id': 'common', 'legend': 'Common', 'fields': ['internalname', 'publicname', 'active', 'classname'], }, + ] + if self.process_incoming: + fs.append( + {'id': 'incoming', 'legend': 'Incoming', 'fields': ['route_incoming', ], } + ) + + return fs + self.config_fieldsets + + def fix_fields(self): + super().fix_fields() + if self.newformdata: + classname, baseurl = self.newformdata.split(':', 1) + self.instance.classname = classname + self.initial['classname'] = classname + self.baseurl = baseurl + + impl = get_messaging_class(self.instance.classname) + if getattr(impl, 'can_process_incoming', False) and not self.no_incoming_processing: + self.process_incoming = True + self.fields['route_incoming'].queryset = Conference.objects.filter(series=self.instance.series) + self.fields['route_incoming'].help_text = 'Incoming messages from this provider will be added to the specified conference' + else: + del self.fields['route_incoming'] + self.update_protected_fields() + + # # Form to pick a conference to copy from # @@ -964,19 +1128,8 @@ class BackendCopySelectConferenceForm(django.forms.Form): # # Form for twitter integration # -class TwitterForm(ConcurrentProtectedModelForm): - class Meta: - model = Conference - fields = ['twittersync_active', 'twitterincoming_active', 'twitterreminders_active', 'twitter_postpolicy'] - - -class TwitterTestForm(django.forms.Form): - recipient = django.forms.CharField(max_length=64) - message = django.forms.CharField(max_length=200) - - class BackendTweetQueueForm(BackendForm): - helplink = 'integrations#twitter' + helplink = 'integrations#broadcast' list_fields = ['datetime', 'contents', 'author', 'approved', 'approvedby', 'sent', 'hasimage', ] verbose_field_names = { 'hasimage': 'Has image', @@ -989,6 +1142,7 @@ class BackendTweetQueueForm(BackendForm): queryset_extra_fields = { 'hasimage': "image is not null and image != ''", } + auto_cascade_delete_to = ['conferencetweetqueue_remainingtosend', ] class Meta: model = ConferenceTweetQueue @@ -1004,18 +1158,32 @@ class BackendTweetQueueForm(BackendForm): self.fields['contents'].widget.attrs['class'] += " textarea-with-charcount" else: self.fields['contents'].widget.attrs['class'] = "textarea-with-charcount" + self.fields['contents'].widget.attrs['data-length-function'] = 'shortened_post_length' + + lengthstr = 'Maximum lengths are: {}'.format(', '.join(['{}: {}'.format(mess.provider.internalname, get_messaging(mess.provider).max_post_length) for mess in self.conference.conferencemessaging_set.select_related('provider').filter(broadcast=True, provider__active=True)])) + + self.fields['contents'].help_text = lengthstr def clean_datetime(self): if self.instance: t = self.cleaned_data['datetime'].time() - if self.conference.twitter_timewindow_start and self.conference.twitter_timewindow_start != datetime.time(0, 0, 0): + if self.conference and self.conference.twitter_timewindow_start and self.conference.twitter_timewindow_start != datetime.time(0, 0, 0): if t < self.conference.twitter_timewindow_start: raise ValidationError("Tweets for this conference cannot be scheduled before {}".format(self.conference.twitter_timewindow_start)) - if self.conference.twitter_timewindow_end: + if self.conference and self.conference.twitter_timewindow_end: if t > self.conference.twitter_timewindow_end and self.conference.twitter_timewindow_end != datetime.time(0, 0, 0): raise ValidationError("Tweets for this conference cannot be scheduled after {}".format(self.conference.twitter_timewindow_end)) return self.cleaned_data['datetime'] + def clean_contents(self): + d = self.cleaned_data['contents'] + shortlen = get_shortened_post_length(d) + for mess in self.conference.conferencemessaging_set.select_related('provider').filter(broadcast=True, provider__active=True): + impl = get_messaging(mess.provider) + if shortlen > impl.max_post_length: + messages.warning(self.request, "Post will be truncated to {} characters on {}".format(impl.max_post_length, mess.provider.internalname)) + return d + @classmethod def get_assignable_columns(cls, conference): return [ @@ -1034,16 +1202,24 @@ class BackendTweetQueueForm(BackendForm): @classmethod def get_column_filters(cls, conference): - return { - 'Author': exec_to_single_list('SELECT DISTINCT username FROM confreg_conferencetweetqueue q INNER JOIN auth_user u ON u.id=q.author_id WHERE q.conference_id=%(confid)s', {'confid': conference.id, }), - 'Approved': ['true', 'false'], - 'Approved by': exec_to_single_list('SELECT DISTINCT username FROM confreg_conferencetweetqueue q INNER JOIN auth_user u ON u.id=q.approvedby_id WHERE q.conference_id=%(confid)s', {'confid': conference.id, }), - 'Sent': ['true', 'false'], - } + if conference: + return { + 'Author': exec_to_single_list('SELECT DISTINCT username FROM confreg_conferencetweetqueue q INNER JOIN auth_user u ON u.id=q.author_id WHERE q.conference_id=%(confid)s', {'confid': conference.id, }), + 'Approved': ['true', 'false'], + 'Approved by': exec_to_single_list('SELECT DISTINCT username FROM confreg_conferencetweetqueue q INNER JOIN auth_user u ON u.id=q.approvedby_id WHERE q.conference_id=%(confid)s', {'confid': conference.id, }), + 'Sent': ['true', 'false'], + } + else: + return { + 'Author': exec_to_single_list('SELECT DISTINCT username FROM confreg_conferencetweetqueue q INNER JOIN auth_user u ON u.id=q.author_id WHERE q.conference_id IS NULL'), + 'Approved': ['true', 'false'], + 'Approved by': exec_to_single_list('SELECT DISTINCT username FROM confreg_conferencetweetqueue q INNER JOIN auth_user u ON u.id=q.approvedby_id WHERE q.conference_id IS NULL'), + 'Sent': ['true', 'false'], + } class BackendHashtagForm(BackendForm): - helplink = 'integrations#twitter' + helplink = 'integrations#broadcast' list_fields = ['hashtag', ] class Meta: @@ -1182,3 +1358,13 @@ class BackendSendEmailForm(django.forms.Form): raise ValidationError("Maximum length of subject is {}, to leave room for prefix. You entered {} characters.".format(maxlen, len(self.cleaned_data['subject']))) return self.cleaned_data['subject'] + + +class BackendRegistrationDmForm(django.forms.Form): + message = django.forms.CharField(max_length=500, required=True) + + def __init__(self, maxlength, *args, **kwargs): + super().__init__(*args, **kwargs) + if maxlength: + self.fields['message'].max_length = maxlength + self.fields['message'].help_text = 'Maximum message length for this provider is {} characters.'.format(maxlength) diff --git a/postgresqleu/confreg/backendviews.py b/postgresqleu/confreg/backendviews.py index 76825289..ac5a29ab 100644 --- a/postgresqleu/confreg/backendviews.py +++ b/postgresqleu/confreg/backendviews.py @@ -1,6 +1,6 @@ from django.shortcuts import render, get_object_or_404 from django.utils.html import escape -from django.db import transaction +from django.db import transaction, connection from django.db.models import Count from django.core.exceptions import PermissionDenied from django.core.serializers.json import DjangoJSONEncoder @@ -16,9 +16,11 @@ from collections import OrderedDict from postgresqleu.util.db import exec_to_list, exec_to_dict, exec_no_result, exec_to_scalar from postgresqleu.util.decorators import superuser_required from postgresqleu.util.messaging.twitter import Twitter, TwitterSetup +from postgresqleu.util.messaging import messaging_implementations, get_messaging_class +from postgresqleu.util.messaging.util import send_reg_direct_message from postgresqleu.util.backendviews import backend_list_editor, backend_process_form +from postgresqleu.confreg.util import get_authenticated_conference, get_authenticated_series from postgresqleu.util.request import get_int_or_error -from postgresqleu.confreg.util import get_authenticated_conference from .jinjafunc import JINJA_TEMPLATE_ROOT from .jinjapdf import render_jinja_ticket, render_jinja_badges @@ -32,6 +34,7 @@ from .models import AccessToken from .models import ShirtSize from .models import PendingAdditionalOrder from .models import ConferenceTweetQueue +from .models import MessagingProvider from postgresqleu.invoices.models import Invoice from postgresqleu.invoices.util import InvoiceManager @@ -49,13 +52,16 @@ from .backendforms import BackendAccessTokenForm from .backendforms import BackendConferenceSeriesForm from .backendforms import BackendTshirtSizeForm from .backendforms import BackendNewsForm -from .backendforms import TwitterForm, TwitterTestForm, BackendTweetQueueForm, BackendHashtagForm +from .backendforms import BackendTweetQueueForm, BackendHashtagForm from .backendforms import TweetCampaignSelectForm from .backendforms import BackendSendEmailForm from .backendforms import BackendRefundPatternForm from .backendforms import ConferenceInvoiceCancelForm from .backendforms import PurchasedVoucherRefundForm from .backendforms import BulkPaymentRefundForm +from .backendforms import BackendMessagingForm +from .backendforms import BackendSeriesMessagingForm +from .backendforms import BackendRegistrationDmForm from .campaigns import get_campaign_from_id @@ -284,6 +290,76 @@ def edit_hashtags(request, urlname, rest): ) +def edit_messaging(request, urlname, rest): + conference = get_authenticated_conference(request, urlname) + # How about this for ugly :) Make sure this conference has an instance for every + # available messaging on the series. + with connection.cursor() as curs: + curs.execute( + """INSERT INTO confreg_conferencemessaging (conference_id, provider_id, broadcast, privatebcast, notification, orgnotification, config) +SELECT %(confid)s, id, false, false, false, false, '{}' +FROM confreg_messagingprovider mp +WHERE mp.series_id=%(seriesid)s AND NOT EXISTS ( + SELECT 1 FROM confreg_conferencemessaging m2 WHERE m2.conference_id=%(confid)s AND m2.provider_id=mp.id +)""", + { + 'confid': conference.id, + 'seriesid': conference.series_id, + }) + + return backend_list_editor(request, + urlname, + BackendMessagingForm, + rest, + conference=conference, + allow_new=False, + allow_delete=False, + ) + + +def edit_series_messaging(request, seriesid, rest): + series = get_authenticated_series(request, seriesid) + + def _load_messaging_formclass(classname): + return getattr(get_messaging_class(classname), 'provider_form_class', BackendSeriesMessagingForm) + + formclass = BackendSeriesMessagingForm + u = rest and rest.rstrip('/') or rest + if u and u != '' and u.isdigit(): + # Editing an existing one, so pick the correct subclass! + provider = get_object_or_404(MessagingProvider, pk=u, series=series) + formclass = _load_messaging_formclass(provider.classname) + elif u == 'new': + if '_newformdata' in request.POST or 'classname' in request.POST: + if '_newformdata' in request.POST: + c = request.POST['_newformdata'].split(':')[0] + else: + c = request.POST['classname'] + + if c not in messaging_implementations: + raise PermissionDenied() + + formclass = _load_messaging_formclass(c) + + # Note! Sync with newsevents/backendviews.py + formclass.no_incoming_processing = False + formclass.verbose_name = 'messaging provider' + formclass.verbose_name_plural = 'messaging providers' + + return backend_list_editor(request, + None, + formclass, + rest, + bypass_conference_filter=True, + object_queryset=MessagingProvider.objects.filter(series=series), + instancemaker=lambda: MessagingProvider(series=series), + breadcrumbs=[ + ('/events/admin/', 'Series'), + ('/events/admin/_series/{}/'.format(series.id), series.name), + ] + ) + + ### # Non-simple-editor views ### @@ -525,102 +601,6 @@ FROM confreg_conferenceregistration WHERE conference_id=%(confid)s""", { }) -@transaction.atomic -def twitter_integration(request, urlname): - conference = get_authenticated_conference(request, urlname) - - if request.method == 'POST': - if request.POST.get('activate_twitter', '') == '1': - # Fetch the oauth codes and re-render the form - try: - (auth_url, ownerkey, ownersecret) = TwitterSetup.get_authorization_data() - request.session['ownerkey'] = ownerkey - request.session['ownersecret'] = ownersecret - except Exception as e: - messages.error(request, 'Failed to talk to twitter: %s' % e) - return HttpResponseRedirect('.') - - return render(request, 'confreg/admin_integ_twitter.html', { - 'conference': conference, - 'twitter_token_url': auth_url, - 'helplink': 'integrations#twitter', - }) - elif request.POST.get('pincode', ''): - if not ('ownerkey' in request.session and 'ownersecret' in request.session): - messages.error(request, 'Missing data in session, cannot continue') - return HttpResponseRedirect('.') - try: - tokens = TwitterSetup.authorize(request.session.pop('ownerkey'), - request.session.pop('ownersecret'), - request.POST.get('pincode'), - ) - except Exception as e: - messages.error(request, 'Failed to get tokens from twitter.') - return HttpResponseRedirect('.') - - conference.twitter_token = tokens.get('oauth_token') - conference.twitter_secret = tokens.get('oauth_token_secret') - conference.twittersync_active = False - conference.twitterincoming_active = False - tw = Twitter(conference) - try: - conference.twitter_user = tw.get_own_screen_name() - except Exception as e: - messages.error(request, 'Failed to verify account credentials and get username: {}'.format(e)) - return HttpResponseRedirect('.') - - conference.save() - messages.info(request, 'Twitter integration enabled') - return HttpResponseRedirect('.') - elif request.POST.get('deactivate_twitter', '') == '1': - conference.twitter_user = '' - conference.twitter_token = '' - conference.twitter_secret = '' - conference.twittersync_active = False - conference.twitterincoming_active = False - conference.save() - messages.info(request, 'Twitter integration disabled') - return HttpResponseRedirect('.') - elif request.POST.get('test_twitter', '') == '1': - testform = TwitterTestForm(data=request.POST) - if testform.is_valid(): - tw = Twitter(conference) - recipient = testform.cleaned_data['recipient'] - message = testform.cleaned_data['message'] - - ok, code, msg = tw.send_message(recipient, message) - if ok: - messages.info(request, 'Message successfully sent to {0}'.format(recipient)) - elif code == 150: - messages.warning(request, 'Cannot send message to users not being followed') - else: - messages.error(request, 'Failed to send to {0}: {1}'.format(recipient, msg)) - return HttpResponseRedirect('.') - form = TwitterForm(instance=conference) - else: - form = TwitterForm(instance=conference, data=request.POST) - if form.is_valid(): - form.save() - return HttpResponseRedirect('.') - else: - form = TwitterForm(instance=conference) - testform = TwitterTestForm() - - if conference.twitter_user: - sameuser = Conference.objects.filter(twitterincoming_active=True, twitter_user=conference.twitter_user).exclude(pk=conference.pk).order_by('urlname') - else: - sameuser = [] - - return render(request, 'confreg/admin_integ_twitter.html', { - 'conference': conference, - 'form': form, - 'testform': testform, - 'conferences_with_same_user': sameuser, - 'twitter_app_configured': settings.TWITTER_CLIENT != '' and settings.TWITTER_CLIENTSECRET != '', - 'helplink': 'integrations#twitter', - }) - - def tweetcampaignselect(request, urlname): conference = get_authenticated_conference(request, urlname) @@ -678,6 +658,15 @@ def tweetcampaign(request, urlname, typeid): }) +def manage_series(request, seriesid): + series = get_authenticated_series(request, seriesid) + + return render(request, 'confreg/admin_dashboard_series.html', { + 'series': series, + 'breadcrumbs': (('/events/admin/', 'Series'),), + }) + + class DelimitedWriter(object): def __init__(self, delimiter): self.delimiter = delimiter @@ -907,3 +896,34 @@ WHERE EXISTS ( AND speaker_id=s.id)""", [('../', 'Conference sessions'), ], ) + + +@transaction.atomic +def registration_dashboard_send_dm(request, urlname, regid): + conference = get_authenticated_conference(request, urlname) + reg = get_object_or_404(ConferenceRegistration, conference=conference, pk=regid) + + if not reg.messaging: + # Should never have the link, but just in case + messages.warning(request, 'This registration has no direct messaging configured') + return HttpResponseRedirect("../") + + maxlength = get_messaging_class(reg.messaging.provider.classname).direct_message_max_length + if request.method == 'POST': + form = BackendRegistrationDmForm(maxlength, data=request.POST) + if form.is_valid(): + send_reg_direct_message(reg, form.cleaned_data['message']) + messages.info(request, "Direct message sent.") + return HttpResponseRedirect("../") + else: + form = BackendRegistrationDmForm(maxlength) + + return render(request, 'confreg/admin_backend_form.html', { + 'conference': conference, + 'basetemplate': 'confreg/confadmin_base.html', + 'form': form, + 'what': 'new direct message', + 'savebutton': 'Send direct message', + 'cancelurl': '../', + 'breadcrumbs': [('../../', 'Registration list'), ('../', reg.fullname)], + }) diff --git a/postgresqleu/confreg/campaigns.py b/postgresqleu/confreg/campaigns.py index 4ca7d3bd..2a23891e 100644 --- a/postgresqleu/confreg/campaigns.py +++ b/postgresqleu/confreg/campaigns.py @@ -7,7 +7,7 @@ from postgresqleu.confreg.jinjafunc import JinjaTemplateValidator, render_sandbo from postgresqleu.util.widgets import MonospaceTextarea from postgresqleu.confreg.models import ConferenceSession, Track -from postgresqleu.confreg.twitter import post_conference_tweet +from postgresqleu.confreg.twitter import post_conference_social import datetime import random @@ -113,11 +113,11 @@ class ApprovedSessionsCampaignForm(BaseCampaignForm): def generate_tweets(self, author): sessions = list(self.get_queryset().order_by('?')) for ts, session in zip(_timestamps_for_tweets(self.conference, self.cleaned_data['starttime'], self.cleaned_data['timebetween'], self.cleaned_data['timerandom'], len(sessions)), sessions): - post_conference_tweet(self.conference, - self.generate_tweet(self.conference, session, self.cleaned_data['content_template']), - approved=False, - posttime=ts, - author=author) + post_conference_social(self.conference, + self.generate_tweet(self.conference, session, self.cleaned_data['content_template']), + approved=False, + posttime=ts, + author=author) class ApprovedSessionsCampaign(object): diff --git a/postgresqleu/confreg/management/commands/confreg_fetch_twitter.py b/postgresqleu/confreg/management/commands/confreg_fetch_twitter.py deleted file mode 100644 index d0eacf5b..00000000 --- a/postgresqleu/confreg/management/commands/confreg_fetch_twitter.py +++ /dev/null @@ -1,82 +0,0 @@ -# -# Fetch incoming twitter posts (if any) -# - -from django.core.management.base import BaseCommand -from django.db import transaction, connection -from django.db.models import Max -from django.conf import settings - -from datetime import datetime, timedelta -import dateutil.parser - -from postgresqleu.confreg.models import Conference, ConferenceIncomingTweet, ConferenceTweetQueue -from postgresqleu.confreg.models import ConferenceIncomingTweetMedia - -from postgresqleu.util.messaging.twitter import Twitter - - -class Command(BaseCommand): - help = 'Fetch incoming tweets' - - class ScheduledJob: - scheduled_interval = timedelta(minutes=5) - - @classmethod - def should_run(self): - return Conference.objects.filter(twitterincoming_active=True) \ - .exclude(twitter_token='') \ - .exclude(twitter_secret='').exists() - - def handle(self, *args, **options): - if not settings.TWITTER_CLIENT or not settings.TWITTER_CLIENTSECRET: - return - - curs = connection.cursor() - curs.execute("SELECT pg_try_advisory_lock(94032416)") - if not curs.fetchall()[0][0]: - raise CommandError("Failed to get advisory lock, existing tweet-fetcher stuck?") - - for conference in Conference.objects.filter(twitterincoming_active=True) \ - .exclude(twitter_token='') \ - .exclude(twitter_secret=''): - tw = Twitter(conference) - - maxid = ConferenceIncomingTweet.objects.filter(conference=conference).aggregate(Max('statusid'))['statusid__max'] - - with transaction.atomic(): - # Fetch anythning incoming - r = tw.get_timeline('mentions', maxid) - if r: - for tj in r: - if ConferenceIncomingTweet.objects.filter(statusid=tj['id']).exists(): - # Just skip duplicates - continue - if ConferenceTweetQueue.objects.filter(tweetid=tj['id']).exists(): - # Also skip if we hit one of our own generated tweets - continue - - it = ConferenceIncomingTweet( - conference=conference, - statusid=tj['id'], - created=dateutil.parser.parse(tj['created_at']), - text=tj['full_text'], - replyto_statusid=tj['in_reply_to_status_id'], - author_name=tj['user']['name'], - author_screenname=tj['user']['screen_name'], - author_id=tj['user']['id'], - author_image_url=tj['user']['profile_image_url_https'], - ) - if tj['is_quote_status'] and 'quoted_status_id' in tj: - # Skip the ones that have no quoted_status_id, which is the case - # when it's a quoted tweet that we don't have permissions on. - it.quoted_statusid = tj['quoted_status_id'] - it.quoted_text = tj['quoted_status']['full_text'] - it.quoted_permalink = tj['quoted_status_permalink'] - it.save() - - if 'media' in tj['entities']: - for seq, m in enumerate(tj['entities']['media']): - ConferenceIncomingTweetMedia(incomingtweet=it, - sequence=seq, - mediaurl=m['media_url_https']).save() diff --git a/postgresqleu/confreg/management/commands/confreg_frequent_reminders.py b/postgresqleu/confreg/management/commands/confreg_frequent_reminders.py index 64d21899..1f835566 100644 --- a/postgresqleu/confreg/management/commands/confreg_frequent_reminders.py +++ b/postgresqleu/confreg/management/commands/confreg_frequent_reminders.py @@ -1,5 +1,5 @@ # -# Send frequent reminders using interfaces like twitter DMs +# Send frequent reminders using direct message # # For now this only means sending a reminder to speakers 10-15 minutes # before their session begins. @@ -10,14 +10,14 @@ from django.db import transaction, connection from django.conf import settings from django.utils import timezone -import sys from datetime import timedelta from postgresqleu.confreg.models import Conference, ConferenceSession from postgresqleu.confreg.models import ConferenceRegistration +from postgresqleu.confreg.util import get_conference_or_404 -from postgresqleu.util.messaging.twitter import Twitter from postgresqleu.util.time import today_global +from postgresqleu.util.messaging.util import send_reg_direct_message, send_private_broadcast class Command(BaseCommand): @@ -25,55 +25,42 @@ class Command(BaseCommand): class ScheduledJob: scheduled_interval = timedelta(minutes=5) + internal = True @classmethod def should_run(self): - if not settings.TWITTER_CLIENT or not settings.TWITTER_CLIENTSECRET: - # If we don't have twitter set up, don't run. - return False - # We check for conferences to run at two days before and two days after to cover # any extreme timezone differences. - return Conference.objects.filter(twitterreminders_active=True, - startdate__lte=today_global() + timedelta(days=2), - enddate__gte=today_global() - timedelta(days=2)) \ - .exclude(twitter_token='') \ - .exclude(twitter_secret='').exists() + return Conference.objects.filter(startdate__lte=today_global() + timedelta(days=2), + enddate__gte=today_global() - timedelta(days=2)).exists() def handle(self, *args, **options): - if not settings.TWITTER_CLIENT or not settings.TWITTER_CLIENTSECRET: - return - - curs = connection.cursor() - curs.execute("SELECT pg_try_advisory_lock(94012426)") - if not curs.fetchall()[0][0]: - raise CommandError("Failed to get advisory lock, existing frequent reminder process stuck?") - # Only conferences that are actually running right now need to be considered. # Normally this is likely just one. - # We can also filter for conferences that actually have reminders active. - # Right now that's only twitter reminders, but in the future there can be - # more plugins. - has_error = False - for conference in Conference.objects.filter(twitterreminders_active=True, - startdate__lte=today_global() + timedelta(days=2), - enddate__gte=today_global() - timedelta(days=2)) \ - .exclude(twitter_token='') \ - .exclude(twitter_secret=''): + for conference in Conference.objects.filter(startdate__lte=today_global() + timedelta(days=2), + enddate__gte=today_global() - timedelta(days=2)): # Re-get the conference object to switch the timezone for django conference = get_conference_or_404(conference.urlname) - tw = Twitter(conference) with transaction.atomic(): - # Sessions that can take reminders (yes we could make a more complete join at one - # step here, but that will likely fall apart later with more integrations anyway) + # Sessions that can take reminders for s in ConferenceSession.objects.select_related('room') \ .filter(conference=conference, starttime__gt=timezone.now(), starttime__lt=timezone.now() + timedelta(minutes=15), status=1, reminder_sent=False): + + send_private_broadcast(conference, + 'The session "{0}" will start soon (at {1}){2}'.format( + s.title, + timezone.localtime(s.starttime).strftime("%H:%M"), + s.room and " in room {}".format(s.room) or '', + ), + expiry=timedelta(minutes=15)) + + # Now also send DM reminders out to the speakers who have registered to get one for reg in ConferenceRegistration.objects.filter( conference=conference, attendee__speaker__conferencesession=s): @@ -83,18 +70,10 @@ class Command(BaseCommand): timezone.localtime(s.starttime).strftime("%H:%M"), s.room and s.room.roomname or 'unknown', ) - if reg.twittername: - # Twitter name registered, so send reminder - ok, code, err = tw.send_message(reg.twittername, msg) - if not ok and code != 150: - # Code 150 means trying to send DM to user not following us, so just - # ignore that one. Other errors should be shown. - self.stderr.write("Failed to send twitter DM to {0}: {1}".format(reg.twittername, err)) - has_error = True + + # Send the message. Make it expire in 15 minutes, because that's after + # the session started anyway. + send_reg_direct_message(reg, msg, expiry=timedelta(minutes=15)) s.reminder_sent = True s.save() - - if has_error: - self.stderr.write("One or more messages failed to send. They will *not* be retried!") - sys.exit(1) diff --git a/postgresqleu/confreg/management/commands/confreg_post_news.py b/postgresqleu/confreg/management/commands/confreg_post_news.py index 58677276..da5e3805 100644 --- a/postgresqleu/confreg/management/commands/confreg_post_news.py +++ b/postgresqleu/confreg/management/commands/confreg_post_news.py @@ -1,8 +1,8 @@ # -# Post tweets about news. +# Post social media broadcast about news. # -# This doesn't actually post the tweets -- it just places them in the -# outbound queue for the global twitter posting script to handle. +# This doesn't actually make a post -- it just places them in the +# outbound queue for the global social media script to handle. # # Copyright (C) 2019, PostgreSQL Europe @@ -16,25 +16,25 @@ from django.conf import settings from datetime import timedelta from postgresqleu.confreg.models import ConferenceNews -from postgresqleu.confreg.twitter import post_conference_tweet +from postgresqleu.confreg.twitter import post_conference_social class Command(BaseCommand): - help = 'Schedule tweets about conference news' + help = 'Schedule social media posts about conference news' class ScheduledJob: - scheduled_interval = timedelta(minutes=10) + scheduled_interval = timedelta(minutes=5) internal = True @classmethod def should_run(self): - # Any untweeted news from a conference with twitter active where the news is dated in the past (so that it + # Any unposted news from a conference where the news is dated in the past (so that it # is actually visible), but not more than 7 days in the past (in which case we skip it). - return ConferenceNews.objects.filter(tweeted=False, conference__twittersync_active=True, datetime__lt=timezone.now(), datetime__gt=timezone.now() - timedelta(days=7)).exists() + return ConferenceNews.objects.filter(tweeted=False, datetime__lt=timezone.now(), datetime__gt=timezone.now() - timedelta(days=7)).exists() @transaction.atomic def handle(self, *args, **options): - for n in ConferenceNews.objects.filter(tweeted=False, conference__twittersync_active=True, datetime__lt=timezone.now(), datetime__gt=timezone.now() - timedelta(days=7)): + for n in ConferenceNews.objects.filter(tweeted=False, datetime__lt=timezone.now(), datetime__gt=timezone.now() - timedelta(days=7)): statusstr = "{0} {1}/events/{2}/news/{3}-{4}/".format( n.title[:250 - 40], settings.SITEBASE, @@ -42,6 +42,6 @@ class Command(BaseCommand): slugify(n.title), n.id, ) - post_conference_tweet(n.conference, statusstr, approved=True) + post_conference_social(n.conference, statusstr, approved=True) n.tweeted = True n.save() diff --git a/postgresqleu/confreg/migrations/0075_messaging.py b/postgresqleu/confreg/migrations/0075_messaging.py new file mode 100644 index 00000000..8b1a56cc --- /dev/null +++ b/postgresqleu/confreg/migrations/0075_messaging.py @@ -0,0 +1,215 @@ +# Generated by Django 2.2.11 on 2020-04-20 12:17 + +import django.contrib.postgres.fields.jsonb +from django.contrib.postgres.indexes import GinIndex +import django.core.serializers.json +from django.db import migrations, models +from django.conf import settings + + +def migrate_news_twitter(apps, schema_editor): + if getattr(settings, 'TWITTER_NEWS_TOKEN', None): + apps.get_model('confreg', 'MessagingProvider')( + series=None, + internalname='Twitter news', + publicname='Twitter', + classname='postgresqleu.util.messaging.twitter.Twitter', + active=True, + config={ + 'token': settings.TWITTER_NEWS_TOKEN, + 'secret': settings.TWITTER_NEWS_TOKENSECRET, + }, + route_incoming=None, + ).save() + + +def unmigrate_news_twitter(apps, schema_editor): + aps.get_model('confreg', 'MessagingProvider').objects.filter(classname='postgresqleu.util.messaging.twitter.Twitter', series__isnull=True).delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ('confreg', '0074_activate_timezones'), + ('util', '0003_oauthapps'), + ] + + operations = [ + migrations.CreateModel( + name='MessagingProvider', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('series', models.ForeignKey('confreg.ConferenceSeries', null=True, blank=True, on_delete=models.CASCADE)), + ('internalname', models.CharField(max_length=100, verbose_name='Internal name')), + ('publicname', models.CharField(max_length=100, verbose_name='Public name')), + ('classname', models.CharField(max_length=200, verbose_name='Implementation class')), + ('active', models.BooleanField(null=False, blank=False, default=False)), + ('config', django.contrib.postgres.fields.jsonb.JSONField(default=dict, encoder=django.core.serializers.json.DjangoJSONEncoder)), + ('route_incoming', models.ForeignKey('confreg.Conference', null=True, blank=True, on_delete=models.SET_NULL, verbose_name='Route incoming messages to', related_name='incoming_messaging_route_for')), + ('public_checkpoint', models.BigIntegerField(null=False, blank=False, default=0)), + ('public_lastpoll', models.DateTimeField(null=False, blank=False, auto_now_add=True)), + ('private_checkpoint', models.BigIntegerField(null=False, blank=False, default=0)), + ('private_lastpoll', models.DateTimeField(null=False, blank=False, auto_now_add=True)), + ], + options={ + 'ordering': ('internalname', ), + } + ), + migrations.CreateModel( + name='ConferenceMessaging', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('broadcast', models.BooleanField(default=False, verbose_name='Broadcasts')), + ('privatebcast', models.BooleanField(default=False, verbose_name='Attendee only broadcasts')), + ('notification', models.BooleanField(default=False, verbose_name='Private notifications')), + ('orgnotification', models.BooleanField(default=False, verbose_name='Organizer notifications')), + ('config', django.contrib.postgres.fields.jsonb.JSONField(default=dict)), + ('conference', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='confreg.Conference')), + ('provider', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='confreg.MessagingProvider')), + ], + options={ + 'verbose_name': 'messaging configuration', + 'ordering': ('provider__name',), + }, + ), + migrations.AlterUniqueTogether( + name='conferencemessaging', + unique_together=set([('conference', 'provider', )]), + ), + # Create messaging providers for any conference series with existing twitter accounts. + # Pick the latest conference in each series that actually has a token. + migrations.RunSQL("""INSERT INTO confreg_messagingprovider + (series_id, internalname, publicname, classname, config, active, route_incoming_id, private_lastpoll, private_checkpoint, public_lastpoll, public_checkpoint) +SELECT DISTINCT ON (s.name) s.id, 'Twitter ' || twitter_user, 'Twitter', + 'postgresqleu.util.messaging.twitter.Twitter', + jsonb_build_object('token', c.twitter_token, 'secret', c.twitter_secret, 'screen_name', c.twitter_user), + true, + CASE WHEN c.twitterincoming_active THEN c.id ELSE NULL END, + current_timestamp, 0, current_timestamp, 0 +FROM confreg_conferenceseries s +INNER JOIN confreg_conference c ON c.series_id=s.id +WHERE twitter_token != '' +ORDER BY s.name, c.twittersync_active desc, c.startdate desc +"""), + + migrations.AddField( + model_name='ConferenceTweetQueue', + name='remainingtosend', + field=models.ManyToManyField('confreg.MessagingProvider', blank=True), + ), + migrations.AddField( + model_name='ConferenceTweetqueue', + name='postids', + field=django.contrib.postgres.fields.jsonb.JSONField(default=dict), + ), + migrations.RunSQL("""UPDATE confreg_conferencetweetqueue cq SET postids=jsonb_build_object(tweetid, (SELECT mp.id FROM confreg_messagingprovider mp INNER JOIN confreg_conference c ON c.series_id=mp.series_id WHERE c.id=cq.conference_id AND mp.classname='postgresqleu.util.messaging.twitter.Twitter')) WHERE tweetid!=0""" + ), + migrations.RemoveField( + model_name='ConferenceTweetQueue', + name='tweetid', + ), + migrations.AlterField( + model_name='ConferenceTweetQueue', + name='conference', + field=models.ForeignKey('confreg.Conference', null=True, on_delete=models.CASCADE), + ), + migrations.AlterField( + model_name='ConferenceTweetQueue', + name='contents', + field=models.CharField(max_length=1000, null=False, blank=False), + ), + migrations.AlterUniqueTogether( + name='conferencetweetqueue', + unique_together=set(), + ), + migrations.AlterField( + model_name='ConferenceIncomingTweet', + name='statusid', + field=models.BigIntegerField(null=False, blank=False), + ), + migrations.AddField( + model_name='ConferenceIncomingTweet', + name='provider', + field=models.ForeignKey('confreg.MessagingProvider', null=True, on_delete=models.SET_NULL), + ), + migrations.AlterUniqueTogether( + name='conferenceincomingtweet', + unique_together=set([('statusid', 'provider')]), + ), + migrations.AlterUniqueTogether( + name='conferenceincomingtweetmedia', + unique_together=set([('incomingtweet', 'sequence')]), + ), + migrations.AddIndex( + model_name='conferencetweetqueue', + index=GinIndex(name='tweetqueue_postids_idx', fields=['postids'], opclasses=['jsonb_path_ops']), + ), + migrations.AddField( + model_name='conferenceregistration', + name='messaging', + field=models.ForeignKey('ConferenceMessaging', null=True, blank=True, on_delete=models.SET_NULL), + ), + migrations.AddField( + model_name='conferenceregistration', + name='messaging_copiedfrom', + field=models.ForeignKey('Conference', null=True, blank=True, on_delete=models.SET_NULL, related_name='reg_messaging_copiedfrom'), + ), + migrations.AddField( + model_name='conferenceregistration', + name='messaging_config', + field=django.contrib.postgres.fields.jsonb.JSONField(default=dict), + ), + migrations.CreateModel( + name='NotificationQueue', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('time', models.DateTimeField()), + ('expires', models.DateTimeField()), + ('channel', models.CharField(blank=True, max_length=50, null=True)), + ('msg', models.TextField()), + ('messaging', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='confreg.ConferenceMessaging')), + ('reg', models.ForeignKey(null=True, blank=True, on_delete=django.db.models.deletion.CASCADE, to='confreg.ConferenceRegistration')), + ], + ), + migrations.CreateModel( + name='IncomingDirectMessage', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('provider', models.ForeignKey('confreg.MessagingProvider', null=False, blank=False, on_delete=models.CASCADE)), + ('time', models.DateTimeField(null=False, blank=False)), + ('postid', models.BigIntegerField(null=False, blank=False)), + ('internallyprocessed', models.BooleanField(null=False, blank=False, default=False)), + ('sender', django.contrib.postgres.fields.jsonb.JSONField(default=dict)), + ('txt', models.TextField(null=False, blank=True)), + ], + ), + migrations.AlterUniqueTogether( + name='incomingdirectmessage', + unique_together=set([('postid', 'provider', )]), + ), + migrations.RunPython(migrate_news_twitter, unmigrate_news_twitter), + migrations.RemoveField( + model_name='Conference', + name='twittersync_active', + ), + migrations.RemoveField( + model_name='Conference', + name='twitterincoming_active', + ), + migrations.RemoveField( + model_name='Conference', + name='twitterreminders_active', + ), + migrations.RemoveField( + model_name='Conference', + name='twitter_user', + ), + migrations.RemoveField( + model_name='Conference', + name='twitter_token', + ), + migrations.RemoveField( + model_name='Conference', + name='twitter_secret', + ), + ] diff --git a/postgresqleu/confreg/models.py b/postgresqleu/confreg/models.py index 3699d0bf..494384a6 100644 --- a/postgresqleu/confreg/models.py +++ b/postgresqleu/confreg/models.py @@ -12,6 +12,7 @@ from django.utils.functional import cached_property from django.utils import timezone from django.template.defaultfilters import slugify from django.contrib.postgres.fields import DateTimeRangeField, JSONField +from django.contrib.postgres.indexes import GinIndex from django.core.serializers.json import DjangoJSONEncoder from django.utils import timezone @@ -21,6 +22,7 @@ from postgresqleu.util.validators import PictureUrlValidator from postgresqleu.util.forms import ChoiceArrayField from postgresqleu.util.fields import LowercaseEmailField, ImageBinaryField from postgresqleu.util.time import today_conference +from postgresqleu.util.db import exec_no_result import base64 import datetime @@ -121,6 +123,8 @@ class ConferenceSeries(models.Model): visible = models.BooleanField(null=False, default=True) administrators = models.ManyToManyField(User, blank=True) + _safe_attributes = ('name', 'intro', 'visible') + def __str__(self): return self.name @@ -174,12 +178,6 @@ class Conference(models.Model): schedulewidth = models.IntegerField(blank=False, default=600, null=False, verbose_name="Width of HTML schedule") pixelsperminute = models.FloatField(blank=False, default=1.5, null=False, verbose_name="Vertical pixels per minute") confurl = models.CharField(max_length=128, blank=False, null=False, validators=[validate_lowercase, ], verbose_name="Conference URL") - twittersync_active = models.BooleanField(null=False, default=False, verbose_name='Twitter posting active') - twitterincoming_active = models.BooleanField(null=False, default=False, verbose_name='Twitter incoming polling active') - twitterreminders_active = models.BooleanField(null=False, default=False, verbose_name='Twitter reminder DMs active') - twitter_user = models.CharField(max_length=32, blank=True, null=False) - twitter_token = models.CharField(max_length=128, blank=True, null=False) - twitter_secret = models.CharField(max_length=128, blank=True, null=False) twitter_timewindow_start = models.TimeField(null=False, blank=False, default='00:00', verbose_name="Don't post tweets before") twitter_timewindow_end = models.TimeField(null=False, blank=False, default='23:59:59', verbose_name="Don't post tweets after") twitter_postpolicy = models.IntegerField(null=False, blank=False, default=0, choices=TWITTER_POST_CHOICES, @@ -225,7 +223,9 @@ class Conference(models.Model): 'callforpapersintro', 'callforpapersopen', 'callforpaperstags', 'allowedit', 'conferencefeedbackopen', 'confurl', 'contactaddr', 'tickets', 'conferencedatestr', 'location', 'welcomemail', - 'feedbackopen', 'skill_levels', 'urlname', 'conferencename') + 'feedbackopen', 'skill_levels', 'urlname', 'conferencename', + 'series', + ) def safe_export(self): d = dict((a, getattr(self, a) and str(getattr(self, a))) for a in self._safe_attributes) @@ -295,6 +295,10 @@ class Conference(models.Model): return False + @cached_property + def has_social_broadcast(self): + return self.conferencemessaging_set.filter(broadcast=True, provider__active=True).exists() + @property def needs_data_purge(self): return self.enddate < today_conference() and not self.personal_data_purged @@ -574,6 +578,11 @@ class ConferenceRegistration(models.Model): # as a QR code on a badge, for others to scan. publictoken = models.TextField(null=False, blank=False, unique=True) + # Messaging configuration + messaging = models.ForeignKey('ConferenceMessaging', null=True, blank=True, on_delete=models.SET_NULL) + messaging_copiedfrom = models.ForeignKey(Conference, null=True, blank=True, on_delete=models.SET_NULL, related_name='reg_messaging_copiedfrom') + messaging_config = JSONField(null=False, blank=False, default=dict) + @property def fullname(self): return "%s %s" % (self.firstname, self.lastname) @@ -649,7 +658,7 @@ class ConferenceRegistration(models.Model): @cached_property def is_tweeter(self): - if self.conference.twittersync_active: + if self.conference.has_social_broadcast: if self.conference.twitter_postpolicy != 0: if self.conference.administrators.filter(pk=self.attendee_id).exists(): return True @@ -1329,29 +1338,98 @@ class ConferenceHashtag(models.Model): ordering = ['hashtag', ] -class ConferenceTweetQueue(models.Model): +class MessagingProvider(models.Model): + series = models.ForeignKey(ConferenceSeries, null=True, blank=True, on_delete=models.CASCADE) + internalname = models.CharField(max_length=100, null=False, blank=False, verbose_name='Internal name') + publicname = models.CharField(max_length=100, null=False, blank=False, verbose_name='Public name') + classname = models.CharField(max_length=200, null=False, blank=False, verbose_name="Implementation class") + active = models.BooleanField(null=False, blank=False, default=False) + config = JSONField(blank=False, null=False, default=dict, encoder=DjangoJSONEncoder) + route_incoming = models.ForeignKey(Conference, null=True, blank=True, verbose_name="Route incoming messages to", on_delete=models.SET_NULL, related_name='incoming_messaging_route_for') + private_checkpoint = models.BigIntegerField(null=False, blank=False, default=0) + private_lastpoll = models.DateTimeField(null=False, blank=False, auto_now_add=True) + public_checkpoint = models.BigIntegerField(null=False, blank=False, default=0) + public_lastpoll = models.DateTimeField(null=False, blank=False, auto_now_add=True) + + def __str__(self): + return self.internalname + + class Meta: + ordering = ('internalname', ) + + +class ConferenceMessaging(models.Model): conference = models.ForeignKey(Conference, null=False, on_delete=models.CASCADE) + provider = models.ForeignKey(MessagingProvider, null=False, blank=False, on_delete=models.CASCADE) + + broadcast = models.BooleanField(null=False, blank=False, default=False, verbose_name='Broadcasts') + privatebcast = models.BooleanField(null=False, blank=False, default=False, verbose_name='Attendee only broadcasts') + notification = models.BooleanField(null=False, blank=False, default=False, verbose_name='Private notifications') + orgnotification = models.BooleanField(null=False, blank=False, default=False, verbose_name='Organizer notifications') + config = JSONField(blank=False, null=False, default=dict) + + class Meta: + verbose_name = 'messaging configuration' + ordering = ('provider__name', ) + unique_together = ( + ('conference', 'provider'), + ) + + def __str__(self): + return self.provider.publicname + + @property + def full_info(self): + if self.notification and self.privatebcast: + return "{} - personal notifications and announcements".format(self) + elif self.notification: + return "{} - personal notifications only".format(self) + elif self.privatebcast: + return "{} - announcements only".format(self) + else: + return str(self) + + +class ConferenceTweetQueue(models.Model): + conference = models.ForeignKey(Conference, null=True, on_delete=models.CASCADE) datetime = models.DateTimeField(blank=False, default=timezone.now, verbose_name="Date and time", help_text="Date and time to send tweet") - contents = models.CharField(max_length=250, null=False, blank=False) + contents = models.CharField(max_length=1000, null=False, blank=False) image = ImageBinaryField(null=True, blank=True, max_length=1000000) imagethumb = ImageBinaryField(null=True, blank=True, max_length=100000) approved = models.BooleanField(null=False, default=False, blank=False) author = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE) approvedby = models.ForeignKey(User, null=True, blank=True, related_name="tweetapprovals", on_delete=models.CASCADE) sent = models.BooleanField(null=False, default=False, blank=False) - tweetid = models.BigIntegerField(null=False, blank=False, default=0, db_index=True) + postids = JSONField(null=False, blank=False, default=dict) replytotweetid = models.BigIntegerField(null=True, blank=True, verbose_name="Reply to tweet") + remainingtosend = models.ManyToManyField(MessagingProvider, blank=True) class Meta: ordering = ['sent', 'datetime', ] verbose_name_plural = 'Conference Tweets' verbose_name = 'Conference Tweet' + indexes = [ + GinIndex(name='tweetqueue_postids_idx', fields=['postids'], opclasses=['jsonb_path_ops']), + ] + + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + + # When we are saving, *if* we have not yet been sent, materialize a list of + # which providers to send to. + if self.approved and not self.sent: + if self.conference: + self.remainingtosend.set(MessagingProvider.objects.filter(active=True, conferencemessaging__conference=self.conference, conferencemessaging__broadcast=True)) + else: + self.remainingtosend.set(MessagingProvider.objects.filter(active=True, series__isnull=True)) + exec_no_result("NOTIFY pgeu_broadcast") class ConferenceIncomingTweet(models.Model): conference = models.ForeignKey(Conference, null=False, on_delete=models.CASCADE) - statusid = models.BigIntegerField(null=False, blank=False, unique=True) + provider = models.ForeignKey(MessagingProvider, null=True, on_delete=models.SET_NULL) + statusid = models.BigIntegerField(null=False, blank=False) created = models.DateTimeField(null=False, blank=False) processedat = models.DateTimeField(null=True, blank=True) processedby = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE) @@ -1366,8 +1444,42 @@ class ConferenceIncomingTweet(models.Model): quoted_permalink = models.URLField(max_length=1024, null=True, blank=True) retweetstate = models.IntegerField(null=False, blank=False, default=0, choices=((0, 'No retweet'), (1, 'Scheduled'), (2, 'Retweeted'))) + class Meta: + unique_together = ( + ('statusid', 'provider'), + ) + class ConferenceIncomingTweetMedia(models.Model): incomingtweet = models.ForeignKey(ConferenceIncomingTweet, null=False, blank=False, on_delete=models.CASCADE) sequence = models.IntegerField(null=False, blank=False) mediaurl = models.URLField(max_length=1024, null=False, blank=False) + + class Meta: + unique_together = ( + ('incomingtweet', 'sequence'), + ) + + +# Either reg *or* channel is set! +class NotificationQueue(models.Model): + time = models.DateTimeField(null=False, blank=False) + expires = models.DateTimeField(null=False, blank=False) + messaging = models.ForeignKey(ConferenceMessaging, null=False, blank=False, on_delete=models.CASCADE) + reg = models.ForeignKey(ConferenceRegistration, null=True, blank=True, on_delete=models.CASCADE) + channel = models.CharField(max_length=50, null=True, blank=True) + msg = models.TextField(null=False, blank=False) + + +class IncomingDirectMessage(models.Model): + provider = models.ForeignKey(MessagingProvider, null=False, blank=False, on_delete=models.CASCADE) + time = models.DateTimeField(null=False, blank=False) + postid = models.BigIntegerField(null=False, blank=False) + internallyprocessed = models.BooleanField(null=False, blank=False, default=False) + sender = JSONField(null=False, blank=False, default=dict) + txt = models.TextField(null=False, blank=True) + + class Meta: + unique_together = ( + ('postid', 'provider', ), + ) diff --git a/postgresqleu/confreg/twitter.py b/postgresqleu/confreg/twitter.py index 39bd0232..c9cf8784 100644 --- a/postgresqleu/confreg/twitter.py +++ b/postgresqleu/confreg/twitter.py @@ -5,6 +5,7 @@ from django.shortcuts import render, get_object_or_404 from django.http import HttpResponse, HttpResponseRedirect, Http404 from django.views.decorators.csrf import csrf_exempt from django.contrib.postgres.aggregates import ArrayAgg +from django.db import transaction from django.utils import timezone import datetime @@ -14,11 +15,12 @@ from PIL import Image, ImageFile from postgresqleu.scheduler.util import trigger_immediate_job_run from postgresqleu.util.request import get_int_or_error -from .models import ConferenceTweetQueue, ConferenceIncomingTweet +from postgresqleu.util.messaging import ProviderCache +from .models import ConferenceTweetQueue, ConferenceIncomingTweet, ConferenceMessaging from .models import Conference, ConferenceRegistration -def post_conference_tweet(conference, contents, approved=False, posttime=None, author=None): +def post_conference_social(conference, contents, approved=False, posttime=None, author=None): if not posttime: posttime = timezone.now() @@ -54,13 +56,14 @@ def _json_response(d): @csrf_exempt +@transaction.atomic def volunteer_twitter(request, urlname, token): try: conference = Conference.objects.select_related('series').get(urlname=urlname) except Conference.DoesNotExist: raise Http404() - if not conference.twittersync_active: + if not conference.has_social_broadcast: raise Http404() reg = get_object_or_404(ConferenceRegistration, conference=conference, regtoken=token) @@ -77,6 +80,8 @@ def volunteer_twitter(request, urlname, token): canpostdirect = conference.twitter_postpolicy == 4 canmoderate = conference.twitter_postpolicy == 3 + providers = ProviderCache() + if request.method == 'POST': if request.POST.get('op', '') == 'post': approved = False @@ -100,7 +105,7 @@ def volunteer_twitter(request, urlname, token): # Check if we have *exactly the same tweet* in the queue already, in the past 5 minutes. # in which case it's most likely a clicked-too-many-times. - if ConferenceTweetQueue.objects.filter(conference=conference, contents=request.POST['txt'][:280], author=reg.attendee, datetime__gt=timezone.now() - datetime.timedelta(minutes=5)): + if ConferenceTweetQueue.objects.filter(conference=conference, contents=request.POST['txt'], author=reg.attendee, datetime__gt=timezone.now() - datetime.timedelta(minutes=5)): return _json_response({'error': 'Duplicate post detected'}) # Now insert it in the queue, bypassing time validation since it's not an automatically @@ -143,7 +148,12 @@ def volunteer_twitter(request, urlname, token): t.save() if request.POST.get('replyid', None): - ConferenceIncomingTweet.objects.filter(conference=conference, statusid=get_int_or_error(request.POST, 'replyid')).update(processedat=timezone.now(), processedby=reg.attendee) + orig = ConferenceIncomingTweet.objects.select_related('provider').get(conference=conference, statusid=get_int_or_error(request.POST, 'replyid')) + orig.processedat = timezone.now() + orig.processedby = reg.attendee + orig.save() + # When when replying to a tweet, it goes to the original provider *only* + t.remainingtosend.set([orig.provider]) return _json_response({}) elif request.POST.get('op', None) in ('approve', 'discard'): @@ -216,7 +226,14 @@ def volunteer_twitter(request, urlname, token): def _postdata(objs): return [ - {'id': t.id, 'txt': t.contents, 'author': t.author and t.author.username or '', 'time': t.datetime, 'hasimage': t.hasimage} + { + 'id': t.id, + 'txt': t.contents, + 'author': t.author and t.author.username or '', + 'time': t.datetime, + 'hasimage': t.hasimage, + 'delivered': t.sent, + } for t in objs] return _json_response({ @@ -224,15 +241,23 @@ def volunteer_twitter(request, urlname, token): 'latest': _postdata(latest), }) elif request.GET.get('op', None) == 'incoming': - if conference.twitterincoming_active: - incoming = ConferenceIncomingTweet.objects.filter(conference=conference, processedat__isnull=True).order_by('created') - latest = ConferenceIncomingTweet.objects.filter(conference=conference, processedat__isnull=False).order_by('-processedat')[:5] - else: - incoming = latest = [] + incoming = ConferenceIncomingTweet.objects.select_related('provider').filter(conference=conference, processedat__isnull=True).order_by('created') + latest = ConferenceIncomingTweet.objects.select_related('provider').filter(conference=conference, processedat__isnull=False).order_by('-processedat')[:5] def _postdata(objs): return [ - {'id': str(t.statusid), 'txt': t.text, 'author': t.author_screenname, 'authorfullname': t.author_name, 'time': t.created, 'rt': t.retweetstate, 'media': [m for m in t.media if m is not None]} + { + 'id': str(t.statusid), + 'txt': t.text, + 'author': t.author_screenname, + 'authorfullname': t.author_name, + 'time': t.created, + 'rt': t.retweetstate, + 'provider': t.provider.publicname, + 'media': [m for m in t.media if m is not None], + 'url': providers.get(t.provider).get_public_url(t), + 'replymaxlength': providers.get(t.provider).max_post_length, + } for t in objs.annotate(media=ArrayAgg('conferenceincomingtweetmedia__mediaurl'))] return _json_response({ 'incoming': _postdata(incoming), @@ -258,7 +283,18 @@ def volunteer_twitter(request, urlname, token): t.imagethumb = b.getvalue() t.save() - return HttpResponse(t.imagethumb, content_type='image/png') + resp = HttpResponse(content_type='image/png') + resp.write(bytes(t.imagethumb)) + return resp + + # Maximum length from any of the configured providers + providermaxlength = { + m.provider.publicname: providers.get(m.provider).max_post_length + for m in + ConferenceMessaging.objects.select_related('provider').filter(conference=conference, + broadcast=True, + provider__active=True) + } return render(request, 'confreg/twitter.html', { 'conference': conference, @@ -266,4 +302,6 @@ def volunteer_twitter(request, urlname, token): 'poster': canpost and 1 or 0, 'directposter': canpostdirect and 1 or 0, 'moderator': canmoderate and 1 or 0, + 'providerlengths': ", ".join(["{}: {}".format(k, v) for k, v in providermaxlength.items()]), + 'maxlength': max((v for k, v in providermaxlength.items())), }) diff --git a/postgresqleu/confreg/util.py b/postgresqleu/confreg/util.py index 49dbf49f..dc61ca7c 100644 --- a/postgresqleu/confreg/util.py +++ b/postgresqleu/confreg/util.py @@ -14,11 +14,12 @@ import re from postgresqleu.mailqueue.util import send_simple_mail from postgresqleu.util.middleware import RedirectException from postgresqleu.util.time import today_conference +from postgresqleu.util.messaging.util import send_org_notification from postgresqleu.confreg.jinjafunc import JINJA_TEMPLATE_ROOT, render_jinja_conference_template from postgresqleu.confreg.jinjapdf import render_jinja_ticket from .models import PrepaidVoucher, DiscountCode, RegistrationWaitlistHistory -from .models import ConferenceRegistration, Conference +from .models import ConferenceRegistration, Conference, ConferenceSeries from .models import AttendeeMail from .models import ConferenceRegistrationLog @@ -396,6 +397,19 @@ def get_authenticated_conference(request, urlname=None, confid=None): raise PermissionDenied() +def get_authenticated_series(request, seriesid): + if not request.user.is_authenticated: + raise RedirectException("{0}?{1}".format(settings.LOGIN_URL, urllib.parse.urlencode({'next': request.build_absolute_uri()}))) + + s = get_object_or_404(ConferenceSeries, pk=seriesid) + if request.user.is_superuser: + return s + else: + if s.administrators.filter(pk=request.user.id).exists(): + return s + raise PermissionDenied() + + def get_conference_or_404(urlname): conference = get_object_or_404(Conference, urlname=urlname) @@ -411,6 +425,7 @@ def send_conference_notification(conference, subject, message): subject, message, sendername=conference.conferencename) + send_org_notification(conference, message) def send_conference_notification_template(conference, subject, templatename, templateattr): diff --git a/postgresqleu/confreg/views.py b/postgresqleu/confreg/views.py index 9538cc96..d3063fbd 100644 --- a/postgresqleu/confreg/views.py +++ b/postgresqleu/confreg/views.py @@ -31,6 +31,7 @@ from .models import RegistrationWaitlistEntry, RegistrationWaitlistHistory from .models import STATUS_CHOICES from .models import ConferenceNews, ConferenceTweetQueue from .models import SavedReportDefinition +from .models import ConferenceMessaging from .forms import ConferenceRegistrationForm, RegistrationChangeForm, ConferenceSessionFeedbackForm from .forms import ConferenceFeedbackForm, SpeakerProfileForm from .forms import CallForPapersForm @@ -59,6 +60,7 @@ from postgresqleu.util.request import get_int_or_error from postgresqleu.util.decorators import superuser_required from postgresqleu.util.random import generate_random_token from postgresqleu.util.time import today_conference +from postgresqleu.util.messaging import get_messaging from postgresqleu.invoices.models import Invoice, InvoicePaymentMethod, InvoiceRow from postgresqleu.invoices.util import InvoiceWrapper from postgresqleu.confwiki.models import Wikipage @@ -194,6 +196,16 @@ def _registration_dashboard(request, conference, reg, has_other_multiregs, redir else: scanned_by_sponsors = None + messaging = ConferenceMessaging.objects.filter(Q(notification=True) | Q(privatebcast=True), conference=conference) + if reg.messaging: + t, c = get_messaging(reg.messaging.provider).get_attendee_string(reg.regtoken, reg.messaging, reg.messaging_config) + if c is None: + current_messaging_info = t + else: + current_messaging_info = render_jinja_conference_template(conference, 'confreg/messaging/{}'.format(t), c) + else: + current_messaging_info = '' + return render_conference_response(request, conference, 'reg', 'confreg/registration_dashboard.html', { 'redir_root': redir_root, 'reg': reg, @@ -209,6 +221,8 @@ def _registration_dashboard(request, conference, reg, has_other_multiregs, redir 'scanned_by_sponsors': scanned_by_sponsors, 'changeform': changeform, 'displayfields': displayfields, + 'current_messaging_info': current_messaging_info, + 'messaging': messaging, }) @@ -417,6 +431,30 @@ def changereg(request, confname): @login_required @transaction.atomic +def reg_config_messaging(request, confname): + conference = get_conference_or_404(confname) + reg = get_object_or_404(ConferenceRegistration, conference=conference, attendee=request.user, payconfirmedat__isnull=False) + + if request.method != 'POST': + raise Http404() + + if request.POST.get('op', None) == 'deactivate': + reg.messaging = None + reg.messaging_copiedfrom = None + reg.messaging_config = {} + else: + # Else we're at the setup one + reg.messaging = get_object_or_404(ConferenceMessaging, Q(id=request.POST['messagingid'], conference=conference) & (Q(privatebcast=True) | Q(notification=True))) + reg.messaging_copiedfrom = None + reg.messaging_config = {} + + reg.save(update_fields=['messaging', 'messaging_copiedfrom', 'messaging_config']) + + return HttpResponseRedirect('../#notifications') + + +@login_required +@transaction.atomic def multireg(request, confname, regid=None): # "Register for somebody else" functionality. conference = get_conference_or_404(confname) diff --git a/postgresqleu/confsponsor/views.py b/postgresqleu/confsponsor/views.py index 1c247b22..7eee79b6 100644 --- a/postgresqleu/confsponsor/views.py +++ b/postgresqleu/confsponsor/views.py @@ -20,7 +20,7 @@ from postgresqleu.confreg.models import Conference, PrepaidVoucher, PrepaidBatch from postgresqleu.confreg.util import get_authenticated_conference, get_conference_or_404 from postgresqleu.confreg.jinjafunc import render_sandboxed_template from postgresqleu.confreg.util import send_conference_mail, send_conference_notification -from postgresqleu.confreg.twitter import post_conference_tweet +from postgresqleu.confreg.twitter import post_conference_social from postgresqleu.mailqueue.util import send_simple_mail from postgresqleu.util.storage import InlineEncodedStorage from postgresqleu.util.decorators import superuser_required @@ -809,14 +809,14 @@ def _confirm_benefit(request, benefit): # Potentially send tweet if benefit.benefit.tweet_template: - post_conference_tweet(conference, - render_sandboxed_template(benefit.benefit.tweet_template, { - 'benefit': benefit.benefit, - 'level': benefit.benefit.level, - 'conference': conference, - 'sponsor': benefit.sponsor - }), - approved=True) + post_conference_social(conference, + render_sandboxed_template(benefit.benefit.tweet_template, { + 'benefit': benefit.benefit, + 'level': benefit.benefit.level, + 'conference': conference, + 'sponsor': benefit.sponsor + }), + approved=True) def _unclaim_benefit(request, claimed_benefit): diff --git a/postgresqleu/newsevents/backendforms.py b/postgresqleu/newsevents/backendforms.py index 63292525..d6902142 100644 --- a/postgresqleu/newsevents/backendforms.py +++ b/postgresqleu/newsevents/backendforms.py @@ -3,6 +3,7 @@ from django.contrib.auth.models import User from postgresqleu.util.backendforms import BackendForm from postgresqleu.newsevents.models import News, NewsPosterProfile +from postgresqleu.confreg.backendforms import BackendTweetQueueForm class BackendNewsForm(BackendForm): @@ -49,3 +50,8 @@ class BackendAuthorForm(BackendForm): # We must force the system to do an insert at this point. Since we set 'pk', # it will otherwise think it's an edit, do an UPDATE, and fail. self.force_insert = True + + +class BackendPostQueueForm(BackendTweetQueueForm): + verbose_name = 'news social media post' + verbose_name_plural = 'news social media posts' diff --git a/postgresqleu/newsevents/backendviews.py b/postgresqleu/newsevents/backendviews.py index a65b5853..8488cbca 100644 --- a/postgresqleu/newsevents/backendviews.py +++ b/postgresqleu/newsevents/backendviews.py @@ -1,8 +1,13 @@ from django.core.exceptions import PermissionDenied +from django.shortcuts import get_object_or_404 from postgresqleu.util.backendviews import backend_list_editor from postgresqleu.util.auth import authenticate_backend_group +from postgresqleu.util.messaging import messaging_implementations, get_messaging_class +from postgresqleu.confreg.models import ConferenceTweetQueue, MessagingProvider +from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm from postgresqleu.newsevents.backendforms import BackendNewsForm, BackendAuthorForm +from postgresqleu.newsevents.backendforms import BackendPostQueueForm def edit_news(request, rest): @@ -32,3 +37,58 @@ def edit_author(request, rest): topadmin='News', return_url='/admin/', ) + + +def edit_postqueue(request, rest): + authenticate_backend_group(request, 'News administrators') + + return backend_list_editor(request, + None, + BackendPostQueueForm, + rest, + bypass_conference_filter=True, + object_queryset=ConferenceTweetQueue.objects.filter(conference__isnull=True), + topadmin='News', + return_url='/admin/', + ) + + +def edit_messagingproviders(request, rest): + if not request.user.is_superuser: + raise PermissionDenied("Access denied") + + def _load_messaging_formclass(classname): + return getattr(get_messaging_class(classname), 'provider_form_class', BackendSeriesMessagingForm) + + formclass = BackendSeriesMessagingForm + u = rest and rest.rstrip('/') or rest + if u and u != '' and u.isdigit(): + # Editing an existing one, so pick the correct subclass! + provider = get_object_or_404(MessagingProvider, pk=u, series__isnull=True) + formclass = _load_messaging_formclass(provider.classname) + elif u == 'new': + if '_newformdata' in request.POST or 'classname' in request.POST: + if '_newformdata' in request.POST: + c = request.POST['_newformdata'].split(':')[0] + else: + c = request.POST['classname'] + + if c not in messaging_implementations: + raise PermissionDenied() + + formclass = _load_messaging_formclass(c) + + # Note! Sync with confreg/backendviews.py + formclass.no_incoming_processing = True + formclass.verbose_name = 'news messaging provider' + formclass.verbose_name_plural = 'news messaging providers' + + return backend_list_editor(request, + None, + formclass, + rest, + bypass_conference_filter=True, + object_queryset=MessagingProvider.objects.filter(series__isnull=True), + topadmin='News', + return_url='/admin/', + ) diff --git a/postgresqleu/newsevents/management/commands/news_social_post.py b/postgresqleu/newsevents/management/commands/news_social_post.py new file mode 100644 index 00000000..b8a79058 --- /dev/null +++ b/postgresqleu/newsevents/management/commands/news_social_post.py @@ -0,0 +1,51 @@ +# +# Make social media posts about news +# +# (actually just writes it to the queue for the next job to pick up) +# + +from django.core.management.base import BaseCommand, CommandError +from django.template.defaultfilters import slugify +from django.db import transaction +from django.conf import settings +from django.utils import timezone + +from datetime import timedelta + +from postgresqleu.newsevents.models import News +from postgresqleu.confreg.models import ConferenceTweetQueue + + +def news_tweets_queryset(): + return News.objects.filter(tweeted=False, datetime__gt=timezone.now() - timedelta(days=7), datetime__lt=timezone.now()) + + +class Command(BaseCommand): + help = 'Post news to social media' + + class ScheduledJob: + internal = True + scheduled_interval = timedelta(minutes=5) + + @classmethod + def should_run(self): + return MessagingProvider.objects.filter(series__isnull=True).exists() and \ + News.objects.filter(tweeted=False, datetime__gt=timezone.now() - timedelta(days=7), datetime__lt=timezone.now()).exists() + + @transaction.atomic + def handle(self, *args, **options): + for n in News.objects.filter(tweeted=False, datetime__gt=timezone.now() - timedelta(days=7), datetime__lt=timezone.now()): + # We hardcode 30 chars for the URL shortener. And then 10 to cover the intro and spacing. + statusstr = "{0} {1}/news/{2}-{3}/".format(n.title[:140 - 40], + settings.SITEBASE, + slugify(n.title), + n.id) + ConferenceTweetQueue( + conference=None, + contents=statusstr, + approved=True, + datetime=n.datetime, + ).save() + + n.tweeted = True + n.save() diff --git a/postgresqleu/newsevents/management/commands/register_news_twitter.py b/postgresqleu/newsevents/management/commands/register_news_twitter.py deleted file mode 100644 index 89219da1..00000000 --- a/postgresqleu/newsevents/management/commands/register_news_twitter.py +++ /dev/null @@ -1,47 +0,0 @@ -# -# Script to register twitter oauth privileges for the main news posting -# -# NOTE! Conferences are registered directly from the conference management -# interface! -# -# - -from django.core.management.base import BaseCommand, CommandError -from django.conf import settings - -import requests_oauthlib - - -class Command(BaseCommand): - help = 'Register with twitter oauth' - - def handle(self, *args, **options): - if not settings.TWITTER_CLIENT: - raise CommandError("TWITTER_CLIENT must be set in settings_local.py") - if not settings.TWITTER_CLIENTSECRET: - raise CommandError("TWITTER_CLIENTSECRET must be set in settings_local.py") - if settings.TWITTER_NEWS_TOKEN: - raise CommandError("TWITTER_NEWS_TOKEN is already set in settings_local.py") - if settings.TWITTER_NEWS_TOKENSECRET: - raise CommandError("TWITTER_NEWS_TOKENSECRET is already set in settings_local.py") - - # OK, now we're good to go :) - oauth = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT, settings.TWITTER_CLIENTSECRET) - fetch_response = oauth.fetch_request_token('https://api.twitter.com/oauth/request_token') - - authorization_url = oauth.authorization_url('https://api.twitter.com/oauth/authorize') - print('Please go here and authorize: %s' % authorization_url) - - pin = input('Paste the PIN here: ') - - oauth = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT, - settings.TWITTER_CLIENTSECRET, - resource_owner_key=fetch_response.get('oauth_token'), - resource_owner_secret=fetch_response.get('oauth_token_secret'), - verifier=pin) - oauth_tokens = oauth.fetch_access_token('https://api.twitter.com/oauth/access_token') - - print("Authorized. Please configure:") - print("TWITTER_NEWS_TOKEN='%s'" % oauth_tokens.get('oauth_token')) - print("TWITTER_NEWS_TOKENSECRET='%s'" % oauth_tokens.get('oauth_token_secret')) - print("In your local_settings.py!") diff --git a/postgresqleu/newsevents/management/commands/twitter_post.py b/postgresqleu/newsevents/management/commands/twitter_post.py deleted file mode 100644 index ddb75d2e..00000000 --- a/postgresqleu/newsevents/management/commands/twitter_post.py +++ /dev/null @@ -1,107 +0,0 @@ -# -# Script to post previosly unposted news to twitter -# -# - -from django.core.management.base import BaseCommand, CommandError -from django.template.defaultfilters import slugify -from django.db import connection -from django.conf import settings -from django.utils import timezone - -from datetime import datetime, timedelta -import sys -import time - -from postgresqleu.newsevents.models import News -from postgresqleu.confreg.models import Conference, ConferenceNews, ConferenceTweetQueue, ConferenceIncomingTweet - -from postgresqleu.util.messaging.twitter import Twitter - - -def news_tweets_queryset(): - return News.objects.filter(tweeted=False, datetime__gt=timezone.now() - timedelta(days=7), datetime__lt=timezone.now()) - - -def conferences_with_tweets_queryset(): - return Conference.objects.filter(twittersync_active=True).extra(where=[ - "(EXISTS (SELECT 1 FROM confreg_conferencetweetqueue q WHERE q.conference_id=confreg_conference.id AND q.approved AND NOT q.sent) OR EXISTS (SELECT 1 FROM confreg_conferenceincomingtweet i WHERE i.conference_id=confreg_conference.id AND i.retweetstate=1))" - ]) - - -class Command(BaseCommand): - help = 'Post to twitter' - - class ScheduledJob: - scheduled_interval = timedelta(minutes=5) - - @classmethod - def should_run(self): - if settings.TWITTER_NEWS_TOKEN: - if news_tweets_queryset().exists(): - return True - if conferences_with_tweets_queryset().exists(): - return True - - return False - - def handle(self, *args, **options): - curs = connection.cursor() - curs.execute("SELECT pg_try_advisory_lock(981273)") - if not curs.fetchall()[0][0]: - raise CommandError("Failed to get advisory lock, existing twitter_post process stuck?") - - err = False - - if settings.TWITTER_NEWS_TOKEN: - tw = Twitter() - - for a in news_tweets_queryset().order_by('datetime'): - # We hardcode 30 chars for the URL shortener. And then 10 to cover the intro and spacing. - statusstr = "{0} {1}/news/{2}-{3}/".format(a.title[:140 - 40], - settings.SITEBASE, - slugify(a.title), - a.id) - id, msg = tw.post_tweet(statusstr) - if id: - a.tweeted = True - a.save() - else: - err = True - self.stderr.write("Failed to post to twitter: %s" % msg) - - # Don't post more often than once / 10 seconds, to not trigger flooding detection. - time.sleep(10) - - # Send off the conference twitter queue (which should normally only be one or two tweets, due to the filtering - # on datetime. - for c in conferences_with_tweets_queryset(): - tw = Twitter(c) - - for t in ConferenceTweetQueue.objects.filter(conference=c, approved=True, sent=False, datetime__lte=timezone.now()).order_by('datetime'): - id, msg = tw.post_tweet(t.contents, t.image, t.replytotweetid) - if id: - t.sent = True - t.tweetid = id - t.save(update_fields=['sent', 'tweetid', ]) - else: - err = True - self.stderr.write("Failed to post to twitter: %s" % msg) - - # Don't post more often than once / 10 seconds, to not trigger flooding detection. - time.sleep(10) - - for t in ConferenceIncomingTweet.objects.filter(conference=c, retweetstate=1): - ok, msg = tw.retweet(t.statusid) - if ok: - t.retweetstate = 2 - t.save(update_fields=['retweetstate']) - else: - self.stderr.write("Failed to retweet: %s" % msg) - - time.sleep(2) - - if err: - # Error message printed earlier, but we need to exit with non-zero exitcode - # to flag the whole job as failed. - sys.exit(1) diff --git a/postgresqleu/settings.py b/postgresqleu/settings.py index ccfcfa8a..0a8f43aa 100644 --- a/postgresqleu/settings.py +++ b/postgresqleu/settings.py @@ -192,14 +192,6 @@ SCHEDULED_JOBS_EMAIL = DEFAULT_EMAIL # end-user reference, and never actually by the system to send and receive. TREASURER_EMAIL = DEFAULT_EMAIL -# Twitter application keys -TWITTER_CLIENT = "" -TWITTER_CLIENTSECRET = "" - -# Twitter user keys for the account posting main news -TWITTER_NEWS_TOKEN = "" -TWITTER_NEWS_TOKENSECRET = "" - # If there is a local_settings.py, let it override our settings try: from .local_settings import * diff --git a/postgresqleu/urls.py b/postgresqleu/urls.py index b8962df0..774739ea 100644 --- a/postgresqleu/urls.py +++ b/postgresqleu/urls.py @@ -33,6 +33,8 @@ import postgresqleu.accountinfo.views import postgresqleu.util.docsviews import postgresqleu.mailqueue.backendviews import postgresqleu.util.monitor +import postgresqleu.util.views +import postgresqleu.util.backendviews from postgresqleu.newsevents.feeds import LatestNews from postgresqleu.confreg.feeds import LatestEvents, ConferenceNewsFeed @@ -81,6 +83,8 @@ urlpatterns.extend([ # News url(r'^admin/news/news/(.*/)?$', postgresqleu.newsevents.backendviews.edit_news), url(r'^admin/news/authors/(.*/)?$', postgresqleu.newsevents.backendviews.edit_author), + url(r'^admin/news/postqueue/(.*/?)$', postgresqleu.newsevents.backendviews.edit_postqueue), + url(r'^admin/news/messagingproviders/(.*/?)$', postgresqleu.newsevents.backendviews.edit_messagingproviders), # Conference management url(r'^events/(?P<confname>[^/]+)/register/(?P<whatfor>(self)/)?$', postgresqleu.confreg.views.register), @@ -90,6 +94,7 @@ urlpatterns.extend([ url(r'^events/(?P<confname>[^/]+)/register/other/b(?P<bulkid>(\d+))/cancel/$', postgresqleu.confreg.views.multireg_bulk_cancel), url(r'^events/(?P<confname>[^/]+)/register/other/z/$', postgresqleu.confreg.views.multireg_zeropay), url(r'^events/(?P<confname>[^/]+)/register/change/$', postgresqleu.confreg.views.changereg), + url(r'^events/(?P<confname>[^/]+)/register/msgconfig/$', postgresqleu.confreg.views.reg_config_messaging), url(r'^events/register/attach/([a-z0-9]{64})/$', postgresqleu.confreg.views.multireg_attach), url(r'^events/([^/]+)/prepaid/(\d+)/$', postgresqleu.confreg.views.viewvouchers_user), @@ -153,6 +158,7 @@ urlpatterns.extend([ url(r'^events/admin/crossmail/$', postgresqleu.confreg.views.crossmail), url(r'^events/admin/crossmail/options/$', postgresqleu.confreg.views.crossmailoptions), url(r'^events/admin/reports/time/$', postgresqleu.confreg.reporting.timereport), + url(r'^events/admin/_series/(\d+)/$', postgresqleu.confreg.backendviews.manage_series), url(r'^events/admin/([^/]+)/reports/$', postgresqleu.confreg.views.reports), url(r'^events/admin/([^/]+)/reports/simple/$', postgresqleu.confreg.views.simple_report), url(r'^events/admin/([^/]+)/reports/feedback/$', postgresqleu.confreg.feedback.feedback_report), @@ -183,6 +189,7 @@ urlpatterns.extend([ url(r'^events/admin/(\w+)/regdashboard/list/(\d+)/ticket/$', postgresqleu.confreg.backendviews.view_registration_ticket), url(r'^events/admin/(\w+)/regdashboard/list/(\d+)/badge/$', postgresqleu.confreg.backendviews.view_registration_badge), url(r'^events/admin/(\w+)/regdashboard/list/(\d+)/resendwelcome/$', postgresqleu.confreg.views.admin_registration_resendwelcome), + url(r'^events/admin/(\w+)/regdashboard/list/(\d+)/senddm/$', postgresqleu.confreg.backendviews.registration_dashboard_send_dm), url(r'^events/admin/(\w+)/regdashboard/list/sendmail/$', postgresqleu.confreg.backendviews.registration_dashboard_send_email), url(r'^events/admin/(\w+)/prepaid/$', postgresqleu.confreg.views.createvouchers), url(r'^events/admin/(\w+)/prepaid/list/$', postgresqleu.confreg.views.listvouchers), @@ -223,6 +230,7 @@ urlpatterns.extend([ url(r'^events/admin/(\w+)/volunteerslots/(.*/)?$', postgresqleu.confreg.backendviews.edit_volunteerslots), url(r'^events/admin/(\w+)/feedbackquestions/(.*/)?$', postgresqleu.confreg.backendviews.edit_feedbackquestions), url(r'^events/admin/(\w+)/discountcodes/(.*/)?$', postgresqleu.confreg.backendviews.edit_discountcodes), + url(r'^events/admin/(\w+)/messaging/(.*/)?$', postgresqleu.confreg.backendviews.edit_messaging), url(r'^events/admin/(\w+)/accesstokens/(.*/)?$', postgresqleu.confreg.backendviews.edit_accesstokens), url(r'^events/admin/(\w+)/news/(.*/)?$', postgresqleu.confreg.backendviews.edit_news), url(r'^events/admin/(\w+)/tweet/queue/(.*/)?$', postgresqleu.confreg.backendviews.edit_tweetqueue), @@ -235,7 +243,7 @@ urlpatterns.extend([ url(r'^events/admin/(\w+)/multiregs/(\d+)/refund/$', postgresqleu.confreg.backendviews.multireg_refund), url(r'^events/admin/(\w+)/addoptorders/$', postgresqleu.confreg.backendviews.addoptorders), url(r'^events/admin/(\w+)/purgedata/$', postgresqleu.confreg.backendviews.purge_personal_data), - url(r'^events/admin/(\w+)/integ/twitter/$', postgresqleu.confreg.backendviews.twitter_integration), + url(r'^events/admin/_series/(\d+)/messaging/(.*/)?$', postgresqleu.confreg.backendviews.edit_series_messaging), url(r'^events/admin/([^/]+)/talkvote/$', postgresqleu.confreg.views.talkvote), url(r'^events/admin/([^/]+)/talkvote/changestatus/$', postgresqleu.confreg.views.talkvote_status), url(r'^events/admin/([^/]+)/talkvote/vote/$', postgresqleu.confreg.views.talkvote_vote), @@ -329,6 +337,10 @@ urlpatterns.extend([ # Mail queue url(r'^admin/mailqueue/(.*/)?$', postgresqleu.mailqueue.backendviews.edit_mailqueue), + # Webhooks for messaging + url(r'^wh/(\d+)/([a-z0-9]+)/$', postgresqleu.util.views.messaging_webhook), + url(r'^wh/twitter/', postgresqleu.util.views.twitter_webhook), + # Handle paypal data returns url(r'^p/paypal_return/(\d+)/$', postgresqleu.paypal.views.paypal_return_handler), @@ -340,6 +352,9 @@ urlpatterns.extend([ url(r'^accountinfo/search/$', postgresqleu.accountinfo.views.search), url(r'^accountinfo/import/$', postgresqleu.accountinfo.views.importuser), + # OAuth application registry + url(r'^admin/oauthapps/(.*/)?$', postgresqleu.util.backendviews.edit_oauthapps), + # Monitoring endpoints url(r'^monitor/git/$', postgresqleu.util.monitor.gitinfo), url(r'^monitor/nagios/$', postgresqleu.util.monitor.nagios), diff --git a/postgresqleu/util/apps.py b/postgresqleu/util/apps.py index 306fbb94..5576ac04 100644 --- a/postgresqleu/util/apps.py +++ b/postgresqleu/util/apps.py @@ -9,6 +9,7 @@ import types from postgresqleu.util.forms import ConcurrentProtectedModelForm from .auth import PERMISSION_GROUPS +from .oauthapps import connect_oauth_signals # @@ -39,6 +40,8 @@ class UtilAppConfig(AppConfig): name = 'postgresqleu.util' def ready(self): + connect_oauth_signals() + post_migrate.connect(handle_post_migrate, sender=self) # Override the default ModelAdmin in django to add our validation fields diff --git a/postgresqleu/util/backendforms.py b/postgresqleu/util/backendforms.py index 255e08ad..aa9a1e17 100644 --- a/postgresqleu/util/backendforms.py +++ b/postgresqleu/util/backendforms.py @@ -36,6 +36,7 @@ class BackendForm(ConcurrentProtectedModelForm): filtercolumns = {} defaultsort = [] readonly_fields = [] + nosave_fields = [] linked_objects = {} auto_cascade_delete_to = [] fieldsets = [] diff --git a/postgresqleu/util/backendviews.py b/postgresqleu/util/backendviews.py index 9fad8e38..33e07356 100644 --- a/postgresqleu/util/backendviews.py +++ b/postgresqleu/util/backendviews.py @@ -11,6 +11,11 @@ from postgresqleu.util.lists import flatten_list from postgresqleu.confreg.util import get_authenticated_conference from postgresqleu.confreg.backendforms import BackendCopySelectConferenceForm +from .models import OAuthApplication +from .backendforms import BackendForm +from .forms import SelectSetValueField +from .oauthapps import oauth_application_choices, oauth_application_create + def backend_process_form(request, urlname, formclass, id, cancel_url='../', saved_url='../', allow_new=True, allow_delete=True, breadcrumbs=None, permissions_already_checked=False, conference=None, bypass_conference_filter=False, instancemaker=None, deleted_url=None, topadmin=None): if not conference and not bypass_conference_filter: @@ -160,7 +165,7 @@ def backend_process_form(request, urlname, formclass, id, cancel_url='../', save form.pre_create_item() form.save() form._save_m2m() - all_excludes = ['_validator', '_newformdata'] + form.readonly_fields + all_excludes = ['_validator', '_newformdata'] + list(form.readonly_fields) + form.nosave_fields if form.json_form_fields: for fn, ffields in form.json_form_fields.items(): all_excludes.extend(ffields) @@ -170,7 +175,9 @@ def backend_process_form(request, urlname, formclass, id, cancel_url='../', save # Merge fields stored in json if form.json_form_fields: for fn, ffields in form.json_form_fields.items(): - setattr(form.instance, fn, {fld: form.cleaned_data[fld] for fld in ffields}) + d = getattr(form.instance, fn, {}) + d.update({fld: form.cleaned_data[fld] for fld in ffields}) + setattr(form.instance, fn, d) form.instance.save(update_fields=form.json_form_fields.keys()) return HttpResponseRedirect(saved_url) @@ -471,3 +478,75 @@ def backend_handle_copy_previous(request, formclass, restpieces, conference): ], 'helplink': formclass.helplink, }) + + +# +# Special direct views +# +class BackendOAuthappNewForm(forms.Form): + helplink = 'oauth' + apptype = forms.CharField() # Field type will be changed dynamically + baseurl = forms.URLField(label='Base URL') + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # We have to set this dynamically, otherwise the system won't start up enough to + # run the migrations, for some reason. + self.fields['apptype'] = SelectSetValueField(choices=oauth_application_choices, + setvaluefield='baseurl', label='Type of application') + + def get_newform_data(self): + return "{}:{}".format(self.cleaned_data['apptype'], self.cleaned_data['baseurl']) + + def clean_baseurl(self): + b = self.cleaned_data['baseurl'].rstrip('/') + if OAuthApplication.objects.filter(baseurl=b).exists(): + raise ValidationError("An OAuth provider with this base URL is already configured!") + return b + + +class BackendOAuthappForm(BackendForm): + helplink = 'oauth' + list_fields = ['name', 'baseurl'] + readonly_fields = ['name', 'baseurl'] + form_before_new = BackendOAuthappNewForm + + class Meta: + model = OAuthApplication + fields = ['name', 'baseurl', 'client', 'secret'] + + def fix_fields(self): + super().fix_fields() + if self.newformdata: + (name, baseurl) = self.newformdata.split(':', 1) + self.instance.name = name + self.initial['name'] = name + self.instance.baseurl = baseurl + self.initial['baseurl'] = baseurl + if self.request.method == 'POST' and '_validator' not in self.request.POST: + try: + (client, secret) = oauth_application_create(name, baseurl) + except Exception as e: + messages.error(self.request, str(e)) + return + if client: + self.instance.client = client + self.initial['client'] = client + self.instance.secret = secret + self.initial['secret'] = secret + messages.info(self.request, "OAuth client and secret automaticaly created, just hit save!") + + +def edit_oauthapps(request, rest): + if not request.user.is_superuser: + raise PermissionDenied("Access denied") + + return backend_list_editor(request, + None, + BackendOAuthappForm, + rest, + bypass_conference_filter=True, + topadmin='OAuth', + return_url='/admin/', + ) diff --git a/postgresqleu/util/forms.py b/postgresqleu/util/forms.py index d504e3f3..7ade22c1 100644 --- a/postgresqleu/util/forms.py +++ b/postgresqleu/util/forms.py @@ -10,7 +10,7 @@ import base64 from itertools import groupby from .widgets import InlineImageUploadWidget, InlinePdfUploadWidget -from .widgets import SubmitButtonWidget, SelectSetValueWidget +from .widgets import LinkForCodeWidget, SubmitButtonWidget, SelectSetValueWidget class _ValidatorField(forms.Field): @@ -145,6 +145,10 @@ class PdfBinaryFormField(ImageBinaryFormField): widget = InlinePdfUploadWidget +class LinkForCodeField(forms.Field): + widget = LinkForCodeWidget + + class SubmitButtonField(forms.Field): def __init__(self, *args, **kwargs): if not kwargs: diff --git a/postgresqleu/util/management/commands/check_messaging_integrations.py b/postgresqleu/util/management/commands/check_messaging_integrations.py new file mode 100644 index 00000000..be6f0882 --- /dev/null +++ b/postgresqleu/util/management/commands/check_messaging_integrations.py @@ -0,0 +1,60 @@ +# +# Script to (optionally) validate all messaging integrations +# +# This can include for example checking that webhooks are still existing, +# and marked as valid. Actual implementation depends on the messaging provider. +# + + +from django.core.management.base import BaseCommand, CommandError +from django.db import connection, transaction +from django.utils import timezone + +import datetime +import io +import sys +from collections import defaultdict + +from postgresqleu.util.messaging import get_messaging + +from postgresqleu.confreg.models import MessagingProvider + + +class Command(BaseCommand): + help = 'Validate messaging integrations' + + class ScheduledJob: + scheduled_time = datetime.time(4, 19) + default_notify_on_success = True + + @classmethod + def should_run(self): + return MessagingProvider.objects.filter(active=True).exists() + + def handle(self, *args, **options): + s = io.StringIO() + err = False + + state = defaultdict(dict) + for provider in MessagingProvider.objects.filter(active=True).order_by('classname'): + impl = get_messaging(provider) + + try: + result, out = impl.check_messaging_config(state[provider.classname]) + except Exception as e: + result = False + out = "EXCEPTION: {}\n".format(e) + + if out: + s.write("{}\n".format(provider.internalname)) + s.write("{}\n".format('-' * len(provider.internalname))) + s.write(out) + s.write("\n\n") + if not result: + err = True + + if s.tell() != 0: + print(s.getvalue()) + + if err: + sys.exit(1) diff --git a/postgresqleu/util/management/commands/fetch_direct_messages.py b/postgresqleu/util/management/commands/fetch_direct_messages.py new file mode 100644 index 00000000..61455412 --- /dev/null +++ b/postgresqleu/util/management/commands/fetch_direct_messages.py @@ -0,0 +1,55 @@ +# +# Script to fetch incoming direct messages +# + + +from django.core.management.base import BaseCommand, CommandError +from django.db import connection, transaction +from django.utils import timezone + +from datetime import datetime, timedelta +import sys +import time + +from postgresqleu.util.messaging import get_messaging + +from postgresqleu.confreg.models import MessagingProvider + + +class Command(BaseCommand): + help = 'Fetch direct messages' + + class ScheduledJob: + # Normally this integreates with webhooks, so we run the catchup + # part separately and infrequently. + scheduled_interval = timedelta(minutes=15) + + @classmethod + def should_run(self): + return MessagingProvider.objects.filter(Q(conferencemessaging__privatebcast=True) | Q(conferencemessaging__notification=True) | Q(conferencemessaging__orgnotification=True), active=True, series__isnull=False) + + def handle(self, *args, **options): + curs = connection.cursor() + curs.execute("SELECT pg_try_advisory_lock(983231)") + if not curs.fetchall()[0][0]: + raise CommandError("Failed to get advisory lock, existing fetch_direct_messages process stuck?") + + err = False + + for provider in MessagingProvider.objects.raw("SELECT * FROM confreg_messagingprovider mp WHERE active AND series_id IS NOT NULL AND EXISTS (SELECT 1 FROM confreg_conferencemessaging m WHERE m.provider_id=mp.id AND (m.privatebcast OR m.notification OR m.orgnotification))"): + impl = get_messaging(provider) + + try: + with transaction.atomic(): + (lastpoll, checkpoint) = impl.poll_incoming_private_messages(provider.private_lastpoll, provider.private_checkpoint) + provider.private_lastpoll = lastpoll + provider.private_checkpoint = checkpoint + provider.save(update_fields=['private_lastpoll', 'private_checkpoint']) + except Exception as e: + print("Failed to poll {} for direct messages: {}".format(provider, e)) + err = True + + if err: + # Error message printed earlier, but we need to exit with non-zero exitcode + # to flag the whole job as failed. + sys.exit(1) diff --git a/postgresqleu/util/management/commands/fetch_media_posts.py b/postgresqleu/util/management/commands/fetch_media_posts.py new file mode 100644 index 00000000..ce7d8688 --- /dev/null +++ b/postgresqleu/util/management/commands/fetch_media_posts.py @@ -0,0 +1,66 @@ +# +# Script to fetch incoming social media posts +# + + +from django.core.management.base import BaseCommand, CommandError +from django.db import connection, transaction +from django.utils import timezone + +from datetime import datetime, timedelta +import sys +import time + +from postgresqleu.util.messaging import get_messaging +from postgresqleu.util.messaging.util import store_incoming_post + +from postgresqleu.confreg.models import ConferenceTweetQueue +from postgresqleu.confreg.models import ConferenceIncomingTweet, ConferenceIncomingTweetMedia +from postgresqleu.confreg.models import MessagingProvider + + +class Command(BaseCommand): + help = 'Fetch from social media' + + class ScheduledJob: + scheduled_interval = timedelta(minutes=5) + + @classmethod + def should_run(self): + return MessagingProvider.objects.filter(active=True, series__isnull=False, route_incoming__isnull=False).exists() + + def handle(self, *args, **options): + curs = connection.cursor() + curs.execute("SELECT pg_try_advisory_lock(981231)") + if not curs.fetchall()[0][0]: + raise CommandError("Failed to get advisory lock, existing fetch_media_posts process stuck?") + + err = False + + for provider in MessagingProvider.objects.filter(active=True, series__isnull=False, route_incoming__isnull=False): + impl = get_messaging(provider) + + polltime = timezone.now() + num = 0 + try: + with transaction.atomic(): + for post in impl.poll_public_posts(provider.public_lastpoll, provider.public_checkpoint): + # Update our checkpoint *first*, if it happens that we have already + # seen everything. + provider.public_checkpoint = max(provider.public_checkpoint, post['id']) + + if store_incoming_post(provider, post): + num += 1 + # Always save last polled time, and updated checkpoint if it changed + provider.public_lastpoll = polltime + provider.save(update_fields=['public_checkpoint', 'public_lastpoll']) + if num: + print("Polled {} new posts from {}".format(num, provider)) + except Exception as e: + print("Failed to poll {}: {}".format(provider, e)) + err = True + + if err: + # Error message printed earlier, but we need to exit with non-zero exitcode + # to flag the whole job as failed. + sys.exit(1) diff --git a/postgresqleu/util/management/commands/post_media_broadcasts.py b/postgresqleu/util/management/commands/post_media_broadcasts.py new file mode 100644 index 00000000..bc391035 --- /dev/null +++ b/postgresqleu/util/management/commands/post_media_broadcasts.py @@ -0,0 +1,45 @@ +# +# Script to post previosly unposted news to social media +# + + +from django.core.management.base import BaseCommand, CommandError +from django.template.defaultfilters import slugify +from django.db import connection, transaction +from django.conf import settings +from django.utils import timezone + +from datetime import datetime, timedelta +import sys + +from postgresqleu.util.messaging import ProviderCache +from postgresqleu.util.messaging.sender import send_pending_posts + + +class Command(BaseCommand): + help = 'Post to social media' + + class ScheduledJob: + scheduled_interval = timedelta(minutes=5) + + @classmethod + def should_run(self): + return ConferenceTweetQueue.objects.filter(approved=True, sent=False, datetime__lte=timezone.now()).exists() or ConferenceIncomingTweet.objects.filter(retweetstate=1).exists() + + def handle(self, *args, **options): + curs = connection.cursor() + curs.execute("SELECT pg_try_advisory_lock(981279)") + if not curs.fetchall()[0][0]: + raise CommandError("Failed to get advisory lock, existing post_media_broadcasts process stuck?") + + providers = ProviderCache() + + ok, numposts, numreposts = send_pending_posts(providers) + + if numposts: + print("Sent {} broadcast posts".format(numposts)) + if numreposts: + print("Made {} broadcast reposts".format(numreposts)) + + if not ok: + sys.exit(1) diff --git a/postgresqleu/util/management/commands/send_notifications.py b/postgresqleu/util/management/commands/send_notifications.py new file mode 100644 index 00000000..6ba41dfd --- /dev/null +++ b/postgresqleu/util/management/commands/send_notifications.py @@ -0,0 +1,36 @@ +# +# Script to send outgoing notifications +# + + +from django.core.management.base import BaseCommand, CommandError +from django.utils import timezone +from django.db import connection + +from datetime import timedelta +import sys + +from postgresqleu.util.messaging import ProviderCache +from postgresqleu.util.messaging.sender import send_pending_messages + + +class Command(BaseCommand): + help = 'Send pending notifications' + + class ScheduledJob: + scheduled_interval = timedelta(minutes=10) + + @classmethod + def should_run(self): + return Notification.objects.filter(time__lte=timezone.now()).exists() + + def handle(self, *args, **options): + curs = connection.cursor() + curs.execute("SELECT pg_try_advisory_lock(931779)") + if not curs.fetchall()[0][0]: + raise CommandError("Failed to get advisory lock, existing post_media_broadcasts process stuck?") + + providers = ProviderCache() + + if not send_pending_messages(providers): + sys.exit(1) diff --git a/postgresqleu/util/management/commands/social_media_poster.py b/postgresqleu/util/management/commands/social_media_poster.py new file mode 100644 index 00000000..053a4a32 --- /dev/null +++ b/postgresqleu/util/management/commands/social_media_poster.py @@ -0,0 +1,64 @@ +# +# Daemon to post all queued up notifications and social media posts +# + +from django.core.management.base import BaseCommand, CommandError +from django.core.management import load_command_class +from django.db import connection +from django.utils import autoreload, timezone +from django.conf import settings + +from datetime import timedelta +import time +import io +import sys +import os +import subprocess +import threading +import select + +from postgresqleu.util.messaging.sender import send_pending_messages, send_pending_posts +from postgresqleu.util.messaging import ProviderCache + + +class Command(BaseCommand): + help = 'Daemon to post notification and social media posts' + + def handle(self, *args, **options): + # Automatically exit if our own code changes. + # This is not based on a published API, so quite likely will fail + # and need to be updated in a future version of django + + # Start our work in a background thread + bthread = threading.Thread(target=self.inner_handle) + bthread.setDaemon(True) + bthread.start() + + reloader = autoreload.get_reloader() + while not reloader.should_stop: + reloader.run(bthread) + + self.stderr.write("Underlying code changed, exiting for a restart") + sys.exit(0) + + def inner_handle(self): + with connection.cursor() as curs: + curs.execute("LISTEN pgeu_notification") + curs.execute("LISTEN pgeu_broadcast") + curs.execute("SET application_name = 'pgeu messages/media poster'") + + while True: + providers = ProviderCache() + + send_pending_messages(providers) + send_pending_posts(providers) + + self.eat_notifications() + + # Wake up to check if there is something to do every 5 minutes, just in case + select.select([connection.connection], [], [], 5 * 60) + + def eat_notifications(self): + connection.connection.poll() + while connection.connection.notifies: + connection.connection.notifies.pop() diff --git a/postgresqleu/util/messaging/__init__.py b/postgresqleu/util/messaging/__init__.py index e69de29b..313c1577 100644 --- a/postgresqleu/util/messaging/__init__.py +++ b/postgresqleu/util/messaging/__init__.py @@ -0,0 +1,39 @@ +import re + +# Global regexps +re_token = re.compile('[0-9a-z]{64}') + +messaging_implementations = { + 'postgresqleu.util.messaging.mastodon.Mastodon': ('https://mastodon.social', False), + 'postgresqleu.util.messaging.telegram.Telegram': ('https://api.telegram.org', True), + 'postgresqleu.util.messaging.twitter.Twitter': ('https://api.twitter.com', True), +} + + +def messaging_implementation_choices(): + return [(k, k.split('.')[-1], v[0], v[1]) for k, v in messaging_implementations.items()] + + +def get_messaging_class(classname): + if classname not in messaging_implementations: + raise Exception("Invalid messaging class") + + pieces = classname.split('.') + modname = '.'.join(pieces[:-1]) + classname = pieces[-1] + mod = __import__(modname, fromlist=[classname, ]) + return getattr(mod, classname) + + +def get_messaging(provider): + return get_messaging_class(provider.classname)(provider.id, provider.config) + + +class ProviderCache(object): + def __init__(self): + self.providers = {} + + def get(self, provider): + if provider.id not in self.providers: + self.providers[provider.id] = get_messaging_class(provider.classname)(provider.id, provider.config) + return self.providers[provider.id] diff --git a/postgresqleu/util/messaging/mastodon.py b/postgresqleu/util/messaging/mastodon.py new file mode 100644 index 00000000..8d71c9b0 --- /dev/null +++ b/postgresqleu/util/messaging/mastodon.py @@ -0,0 +1,265 @@ +from django import forms +from django.utils import timezone +from django.utils.html import strip_tags +from django.conf import settings + +import requests_oauthlib +import requests +import dateutil.parser + +from postgresqleu.util.widgets import StaticTextWidget +from postgresqleu.util.forms import LinkForCodeField +from postgresqleu.util.oauthapps import get_oauth_client, get_oauth_secret, has_oauth_data +from postgresqleu.util.models import OAuthApplication +from postgresqleu.util.messaging import re_token + +from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm +from postgresqleu.confreg.models import ConferenceRegistration + +from .util import send_reg_direct_message + + +# We always ask for this scope +MASTODON_SCOPES = "read write:statuses write:media" + + +class MastodonBackendForm(BackendSeriesMessagingForm): + initialconfig = LinkForCodeField(label='Get authorization code') + mastodoninfo = forms.CharField(widget=StaticTextWidget, label="Account information", required=False) + + def __init__(self, *args, **kwargs): + self.baseurl = None + super().__init__(*args, **kwargs) + + def fix_fields(self): + super().fix_fields() + + if self.baseurl: + self.instance.config['baseurl'] = self.baseurl.rstrip('/') + + if self.instance.config.get('token', None): + del self.fields['initialconfig'] + self.config_fields = ['mastodoninfo', ] + self.config_fieldsets = [ + {'id': 'mastodon', 'legend': 'Mastodon', 'fields': ['mastodoninfo', ]}, + ] + self.config_readonly_fields = ['mastodoninfo', ] + + try: + if 'username' not in self.instance.config: + self.instance.config.update(Mastodon(self.instance.id, self.instance.config).get_account_info()) + self.instance.save(update_fields=['config']) + selfinfo = "Connected to mastodon account @{}.".format(self.instance.config['username']) + except Exception as e: + selfinfo = "ERROR verifying Mastodon access: {}".format(e) + + self.initial.update({ + 'mastodoninfo': selfinfo, + }) + else: + # Not configured yet, so prepare for it! + del self.fields['mastodoninfo'] + self.config_fields = ['initialconfig', ] + self.config_fieldsets = [ + {'id': 'mastodon', 'legend': 'Mastodon', 'fields': ['initialconfig', ]}, + ] + self.nosave_fields = ['initialconfig', ] + + # Ugly power-grab here, but let's see what's in our POST + if self.request.POST.get('initialconfig', None): + # Token is included, so don't try to get a new one + self.fields['initialconfig'].widget.authurl = self.request.session['authurl'] + else: + auth_url, state = self._get_oauth_session().authorization_url('{}/oauth/authorize'.format(self.instance.config['baseurl'])) + self.request.session['authurl'] = auth_url + + self.fields['initialconfig'].widget.authurl = auth_url + + def clean(self): + d = super().clean() + if d.get('initialconfig', None): + # We have received an initial config, so try to attach ourselves to mastodon + try: + tokens = self._get_oauth_session().fetch_token( + '{}/oauth/token'.format(self.instance.config['baseurl']), + code=d.get('initialconfig'), + client_secret=get_oauth_secret(self.instance.config['baseurl']), + scopes=MASTODON_SCOPES + ) + + self.instance.config['token'] = tokens['access_token'] + del self.request.session['authurl'] + self.request.session.modified = True + except Exception as e: + self.add_error('initialconfig', 'Could not set up Mastodon: {}'.format(e)) + self.add_error('initialconfig', 'You probably have to restart the process') + return d + + def _get_oauth_session(self): + return requests_oauthlib.OAuth2Session( + get_oauth_client(self.instance.config['baseurl']), + redirect_uri='urn:ietf:wg:oauth:2.0:oob', + scope=MASTODON_SCOPES + ) + + +class Mastodon(object): + provider_form_class = MastodonBackendForm + can_process_incoming = True + can_broadcast = True + can_notification = True + direct_message_max_length = 450 # 500 is lenght, draw down some to handle username + + @classmethod + def validate_baseurl(self, baseurl): + if not OAuthApplication.objects.filter(name='mastodon', baseurl=baseurl).exists(): + return 'Global OAuth credentials for {} missing'.format(baseurl) + + @property + def max_post_length(self): + return 500 + + def __init__(self, providerid, config): + self.providerid = providerid + self.providerconfig = config + + self.sess = requests.Session() + self.sess.headers.update({ + 'Authorization': 'Bearer {}'.format(self.providerconfig['token']), + }) + + def _api_url(self, url): + return '{}{}'.format(self.providerconfig['baseurl'], url) + + def _get(self, url, *args, **kwargs): + return self.sess.get(self._api_url(url), *args, **kwargs) + + def _post(self, url, *args, **kwargs): + return self.sess.post(self._api_url(url), *args, **kwargs) + + def get_account_info(self): + r = self._get('/api/v1/accounts/verify_credentials') + r.raise_for_status() + j = r.json() + return { + 'username': j['username'], + } + + def post(self, toot, image=None, replytotweetid=None): + d = { + 'status': toot, + 'visibility': 'public', + } + if replytotweetid: + d['in_reply_to_id'] = replytotweetid + + if image: + r = self._post('/api/v1/media', files={ + 'file': bytearray(image), + }) + if r.status_code != 200: + return (False, 'Media upload: {}'.format(r.text)) + d['media_ids'] = [int(r.json()['id']), ] + + r = self._post('/api/v1/statuses', json=d) + if r.status_code != 200: + return (None, r.text) + + return (r.json()['id'], None) + + def repost(self, postid): + r = self._post('/api/v1/statuses/{}/reblog'.format(postid)) + if r.status_code != 200: + return (None, r.text) + return (True, None) + + def send_direct_message(self, recipient_config, msg): + d = { + 'status': '@{} {}'.format(recipient_config['username'], msg), + 'visibility': 'direct', + } + + r = self._post('/api/v1/statuses', json=d) + r.raise_for_status() + + def poll_public_posts(self, lastpoll, checkpoint): + p = { + 'limit': 200, # If it's this many, we should give up + 'exclude_types': ['follow', 'favourite', 'reblog', 'poll'], + } + if checkpoint: + p['since_id'] = checkpoint + + r = self._get('/api/v1/notifications', params=p) + r.raise_for_status() + + for n in r.json(): + s = n['status'] + d = { + 'id': int(s['id']), + 'datetime': dateutil.parser.parse(s['created_at']), + 'text': strip_tags(s['content']), + 'replytoid': s['in_reply_to_id'] and int(s['in_reply_to_id']) or None, + 'author': { + 'name': s['account']['display_name'] or s['account']['username'], + 'username': s['account']['username'], + 'id': s['account']['id'], + 'imageurl': s['account']['avatar_static'], + }, + 'media': [m['url'] for m in s['media_attachments']], + } + # (mastodon doesn't have quoted status, so just leave that one non-existing) + yield d + + def poll_incoming_private_messages(self, lastpoll, checkpoint): + p = { + 'limit': 40, + } + if checkpoint: + p['since_id'] = checkpoint + + r = self._get('/api/v1/conversations', params=p) + r.raise_for_status() + + j = r.json() + for c in j: + if len(c['accounts']) > 1: + # Can't handle group messages + continue + ls = c['last_status'] + self.process_incoming_dm(ls) + + if len(j): + # For some reason, it paginates by last_status->id, and not by id. Go figure. + return timezone.now(), max((c['last_status']['id'] for c in j)) + else: + return timezone.now(), checkpoint + + def process_incoming_dm(self, msg): + for m in re_token.findall(msg['content']): + try: + reg = ConferenceRegistration.objects.get(regtoken=m) + + reg.messaging_config = { + 'username': msg['account']['username'], + } + reg.save(update_fields=['messaging_config']) + + send_reg_direct_message(reg, 'Hello! This account is now configured to receive notifications for {}'.format(reg.conference)) + except ConferenceRegistration.DoesNotExist: + pass + + def get_public_url(self, post): + return '{}@{}/{}'.format(self.providerconfig['baseurl'], post.author_screenname, post.statusid) + + def get_attendee_string(self, token, messaging, attendeeconfig): + if 'username' in attendeeconfig: + return "Your notifications will be sent to @{}.".format(attendeeconfig['username']), None + else: + return 'mastodon_invite.html', { + 'mastodonname': self.providerconfig['username'], + 'token': token, + } + + def check_messaging_config(self, state): + return True, '' diff --git a/postgresqleu/util/messaging/sender.py b/postgresqleu/util/messaging/sender.py new file mode 100644 index 00000000..42dde1a3 --- /dev/null +++ b/postgresqleu/util/messaging/sender.py @@ -0,0 +1,111 @@ +# Message sending functionality lives in a separate module so it +# can easily be used both from a scheduled job and from a daemone +# when available. +from django.utils import timezone +from django.db import transaction + +from datetime import timedelta +import time + +from postgresqleu.confreg.models import NotificationQueue +from postgresqleu.confreg.models import ConferenceTweetQueue, ConferenceIncomingTweet +from postgresqleu.util.messaging.util import truncate_shortened_post + + +def send_pending_messages(providers): + err = False + + # First delete any expired messages + NotificationQueue.objects.filter(expires__lte=timezone.now()).delete() + + # Then one by one send off the ones that we have in the queue + first = True + while True: + with transaction.atomic(): + msglist = list(NotificationQueue.objects. + select_for_update(of=('self',)). + select_related('reg', 'messaging', 'messaging__provider'). + only('msg', 'channel', 'reg__messaging_config', + 'messaging__config', 'reg__messaging__provider', + ).filter(time__lte=timezone.now()). + order_by('time', 'id')[:1]) + if len(msglist) == 0: + break + n = msglist[0] + + # Actually Send Message (TM) + impl = providers.get(n.messaging.provider) + try: + if n.reg: + impl.send_direct_message(n.reg.messaging_config, n.msg) + else: + impl.post_channel_message(n.messaging.config, n.channel, n.msg) + except Exception as e: + print("Failed to send notification to {} using {}: {}. Will retry until {}.".format( + n.reg and n.reg or n.channel, + n.messaging.provider.internalname, + e, n.expires + )) + err = True + + # Retry in 5 minutes + n.time += timedelta(minutes=5) + n.save() + else: + # Successfully posted, so delete it + n.delete() + + # Rate limit us to one per second, that should usually be enough + if first: + first = False + else: + time.sleep(1) + + return not err + + +def send_pending_posts(providers): + err = False + + numposts = 0 + for t in ConferenceTweetQueue.objects.prefetch_related('remainingtosend').filter(approved=True, sent=False, datetime__lte=timezone.now()).order_by('datetime'): + sentany = False + for p in t.remainingtosend.all(): + impl = providers.get(p) + (id, err) = impl.post( + truncate_shortened_post(t.contents, t.max_post_length), + t.image, + t.replytotweetid, + ) + + if id: + t.remainingtosend.remove(p) + # postids is a map of <provider status id> -> <provider id>. It's mapped + # "backwards" this way because the main check we do is if a key exists. + t.postids[id] = p.id + sentany = True + else: + sys.stderr.write("Failed to post to {}: {}".format(p, err)) + err = True + if sentany: + numposts += 1 + if not t.remainingtosend.exists(): + t.sent = True + t.save(update_fields=['postids', 'sent']) + + numreposts = 0 + for t in ConferenceIncomingTweet.objects.select_related('provider').filter(retweetstate=1): + # These are tweets that should be retweeted. Retweets only happen on the same + # provider that they were posted on. + impl = providers.get(t.provider) + + ok, msg = impl.repost(t.statusid) + if ok: + t.retweetstate = 2 + t.save(update_fields=['retweetstate']) + numreposts += 1 + else: + self.stderr.write("Failed to repost on {}: {}\n".format(t.provider, msg)) + err = True + + return err, numposts, numreposts diff --git a/postgresqleu/util/messaging/telegram.py b/postgresqleu/util/messaging/telegram.py new file mode 100644 index 00000000..406c1fb4 --- /dev/null +++ b/postgresqleu/util/messaging/telegram.py @@ -0,0 +1,340 @@ +from django import forms +from django.http import HttpResponse +from django.db import models +from django.utils.functional import cached_property +from django.utils import timezone +from django.contrib import messages +from django.conf import settings +import django.utils.timezone + +import json +import requests +from datetime import datetime + +from postgresqleu.util.random import generate_random_token +from postgresqleu.util.forms import SubmitButtonField +from postgresqleu.util.widgets import StaticTextWidget +from postgresqleu.util.messaging import re_token + +from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm +from postgresqleu.confreg.models import ConferenceMessaging, ConferenceRegistration, IncomingDirectMessage + +from .util import send_reg_direct_message, send_channel_message + + +class TelegramBackendForm(BackendSeriesMessagingForm): + telegramtoken = forms.CharField(required=True, label="Telegram token", + widget=forms.widgets.PasswordInput(render_value=True, attrs={'autocomplete': 'new-password'}), + help_text='Create a new bot in the Telegram Botfather, and copy/paste the access token here') + telegramstatus = forms.CharField(widget=StaticTextWidget, label="Telegram information", required=False) + webhookenabler = SubmitButtonField(label="Enable webhook") + + exclude_fields_from_validation = ['telegramstatus', 'webhookenabler', ] + + @property + def config_fields(self): + f = ['telegramtoken', 'webhookenabler'] + if self.instance.config.get('telegramtoken', None): + f.extend(['telegramstatus']) + return f + + @property + def config_fieldsets(self): + return [ + {'id': 'telegram', 'legend': 'Telegram', 'fields': self.config_fields}, + ] + + def fix_fields(self): + super().fix_fields() + if self.instance.config.get('telegramtoken', None): + # Existing token, make a call to telegram to see what's up, if necessary + try: + if 'botname' not in self.instance.config: + tgm = Telegram(self.instance.id, self.instance.config) + botinfo = tgm.get('getMe') + self.instance.config.update({ + 'botname': botinfo['username'], + 'botfullname': botinfo['first_name'], + }) + self.instance.save(update_fields=['config']) + self.initial['telegramstatus'] = 'Telegram bot username {}, full name "{}", connected.'.format( + self.instance.config['botname'], + self.instance.config['botfullname'], + ) + except Exception as e: + self.initial['telegramstatus'] = 'Failed to get Telegram info: {}'.format(e) + + # Is the webhook enabled? + if 'webhook' in self.instance.config: + # It is! + self.fields['webhookenabler'].label = 'Disable webhook' + self.fields['webhookenabler'].widget.label = 'Disable webhook' + self.fields['webhookenabler'].callback = self.disable_webhook + else: + self.fields['webhookenabler'].callback = self.enable_webhook + else: + self.remove_field('telegramstatus') + self.remove_field('webhookenabler') + + def enable_webhook(self, request): + token = generate_random_token() + + Telegram(self.instance.id, self.instance.config).post('setWebhook', { + 'url': '{}/wh/{}/{}/'.format(settings.SITEBASE, self.instance.id, token), + 'max_connections': 2, + 'allowed_updates': ['channel_post', 'message'], + }) + self.instance.config['webhook'] = { + 'token': token, + } + self.instance.save(update_fields=['config']) + + messages.info(request, "Webhook has been enabled!") + return True + + def disable_webhook(self, request): + Telegram(self.instance.id, self.instance.config).post('deleteWebhook') + + del self.instance.config['webhook'] + self.instance.save(update_fields=['config']) + + messages.info(request, "Webhook has been disabled!") + return True + + +class Telegram(object): + provider_form_class = TelegramBackendForm + can_privatebcast = True + can_notification = True + can_orgnotification = True + direct_message_max_length = None + + @classmethod + def validate_baseurl(self, baseurl): + return None + + def __init__(self, id, config): + self.providerid = id + self.providerconfig = config + + def refresh_messaging_config(self, config): + mod = False + + if 'channeltoken' not in config: + config['channeltoken'] = {} + if 'tokenchannel' not in config: + config['tokenchannel'] = {} + + for channel in ['privatebcast', 'orgnotification']: + if channel not in config['channeltoken']: + # Create a token! + t = generate_random_token() + config['channeltoken'][channel] = t + config['tokenchannel'][t] = channel + mod = True + return mod + + def get_channel_field(self, instance, fieldname): + commontxt = "<br/><br/>To set or edit this, please invite the bot @{} to the selected channel as an administrator, and after that's done, plaste the token {} in the channel to associated. Then wait a bit and refresh this page (you will also be notified in the channel).".format(self.providerconfig['botname'], instance.config['channeltoken'][fieldname]) + if 'channels' in instance.config and fieldname in instance.config['channels']: + txt = 'Configured to talk in channel with id {} and title "{}".'.format( + instance.config['channels'][fieldname]['id'], + instance.config['channels'][fieldname]['title'], + ) + return SubmitButtonField(label='Disable channel', prefixparagraph=txt + commontxt, callback=self.disable_channel(instance, fieldname)) + else: + txt = '<strong>Not currently attached to a channel!</strong>' + return forms.CharField(widget=StaticTextWidget, initial=txt + commontxt) + + def disable_channel(self, instance, channelname): + def _disable_channel(request): + del instance.config['channels'][channelname] + instance.save(update_fields=['config', ]) + return _disable_channel + + def get(self, method, params={}): + r = requests.get( + 'https://api.telegram.org/bot{}/{}'.format(self.providerconfig['telegramtoken'], method), + params=params, + timeout=10 + ) + r.raise_for_status() + j = r.json() + if not j['ok']: + raise Exception("OK was {}".format(j['ok'])) + return j['result'] + + def post(self, method, params={}, ignoreerrors=False): + r = requests.post( + 'https://api.telegram.org/bot{}/{}'.format(self.providerconfig['telegramtoken'], method), + data=params, + timeout=10 + ) + if ignoreerrors: + return None + + r.raise_for_status() + j = r.json() + if not j['ok']: + raise Exception("OK was {}".format(j['ok'])) + return j['result'] + + def send_direct_message(self, recipient_config, msg): + self.post('sendMessage', { + 'chat_id': recipient_config['userid'], + 'text': msg, + }) + + def post_channel_message(self, messagingconfig, channelname, msg): + self.post('sendMessage', { + 'chat_id': messagingconfig['channels'][channelname]['id'], + 'text': msg, + }) + + def poll_incoming_private_messages(self, lastpoll, checkpoint): + # If we are configured with a webhook, telegram will return an error if + # we try to get the data this way as well, so don't try that. + if 'webhook' in self.providerconfig: + return lastpoll, checkpoint + + # We'll get up to 100 updates per run, which is the default + res = self.get('getUpdates', { + 'offset': checkpoint + 1, + 'allowed_updates': ['channel_post', 'message'], + }) + + # For now we don't store telegram input, we just do automated processing to figure + # out if we're connected to something. + for u in res: + if 'channel_post' in u: + self.process_channel_post(u['channel_post']) + elif 'message' in u: + self.process_incoming_chat_structure(u) + + if res: + return timezone.now(), max((u['update_id'] for u in res)) + else: + return timezone.now(), checkpoint + + def process_webhook(self, request): + try: + j = json.loads(request.body.decode('utf8', errors='ignore')) + if 'channel_post' in j: + self.process_channel_post(j['channel_post']) + elif 'message' in j: + self.process_incoming_chat_structure(j) + # All other types we just ignore for now + return HttpResponse("OK") + except Exception as e: + return HttpResponse("Internal error", status=500) + + def process_channel_post(self, p): + # Does it look like a token? If so, try to attach! + for m in re_token.findall(p['text']): + # Found a match. + # Now try to find if this is an actual token, and assign the channel + # as required. + try: + r = ConferenceMessaging.objects.get( + provider_id=self.providerid, + config__tokenchannel__has_key=m + ) + chan = r.config['tokenchannel'][m] + hadprevchannel = 'channels' in r.config and chan in r.config['channels'] + if 'channels' not in r.config: + r.config['channels'] = {} + + # Export a channel invite link, so that we have one + self.post('exportChatInviteLink', {'chat_id': p['chat']['id']}, ignoreerrors=True) + chatobj = self.get('getChat', {'chat_id': p['chat']['id']}) + r.config['channels'][chan] = { + 'id': p['chat']['id'], + 'title': p['chat']['title'], + 'invitelink': chatobj.get('invite_link', None), + } + r.save(update_fields=['config']) + try: + # Ignore if this fails, probably permissions + self.post('deleteMessage', { + 'chat_id': p['chat']['id'], + 'message_id': p['message_id'] + }) + except Exception as e: + pass + + # Send a reply, and this should not fail + send_channel_message(r, chan, + 'Thank you, this channel has now been associated with {} channel {}'.format( + r.conference.conferencename, + chan + )) + if hadprevchannel: + send_channel_message(r, chan, 'The previously existing channel association has been removed.') + + except ConferenceMessaging.DoesNotExist: + # Just ignore it, since it wasn't an active token. + pass + + def process_incoming_chat_structure(self, u): + msgid = int(u['update_id']) + if IncomingDirectMessage.objects.filter(provider_id=self.providerid, postid=msgid).exists(): + # We've already seen this one + return + + msg = IncomingDirectMessage( + provider_id=self.providerid, + postid=msgid, + time=datetime.fromtimestamp(int(u['message']['date']), tz=django.utils.timezone.utc), + sender={ + 'username': u['message']['from']['username'], + 'userid': u['message']['from']['id'], + }, + txt=u['message']['text'], + ) + self.process_incoming_chat_message(msg) + msg.save() + + def process_incoming_chat_message(self, msg): + # Does it look like a token? If so, try to attach! + for m in re_token.findall(msg.txt): + try: + reg = ConferenceRegistration.objects.get(regtoken=m) + # Matched reg, so set it up + reg.messaging_config = msg.sender + reg.save(update_fields=['messaging_config']) + + send_reg_direct_message(reg, 'Hello! This account is now configured to receive notifications for {}'.format(reg.conference)) + + msg.internallyprocessed = True + return + except ConferenceRegistration.DoesNotExist: + pass + + def get_attendee_string(self, token, messaging, attendeeconfig): + if 'userid' in attendeeconfig: + return 'telegram_ready.html', { + 'username': attendeeconfig['username'], + 'invitelink': messaging.config.get('channels', {}).get('privatebcast', {}).get('invitelink', None), + } + else: + return 'telegram_invite.html', { + 'botname': self.providerconfig['botname'], + 'token': token, + } + + def check_messaging_config(self, state): + if 'webhook' in self.providerconfig: + token = self.providerconfig['webhook']['token'] + + webhookurl = '{}/wh/{}/{}/'.format(settings.SITEBASE, self.providerid, token) + + whi = self.get('getWebhookInfo') + if whi['url'] != webhookurl: + self.post('setWebhook', { + 'url': webhookurl, + 'max_connections': 2, + 'allowed_updates': ['channel_post', 'message'], + }) + return True, 'Webhook resubscribed' + + return True, '' diff --git a/postgresqleu/util/messaging/twitter.py b/postgresqleu/util/messaging/twitter.py index 4edb5027..d6a6f2cb 100644 --- a/postgresqleu/util/messaging/twitter.py +++ b/postgresqleu/util/messaging/twitter.py @@ -1,31 +1,220 @@ +from django.core.validators import ValidationError +from django.http import HttpResponse +from django import forms from django.conf import settings +from django.contrib import messages +from django.db import transaction +import django.utils.timezone import requests_oauthlib +from datetime import datetime +import dateutil.parser +import hmac +import hashlib +import json +import base64 + +from postgresqleu.util.widgets import StaticTextWidget +from postgresqleu.util.forms import SubmitButtonField +from postgresqleu.util.forms import LinkForCodeField +from postgresqleu.util.oauthapps import get_oauth_client, get_oauth_secret, has_oauth_data +from postgresqleu.util.messaging import re_token, get_messaging +from postgresqleu.util.messaging.util import send_reg_direct_message, store_incoming_post + +from postgresqleu.confreg.models import ConferenceRegistration, MessagingProvider, IncomingDirectMessage +from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm _cached_twitter_users = {} -class Twitter(object): - def __init__(self, conference=None): - if conference: - token = conference.twitter_token - secret = conference.twitter_secret +class TwitterBackendForm(BackendSeriesMessagingForm): + initialconfig = LinkForCodeField(label='Get PIN code') + twitterinfo = forms.CharField(widget=StaticTextWidget, label="Account information", required=False) + webhookenabler = SubmitButtonField(label="Enable webhook") + + def fix_fields(self): + super().fix_fields() + + if self.instance.config.get('token', None): + del self.fields['initialconfig'] + self.config_fields = ['twitterinfo', 'webhookenabler'] + if 'webhook' in self.instance.config: + self.fields['webhookenabler'].label = 'Disable webhook' + self.fields['webhookenabler'].widget.label = 'Disable webhook' + self.fields['webhookenabler'].callback = self.disable_webhook + else: + self.fields['webhookenabler'].callback = self.enable_webhook + self.config_fieldsets = [ + {'id': 'twitter', 'legend': 'Twitter', 'fields': self.config_fields}, + ] + self.config_readonly_fields = ['twitterinfo', ] + + if 'screen_name' not in self.instance.config: + try: + ai = Twitter(self.instance.id, self.instance.config).get_account_info() + self.instance.config.update(ai) + self.instance.save(update_fields=['config']) + except Exception as e: + self.initial['twitterinfo'] = "Unable to fetch twitter account info: {}".format(e) + return + + self.initial.update({ + 'twitterinfo': "Connected to twitter account @{}.".format(self.instance.config.get('screen_name', '*unknown*')), + }) else: - token = settings.TWITTER_NEWS_TOKEN - secret = settings.TWITTER_NEWS_TOKENSECRET + # Not configured yet, so prepare for it! + del self.fields['twitterinfo'] + del self.fields['webhookenabler'] + self.config_fields = ['initialconfig', ] + self.config_fieldsets = [ + {'id': 'twitter', 'legend': 'Twitter', 'fields': ['initialconfig', ]}, + ] + self.nosave_fields = ['initialconfig', ] + + # Ugly power-grab here, but let's see what's in our POST + if self.request.POST.get('initialconfig', None): + # Token is included, so don't try to get a new one + self.fields['initialconfig'].widget.authurl = self.request.session['authurl'] + else: + # Prepare the twitter setup + try: + (auth_url, ownerkey, ownersecret) = TwitterSetup.get_authorization_data() + self.request.session['ownerkey'] = ownerkey + self.request.session['ownersecret'] = ownersecret + self.request.session['authurl'] = auth_url + + self.fields['initialconfig'].widget.authurl = auth_url + except Exception as e: + messages.error(self.request, 'Failed to initialize setup with Twitter: {}'.format(e)) + del self.fields['initialconfig'] + self.config_fields = [] + self.config_fieldsets = [] + + def clean(self): + d = super().clean() + if d.get('initialconfig', None): + # We have received an initial config, so try to attach ourselves to twitter. + try: + tokens = TwitterSetup.authorize(self.request.session['ownerkey'], + self.request.session['ownersecret'], + d.get('initialconfig'), + ) + self.instance.config['token'] = tokens.get('oauth_token') + self.instance.config['secret'] = tokens.get('oauth_token_secret') + del self.request.session['authurl'] + del self.request.session['ownerkey'] + del self.request.session['ownersecret'] + self.request.session.modified = True + + ai = Twitter(self.instance.id, self.instance.config).get_account_info() + if MessagingProvider.objects.filter( + classname='postgresqleu.util.messaging.twitter.Twitter', + config__accountid=ai['accountid'], + series__isnull=True, + ).exclude(pk=self.instance.pk).exists(): + raise Exception("Another messaging provider is already configured for this Twitter account") + self.instance.config.update(ai) + except Exception as e: + self.add_error('initialconfig', 'Could not set up Twitter: {}'.format(e)) + self.add_error('initialconfig', 'You probably have to restart the process') + elif 'twitterinfo' not in d: + raise ValidationError("Twitter information is incomplete") + return d + + # Webhooks are global to our client, but we have to turn on and off the individual + # subscriptions to the webhooks. If no webhook is configured, we will automtaically + # set it up when the first subscription should be added. + def enable_webhook(self, request): + t = Twitter(self.instance.id, self.instance.config) + try: + res, env, msg = t.check_global_webhook() + if not res: + messages.error(request, msg) + return + + if msg: + messages.info(request, msg) + + # Now subscribe to this webhook + r = t.tw.post('https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json'.format(env), params={ + }) + if r.status_code != 204: + r.raise_for_status() + messages.error(request, "Error registering subscription, status code {}".format(r.status_code)) + return + + # Else we are registered and have a subscription! + self.instance.config['webhook'] = { + 'ok': 1, + } + self.instance.save(update_fields=['config']) + messages.info(request, "Subscribed to webhook") + except Exception as e: + messages.error(request, "Error registering twitter webhook/subscription: {}".format(e)) - self.tw = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT, - settings.TWITTER_CLIENTSECRET, - token, - secret) + def disable_webhook(self, request): + t = Twitter(self.instance.id, self.instance.config) + try: + r = t.tw.delete('https://api.twitter.com/1.1/account_activity/all/pgeu/subscriptions.json') + if r.status_code != 204: + jj = r.json() + if 'errors' not in jj: + messages.error(request, "Error removing subscription, status {}".format(r.status_ocode)) + return + # Code 34 means this subscription didn't exist in the first place, so treat it + # as if we removed it. + if jj['errors'][0]['code'] != 34: + messages.error(request, "Error removing subscription: {}".format(jj['errors'][0]['message'])) + return + del self.instance.config['webhook'] + self.instance.save(update_fields=['config']) + messages.info(request, "Webhook has been disabled!") + except Exception as e: + messages.error(request, "Error removing subscription: {}".format(e)) + + +class Twitter(object): + provider_form_class = TwitterBackendForm + can_process_incoming = True + can_broadcast = True + can_notification = True + direct_message_max_length = None + + @classmethod + def validate_baseurl(self, baseurl): + return None - def get_own_screen_name(self): + @property + def max_post_length(self): + return 280 + + def __init__(self, id, config): + self.providerid = id + self.providerconfig = config + self._tw = None + + @property + def tw(self): + if not self._tw and 'token' in self.providerconfig: + self._tw = requests_oauthlib.OAuth1Session( + get_oauth_client('https://api.twitter.com'), + get_oauth_secret('https://api.twitter.com'), + self.providerconfig['token'], + self.providerconfig['secret'], + ) + return self._tw + + def get_account_info(self): r = self.tw.get('https://api.twitter.com/1.1/account/verify_credentials.json?include_entities=false&skip_status=true&include_email=false') if r.status_code != 200: raise Exception("http status {}".format(r.status_code)) - return r.json()['screen_name'] + j = r.json() + return { + 'screen_name': j['screen_name'], + 'accountid': j['id'], + } - def post_tweet(self, tweet, image=None, replytotweetid=None): + def post(self, tweet, image=None, replytotweetid=None): d = { 'status': tweet, } @@ -47,66 +236,269 @@ class Twitter(object): return (None, r.text) return (r.json()['id'], None) - def retweet(self, tweetid): + def repost(self, tweetid): r = self.tw.post('https://api.twitter.com/1.1/statuses/retweet/{0}.json'.format(tweetid)) if r.status_code != 200: + # If the error is "you have already retweeted this", we just ignore it + try: + if r.json()['errors'][0]['code'] == 327: + return (True, None) + except Exception: + pass return (None, r.text) return (True, None) - def send_message(self, tousername, msg): - # Nor the username - tousername = tousername.lower().replace('@', '') + def send_direct_message(self, recipient_config, msg): + r = self.tw.post('https://api.twitter.com/1.1/direct_messages/events/new.json', json={ + 'event': { + 'type': 'message_create', + 'message_create': { + 'target': { + 'recipient_id': recipient_config['twitterid'], + }, + 'message_data': { + 'text': msg, + } + } + } + }) - # DM API calls require us to look up the userid, so do that with a - # tiny cache first. - if tousername not in _cached_twitter_users: + if r.status_code != 200: try: - r = self.tw.get('https://api.twitter.com/1.1/users/show.json', - params={'screen_name': tousername}) - _cached_twitter_users[tousername] = r.json()['id'] + # Normally these errors come back as json, so try to return that + ej = r.json()['errors'][0] + raise Exception('{}: {}'.format(ej['code'], ej['message'])) except Exception as e: - return (False, None, "Failed to look up user %s: %s" % (tousername, e)) + r.raise_for_status() - try: - r = self.tw.post('https://api.twitter.com/1.1/direct_messages/events/new.json', json={ - 'event': { - 'type': 'message_create', - 'message_create': { - 'target': { - 'recipient_id': _cached_twitter_users[tousername], - }, - 'message_data': { - 'text': msg, - } - } + def poll_public_posts(self, lastpoll, checkpoint): + if checkpoint: + sincestr = "&since_id={}".format(checkpoint) + else: + sincestr = "" + r = self.tw.get('https://api.twitter.com/1.1/statuses/mentions_timeline.json?tweet_mode=extended{}'.format(sincestr)) + r.raise_for_status() + for tj in r.json(): + yield self._parse_tweet_struct(tj) + + def _parse_tweet_struct(self, tj): + d = { + 'id': int(tj['id']), + 'datetime': dateutil.parser.parse(tj['created_at']), + 'text': tj.get('full_text', tj.get('text')), + 'replytoid': tj['in_reply_to_status_id'] and int(tj['in_reply_to_status_id']) or None, + 'author': { + 'name': tj['user']['name'], + 'username': tj['user']['screen_name'], + 'id': int(tj['user']['id']), + 'imageurl': tj['user']['profile_image_url_https'], + }, + 'media': [m['media_url_https'] for m in tj['entities'].get('media', [])], + } + if tj['is_quote_status'] and 'quoted_status_id' in tj: + d['quoted'] = { + 'id': int(tj['quoted_status_id']), + 'text': tj['quoted_status']['full_text'], + 'permalink': tj['quoted_status_permalink'], + } + return d + + # This is delivered by the webhook if it's enabled + def poll_incoming_private_messages(self, lastpoll, checkpoint): + # Ugh. Seems twitter always delivers the last 30 days. So we need to do some manual + # checking and possibly page backwards. At least it seems they are coming back in + # reverse chronological order (which is not documented) + highdt = lastpoll + cursor = None + while True: + p = { + 'count': 50, + } + if cursor: + p['cursor'] = cursor + + r = self.tw.get('https://api.twitter.com/1.1/direct_messages/events/list.json', params=p) + r.raise_for_status() + + j = r.json() + + for e in j['events']: + dt = datetime.fromtimestamp(int(e['created_timestamp']) / 1000, tz=django.utils.timezone.utc) + if dt < lastpoll: + break + highdt = max(dt, highdt) + if e['type'] == 'message_create': + self.process_incoming_message_create_struct(e['id'], dt, e['message_create']) + + # Consumed all entries. Do we have a cursor for the next one + if 'next_cursor' in j: + cursor = j['next_cursor'] + continue + else: + # No cursor, and we've consumed all, so we're done + break + return highdt, 0 + + def process_incoming_message_create_struct(self, idstr, dt, m): + if int(m['sender_id']) != int(self.providerconfig['accountid']): + # We ignore messages from ourselves + msgid = int(idstr) + if IncomingDirectMessage.objects.filter(provider_id=self.providerid, postid=msgid).exists(): + # We've already seen this one + return + + dm = IncomingDirectMessage( + provider_id=self.providerid, + postid=msgid, + time=dt, + sender={ + 'id': int(m['sender_id']), + }, + txt=m['message_data']['text'], + ) + self.process_incoming_dm(dm) + dm.save() + + _screen_names = {} + + def get_user_screen_name(self, uid): + if uid not in self._screen_names: + r = self.tw.get('https://api.twitter.com/1.1/users/show.json', params={'user_id': uid}) + r.raise_for_status() + self._screen_names[uid] = r.json()['screen_name'] + return self._screen_names[uid] + + def process_incoming_dm(self, dm): + for m in re_token.findall(dm.txt): + try: + reg = ConferenceRegistration.objects.get(regtoken=m) + # We get the screen_name so we can be friendly to the user! And when we've + # done that, we might as well cache it in the message info. + + dm.sender['name'] = self.get_user_screen_name(dm.sender['id']) + reg.messaging_config = { + 'twitterid': dm.sender['id'], + 'screen_name': dm.sender['name'], } - }) - if r.status_code != 200: - try: - # Normally these errors come back as json - ej = r.json()['errors'][0] - return (False, ej['code'], ej['message']) - except Exception as e: - return (False, None, r.text) - except Exception as e: - return (False, None, e) - return (True, None, None) - def get_timeline(self, tlname, since=None): - if since: - sincestr = "&since={}".format(since) + reg.save(update_fields=['messaging_config']) + + send_reg_direct_message(reg, 'Hello! This account is now configured to receive notifications for {}'.format(reg.conference)) + + dm.internallyprocessed = True + self.send_read_receipt(dm.sender['id'], dm.postid) + return + except ConferenceRegistration.DoesNotExist: + pass + + def process_incoming_tweet_create_event(self, mp, tce): + d = self._parse_tweet_struct(tce) + store_incoming_post(mp, d) + + def get_public_url(self, post): + return 'https://twitter.com/{}/status/{}'.format(post.author_screenname, post.statusid) + + def get_attendee_string(self, token, messaging, attendeeconfig): + if 'screen_name' in attendeeconfig: + return "Your notifications will be sent to @{}.".format(attendeeconfig['screen_name']), None else: - sincestr = "" - r = self.tw.get('https://api.twitter.com/1.1/statuses/{}_timeline.json?tweet_mode=extended{}'.format(tlname, sincestr)) - if r.status_code != 200: - return None - return r.json() + return 'twitter_invite.html', { + 'twittername': self.providerconfig['screen_name'], + 'twitterid': self.providerconfig['accountid'], + 'token': token, + } + + def send_read_receipt(self, recipient, maxval): + r = self.tw.post('https://api.twitter.com/1.1/direct_messages/mark_read.json', params={ + 'last_read_event_id': maxval, + 'recipient_id': recipient, + }) + # Ignore errors + + def check_messaging_config(self, state): + # Check that we can get our own account info + try: + self.get_account_info() + except Exception as e: + return False, 'Could not get own account information: {}'.format(e) + + # If we have a webhook configured, make sure it's still live + if 'webhook' in self.providerconfig: + retmsg = '' + if 'global_webhook_checked' not in state: + res, env, msg = self.check_global_webhook() + if not res: + # If we failed to check the global webhook, it's now time to give up + return False, msg + state['env'] = env + state['global_webhook_checked'] = True + if msg: + retmsg += msg + "\n" + + # Global webhook has been abled by this or previous run. Now check our subscription. + r = self.tw.get('https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json'.format(env)) + if r.status_code == 204: + # We are subscribed! + return True, retmsg + + # Attempt to re-subscribe + r = self.tw.post('https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json'.format(env), params={}) + if r.status_code == 204: + return True, retmsg + 'Resubscribed user to webhook.' + + return False, retmsg + 'Unable to resubscribe to webhook: {}'.format(r.status_code) + + # 204 means we were resubecribed + return True, retmsg + + # Webhook not configured, so everything is always good + return True, '' + + def check_global_webhook(self): + # Check if the global webhook is here, and enabled! + r = self.tw.get('https://api.twitter.com/1.1/account_activity/all/webhooks.json') + r.raise_for_status() + j = r.json() + + if len(j['environments']) == 0: + return False, None, "No environments found to enable webhook" + elif len(j['environments']) > 1: + return False, None, "More than one environment found to enable webhook, not supported" + env = j['environments'][0]['environment_name'] + + webhookurl = "{}/wh/twitter/".format(settings.SITEBASE) + + for wh in j['environments'][0]['webhooks']: + if wh['url'] == webhookurl: + # Webhook is already configured! Is it valid? + if not wh['valid']: + # Re-enable the webhook + r = self.tw.put('https://api.twitter.com/1.1/account_activity/all/{}/webhooks/{}.json'.format( + env, + wh['id'], + )) + if r.status_code != 204: + return False, None, "Webhook marked invalid, and was unable to re-enable!" + else: + return True, env, "Webhook was marked invalid. Has now been re-enabled." + + return True, env, "" + + # No matching webhook for us, so we go create it + r = self.tw.post('https://api.twitter.com/1.1/account_activity/all/{}/webhooks.json'.format(env), params={ + 'url': webhookurl, + }) + jj = r.json() + if 'errors' in jj: + return False, None, "Error registering twitter webhook: {}".format(jj['errors'][0]['message']) + r.raise_for_status() + return True, env, "Global webhook has been registered" class TwitterSetup(object): @classmethod def get_authorization_data(self): - oauth = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT, settings.TWITTER_CLIENTSECRET) + oauth = requests_oauthlib.OAuth1Session(get_oauth_client('https://api.twitter.com'), get_oauth_secret('https://api.twitter.com')) fetch_response = oauth.fetch_request_token('https://api.twitter.com/oauth/request_token') auth_url = oauth.authorization_url('https://api.twitter.com/oauth/authorize') @@ -117,11 +509,80 @@ class TwitterSetup(object): @classmethod def authorize(self, ownerkey, ownersecret, pincode): - oauth = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT, - settings.TWITTER_CLIENTSECRET, + oauth = requests_oauthlib.OAuth1Session(get_oauth_client('https://api.twitter.com'), + get_oauth_secret('https://api.twitter.com'), resource_owner_key=ownerkey, resource_owner_secret=ownersecret, verifier=pincode) tokens = oauth.fetch_access_token('https://api.twitter.com/oauth/access_token') return tokens + + +# Twitter needs a special webhook URL since it's global and not per provider +def process_twitter_webhook(request): + if 'crc_token' in request.GET: + # This is a pingback from twitter to see if we are alive + d = hmac.new( + bytes(get_oauth_secret('https://api.twitter.com'), 'utf-8'), + msg=bytes(request.GET['crc_token'], 'utf-8'), + digestmod=hashlib.sha256 + ).digest() + return HttpResponse(json.dumps({ + 'response_token': 'sha256={}'.format(base64.b64encode(d).decode('utf8')), + }), content_type='application/json') + + # Validate the signature + if 'HTTP_X_TWITTER_WEBHOOKS_SIGNATURE' not in request.META: + print("Twitter webhooks signature missing") + return HttpResponse('Webhooks signature missing', status=400) + + if not request.META['HTTP_X_TWITTER_WEBHOOKS_SIGNATURE'].startswith('sha256='): + print("Invalid signature, not starting with sha256=: {}".format(request.META['HTTP_X_TWITTER_WEBHOOKS_SIGNATURE'])) + return HttpResponse('Webhooks signature starts incorrectly', status=400) + + sig = base64.b64decode(request.META['HTTP_X_TWITTER_WEBHOOKS_SIGNATURE'][7:]) + d = hmac.new( + bytes(get_oauth_secret('https://api.twitter.com'), 'utf-8'), + msg=request.body, + digestmod=hashlib.sha256, + ).digest() + if not hmac.compare_digest(sig, d): + return HttpResponse('Webhooks signature is wrong', status=400) + + # Load and parse the hook message + j = json.loads(request.body.decode('utf-8')) + + _cached_messaging = {} + + def _get_messaging_from_uid(uid): + if uid not in _cached_messaging: + try: + mp = MessagingProvider.objects.get( + classname='postgresqleu.util.messaging.twitter.Twitter', + config__accountid=uid, + series__isnull=False, + ) + _cached_messaging[uid] = (mp, get_messaging(mp)) + except MessagingProvider.DoesNotExist: + return None + return _cached_messaging[uid] + + for dme in j.get('direct_message_events', []): + with transaction.atomic(): + if 'message_create' in dme: + # Incoming direct message + recipient = int(dme['message_create']['target']['recipient_id']) + sender = int(dme['message_create']['sender_id']) + mp, tw = _get_messaging_from_uid(recipient) + if tw: + dt = datetime.fromtimestamp(int(dme['created_timestamp']) / 1000, tz=django.utils.timezone.utc) + tw.process_incoming_message_create_struct(dme['id'], dt, dme['message_create']) + + for tce in j.get('tweet_create_events', []): + with transaction.atomic(): + recipient = int(j['for_user_id']) + mp, tw = _get_messaging_from_uid(recipient) + tw.process_incoming_tweet_create_event(mp, tce) + + return HttpResponse("OK") diff --git a/postgresqleu/util/messaging/util.py b/postgresqleu/util/messaging/util.py new file mode 100644 index 00000000..4ae6a7dc --- /dev/null +++ b/postgresqleu/util/messaging/util.py @@ -0,0 +1,164 @@ +from django.utils import timezone + +from datetime import timedelta +import re + +from postgresqleu.confreg.models import NotificationQueue +from postgresqleu.confreg.models import ConferenceIncomingTweet, ConferenceIncomingTweetMedia +from postgresqleu.confreg.models import ConferenceTweetQueue +from postgresqleu.util.db import exec_no_result + + +class _Notifier(object): + def __enter__(self): + self.notified = False + return self + + def notify(self): + self.notified = True + + def __exit__(self, *args): + if self.notified: + exec_no_result('NOTIFY pgeu_notification') + + +def send_reg_direct_message(reg, msg, expiry=timedelta(hours=1)): + with _Notifier() as n: + if reg.messaging and reg.messaging.provider.active: + NotificationQueue( + time=timezone.now(), + expires=timezone.now() + expiry, + messaging=reg.messaging, + reg=reg, + channel=None, + msg=msg, + ).save() + n.notify() + + +def send_private_broadcast(conference, msg, expiry=timedelta(hours=1)): + with _Notifier() as n: + for messaging in conference.conferencemessaging_set.filter(privatebcast=True, provider__active=True): + NotificationQueue( + time=timezone.now(), + expires=timezone.now() + expiry, + messaging=messaging, + reg=None, + channel="privatebcast", + msg=msg, + ).save() + n.notify() + + +def send_org_notification(conference, msg, expiry=timedelta(hours=1)): + with _Notifier() as n: + for messaging in conference.conferencemessaging_set.filter(orgnotification=True, provider__active=True): + NotificationQueue( + time=timezone.now(), + expires=timezone.now() + expiry, + messaging=messaging, + reg=None, + channel="orgnotification", + msg=msg, + ).save() + n.notify() + + +def send_channel_message(messaging, channel, msg, expiry=timedelta(hours=1)): + with _Notifier() as n: + NotificationQueue( + time=timezone.now(), + expires=timezone.now() + expiry, + messaging=messaging, + reg=None, + channel=channel, + msg=msg, + ).save() + n.notify() + + +def store_incoming_post(provider, post): + # Have we already seen this post? + if ConferenceIncomingTweet.objects.filter(provider=provider, statusid=post['id']).exists(): + return False + + # Is this one of our own outgoing posts? + if ConferenceTweetQueue.objects.filter(postids__contains={post['id']: provider.id}).exists(): + return False + + i = ConferenceIncomingTweet( + conference=provider.route_incoming, + provider=provider, + statusid=post['id'], + created=post['datetime'], + text=post['text'], + replyto_statusid=post['replytoid'], + author_name=post['author']['name'], + author_screenname=post['author']['username'], + author_id=post['author']['id'], + author_image_url=post['author']['imageurl'], + ) + if post.get('quoted', None): + i.quoted_statusid = post['quoted']['id'] + i.quoted_text = post['quoted']['text'] + i.quoted_permalink = post['quoted']['permalink'] + i.save() + for seq, m in enumerate(post['media']): + ConferenceIncomingTweetMedia(incomingtweet=i, + sequence=seq, + mediaurl=m).save() + + return True + + +# This does not appear to match everything in any shape or form, but we are only +# using it against URLs that we have typed in ourselves, so it should be easy +# enough. +# Should be in sync with regexp in js/admin.js +_re_urlmatcher = re.compile(r'\bhttps?://\S+', re.I) + +# This is currently the value for Twitter and the default for Mastodon, so just +# use that globally for now. +_url_shortened_len = 23 +_url_counts_as_characters = "https://short.url/{}".format((_url_shortened_len - len("https://short.url/")) * 'x') + + +def get_shortened_post_length(txt): + return len(_re_urlmatcher.sub(_url_counts_as_characters, txt)) + + +# Truncate a text, taking into account URL shorterners. WIll not truncate in the middle of an URL, +# but right now will happily truncate in the middle of a word (room for improvement!) +def truncate_shortened_post(txt, maxlen): + matches = list(_re_urlmatcher.finditer(txt)) + + if not matches: + # Not a single url, so just truncate + return txt[:maxlen] + + firststart, firstend = matches[0].span() + if firstend > maxlen: + # We hit the size limit before the url or in the middle of it, so skip the whole url + return txt[:firststart] + + inlen = firstend + outlen = firststart + _url_shortened_len + for i, curr in enumerate(matches[1:]): + prevstart, prevend = matches[i].span() + currstart, currend = curr.span() + + betweenlen = currstart - prevend + if outlen + betweenlen > maxlen: + # The limit was hit in the text between urls + left = maxlen - outlen + return txt[:inlen + (maxlen - outlen)] + if outlen + betweenlen + _url_shortened_len > maxlen: + # The limit was hit in the middle of this URL, so include all the text + # up to it, but skip the url. + return txt[:inlen + betweenlen] + + # The whole URL fit + inlen += betweenlen + currend - currstart + outlen += betweenlen + _url_shortened_len + + return txt[:inlen + (maxlen - outlen)] diff --git a/postgresqleu/util/migrations/0003_oauthapps.py b/postgresqleu/util/migrations/0003_oauthapps.py new file mode 100644 index 00000000..090d6b00 --- /dev/null +++ b/postgresqleu/util/migrations/0003_oauthapps.py @@ -0,0 +1,37 @@ +# Generated by Django 2.2.11 on 2020-04-20 20:26 + +from django.db import migrations, models +from django.conf import settings + + +def migrate_twitter(apps, schema_editor): + if getattr(settings, 'TWITTER_CLIENT', None): + apps.get_model('util', 'OAuthApplication')(name='twitter', baseurl='https://api.twitter.com', client=settings.TWITTER_CLIENT, secret=settings.TWITTER_CLIENTSECRET).save() + + +def unmigrate_twitter(apps, schema_editor): + apps.get_model('util', 'OAuthApplication').objects.filter(name='twitter').delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ('util', '0002_bytea_smarter_storage'), + ] + + operations = [ + migrations.CreateModel( + name='OAuthApplication', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(max_length=100, null=False, blank=False)), + ('baseurl', models.URLField(max_length=100, null=False, blank=False, unique=True, verbose_name='Base URL')), + ('client', models.CharField(max_length=200)), + ('secret', models.CharField(max_length=200)), + ], + options={ + 'verbose_name': 'OAuth Application', + }, + ), + migrations.RunPython(migrate_twitter, unmigrate_twitter), + ] diff --git a/postgresqleu/util/models.py b/postgresqleu/util/models.py index 53caac67..87bccd5f 100644 --- a/postgresqleu/util/models.py +++ b/postgresqleu/util/models.py @@ -1,6 +1,8 @@ # Some very simple models used by utilities from django.db import models +from .oauthapps import oauth_application_choices + class Storage(models.Model): key = models.CharField(max_length=16, null=False, blank=False) @@ -11,3 +13,16 @@ class Storage(models.Model): unique_together = ( ('key', 'storageid'), ) + + +class OAuthApplication(models.Model): + name = models.CharField(max_length=100, null=False, blank=False) + baseurl = models.URLField(max_length=100, null=False, blank=False, unique=True, verbose_name='Base URL') + client = models.CharField(max_length=200, null=False, blank=False) + secret = models.CharField(max_length=200, null=False, blank=False) + + def __str__(self): + return self.name + + class Meta: + verbose_name = 'OAuth Application' diff --git a/postgresqleu/util/monitor.py b/postgresqleu/util/monitor.py index 4c363a74..0c3270d1 100644 --- a/postgresqleu/util/monitor.py +++ b/postgresqleu/util/monitor.py @@ -61,9 +61,18 @@ def nagios(request): if exec_to_scalar("SELECT NOT EXISTS (SELECT 1 FROM pg_stat_activity WHERE application_name='pgeu scheduled job runner' AND datname=current_database())"): errors.append('No job scheduler connected to database') + # Check that there are no outbound emails in the queue if exec_to_scalar("SELECT EXISTS (SELECT 1 FROM mailqueue_queuedmail WHERE sendtime < now() - '2 minutes'::interval)"): errors.append('Unsent emails are present in the outbound mailqueue') + # Check that there are no outbound notifications in the queue + if exec_to_scalar("SELECT EXISTS (SELECT 1 FROM confreg_notificationqueue WHERE time < now() - '10 minutes'::interval)"): + errors.append('Unsent notifications are present in the outbound queue') + + # Check that there are no outbound social media broadcasts in the queue + if exec_to_scalar("SELECT EXISTS (SELECT 1 FROM confreg_conferencetweetqueue tq WHERE datetime < now() - '10 minutes'::interval AND approved AND EXISTS (SELECT 1 FROM confreg_conferencetweetqueue_remainingtosend rts WHERE rts.conferencetweetqueue_id=tq.id))"): + errors.append('Unsent social media broadcasts are present in the outbound queue') + # Check for email addresses not configured errors.extend(check_all_emails(['DEFAULT_EMAIL', 'INVOICE_SENDER_EMAIL', 'INVOICE_NOTIFICATION_RECEIVER', 'SCHEDULED_JOBS_EMAIL', 'SCHEDULED_JOBS_EMAIL_SENDER', 'INVOICE_NOTIFICATION_RECEIVER', 'TREASURER_EMAIL', 'SERVER_EMAIL'])) diff --git a/postgresqleu/util/oauthapps.py b/postgresqleu/util/oauthapps.py new file mode 100644 index 00000000..89f77b8b --- /dev/null +++ b/postgresqleu/util/oauthapps.py @@ -0,0 +1,79 @@ +from django.apps import apps +from django.conf import settings +from django.db.models.signals import post_save, post_delete + +import requests + +# Wrapper that caches oauth information, since it very rearely updates + + +class OAuthProviders(object): + def __init__(self): + self._providers = None + + @property + def providers(self): + if not self._providers: + mod = apps.get_app_config('util').get_model('OAuthApplication') + self._providers = {a.baseurl: a for a in mod.objects.all()} + return self._providers + + def invalidate_cache(self): + self._providers = None + + +providers = OAuthProviders() + + +def get_oauth_client(baseurl): + return providers.providers[baseurl].client + + +def get_oauth_secret(baseurl): + return providers.providers[baseurl].secret + + +def has_oauth_data(baseurl): + return baseurl in providers.providers + + +def _mastodon_oauth_maker(baseurl): + # Mastodon allows automatic creation of apps + r = requests.post('{}/api/v1/apps'.format(baseurl), data={ + 'client_name': settings.ORG_SHORTNAME, + 'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob', + 'scopes': 'read write:statuses write:media', + }) + r.raise_for_status() + j = r.json() + return (j['client_id'], j['client_secret']) + + +_oauth_application_choices = { + 'mastodon': ('https://mastodon.social', 0, _mastodon_oauth_maker), + 'twitter': ('https://api.twitter.com', 1, None), +} + + +def oauth_application_choices(): + for n, m in _oauth_application_choices.items(): + # If the provider is "locked" to a baseurl and that baseurl is already added, + # then don't show it in the list. + if not (m[1] and m[0] in providers.providers): + yield (n, n, m[0], m[1]) + + +def oauth_application_create(app, baseurl): + if _oauth_application_choices.get(app, None): + if _oauth_application_choices[app][2]: + return _oauth_application_choices[app][2](baseurl) + return (None, None) + + +def _invalidate_cache(**kwargs): + providers.invalidate_cache() + + +def connect_oauth_signals(): + post_save.connect(_invalidate_cache, sender=apps.get_app_config('util').get_model('OAuthApplication')) + post_delete.connect(_invalidate_cache, sender=apps.get_app_config('util').get_model('OAuthApplication')) diff --git a/postgresqleu/util/templates/forms/widgets/linkforcode_widget.html b/postgresqleu/util/templates/forms/widgets/linkforcode_widget.html new file mode 100644 index 00000000..b0f763c0 --- /dev/null +++ b/postgresqleu/util/templates/forms/widgets/linkforcode_widget.html @@ -0,0 +1,8 @@ +<div> + <p> + First click the button below and log in to the provider. Once that is done, + an authorizastion will be provided -- paste this code in the field below and save. + </p> + <a class="btn btn-default" target="_blank" href="{{authurl}}">Initiate login</a><br/><br/> + <input type="text" name="{{widget.name}}" placeholder="Enter authorization code" {% include "django/forms/widgets/attrs.html" %}/> +</div> diff --git a/postgresqleu/util/views.py b/postgresqleu/util/views.py new file mode 100644 index 00000000..521fc331 --- /dev/null +++ b/postgresqleu/util/views.py @@ -0,0 +1,19 @@ +from django.views.decorators.csrf import csrf_exempt +from django.shortcuts import get_object_or_404 + +from postgresqleu.confreg.models import MessagingProvider +from postgresqleu.util.messaging import get_messaging +from postgresqleu.util.messaging.twitter import process_twitter_webhook + + +@csrf_exempt +def messaging_webhook(request, providerid, token): + provider = get_object_or_404(MessagingProvider, id=providerid, config__webhook__token=token) + impl = get_messaging(provider) + return impl.process_webhook(request) + + +# Twitter needs a special webhook URL since it's global and not per provider +@csrf_exempt +def twitter_webhook(request): + return process_twitter_webhook(request) diff --git a/postgresqleu/util/widgets.py b/postgresqleu/util/widgets.py index 1a390b9d..65aad0a1 100644 --- a/postgresqleu/util/widgets.py +++ b/postgresqleu/util/widgets.py @@ -138,6 +138,15 @@ class Bootstrap4HtmlDateTimeInput(forms.DateTimeInput): template_name = 'forms/widgets/bs4_datetime_input.html' +class LinkForCodeWidget(TextInput): + template_name = 'forms/widgets/linkforcode_widget.html' + + def get_context(self, name, value, attrs): + d = super().get_context(name, value, attrs) + d['authurl'] = self.authurl + return d + + class SubmitButtonWidget(forms.Widget): template_name = 'forms/widgets/submitbutton_widget.html' |