public inbox for [email protected]
help / color / mirror / Atom feed
pgAdmin IV: schemas API test cases
2+ messages / 1 participants
[nested] [flat]
* pgAdmin IV: schemas API test cases
@ 2016-09-28 09:12 Navnath Gadakh <[email protected]>
2016-09-28 11:17 ` Re: pgAdmin IV: schemas API test cases Navnath Gadakh <[email protected]>
0 siblings, 1 reply; 2+ messages in thread
From: Navnath Gadakh @ 2016-09-28 09:12 UTC (permalink / raw)
To: Dave Page <[email protected]>; +Cc: pgadmin-hackers; Kanchan Mohitey <[email protected]>
Hi Dave,
Please find attached the patch for the Schema node (changes required due to
the drop objects functionality).
To run test cases:
python regression/runtests.py --pkg browser.server_groups.servers.databases.schemas
Thanks.
--
Regards,
Navnath Gadakh
EnterpriseDB Corporation
The Enterprise PostgreSQL Company
--
Sent via pgadmin-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgadmin-hackers
Attachments:
[application/octet-stream] schemas.patch (39.2K, 3-schemas.patch)
download | inline diff:
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_add.py
index 8fd7226..cb45156 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_add.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_add.py
@@ -6,10 +6,11 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
+import json
from regression import test_utils as utils
from pgadmin.utils.route import BaseTestGenerator
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
+from regression import test_server_dict
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from . import utils as schema_utils
@@ -17,49 +18,27 @@ from . import utils as schema_utils
class SchemaAddTestCase(BaseTestGenerator):
""" This class will add new schema under database node. """
-
scenarios = [
# Fetching default URL for schema node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the three tasks
- 1. Add the test server
- 2. Connect to server
- 3. Add the databases
-
- :return: None
- """
-
- # Firstly, add the server
- server_utils.add_server(cls.tester)
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
-
def runTest(self):
""" This function will add schema under database node. """
-
- schema_utils.add_schemas(self.tester)
-
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added schemas, database, server
- and the 'parent_id.pkl' file which is created in setup() function.
-
- :return: None
- """
-
- schema_utils.delete_schema(cls.tester)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ database_info = test_server_dict["database"][0]
+ server_id = database_info["server_id"]
+
+ db_id = database_info["db_id"]
+ db_con = database_utils.verify_database(self,
+ utils.SERVER_GROUP,
+ server_id,
+ db_id)
+ if not db_con["info"] == "Database connected.":
+ raise Exception("Could not connect to database to add the schema.")
+
+ data = schema_utils.get_schema_config_data(self.server["username"])
+ response = self.tester.post(self.url + str(utils.SERVER_GROUP) + '/' +
+ str(server_id) + '/' + str(db_id) +
+ '/', data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_delete.py
index 995d8ad..53ed134 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_delete.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_delete.py
@@ -6,10 +6,11 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
+import uuid
from regression import test_utils as utils
+from regression import test_server_dict
from pgadmin.utils.route import BaseTestGenerator
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from . import utils as schema_utils
@@ -23,44 +24,42 @@ class SchemaDeleteTestCase(BaseTestGenerator):
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the three tasks
- 1. Add the test server
- 2. Connect to server
- 3. Add the databases
-
- :return: None
- """
-
- # Firstly, add the server
- server_utils.add_server(cls.tester)
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
- # Add schemas
- schema_utils.add_schemas(cls.tester)
+ def setUp(self):
+ self.database_info = test_server_dict["database"][0]
+ self.db_name = self.database_info["db_name"]
+ # Change the db name, so that schema will create in newly created db
+ self.schema_name = "schema_get_%s" % str(uuid.uuid4())[1:6]
+ connection = utils.get_db_connection(self.db_name,
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ self.schema_details = schema_utils.create_schema(connection,
+ self.schema_name)
def runTest(self):
""" This function will delete schema under database node. """
+ server_id = self.database_info["server_id"]
+ db_id = self.database_info["db_id"]
+ db_con = database_utils.verify_database(self, utils.SERVER_GROUP,
+ server_id, db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete the"
+ " schema.")
- schema_utils.delete_schema(self.tester)
-
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added schemas, database, server
- and the 'parent_id.pkl' file which is created in setup() function.
+ schema_id = self.schema_details[0]
+ schema_name = self.schema_details[1]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete.")
- :return: None
- """
+ response = self.tester.delete(self.url + str(utils.SERVER_GROUP)
+ + '/' + str(server_id) + '/' +
+ str(db_id) + '/' + str(schema_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
\ No newline at end of file
+ def tearDown(self):
+ pass
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_get.py
index c70482b..05af43b 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_get.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_get.py
@@ -6,77 +6,45 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
-
from regression import test_utils as utils
+from regression import test_server_dict
from pgadmin.utils.route import BaseTestGenerator
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
-from . import utils as schema_utils
class SchemaGetTestCase(BaseTestGenerator):
""" This class will add new schema under database node. """
-
scenarios = [
# Fetching default URL for extension node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the three tasks
- 1. Add the test server
- 2. Connect to server
- 3. Add the databases
-
- :return: None
- """
-
- # Firstly, add the server
- server_utils.add_server(cls.tester)
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
- # Add schemas
- schema_utils.add_schemas(cls.tester)
-
def runTest(self):
""" This function will delete schema under database node. """
-
- all_id = utils.get_ids()
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
- schema_ids_dict = all_id["scid"][0]
-
- for server_id in server_ids:
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(self.tester,
- utils.SERVER_GROUP,
- server_id, db_id)
- if db_con['data']["connected"]:
- schema_id = schema_ids_dict[int(server_id)][0]
- schema_response = schema_utils.verify_schemas(self.tester,
- server_id, db_id,
- schema_id)
- self.assertTrue(schema_response.status_code, 200)
-
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added schemas, database, server
- and the 'parent_id.pkl' file which is created in setup() function.
-
- :return: None
- """
-
- schema_utils.delete_schema(cls.tester)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ schema = test_server_dict["schema"][0]
+ db_id = schema["db_id"]
+ server_id = schema["server_id"]
+
+ server_response = server_utils.connect_server(self, server_id)
+ if not server_response["data"]["connected"]:
+ raise Exception("Could not connect to server to connect the"
+ " database.")
+
+ db_con = database_utils.verify_database(self,
+ utils.SERVER_GROUP,
+ server_id,
+ db_id)
+ if not db_con["info"] == "Database connected.":
+ raise Exception("Could not connect to database to get the schema.")
+
+ schema_id = schema["schema_id"]
+ schema_response = self.tester.get(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(server_id) + '/' + str(db_id) +
+ '/' + str(schema_id),
+ content_type='html/json')
+ self.assertEquals(schema_response.status_code, 200)
+ # Disconnect the database
+ database_utils.disconnect_database(self, server_id, db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_put.py
index d3d4bfd..a4312d6 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_put.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/test_schema_put.py
@@ -6,107 +6,133 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
-
import json
+import uuid
from regression import test_utils as utils
+from regression import test_server_dict
from pgadmin.utils.route import BaseTestGenerator
-from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
-from regression.test_setup import advanced_config_data
from . import utils as schema_utils
class SchemaPutTestCase(BaseTestGenerator):
""" This class will update the schema under database node. """
-
scenarios = [
# Fetching default URL for extension node.
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
]
- @classmethod
- def setUpClass(cls):
- """
- This function perform the three tasks
- 1. Add the test server
- 2. Connect to server
- 3. Add the databases
-
- :return: None
- """
-
- # Firstly, add the server
- server_utils.add_server(cls.tester)
- # Connect to server
- cls.server_connect_response, cls.server_group, cls.server_ids = \
- server_utils.connect_server(cls.tester)
- if len(cls.server_connect_response) == 0:
- raise Exception("No Server(s) connected to add the database!!!")
- # Add database
- database_utils.add_database(cls.tester, cls.server_connect_response,
- cls.server_ids)
- # Add schemas
- schema_utils.add_schemas(cls.tester)
+ def setUp(self):
+ self.database_info = test_server_dict["database"][0]
+ self.db_name = self.database_info["db_name"]
+ # Change the db name, so that schema will create in newly created db
+ self.schema_name = "schema_get_%s" % str(uuid.uuid4())[1:6]
+ connection = utils.get_db_connection(self.db_name,
+ self.server['username'],
+ self.server['db_password'],
+ self.server['host'],
+ self.server['port'])
+ self.schema_details = schema_utils.create_schema(connection,
+ self.schema_name)
def runTest(self):
""" This function will delete schema under database node. """
- all_id = utils.get_ids()
- db_ids_dict = all_id["did"][0]
- schema_ids_dict = all_id["scid"][0]
-
- for server_connect_data, server_id in zip(self.server_connect_response,
- self.server_ids):
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(self.tester,
- utils.SERVER_GROUP,
- server_id, db_id)
- if db_con['data']["connected"]:
- schema_id = schema_ids_dict[int(server_id)][0]
- schema_response = schema_utils.verify_schemas(self.tester,
- server_id, db_id,
- schema_id)
- schema_response = json.loads(
- schema_response.data.decode('utf-8'))
- if not schema_response:
- raise Exception("No schema(s) to update.")
-
- adv_config_data = None
- data = None
- db_user = server_connect_data['data']['user']['name']
- # Get the config data of appropriate db user
- for config_test_data in advanced_config_data['schema_update_data']:
- if db_user == config_test_data['owner']:
- adv_config_data = config_test_data
-
- if adv_config_data is not None:
- data = {
- "deffuncacl": adv_config_data["func_acl"],
- "defseqacl": adv_config_data["seq_acl"],
- "deftblacl": adv_config_data["tbl_acl"],
- "id": schema_id
- }
- put_response = self.tester.put(
- self.url + str(utils.SERVER_GROUP) + '/' + str(server_id) +
- '/' + str(db_id) + '/' + str(schema_id),
- data=json.dumps(data), follow_redirects=True)
-
- self.assertEquals(put_response.status_code, 200)
- response_data = json.loads(put_response.data.decode('utf-8'))
- self.assertTrue(response_data['success'], 1)
-
- @classmethod
- def tearDownClass(cls):
- """
- This function deletes the added schemas, database, server
- and the 'parent_id.pkl' file which is created in setup() function.
-
- :return: None
- """
-
- schema_utils.delete_schema(cls.tester)
- database_utils.delete_database(cls.tester)
- server_utils.delete_server(cls.tester)
- utils.delete_parent_id_file()
+ server_id = self.database_info["server_id"]
+ db_id = self.database_info["db_id"]
+ db_con = database_utils.verify_database(self, utils.SERVER_GROUP,
+ server_id, db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete the"
+ " schema.")
+ schema_id = self.schema_details[0]
+ schema_name = self.schema_details[1]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to update.")
+
+ db_user = self.server["username"]
+ data = {
+ "deffuncacl": {
+ "added":
+ [
+ {
+ "grantee": db_user,
+ "grantor": db_user,
+ "privileges":
+ [
+ {
+ "privilege_type": "X",
+ "privilege": True,
+ "with_grant": True
+ }
+ ]
+ }
+ ]
+ },
+ "defseqacl": {
+ "added":
+ [
+ {
+ "grantee": db_user,
+ "grantor": db_user,
+ "privileges":
+ [
+ {
+ "privilege_type": "r",
+ "privilege": True,
+ "with_grant": False
+ },
+ {
+ "privilege_type": "w",
+ "privilege": True,
+ "with_grant": False
+ },
+ {
+ "privilege_type": "U",
+ "privilege": True,
+ "with_grant": False
+ }
+ ]
+ }
+ ]
+ },
+ "deftblacl": {
+ "added":
+ [
+ {
+ "grantee": "public",
+ "grantor": db_user,
+ "privileges":
+ [
+ {
+ "privilege_type": "D",
+ "privilege": True,
+ "with_grant": False
+ },
+ {
+ "privilege_type": "x",
+ "privilege": True,
+ "with_grant": False
+ }
+ ]
+ }
+ ]
+ },
+ "id": schema_id
+ }
+ put_response = self.tester.put(
+ self.url + str(utils.SERVER_GROUP) + '/' + str(server_id) +
+ '/' + str(db_id) + '/' + str(schema_id),
+ data=json.dumps(data), follow_redirects=True)
+
+ self.assertEquals(put_response.status_code, 200)
+ # Disconnect the database
+ database_utils.disconnect_database(self, server_id, db_id)
+
+ def tearDown(self):
+ pass
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/utils.py
index 4f91484..cc6f6b5 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/utils.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tests/utils.py
@@ -6,13 +6,15 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
+from __future__ import print_function
import json
import os
+import sys
import pickle
import uuid
+import traceback
-from regression.test_setup import pickle_path, advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
@@ -20,102 +22,77 @@ from regression import test_utils as utils
SCHEMA_URL = '/browser/schema/obj/'
SCHEMA_DELETE_URL = '/browser/schema/delete/'
+file_name = os.path.realpath(__file__)
-def get_schema_config_data(server_connect_response):
+def get_schema_config_data(db_user):
"""This function is used to get advance config test data for schema"""
-
- adv_config_data = None
- data = None
- db_user = server_connect_response['data']['user']['name']
-
- # Get the config data of appropriate db user
- for config_test_data in advanced_config_data['schema_credentials']:
- if db_user == config_test_data['owner']:
- adv_config_data = config_test_data
-
- if adv_config_data is not None:
- data = {
- "deffuncacl": adv_config_data['func_acl'],
- "defseqacl": adv_config_data['seq_acl'],
- "deftblacl": adv_config_data['tbl_acl'],
- "deftypeacl": adv_config_data['type_acl'],
- "name": "schema_{0}".format(str(uuid.uuid4())[1:8]),
- "namespaceowner": adv_config_data['owner'],
- "nspacl": adv_config_data['privilege'],
- "seclabels": adv_config_data['sec_label']
- }
+ data = {
+ "deffuncacl": [],
+ "defseqacl": [],
+ "deftblacl": [],
+ "deftypeacl": [],
+ "name": "test_schema_{0}".format(str(uuid.uuid4())[1:8]),
+ "namespaceowner": db_user,
+ "nspacl": [
+ {
+ "grantee": db_user,
+ "grantor": db_user,
+ "privileges":
+ [
+ {
+ "privilege_type": "C",
+ "privilege": True,
+ "with_grant": False
+ },
+ {
+ "privilege_type": "U",
+ "privilege": True,
+ "with_grant": False
+ }
+ ]
+ }
+ ],
+ "seclabels": []
+ }
return data
-def write_schema_id(response_data, server_id):
- """
- This function writes the schema id into parent_id.pkl
-
- :param response_data: schema add response data
- :type response_data: dict
- :param server_id: server id
- :type server_id: str
- :return: None
- """
-
- schema_id = response_data['node']['_id']
- schema_name = str(response_data['node']['label'])
- pickle_id_dict = utils.get_pickle_id_dict()
- if os.path.isfile(pickle_path):
- existing_server_id = open(pickle_path, 'rb')
- tol_server_id = pickle.load(existing_server_id)
- pickle_id_dict = tol_server_id
- if 'scid' in pickle_id_dict:
- if pickle_id_dict['scid']:
- # Add the schema_id as value in dict
- pickle_id_dict["scid"][0].update(
- {int(server_id): [schema_id, schema_name]})
- else:
- # Create new dict with server_id and schema_id
- pickle_id_dict["scid"].append(
- {int(server_id): [schema_id, schema_name]})
- schema_output = open(pickle_path, 'wb')
- pickle.dump(pickle_id_dict, schema_output)
- schema_output.close()
-
-
-def add_schemas(tester):
+def create_schema(connection, schema_name):
"""This function add the schemas into databases"""
-
- all_id = utils.get_ids()
-
- server_ids = all_id["sid"]
- db_ids_dict = all_id["did"][0]
-
- for server_id in server_ids:
- server_connect_response = server_utils.verify_server(
- tester, str(utils.SERVER_GROUP), server_id)
- db_id = db_ids_dict[int(server_id)]
- db_con = database_utils.verify_database(tester, utils.SERVER_GROUP,
- server_id, db_id)
- if db_con['data']["connected"]:
- data = get_schema_config_data(
- server_connect_response)
- response = tester.post(
- SCHEMA_URL + str(utils.SERVER_GROUP) + '/' +
- str(server_id) + '/' + str(db_id) +
- '/', data=json.dumps(data),
- content_type='html/json')
-
- assert response.status_code == 200
- response_data = json.loads(response.data.decode('utf-8'))
- write_schema_id(response_data, server_id)
-
-
-def verify_schemas(tester, server_id, db_id, schema_id):
+ try:
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("CREATE SCHEMA %s" % schema_name)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get schema details of newly created schema
+ pg_cursor.execute("SELECT sch.oid, sch.nspname FROM pg_namespace sch"
+ " WHERE sch.nspname='%s'" % schema_name)
+ schema = pg_cursor.fetchone()
+ connection.close()
+ return schema
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+
+
+def verify_schemas(server, db_name, schema_name):
"""This function verifies the schema is exists"""
-
- response = tester.get(SCHEMA_URL + str(utils.SERVER_GROUP) + '/' +
- str(server_id) + '/' + str(db_id) +
- '/' + str(schema_id),
- content_type='html/json')
- return response
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("SELECT * FROM pg_namespace sch"
+ " WHERE sch.nspname='%s'" % schema_name)
+ schema = pg_cursor.fetchone()
+ connection.close()
+ return schema
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
def delete_schema(tester):
diff --git a/web/regression/__init__.py b/web/regression/__init__.py
index 6b7cb2f..567bf51 100644
--- a/web/regression/__init__.py
+++ b/web/regression/__init__.py
@@ -9,6 +9,8 @@
import pgadmin.browser.server_groups.servers.roles.tests.utils as roles_utils
import pgadmin.browser.server_groups.servers.tablespaces.tests.utils as \
tablespace_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import\
+ utils as schema_utils
global node_info_dict
node_info_dict = {
@@ -32,6 +34,5 @@ global test_server_dict
test_server_dict = {
"server": [],
"database": [],
- "tablespace": [],
- "role": []
+ "schema": []
}
diff --git a/web/regression/runtests.py b/web/regression/runtests.py
index 87b6004..0ebe003 100644
--- a/web/regression/runtests.py
+++ b/web/regression/runtests.py
@@ -10,14 +10,18 @@
""" This file collect all modules/files present in tests directory and add
them to TestSuite. """
from __future__ import print_function
-
import argparse
import os
import sys
import signal
import atexit
-import unittest
import logging
+import traceback
+
+if sys.version_info < (2, 7):
+ import unittest2 as unittest
+else:
+ import unittest
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
@@ -35,7 +39,7 @@ if sys.path[0] != root:
from pgadmin import create_app
import config
-import test_setup
+from regression import test_setup
# Delete SQLite db file if exists
if os.path.isfile(config.TEST_SQLITE_PATH):
@@ -59,14 +63,14 @@ if pgadmin_credentials:
'login_password']
# Execute the setup file
-exec (open("setup.py").read())
+exec(open("setup.py").read())
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
from pgadmin.model import SCHEMA_VERSION
# Delay the import test_utils as it needs updated config.SQLITE_PATH
-import test_utils
+from regression import test_utils
config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION
@@ -189,11 +193,8 @@ def get_tests_result(test_suite):
if class_name not in failed_cases_result:
skipped_cases_result.append(class_name)
return total_ran, failed_cases_result, skipped_cases_result
- except Exception as exception:
- exception = "Exception: %s: line:%s %s" % (
- file_name, sys.exc_traceback.tb_lineno, exception)
- print(exception)
- logger.exception(exception)
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
class StreamToLogger(object):
@@ -248,12 +249,13 @@ if __name__ == '__main__':
test_utils.login_tester_account(test_client)
servers_info = test_utils.get_config_data()
+ node_name = args['pkg'].split('.')[-1]
try:
for server in servers_info:
print("\n=============Running the test cases for '%s'============="
% server['name'], file=sys.stderr)
# Create test server
- test_utils.create_test_server(server)
+ test_utils.create_parent_server_node(server, node_name)
suite = get_suite(test_module_list, server, test_client)
tests = unittest.TextTestRunner(stream=sys.stderr,
diff --git a/web/regression/test_utils.py b/web/regression/test_utils.py
index 731f1b6..115a0d3 100644
--- a/web/regression/test_utils.py
+++ b/web/regression/test_utils.py
@@ -20,7 +20,7 @@ import test_setup
import regression
SERVER_GROUP = test_setup.config_data['server_group']
-file_name = os.path.basename(__file__)
+file_name = os.path.realpath(__file__)
def get_db_connection(db, username, password, host, port):
@@ -40,7 +40,7 @@ def login_tester_account(tester):
:type tester: flask test client object
:return: None
"""
- if os.environ['PGADMIN_SETUP_EMAIL'] and\
+ if os.environ['PGADMIN_SETUP_EMAIL'] and \
os.environ['PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
password = os.environ['PGADMIN_SETUP_PASSWORD']
@@ -145,26 +145,18 @@ def create_database(server, db_name):
def drop_database(connection, database_name):
"""This function used to drop the database"""
-
- try:
+ if database_name not in ["postgres", "template1", "template0"]:
pg_cursor = connection.cursor()
- pg_cursor.execute("SELECT * FROM pg_database db WHERE db.datname='%s'"
- % database_name)
+ pg_cursor.execute(
+ "SELECT * FROM pg_database db WHERE db.datname='%s'"
+ % database_name)
if pg_cursor.fetchall():
- # Release pid if any process using database
- pg_cursor.execute("select pg_terminate_backend(pid) from"
- " pg_stat_activity where datname='%s'" %
- database_name)
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor.execute('''DROP DATABASE "%s"''' % database_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
connection.close()
- except Exception as exception:
- exception = "%s: line:%s %s" % (
- file_name, sys.exc_traceback.tb_lineno, exception)
- print(exception, file=sys.stderr)
def create_server(server):
@@ -198,55 +190,61 @@ def delete_server(tester, sid):
print(exception, file=sys.stderr)
-def delete_server_from_sqlite(sid):
- """This function used to delete server from SQLite"""
- try:
- con = sqlite3.connect(config.SQLITE_PATH)
- cur = con.cursor()
- server_objects = cur.execute('SELECT * FROM server WHERE id=%s' % sid)
- ss = server_objects.fetchall()
- # for i in ss:
- # print(">>>>>>>>>>>", i)
- servers_count = len(ss)
- # print(">>>>>>>", sid)
- if servers_count:
- cur.execute('DELETE FROM server WHERE id=%s' % sid)
- con.commit()
- con.close()
- except Exception as exception:
- exception = "%s: line:%s %s" % (
- file_name, sys.exc_traceback.tb_lineno, exception)
- print(exception, file=sys.stderr)
+def add_db_to_parent_node_dict(srv_id, db_id, test_db_name):
+ regression.test_server_dict["database"].append({"server_id": srv_id,
+ "db_id": db_id,
+ "db_name": test_db_name})
+
+
+def add_schema_to_parent_node_dict(srv_id, db_id, schema_id, schema_name):
+ regression.test_server_dict["schema"].append({"server_id": srv_id,
+ "db_id": db_id,
+ "schema_id": schema_id,
+ "schema_name": schema_name})
-def create_test_server(server_info):
+def create_parent_server_node(server_info, node_name):
"""
This function create the test server which will act as parent server,
the other node will add under this server
:param server_info: server details
:type server_info: dict
+ :param node_name: node name
+ :type node_name: str
:return: None
"""
# Create the server
srv_id = create_server(server_info)
-
- # Create test database
- test_db_name = "test_db_%s" % str(uuid.uuid4())[1:8]
- db_id = create_database(server_info, test_db_name)
+ if node_name == "databases":
+ # Create test database
+ test_db_name = "test_db_%s" % str(uuid.uuid4())[1:6]
+ db_id = create_database(server_info, test_db_name)
+ add_db_to_parent_node_dict(srv_id, db_id, test_db_name)
+ elif node_name == "schemas":
+ test_db_name = "test_db_%s" % str(uuid.uuid4())[1:6]
+ db_id = create_database(server_info, test_db_name)
+ add_db_to_parent_node_dict(srv_id, db_id, test_db_name)
+ # Create schema
+ schema_name = "test_schema_%s" % str(uuid.uuid4())[1:6]
+ connection = get_db_connection(test_db_name,
+ server_info['username'],
+ server_info['db_password'],
+ server_info['host'],
+ server_info['port'])
+
+ schema = regression.schema_utils.create_schema(connection, schema_name)
+ add_schema_to_parent_node_dict(srv_id, db_id, schema[0],
+ schema_name[1])
# Add server info to test_server_dict
regression.test_server_dict["server"].append({"server_id": srv_id,
"server": server_info})
- regression.test_server_dict["database"].append({"server_id": srv_id,
- "db_id": db_id,
- "db_name": test_db_name})
def delete_test_server(tester):
test_server_dict = regression.test_server_dict
test_servers = test_server_dict["server"]
test_databases = test_server_dict["database"]
- test_table_spaces = test_server_dict["tablespace"]
try:
for test_server in test_servers:
srv_id = test_server["server_id"]
@@ -281,6 +279,7 @@ def remove_db_file():
def _drop_objects(tester):
"""This function use to cleanup the created the objects(servers, databases,
schemas etc) during the test suite run"""
+
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
@@ -319,9 +318,7 @@ def _drop_objects(tester):
server_info[1],
server_info[2])
# Do not drop the default databases
- if db[0] not in ["postgres", "template1",
- "template0"]:
- drop_database(connection, db[0])
+ drop_database(connection, db[0])
# Delete tablespace
connection = get_db_connection(server_info[3],
@@ -373,7 +370,7 @@ def _drop_objects(tester):
delete_server(tester, server_id)
except Exception as exception:
exception = "Exception while deleting server: %s:" \
- " line:%s %s" %\
+ " line:%s %s" % \
(file_name, sys.exc_traceback.tb_lineno,
exception)
print(exception, file=sys.stderr)
^ permalink raw reply [nested|flat] 2+ messages in thread
* Re: pgAdmin IV: schemas API test cases
2016-09-28 09:12 pgAdmin IV: schemas API test cases Navnath Gadakh <[email protected]>
@ 2016-09-28 11:17 ` Navnath Gadakh <[email protected]>
0 siblings, 0 replies; 2+ messages in thread
From: Navnath Gadakh @ 2016-09-28 11:17 UTC (permalink / raw)
To: Dave Page <[email protected]>; +Cc: pgadmin-hackers; Kanchan Mohitey <[email protected]>
Hi Dave,
Please ignore the previous patch; there are some conflicts in the code. I
will send a revised patch later, once the 'casts' patch has been committed.
Thanks.
On Wed, Sep 28, 2016 at 2:42 PM, Navnath Gadakh <
[email protected]> wrote:
> Hi Dave,
>
> Please find the attached patch for Schema node. (Changes required due to
> drop object's functionality).
>
> To run test cases:
>
> python regression/runtests.py --pkg browser.server_groups.servers.databases.schemas
> Thanks.
>
> --
> Regards,
> Navnath Gadakh
>
> EnterpriseDB Corporation
> The Enterprise PostgreSQL Company
>
>
>
--
Regards,
Navnath Gadakh
EnterpriseDB Corporation
The Enterprise PostgreSQL Company
^ permalink raw reply [nested|flat] 2+ messages in thread
end of thread, other threads:[~2016-09-28 11:17 UTC | newest]
Thread overview: 2+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2016-09-28 09:12 pgAdmin IV: schemas API test cases Navnath Gadakh <[email protected]>
2016-09-28 11:17 ` Navnath Gadakh <[email protected]>
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox