Commit fa9df254 authored by okhin's avatar okhin

CSV Importer is working

parent ad8a14c3
Pipeline #713 failed with stage
in 58 seconds
......@@ -10,7 +10,7 @@ from picampaign.organization.models import Organization
class MockRequest(object):
pass
class ArugmentaryAdminTest(TestCase):
class ArgumentaryAdminTest(TestCase):
def setUp(self):
self.organization = Organization.objects.create(
name='Majestic 12',
......
......@@ -7,6 +7,13 @@ class ImporterAdmin(admin.ModelAdmin):
list_display = ['name', 'kind', 'category']
actions = ['import_action']
def get_queryset(self, request):
qs = super(ImporterAdmin, self).get_queryset(request)
if request.user.is_superuser:
return qs
user_orgs = [x.id for x in request.user.organizations.all()]
return qs.filter(campaign__organization__in=user_orgs)
def import_action(self, request, queryset):
"""
We're going to launch the import of data
......@@ -15,7 +22,7 @@ class ImporterAdmin(admin.ModelAdmin):
obj.handle()
self.message_user(request, _("Imported %(last_imported)s of %(last_count)s contacts") % {
'last_imported': obj.last_imported,
'last_count': obj.last_count})
'last_count': obj.last_count}, fail_silently='True')
import_action.short_description = _("Run the selected importers")
admin.site.register(Importer, ImporterAdmin)
......@@ -143,12 +143,44 @@ class Importer(models.Model):
# If we're a csv format, let's use DictReader on a file object
reader = DictReader(f)
for item in reader:
# We need to create the groups according to the header
import_data.append(item)
# FIXME: We need to create the groups according to the header
# Which means for each mandate:kind columsn we must add a item['mandates']
# entry with an end-date of ''
# One can belong to different groups
mandates = []
parsed_item = {}
for key in [k for k in item if k.startswith('mandates:')]:
for group in item[key].split(','):
mandates.append({'group':
{'kind':key.split(':')[1],
'name': group
},
'end_date': ''
})
# Same for phones and websites
contacts = {}
contacts['phones'] = []
for key in [k for k in item if k.startswith('phones:')]:
contacts['phones'].append({'kind': 'office phone',
'number': item[key]})
contacts['emails'] = []
for key in [k for k in item if k.startswith('emails:')]:
contacts['emails'].append({'kind': key.split(':')[1],
'email': item[key]})
contacts['websites'] = []
for key in [k for k in item if k == 'twitter']:
contacts['websites'].append({'url': item[key],
'kind': 'twitter'})
parsed_item['first_name'] = item['first_name']
parsed_item['last_name'] = item['last_name']
parsed_item['birth_date'] = item['birth_date']
parsed_item['mandates'] = mandates
parsed_item['contacts'] = contacts
import_data.append(parsed_item)
else:
# We're in json, let's load from a file
import_data = json.load(f)
# Now, let's get the correct call
if self.category == 'repr':
self.representative(import_data)
self.representative(import_data)
return
from unittest import mock
from django.test import TestCase
from django.contrib.admin.sites import AdminSite
from django.contrib.admin.options import ModelAdmin
from django.contrib.auth.models import User
from django.contrib.messages.storage.fallback import FallbackStorage
from django.test import TestCase, RequestFactory
from django.core.files.uploadedfile import SimpleUploadedFile
from picampaign.importer.models import Importer
from picampaign.contact.models import Contact, Phone
from picampaign.contact.models import Contact
from picampaign.campaign.models import Campaign, CampaignContact
from picampaign.organization.models import Group, GroupType, Organization
from picampaign.importer.admin import ImporterAdmin
class ImporterAdminTest(TestCase):
def setUp(self):
self.factory = RequestFactory()
self.user = User.objects.create_user(
username='john',
email='johndoe@example.com',
password='top_sikret'
)
self.super_user = User.objects.create_superuser(
username='Jane',
email='janedoe@exmaple.com',
password='password'
)
self.organization = Organization.objects.create(
name='Majestic 12',
sip_key='majestic-12'
)
self.campaign = Campaign.objects.create(
title='Test Campaign',
start_date='2000-01-01',
end_date='2100-12-31',
organization=self.organization
)
GroupType.objects.create(
organization=self.organization,
name='group'
)
GroupType.objects.create(
organization=self.organization,
name='chamber'
)
GroupType.objects.create(
organization=self.organization,
name='committee'
)
json_file = None
with open('picampaign/importer/data_tests.json', u'rb') as f:
json_file = SimpleUploadedFile('data_tests.json', f.read())
self.importer = Importer.objects.create(
name='test importer',
category='repr',
campaign = self.campaign,
file=json_file
)
self.site = AdminSite()
def test_get_queryset(self):
ma = ImporterAdmin(Importer, self.site)
request = self.factory.get('/admin/picampaign/importers/')
setattr(request, 'session', 'session')
messages = FallbackStorage(request)
setattr(request, '_messags', messages)
# Superuser can see anything
request.user = self.super_user
self.assertEqual(list(ma.get_queryset(request)), list(Importer.objects.all()))
# Normal user outside of orgs sees nothing
request.user = self.user
self.assertEqual(list(ma.get_queryset(request)), [])
# User in orgs see what they own
request.user.organizations = Organization.objects.all()
self.assertEqual(list(ma.get_queryset(request)), list(Importer.objects.all()))
def test_import_action(self):
ma = ImporterAdmin(Importer, self.site)
request = self.factory.get('/admin/picampaign/importers/')
request.user = self.user
request.user.organizations = Organization.objects.all()
# Launch the action now
self.assertEqual(self.importer.last_imported, None)
self.assertEqual(self.importer.last_count, None)
ma.import_action(request, ma.get_queryset(request))
self.importer.refresh_from_db()
self.assertEqual(self.importer.last_imported, 3)
self.assertEqual(self.importer.last_count, 10)
class ImporterTest(TestCase):
def setUp(self):
......@@ -54,7 +135,7 @@ class ImporterTest(TestCase):
importer.url = 'http://www.example.com/'
self.assertEqual(importer.kind(), 'url')
def test_representative(self):
def test_representative_json(self):
json_file = None
with open('picampaign/importer/data_tests.json', u'rb') as f:
json_file = SimpleUploadedFile('data_tests.json', f.read())
......@@ -73,3 +154,23 @@ class ImporterTest(TestCase):
self.assertEqual(importer.last_imported, 3)
self.assertEqual(len(Group.objects.all()), 13)
self.assertEqual(len(Contact.objects.all()), 3)
def test_representative_csv(self):
csv_file = None
with open('picampaign/importer/data_tests.csv', u'rb') as f:
csv_file = SimpleUploadedFile('data_tests.csv', f.read())
importer = Importer.objects.create(
name='test updater',
category='updater',
campaign=self.campaign,
file=csv_file
)
self.assertEqual(importer.kind(), 'file')
importer.handle()
importer.refresh_from_db()
# There's 2 Meps in the file
self.assertEqual(importer.last_count, 2)
self.assertEqual(importer.last_imported, 2)
self.assertEqual(len(Group.objects.all()), 4)
self.assertEqual(len(Contact.objects.all()), 2)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment