Split code (forms/utils/views)

This commit is contained in:
Mathieu Pasquet 2021-04-18 12:20:04 +02:00
parent 1cbcaa7945
commit 803def63c3
5 changed files with 263 additions and 205 deletions

117
jabberfr/forms.py Normal file
View File

@ -0,0 +1,117 @@
from django import forms
from django.utils.safestring import mark_safe
from django.core.validators import RegexValidator, URLValidator
from jabberfr.validators import validate_jid_userpart
USERNAME_HELP = """
<ul>
<li>Ceci est le <em>nom d'utilisateur</em> que vous utiliserez pour vous identifier sur votre compte.</li>
<li>Le nom d'utilisateur est insensible à la casse, <em>votrenom</em> est idententique à <em>VotreNom</em>.</li>
<li>Un nom d'utilisateur ne peut contenir <span style="text-decoration: underline;">aucun</span> de ces caractères: @ : ' " &lt; &gt; &amp;</li>
<li>Certains serveurs limitent la taille du nom d'utilisateur à 16 caractères.</li>
</ul>
"""
SERVER_HELP = """
<ul>
<li>Vous pouvez utiliser certains services de serveurs autres que celui sur lequel vous êtes inscrit, comme par exemple les salons de discussions.</li>
</ul>
"""
PASSWORD_HELP = """
<ul>
<li>Ce mot de passe protège votre compte. Ne le divulguez à personne.
Personne d'autre que vous n'a besoin de votre mot de passe.</li>
<li>Vous pouvez toujours utiliser votre client Jabber pour changer votre mot de passe.</li>
<li>Vous pouvez prendre soin d'indiquer votre adresse e-mail dans les informations personnelles de votre compte afin de pouvoir vous faire envoyer votre mot de passe à l'aide du <a href="https://im.apinc.org/outils/password.php">formulaire</a>, autrement il vous sera impossible de récupérer le mot de passe en cas d'oubli.</li>
</ul>
"""
class HebergForm(forms.Form):
"""Form for hosted domains"""
domain = forms.CharField(
max_length=100,
required=True,
label='Votre nom de domaine',
validators=[
RegexValidator(
regex=URLValidator.host_re,
message='Ce nom de domaine nest pas valide'
)
],
)
email = forms.EmailField(
required=True,
label='Votre adresse email',
)
type = forms.ChoiceField(
choices=[
('users', 'Comptes utilisateurs'),
('muc', 'Salons'),
],
label='Type dhébergement',
required=True,
)
message = forms.CharField(
widget=forms.Textarea,
max_length=3000,
label='Un petit mot?'
)
cname = forms.BooleanField(
required=True,
label='Jai effectué la redirection CNAME',
)
cgu = forms.BooleanField(
required=True,
label=(
'Jaccepte les CGU'
)
)
def help_hack(txt: str) -> str:
"""Required to preserve html structure with our help text"""
return '</span></p>' + txt + '<p><span>'
class InscriptionForm(forms.Form):
username = forms.CharField(
help_text=mark_safe(help_hack(USERNAME_HELP)),
label='Nom dutilisateur',
validators=[validate_jid_userpart],
required=True,
)
server = forms.ChoiceField(
choices=[
('jabber.fr', 'jabber.fr'),
('im.apinc.org', 'im.apinc.org'),
],
label='Serveur Jabber',
required=True,
help_text=mark_safe(help_hack(SERVER_HELP)),
validators=[
RegexValidator(
regex=URLValidator.host_re,
message='Ce nom de domaine nest pas valide'
)
],
)
password = forms.CharField(
widget=forms.PasswordInput,
label='Mot de passe',
required=True,
)
confirm_password = forms.CharField(
widget=forms.PasswordInput,
label='Vérification du mot de passe',
required=True,
help_text=mark_safe(help_hack(PASSWORD_HELP)),
)
def __init__(self, *args, **kwargs):
choices = kwargs.pop('custom_choices', [])
super().__init__(*args, **kwargs)
if choices:
self.fields['server'].widget.choices = choices

View File

