launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24135
[Merge] ~cjwatson/launchpad:faster-soyuz-webservice-tests into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:faster-soyuz-webservice-tests into launchpad:master with ~cjwatson/launchpad:faster-bugs-webservice-tests as a prerequisite.
Commit message:
Stop using launchpadlib in Soyuz webservice tests
Port the Soyuz webservice tests to use in-process webservice calls
rather than launchpadlib and AppServerLayer. This takes the test time
for these test suites from 198 seconds to 52 seconds on my laptop.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/375891
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:faster-soyuz-webservice-tests into launchpad:master.
diff --git a/lib/lp/soyuz/browser/tests/test_archive_webservice.py b/lib/lp/soyuz/browser/tests/test_archive_webservice.py
index 44aaa41..48cb911 100644
--- a/lib/lp/soyuz/browser/tests/test_archive_webservice.py
+++ b/lib/lp/soyuz/browser/tests/test_archive_webservice.py
@@ -6,19 +6,17 @@ from __future__ import absolute_import, print_function, unicode_literals
__metaclass__ = type
from datetime import timedelta
+import json
-from lazr.restfulclient.errors import (
- BadRequest,
- HTTPError,
- NotFound,
- Unauthorized as LRUnauthorized,
- )
-from testtools import ExpectedException
from testtools.matchers import (
+ Contains,
+ ContainsDict,
+ EndsWith,
Equals,
+ MatchesListwise,
+ MatchesRegex,
MatchesStructure,
)
-import transaction
from zope.component import getUtility
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
@@ -34,12 +32,12 @@ from lp.soyuz.interfaces.packagecopyjob import IPlainPackageCopyJobSource
from lp.soyuz.model.archivepermission import ArchivePermission
from lp.testing import (
admin_logged_in,
+ ANONYMOUS,
api_url,
- launchpadlib_for,
+ login,
person_logged_in,
record_two_runs,
TestCaseWithFactory,
- WebServiceTestCase,
)
from lp.testing.layers import DatabaseFunctionalLayer
from lp.testing.matchers import HasQueryCount
@@ -54,46 +52,35 @@ class TestArchiveWebservice(TestCaseWithFactory):
def setUp(self):
super(TestArchiveWebservice, self).setUp()
- with admin_logged_in():
+ with admin_logged_in() as _admin:
+ admin = _admin
self.archive = self.factory.makeArchive(
purpose=ArchivePurpose.PRIMARY)
distroseries = self.factory.makeDistroSeries(
distribution=self.archive.distribution)
person = self.factory.makePerson()
- distro_name = self.archive.distribution.name
- distroseries_name = distroseries.name
- person_name = person.name
-
- self.launchpad = launchpadlib_for(
- "archive test", "salgado", "WRITE_PUBLIC")
- self.distribution = self.launchpad.distributions[distro_name]
- self.distroseries = self.distribution.getSeries(
- name_or_version=distroseries_name)
- self.main_archive = self.distribution.main_archive
- self.person = self.launchpad.people[person_name]
+ self.main_archive_url = api_url(self.archive)
+ self.distroseries_url = api_url(distroseries)
+ self.person_url = api_url(person)
+ self.ws = webservice_for_person(
+ admin, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
def test_checkUpload_bad_pocket(self):
# Make sure a 403 error and not an OOPS is returned when
# CannotUploadToPocket is raised when calling checkUpload.
-
- # When we're on Python 2.7, this code will be much nicer as
- # assertRaises is a context manager so you can do something like
- # with self.assertRaises(HTTPError) as cm; do_something
- # .... then you have cm.exception available.
- def _do_check():
- self.main_archive.checkUpload(
- distroseries=self.distroseries,
- sourcepackagename="mozilla-firefox",
- pocket="Updates",
- component="restricted",
- person=self.person)
-
- e = self.assertRaises(HTTPError, _do_check)
-
- self.assertEqual(403, e.response.status)
- self.assertIn(
- "Not permitted to upload to the UPDATES pocket in a series "
- "in the 'DEVELOPMENT' state.", e.content)
+ response = self.ws.named_get(
+ self.main_archive_url, "checkUpload",
+ distroseries=self.distroseries_url,
+ sourcepackagename="mozilla-firefox",
+ pocket="Updates",
+ component="restricted",
+ person=self.person_url)
+ self.assertThat(response, MatchesStructure.byEquality(
+ status=403,
+ body=(
+ b"Not permitted to upload to the UPDATES pocket in a series "
+ b"in the 'DEVELOPMENT' state.")))
def test_getAllPermissions_constant_query_count(self):
# getAllPermissions has a query count constant in the number of
@@ -106,7 +93,7 @@ class TestArchiveWebservice(TestCaseWithFactory):
permission=ArchivePermissionType.UPLOAD)
def get_permissions():
- list(self.main_archive.getAllPermissions())
+ self.ws.named_get(self.main_archive_url, "getAllPermissions")
recorder1, recorder2 = record_two_runs(
get_permissions, create_permission, 1)
@@ -117,21 +104,20 @@ class TestArchiveWebservice(TestCaseWithFactory):
ppa = self.factory.makeArchive(purpose=ArchivePurpose.PPA)
ppa_url = api_url(ppa)
ws = webservice_for_person(
- ppa.owner, permission=OAuthPermission.WRITE_PRIVATE)
+ ppa.owner, permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel")
# DELETE on an archive resource doesn't actually remove it
# immediately, but it asks the publisher to delete it later.
self.assertEqual(
- 'Active',
- ws.get(ppa_url, api_version='devel').jsonBody()['status'])
- self.assertEqual(200, ws.delete(ppa_url, api_version='devel').status)
+ 'Active', self.getWebserviceJSON(ws, ppa_url)['status'])
+ self.assertEqual(200, ws.delete(ppa_url).status)
self.assertEqual(
- 'Deleting',
- ws.get(ppa_url, api_version='devel').jsonBody()['status'])
+ 'Deleting', self.getWebserviceJSON(ws, ppa_url)['status'])
# Deleting the PPA again fails.
self.assertThat(
- ws.delete(ppa_url, api_version='devel'),
+ ws.delete(ppa_url),
MatchesStructure.byEquality(
status=400, body="Archive already deleted."))
@@ -141,89 +127,132 @@ class TestArchiveWebservice(TestCaseWithFactory):
ppa_url = api_url(ppa)
ws = webservice_for_person(
self.factory.makePerson(),
- permission=OAuthPermission.WRITE_PRIVATE)
+ permission=OAuthPermission.WRITE_PRIVATE,
+ default_api_version="devel")
# A random user can't delete someone else's PPA.
- self.assertEqual(401, ws.delete(ppa_url, api_version='devel').status)
+ self.assertEqual(401, ws.delete(ppa_url).status)
+
+class TestExternalDependencies(TestCaseWithFactory):
-class TestExternalDependencies(WebServiceTestCase):
+ layer = DatabaseFunctionalLayer
def test_external_dependencies_random_user(self):
"""Normal users can look but not touch."""
archive = self.factory.makeArchive()
- transaction.commit()
- ws_archive = self.wsObject(archive)
- self.assertIs(None, ws_archive.external_dependencies)
- ws_archive.external_dependencies = "random"
- with ExpectedException(LRUnauthorized, '.*'):
- ws_archive.lp_save()
+ archive_url = api_url(archive)
+ ws = webservice_for_person(
+ self.factory.makePerson(), permission=OAuthPermission.WRITE_PUBLIC)
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ self.assertIsNone(ws_archive["external_dependencies"])
+ response = ws.patch(
+ archive_url, "application/json",
+ json.dumps({"external_dependencies": "random"}))
+ self.assertEqual(401, response.status)
def test_external_dependencies_owner(self):
"""Normal archive owners can look but not touch."""
archive = self.factory.makeArchive()
- transaction.commit()
- ws_archive = self.wsObject(archive, archive.owner)
- self.assertIs(None, ws_archive.external_dependencies)
- ws_archive.external_dependencies = "random"
- with ExpectedException(LRUnauthorized, '.*'):
- ws_archive.lp_save()
+ archive_url = api_url(archive)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC)
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ self.assertIsNone(ws_archive["external_dependencies"])
+ response = ws.patch(
+ archive_url, "application/json",
+ json.dumps({"external_dependencies": "random"}))
+ self.assertEqual(401, response.status)
def test_external_dependencies_ppa_owner_invalid(self):
"""PPA admins can look and touch."""
ppa_admin_team = getUtility(ILaunchpadCelebrities).ppa_admin
ppa_admin = self.factory.makePerson(member_of=[ppa_admin_team])
archive = self.factory.makeArchive(owner=ppa_admin)
- transaction.commit()
- ws_archive = self.wsObject(archive, archive.owner)
- self.assertIs(None, ws_archive.external_dependencies)
- ws_archive.external_dependencies = "random"
- regex = '(\n|.)*Invalid external dependencies(\n|.)*'
- with ExpectedException(BadRequest, regex):
- ws_archive.lp_save()
+ archive_url = api_url(archive)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC)
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ self.assertIsNone(ws_archive["external_dependencies"])
+ response = ws.patch(
+ archive_url, "application/json",
+ json.dumps({"external_dependencies": "random"}))
+ self.assertThat(response, MatchesStructure(
+ status=Equals(400),
+ body=Contains(b"Invalid external dependencies")))
def test_external_dependencies_ppa_owner_valid(self):
"""PPA admins can look and touch."""
ppa_admin_team = getUtility(ILaunchpadCelebrities).ppa_admin
ppa_admin = self.factory.makePerson(member_of=[ppa_admin_team])
archive = self.factory.makeArchive(owner=ppa_admin)
- transaction.commit()
- ws_archive = self.wsObject(archive, archive.owner)
- self.assertIs(None, ws_archive.external_dependencies)
- ws_archive.external_dependencies = (
- "deb http://example.org suite components")
- ws_archive.lp_save()
+ archive_url = api_url(archive)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC)
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ self.assertIsNone(ws_archive["external_dependencies"])
+ response = ws.patch(
+ archive_url, "application/json",
+ json.dumps({
+ "external_dependencies":
+ "deb http://example.org suite components",
+ }))
+ self.assertEqual(209, response.status)
-class TestArchiveDependencies(WebServiceTestCase):
+class TestArchiveDependencies(TestCaseWithFactory):
+
+ layer = DatabaseFunctionalLayer
def test_addArchiveDependency_random_user(self):
"""Normal users cannot add archive dependencies."""
archive = self.factory.makeArchive()
dependency = self.factory.makeArchive()
- transaction.commit()
- ws_archive = self.wsObject(archive)
- ws_dependency = self.wsObject(dependency)
- self.assertContentEqual([], ws_archive.dependencies)
- failure_regex = '(.|\n)*addArchiveDependency.*launchpad.Edit(.|\n)*'
- with ExpectedException(LRUnauthorized, failure_regex):
- dependency = ws_archive.addArchiveDependency(
- dependency=ws_dependency, pocket='Release', component='main')
+ archive_url = api_url(archive)
+ dependency_url = api_url(dependency)
+ ws = webservice_for_person(
+ self.factory.makePerson(), permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_dependencies = self.getWebserviceJSON(
+ ws, ws_archive["dependencies_collection_link"])
+ self.assertEqual([], ws_dependencies["entries"])
+ response = ws.named_post(
+ archive_url, "addArchiveDependency",
+ dependency=dependency_url, pocket="Release", component="main")
+ self.assertThat(response, MatchesStructure(
+ status=Equals(401),
+ body=MatchesRegex(br".*addArchiveDependency.*launchpad.Edit.*")))
def test_addArchiveDependency_owner(self):
- """Normal users cannot add archive dependencies."""
+ """Archive owners can add archive dependencies."""
archive = self.factory.makeArchive()
dependency = self.factory.makeArchive()
- transaction.commit()
- ws_archive = self.wsObject(archive, archive.owner)
- ws_dependency = self.wsObject(dependency)
- self.assertContentEqual([], ws_archive.dependencies)
- with ExpectedException(NotFound, '(.|\n)*asdf(.|\n)*'):
- ws_archive.addArchiveDependency(
- dependency=ws_dependency, pocket='Release', component='asdf')
- dependency = ws_archive.addArchiveDependency(
- dependency=ws_dependency, pocket='Release', component='main')
- self.assertContentEqual([dependency], ws_archive.dependencies)
+ archive_url = api_url(archive)
+ dependency_url = api_url(dependency)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_dependencies = self.getWebserviceJSON(
+ ws, ws_archive["dependencies_collection_link"])
+ self.assertEqual([], ws_dependencies["entries"])
+ response = ws.named_post(
+ archive_url, "addArchiveDependency",
+ dependency=dependency_url, pocket="Release", component="asdf")
+ self.assertThat(
+ response,
+ MatchesStructure(status=Equals(404), body=Contains(b"asdf")))
+ response = ws.named_post(
+ archive_url, "addArchiveDependency",
+ dependency=dependency_url, pocket="Release", component="main")
+ self.assertEqual(201, response.status)
+ archive_dependency_url = response.getHeader("Location")
+ ws_dependencies = self.getWebserviceJSON(
+ ws, ws_archive["dependencies_collection_link"])
+ self.assertThat(ws_dependencies["entries"], MatchesListwise([
+ ContainsDict({"self_link": Equals(archive_dependency_url)}),
+ ]))
def test_addArchiveDependency_invalid(self):
"""Invalid requests generate a BadRequest error."""
@@ -232,79 +261,98 @@ class TestArchiveDependencies(WebServiceTestCase):
with person_logged_in(archive.owner):
archive.addArchiveDependency(
dependency, PackagePublishingPocket.RELEASE)
- transaction.commit()
- ws_archive = self.wsObject(archive, archive.owner)
- ws_dependency = self.wsObject(dependency)
- expected_re = '(.|\n)*This dependency is already registered(.|\n)*'
- with ExpectedException(BadRequest, expected_re):
- ws_archive.addArchiveDependency(
- dependency=ws_dependency, pocket='Release')
+ archive_url = api_url(archive)
+ dependency_url = api_url(dependency)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ response = ws.named_post(
+ archive_url, "addArchiveDependency",
+ dependency=dependency_url, pocket="Release")
+ self.assertThat(response, MatchesStructure.byEquality(
+ status=400, body=b"This dependency is already registered."))
def test_removeArchiveDependency_random_user(self):
- """Normal users can remove archive dependencies."""
+ """Normal users cannot remove archive dependencies."""
archive = self.factory.makeArchive()
dependency = self.factory.makeArchive()
with person_logged_in(archive.owner):
archive.addArchiveDependency(
dependency, PackagePublishingPocket.RELEASE)
- transaction.commit()
- ws_archive = self.wsObject(archive)
- ws_dependency = self.wsObject(dependency)
- failure_regex = '(.|\n)*remove.*Dependency.*launchpad.Edit(.|\n)*'
- with ExpectedException(LRUnauthorized, failure_regex):
- ws_archive.removeArchiveDependency(dependency=ws_dependency)
+ archive_url = api_url(archive)
+ dependency_url = api_url(dependency)
+ ws = webservice_for_person(
+ self.factory.makePerson(), permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ response = ws.named_post(
+ archive_url, "removeArchiveDependency", dependency=dependency_url)
+ self.assertThat(response, MatchesStructure(
+ status=Equals(401),
+ body=MatchesRegex(
+ br".*removeArchiveDependency.*launchpad.Edit.*")))
def test_removeArchiveDependency_owner(self):
- """Normal users can remove archive dependencies."""
+ """Archive owners can remove archive dependencies."""
archive = self.factory.makeArchive()
dependency = self.factory.makeArchive()
with person_logged_in(archive.owner):
archive.addArchiveDependency(
dependency, PackagePublishingPocket.RELEASE)
- transaction.commit()
- ws_archive = self.wsObject(archive, archive.owner)
- ws_dependency = self.wsObject(dependency)
- ws_archive.removeArchiveDependency(dependency=ws_dependency)
- self.assertContentEqual([], ws_archive.dependencies)
+ archive_url = api_url(archive)
+ dependency_url = api_url(dependency)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ response = ws.named_post(
+ archive_url, "removeArchiveDependency", dependency=dependency_url)
+ self.assertEqual(200, response.status)
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_dependencies = self.getWebserviceJSON(
+ ws, ws_archive["dependencies_collection_link"])
+ self.assertEqual([], ws_dependencies["entries"])
-class TestProcessors(WebServiceTestCase):
+class TestProcessors(TestCaseWithFactory):
"""Test the enabled_restricted_processors property and methods."""
+ layer = DatabaseFunctionalLayer
+
def test_erpNotAvailableInBeta(self):
"""The enabled_restricted_processors property is not in beta."""
- self.ws_version = 'beta'
archive = self.factory.makeArchive()
ppa_admin_team = getUtility(ILaunchpadCelebrities).ppa_admin
ppa_admin = self.factory.makePerson(member_of=[ppa_admin_team])
- transaction.commit()
- ws_archive = self.wsObject(archive, user=ppa_admin)
- expected_re = (
- "(.|\n)*object has no attribute "
- "'enabled_restricted_processors'(.|\n)*")
- with ExpectedException(AttributeError, expected_re):
- ws_archive.enabled_restricted_processors
+ archive_url = api_url(archive)
+ ws = webservice_for_person(ppa_admin, default_api_version="beta")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ self.assertNotIn(
+ "enabled_restricted_processors_collection_link", ws_archive)
def test_erpAvailableInDevel(self):
"""The enabled_restricted_processors property is in devel."""
- self.ws_version = 'devel'
archive = self.factory.makeArchive()
ppa_admin_team = getUtility(ILaunchpadCelebrities).ppa_admin
ppa_admin = self.factory.makePerson(member_of=[ppa_admin_team])
- transaction.commit()
- ws_archive = self.wsObject(archive, user=ppa_admin)
- self.assertContentEqual([], ws_archive.enabled_restricted_processors)
+ archive_url = api_url(archive)
+ ws = webservice_for_person(ppa_admin, default_api_version="devel")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_erp = self.getWebserviceJSON(
+ ws, ws_archive["enabled_restricted_processors_collection_link"])
+ self.assertEqual([], ws_erp["entries"])
def test_processors(self):
"""Attributes about processors are available."""
- self.ws_version = 'devel'
self.factory.makeProcessor(
'new-arm', 'New ARM Title', 'New ARM Description')
- transaction.commit()
- ws_proc = self.service.processors.getByName(name='new-arm')
- self.assertEqual('new-arm', ws_proc.name)
- self.assertEqual('New ARM Title', ws_proc.title)
- self.assertEqual('New ARM Description', ws_proc.description)
+ ws = webservice_for_person(
+ self.factory.makePerson(), default_api_version="devel")
+ response = ws.named_get("/+processors", "getByName", name="new-arm")
+ self.assertEqual(200, response.status)
+ self.assertThat(response.jsonBody(), ContainsDict({
+ "name": Equals("new-arm"),
+ "title": Equals("New ARM Title"),
+ "description": Equals("New ARM Description"),
+ }))
def setProcessors(self, user, archive_url, names):
ws = webservice_for_person(
@@ -382,63 +430,80 @@ class TestProcessors(WebServiceTestCase):
def test_enableRestrictedProcessor(self):
"""A new processor can be added to the enabled restricted set."""
- self.ws_version = 'devel'
archive = self.factory.makeArchive()
- self.factory.makeProcessor(
+ arm = self.factory.makeProcessor(
name='arm', restricted=True, build_by_default=False)
ppa_admin_team = getUtility(ILaunchpadCelebrities).ppa_admin
ppa_admin = self.factory.makePerson(member_of=[ppa_admin_team])
- transaction.commit()
- ws_arm = self.service.processors.getByName(name='arm')
- ws_archive = self.wsObject(archive, user=ppa_admin)
- self.assertContentEqual([], ws_archive.enabled_restricted_processors)
- ws_archive.enableRestrictedProcessor(processor=ws_arm)
- self.assertContentEqual(
- [ws_arm], ws_archive.enabled_restricted_processors)
+ archive_url = api_url(archive)
+ arm_url = api_url(arm)
+ ws = webservice_for_person(
+ ppa_admin, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_erp = self.getWebserviceJSON(
+ ws, ws_archive["enabled_restricted_processors_collection_link"])
+ self.assertEqual([], ws_erp["entries"])
+ response = ws.named_post(
+ archive_url, "enableRestrictedProcessor", processor=arm_url)
+ self.assertEqual(200, response.status)
+ ws_erp = self.getWebserviceJSON(
+ ws, ws_archive["enabled_restricted_processors_collection_link"])
+ self.assertThat(ws_erp["entries"], MatchesListwise([
+ ContainsDict({"self_link": EndsWith(arm_url)}),
+ ]))
def test_enableRestrictedProcessor_owner(self):
"""A new processor can be added to the enabled restricted set.
An unauthorized user, even the archive owner, is not allowed.
"""
- self.ws_version = 'devel'
archive = self.factory.makeArchive()
- self.factory.makeProcessor(
+ arm = self.factory.makeProcessor(
name='arm', restricted=True, build_by_default=False)
- transaction.commit()
- ws_arm = self.service.processors.getByName(name='arm')
- ws_archive = self.wsObject(archive, user=archive.owner)
- self.assertContentEqual([], ws_archive.enabled_restricted_processors)
- expected_re = (
- "(.|\n)*'launchpad\.Admin'(.|\n)*")
- with ExpectedException(LRUnauthorized, expected_re):
- ws_archive.enableRestrictedProcessor(processor=ws_arm)
+ archive_url = api_url(archive)
+ arm_url = api_url(arm)
+ ws = webservice_for_person(
+ archive.owner, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_erp = self.getWebserviceJSON(
+ ws, ws_archive["enabled_restricted_processors_collection_link"])
+ self.assertEqual([], ws_erp["entries"])
+ response = ws.named_post(
+ archive_url, "enableRestrictedProcessor", processor=arm_url)
+ self.assertThat(response, MatchesStructure(
+ status=Equals(401), body=Contains(b"'launchpad.Admin'")))
def test_enableRestrictedProcessor_nonPrivUser(self):
"""A new processor can be added to the enabled restricted set.
An unauthorized user, some regular user, is not allowed.
"""
- self.ws_version = 'devel'
archive = self.factory.makeArchive()
- self.factory.makeProcessor(
+ arm = self.factory.makeProcessor(
name='arm', restricted=True, build_by_default=False)
- just_some_guy = self.factory.makePerson()
- transaction.commit()
- ws_arm = self.service.processors.getByName(name='arm')
- ws_archive = self.wsObject(archive, user=just_some_guy)
- self.assertContentEqual([], ws_archive.enabled_restricted_processors)
- expected_re = (
- "(.|\n)*'launchpad\.Admin'(.|\n)*")
- with ExpectedException(LRUnauthorized, expected_re):
- ws_archive.enableRestrictedProcessor(processor=ws_arm)
-
-
-class TestCopyPackage(WebServiceTestCase):
+ archive_url = api_url(archive)
+ arm_url = api_url(arm)
+ ws = webservice_for_person(
+ self.factory.makePerson(), permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ ws_archive = self.getWebserviceJSON(ws, archive_url)
+ ws_erp = self.getWebserviceJSON(
+ ws, ws_archive["enabled_restricted_processors_collection_link"])
+ self.assertEqual([], ws_erp["entries"])
+ response = ws.named_post(
+ archive_url, "enableRestrictedProcessor", processor=arm_url)
+ self.assertThat(response, MatchesStructure(
+ status=Equals(401), body=Contains(b"'launchpad.Admin'")))
+
+
+class TestCopyPackage(TestCaseWithFactory):
"""Webservice test cases for the copyPackage/copyPackages methods"""
+ layer = DatabaseFunctionalLayer
+
def setup_data(self):
- self.ws_version = "devel"
uploader_dude = self.factory.makePerson()
sponsored_dude = self.factory.makePerson()
source_archive = self.factory.makeArchive()
@@ -453,7 +518,6 @@ class TestCopyPackage(WebServiceTestCase):
distribution=target_archive.distribution)
with person_logged_in(target_archive.owner):
target_archive.newComponentUploader(uploader_dude, "universe")
- transaction.commit()
return (source, source_archive, source_name, target_archive,
to_pocket, to_series, uploader_dude, sponsored_dude, version)
@@ -463,17 +527,22 @@ class TestCopyPackage(WebServiceTestCase):
to_series, uploader_dude, sponsored_dude,
version) = self.setup_data()
- ws_target_archive = self.wsObject(target_archive, user=uploader_dude)
- ws_source_archive = self.wsObject(source_archive)
- ws_sponsored_dude = self.wsObject(sponsored_dude)
+ target_archive_url = api_url(target_archive)
+ source_archive_url = api_url(source_archive)
+ sponsored_dude_url = api_url(sponsored_dude)
+ ws = webservice_for_person(
+ uploader_dude, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
- ws_target_archive.copyPackage(
+ response = ws.named_post(
+ target_archive_url, "copyPackage",
source_name=source_name, version=version,
- from_archive=ws_source_archive, to_pocket=to_pocket.name,
+ from_archive=source_archive_url, to_pocket=to_pocket.name,
to_series=to_series.name, include_binaries=False,
- sponsored=ws_sponsored_dude)
- transaction.commit()
+ sponsored=sponsored_dude_url)
+ self.assertEqual(200, response.status)
+ login(ANONYMOUS)
job_source = getUtility(IPlainPackageCopyJobSource)
copy_job = job_source.getActiveJobs(target_archive).one()
self.assertEqual(target_archive, copy_job.target_archive)
@@ -483,53 +552,68 @@ class TestCopyPackage(WebServiceTestCase):
(source, source_archive, source_name, target_archive, to_pocket,
to_series, uploader_dude, sponsored_dude,
version) = self.setup_data()
+ from_series = source.distroseries
- ws_target_archive = self.wsObject(target_archive, user=uploader_dude)
- ws_source_archive = self.wsObject(source_archive)
- ws_sponsored_dude = self.wsObject(sponsored_dude)
+ target_archive_url = api_url(target_archive)
+ source_archive_url = api_url(source_archive)
+ sponsored_dude_url = api_url(sponsored_dude)
+ ws = webservice_for_person(
+ uploader_dude, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
- ws_target_archive.copyPackages(
- source_names=[source_name], from_archive=ws_source_archive,
+ response = ws.named_post(
+ target_archive_url, "copyPackages",
+ source_names=[source_name], from_archive=source_archive_url,
to_pocket=to_pocket.name, to_series=to_series.name,
- from_series=source.distroseries.name, include_binaries=False,
- sponsored=ws_sponsored_dude)
- transaction.commit()
+ from_series=from_series.name, include_binaries=False,
+ sponsored=sponsored_dude_url)
+ self.assertEqual(200, response.status)
+ login(ANONYMOUS)
job_source = getUtility(IPlainPackageCopyJobSource)
copy_job = job_source.getActiveJobs(target_archive).one()
self.assertEqual(target_archive, copy_job.target_archive)
-class TestgetPublishedBinaries(WebServiceTestCase):
- """test getPublishedSources."""
+class TestGetPublishedBinaries(TestCaseWithFactory):
+ """Test getPublishedBinaries."""
+
+ layer = DatabaseFunctionalLayer
def setUp(self):
- super(TestgetPublishedBinaries, self).setUp()
- self.ws_version = 'beta'
+ super(TestGetPublishedBinaries, self).setUp()
self.person = self.factory.makePerson()
self.archive = self.factory.makeArchive()
+ self.person_url = api_url(self.person)
+ self.archive_url = api_url(self.archive)
def test_getPublishedBinaries(self):
self.factory.makeBinaryPackagePublishingHistory(archive=self.archive)
- ws_archive = self.wsObject(self.archive, user=self.person)
- self.assertEqual(1, len(ws_archive.getPublishedBinaries()))
+ ws = webservice_for_person(self.person, default_api_version="beta")
+ response = ws.named_get(self.archive_url, "getPublishedBinaries")
+ self.assertEqual(200, response.status)
+ self.assertEqual(1, response.jsonBody()["total_size"])
def test_getPublishedBinaries_created_since_date(self):
datecreated = self.factory.getUniqueDate()
later_date = datecreated + timedelta(minutes=1)
self.factory.makeBinaryPackagePublishingHistory(
- archive=self.archive, datecreated=datecreated)
- ws_archive = self.wsObject(self.archive, user=self.person)
- publications = ws_archive.getPublishedBinaries(
- created_since_date=later_date)
- self.assertEqual(0, len(publications))
+ archive=self.archive, datecreated=datecreated)
+ ws = webservice_for_person(self.person, default_api_version="beta")
+ response = ws.named_get(
+ self.archive_url, "getPublishedBinaries",
+ created_since_date=later_date.isoformat())
+ self.assertEqual(200, response.status)
+ self.assertEqual(0, response.jsonBody()["total_size"])
def test_getPublishedBinaries_no_ordering(self):
self.factory.makeBinaryPackagePublishingHistory(archive=self.archive)
self.factory.makeBinaryPackagePublishingHistory(archive=self.archive)
- ws_archive = self.wsObject(self.archive, user=self.person)
- publications = ws_archive.getPublishedBinaries(ordered=False)
- self.assertEqual(2, len(publications))
+ ws = webservice_for_person(self.person, default_api_version="beta")
+ response = ws.named_get(
+ self.archive_url, "getPublishedBinaries", ordered=False)
+ self.assertEqual(200, response.status)
+ self.assertEqual(2, response.jsonBody()["total_size"])
def test_getPublishedBinaries_query_count(self):
# getPublishedBinaries has a query count constant in the number of
@@ -577,14 +661,16 @@ class TestgetPublishedBinaries(WebServiceTestCase):
recorder2, HasQueryCount(Equals(recorder1.count + 3), recorder1))
-class TestremoveCopyNotification(WebServiceTestCase):
+class TestRemoveCopyNotification(TestCaseWithFactory):
"""Test removeCopyNotification."""
+ layer = DatabaseFunctionalLayer
+
def setUp(self):
- super(TestremoveCopyNotification, self).setUp()
- self.ws_version = 'devel'
+ super(TestRemoveCopyNotification, self).setUp()
self.person = self.factory.makePerson()
self.archive = self.factory.makeArchive(owner=self.person)
+ self.archive_url = api_url(self.archive)
def test_removeCopyNotification(self):
distroseries = self.factory.makeDistroSeries()
@@ -600,10 +686,14 @@ class TestremoveCopyNotification(WebServiceTestCase):
job.start()
job.fail()
- ws_archive = self.wsObject(self.archive, user=self.person)
- ws_archive.removeCopyNotification(job_id=job.id)
- transaction.commit()
+ ws = webservice_for_person(
+ self.person, permission=OAuthPermission.WRITE_PUBLIC,
+ default_api_version="devel")
+ response = ws.named_post(
+ self.archive_url, "removeCopyNotification", job_id=job.id)
+ self.assertEqual(200, response.status)
+ login(ANONYMOUS)
source = getUtility(IPlainPackageCopyJobSource)
self.assertEqual(
None, source.getIncompleteJobsForArchive(self.archive).any())