summaryrefslogtreecommitdiff
path: root/postgresqleu/util
diff options
context:
space:
mode:
Diffstat (limited to 'postgresqleu/util')
-rw-r--r--postgresqleu/util/apps.py3
-rw-r--r--postgresqleu/util/backendforms.py1
-rw-r--r--postgresqleu/util/backendviews.py83
-rw-r--r--postgresqleu/util/forms.py6
-rw-r--r--postgresqleu/util/management/commands/check_messaging_integrations.py60
-rw-r--r--postgresqleu/util/management/commands/fetch_direct_messages.py55
-rw-r--r--postgresqleu/util/management/commands/fetch_media_posts.py66
-rw-r--r--postgresqleu/util/management/commands/post_media_broadcasts.py45
-rw-r--r--postgresqleu/util/management/commands/send_notifications.py36
-rw-r--r--postgresqleu/util/management/commands/social_media_poster.py64
-rw-r--r--postgresqleu/util/messaging/__init__.py39
-rw-r--r--postgresqleu/util/messaging/mastodon.py265
-rw-r--r--postgresqleu/util/messaging/sender.py111
-rw-r--r--postgresqleu/util/messaging/telegram.py340
-rw-r--r--postgresqleu/util/messaging/twitter.py579
-rw-r--r--postgresqleu/util/messaging/util.py164
-rw-r--r--postgresqleu/util/migrations/0003_oauthapps.py37
-rw-r--r--postgresqleu/util/models.py15
-rw-r--r--postgresqleu/util/monitor.py9
-rw-r--r--postgresqleu/util/oauthapps.py79
-rw-r--r--postgresqleu/util/templates/forms/widgets/linkforcode_widget.html8
-rw-r--r--postgresqleu/util/views.py19
-rw-r--r--postgresqleu/util/widgets.py9
23 files changed, 2031 insertions, 62 deletions
diff --git a/postgresqleu/util/apps.py b/postgresqleu/util/apps.py
index 306fbb94..5576ac04 100644
--- a/postgresqleu/util/apps.py
+++ b/postgresqleu/util/apps.py
@@ -9,6 +9,7 @@ import types
from postgresqleu.util.forms import ConcurrentProtectedModelForm
from .auth import PERMISSION_GROUPS
+from .oauthapps import connect_oauth_signals
#
@@ -39,6 +40,8 @@ class UtilAppConfig(AppConfig):
name = 'postgresqleu.util'
def ready(self):
+ connect_oauth_signals()
+
post_migrate.connect(handle_post_migrate, sender=self)
# Override the default ModelAdmin in django to add our validation fields
diff --git a/postgresqleu/util/backendforms.py b/postgresqleu/util/backendforms.py
index 255e08ad..aa9a1e17 100644
--- a/postgresqleu/util/backendforms.py
+++ b/postgresqleu/util/backendforms.py
@@ -36,6 +36,7 @@ class BackendForm(ConcurrentProtectedModelForm):
filtercolumns = {}
defaultsort = []
readonly_fields = []
+ nosave_fields = []
linked_objects = {}
auto_cascade_delete_to = []
fieldsets = []
diff --git a/postgresqleu/util/backendviews.py b/postgresqleu/util/backendviews.py
index 9fad8e38..33e07356 100644
--- a/postgresqleu/util/backendviews.py
+++ b/postgresqleu/util/backendviews.py
@@ -11,6 +11,11 @@ from postgresqleu.util.lists import flatten_list
from postgresqleu.confreg.util import get_authenticated_conference
from postgresqleu.confreg.backendforms import BackendCopySelectConferenceForm
+from .models import OAuthApplication
+from .backendforms import BackendForm
+from .forms import SelectSetValueField
+from .oauthapps import oauth_application_choices, oauth_application_create
+
def backend_process_form(request, urlname, formclass, id, cancel_url='../', saved_url='../', allow_new=True, allow_delete=True, breadcrumbs=None, permissions_already_checked=False, conference=None, bypass_conference_filter=False, instancemaker=None, deleted_url=None, topadmin=None):
if not conference and not bypass_conference_filter:
@@ -160,7 +165,7 @@ def backend_process_form(request, urlname, formclass, id, cancel_url='../', save
form.pre_create_item()
form.save()
form._save_m2m()
- all_excludes = ['_validator', '_newformdata'] + form.readonly_fields
+ all_excludes = ['_validator', '_newformdata'] + list(form.readonly_fields) + form.nosave_fields
if form.json_form_fields:
for fn, ffields in form.json_form_fields.items():
all_excludes.extend(ffields)
@@ -170,7 +175,9 @@ def backend_process_form(request, urlname, formclass, id, cancel_url='../', save
# Merge fields stored in json
if form.json_form_fields:
for fn, ffields in form.json_form_fields.items():
- setattr(form.instance, fn, {fld: form.cleaned_data[fld] for fld in ffields})
+ d = getattr(form.instance, fn, {})
+ d.update({fld: form.cleaned_data[fld] for fld in ffields})
+ setattr(form.instance, fn, d)
form.instance.save(update_fields=form.json_form_fields.keys())
return HttpResponseRedirect(saved_url)
@@ -471,3 +478,75 @@ def backend_handle_copy_previous(request, formclass, restpieces, conference):
],
'helplink': formclass.helplink,
})
+
+
+#
+# Special direct views
+#
+class BackendOAuthappNewForm(forms.Form):
+ helplink = 'oauth'
+ apptype = forms.CharField() # Field type will be changed dynamically
+ baseurl = forms.URLField(label='Base URL')
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ # We have to set this dynamically, otherwise the system won't start up enough to
+ # run the migrations, for some reason.
+ self.fields['apptype'] = SelectSetValueField(choices=oauth_application_choices,
+ setvaluefield='baseurl', label='Type of application')
+
+ def get_newform_data(self):
+ return "{}:{}".format(self.cleaned_data['apptype'], self.cleaned_data['baseurl'])
+
+ def clean_baseurl(self):
+ b = self.cleaned_data['baseurl'].rstrip('/')
+ if OAuthApplication.objects.filter(baseurl=b).exists():
+ raise ValidationError("An OAuth provider with this base URL is already configured!")
+ return b
+
+
+class BackendOAuthappForm(BackendForm):
+ helplink = 'oauth'
+ list_fields = ['name', 'baseurl']
+ readonly_fields = ['name', 'baseurl']
+ form_before_new = BackendOAuthappNewForm
+
+ class Meta:
+ model = OAuthApplication
+ fields = ['name', 'baseurl', 'client', 'secret']
+
+ def fix_fields(self):
+ super().fix_fields()
+ if self.newformdata:
+ (name, baseurl) = self.newformdata.split(':', 1)
+ self.instance.name = name
+ self.initial['name'] = name
+ self.instance.baseurl = baseurl
+ self.initial['baseurl'] = baseurl
+ if self.request.method == 'POST' and '_validator' not in self.request.POST:
+ try:
+ (client, secret) = oauth_application_create(name, baseurl)
+ except Exception as e:
+ messages.error(self.request, str(e))
+ return
+ if client:
+ self.instance.client = client
+ self.initial['client'] = client
+ self.instance.secret = secret
+ self.initial['secret'] = secret
+ messages.info(self.request, "OAuth client and secret automaticaly created, just hit save!")
+
+
+def edit_oauthapps(request, rest):
+ if not request.user.is_superuser:
+ raise PermissionDenied("Access denied")
+
+ return backend_list_editor(request,
+ None,
+ BackendOAuthappForm,
+ rest,
+ bypass_conference_filter=True,
+ topadmin='OAuth',
+ return_url='/admin/',
+ )
diff --git a/postgresqleu/util/forms.py b/postgresqleu/util/forms.py
index d504e3f3..7ade22c1 100644
--- a/postgresqleu/util/forms.py
+++ b/postgresqleu/util/forms.py
@@ -10,7 +10,7 @@ import base64
from itertools import groupby
from .widgets import InlineImageUploadWidget, InlinePdfUploadWidget
-from .widgets import SubmitButtonWidget, SelectSetValueWidget
+from .widgets import LinkForCodeWidget, SubmitButtonWidget, SelectSetValueWidget
class _ValidatorField(forms.Field):
@@ -145,6 +145,10 @@ class PdfBinaryFormField(ImageBinaryFormField):
widget = InlinePdfUploadWidget
+class LinkForCodeField(forms.Field):
+ widget = LinkForCodeWidget
+
+
class SubmitButtonField(forms.Field):
def __init__(self, *args, **kwargs):
if not kwargs:
diff --git a/postgresqleu/util/management/commands/check_messaging_integrations.py b/postgresqleu/util/management/commands/check_messaging_integrations.py
new file mode 100644
index 00000000..be6f0882
--- /dev/null
+++ b/postgresqleu/util/management/commands/check_messaging_integrations.py
@@ -0,0 +1,60 @@
+#
+# Script to (optionally) validate all messaging integrations
+#
+# This can include for example checking that webhooks are still existing,
+# and marked as valid. Actual implementation depends on the messaging provider.
+#
+
+
+from django.core.management.base import BaseCommand, CommandError
+from django.db import connection, transaction
+from django.utils import timezone
+
+import datetime
+import io
+import sys
+from collections import defaultdict
+
+from postgresqleu.util.messaging import get_messaging
+
+from postgresqleu.confreg.models import MessagingProvider
+
+
+class Command(BaseCommand):
+ help = 'Validate messaging integrations'
+
+ class ScheduledJob:
+ scheduled_time = datetime.time(4, 19)
+ default_notify_on_success = True
+
+ @classmethod
+ def should_run(self):
+ return MessagingProvider.objects.filter(active=True).exists()
+
+ def handle(self, *args, **options):
+ s = io.StringIO()
+ err = False
+
+ state = defaultdict(dict)
+ for provider in MessagingProvider.objects.filter(active=True).order_by('classname'):
+ impl = get_messaging(provider)
+
+ try:
+ result, out = impl.check_messaging_config(state[provider.classname])
+ except Exception as e:
+ result = False
+ out = "EXCEPTION: {}\n".format(e)
+
+ if out:
+ s.write("{}\n".format(provider.internalname))
+ s.write("{}\n".format('-' * len(provider.internalname)))
+ s.write(out)
+ s.write("\n\n")
+ if not result:
+ err = True
+
+ if s.tell() != 0:
+ print(s.getvalue())
+
+ if err:
+ sys.exit(1)
diff --git a/postgresqleu/util/management/commands/fetch_direct_messages.py b/postgresqleu/util/management/commands/fetch_direct_messages.py
new file mode 100644
index 00000000..61455412
--- /dev/null
+++ b/postgresqleu/util/management/commands/fetch_direct_messages.py
@@ -0,0 +1,55 @@
+#
+# Script to fetch incoming direct messages
+#
+
+
+from django.core.management.base import BaseCommand, CommandError
+from django.db import connection, transaction
+from django.utils import timezone
+
+from datetime import datetime, timedelta
+import sys
+import time
+
+from postgresqleu.util.messaging import get_messaging
+
+from postgresqleu.confreg.models import MessagingProvider
+
+
+class Command(BaseCommand):
+ help = 'Fetch direct messages'
+
+ class ScheduledJob:
+        # Normally this integrates with webhooks, so we run the catchup
+ # part separately and infrequently.
+ scheduled_interval = timedelta(minutes=15)
+
+ @classmethod
+ def should_run(self):
+ return MessagingProvider.objects.filter(Q(conferencemessaging__privatebcast=True) | Q(conferencemessaging__notification=True) | Q(conferencemessaging__orgnotification=True), active=True, series__isnull=False)
+
+ def handle(self, *args, **options):
+ curs = connection.cursor()
+ curs.execute("SELECT pg_try_advisory_lock(983231)")
+ if not curs.fetchall()[0][0]:
+ raise CommandError("Failed to get advisory lock, existing fetch_direct_messages process stuck?")
+
+ err = False
+
+ for provider in MessagingProvider.objects.raw("SELECT * FROM confreg_messagingprovider mp WHERE active AND series_id IS NOT NULL AND EXISTS (SELECT 1 FROM confreg_conferencemessaging m WHERE m.provider_id=mp.id AND (m.privatebcast OR m.notification OR m.orgnotification))"):
+ impl = get_messaging(provider)
+
+ try:
+ with transaction.atomic():
+ (lastpoll, checkpoint) = impl.poll_incoming_private_messages(provider.private_lastpoll, provider.private_checkpoint)
+ provider.private_lastpoll = lastpoll
+ provider.private_checkpoint = checkpoint
+ provider.save(update_fields=['private_lastpoll', 'private_checkpoint'])
+ except Exception as e:
+ print("Failed to poll {} for direct messages: {}".format(provider, e))
+ err = True
+
+ if err:
+ # Error message printed earlier, but we need to exit with non-zero exitcode
+ # to flag the whole job as failed.
+ sys.exit(1)
diff --git a/postgresqleu/util/management/commands/fetch_media_posts.py b/postgresqleu/util/management/commands/fetch_media_posts.py
new file mode 100644
index 00000000..ce7d8688
--- /dev/null
+++ b/postgresqleu/util/management/commands/fetch_media_posts.py
@@ -0,0 +1,66 @@
+#
+# Script to fetch incoming social media posts
+#
+
+
+from django.core.management.base import BaseCommand, CommandError
+from django.db import connection, transaction
+from django.utils import timezone
+
+from datetime import datetime, timedelta
+import sys
+import time
+
+from postgresqleu.util.messaging import get_messaging
+from postgresqleu.util.messaging.util import store_incoming_post
+
+from postgresqleu.confreg.models import ConferenceTweetQueue
+from postgresqleu.confreg.models import ConferenceIncomingTweet, ConferenceIncomingTweetMedia
+from postgresqleu.confreg.models import MessagingProvider
+
+
+class Command(BaseCommand):
+ help = 'Fetch from social media'
+
+ class ScheduledJob:
+ scheduled_interval = timedelta(minutes=5)
+
+ @classmethod
+ def should_run(self):
+ return MessagingProvider.objects.filter(active=True, series__isnull=False, route_incoming__isnull=False).exists()
+
+ def handle(self, *args, **options):
+ curs = connection.cursor()
+ curs.execute("SELECT pg_try_advisory_lock(981231)")
+ if not curs.fetchall()[0][0]:
+ raise CommandError("Failed to get advisory lock, existing fetch_media_posts process stuck?")
+
+ err = False
+
+ for provider in MessagingProvider.objects.filter(active=True, series__isnull=False, route_incoming__isnull=False):
+ impl = get_messaging(provider)
+
+ polltime = timezone.now()
+ num = 0
+ try:
+ with transaction.atomic():
+ for post in impl.poll_public_posts(provider.public_lastpoll, provider.public_checkpoint):
+ # Update our checkpoint *first*, if it happens that we have already
+ # seen everything.
+ provider.public_checkpoint = max(provider.public_checkpoint, post['id'])
+
+ if store_incoming_post(provider, post):
+ num += 1
+ # Always save last polled time, and updated checkpoint if it changed
+ provider.public_lastpoll = polltime
+ provider.save(update_fields=['public_checkpoint', 'public_lastpoll'])
+ if num:
+ print("Polled {} new posts from {}".format(num, provider))
+ except Exception as e:
+ print("Failed to poll {}: {}".format(provider, e))
+ err = True
+
+ if err:
+ # Error message printed earlier, but we need to exit with non-zero exitcode
+ # to flag the whole job as failed.
+ sys.exit(1)
diff --git a/postgresqleu/util/management/commands/post_media_broadcasts.py b/postgresqleu/util/management/commands/post_media_broadcasts.py
new file mode 100644
index 00000000..bc391035
--- /dev/null
+++ b/postgresqleu/util/management/commands/post_media_broadcasts.py
@@ -0,0 +1,45 @@
+#
+# Script to post previously unposted news to social media
+#
+
+
+from django.core.management.base import BaseCommand, CommandError
+from django.template.defaultfilters import slugify
+from django.db import connection, transaction
+from django.conf import settings
+from django.utils import timezone
+
+from datetime import datetime, timedelta
+import sys
+
+from postgresqleu.util.messaging import ProviderCache
+from postgresqleu.util.messaging.sender import send_pending_posts
+
+
+class Command(BaseCommand):
+ help = 'Post to social media'
+
+ class ScheduledJob:
+ scheduled_interval = timedelta(minutes=5)
+
+ @classmethod
+ def should_run(self):
+ return ConferenceTweetQueue.objects.filter(approved=True, sent=False, datetime__lte=timezone.now()).exists() or ConferenceIncomingTweet.objects.filter(retweetstate=1).exists()
+
+ def handle(self, *args, **options):
+ curs = connection.cursor()
+ curs.execute("SELECT pg_try_advisory_lock(981279)")
+ if not curs.fetchall()[0][0]:
+ raise CommandError("Failed to get advisory lock, existing post_media_broadcasts process stuck?")
+
+ providers = ProviderCache()
+
+ ok, numposts, numreposts = send_pending_posts(providers)
+
+ if numposts:
+ print("Sent {} broadcast posts".format(numposts))
+ if numreposts:
+ print("Made {} broadcast reposts".format(numreposts))
+
+ if not ok:
+ sys.exit(1)
diff --git a/postgresqleu/util/management/commands/send_notifications.py b/postgresqleu/util/management/commands/send_notifications.py
new file mode 100644
index 00000000..6ba41dfd
--- /dev/null
+++ b/postgresqleu/util/management/commands/send_notifications.py
@@ -0,0 +1,36 @@
+#
+# Script to send outgoing notifications
+#
+
+
+from django.core.management.base import BaseCommand, CommandError
+from django.utils import timezone
+from django.db import connection
+
+from datetime import timedelta
+import sys
+
+from postgresqleu.util.messaging import ProviderCache
+from postgresqleu.util.messaging.sender import send_pending_messages
+
+
+class Command(BaseCommand):
+ help = 'Send pending notifications'
+
+ class ScheduledJob:
+ scheduled_interval = timedelta(minutes=10)
+
+ @classmethod
+ def should_run(self):
+ return Notification.objects.filter(time__lte=timezone.now()).exists()
+
+ def handle(self, *args, **options):
+ curs = connection.cursor()
+ curs.execute("SELECT pg_try_advisory_lock(931779)")
+ if not curs.fetchall()[0][0]:
+ raise CommandError("Failed to get advisory lock, existing post_media_broadcasts process stuck?")
+
+ providers = ProviderCache()
+
+ if not send_pending_messages(providers):
+ sys.exit(1)
diff --git a/postgresqleu/util/management/commands/social_media_poster.py b/postgresqleu/util/management/commands/social_media_poster.py
new file mode 100644
index 00000000..053a4a32
--- /dev/null
+++ b/postgresqleu/util/management/commands/social_media_poster.py
@@ -0,0 +1,64 @@
+#
+# Daemon to post all queued up notifications and social media posts
+#
+
+from django.core.management.base import BaseCommand, CommandError
+from django.core.management import load_command_class
+from django.db import connection
+from django.utils import autoreload, timezone
+from django.conf import settings
+
+from datetime import timedelta
+import time
+import io
+import sys
+import os
+import subprocess
+import threading
+import select
+
+from postgresqleu.util.messaging.sender import send_pending_messages, send_pending_posts
+from postgresqleu.util.messaging import ProviderCache
+
+
+class Command(BaseCommand):
+ help = 'Daemon to post notification and social media posts'
+
+ def handle(self, *args, **options):
+ # Automatically exit if our own code changes.
+ # This is not based on a published API, so quite likely will fail
+ # and need to be updated in a future version of django
+
+ # Start our work in a background thread
+ bthread = threading.Thread(target=self.inner_handle)
+ bthread.setDaemon(True)
+ bthread.start()
+
+ reloader = autoreload.get_reloader()
+ while not reloader.should_stop:
+ reloader.run(bthread)
+
+ self.stderr.write("Underlying code changed, exiting for a restart")
+ sys.exit(0)
+
+ def inner_handle(self):
+ with connection.cursor() as curs:
+ curs.execute("LISTEN pgeu_notification")
+ curs.execute("LISTEN pgeu_broadcast")
+ curs.execute("SET application_name = 'pgeu messages/media poster'")
+
+ while True:
+ providers = ProviderCache()
+
+ send_pending_messages(providers)
+ send_pending_posts(providers)
+
+ self.eat_notifications()
+
+ # Wake up to check if there is something to do every 5 minutes, just in case
+ select.select([connection.connection], [], [], 5 * 60)
+
+ def eat_notifications(self):
+ connection.connection.poll()
+ while connection.connection.notifies:
+ connection.connection.notifies.pop()
diff --git a/postgresqleu/util/messaging/__init__.py b/postgresqleu/util/messaging/__init__.py
index e69de29b..313c1577 100644
--- a/postgresqleu/util/messaging/__init__.py
+++ b/postgresqleu/util/messaging/__init__.py
@@ -0,0 +1,39 @@
+import re
+
+# Global regexps
+re_token = re.compile('[0-9a-z]{64}')
+
+messaging_implementations = {
+ 'postgresqleu.util.messaging.mastodon.Mastodon': ('https://mastodon.social', False),
+ 'postgresqleu.util.messaging.telegram.Telegram': ('https://api.telegram.org', True),
+ 'postgresqleu.util.messaging.twitter.Twitter': ('https://api.twitter.com', True),
+}
+
+
+def messaging_implementation_choices():
+ return [(k, k.split('.')[-1], v[0], v[1]) for k, v in messaging_implementations.items()]
+
+
+def get_messaging_class(classname):
+ if classname not in messaging_implementations:
+ raise Exception("Invalid messaging class")
+
+ pieces = classname.split('.')
+ modname = '.'.join(pieces[:-1])
+ classname = pieces[-1]
+ mod = __import__(modname, fromlist=[classname, ])
+ return getattr(mod, classname)
+
+
+def get_messaging(provider):
+ return get_messaging_class(provider.classname)(provider.id, provider.config)
+
+
+class ProviderCache(object):
+ def __init__(self):
+ self.providers = {}
+
+ def get(self, provider):
+ if provider.id not in self.providers:
+ self.providers[provider.id] = get_messaging_class(provider.classname)(provider.id, provider.config)
+ return self.providers[provider.id]
diff --git a/postgresqleu/util/messaging/mastodon.py b/postgresqleu/util/messaging/mastodon.py
new file mode 100644
index 00000000..8d71c9b0
--- /dev/null
+++ b/postgresqleu/util/messaging/mastodon.py
@@ -0,0 +1,265 @@
+from django import forms
+from django.utils import timezone
+from django.utils.html import strip_tags
+from django.conf import settings
+
+import requests_oauthlib
+import requests
+import dateutil.parser
+
+from postgresqleu.util.widgets import StaticTextWidget
+from postgresqleu.util.forms import LinkForCodeField
+from postgresqleu.util.oauthapps import get_oauth_client, get_oauth_secret, has_oauth_data
+from postgresqleu.util.models import OAuthApplication
+from postgresqleu.util.messaging import re_token
+
+from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm
+from postgresqleu.confreg.models import ConferenceRegistration
+
+from .util import send_reg_direct_message
+
+
+# We always ask for this scope
+MASTODON_SCOPES = "read write:statuses write:media"
+
+
+class MastodonBackendForm(BackendSeriesMessagingForm):
+ initialconfig = LinkForCodeField(label='Get authorization code')
+ mastodoninfo = forms.CharField(widget=StaticTextWidget, label="Account information", required=False)
+
+ def __init__(self, *args, **kwargs):
+ self.baseurl = None
+ super().__init__(*args, **kwargs)
+
+ def fix_fields(self):
+ super().fix_fields()
+
+ if self.baseurl:
+ self.instance.config['baseurl'] = self.baseurl.rstrip('/')
+
+ if self.instance.config.get('token', None):
+ del self.fields['initialconfig']
+ self.config_fields = ['mastodoninfo', ]
+ self.config_fieldsets = [
+ {'id': 'mastodon', 'legend': 'Mastodon', 'fields': ['mastodoninfo', ]},
+ ]
+ self.config_readonly_fields = ['mastodoninfo', ]
+
+ try:
+ if 'username' not in self.instance.config:
+ self.instance.config.update(Mastodon(self.instance.id, self.instance.config).get_account_info())
+ self.instance.save(update_fields=['config'])
+ selfinfo = "Connected to mastodon account @{}.".format(self.instance.config['username'])
+ except Exception as e:
+ selfinfo = "ERROR verifying Mastodon access: {}".format(e)
+
+ self.initial.update({
+ 'mastodoninfo': selfinfo,
+ })
+ else:
+ # Not configured yet, so prepare for it!
+ del self.fields['mastodoninfo']
+ self.config_fields = ['initialconfig', ]
+ self.config_fieldsets = [
+ {'id': 'mastodon', 'legend': 'Mastodon', 'fields': ['initialconfig', ]},
+ ]
+ self.nosave_fields = ['initialconfig', ]
+
+ # Ugly power-grab here, but let's see what's in our POST
+ if self.request.POST.get('initialconfig', None):
+ # Token is included, so don't try to get a new one
+ self.fields['initialconfig'].widget.authurl = self.request.session['authurl']
+ else:
+ auth_url, state = self._get_oauth_session().authorization_url('{}/oauth/authorize'.format(self.instance.config['baseurl']))
+ self.request.session['authurl'] = auth_url
+
+ self.fields['initialconfig'].widget.authurl = auth_url
+
+ def clean(self):
+ d = super().clean()
+ if d.get('initialconfig', None):
+ # We have received an initial config, so try to attach ourselves to mastodon
+ try:
+ tokens = self._get_oauth_session().fetch_token(
+ '{}/oauth/token'.format(self.instance.config['baseurl']),
+ code=d.get('initialconfig'),
+ client_secret=get_oauth_secret(self.instance.config['baseurl']),
+ scopes=MASTODON_SCOPES
+ )
+
+ self.instance.config['token'] = tokens['access_token']
+ del self.request.session['authurl']
+ self.request.session.modified = True
+ except Exception as e:
+ self.add_error('initialconfig', 'Could not set up Mastodon: {}'.format(e))
+ self.add_error('initialconfig', 'You probably have to restart the process')
+ return d
+
+ def _get_oauth_session(self):
+ return requests_oauthlib.OAuth2Session(
+ get_oauth_client(self.instance.config['baseurl']),
+ redirect_uri='urn:ietf:wg:oauth:2.0:oob',
+ scope=MASTODON_SCOPES
+ )
+
+
+class Mastodon(object):
+ provider_form_class = MastodonBackendForm
+ can_process_incoming = True
+ can_broadcast = True
+ can_notification = True
+    direct_message_max_length = 450   # 500 is length, draw down some to handle username
+
+ @classmethod
+ def validate_baseurl(self, baseurl):
+ if not OAuthApplication.objects.filter(name='mastodon', baseurl=baseurl).exists():
+ return 'Global OAuth credentials for {} missing'.format(baseurl)
+
+ @property
+ def max_post_length(self):
+ return 500
+
+ def __init__(self, providerid, config):
+ self.providerid = providerid
+ self.providerconfig = config
+
+ self.sess = requests.Session()
+ self.sess.headers.update({
+ 'Authorization': 'Bearer {}'.format(self.providerconfig['token']),
+ })
+
+ def _api_url(self, url):
+ return '{}{}'.format(self.providerconfig['baseurl'], url)
+
+ def _get(self, url, *args, **kwargs):
+ return self.sess.get(self._api_url(url), *args, **kwargs)
+
+ def _post(self, url, *args, **kwargs):
+ return self.sess.post(self._api_url(url), *args, **kwargs)
+
+ def get_account_info(self):
+ r = self._get('/api/v1/accounts/verify_credentials')
+ r.raise_for_status()
+ j = r.json()
+ return {
+ 'username': j['username'],
+ }
+
+ def post(self, toot, image=None, replytotweetid=None):
+ d = {
+ 'status': toot,
+ 'visibility': 'public',
+ }
+ if replytotweetid:
+ d['in_reply_to_id'] = replytotweetid
+
+ if image:
+ r = self._post('/api/v1/media', files={
+ 'file': bytearray(image),
+ })
+ if r.status_code != 200:
+ return (False, 'Media upload: {}'.format(r.text))
+ d['media_ids'] = [int(r.json()['id']), ]
+
+ r = self._post('/api/v1/statuses', json=d)
+ if r.status_code != 200:
+ return (None, r.text)
+
+ return (r.json()['id'], None)
+
+ def repost(self, postid):
+ r = self._post('/api/v1/statuses/{}/reblog'.format(postid))
+ if r.status_code != 200:
+ return (None, r.text)
+ return (True, None)
+
+ def send_direct_message(self, recipient_config, msg):
+ d = {
+ 'status': '@{} {}'.format(recipient_config['username'], msg),
+ 'visibility': 'direct',
+ }
+
+ r = self._post('/api/v1/statuses', json=d)
+ r.raise_for_status()
+
+ def poll_public_posts(self, lastpoll, checkpoint):
+ p = {
+ 'limit': 200, # If it's this many, we should give up
+ 'exclude_types': ['follow', 'favourite', 'reblog', 'poll'],
+ }
+ if checkpoint:
+ p['since_id'] = checkpoint
+
+ r = self._get('/api/v1/notifications', params=p)
+ r.raise_for_status()
+
+ for n in r.json():
+ s = n['status']
+ d = {
+ 'id': int(s['id']),
+ 'datetime': dateutil.parser.parse(s['created_at']),
+ 'text': strip_tags(s['content']),
+ 'replytoid': s['in_reply_to_id'] and int(s['in_reply_to_id']) or None,
+ 'author': {
+ 'name': s['account']['display_name'] or s['account']['username'],
+ 'username': s['account']['username'],
+ 'id': s['account']['id'],
+ 'imageurl': s['account']['avatar_static'],
+ },
+ 'media': [m['url'] for m in s['media_attachments']],
+ }
+ # (mastodon doesn't have quoted status, so just leave that one non-existing)
+ yield d
+
+ def poll_incoming_private_messages(self, lastpoll, checkpoint):
+ p = {
+ 'limit': 40,
+ }
+ if checkpoint:
+ p['since_id'] = checkpoint
+
+ r = self._get('/api/v1/conversations', params=p)
+ r.raise_for_status()
+
+ j = r.json()
+ for c in j:
+ if len(c['accounts']) > 1:
+ # Can't handle group messages
+ continue
+ ls = c['last_status']
+ self.process_incoming_dm(ls)
+
+ if len(j):
+ # For some reason, it paginates by last_status->id, and not by id. Go figure.
+ return timezone.now(), max((c['last_status']['id'] for c in j))
+ else:
+ return timezone.now(), checkpoint
+
+ def process_incoming_dm(self, msg):
+ for m in re_token.findall(msg['content']):
+ try:
+ reg = ConferenceRegistration.objects.get(regtoken=m)
+
+ reg.messaging_config = {
+ 'username': msg['account']['username'],
+ }
+ reg.save(update_fields=['messaging_config'])
+
+ send_reg_direct_message(reg, 'Hello! This account is now configured to receive notifications for {}'.format(reg.conference))
+ except ConferenceRegistration.DoesNotExist:
+ pass
+
+ def get_public_url(self, post):
+ return '{}@{}/{}'.format(self.providerconfig['baseurl'], post.author_screenname, post.statusid)
+
+ def get_attendee_string(self, token, messaging, attendeeconfig):
+ if 'username' in attendeeconfig:
+ return "Your notifications will be sent to @{}.".format(attendeeconfig['username']), None
+ else:
+ return 'mastodon_invite.html', {
+ 'mastodonname': self.providerconfig['username'],
+ 'token': token,
+ }
+
+ def check_messaging_config(self, state):
+ return True, ''
diff --git a/postgresqleu/util/messaging/sender.py b/postgresqleu/util/messaging/sender.py
new file mode 100644
index 00000000..42dde1a3
--- /dev/null
+++ b/postgresqleu/util/messaging/sender.py
@@ -0,0 +1,111 @@
+# Message sending functionality lives in a separate module so it
+# can easily be used both from a scheduled job and from a daemon
+# when available.
+from django.utils import timezone
+from django.db import transaction
+
+from datetime import timedelta
+import time
+
+from postgresqleu.confreg.models import NotificationQueue
+from postgresqleu.confreg.models import ConferenceTweetQueue, ConferenceIncomingTweet
+from postgresqleu.util.messaging.util import truncate_shortened_post
+
+
+def send_pending_messages(providers):
+ err = False
+
+ # First delete any expired messages
+ NotificationQueue.objects.filter(expires__lte=timezone.now()).delete()
+
+ # Then one by one send off the ones that we have in the queue
+ first = True
+ while True:
+ with transaction.atomic():
+ msglist = list(NotificationQueue.objects.
+ select_for_update(of=('self',)).
+ select_related('reg', 'messaging', 'messaging__provider').
+ only('msg', 'channel', 'reg__messaging_config',
+ 'messaging__config', 'reg__messaging__provider',
+ ).filter(time__lte=timezone.now()).
+ order_by('time', 'id')[:1])
+ if len(msglist) == 0:
+ break
+ n = msglist[0]
+
+ # Actually Send Message (TM)
+ impl = providers.get(n.messaging.provider)
+ try:
+ if n.reg:
+ impl.send_direct_message(n.reg.messaging_config, n.msg)
+ else:
+ impl.post_channel_message(n.messaging.config, n.channel, n.msg)
+ except Exception as e:
+ print("Failed to send notification to {} using {}: {}. Will retry until {}.".format(
+ n.reg and n.reg or n.channel,
+ n.messaging.provider.internalname,
+ e, n.expires
+ ))
+ err = True
+
+ # Retry in 5 minutes
+ n.time += timedelta(minutes=5)
+ n.save()
+ else:
+ # Successfully posted, so delete it
+ n.delete()
+
+ # Rate limit us to one per second, that should usually be enough
+ if first:
+ first = False
+ else:
+ time.sleep(1)
+
+ return not err
+
+
+def send_pending_posts(providers):
+ err = False
+
+ numposts = 0
+ for t in ConferenceTweetQueue.objects.prefetch_related('remainingtosend').filter(approved=True, sent=False, datetime__lte=timezone.now()).order_by('datetime'):
+ sentany = False
+ for p in t.remainingtosend.all():
+ impl = providers.get(p)
+ (id, err) = impl.post(
+ truncate_shortened_post(t.contents, t.max_post_length),
+ t.image,
+ t.replytotweetid,
+ )
+
+ if id:
+ t.remainingtosend.remove(p)
+ # postids is a map of <provider status id> -> <provider id>. It's mapped
+ # "backwards" this way because the main check we do is if a key exists.
+ t.postids[id] = p.id
+ sentany = True
+ else:
+ sys.stderr.write("Failed to post to {}: {}".format(p, err))
+ err = True
+ if sentany:
+ numposts += 1
+ if not t.remainingtosend.exists():
+ t.sent = True
+ t.save(update_fields=['postids', 'sent'])
+
+ numreposts = 0
+ for t in ConferenceIncomingTweet.objects.select_related('provider').filter(retweetstate=1):
+ # These are tweets that should be retweeted. Retweets only happen on the same
+ # provider that they were posted on.
+ impl = providers.get(t.provider)
+
+ ok, msg = impl.repost(t.statusid)
+ if ok:
+ t.retweetstate = 2
+ t.save(update_fields=['retweetstate'])
+ numreposts += 1
+ else:
+ self.stderr.write("Failed to repost on {}: {}\n".format(t.provider, msg))
+ err = True
+
+ return err, numposts, numreposts
diff --git a/postgresqleu/util/messaging/telegram.py b/postgresqleu/util/messaging/telegram.py
new file mode 100644
index 00000000..406c1fb4
--- /dev/null
+++ b/postgresqleu/util/messaging/telegram.py
@@ -0,0 +1,340 @@
+from django import forms
+from django.http import HttpResponse
+from django.db import models
+from django.utils.functional import cached_property
+from django.utils import timezone
+from django.contrib import messages
+from django.conf import settings
+import django.utils.timezone
+
+import json
+import requests
+from datetime import datetime
+
+from postgresqleu.util.random import generate_random_token
+from postgresqleu.util.forms import SubmitButtonField
+from postgresqleu.util.widgets import StaticTextWidget
+from postgresqleu.util.messaging import re_token
+
+from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm
+from postgresqleu.confreg.models import ConferenceMessaging, ConferenceRegistration, IncomingDirectMessage
+
+from .util import send_reg_direct_message, send_channel_message
+
+
+class TelegramBackendForm(BackendSeriesMessagingForm):
+ telegramtoken = forms.CharField(required=True, label="Telegram token",
+ widget=forms.widgets.PasswordInput(render_value=True, attrs={'autocomplete': 'new-password'}),
+ help_text='Create a new bot in the Telegram Botfather, and copy/paste the access token here')
+ telegramstatus = forms.CharField(widget=StaticTextWidget, label="Telegram information", required=False)
+ webhookenabler = SubmitButtonField(label="Enable webhook")
+
+ exclude_fields_from_validation = ['telegramstatus', 'webhookenabler', ]
+
+ @property
+ def config_fields(self):
+ f = ['telegramtoken', 'webhookenabler']
+ if self.instance.config.get('telegramtoken', None):
+ f.extend(['telegramstatus'])
+ return f
+
+ @property
+ def config_fieldsets(self):
+ return [
+ {'id': 'telegram', 'legend': 'Telegram', 'fields': self.config_fields},
+ ]
+
+ def fix_fields(self):
+ super().fix_fields()
+ if self.instance.config.get('telegramtoken', None):
+ # Existing token, make a call to telegram to see what's up, if necessary
+ try:
+ if 'botname' not in self.instance.config:
+ tgm = Telegram(self.instance.id, self.instance.config)
+ botinfo = tgm.get('getMe')
+ self.instance.config.update({
+ 'botname': botinfo['username'],
+ 'botfullname': botinfo['first_name'],
+ })
+ self.instance.save(update_fields=['config'])
+ self.initial['telegramstatus'] = 'Telegram bot username {}, full name "{}", connected.'.format(
+ self.instance.config['botname'],
+ self.instance.config['botfullname'],
+ )
+ except Exception as e:
+ self.initial['telegramstatus'] = 'Failed to get Telegram info: {}'.format(e)
+
+ # Is the webhook enabled?
+ if 'webhook' in self.instance.config:
+ # It is!
+ self.fields['webhookenabler'].label = 'Disable webhook'
+ self.fields['webhookenabler'].widget.label = 'Disable webhook'
+ self.fields['webhookenabler'].callback = self.disable_webhook
+ else:
+ self.fields['webhookenabler'].callback = self.enable_webhook
+ else:
+ self.remove_field('telegramstatus')
+ self.remove_field('webhookenabler')
+
+ def enable_webhook(self, request):
+ token = generate_random_token()
+
+ Telegram(self.instance.id, self.instance.config).post('setWebhook', {
+ 'url': '{}/wh/{}/{}/'.format(settings.SITEBASE, self.instance.id, token),
+ 'max_connections': 2,
+ 'allowed_updates': ['channel_post', 'message'],
+ })
+ self.instance.config['webhook'] = {
+ 'token': token,
+ }
+ self.instance.save(update_fields=['config'])
+
+ messages.info(request, "Webhook has been enabled!")
+ return True
+
+ def disable_webhook(self, request):
+ Telegram(self.instance.id, self.instance.config).post('deleteWebhook')
+
+ del self.instance.config['webhook']
+ self.instance.save(update_fields=['config'])
+
+ messages.info(request, "Webhook has been disabled!")
+ return True
+
+
+class Telegram(object):
+ provider_form_class = TelegramBackendForm
+ can_privatebcast = True
+ can_notification = True
+ can_orgnotification = True
+ direct_message_max_length = None
+
+ @classmethod
+ def validate_baseurl(self, baseurl):
+ return None
+
+ def __init__(self, id, config):
+ self.providerid = id
+ self.providerconfig = config
+
+ def refresh_messaging_config(self, config):
+ mod = False
+
+ if 'channeltoken' not in config:
+ config['channeltoken'] = {}
+ if 'tokenchannel' not in config:
+ config['tokenchannel'] = {}
+
+ for channel in ['privatebcast', 'orgnotification']:
+ if channel not in config['channeltoken']:
+ # Create a token!
+ t = generate_random_token()
+ config['channeltoken'][channel] = t
+ config['tokenchannel'][t] = channel
+ mod = True
+ return mod
+
+    def get_channel_field(self, instance, fieldname):
+        commontxt = "<br/><br/>To set or edit this, please invite the bot @{} to the selected channel as an administrator, and after that's done, paste the token {} in the channel to associate it. Then wait a bit and refresh this page (you will also be notified in the channel).".format(self.providerconfig['botname'], instance.config['channeltoken'][fieldname])
+        if 'channels' in instance.config and fieldname in instance.config['channels']:
+            txt = 'Configured to talk in channel with id {} and title "{}".'.format(
+                instance.config['channels'][fieldname]['id'],
+                instance.config['channels'][fieldname]['title'],
+            )
+            return SubmitButtonField(label='Disable channel', prefixparagraph=txt + commontxt, callback=self.disable_channel(instance, fieldname))
+        else:
+            txt = '<strong>Not currently attached to a channel!</strong>'
+            return forms.CharField(widget=StaticTextWidget, initial=txt + commontxt)
+
+ def disable_channel(self, instance, channelname):
+ def _disable_channel(request):
+ del instance.config['channels'][channelname]
+ instance.save(update_fields=['config', ])
+ return _disable_channel
+
+ def get(self, method, params={}):
+ r = requests.get(
+ 'https://api.telegram.org/bot{}/{}'.format(self.providerconfig['telegramtoken'], method),
+ params=params,
+ timeout=10
+ )
+ r.raise_for_status()
+ j = r.json()
+ if not j['ok']:
+ raise Exception("OK was {}".format(j['ok']))
+ return j['result']
+
+ def post(self, method, params={}, ignoreerrors=False):
+ r = requests.post(
+ 'https://api.telegram.org/bot{}/{}'.format(self.providerconfig['telegramtoken'], method),
+ data=params,
+ timeout=10
+ )
+ if ignoreerrors:
+ return None
+
+ r.raise_for_status()
+ j = r.json()
+ if not j['ok']:
+ raise Exception("OK was {}".format(j['ok']))
+ return j['result']
+
+ def send_direct_message(self, recipient_config, msg):
+ self.post('sendMessage', {
+ 'chat_id': recipient_config['userid'],
+ 'text': msg,
+ })
+
+ def post_channel_message(self, messagingconfig, channelname, msg):
+ self.post('sendMessage', {
+ 'chat_id': messagingconfig['channels'][channelname]['id'],
+ 'text': msg,
+ })
+
+ def poll_incoming_private_messages(self, lastpoll, checkpoint):
+ # If we are configured with a webhook, telegram will return an error if
+ # we try to get the data this way as well, so don't try that.
+ if 'webhook' in self.providerconfig:
+ return lastpoll, checkpoint
+
+ # We'll get up to 100 updates per run, which is the default
+ res = self.get('getUpdates', {
+ 'offset': checkpoint + 1,
+ 'allowed_updates': ['channel_post', 'message'],
+ })
+
+ # For now we don't store telegram input, we just do automated processing to figure
+ # out if we're connected to something.
+ for u in res:
+ if 'channel_post' in u:
+ self.process_channel_post(u['channel_post'])
+ elif 'message' in u:
+ self.process_incoming_chat_structure(u)
+
+ if res:
+ return timezone.now(), max((u['update_id'] for u in res))
+ else:
+ return timezone.now(), checkpoint
+
+ def process_webhook(self, request):
+ try:
+ j = json.loads(request.body.decode('utf8', errors='ignore'))
+ if 'channel_post' in j:
+ self.process_channel_post(j['channel_post'])
+ elif 'message' in j:
+ self.process_incoming_chat_structure(j)
+ # All other types we just ignore for now
+ return HttpResponse("OK")
+ except Exception as e:
+ return HttpResponse("Internal error", status=500)
+
+    def process_channel_post(self, p):
+        # Does it look like a token? If so, try to attach!
+        # NOTE: 'text' is optional on Telegram channel posts (e.g. photo-only
+        # posts), so default to '' instead of raising KeyError in the webhook.
+        for m in re_token.findall(p.get('text', '')):
+            # Found a match.
+            # Now try to find if this is an actual token, and assign the channel
+            # as required.
+            try:
+                r = ConferenceMessaging.objects.get(
+                    provider_id=self.providerid,
+                    config__tokenchannel__has_key=m
+                )
+                chan = r.config['tokenchannel'][m]
+                hadprevchannel = 'channels' in r.config and chan in r.config['channels']
+                if 'channels' not in r.config:
+                    r.config['channels'] = {}
+
+                # Export a channel invite link, so that we have one
+                self.post('exportChatInviteLink', {'chat_id': p['chat']['id']}, ignoreerrors=True)
+                chatobj = self.get('getChat', {'chat_id': p['chat']['id']})
+                r.config['channels'][chan] = {
+                    'id': p['chat']['id'],
+                    'title': p['chat']['title'],
+                    'invitelink': chatobj.get('invite_link', None),
+                }
+                r.save(update_fields=['config'])
+                try:
+                    # Ignore if this fails, probably permissions
+                    self.post('deleteMessage', {
+                        'chat_id': p['chat']['id'],
+                        'message_id': p['message_id']
+                    })
+                except Exception as e:
+                    pass
+
+                # Send a reply, and this should not fail
+                send_channel_message(r, chan,
+                                     'Thank you, this channel has now been associated with {} channel {}'.format(
+                                         r.conference.conferencename,
+                                         chan
+                                     ))
+                if hadprevchannel:
+                    send_channel_message(r, chan, 'The previously existing channel association has been removed.')
+
+    def process_incoming_chat_structure(self, u):
+        msgid = int(u['update_id'])
+        if IncomingDirectMessage.objects.filter(provider_id=self.providerid, postid=msgid).exists():
+            # We've already seen this one
+            return
+
+        # 'username' on a Telegram user and 'text' on a message are both
+        # optional per the Bot API, so use .get() to avoid crashing on e.g.
+        # media-only messages or users without a public username.
+        msg = IncomingDirectMessage(
+            provider_id=self.providerid,
+            postid=msgid,
+            time=datetime.fromtimestamp(int(u['message']['date']), tz=django.utils.timezone.utc),
+            sender={
+                'username': u['message']['from'].get('username', ''),
+                'userid': u['message']['from']['id'],
+            },
+            txt=u['message'].get('text', ''),
+        )
+        self.process_incoming_chat_message(msg)
+        msg.save()
+
+ def process_incoming_chat_message(self, msg):
+ # Does it look like a token? If so, try to attach!
+ for m in re_token.findall(msg.txt):
+ try:
+ reg = ConferenceRegistration.objects.get(regtoken=m)
+ # Matched reg, so set it up
+ reg.messaging_config = msg.sender
+ reg.save(update_fields=['messaging_config'])
+
+ send_reg_direct_message(reg, 'Hello! This account is now configured to receive notifications for {}'.format(reg.conference))
+
+ msg.internallyprocessed = True
+ return
+ except ConferenceRegistration.DoesNotExist:
+ pass
+
+ def get_attendee_string(self, token, messaging, attendeeconfig):
+ if 'userid' in attendeeconfig:
+ return 'telegram_ready.html', {
+ 'username': attendeeconfig['username'],
+ 'invitelink': messaging.config.get('channels', {}).get('privatebcast', {}).get('invitelink', None),
+ }
+ else:
+ return 'telegram_invite.html', {
+ 'botname': self.providerconfig['botname'],
+ 'token': token,
+ }
+
+ def check_messaging_config(self, state):
+ if 'webhook' in self.providerconfig:
+ token = self.providerconfig['webhook']['token']
+
+ webhookurl = '{}/wh/{}/{}/'.format(settings.SITEBASE, self.providerid, token)
+
+ whi = self.get('getWebhookInfo')
+ if whi['url'] != webhookurl:
+ self.post('setWebhook', {
+ 'url': webhookurl,
+ 'max_connections': 2,
+ 'allowed_updates': ['channel_post', 'message'],
+ })
+ return True, 'Webhook resubscribed'
+
+ return True, ''
diff --git a/postgresqleu/util/messaging/twitter.py b/postgresqleu/util/messaging/twitter.py
index 4edb5027..d6a6f2cb 100644
--- a/postgresqleu/util/messaging/twitter.py
+++ b/postgresqleu/util/messaging/twitter.py
@@ -1,31 +1,220 @@
+from django.core.validators import ValidationError
+from django.http import HttpResponse
+from django import forms
from django.conf import settings
+from django.contrib import messages
+from django.db import transaction
+import django.utils.timezone
import requests_oauthlib
+from datetime import datetime
+import dateutil.parser
+import hmac
+import hashlib
+import json
+import base64
+
+from postgresqleu.util.widgets import StaticTextWidget
+from postgresqleu.util.forms import SubmitButtonField
+from postgresqleu.util.forms import LinkForCodeField
+from postgresqleu.util.oauthapps import get_oauth_client, get_oauth_secret, has_oauth_data
+from postgresqleu.util.messaging import re_token, get_messaging
+from postgresqleu.util.messaging.util import send_reg_direct_message, store_incoming_post
+
+from postgresqleu.confreg.models import ConferenceRegistration, MessagingProvider, IncomingDirectMessage
+from postgresqleu.confreg.backendforms import BackendSeriesMessagingForm
_cached_twitter_users = {}
-class Twitter(object):
- def __init__(self, conference=None):
- if conference:
- token = conference.twitter_token
- secret = conference.twitter_secret
+class TwitterBackendForm(BackendSeriesMessagingForm):
+ initialconfig = LinkForCodeField(label='Get PIN code')
+ twitterinfo = forms.CharField(widget=StaticTextWidget, label="Account information", required=False)
+ webhookenabler = SubmitButtonField(label="Enable webhook")
+
+ def fix_fields(self):
+ super().fix_fields()
+
+ if self.instance.config.get('token', None):
+ del self.fields['initialconfig']
+ self.config_fields = ['twitterinfo', 'webhookenabler']
+ if 'webhook' in self.instance.config:
+ self.fields['webhookenabler'].label = 'Disable webhook'
+ self.fields['webhookenabler'].widget.label = 'Disable webhook'
+ self.fields['webhookenabler'].callback = self.disable_webhook
+ else:
+ self.fields['webhookenabler'].callback = self.enable_webhook
+ self.config_fieldsets = [
+ {'id': 'twitter', 'legend': 'Twitter', 'fields': self.config_fields},
+ ]
+ self.config_readonly_fields = ['twitterinfo', ]
+
+ if 'screen_name' not in self.instance.config:
+ try:
+ ai = Twitter(self.instance.id, self.instance.config).get_account_info()
+ self.instance.config.update(ai)
+ self.instance.save(update_fields=['config'])
+ except Exception as e:
+ self.initial['twitterinfo'] = "Unable to fetch twitter account info: {}".format(e)
+ return
+
+ self.initial.update({
+ 'twitterinfo': "Connected to twitter account @{}.".format(self.instance.config.get('screen_name', '*unknown*')),
+ })
else:
- token = settings.TWITTER_NEWS_TOKEN
- secret = settings.TWITTER_NEWS_TOKENSECRET
+ # Not configured yet, so prepare for it!
+ del self.fields['twitterinfo']
+ del self.fields['webhookenabler']
+ self.config_fields = ['initialconfig', ]
+ self.config_fieldsets = [
+ {'id': 'twitter', 'legend': 'Twitter', 'fields': ['initialconfig', ]},
+ ]
+ self.nosave_fields = ['initialconfig', ]
+
+ # Ugly power-grab here, but let's see what's in our POST
+ if self.request.POST.get('initialconfig', None):
+ # Token is included, so don't try to get a new one
+ self.fields['initialconfig'].widget.authurl = self.request.session['authurl']
+ else:
+ # Prepare the twitter setup
+ try:
+ (auth_url, ownerkey, ownersecret) = TwitterSetup.get_authorization_data()
+ self.request.session['ownerkey'] = ownerkey
+ self.request.session['ownersecret'] = ownersecret
+ self.request.session['authurl'] = auth_url
+
+ self.fields['initialconfig'].widget.authurl = auth_url
+ except Exception as e:
+ messages.error(self.request, 'Failed to initialize setup with Twitter: {}'.format(e))
+ del self.fields['initialconfig']
+ self.config_fields = []
+ self.config_fieldsets = []
+
+ def clean(self):
+ d = super().clean()
+ if d.get('initialconfig', None):
+ # We have received an initial config, so try to attach ourselves to twitter.
+ try:
+ tokens = TwitterSetup.authorize(self.request.session['ownerkey'],
+ self.request.session['ownersecret'],
+ d.get('initialconfig'),
+ )
+ self.instance.config['token'] = tokens.get('oauth_token')
+ self.instance.config['secret'] = tokens.get('oauth_token_secret')
+ del self.request.session['authurl']
+ del self.request.session['ownerkey']
+ del self.request.session['ownersecret']
+ self.request.session.modified = True
+
+ ai = Twitter(self.instance.id, self.instance.config).get_account_info()
+ if MessagingProvider.objects.filter(
+ classname='postgresqleu.util.messaging.twitter.Twitter',
+ config__accountid=ai['accountid'],
+ series__isnull=True,
+ ).exclude(pk=self.instance.pk).exists():
+ raise Exception("Another messaging provider is already configured for this Twitter account")
+ self.instance.config.update(ai)
+ except Exception as e:
+ self.add_error('initialconfig', 'Could not set up Twitter: {}'.format(e))
+ self.add_error('initialconfig', 'You probably have to restart the process')
+ elif 'twitterinfo' not in d:
+ raise ValidationError("Twitter information is incomplete")
+ return d
+
+    # Webhooks are global to our client, but we have to turn on and off the individual
+    # subscriptions to the webhooks. If no webhook is configured, we will automatically
+    # set it up when the first subscription should be added.
+    def enable_webhook(self, request):
+        t = Twitter(self.instance.id, self.instance.config)
+        try:
+            res, env, msg = t.check_global_webhook()
+            if not res:
+                messages.error(request, msg)
+                return
+
+            if msg:
+                messages.info(request, msg)
+
+            # Now subscribe to this webhook
+            r = t.tw.post('https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json'.format(env), params={
+            })
+            if r.status_code != 204:
+                r.raise_for_status()
+                messages.error(request, "Error registering subscription, status code {}".format(r.status_code))
+                return
+
+            # Else we are registered and have a subscription!
+            self.instance.config['webhook'] = {
+                'ok': 1,
+            }
+            self.instance.save(update_fields=['config'])
+            messages.info(request, "Subscribed to webhook")
+        except Exception as e:
+            messages.error(request, "Error registering twitter webhook/subscription: {}".format(e))
- self.tw = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT,
- settings.TWITTER_CLIENTSECRET,
- token,
- secret)
+    def disable_webhook(self, request):
+        t = Twitter(self.instance.id, self.instance.config)
+        try:
+            r = t.tw.delete('https://api.twitter.com/1.1/account_activity/all/pgeu/subscriptions.json')
+            if r.status_code != 204:
+                jj = r.json()
+                if 'errors' not in jj:
+                    messages.error(request, "Error removing subscription, status {}".format(r.status_code))
+                    return
+                # Code 34 means this subscription didn't exist in the first place, so treat it
+                # as if we removed it.
+                if jj['errors'][0]['code'] != 34:
+                    messages.error(request, "Error removing subscription: {}".format(jj['errors'][0]['message']))
+                    return
+            del self.instance.config['webhook']
+            self.instance.save(update_fields=['config'])
+            messages.info(request, "Webhook has been disabled!")
+        except Exception as e:
+            messages.error(request, "Error removing subscription: {}".format(e))
+
+
+class Twitter(object):
+ provider_form_class = TwitterBackendForm
+ can_process_incoming = True
+ can_broadcast = True
+ can_notification = True
+ direct_message_max_length = None
+
+ @classmethod
+ def validate_baseurl(self, baseurl):
+ return None
- def get_own_screen_name(self):
+ @property
+ def max_post_length(self):
+ return 280
+
+ def __init__(self, id, config):
+ self.providerid = id
+ self.providerconfig = config
+ self._tw = None
+
+ @property
+ def tw(self):
+ if not self._tw and 'token' in self.providerconfig:
+ self._tw = requests_oauthlib.OAuth1Session(
+ get_oauth_client('https://api.twitter.com'),
+ get_oauth_secret('https://api.twitter.com'),
+ self.providerconfig['token'],
+ self.providerconfig['secret'],
+ )
+ return self._tw
+
+ def get_account_info(self):
r = self.tw.get('https://api.twitter.com/1.1/account/verify_credentials.json?include_entities=false&skip_status=true&include_email=false')
if r.status_code != 200:
raise Exception("http status {}".format(r.status_code))
- return r.json()['screen_name']
+ j = r.json()
+ return {
+ 'screen_name': j['screen_name'],
+ 'accountid': j['id'],
+ }
- def post_tweet(self, tweet, image=None, replytotweetid=None):
+ def post(self, tweet, image=None, replytotweetid=None):
d = {
'status': tweet,
}
@@ -47,66 +236,269 @@ class Twitter(object):
return (None, r.text)
return (r.json()['id'], None)
- def retweet(self, tweetid):
+ def repost(self, tweetid):
r = self.tw.post('https://api.twitter.com/1.1/statuses/retweet/{0}.json'.format(tweetid))
if r.status_code != 200:
+ # If the error is "you have already retweeted this", we just ignore it
+ try:
+ if r.json()['errors'][0]['code'] == 327:
+ return (True, None)
+ except Exception:
+ pass
return (None, r.text)
return (True, None)
- def send_message(self, tousername, msg):
- # Nor the username
- tousername = tousername.lower().replace('@', '')
+ def send_direct_message(self, recipient_config, msg):
+ r = self.tw.post('https://api.twitter.com/1.1/direct_messages/events/new.json', json={
+ 'event': {
+ 'type': 'message_create',
+ 'message_create': {
+ 'target': {
+ 'recipient_id': recipient_config['twitterid'],
+ },
+ 'message_data': {
+ 'text': msg,
+ }
+ }
+ }
+ })
- # DM API calls require us to look up the userid, so do that with a
- # tiny cache first.
- if tousername not in _cached_twitter_users:
+ if r.status_code != 200:
try:
- r = self.tw.get('https://api.twitter.com/1.1/users/show.json',
- params={'screen_name': tousername})
- _cached_twitter_users[tousername] = r.json()['id']
+ # Normally these errors come back as json, so try to return that
+ ej = r.json()['errors'][0]
+ raise Exception('{}: {}'.format(ej['code'], ej['message']))
except Exception as e:
- return (False, None, "Failed to look up user %s: %s" % (tousername, e))
+ r.raise_for_status()
- try:
- r = self.tw.post('https://api.twitter.com/1.1/direct_messages/events/new.json', json={
- 'event': {
- 'type': 'message_create',
- 'message_create': {
- 'target': {
- 'recipient_id': _cached_twitter_users[tousername],
- },
- 'message_data': {
- 'text': msg,
- }
- }
+ def poll_public_posts(self, lastpoll, checkpoint):
+ if checkpoint:
+ sincestr = "&since_id={}".format(checkpoint)
+ else:
+ sincestr = ""
+ r = self.tw.get('https://api.twitter.com/1.1/statuses/mentions_timeline.json?tweet_mode=extended{}'.format(sincestr))
+ r.raise_for_status()
+ for tj in r.json():
+ yield self._parse_tweet_struct(tj)
+
+ def _parse_tweet_struct(self, tj):
+ d = {
+ 'id': int(tj['id']),
+ 'datetime': dateutil.parser.parse(tj['created_at']),
+ 'text': tj.get('full_text', tj.get('text')),
+ 'replytoid': tj['in_reply_to_status_id'] and int(tj['in_reply_to_status_id']) or None,
+ 'author': {
+ 'name': tj['user']['name'],
+ 'username': tj['user']['screen_name'],
+ 'id': int(tj['user']['id']),
+ 'imageurl': tj['user']['profile_image_url_https'],
+ },
+ 'media': [m['media_url_https'] for m in tj['entities'].get('media', [])],
+ }
+ if tj['is_quote_status'] and 'quoted_status_id' in tj:
+ d['quoted'] = {
+ 'id': int(tj['quoted_status_id']),
+ 'text': tj['quoted_status']['full_text'],
+ 'permalink': tj['quoted_status_permalink'],
+ }
+ return d
+
+ # This is delivered by the webhook if it's enabled
+ def poll_incoming_private_messages(self, lastpoll, checkpoint):
+ # Ugh. Seems twitter always delivers the last 30 days. So we need to do some manual
+ # checking and possibly page backwards. At least it seems they are coming back in
+ # reverse chronological order (which is not documented)
+ highdt = lastpoll
+ cursor = None
+ while True:
+ p = {
+ 'count': 50,
+ }
+ if cursor:
+ p['cursor'] = cursor
+
+ r = self.tw.get('https://api.twitter.com/1.1/direct_messages/events/list.json', params=p)
+ r.raise_for_status()
+
+ j = r.json()
+
+ for e in j['events']:
+ dt = datetime.fromtimestamp(int(e['created_timestamp']) / 1000, tz=django.utils.timezone.utc)
+ if dt < lastpoll:
+ break
+ highdt = max(dt, highdt)
+ if e['type'] == 'message_create':
+ self.process_incoming_message_create_struct(e['id'], dt, e['message_create'])
+
+ # Consumed all entries. Do we have a cursor for the next one
+ if 'next_cursor' in j:
+ cursor = j['next_cursor']
+ continue
+ else:
+ # No cursor, and we've consumed all, so we're done
+ break
+ return highdt, 0
+
+ def process_incoming_message_create_struct(self, idstr, dt, m):
+ if int(m['sender_id']) != int(self.providerconfig['accountid']):
+ # We ignore messages from ourselves
+ msgid = int(idstr)
+ if IncomingDirectMessage.objects.filter(provider_id=self.providerid, postid=msgid).exists():
+ # We've already seen this one
+ return
+
+ dm = IncomingDirectMessage(
+ provider_id=self.providerid,
+ postid=msgid,
+ time=dt,
+ sender={
+ 'id': int(m['sender_id']),
+ },
+ txt=m['message_data']['text'],
+ )
+ self.process_incoming_dm(dm)
+ dm.save()
+
+ _screen_names = {}
+
+ def get_user_screen_name(self, uid):
+ if uid not in self._screen_names:
+ r = self.tw.get('https://api.twitter.com/1.1/users/show.json', params={'user_id': uid})
+ r.raise_for_status()
+ self._screen_names[uid] = r.json()['screen_name']
+ return self._screen_names[uid]
+
+ def process_incoming_dm(self, dm):
+ for m in re_token.findall(dm.txt):
+ try:
+ reg = ConferenceRegistration.objects.get(regtoken=m)
+ # We get the screen_name so we can be friendly to the user! And when we've
+ # done that, we might as well cache it in the message info.
+
+ dm.sender['name'] = self.get_user_screen_name(dm.sender['id'])
+ reg.messaging_config = {
+ 'twitterid': dm.sender['id'],
+ 'screen_name': dm.sender['name'],
}
- })
- if r.status_code != 200:
- try:
- # Normally these errors come back as json
- ej = r.json()['errors'][0]
- return (False, ej['code'], ej['message'])
- except Exception as e:
- return (False, None, r.text)
- except Exception as e:
- return (False, None, e)
- return (True, None, None)
- def get_timeline(self, tlname, since=None):
- if since:
- sincestr = "&since={}".format(since)
+ reg.save(update_fields=['messaging_config'])
+
+ send_reg_direct_message(reg, 'Hello! This account is now configured to receive notifications for {}'.format(reg.conference))
+
+ dm.internallyprocessed = True
+ self.send_read_receipt(dm.sender['id'], dm.postid)
+ return
+ except ConferenceRegistration.DoesNotExist:
+ pass
+
+ def process_incoming_tweet_create_event(self, mp, tce):
+ d = self._parse_tweet_struct(tce)
+ store_incoming_post(mp, d)
+
+ def get_public_url(self, post):
+ return 'https://twitter.com/{}/status/{}'.format(post.author_screenname, post.statusid)
+
+ def get_attendee_string(self, token, messaging, attendeeconfig):
+ if 'screen_name' in attendeeconfig:
+ return "Your notifications will be sent to @{}.".format(attendeeconfig['screen_name']), None
else:
- sincestr = ""
- r = self.tw.get('https://api.twitter.com/1.1/statuses/{}_timeline.json?tweet_mode=extended{}'.format(tlname, sincestr))
- if r.status_code != 200:
- return None
- return r.json()
+ return 'twitter_invite.html', {
+ 'twittername': self.providerconfig['screen_name'],
+ 'twitterid': self.providerconfig['accountid'],
+ 'token': token,
+ }
+
+ def send_read_receipt(self, recipient, maxval):
+ r = self.tw.post('https://api.twitter.com/1.1/direct_messages/mark_read.json', params={
+ 'last_read_event_id': maxval,
+ 'recipient_id': recipient,
+ })
+ # Ignore errors
+
+    def check_messaging_config(self, state):
+        # Check that we can get our own account info
+        try:
+            self.get_account_info()
+        except Exception as e:
+            return False, 'Could not get own account information: {}'.format(e)
+
+        # If we have a webhook configured, make sure it's still live
+        if 'webhook' in self.providerconfig:
+            retmsg = ''
+            if 'global_webhook_checked' not in state:
+                res, env, msg = self.check_global_webhook()
+                if not res:
+                    # If we failed to check the global webhook, it's now time to give up
+                    return False, msg
+                state['env'] = env
+                state['global_webhook_checked'] = True
+                if msg:
+                    retmsg += msg + "\n"
+
+            # Global webhook has been enabled by this or a previous run (which stored
+            # the environment in state['env']). Now check our subscription.
+            r = self.tw.get('https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json'.format(state['env']))
+            if r.status_code == 204:
+                # We are subscribed!
+                return True, retmsg
+
+            # Attempt to re-subscribe (204 means we were resubscribed)
+            r = self.tw.post('https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json'.format(state['env']), params={})
+            if r.status_code == 204:
+                return True, retmsg + 'Resubscribed user to webhook.'
+
+            return False, retmsg + 'Unable to resubscribe to webhook: {}'.format(r.status_code)
+
+        # All branches above return, so reaching here means no webhook is
+        # configured for this provider.
+
+        # Webhook not configured, so everything is always good
+        return True, ''
+
+ def check_global_webhook(self):
+ # Check if the global webhook is here, and enabled!
+ r = self.tw.get('https://api.twitter.com/1.1/account_activity/all/webhooks.json')
+ r.raise_for_status()
+ j = r.json()
+
+ if len(j['environments']) == 0:
+ return False, None, "No environments found to enable webhook"
+ elif len(j['environments']) > 1:
+ return False, None, "More than one environment found to enable webhook, not supported"
+ env = j['environments'][0]['environment_name']
+
+ webhookurl = "{}/wh/twitter/".format(settings.SITEBASE)
+
+ for wh in j['environments'][0]['webhooks']:
+ if wh['url'] == webhookurl:
+ # Webhook is already configured! Is it valid?
+ if not wh['valid']:
+ # Re-enable the webhook
+ r = self.tw.put('https://api.twitter.com/1.1/account_activity/all/{}/webhooks/{}.json'.format(
+ env,
+ wh['id'],
+ ))
+ if r.status_code != 204:
+ return False, None, "Webhook marked invalid, and was unable to re-enable!"
+ else:
+ return True, env, "Webhook was marked invalid. Has now been re-enabled."
+
+ return True, env, ""
+
+ # No matching webhook for us, so we go create it
+ r = self.tw.post('https://api.twitter.com/1.1/account_activity/all/{}/webhooks.json'.format(env), params={
+ 'url': webhookurl,
+ })
+ jj = r.json()
+ if 'errors' in jj:
+ return False, None, "Error registering twitter webhook: {}".format(jj['errors'][0]['message'])
+ r.raise_for_status()
+ return True, env, "Global webhook has been registered"
class TwitterSetup(object):
@classmethod
def get_authorization_data(self):
- oauth = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT, settings.TWITTER_CLIENTSECRET)
+ oauth = requests_oauthlib.OAuth1Session(get_oauth_client('https://api.twitter.com'), get_oauth_secret('https://api.twitter.com'))
fetch_response = oauth.fetch_request_token('https://api.twitter.com/oauth/request_token')
auth_url = oauth.authorization_url('https://api.twitter.com/oauth/authorize')
@@ -117,11 +509,80 @@ class TwitterSetup(object):
@classmethod
def authorize(self, ownerkey, ownersecret, pincode):
- oauth = requests_oauthlib.OAuth1Session(settings.TWITTER_CLIENT,
- settings.TWITTER_CLIENTSECRET,
+ oauth = requests_oauthlib.OAuth1Session(get_oauth_client('https://api.twitter.com'),
+ get_oauth_secret('https://api.twitter.com'),
resource_owner_key=ownerkey,
resource_owner_secret=ownersecret,
verifier=pincode)
tokens = oauth.fetch_access_token('https://api.twitter.com/oauth/access_token')
return tokens
+
+
+# Twitter needs a special webhook URL since it's global and not per provider
+def process_twitter_webhook(request):
+ if 'crc_token' in request.GET:
+ # This is a pingback from twitter to see if we are alive
+ d = hmac.new(
+ bytes(get_oauth_secret('https://api.twitter.com'), 'utf-8'),
+ msg=bytes(request.GET['crc_token'], 'utf-8'),
+ digestmod=hashlib.sha256
+ ).digest()
+ return HttpResponse(json.dumps({
+ 'response_token': 'sha256={}'.format(base64.b64encode(d).decode('utf8')),
+ }), content_type='application/json')
+
+ # Validate the signature
+ if 'HTTP_X_TWITTER_WEBHOOKS_SIGNATURE' not in request.META:
+ print("Twitter webhooks signature missing")
+ return HttpResponse('Webhooks signature missing', status=400)
+
+ if not request.META['HTTP_X_TWITTER_WEBHOOKS_SIGNATURE'].startswith('sha256='):
+ print("Invalid signature, not starting with sha256=: {}".format(request.META['HTTP_X_TWITTER_WEBHOOKS_SIGNATURE']))
+ return HttpResponse('Webhooks signature starts incorrectly', status=400)
+
+ sig = base64.b64decode(request.META['HTTP_X_TWITTER_WEBHOOKS_SIGNATURE'][7:])
+ d = hmac.new(
+ bytes(get_oauth_secret('https://api.twitter.com'), 'utf-8'),
+ msg=request.body,
+ digestmod=hashlib.sha256,
+ ).digest()
+ if not hmac.compare_digest(sig, d):
+ return HttpResponse('Webhooks signature is wrong', status=400)
+
+ # Load and parse the hook message
+ j = json.loads(request.body.decode('utf-8'))
+
+ _cached_messaging = {}
+
+ def _get_messaging_from_uid(uid):
+ if uid not in _cached_messaging:
+ try:
+ mp = MessagingProvider.objects.get(
+ classname='postgresqleu.util.messaging.twitter.Twitter',
+ config__accountid=uid,
+ series__isnull=False,
+ )
+ _cached_messaging[uid] = (mp, get_messaging(mp))
+ except MessagingProvider.DoesNotExist:
+ return None
+ return _cached_messaging[uid]
+
+ for dme in j.get('direct_message_events', []):
+ with transaction.atomic():
+ if 'message_create' in dme:
+ # Incoming direct message
+ recipient = int(dme['message_create']['target']['recipient_id'])
+ sender = int(dme['message_create']['sender_id'])
+ mp, tw = _get_messaging_from_uid(recipient)
+ if tw:
+ dt = datetime.fromtimestamp(int(dme['created_timestamp']) / 1000, tz=django.utils.timezone.utc)
+ tw.process_incoming_message_create_struct(dme['id'], dt, dme['message_create'])
+
+ for tce in j.get('tweet_create_events', []):
+ with transaction.atomic():
+ recipient = int(j['for_user_id'])
+ mp, tw = _get_messaging_from_uid(recipient)
+ tw.process_incoming_tweet_create_event(mp, tce)
+
+ return HttpResponse("OK")
diff --git a/postgresqleu/util/messaging/util.py b/postgresqleu/util/messaging/util.py
new file mode 100644
index 00000000..4ae6a7dc
--- /dev/null
+++ b/postgresqleu/util/messaging/util.py
@@ -0,0 +1,164 @@
+from django.utils import timezone
+
+from datetime import timedelta
+import re
+
+from postgresqleu.confreg.models import NotificationQueue
+from postgresqleu.confreg.models import ConferenceIncomingTweet, ConferenceIncomingTweetMedia
+from postgresqleu.confreg.models import ConferenceTweetQueue
+from postgresqleu.util.db import exec_no_result
+
+
+class _Notifier(object):
+ def __enter__(self):
+ self.notified = False
+ return self
+
+ def notify(self):
+ self.notified = True
+
+ def __exit__(self, *args):
+ if self.notified:
+ exec_no_result('NOTIFY pgeu_notification')
+
+
+def send_reg_direct_message(reg, msg, expiry=timedelta(hours=1)):
+ with _Notifier() as n:
+ if reg.messaging and reg.messaging.provider.active:
+ NotificationQueue(
+ time=timezone.now(),
+ expires=timezone.now() + expiry,
+ messaging=reg.messaging,
+ reg=reg,
+ channel=None,
+ msg=msg,
+ ).save()
+ n.notify()
+
+
+def send_private_broadcast(conference, msg, expiry=timedelta(hours=1)):
+ with _Notifier() as n:
+ for messaging in conference.conferencemessaging_set.filter(privatebcast=True, provider__active=True):
+ NotificationQueue(
+ time=timezone.now(),
+ expires=timezone.now() + expiry,
+ messaging=messaging,
+ reg=None,
+ channel="privatebcast",
+ msg=msg,
+ ).save()
+ n.notify()
+
+
+def send_org_notification(conference, msg, expiry=timedelta(hours=1)):
+ with _Notifier() as n:
+ for messaging in conference.conferencemessaging_set.filter(orgnotification=True, provider__active=True):
+ NotificationQueue(
+ time=timezone.now(),
+ expires=timezone.now() + expiry,
+ messaging=messaging,
+ reg=None,
+ channel="orgnotification",
+ msg=msg,
+ ).save()
+ n.notify()
+
+
+def send_channel_message(messaging, channel, msg, expiry=timedelta(hours=1)):
+ with _Notifier() as n:
+ NotificationQueue(
+ time=timezone.now(),
+ expires=timezone.now() + expiry,
+ messaging=messaging,
+ reg=None,
+ channel=channel,
+ msg=msg,
+ ).save()
+ n.notify()
+
+
+def store_incoming_post(provider, post):
+ # Have we already seen this post?
+ if ConferenceIncomingTweet.objects.filter(provider=provider, statusid=post['id']).exists():
+ return False
+
+ # Is this one of our own outgoing posts?
+ if ConferenceTweetQueue.objects.filter(postids__contains={post['id']: provider.id}).exists():
+ return False
+
+ i = ConferenceIncomingTweet(
+ conference=provider.route_incoming,
+ provider=provider,
+ statusid=post['id'],
+ created=post['datetime'],
+ text=post['text'],
+ replyto_statusid=post['replytoid'],
+ author_name=post['author']['name'],
+ author_screenname=post['author']['username'],
+ author_id=post['author']['id'],
+ author_image_url=post['author']['imageurl'],
+ )
+ if post.get('quoted', None):
+ i.quoted_statusid = post['quoted']['id']
+ i.quoted_text = post['quoted']['text']
+ i.quoted_permalink = post['quoted']['permalink']
+ i.save()
+ for seq, m in enumerate(post['media']):
+ ConferenceIncomingTweetMedia(incomingtweet=i,
+ sequence=seq,
+ mediaurl=m).save()
+
+ return True
+
+
+# This does not appear to match everything in any shape or form, but we are only
+# using it against URLs that we have typed in ourselves, so it should be easy
+# enough.
+# Should be in sync with regexp in js/admin.js
+_re_urlmatcher = re.compile(r'\bhttps?://\S+', re.I)
+
+# This is currently the value for Twitter and the default for Mastodon, so just
+# use that globally for now.
+_url_shortened_len = 23
+_url_counts_as_characters = "https://short.url/{}".format((_url_shortened_len - len("https://short.url/")) * 'x')
+
+
+def get_shortened_post_length(txt):
+ return len(_re_urlmatcher.sub(_url_counts_as_characters, txt))
+
+
+# Truncate a text, taking into account URL shorteners. Will not truncate in the middle of a URL,
+# but right now will happily truncate in the middle of a word (room for improvement!)
+def truncate_shortened_post(txt, maxlen):
+ matches = list(_re_urlmatcher.finditer(txt))
+
+ if not matches:
+ # Not a single url, so just truncate
+ return txt[:maxlen]
+
+ firststart, firstend = matches[0].span()
+ if firstend > maxlen:
+ # We hit the size limit before the url or in the middle of it, so skip the whole url
+ return txt[:firststart]
+
+ inlen = firstend
+ outlen = firststart + _url_shortened_len
+ for i, curr in enumerate(matches[1:]):
+ prevstart, prevend = matches[i].span()
+ currstart, currend = curr.span()
+
+ betweenlen = currstart - prevend
+ if outlen + betweenlen > maxlen:
+ # The limit was hit in the text between urls
+ left = maxlen - outlen
+ return txt[:inlen + (maxlen - outlen)]
+ if outlen + betweenlen + _url_shortened_len > maxlen:
+ # The limit was hit in the middle of this URL, so include all the text
+ # up to it, but skip the url.
+ return txt[:inlen + betweenlen]
+
+ # The whole URL fit
+ inlen += betweenlen + currend - currstart
+ outlen += betweenlen + _url_shortened_len
+
+ return txt[:inlen + (maxlen - outlen)]
diff --git a/postgresqleu/util/migrations/0003_oauthapps.py b/postgresqleu/util/migrations/0003_oauthapps.py
new file mode 100644
index 00000000..090d6b00
--- /dev/null
+++ b/postgresqleu/util/migrations/0003_oauthapps.py
@@ -0,0 +1,37 @@
+# Generated by Django 2.2.11 on 2020-04-20 20:26
+
+from django.db import migrations, models
+from django.conf import settings
+
+
+def migrate_twitter(apps, schema_editor):
+ if getattr(settings, 'TWITTER_CLIENT', None):
+ apps.get_model('util', 'OAuthApplication')(name='twitter', baseurl='https://api.twitter.com', client=settings.TWITTER_CLIENT, secret=settings.TWITTER_CLIENTSECRET).save()
+
+
+def unmigrate_twitter(apps, schema_editor):
+ apps.get_model('util', 'OAuthApplication').objects.filter(name='twitter').delete()
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('util', '0002_bytea_smarter_storage'),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='OAuthApplication',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('name', models.CharField(max_length=100, null=False, blank=False)),
+ ('baseurl', models.URLField(max_length=100, null=False, blank=False, unique=True, verbose_name='Base URL')),
+ ('client', models.CharField(max_length=200)),
+ ('secret', models.CharField(max_length=200)),
+ ],
+ options={
+ 'verbose_name': 'OAuth Application',
+ },
+ ),
+ migrations.RunPython(migrate_twitter, unmigrate_twitter),
+ ]
diff --git a/postgresqleu/util/models.py b/postgresqleu/util/models.py
index 53caac67..87bccd5f 100644
--- a/postgresqleu/util/models.py
+++ b/postgresqleu/util/models.py
@@ -1,6 +1,8 @@
# Some very simple models used by utilities
from django.db import models
+from .oauthapps import oauth_application_choices
+
class Storage(models.Model):
key = models.CharField(max_length=16, null=False, blank=False)
@@ -11,3 +13,16 @@ class Storage(models.Model):
unique_together = (
('key', 'storageid'),
)
+
+
+class OAuthApplication(models.Model):
+ name = models.CharField(max_length=100, null=False, blank=False)
+ baseurl = models.URLField(max_length=100, null=False, blank=False, unique=True, verbose_name='Base URL')
+ client = models.CharField(max_length=200, null=False, blank=False)
+ secret = models.CharField(max_length=200, null=False, blank=False)
+
+ def __str__(self):
+ return self.name
+
+ class Meta:
+ verbose_name = 'OAuth Application'
diff --git a/postgresqleu/util/monitor.py b/postgresqleu/util/monitor.py
index 4c363a74..0c3270d1 100644
--- a/postgresqleu/util/monitor.py
+++ b/postgresqleu/util/monitor.py
@@ -61,9 +61,18 @@ def nagios(request):
if exec_to_scalar("SELECT NOT EXISTS (SELECT 1 FROM pg_stat_activity WHERE application_name='pgeu scheduled job runner' AND datname=current_database())"):
errors.append('No job scheduler connected to database')
+ # Check that there are no outbound emails in the queue
if exec_to_scalar("SELECT EXISTS (SELECT 1 FROM mailqueue_queuedmail WHERE sendtime < now() - '2 minutes'::interval)"):
errors.append('Unsent emails are present in the outbound mailqueue')
+ # Check that there are no outbound notifications in the queue
+ if exec_to_scalar("SELECT EXISTS (SELECT 1 FROM confreg_notificationqueue WHERE time < now() - '10 minutes'::interval)"):
+ errors.append('Unsent notifications are present in the outbound queue')
+
+ # Check that there are no outbound social media broadcasts in the queue
+ if exec_to_scalar("SELECT EXISTS (SELECT 1 FROM confreg_conferencetweetqueue tq WHERE datetime < now() - '10 minutes'::interval AND approved AND EXISTS (SELECT 1 FROM confreg_conferencetweetqueue_remainingtosend rts WHERE rts.conferencetweetqueue_id=tq.id))"):
+ errors.append('Unsent social media broadcasts are present in the outbound queue')
+
# Check for email addresses not configured
errors.extend(check_all_emails(['DEFAULT_EMAIL', 'INVOICE_SENDER_EMAIL', 'INVOICE_NOTIFICATION_RECEIVER', 'SCHEDULED_JOBS_EMAIL', 'SCHEDULED_JOBS_EMAIL_SENDER', 'INVOICE_NOTIFICATION_RECEIVER', 'TREASURER_EMAIL', 'SERVER_EMAIL']))
diff --git a/postgresqleu/util/oauthapps.py b/postgresqleu/util/oauthapps.py
new file mode 100644
index 00000000..89f77b8b
--- /dev/null
+++ b/postgresqleu/util/oauthapps.py
@@ -0,0 +1,79 @@
+from django.apps import apps
+from django.conf import settings
+from django.db.models.signals import post_save, post_delete
+
+import requests
+
+# Wrapper that caches oauth information, since it very rarely updates
+
+
+class OAuthProviders(object):
+ def __init__(self):
+ self._providers = None
+
+ @property
+ def providers(self):
+ if not self._providers:
+ mod = apps.get_app_config('util').get_model('OAuthApplication')
+ self._providers = {a.baseurl: a for a in mod.objects.all()}
+ return self._providers
+
+ def invalidate_cache(self):
+ self._providers = None
+
+
+providers = OAuthProviders()
+
+
+def get_oauth_client(baseurl):
+ return providers.providers[baseurl].client
+
+
+def get_oauth_secret(baseurl):
+ return providers.providers[baseurl].secret
+
+
+def has_oauth_data(baseurl):
+ return baseurl in providers.providers
+
+
+def _mastodon_oauth_maker(baseurl):
+ # Mastodon allows automatic creation of apps
+ r = requests.post('{}/api/v1/apps'.format(baseurl), data={
+ 'client_name': settings.ORG_SHORTNAME,
+ 'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
+ 'scopes': 'read write:statuses write:media',
+ })
+ r.raise_for_status()
+ j = r.json()
+ return (j['client_id'], j['client_secret'])
+
+
+_oauth_application_choices = {
+ 'mastodon': ('https://mastodon.social', 0, _mastodon_oauth_maker),
+ 'twitter': ('https://api.twitter.com', 1, None),
+}
+
+
+def oauth_application_choices():
+ for n, m in _oauth_application_choices.items():
+ # If the provider is "locked" to a baseurl and that baseurl is already added,
+ # then don't show it in the list.
+ if not (m[1] and m[0] in providers.providers):
+ yield (n, n, m[0], m[1])
+
+
+def oauth_application_create(app, baseurl):
+ if _oauth_application_choices.get(app, None):
+ if _oauth_application_choices[app][2]:
+ return _oauth_application_choices[app][2](baseurl)
+ return (None, None)
+
+
+def _invalidate_cache(**kwargs):
+ providers.invalidate_cache()
+
+
+def connect_oauth_signals():
+ post_save.connect(_invalidate_cache, sender=apps.get_app_config('util').get_model('OAuthApplication'))
+ post_delete.connect(_invalidate_cache, sender=apps.get_app_config('util').get_model('OAuthApplication'))
diff --git a/postgresqleu/util/templates/forms/widgets/linkforcode_widget.html b/postgresqleu/util/templates/forms/widgets/linkforcode_widget.html
new file mode 100644
index 00000000..b0f763c0
--- /dev/null
+++ b/postgresqleu/util/templates/forms/widgets/linkforcode_widget.html
@@ -0,0 +1,8 @@
+<div>
+ <p>
+ First click the button below and log in to the provider. Once that is done,
+ an authorization code will be provided -- paste this code in the field below and save.
+ </p>
+ <a class="btn btn-default" target="_blank" href="{{authurl}}">Initiate login</a><br/><br/>
+ <input type="text" name="{{widget.name}}" placeholder="Enter authorization code" {% include "django/forms/widgets/attrs.html" %}/>
+</div>
diff --git a/postgresqleu/util/views.py b/postgresqleu/util/views.py
new file mode 100644
index 00000000..521fc331
--- /dev/null
+++ b/postgresqleu/util/views.py
@@ -0,0 +1,19 @@
+from django.views.decorators.csrf import csrf_exempt
+from django.shortcuts import get_object_or_404
+
+from postgresqleu.confreg.models import MessagingProvider
+from postgresqleu.util.messaging import get_messaging
+from postgresqleu.util.messaging.twitter import process_twitter_webhook
+
+
+@csrf_exempt
+def messaging_webhook(request, providerid, token):
+ provider = get_object_or_404(MessagingProvider, id=providerid, config__webhook__token=token)
+ impl = get_messaging(provider)
+ return impl.process_webhook(request)
+
+
+# Twitter needs a special webhook URL since it's global and not per provider
+@csrf_exempt
+def twitter_webhook(request):
+ return process_twitter_webhook(request)
diff --git a/postgresqleu/util/widgets.py b/postgresqleu/util/widgets.py
index 1a390b9d..65aad0a1 100644
--- a/postgresqleu/util/widgets.py
+++ b/postgresqleu/util/widgets.py
@@ -138,6 +138,15 @@ class Bootstrap4HtmlDateTimeInput(forms.DateTimeInput):
template_name = 'forms/widgets/bs4_datetime_input.html'
+class LinkForCodeWidget(TextInput):
+ template_name = 'forms/widgets/linkforcode_widget.html'
+
+ def get_context(self, name, value, attrs):
+ d = super().get_context(name, value, attrs)
+ d['authurl'] = self.authurl
+ return d
+
+
class SubmitButtonWidget(forms.Widget):
template_name = 'forms/widgets/submitbutton_widget.html'