@ -13,8 +13,8 @@ Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.urls import path
from django.views.generic import TemplateView
from django.urls import path, re_path
from django.views.generic import TemplateView, RedirectView
from jabberfr.views import get_root_index, hebergement, inscription
@ -27,8 +27,10 @@ PAGES = [
urlpatterns = [
path('', get_root_index),
path('hébergement', hebergement),
path('inscription', inscription),
re_path('hébergement/?', hebergement),
re_path('hebergement/?', RedirectView.as_view(url='/hébergement')),
re_path('inscription/?', inscription),
path('inscription/<str:domain>', inscription),
]
urlpatterns += [

115
jabberfr/utils.py Normal file
View File

@ -0,0 +1,115 @@
import logging
import xml.etree.ElementTree as ET
from asyncio import gather, get_event_loop
from aiodns import DNSResolver
from aiohttp import ClientSession, ClientTimeout
from django import forms
log = logging.getLogger(__name__)
BASE_MUC_URL = 'http://[::1]:5280/muc_list/?'
DOMAINS_URL = 'https://jabberfr.org/domains.xml'
async def get_chatrooms(*, limit: int = 25, order: str = 'users') -> str:
params = {
'limit': str(limit),
'order': order,
'class': 'lastposts',
}
params_str = ';'.join(f'{key}={value}' for key, value in params.items())
url = BASE_MUC_URL + params_str
tmout = ClientTimeout(total=10)
async with ClientSession(raise_for_status=True, timeout=tmout) as session:
async with session.get(url) as resp:
return await resp.text()
async def get_custom_domains() -> set[str]:
results = set()
tmout = ClientTimeout(total=10)
async with ClientSession(raise_for_status=True, timeout=tmout) as session:
try:
async with session.get(DOMAINS_URL) as resp:
text = await resp.text()
root = ET.fromstring(text)
for child in root:
results.add(child.attrib['jid'])
except BaseException:
log.error('Unable to parse custom domains', exc_info=True)
return results
def check_srv_contents(srv_client, srv_server) -> bool:
if not isinstance(srv_client, Exception): # No SRV
for srv in srv_client:
if srv.host != 'jabberfr.org' or srv.port != 5222:
return True
if not isinstance(srv_server, Exception): # No SRV
for srv in srv_server:
if srv.host != 'jabberfr.org' or srv.port != 5269:
return True
return False
def format_srv_errors(srv_client, srv_server) -> str:
msg = (
'<p>Vous avez des enregistrements SRV qui ne pointent pas '
'vers JabberFR:</p>\n'
)
records = []
if not isinstance(srv_client, Exception):
for srv in srv_client:
records.append(srv)
if not isinstance(srv_server, Exception):
for srv in srv_server:
records.append(srv)
text = '<pre>'
for rec in records:
text += (
f'{rec.host}. {rec.type} {rec.ttl} {rec.priority} '
f'{rec.weight} {rec.port}\n'
)
text += '</pre>'
return msg + text
async def check_dns_config(data: dict) -> list[str]:
errors = []
resolver = DNSResolver(loop=get_event_loop())
cname, srv_client, srv_server = await gather(
resolver.query(data['domain'], 'CNAME'),
resolver.query(f'_xmpp-server._tcp.{data["domain"]}', 'SRV'),
resolver.query(f'_xmpp-client._tcp.{data["domain"]}', 'SRV'),
return_exceptions=True,
)
if check_srv_contents(srv_client, srv_server):
errors.append(format_srv_errors(srv_client, srv_server))
if isinstance(cname, BaseException):
errors.append(
f'<p>Vous navez pas denregistrement <code>CNAME</code> pour ce'
f' domaine, ajoutez-en un qui pointe vers JabberFR:\n'
f'<pre><code>{data["domain"]} IN CNAME jabberfr.org</code></pre>'
)
elif cname.cname != 'jabberfr.org':
errors.append(
f'<p>Votre enregistrement <code>CNAME</code> ne pointe pas '
f'vers <code>jabberfr.org</code> mais vers '
f'<code>{cname.cname}</code>, vous devriez changer ça dans '
f'votre interface de gestion DNS.</p>'
)
return errors
def check_passwords(form: forms.Form) -> bool:
if form.cleaned_data['password'] != form.cleaned_data['confirm_password']:
form.add_error(
'confirm_password', 'Les mots de passe ne correspondent pas'
)
return False
return True

View File

@ -1,13 +1,15 @@
from asyncio import gather, get_event_loop
from aiohttp import ClientSession, ClientError, ClientTimeout
from aiodns import DNSResolver
import logging
from aiohttp import ClientError
from django.shortcuts import render
from django import forms
from django.utils.safestring import mark_safe
from django.core.validators import RegexValidator, URLValidator
from jabberfr.validators import validate_jid_userpart
from jabberfr.forms import HebergForm, InscriptionForm
from jabberfr.utils import (
get_chatrooms,
get_custom_domains,
check_dns_config,
check_passwords,
)
BASE_MUC_URL = 'http://[::1]:5280/muc_list/?'
log = logging.getLogger(__name__)
TABLE_END = """
@ -16,48 +18,6 @@ TABLE_END = """
</table>
"""
USERNAME_HELP = """
<ul>
<li>Ceci est le <em>nom d'utilisateur</em> que vous utiliserez pour vous identifier sur votre compte.</li>
<li>Le nom d'utilisateur est insensible à la casse, <em>votrenom</em> est idententique à <em>VotreNom</em>.</li>
<li>Un nom d'utilisateur ne peut contenir <span style="text-decoration: underline;">aucun</span> de ces caractères: @ : ' " &lt; &gt; &amp;</li>
<li>Certains serveurs limitent la taille du nom d'utilisateur à 16 caractères.</li>
</ul>
"""
SERVER_HELP = """
<ul>
<li>Vous pouvez utiliser certains services de serveurs autres que celui sur lequel vous êtes inscrit, comme par exemple les salons de discussions.</li>
<li>Les serveurs proposés ici sont des serveurs faisant partis de la <a href="https://jabberfr.org/federation">fédération JabberFR</a>, JabberFR n'étant pas un serveur Jabber, mais un regroupement de serveurs autour des services comme le <a href="https://forum.jabberfr.org">forum</a> et le <a href="https://wiki.jabberfr.org">wiki</a>.</li>
</ul>
"""
PASSWORD_HELP = """
<ul>
<li>Ce mot de passe protège votre compte. Ne le divulguez à personne.
Personne d'autre que vous n'a besoin de votre mot de passe.</li>
<li>Vous pouvez toujours utiliser votre client Jabber pour changer votre mot de passe.</li>
<li>Vous pouvez prendre soin d'indiquer votre adresse e-mail dans les informations personnelles de votre compte afin de pouvoir vous faire envoyer votre mot de passe à l'aide du <a href="https://im.apinc.org/outils/password.php">formulaire</a>, autrement il vous sera impossible de récupérer le mot de passe en cas d'oubli.</li>
</ul>
"""
async def get_chatrooms(*, limit: int = 25, order: str = 'users') -> str:
params = {
'limit': str(limit),
'order': order,
'class': 'lastposts',
}
params_str = ';'.join(f'{key}={value}' for key, value in params.items())
url = BASE_MUC_URL + params_str
async with ClientSession(raise_for_status=True, timeout=ClientTimeout(total=10)) as session:
async with session.get(url) as resp:
return await resp.text()
async def get_root_index(request):
try:
@ -84,112 +44,6 @@ async def get_chat_index(request):
return render(request, 'chat/index.html', context=context)
class HebergForm(forms.Form):
"""Form for hosted domains"""
domain = forms.CharField(
max_length=100,
required=True,
label='Votre nom de domaine',
validators=[
RegexValidator(
regex=URLValidator.host_re,
message='Ce nom de domaine nest pas valide'
)
],
)
email = forms.EmailField(
required=True,
label='Votre adresse email',
)
type = forms.ChoiceField(
choices=[
('users', 'Comptes utilisateurs'),
('muc', 'Salons'),
],
label='Type dhébergement',
required=True,
)
message = forms.CharField(
widget=forms.Textarea,
max_length=3000,
label='Un petit mot?'
)
cname = forms.BooleanField(
required=True,
label='Jai effectué la redirection CNAME',
)
cgu = forms.BooleanField(
required=True,
label=(
'Jaccepte les CGU'
)
)
def check_srv_contents(srv_client, srv_server) -> bool:
if not isinstance(srv_client, Exception): # No SRV
for srv in srv_client:
if srv.host != 'jabberfr.org' or srv.port != 5222:
return True
if not isinstance(srv_server, Exception): # No SRV
for srv in srv_server:
if srv.host != 'jabberfr.org' or srv.port != 5269:
return True
return False
def format_srv_errors(srv_client, srv_server) -> str:
msg = (
'<p>Vous avez des enregistrements SRV qui ne pointent pas '
'vers JabberFR:</p>\n'
)
records = []
if not isinstance(srv_client, Exception):
for srv in srv_client:
records.append(srv)
if not isinstance(srv_server, Exception):
for srv in srv_server:
records.append(srv)
text = '<pre>'
for rec in records:
text += (
f'{rec.host}. {rec.type} {rec.ttl} {rec.priority} '
f'{rec.weight} {rec.port}\n'
)
text += '</pre>'
return msg + text
async def check_dns_config(form: HebergForm) -> list[str]:
errors = []
data = form.cleaned_data
resolver = DNSResolver(loop=get_event_loop())
cname, srv_client, srv_server = await gather(
resolver.query(data['domain'], 'CNAME'),
resolver.query(f'_xmpp-server._tcp.{data["domain"]}', 'SRV'),
resolver.query(f'_xmpp-client._tcp.{data["domain"]}', 'SRV'),
return_exceptions=True,
)
if check_srv_contents(srv_client, srv_server):
errors.append(format_srv_errors(srv_client, srv_server))
if isinstance(cname, BaseException):
errors.append(
f'<p>Vous navez pas denregistrement <code>CNAME</code> pour ce'
f' domaine, ajoutez-en un qui pointe vers JabberFR:\n'
f'<pre><code>{data["domain"]} IN CNAME jabberfr.org</code></pre>'
)
elif cname.cname != 'jabberfr.org':
errors.append(
f'<p>Votre enregistrement <code>CNAME</code> ne pointe pas '
f'vers <code>jabberfr.org</code> mais vers '
f'<code>{cname.cname}</code>, vous devriez changer ça dans '
f'votre interface de gestion DNS.</p>'
)
return errors
async def hebergement(request):
context = {
'title': 'Hébergement chez JabberFR',
@ -208,54 +62,24 @@ async def hebergement(request):
return render(request, 'hebergement.html', context=context)
class InscriptionForm(forms.Form):
username = forms.CharField(
help_text=mark_safe(USERNAME_HELP),
label='Nom dutilisateur',
validators=[validate_jid_userpart],
required=True,
)
server = forms.CharField(
label='Serveur Jabber',
required=True,
help_text=mark_safe(SERVER_HELP),
validators=[
RegexValidator(
regex=URLValidator.host_re,
message='Ce nom de domaine nest pas valide'
)
],
)
password = forms.CharField(
widget=forms.PasswordInput,
label='Mot de passe',
required=True,
)
confirm_password = forms.CharField(
widget=forms.PasswordInput,
label='Vérification du mot de passe',
required=True,
help_text=mark_safe(PASSWORD_HELP),
)
def check_passwords(form):
if form.cleaned_data['password'] != form.cleaned_data['confirm_password']:
form.add_error(
'confirm_password', 'Les mots de passe ne correspondent pas'
)
return False
return True
async def inscription(request):
context = {}
async def inscription(request, domain=None):
if domain and domain not in await get_custom_domains():
domain = None
custom = domain
context = {'custom': custom}
if not domain:
choices = [
('jabber.fr', 'jabber.fr'),
('im.apinc.org', 'im.apinc.org'),
]
else:
choices = [(domain, domain)]
if request.method == 'POST':
form = InscriptionForm(request.POST)
form = InscriptionForm(request.POST, custom_choices=choices)
context['form'] = form
if form.is_valid() and check_passwords(form):
pass
else:
form = InscriptionForm()
form = InscriptionForm(custom_choices=choices)
context['form'] = form
return render(request, 'inscription.html', context=context)