from django.contrib import admin
from gitadmin.adm.models import *
+
class RepositoryPermissionInline(admin.TabularInline):
model = RepositoryPermission
+
class RepositoryAdmin(admin.ModelAdmin):
- list_display = ('name','approved', )
+ list_display = ('name', 'approved', )
ordering = ('approved', 'name', )
- inlines = [ RepositoryPermissionInline, ]
+ inlines = [RepositoryPermissionInline, ]
+
admin.site.register(Repository, RepositoryAdmin)
admin.site.register(RemoteRepository)
-
from gitadmin.adm.models import Repository
+
class RepositoryForm(ModelForm):
- initialclone = forms.RegexField(r'^(git://.+/.+|[^:]+)$',max_length=256,required=False,
- label="Initial clone",
-# help_text='<a href="javascript:popupRepoList()">Select repository</a>')
- help_text='Input a valid local repository name or git:// URL')
+ initialclone = forms.RegexField(r'^(git://.+/.+|[^:]+)$', max_length=256, required=False,
+ label="Initial clone",
+ help_text='Input a valid local repository name or git:// URL')
class Meta:
model = Repository
- exclude = ('repoid','name', )
+ exclude = ('repoid', 'name', )
class ConfirmDeleteForm(Form):
- confirmed = forms.BooleanField(required=True,label="Confirm deleting the repository")
-
+ confirmed = forms.BooleanField(required=True, label="Confirm deleting the repository")
class Meta:
db_table = 'remoterepositorytypes'
+
class RemoteRepository(models.Model):
repotype = models.ForeignKey(RemoteRepositoryType, null=False)
- remoteurl = models.CharField(max_length=256, blank=False) # rsync or cvs
+ remoteurl = models.CharField(max_length=256, blank=False) # rsync or cvs
remotemodule = models.CharField(max_length=32, blank=False)
lastsynced = models.DateTimeField(null=False, default=datetime.datetime.now)
db_table = 'remoterepositories'
verbose_name_plural = 'remote repositories'
+
class Repository(models.Model):
repoid = models.AutoField(blank=False, primary_key=True)
name = models.CharField(max_length=64, blank=False, unique=True)
description = models.TextField(max_length=1024, blank=False)
- anonymous = models.BooleanField(blank=False,default=False,verbose_name='Enable anonymous access')
- web = models.BooleanField(blank=False,default=False,verbose_name='Enable gitweb access')
+ anonymous = models.BooleanField(blank=False, default=False, verbose_name='Enable anonymous access')
+ web = models.BooleanField(blank=False, default=False, verbose_name='Enable gitweb access')
approved = models.BooleanField(blank=False, default=False)
tabwidth = models.IntegerField(default=8, null=False)
initialclone = models.CharField(max_length=256, blank=True, null=True)
remoterepository = models.ForeignKey(RemoteRepository, null=True, blank=True,
- verbose_name='Remote repository')
+ verbose_name='Remote repository')
def ValidateOwnerPermissions(self, user):
if self.repositorypermission_set.filter(userid=user.username, level=2).count() != 1:
db_table = 'repositories'
verbose_name_plural = 'repositories'
+
class RepositoryPermission(models.Model):
repository = models.ForeignKey(Repository, db_column='repository')
userid = models.CharField(max_length=255, blank=False)
@property
def owner(self):
return (self.level > 1)
-
+
def __str__(self):
return "%s (%s)" % (self.userid, self.__permstr())
elif self.level == 1:
return "Write"
return "Read"
-
+
class Meta:
db_table = 'repository_permissions'
- unique_together = (('repository','userid'),)
+ unique_together = (('repository', 'userid'), )
class GitUser(models.Model):
class Meta:
db_table = 'git_users'
-
url(r'^new/$', gitadmin.adm.views.newrepo),
url(r'^help/$', gitadmin.adm.views.help),
-# Log in/out
+ # Log in/out
url(r'^login/$', gitadmin.auth.login),
url(r'^logout/$', gitadmin.auth.logout),
url(r'^auth_receive/$', gitadmin.auth.auth_receive),
from gitadmin.adm.models import *
from gitadmin.adm.forms import *
+
# Utility classes
class FormIsNotValid(Exception):
pass
-# Utility functions
+# Utility functions
def _MissingSshkey(user):
if not user.is_authenticated():
return False
except:
return True
+
def context_add(request):
return {
'missing_sshkey': _MissingSshkey(request.user),
}
-# Views
+# Views
@login_required
def index(request):
- repos = Repository.objects.extra(
+ repos = Repository.objects.extra(
where=["remoterepository_id IS NULL AND repoid IN (SELECT repository FROM repository_permissions where userid=%s)"],
- select={'perm':"SELECT CASE WHEN level>1 THEN 't'::boolean ELSE 'f'::boolean END FROM repository_permissions WHERE userid=%s AND repository_permissions.repository=repositories.repoid"},
+ select={'perm': "SELECT CASE WHEN level>1 THEN 't'::boolean ELSE 'f'::boolean END FROM repository_permissions WHERE userid=%s AND repository_permissions.repository=repositories.repoid"},
params=[request.user.username], select_params=[request.user.username]).order_by('name')
return render(request, 'index.html', {
- 'repos': repos,
- })
+ 'repos': repos,
+ })
+
def help(request):
return render(request, 'help.html')
+
@login_required
@transaction.atomic
def editrepo(request, repoid):
savedat = None
form = None
- formfactory = inlineformset_factory(Repository, RepositoryPermission, extra=1, fields=['userid','level'])
+ formfactory = inlineformset_factory(Repository, RepositoryPermission, extra=1, fields=['userid', 'level'])
if request.method == "POST":
form = RepositoryForm(data=request.POST, instance=repo)
if form.is_valid() and formset.is_valid():
try:
# Manually validate the repository entered if there is one to clone
- if form.cleaned_data.has_key('initialclone') and form.cleaned_data['initialclone']:
+ if 'initialclone' in form.cleaned_data and form.cleaned_data['initialclone']:
if form.cleaned_data['initialclone'].startswith('git://'):
# Validate hostnames and stuff?
pass
perm = repo.repositorypermission_set.all()
return render(request, 'repoview.html', {
- 'form': form,
- 'formset': formset,
- 'repo': repo,
- 'repoperm': perm,
- 'form_saved_at': savedat,
- })
+ 'form': form,
+ 'formset': formset,
+ 'repo': repo,
+ 'repoperm': perm,
+ 'form_saved_at': savedat,
+ })
@login_required
perm.save()
return HttpResponseRedirect('../repo/%s/' % repo.repoid)
-
import datetime
+
class AllReposFeed(Feed):
title = "pggit - all repositories"
link = "/"
def items(self):
return Repository.objects.all().order_by('repoid')
-
+
def item_link(self, repo):
return "https://git.postgresql.org/gitweb/%s" % repo
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'pggit',
'USER': 'pggit',
- }
}
+}
# Local time zone for this installation. Choices can be found here:
# http://en.wikipedia.org/wiki/List_of_tz_zones_by_name
LOGIN_URL = '/adm/login'
-AUTH_CONNECTION_STRING="override authentication connection string in the local settings file"
+AUTH_CONNECTION_STRING = "override authentication connection string in the local settings file"
# If there is a local_settings.py, let it override our settings
try:
from local_settings import *
except:
pass
-
# Feeds
feeds = {
- 'all': AllReposFeed,
+ 'all': AllReposFeed,
}
# Uncomment the next line to enable the admin:
url(r'^adm/admin/', include(admin.site.urls)),
-# Feeds
+ # Feeds
url(r'^feeds/all/$', AllReposFeed()),
url(r'adm/', include('gitadmin.adm.urls')),
-# vim: ai ts=4 sts=4 sw=4
#!/usr/bin/env python
"""
import filecmp
from util.LockFile import LockFile
+
class TabRemover(object):
"""
Trivial class that removes leading tabs from each row of a file
curs = self.db.cursor()
curs.execute("SELECT userid,sshkey FROM git_users ORDER BY userid")
f = open("%s/.ssh/authorized_keys.tmp" % self.conf.get("paths", "githome"), "w")
- for userid,sshkey in curs:
+ for userid, sshkey in curs:
for key in sshkey.split("\n"):
f.write("command=\"%s %s\",no-port-forwarding,no-X11-forwarding,no-agent-forwarding,no-pty %s\n" % (self.conf.get("paths", "pggit"), userid, key))
f.close()
# it is, we need to also reload lighttpd at this point.
if os.path.isfile(self.conf.get("paths", "lighttpdconf")) and filecmp.cmp(
self.conf.get("paths", "lighttpdconf"),
- "%s.tmp" % self.conf.get("paths", "lighttpdconf")):
+ "%s.tmp" % self.conf.get("paths", "lighttpdconf")):
# No changes, so just get rid of the temp file
os.remove("%s.tmp" % self.conf.get("paths", "lighttpdconf"))
else:
print "Removing container directory %s" % rootpath
try:
os.rmdir("%s" % rootpath)
- except Exception,e:
+ except Exception, e:
print "FAIL: unable to remove container directory: %s" % e
return
# Ends with '.git', meaning it's a repository. Let's figure out if it should
# be here.
d = d[:-4]
- if not allrepos.has_key(os.path.join(relativepath, d)[1:]):
+ if os.path.join(relativepath, d)[1:] not in allrepos:
print "Removing repository %s" % os.path.join(relativepath, d)
try:
shutil.rmtree("%s.git" % os.path.join(rootpath, d))
- except Exception,e:
+ except Exception, e:
print "FAIL: unable to remove directory: %s" % e
c = ConfigParser.ConfigParser()
c.read("pggit.settings")
lock = LockFile("%s/repos/.gitdump_interlock" % c.get("paths", "githome"))
- db = psycopg2.connect(c.get('database','db'))
+ db = psycopg2.connect(c.get('database', 'db'))
AuthorizedKeysDumper(db, c).dump()
-
import datetime
from Crypto.Cipher import AES
+
class KeySynchronizer(object):
def __init__(self, db):
self.db = db
self.db.commit()
+
if __name__ == "__main__":
c = ConfigParser.ConfigParser()
c.read("pggit.settings")
- db = psycopg2.connect(c.get('database','db'))
+ db = psycopg2.connect(c.get('database', 'db'))
KeySynchronizer(db).sync()
-
ALLOWED_COMMANDS = ('git-upload-pack', 'git-receive-pack')
WRITE_COMMANDS = ('git-receive-pack')
+
class Logger(object):
def __init__(self, cfg):
self.user = "Unknown"
- self.logfile = cfg.get('paths','logfile')
+ self.logfile = cfg.get('paths', 'logfile')
def log(self, message):
- f = open(self.logfile,"a")
+ f = open(self.logfile, "a")
f.write("%s: (%s) %s" % (datetime.now(), self.user, message))
f.write("\n")
f.close()
if user:
self.user = user
+
class InternalException(Exception):
pass
+
class PgGit(object):
user = None
command = None
def __init__(self, cfg):
self.cfg = cfg
self.logger = Logger(cfg)
- self.repoprefix = "%s/repos/" % cfg.get('paths','githome')
+ self.repoprefix = "%s/repos/" % cfg.get('paths', 'githome')
if cfg.has_option('trigger', 'pushtrigger'):
pieces = cfg.get('trigger', 'pushtrigger').split('.')
modname = '.'.join(pieces[:-1])
env = os.environ.get('SSH_ORIGINAL_COMMAND', None)
if not env:
raise InternalException("No SSH_ORIGINAL_COMMAND present!")
-
+
# env contains "git-<command> <argument>" or "git <command> <argument>"
command, args = env.split(None, 1)
if command == "git":
- subcommand, args = args.split(None,1)
+ subcommand, args = args.split(None, 1)
command = "git-%s" % subcommand
- if not command in ALLOWED_COMMANDS:
+ if command not in ALLOWED_COMMANDS:
raise InternalException("Command '%s' not allowed" % command)
self.command = command
def check_permissions(self):
writeperm = False
- db = psycopg2.connect(self.cfg.get('database','db'))
+ db = psycopg2.connect(self.cfg.get('database', 'db'))
curs = db.cursor()
curs.execute("SELECT CASE WHEN remoterepository_id IS NULL THEN level ELSE 0 END FROM repository_permissions INNER JOIN repositories ON repoid=repository WHERE userid=%s AND name=%s",
- (self.user, self.subpath))
+ (self.user, self.subpath))
+
try:
writeperm = (curs.fetchone()[0] > 0)
except:
if self.command in WRITE_COMMANDS:
if not writeperm:
raise InternalException("Write permission denied on repository for user %s" % self.user)
-
def run_command(self):
self.logger.log("Running \"git shell %s %s\"" % (self.command, "'%s'" % self.path))
- subprocess.call(['git', 'shell', '-c', "%s %s" % (self.command, "'%s'" % self.path)])
+ subprocess.call(['git', 'shell', '-c', "%s %s" % (self.command, "'%s'" % self.path)])
def run(self):
try:
sys.stderr.write("An unhandled exception occurred on the server\n")
sys.exit(1)
+
if __name__ == "__main__":
c = ConfigParser.ConfigParser()
c.read("%s/pggit.settings" % os.path.abspath(sys.path[0]))
PgGit(c).run()
-
import ConfigParser
from util.LockFile import LockFile
+
class RepoSync(object):
def __init__(self, db, conf):
self.db = db
INNER JOIN remoterepositories ON repositories.remoterepository_id=remoterepositories.id
WHERE approved ORDER BY name
""")
- for id,name,repotype,remoteurl,remotemodule,lastsynced in curs:
- if name.find('/')>0:
+ for id, name, repotype, remoteurl, remotemodule, lastsynced in curs:
+ if name.find('/') > 0:
print "Subdirectories not supported when synchronizing"
continue
s = SyncMethod.get(repotype)
- s.init(self.conf, name,remoteurl,remotemodule)
+ s.init(self.conf, name, remoteurl, remotemodule)
s.sync()
s.finalize()
c2 = self.db.cursor()
c2.execute("UPDATE remoterepositories SET lastsynced=CURRENT_TIMESTAMP WHERE id=%s", (id, ))
self.db.commit()
-
+
class Callable:
def __init__(self, anycallable):
self.__call__ = anycallable
+
class SyncMethod(object):
def get(repotype):
if repotype == "cvs":
return SyncMethodGit()
raise Exception("No implementation for repository type %s found" % repotype)
get = Callable(get)
-
+
def __init__(self):
self.name = self.remoteurl = self.remotemodule = None
-
+
def init(self, conf, name, remoteurl, remotemodule):
self.conf = conf
self.name = name
self.remoteurl = remoteurl
self.remotemodule = remotemodule
self.repopath = "%s/repos/%s.git" % (self.conf.get("paths", "githome"), self.name)
-
+
os.environ['GIT_DIR'] = self.repopath
-
+
def sync(self):
if not os.path.isdir(self.repopath):
self.initialsync()
def normalsync(self):
raise NotImplementedError("sync method not implemented")
-
+
def finalize(self):
savedir = os.getcwd()
os.chdir(self.repopath)
self.system("git update-server-info")
os.chdir(savedir)
- if os.environ.has_key('GIT_DIR'):
+ if 'GIT_DIR' in os.environ:
del os.environ['GIT_DIR']
def system(self, cmd):
raise Exception("Failed to execute \"%s\": %s" % (cmd, r))
return 0
-
+
class SyncMethodCvs(SyncMethod):
-# Synchronize using "git cvsimport", which deals with remove CVS repositories
-# but apparantly does not deal with branches very well.
+ # Synchronize using "git cvsimport", which deals with remove CVS repositories
+ # but apparantly does not deal with branches very well.
def initialsync(self):
# git cvsimport is evil. The first time, we need to let it create
# a non-bare repository. Otherwise it creates one inside the bare one
"%s/.git/%s" % (self.repopath, n),
"%s/%s" % (self.repopath, n)
)
- os.rmdir("%s/.git" % self.repopath)
+ os.rmdir("%s/.git" % self.repopath)
def normalsync(self):
# Not initial sync, so just do a sync
self.repopath,
# cvs module
self.remotemodule,
- ))
+ ))
class SyncMethodRsyncCvs(SyncMethod):
-# Rsync a cvs repository, and then use fromcvs to convert it to git.
-# This is really only used for the main repository
+ # Rsync a cvs repository, and then use fromcvs to convert it to git.
+ # This is really only used for the main repository
def initialsync(self):
# We really only use this for the main repo, so way too lazy to set
# this up now. Do it manually ;-)
# rsync process will remove it again, so we need to redo this after
# each time we rsync.
if os.path.isfile("%s/authormap/%s" % (
- self.conf.get("paths", "githome"), self.name)):
+ self.conf.get("paths", "githome"), self.name)):
shutil.copyfile(
"%s/authormap/%s" % (
self.conf.get("paths", "githome"), self.name),
"%s/CVSROOT/authormap" % rsyncpath)
-
# Now perform Git Import Magic (TM)
savedir = os.getcwd()
os.chdir("%s/sw/fromcvs" % self.conf.get("paths", "githome"))
-
+
# Perform Magic!
self.system("ruby togit.rb %s %s %s" % (
rsyncpath,
self.remotemodule,
self.repopath,
))
-
+
# Repack changes
os.chdir(self.repopath)
self.system("git repack -f -d")
-
+
# Restore working dir
- os.chdir(savedir)
+ os.chdir(savedir)
class SyncMethodGit(SyncMethod):
-# Sync with a remote git repository.
+ # Sync with a remote git repository.
def initialsync(self):
self.system("git clone --no-checkout --bare %s %s" % (
self.remoteurl,
self.system("git fetch -u %s master:master" % self.remoteurl)
os.chdir(savedir)
+
if __name__ == "__main__":
c = ConfigParser.ConfigParser()
c.read("pggit.settings")
lock = LockFile("%s/repos/.reposync_interlock" % c.get("paths", "githome"))
- db = psycopg2.connect(c.get('database','db'))
+ db = psycopg2.connect(c.get('database', 'db'))
RepoSync(db, c).sync()
-
import httplib
+
class varnishpurger(object):
"""
Push trigger that purges repositories from varnish. The idea being that
print "Varnish purge failed, website may become slightly out of date"
return
-
def _internal_purge(self, url):
try:
conn = httplib.HTTPConnection(self.host)
import os
+
class LockFile:
def __init__(self, filename):
self.filename = None
if os.path.isfile(filename):
- raise Exception("Script is already running (says interlock file %s)" %
- filename)
+ raise Exception("Script is already running (says interlock file %s)" % filename)
self.filename = filename
f = open(self.filename, "w")
f.writelines(('Interlock file', ))
def __del__(self):
if self.filename:
os.remove(self.filename)
-
-