public inbox for [email protected]
help / color / mirror / Atom feedFrom: Navnath Gadakh <[email protected]>
To: Dave Page <[email protected]>
Cc: pgadmin-hackers <[email protected]>
Cc: Kanchan Mohitey <[email protected]>
Subject: pgAdmin IV: API test cases for Tables, Types and Views
Date: Fri, 14 Oct 2016 18:29:39 +0530
Message-ID: <CAOAJCYqHQxKtmLgK==Y2o3r12sJj7_ykXuiVKf+6rGmDGkYbZQ@mail.gmail.com> (raw)
List-Unsubscribe: <mailto:[email protected]?body=unsub%20pgadmin-hackers>
Hi Dave,
Please find the patch for API test cases of Tables and sub-nodes, types,
views and materialized views.
To run the test suite, use:
python runtests.py
Thanks.
--
Regards,
Navnath Gadakh
EnterpriseDB Corporation
The Enterprise PostgreSQL Company
--
Sent via pgadmin-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgadmin-hackers
Attachments:
[application/octet-stream] tables_types_views.patch (209.7K, 3-tables_types_views.patch)
download | inline diff:
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/__init__.py
index 61db480..e8564b8 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/__init__.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/__init__.py
@@ -998,7 +998,7 @@ It may have been removed by another user.
sql_header = """
-- CATALOG: {0}
--- DROP SCHEMA {0};
+-- DROP SCHEMA {0};(
""".format(old_data['name'])
if hasattr(str, 'decode'):
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/functions/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/functions/tests/utils.py
index 86615b4..fd120ce 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/functions/tests/utils.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/functions/tests/utils.py
@@ -39,6 +39,33 @@ def create_trigger_function(server, db_name, schema_name, func_name):
traceback.print_exc(file=sys.stderr)
+def create_trigger_function_with_trigger(server, db_name, schema_name,
+ func_name):
+ """This function add the trigger function to schema"""
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ query = "CREATE FUNCTION "+schema_name+"."+func_name+"()" \
+ " RETURNS trigger LANGUAGE 'plpgsql' STABLE LEAKPROOF" \
+ " SECURITY DEFINER SET enable_sort=true AS $BODY$ BEGIN" \
+ " NULL; END; $BODY$"
+ pg_cursor.execute(query)
+ connection.commit()
+ # Get 'oid' from newly created function
+ pg_cursor.execute("SELECT pro.oid, pro.proname FROM"
+ " pg_proc pro WHERE pro.proname='%s'" %
+ func_name)
+ functions = pg_cursor.fetchone()
+ connection.close()
+ return functions
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+
+
def verify_trigger_function(server, db_name, func_name):
"""This function verifies the trigger function in db"""
connection = utils.get_db_connection(db_name,
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/__init__.py
new file mode 100644
index 0000000..f6b8f5f
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/__init__.py
@@ -0,0 +1,15 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class ColumnsTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_add.py
new file mode 100644
index 0000000..d103b22
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_add.py
@@ -0,0 +1,74 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+
+
+class ColumnAddTestCase(BaseTestGenerator):
+ """This class will add new column under table node."""
+ scenarios = [
+ ('Add table Node URL', dict(url='/browser/column/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will add column under table node."""
+ self.column_name = "test_column_add_%s" % (str(uuid.uuid4())[1:6])
+ data = {"name": self.column_name,
+ "cltype": "\"char\"",
+ "attacl": [],
+ "is_primary_key": False,
+ "attnotnull": False,
+ "attlen": False,
+ "attprecision": None,
+ "attoptions": [],
+ "seclabels": []
+ }
+ # Add table
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
+ data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_delete.py
new file mode 100644
index 0000000..c7de181
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_delete.py
@@ -0,0 +1,73 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as columns_utils
+
+
+class ColumnDeleteTestCase(BaseTestGenerator):
+ """This class will delete column under table node."""
+ scenarios = [
+ ('Delete table Node URL', dict(url='/browser/column/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.column_id = columns_utils.create_column(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.column_name)
+
+ def runTest(self):
+ """This function will drop column under table node."""
+ col_response = columns_utils.verify_column(self.server, self.db_name,
+ self.column_name)
+ if not col_response:
+ raise Exception("Could not find the column to drop.")
+ response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
+ '/' + str(self.server_id) + '/' +
+ str(self.db_id) + '/' +
+ str(self.schema_id) + '/' +
+ str(self.table_id) + '/' +
+ str(self.column_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_get.py
new file mode 100644
index 0000000..3299b79
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_get.py
@@ -0,0 +1,69 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as columns_utils
+
+
+class ColumnGetTestCase(BaseTestGenerator):
+ """This class will get column under table node."""
+ scenarios = [
+ ('Fetch table Node URL', dict(url='/browser/column/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.column_id = columns_utils.create_column(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.column_name)
+
+ def runTest(self):
+ """This function will fetch the column under table node."""
+ response = self.tester.get(self.url + str(utils.SERVER_GROUP) +
+ '/' + str(self.server_id) + '/' +
+ str(self.db_id) + '/' +
+ str(self.schema_id) + '/' +
+ str(self.table_id) + '/' +
+ str(self.column_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_put.py
new file mode 100644
index 0000000..057052b
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/test_column_put.py
@@ -0,0 +1,81 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as columns_utils
+
+
+class ColumnPutTestCase(BaseTestGenerator):
+ """This class will update the column under table node."""
+ scenarios = [
+ ('Put table Node URL', dict(url='/browser/column/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.column_name = "test_column_put_%s" % (str(uuid.uuid4())[1:6])
+ self.column_id = columns_utils.create_column(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.column_name)
+
+ def runTest(self):
+ """This function will update the column under table node."""
+ col_response = columns_utils.verify_column(self.server, self.db_name,
+ self.column_name)
+ if not col_response:
+ raise Exception("Could not find the column to update.")
+ data = {
+ "attnum": self.column_id,
+ "name": self.column_name,
+ "description": "This is test comment for column"
+ }
+ response = self.tester.put(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' +
+ str(self.db_id) + '/' +
+ str(self.schema_id) + '/' +
+ str(self.table_id) + '/' +
+ str(self.column_id),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/utils.py
new file mode 100644
index 0000000..f177d42
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/column/tests/utils.py
@@ -0,0 +1,86 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_column(server, db_name, schema_name, table_name, col_name):
+ """
+ This function creates a column under provided table.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :param col_name: column name
+ :type col_name: str
+ :return table_id: table id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "ALTER TABLE %s.%s ADD COLUMN %s char" % \
+ (schema_name, table_name, col_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get column position of newly added column
+ pg_cursor.execute("select attnum from pg_attribute where"
+ " attname='%s'" % col_name)
+ col = pg_cursor.fetchone()
+ col_pos = ''
+ if col:
+ col_pos = col[0]
+ connection.close()
+ return col_pos
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
+
+
+def verify_column(server, db_name, col_name):
+ """
+ This function verifies table exist in database or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param col_name: column name
+ :type col_name: str
+ :return table: table record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("select * from pg_attribute where attname='%s'" %
+ col_name)
+ col = pg_cursor.fetchone()
+ connection.close()
+ return col
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/__init__.py
new file mode 100644
index 0000000..0684486
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/__init__.py
@@ -0,0 +1,14 @@
+# ##########################################################################
+# #
+# # pgAdmin 4 - PostgreSQL Tools
+# #
+# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# # This software is released under the PostgreSQL Licence
+# #
+# ##########################################################################
+# from pgadmin.utils.route import BaseTestGenerator
+#
+#
+# class CheckConstraintTestGenerator(BaseTestGenerator):
+# def runTest(self):
+# return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_add.py
new file mode 100644
index 0000000..5e29bb5
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_add.py
@@ -0,0 +1,73 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+
+
+class CheckConstraintAddTestCase(BaseTestGenerator):
+ """This class will add check constraint to existing table"""
+ scenarios = [
+ ('Add check constraint to table',
+ dict(url='/browser/check_constraints/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a check "
+ "constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a check "
+ "constraint.")
+ self.table_name = "table_checkconstraint_add_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will add check constraint to table."""
+ check_constraint_name = "test_checkconstraint_add_%s" % \
+ (str(uuid.uuid4())[1:6])
+ data = {"name": check_constraint_name,
+ "consrc": " (id > 0)",
+ "convalidated": True,
+ "comment": "this is test comment"}
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
+ data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_delete.py
new file mode 100644
index 0000000..23cd716
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_delete.py
@@ -0,0 +1,79 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as chk_constraint_utils
+
+
+class CheckConstraintDeleteTestCase(BaseTestGenerator):
+ """This class will delete check constraint to existing table"""
+ scenarios = [
+ ('Delete check constraint to table',
+ dict(url='/browser/check_constraints/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete a check "
+ "constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete a check "
+ "constraint.")
+ self.table_name = "table_checkconstraint_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.check_constraint_name = "test_checkconstraint_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.check_constraint_id = \
+ chk_constraint_utils.create_check_constraint(
+ self.server, self.db_name, self.schema_name, self.table_name,
+ self.check_constraint_name)
+
+ def runTest(self):
+ """This function will delete check constraint to table."""
+ chk_constraint = chk_constraint_utils.verify_check_constraint(
+ self.server, self.db_name, self.check_constraint_name)
+ if not chk_constraint:
+ raise Exception("Could not find the check constraint to delete.")
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.table_id,
+ self.check_constraint_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_get.py
new file mode 100644
index 0000000..aaa7f86
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_get.py
@@ -0,0 +1,75 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as chk_constraint_utils
+
+
+class CheckConstraintGetTestCase(BaseTestGenerator):
+ """This class will fetch check constraint to existing table"""
+ scenarios = [
+ ('Fetch check constraint to table',
+ dict(url='/browser/check_constraints/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to fetch a check "
+ "constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to fetch a check "
+ "constraint.")
+ self.table_name = "table_checkconstraint_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.check_constraint_name = "test_checkconstraint_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.check_constraint_id = \
+ chk_constraint_utils.create_check_constraint(
+ self.server, self.db_name, self.schema_name, self.table_name,
+ self.check_constraint_name)
+
+ def runTest(self):
+ """This function will fetch check constraint to table."""
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.table_id,
+ self.check_constraint_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_put.py
new file mode 100644
index 0000000..fed8547
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/test_check_constraint_put.py
@@ -0,0 +1,82 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as chk_constraint_utils
+
+
+class CheckConstraintPutTestCase(BaseTestGenerator):
+ """This class will update check constraint to existing table"""
+ scenarios = [
+ ('Update check constraint to table',
+ dict(url='/browser/check_constraints/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to update a check "
+ "constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to update a check "
+ "constraint.")
+ self.table_name = "table_checkconstraint_put_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.check_constraint_name = "test_checkconstraint_put_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.check_constraint_id = \
+ chk_constraint_utils.create_check_constraint(
+ self.server, self.db_name, self.schema_name, self.table_name,
+ self.check_constraint_name)
+
+ def runTest(self):
+ """This function will delete check constraint to table."""
+ chk_constraint = chk_constraint_utils.verify_check_constraint(
+ self.server, self.db_name, self.check_constraint_name)
+ if not chk_constraint:
+ raise Exception("Could not find the check constraint to update.")
+ data = {"oid": self.check_constraint_id,
+ "comment": "This is test comment for check constraint."}
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.table_id,
+ self.check_constraint_id),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/utils.py
new file mode 100644
index 0000000..876eb38
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/check_constraint/tests/utils.py
@@ -0,0 +1,89 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_check_constraint(server, db_name, schema_name, table_name,
+ check_constraint_name):
+ """
+ This function creates a check constraint under provided table.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :param check_constraint_name: constraint name
+ :type check_constraint_name: str
+ :return chk_constraint_id: check constraint id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "ALTER TABLE %s.%s ADD CONSTRAINT %s CHECK ( (id > 0)) " \
+ "NOT VALID; COMMENT ON CONSTRAINT %s ON %s.%s IS " \
+ "'this is test comment'" % (schema_name, table_name,
+ check_constraint_name,
+ check_constraint_name,
+ schema_name, table_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get oid of newly added check constraint
+ pg_cursor.execute(
+ "SELECT oid FROM pg_constraint where conname='%s'" %
+ check_constraint_name)
+ chk_constraint_record = pg_cursor.fetchone()
+ connection.close()
+ chk_constraint_id = chk_constraint_record[0]
+ return chk_constraint_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+
+
+def verify_check_constraint(server, db_name, check_constraint_name):
+ """
+ This function verifies check constraint constraint exist or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param check_constraint_name: constraint name
+ :type check_constraint_name: str
+ :return chk_constraint_record: check constraint record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute(
+ "SELECT oid FROM pg_constraint where conname='%s'" %
+ check_constraint_name)
+ chk_constraint_record = pg_cursor.fetchone()
+ connection.close()
+ return chk_constraint_record
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/__init__.py
new file mode 100644
index 0000000..5445016
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/__init__.py
@@ -0,0 +1,15 @@
+# ##########################################################################
+# #
+# # pgAdmin 4 - PostgreSQL Tools
+# #
+# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# # This software is released under the PostgreSQL Licence
+# #
+# ##########################################################################
+# from pgadmin.utils.route import BaseTestGenerator
+#
+#
+# class ForeignKeyTestGenerator(BaseTestGenerator):
+#
+# def runTest(self):
+# return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_add.py
new file mode 100644
index 0000000..e2cb2fa
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_add.py
@@ -0,0 +1,79 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+
+
+class ForeignKeyAddTestCase(BaseTestGenerator):
+ """This class will add foreign key to existing table"""
+ scenarios = [
+ ('Add foreign Key constraint to table',
+ dict(url='/browser/foreign_key/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a foreign "
+ "key constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a foreign "
+ "key constraint.")
+ self.local_table_name = "table_foreignkey_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.local_table_id = tables_utils.create_table(self.server,
+ self.db_name,
+ self.schema_name,
+ self.local_table_name)
+ self.foreign_table_name = "table_foreignkey_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name,
+ self.foreign_table_name)
+
+ def runTest(self):
+ """This function will add foreign key table column."""
+ foreignkey_name = "test_foreignkey_add_%s" % \
+ (str(uuid.uuid4())[1:6])
+ data = {"name": foreignkey_name,
+ "columns": [{"local_column": "id",
+ "references": self.foreign_table_id,
+ "referenced": "id"}],
+ "confupdtype": "a", "confdeltype": "a", "autoindex": False}
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/' + str(self.local_table_id) + '/',
+ data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_delete.py
new file mode 100644
index 0000000..72ff94b
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_delete.py
@@ -0,0 +1,83 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as fk_utils
+
+
+class ForeignKeyDeleteTestCase(BaseTestGenerator):
+ """This class will delete foreign key to existing table"""
+ scenarios = [
+ ('Delete foreign Key constraint.',
+ dict(url='/browser/foreign_key/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception(
+ "Could not connect to database to delete a foreign "
+ "key constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete a foreign "
+ "key constraint.")
+ self.local_table_name = "local_table_foreignkey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.local_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name, self.local_table_name)
+ self.foreign_table_name = "foreign_table_foreignkey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name,
+ self.foreign_table_name)
+ self.foreign_key_name = "test_foreignkey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_key_id = fk_utils.create_foreignkey(
+ self.server, self.db_name, self.schema_name, self.local_table_name,
+ self.foreign_table_name)
+
+ def runTest(self):
+ """This function will delete foreign key attached to table column."""
+ fk_response = fk_utils.verify_foreignkey(self.server, self.db_name,
+ self.local_table_name)
+ if not fk_response:
+ raise Exception("Could not find the foreign key constraint to "
+ "delete.")
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.local_table_id,
+ self.foreign_key_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_get.py
new file mode 100644
index 0000000..b07d6c7
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_get.py
@@ -0,0 +1,78 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as fk_utils
+
+
+class ForeignGetDeleteTestCase(BaseTestGenerator):
+ """This class will fetch foreign key from existing table"""
+ scenarios = [
+ ('Fetch foreign Key constraint.',
+ dict(url='/browser/foreign_key/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception(
+ "Could not connect to database to fetch a foreign "
+ "key constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to fetch a foreign "
+ "key constraint.")
+ self.local_table_name = "local_table_foreignkey_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.local_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name, self.local_table_name)
+ self.foreign_table_name = "foreign_table_foreignkey_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name,
+ self.foreign_table_name)
+ self.foreign_key_name = "test_foreignkey_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_key_id = fk_utils.create_foreignkey(
+ self.server, self.db_name, self.schema_name, self.local_table_name,
+ self.foreign_table_name)
+
+ def runTest(self):
+ """This function will delete foreign key attached to table column."""
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.local_table_id,
+ self.foreign_key_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_put.py
new file mode 100644
index 0000000..d1996cd
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/test_foreign_key_put.py
@@ -0,0 +1,82 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as fk_utils
+
+
+class ForeignPutDeleteTestCase(BaseTestGenerator):
+ """This class will update foreign key from existing table"""
+ scenarios = [
+ ('Fetch foreign Key constraint.',
+ dict(url='/browser/foreign_key/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception(
+ "Could not connect to database to fetch a foreign "
+ "key constraint.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to fetch a foreign "
+ "key constraint.")
+ self.local_table_name = "local_table_foreignkey_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.local_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name, self.local_table_name)
+ self.foreign_table_name = "foreign_table_foreignkey_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_table_id = tables_utils.create_table(
+ self.server, self.db_name, self.schema_name,
+ self.foreign_table_name)
+ self.foreign_key_name = "test_foreignkey_get_%s" % \
+ (str(uuid.uuid4())[1:6])
+ self.foreign_key_id = fk_utils.create_foreignkey(
+ self.server, self.db_name, self.schema_name, self.local_table_name,
+ self.foreign_table_name)
+
+ def runTest(self):
+ """This function will update foreign key attached to table column."""
+ data = {"oid": self.foreign_key_id,
+ "comment": "This is TEST comment for foreign key constraint."
+ }
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.local_table_id,
+ self.foreign_key_id),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/utils.py
new file mode 100644
index 0000000..8111801
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/foreign_key/tests/utils.py
@@ -0,0 +1,88 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_foreignkey(server, db_name, schema_name, local_table_name,
+ foreign_table_name):
+ """
+ This function creates a column under provided table.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param local_table_name: local table name
+ :type local_table_name: str
+ :param foreign_table_name: foreign table name
+ :type foreign_table_name: str
+ :return table_id: table id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "ALTER TABLE %s.%s ADD FOREIGN KEY (id) REFERENCES %s.%s " \
+ "(id) MATCH SIMPLE ON UPDATE NO ACTION ON DELETE NO ACTION" % \
+ (
+ schema_name, local_table_name, schema_name,
+ foreign_table_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get oid of newly added foreign key
+ pg_cursor.execute(
+ "SELECT oid FROM pg_constraint where conname='%s_id_fkey'" %
+ local_table_name)
+ fk_record = pg_cursor.fetchone()
+ connection.close()
+ fk_id = fk_record[0]
+ return fk_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+
+
+def verify_foreignkey(server, db_name, local_table_name):
+ """
+ This function verifies foreign key constraint exist or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param local_table_name: local table name
+ :type local_table_name: str
+ :return table: table record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute(
+ "SELECT oid FROM pg_constraint where conname='%s_id_fkey'" %
+ local_table_name)
+ fk_record = pg_cursor.fetchone()
+ connection.close()
+ return fk_record
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/__init__.py
index f350345..399fd06 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/__init__.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/__init__.py
@@ -577,7 +577,6 @@ class IndexConstraintView(PGChildNodeView):
data=data, conn=self.conn,
constraint_name=self.constraint_name
)
-
status, msg = self.conn.execute_scalar(SQL)
if not status:
self.end_transaction()
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/__init__.py
new file mode 100644
index 0000000..8b2cf38
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/__init__.py
@@ -0,0 +1,15 @@
+# ##########################################################################
+# #
+# # pgAdmin 4 - PostgreSQL Tools
+# #
+# # Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# # This software is released under the PostgreSQL Licence
+# #
+# ##########################################################################
+# from pgadmin.utils.route import BaseTestGenerator
+#
+#
+# class IndexConstraintTestGenerator(BaseTestGenerator):
+#
+# def runTest(self):
+# return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_add.py
new file mode 100644
index 0000000..297f249
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_add.py
@@ -0,0 +1,84 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+
+
+class IndexConstraintAddTestCase(BaseTestGenerator):
+ """This class will add index constraint(primary key or unique key) to
+ table column"""
+ primary_key_name = "test_primarykey_add_%s" % \
+ (str(uuid.uuid4())[1:6])
+ primary_key_data = {"name": primary_key_name,
+ "spcname": "pg_default",
+ "columns": [{"column": "id"}]
+ }
+ unique_key_name = "test_uniquekey_add_%s" % \
+ (str(uuid.uuid4())[1:6])
+ unique_key_data = {"name": unique_key_name,
+ "spcname": "pg_default",
+ "columns": [{"column": "id"}]}
+ scenarios = [
+ ('Add primary Key constraint to table',
+ dict(url='/browser/primary_key/obj/', data=primary_key_data)),
+ ('Add unique Key constraint to table',
+ dict(url='/browser/unique_constraint/obj/', data=unique_key_data))
+ ]
+
+ @classmethod
+ def setUpClass(cls):
+ cls.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ cls.server_id = schema_info["server_id"]
+ cls.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
+ cls.server_id, cls.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a "
+ "index constraint(primary key or unique key).")
+ cls.schema_id = schema_info["schema_id"]
+ cls.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(cls.server,
+ cls.db_name,
+ cls.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a index "
+ "constraint(primary key or unique key).")
+ cls.table_name = "table_indexconstraint_%s" % \
+ (str(uuid.uuid4())[1:6])
+ cls.table_id = tables_utils.create_table(cls.server,
+ cls.db_name,
+ cls.schema_name,
+ cls.table_name)
+
+ def runTest(self):
+ """This function will add index constraint(primary key or unique key)
+ to table column."""
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
+ data=json.dumps(self.data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ @classmethod
+ def tearDownClass(cls):
+ # Disconnect the database
+ database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_delete.py
new file mode 100644
index 0000000..2552a1b
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_delete.py
@@ -0,0 +1,85 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as index_constraint_utils
+
+
+class IndexConstraintDeleteTestCase(BaseTestGenerator):
+ """This class will delete index constraint(primary key or unique key) of
+ table column"""
+ primary_key_name = "test_primarykey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ unique_key_name = "test_uniquekey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ scenarios = [
+ ('Delete primary Key constraint of table',
+ dict(url='/browser/primary_key/obj/', name=primary_key_name,
+ type="PRIMARY KEY")),
+ ('Delete unique Key constraint of table',
+ dict(url='/browser/unique_constraint/obj/', name=unique_key_name,
+ type="UNIQUE"))
+ ]
+
+ @classmethod
+ def setUpClass(cls):
+ cls.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ cls.server_id = schema_info["server_id"]
+ cls.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
+ cls.server_id, cls.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a "
+ "index constraint(primary key or unique key).")
+ cls.schema_id = schema_info["schema_id"]
+ cls.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(cls.server,
+ cls.db_name,
+ cls.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a index "
+ "constraint(primary key or unique key).")
+ cls.table_name = "table_indexconstraint_%s" % \
+ (str(uuid.uuid4())[1:6])
+ cls.table_id = tables_utils.create_table(cls.server,
+ cls.db_name,
+ cls.schema_name,
+ cls.table_name)
+
+ def runTest(self):
+ """This function will delete index constraint(primary key or
+ unique key) of table column."""
+ index_constraint_id = \
+ index_constraint_utils.create_index_constraint(
+ self.server, self.db_name, self.schema_name, self.table_name,
+ self.name, self.type)
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.table_id,
+ index_constraint_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ @classmethod
+ def tearDownClass(cls):
+ # Disconnect the database
+ database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_get.py
new file mode 100644
index 0000000..ca2cf63
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_get.py
@@ -0,0 +1,85 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as index_constraint_utils
+
+
+class IndexConstraintGetTestCase(BaseTestGenerator):
+ """This class will fetch the index constraint(primary key or unique key) of
+ table column"""
+ primary_key_name = "test_primarykey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ unique_key_name = "test_uniquekey_delete_%s" % \
+ (str(uuid.uuid4())[1:6])
+ scenarios = [
+ ('Fetch primary Key constraint of table',
+ dict(url='/browser/primary_key/obj/', name=primary_key_name,
+ type="PRIMARY KEY")),
+ ('Fetch unique Key constraint of table',
+ dict(url='/browser/unique_constraint/obj/', name=unique_key_name,
+ type="UNIQUE"))
+ ]
+
+ @classmethod
+ def setUpClass(cls):
+ cls.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ cls.server_id = schema_info["server_id"]
+ cls.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
+ cls.server_id, cls.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a "
+ "index constraint(primary key or unique key).")
+ cls.schema_id = schema_info["schema_id"]
+ cls.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(cls.server,
+ cls.db_name,
+ cls.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a index "
+ "constraint(primary key or unique key).")
+ cls.table_name = "table_indexconstraint_%s" % \
+ (str(uuid.uuid4())[1:6])
+ cls.table_id = tables_utils.create_table(cls.server,
+ cls.db_name,
+ cls.schema_name,
+ cls.table_name)
+
+ def runTest(self):
+ """This function will fetch the index constraint(primary key or
+ unique key) of table column."""
+ index_constraint_id = \
+ index_constraint_utils.create_index_constraint(
+ self.server, self.db_name, self.schema_name, self.table_name,
+ self.name, self.type)
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id,
+ self.table_id,
+ index_constraint_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ @classmethod
+ def tearDownClass(cls):
+ # Disconnect the database
+ database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_put.py
new file mode 100644
index 0000000..ba19681
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/test_index_constraint_put.py
@@ -0,0 +1,88 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as index_constraint_utils
+
+
+class IndexConstraintUpdateTestCase(BaseTestGenerator):
+ """This class will update index constraint(primary key or unique key) of
+ table column"""
+ primary_key_name = "test_primarykey_put_%s" % \
+ (str(uuid.uuid4())[1:6])
+ unique_key_name = "test_uniquekey_put_%s" % \
+ (str(uuid.uuid4())[1:6])
+ data = {"oid": "", "comment": "this is test comment"}
+ scenarios = [
+ ('Update primary Key constraint of table',
+ dict(url='/browser/primary_key/obj/', name=primary_key_name,
+ type="PRIMARY KEY", data=data)),
+ ('Update unique Key constraint of table',
+ dict(url='/browser/unique_constraint/obj/', name=unique_key_name,
+ type="UNIQUE", data=data))
+ ]
+
+ @classmethod
+ def setUpClass(cls):
+ cls.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ cls.server_id = schema_info["server_id"]
+ cls.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(cls, utils.SERVER_GROUP,
+ cls.server_id, cls.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a "
+ "index constraint(primary key or unique key).")
+ cls.schema_id = schema_info["schema_id"]
+ cls.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(cls.server,
+ cls.db_name,
+ cls.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a index "
+ "constraint(primary key or unique key).")
+ cls.table_name = "table_indexconstraint_%s" % \
+ (str(uuid.uuid4())[1:6])
+ cls.table_id = tables_utils.create_table(cls.server,
+ cls.db_name,
+ cls.schema_name,
+ cls.table_name)
+
+ def runTest(self):
+ """This function will update index constraint(primary key or
+ unique key) of table column."""
+ index_constraint_id = \
+ index_constraint_utils.create_index_constraint(
+ self.server, self.db_name, self.schema_name, self.table_name,
+ self.name, self.type)
+ self.data["oid"] = index_constraint_id
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ index_constraint_id
+ ),
+ data=json.dumps(self.data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ @classmethod
+ def tearDownClass(cls):
+ # Disconnect the database
+ database_utils.disconnect_database(cls, cls.server_id, cls.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/utils.py
new file mode 100644
index 0000000..3e0ec99
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/index_constraint/tests/utils.py
@@ -0,0 +1,86 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_index_constraint(server, db_name, schema_name, table_name,
+ key_name, key_type):
+ """
+ This function creates a index constraint(PK or UK) under provided table.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :param key_name: test name for primary or unique key
+ :type key_name: str
+ :param key_type: key type i.e. primary or unique key
+ :type key_type: str
+ :return oid: key constraint id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "ALTER TABLE %s.%s ADD CONSTRAINT %s %s (id)" % \
+ (schema_name, table_name, key_name, key_type)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get oid of newly added index constraint
+ pg_cursor.execute(
+ "SELECT conindid FROM pg_constraint where conname='%s'" % key_name)
+ index_constraint = pg_cursor.fetchone()
+ connection.close()
+ oid = index_constraint[0]
+ return oid
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+
+
+def verify_index_constraint(server, db_name, table_name):
+ """
+ This function verifies that index constraint(PK or UK) is exists or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param table_name: table name
+ :type table_name: str
+ :return index_constraint: index constraint record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute(
+ "SELECT oid FROM pg_constraint where conname='%s'" %
+ table_name)
+ index_constraint = pg_cursor.fetchone()
+ connection.close()
+ return index_constraint
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/tests/__init__.py
new file mode 100644
index 0000000..4b351a6
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/constraints/tests/__init__.py
@@ -0,0 +1,15 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class ConstraintsTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/__init__.py
new file mode 100644
index 0000000..87ed41d
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/__init__.py
@@ -0,0 +1,15 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class IndexesTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_add.py
new file mode 100644
index 0000000..347cfa3
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_add.py
@@ -0,0 +1,68 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+
+
+class IndexesAddTestCase(BaseTestGenerator):
+ """This class will add new index to existing table column"""
+ scenarios = [
+ ('Add index Node URL', dict(url='/browser/index/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_for_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will add index to existing table column."""
+ self.index_name = "test_index_add_%s" % (str(uuid.uuid4())[1:6])
+ data = {"name": self.index_name,
+ "spcname": "pg_default",
+ "amname": "btree",
+ "columns": [
+ {"colname": "id", "sort_order": False, "nulls": False}]}
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
+ data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_delete.py
new file mode 100644
index 0000000..60bc27b
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_delete.py
@@ -0,0 +1,81 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.column. \
+ tests import utils as columns_utils
+from . import utils as indexes_utils
+
+
+class IndexesDeleteTestCase(BaseTestGenerator):
+ """This class will delete the existing index of column."""
+ scenarios = [
+ ('Delete index Node URL', dict(url='/browser/index/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.column_id = columns_utils.create_column(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.column_name)
+ self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.index_id = indexes_utils.create_index(self.server, self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.index_name,
+ self.column_name)
+
+ def runTest(self):
+ """This function will delete index of existing column."""
+ index_response = indexes_utils.verify_index(self.server, self.db_name,
+ self.index_name)
+ if not index_response:
+ raise Exception("Could not find the index to delete.")
+ response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
+ '/' + str(self.server_id) + '/' +
+ str(self.db_id) + '/' +
+ str(self.schema_id) + '/' +
+ str(self.table_id) + '/' +
+ str(self.index_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_get.py
new file mode 100644
index 0000000..c8609ec
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_get.py
@@ -0,0 +1,76 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.column. \
+ tests import utils as columns_utils
+from . import utils as indexes_utils
+
+
+class IndexesGetTestCase(BaseTestGenerator):
+ """This class will fetch the existing index of column."""
+ scenarios = [
+ ('Fetch index Node URL', dict(url='/browser/index/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.column_id = columns_utils.create_column(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.column_name)
+ self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.index_id = indexes_utils.create_index(self.server, self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.index_name,
+ self.column_name)
+
+ def runTest(self):
+ """This function will fetch the existing column index."""
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.index_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_put.py
new file mode 100644
index 0000000..42daa0d
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/test_indexes_put.py
@@ -0,0 +1,84 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.column. \
+ tests import utils as columns_utils
+from . import utils as indexes_utils
+
+
+class IndexesUpdateTestCase(BaseTestGenerator):
+ """This class will update the existing index of column."""
+ scenarios = [
+ ('Put index Node URL', dict(url='/browser/index/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.column_id = columns_utils.create_column(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.column_name)
+ self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.index_id = indexes_utils.create_index(self.server, self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.index_name,
+ self.column_name)
+
+ def runTest(self):
+ """This function will update the index of existing column."""
+ index_response = indexes_utils.verify_index(self.server, self.db_name,
+ self.index_name)
+ if not index_response:
+ raise Exception("Could not find the index to update.")
+ data = {"oid": self.index_id,
+ "description": "This is test comment for index"}
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.index_id),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/utils.py
new file mode 100644
index 0000000..6a91505
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/indexes/tests/utils.py
@@ -0,0 +1,90 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_index(server, db_name, schema_name, table_name, index_name,
+ col_name):
+ """
+ This function will add the new index to existing column.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :param index_name: index name
+ :type index_name: str
+ :param col_name: column name
+ :type col_name: str
+ :return table_id: table id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "CREATE INDEX %s ON %s.%s USING btree (%s ASC NULLS LAST) " \
+ "TABLESPACE pg_default" % (index_name, schema_name,
+ table_name, col_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get oid of newly added index
+ pg_cursor.execute("select oid from pg_class where relname='%s'" %
+ index_name)
+ index_record = pg_cursor.fetchone()
+ index_oid = ''
+ if index_record:
+ index_oid = index_record[0]
+ connection.close()
+ return index_oid
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
+
+
+def verify_index(server, db_name, index_name):
+ """
+ This function verifies index exist or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param index_name: index name
+ :type index_name: str
+ :return table: table record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("select * from pg_class where relname='%s'" %
+ index_name)
+ index_record = pg_cursor.fetchone()
+ connection.close()
+ return index_record
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/__init__.py
new file mode 100644
index 0000000..6f88b19
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/__init__.py
@@ -0,0 +1,15 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class RulesTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_add.py
new file mode 100644
index 0000000..fcf9e47
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_add.py
@@ -0,0 +1,69 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+
+
+class RulesAddTestCase(BaseTestGenerator):
+ """This class will add new rule under table node."""
+ scenarios = [
+ ('Add rule Node URL', dict(url='/browser/rule/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a rule.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a rule.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will rule under table node."""
+ rule_name = "test_rule_add_%s" % (str(uuid.uuid4())[1:6])
+ data = {"schema": self.schema_name,
+ "view": self.table_name,
+ "name": rule_name,
+ "event": "Update"
+ }
+ response = self.tester.post(
+ "{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id),
+ data=json.dumps(data),
+ content_type='html/json'
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_delete.py
new file mode 100644
index 0000000..85c55f5
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_delete.py
@@ -0,0 +1,72 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as rules_utils
+
+
+class RulesDeleteTestCase(BaseTestGenerator):
+ """This class will delete rule under table node."""
+ scenarios = [
+ ('Delete rule Node URL', dict(url='/browser/rule/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete rule.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete rule.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.rule_name = "test_rule_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.rule_id = rules_utils.create_rule(self.server, self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.rule_name)
+
+ def runTest(self):
+ """This function will delete rule under table node."""
+ rule_response = rules_utils.verify_rule(self.server, self.db_name,
+ self.rule_name)
+ if not rule_response:
+ raise Exception("Could not find the rule to delete.")
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.rule_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_get.py
new file mode 100644
index 0000000..bfcc135
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_get.py
@@ -0,0 +1,68 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as rules_utils
+
+
+class RulesGetTestCase(BaseTestGenerator):
+ """This class will fetch the rule under table node."""
+ scenarios = [
+ ('Fetch rule Node URL', dict(url='/browser/rule/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete rule.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete rule.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.rule_name = "test_rule_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.rule_id = rules_utils.create_rule(self.server, self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.rule_name)
+
+ def runTest(self):
+ """This function will fetch the rule under table node."""
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.rule_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_put.py
new file mode 100644
index 0000000..f3ec469
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/test_rules_put.py
@@ -0,0 +1,76 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from . import utils as rules_utils
+
+
+class RulesUpdateTestCase(BaseTestGenerator):
+ """This class will update the rule under table node."""
+ scenarios = [
+ ('Put rule Node URL', dict(url='/browser/rule/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete rule.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete rule.")
+ self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.rule_name = "test_rule_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.rule_id = rules_utils.create_rule(self.server, self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.rule_name)
+
+ def runTest(self):
+ """This function will update the rule under table node."""
+ rule_response = rules_utils.verify_rule(self.server, self.db_name,
+ self.rule_name)
+ data = {"id": self.rule_id,
+ "comment": "This is testing comment."
+ }
+ if not rule_response:
+ raise Exception("Could not find the rule to update.")
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.rule_id),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/utils.py
new file mode 100644
index 0000000..cd6468a
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/rules/tests/utils.py
@@ -0,0 +1,86 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_rule(server, db_name, schema_name, table_name, rule_name):
+ """
+ This function creates a rule under provided table.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :param rule_name: rule name
+ :type rule_name: str
+ :return rule_id: role id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "CREATE OR REPLACE RULE %s AS ON UPDATE TO %s.%s DO NOTHING" %\
+ (rule_name, schema_name, table_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get role oid of newly added rule
+ pg_cursor.execute("select oid from pg_rewrite where rulename='%s'" %
+ rule_name)
+ rule = pg_cursor.fetchone()
+ rule_id = ''
+ if rule:
+ rule_id = rule[0]
+ connection.close()
+ return rule_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
+
+
+def verify_rule(server, db_name, rule_name):
+ """
+ This function verifies rule exist in database or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param rule_name: rule name
+ :type rule_name: str
+ :return rule: rule record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("select * from pg_rewrite where rulename='%s'" %
+ rule_name)
+ rule = pg_cursor.fetchone()
+ connection.close()
+ return rule
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/__init__.py
new file mode 100644
index 0000000..e6bc81a
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/__init__.py
@@ -0,0 +1,16 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class TablesTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_add.py
new file mode 100644
index 0000000..374af17
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_add.py
@@ -0,0 +1,182 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.tablespaces.tests import utils as \
+ tablespace_utils
+from . import utils as table_utils
+
+
+class TableAddTestCase(BaseTestGenerator):
+ """ This class will add new collation under schema node. """
+ scenarios = [
+ # Fetching default URL for table node.
+ ('Fetch table Node URL', dict(url='/browser/table/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+
+ def runTest(self):
+ """ This function will add table under schema node. """
+ db_user = self.server["username"]
+ self.table_name = "test_table_add_%s" % (str(uuid.uuid4())[1:6])
+ data = {
+ "check_constraint": [],
+ "coll_inherits": "[]",
+ "columns": [
+ {
+ "name": "empno",
+ "cltype": "numeric",
+ "attacl": [],
+ "is_primary_key": False,
+ "attoptions": [],
+ "seclabels": []
+ },
+ {
+ "name": "empname",
+ "cltype": "character[]",
+ "attacl": [],
+ "is_primary_key": False,
+ "attoptions": [],
+ "seclabels": []
+ },
+ {"name": "DOJ",
+ "cltype": "date[]",
+ "attacl": [],
+ "is_primary_key": False,
+ "attoptions": [],
+ "seclabels": []
+ }
+ ],
+ "exclude_constraint": [],
+ "fillfactor": "11",
+ "hastoasttable": True,
+ "like_constraints": True,
+ "like_default_value": True,
+ "like_relation": "pg_catalog.pg_tables",
+ "name": self.table_name,
+ "primary_key": [],
+ "relacl": [
+ {
+ "grantee": db_user,
+ "grantor": db_user,
+ "privileges":
+ [
+ {
+ "privilege_type": "a",
+ "privilege": True,
+ "with_grant": True
+ },
+ {
+ "privilege_type": "r",
+ "privilege": True,
+ "with_grant": False
+ },
+ {
+ "privilege_type": "w",
+ "privilege": True,
+ "with_grant": False
+ }
+ ]
+ }
+ ],
+ "relhasoids": True,
+ "relowner": db_user,
+ "schema": self.schema_name,
+ "seclabels": [],
+ "spcname": "pg_default",
+ "unique_constraint": [],
+ "vacuum_table": [
+ {
+ "name": "autovacuum_analyze_scale_factor"
+ },
+ {
+ "name": "autovacuum_analyze_threshold"
+ },
+ {
+ "name": "autovacuum_freeze_max_age"
+ },
+ {
+ "name": "autovacuum_vacuum_cost_delay"
+ },
+ {
+ "name": "autovacuum_vacuum_cost_limit"
+ },
+ {
+ "name": "autovacuum_vacuum_scale_factor"
+ },
+ {
+ "name": "autovacuum_vacuum_threshold"
+ },
+ {
+ "name": "autovacuum_freeze_min_age"
+ },
+ {
+ "name": "autovacuum_freeze_table_age"
+ }
+ ],
+ "vacuum_toast": [
+ {
+ "name": "autovacuum_freeze_max_age"
+ },
+ {
+ "name": "autovacuum_vacuum_cost_delay"
+ },
+ {
+ "name": "autovacuum_vacuum_cost_limit"
+ },
+ {
+ "name": "autovacuum_vacuum_scale_factor"
+ },
+ {
+ "name": "autovacuum_vacuum_threshold"
+ },
+ {
+ "name": "autovacuum_freeze_min_age"
+ },
+ {
+ "name": "autovacuum_freeze_table_age"
+ }
+ ]
+ }
+ # Add table
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/',
+ data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_delete.py
new file mode 100644
index 0000000..362ca19
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_delete.py
@@ -0,0 +1,65 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as tables_utils
+
+
+class TableDeleteTestCase(BaseTestGenerator):
+ """This class will delete new table under schema node."""
+ scenarios = [
+ # Fetching default URL for table node.
+ ('Fetch table Node URL', dict(url='/browser/table/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "test_table_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will delete added table under schema node."""
+ table_response = tables_utils.verify_table(self.server, self.db_name,
+ self.table_id)
+ if not table_response:
+ raise Exception("Could not find the table to delete.")
+ response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
+ '/' + str(self.server_id) + '/' +
+ str(self.db_id) + '/' +
+ str(self.schema_id) + '/' +
+ str(self.table_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_get.py
new file mode 100644
index 0000000..666751f
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_get.py
@@ -0,0 +1,61 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as tables_utils
+
+
+class TableGetTestCase(BaseTestGenerator):
+ """This class will add new collation under schema node."""
+ scenarios = [
+ # Fetching default URL for table node.
+ ('Fetch table Node URL', dict(url='/browser/table/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "test_table_get_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will delete added table under schema node."""
+ response = self.tester.get(self.url + str(utils.SERVER_GROUP) +
+ '/' + str(self.server_id) + '/' +
+ str(self.db_id) + '/' +
+ str(self.schema_id) + '/' +
+ str(self.table_id),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_put.py
new file mode 100644
index 0000000..4441ac2
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/test_table_put.py
@@ -0,0 +1,69 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as tables_utils
+
+
+class TableUpdateTestCase(BaseTestGenerator):
+ """This class will add new collation under schema node."""
+ scenarios = [
+ # Fetching default URL for table node.
+ ('Fetch table Node URL', dict(url='/browser/table/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a table.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a table.")
+ self.table_name = "test_table_put_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+
+ def runTest(self):
+ """This function will fetch added table under schema node."""
+ table_response = tables_utils.verify_table(self.server, self.db_name,
+ self.table_id)
+ if not table_response:
+ raise Exception("Could not find the table to update.")
+ data = {
+ "description": "This is test comment for table",
+ "id": self.table_id
+ }
+ response = self.tester.put(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) + '/' +
+ str(self.schema_id) + '/' + str(self.table_id),
+ data=json.dumps(data), follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/utils.py
new file mode 100644
index 0000000..680f869
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/tests/utils.py
@@ -0,0 +1,85 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_table(server, db_name, schema_name, table_name):
+ """
+ This function creates a table under provided schema.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :return table_id: table id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "CREATE TABLE %s.%s(id serial UNIQUE NOT NULL, name text," \
+ " location text)" %\
+ (schema_name, table_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get 'oid' from newly created table
+ pg_cursor.execute("select oid from pg_class where relname='%s'" %
+ table_name)
+ table = pg_cursor.fetchone()
+ table_id = ''
+ if table:
+ table_id = table[0]
+ connection.close()
+ return table_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
+
+
+def verify_table(server, db_name, table_id):
+ """
+ This function verifies table exist in database or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param table_id: schema name
+ :type table_id: int
+ :return table: table record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("SELECT * FROM pg_class tb WHERE tb.oid=%s" %
+ table_id)
+ table = pg_cursor.fetchone()
+ connection.close()
+ return table
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/__init__.py
new file mode 100644
index 0000000..b9f6e4d
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/__init__.py
@@ -0,0 +1,15 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class TriggersTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_add.py
new file mode 100644
index 0000000..d90d6c3
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_add.py
@@ -0,0 +1,78 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
+ import utils as trigger_funcs_utils
+
+
+class TriggersAddTestCase(BaseTestGenerator):
+ """This class will add new trigger under table node."""
+ scenarios = [
+ ('Add trigger Node URL', dict(url='/browser/trigger/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a trigger.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a trigger.")
+ self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.func_name = "trigger_func_add_%s" % str(uuid.uuid4())[1:6]
+ self.function_info = \
+ trigger_funcs_utils.create_trigger_function_with_trigger(
+ self.server, self.db_name, self.schema_name, self.func_name)
+
+ def runTest(self):
+ """This function will trigger under table node."""
+ trigger_name = "test_trigger_add_%s" % (str(uuid.uuid4())[1:6])
+ data = {"name": trigger_name,
+ "is_row_trigger": True,
+ "fires": "BEFORE",
+ "columns": [],
+ "tfunction": "{0}.{1}".format(self.schema_name,
+ self.func_name),
+ "evnt_insert": True
+ }
+ response = self.tester.post(
+ "{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id),
+ data=json.dumps(data),
+ content_type='html/json'
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_delete.py
new file mode 100644
index 0000000..bbbb664
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_delete.py
@@ -0,0 +1,81 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
+ import utils as trigger_funcs_utils
+from . import utils as triggers_utils
+
+
+class TriggersDeleteTestCase(BaseTestGenerator):
+ """This class will delete trigger under table node."""
+ scenarios = [
+ ('Delete trigger Node URL', dict(url='/browser/trigger/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete trigger.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete trigger.")
+ self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.func_name = "trigger_func_delete_%s" % str(uuid.uuid4())[1:6]
+ self.function_info = \
+ trigger_funcs_utils.create_trigger_function_with_trigger(
+ self.server, self.db_name, self.schema_name, self.func_name)
+ self.trigger_name = "test_trigger_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.trigger_id = triggers_utils.create_trigger(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.trigger_name,
+ self.func_name)
+
+ def runTest(self):
+ """This function will delete trigger under table node."""
+ trigger_response = triggers_utils.verify_trigger(self.server,
+ self.db_name,
+ self.trigger_name)
+ if not trigger_response:
+ raise Exception("Could not find the trigger to delete.")
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.trigger_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_get.py
new file mode 100644
index 0000000..2e06453
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_get.py
@@ -0,0 +1,76 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
+ import utils as trigger_funcs_utils
+from . import utils as triggers_utils
+
+
+class TriggersGetTestCase(BaseTestGenerator):
+ """This class will fetch trigger under table node."""
+ scenarios = [
+ ('Fetch trigger Node URL', dict(url='/browser/trigger/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to get a trigger.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to get a trigger.")
+ self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.func_name = "trigger_func_get_%s" % str(uuid.uuid4())[1:6]
+ self.function_info = \
+ trigger_funcs_utils.create_trigger_function_with_trigger(
+ self.server, self.db_name, self.schema_name, self.func_name)
+ self.trigger_name = "test_trigger_get_%s" % (str(uuid.uuid4())[1:6])
+ self.trigger_id = triggers_utils.create_trigger(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.trigger_name,
+ self.func_name)
+
+ def runTest(self):
+ """This function will fetch trigger under table node."""
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.trigger_id),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_put.py
new file mode 100644
index 0000000..52c9d7e
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/test_triggers_put.py
@@ -0,0 +1,87 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
+ import utils as tables_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
+ import utils as trigger_funcs_utils
+from . import utils as triggers_utils
+
+
+class TriggersUpdateTestCase(BaseTestGenerator):
+ """This class will update trigger under table node."""
+ scenarios = [
+ ('Put trigger Node URL', dict(url='/browser/trigger/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception(
+ "Could not connect to database to update a trigger.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to update a trigger.")
+ self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:6])
+ self.table_id = tables_utils.create_table(self.server, self.db_name,
+ self.schema_name,
+ self.table_name)
+ self.func_name = "trigger_func_add_%s" % str(uuid.uuid4())[1:6]
+ self.function_info = \
+ trigger_funcs_utils.create_trigger_function_with_trigger(
+ self.server, self.db_name, self.schema_name, self.func_name)
+ self.trigger_name = "test_trigger_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.trigger_id = triggers_utils.create_trigger(self.server,
+ self.db_name,
+ self.schema_name,
+ self.table_name,
+ self.trigger_name,
+ self.func_name)
+
+ def runTest(self):
+ """This function will update trigger under table node."""
+ trigger_response = triggers_utils.verify_trigger(self.server,
+ self.db_name,
+ self.trigger_name)
+ if not trigger_response:
+ raise Exception("Could not find the trigger to delete.")
+ data = {"id": self.trigger_id,
+ "description": "This is test comment."
+ }
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.table_id,
+ self.trigger_id),
+ data=json.dumps(data),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/utils.py
new file mode 100644
index 0000000..b53ca5f
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/tables/triggers/tests/utils.py
@@ -0,0 +1,90 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_trigger(server, db_name, schema_name, table_name, trigger_name,
+ trigger_func_name):
+ """
+ This function creates a column under provided table.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param table_name: table name
+ :type table_name: str
+ :param trigger_name: trigger name
+ :type trigger_name: str
+ :param trigger_func_name: trigger function name
+ :type trigger_func_name: str
+ :return trigger_id: trigger id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = "CREATE TRIGGER %s BEFORE INSERT ON %s.%s FOR EACH ROW " \
+ "EXECUTE PROCEDURE %s.%s()" % (trigger_name, schema_name,
+ table_name, schema_name,
+ trigger_func_name)
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ pg_cursor.execute("SELECT oid FROM pg_trigger where tgname='%s'" %
+ trigger_name)
+ trigger = pg_cursor.fetchone()
+ trigger_id = ''
+ if trigger:
+ trigger_id = trigger[0]
+ connection.close()
+ return trigger_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
+
+
+def verify_trigger(server, db_name, trigger_name):
+ """
+ This function verifies table exist in database or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param trigger_name: column name
+ :type trigger_name: str
+ :return table: table record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("SELECT oid FROM pg_trigger where tgname='%s'" %
+ trigger_name)
+ trigger = pg_cursor.fetchone()
+ connection.close()
+ return trigger
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/__init__.py
new file mode 100644
index 0000000..4b2b82a
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/__init__.py
@@ -0,0 +1,16 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class TypesTestGenerator(BaseTestGenerator):
+
+ def runTest(self):
+ return []
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_add.py
new file mode 100644
index 0000000..7f93408
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_add.py
@@ -0,0 +1,68 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import json
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+
+
+class TypesAddTestCase(BaseTestGenerator):
+ """ This class will add type under schema node. """
+ scenarios = [
+ ('Add type under schema node', dict(url='/browser/type/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add a type.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add a type.")
+
+ def runTest(self):
+ """ This function will add type under schema node. """
+ db_user = self.server["username"]
+ self.type_name = "test_type_add_%s" % (str(uuid.uuid4())[1:6])
+ data = {"name": self.type_name,
+ "is_sys_type": False,
+ "typtype": "c",
+ "typeowner": db_user,
+ "schema": self.schema_name,
+ "composite": [{"member_name": "one", "type": "abstime",
+ "is_tlength": False, "is_precision": False},
+ {"member_name": "two", "type": "\"char\"[]",
+ "is_tlength": False, "is_precision": False}],
+ "enum": [], "typacl": [], "seclabels": []}
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' +
+ str(self.server_id) + '/' + str(self.db_id) +
+ '/' + str(self.schema_id) + '/',
+ data=json.dumps(data),
+ content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_delete.py
new file mode 100644
index 0000000..74cd3a2
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_delete.py
@@ -0,0 +1,65 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as types_utils
+
+
+class TypesDeleteTestCase(BaseTestGenerator):
+ """ This class will delete type under schema node. """
+ scenarios = [
+ ('Delete type under schema node', dict(url='/browser/type/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete a type.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete a type.")
+ self.type_name = "test_type_delete_%s" % (str(uuid.uuid4())[1:6])
+ self.type_id = types_utils.create_type(self.server, self.db_name,
+ self.schema_name, self.type_name
+ )
+
+ def runTest(self):
+ """ This function will delete type under schema node. """
+ type_response = types_utils.verify_type(self.server, self.db_name,
+ self.type_name)
+ if not type_response:
+ raise Exception("Could not find the type to delete.")
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.type_id
+ ),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_get.py
new file mode 100644
index 0000000..61c2117
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_get.py
@@ -0,0 +1,61 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as types_utils
+
+
+class TypesGetTestCase(BaseTestGenerator):
+ """ This class will get the type under schema node. """
+ scenarios = [
+ ('Get type under schema node', dict(url='/browser/type/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to get a type.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to get a type.")
+ self.type_name = "test_type_get_%s" % (str(uuid.uuid4())[1:6])
+ self.type_id = types_utils.create_type(self.server, self.db_name,
+ self.schema_name, self.type_name
+ )
+
+ def runTest(self):
+ """ This function will get a type under schema node. """
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.type_id
+ ),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_put.py
new file mode 100644
index 0000000..30a9d0a
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/test_types_put.py
@@ -0,0 +1,68 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as types_utils
+
+
+class TypesUpdateTestCase(BaseTestGenerator):
+ """ This class will update type under schema node. """
+ scenarios = [
+ ('Update type under schema node', dict(url='/browser/type/obj/'))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to update a type.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to update a type.")
+ self.type_name = "test_type_put_%s" % (str(uuid.uuid4())[1:6])
+ self.type_id = types_utils.create_type(self.server, self.db_name,
+ self.schema_name, self.type_name
+ )
+
+ def runTest(self):
+ """ This function will update type under schema node. """
+ type_response = types_utils.verify_type(self.server, self.db_name,
+ self.type_name)
+ if not type_response:
+ raise Exception("Could not find the type to update.")
+ data = {"id": self.type_id,
+ "description": "this is test comment."}
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.type_id
+ ),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/utils.py
new file mode 100644
index 0000000..d5fd44d
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/types/tests/utils.py
@@ -0,0 +1,82 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_type(server, db_name, schema_name, type_name):
+ """
+ This function creates a type under provided schema.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param type_name: type name
+ :type type_name: str
+ :return type_id: type id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = 'CREATE TYPE %s.%s AS (one "char", two "char"[]); ' \
+ 'ALTER TYPE %s.%s OWNER TO %s' % (schema_name, type_name,
+ schema_name, type_name,
+ server['username'])
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get 'oid' from newly created type
+ pg_cursor.execute(
+ "select oid from pg_type where typname='%s'" % type_name)
+ schema_type = pg_cursor.fetchone()
+ type_id = schema_type[0]
+ connection.close()
+ return type_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+
+
+def verify_type(server, db_name, type_name):
+ """
+ This function verifies type exist in database or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param type_name: type name
+ :type type_name: str
+ :return schema_type: type record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute(
+ "select oid from pg_type where typname='%s'" % type_name)
+ schema_type = pg_cursor.fetchone()
+ connection.close()
+ return schema_type
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/__init__.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/__init__.py
new file mode 100644
index 0000000..af4485e
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/__init__.py
@@ -0,0 +1,16 @@
+##########################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+##########################################################################
+
+from pgadmin.utils.route import BaseTestGenerator
+
+
+class ViewsTestGenerator(BaseTestGenerator):
+
+ def generate_tests(self):
+ return
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_add.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_add.py
new file mode 100644
index 0000000..ab1ee58
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_add.py
@@ -0,0 +1,95 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+
+
+class ViewsAddTestCase(BaseTestGenerator):
+ """This class will add new view under schema node."""
+ view_name = "test_view_add_%s" % (str(uuid.uuid4())[1:6])
+ v_data = {"schema": "",
+ "owner": "",
+ "datacl": [],
+ "seclabels": [],
+ "name": view_name,
+ "definition": "SELECT 'Hello World';"
+ }
+ m_view_name = "test_mview_add_%s" % (str(uuid.uuid4())[1:6])
+ m_view_data = {"spcname": "pg_default",
+ "toast_autovacuum_enabled": False,
+ "autovacuum_enabled": False,
+ "schema": "",
+ "owner": "",
+ "vacuum_table": [
+ {"name": "autovacuum_analyze_scale_factor"},
+ {"name": "autovacuum_analyze_threshold"},
+ {"name": "autovacuum_freeze_max_age"},
+ {"name": "autovacuum_vacuum_cost_delay"},
+ {"name": "autovacuum_vacuum_cost_limit"},
+ {"name": "autovacuum_vacuum_scale_factor"},
+ {"name": "autovacuum_vacuum_threshold"},
+ {"name": "autovacuum_freeze_min_age"},
+ {"name": "autovacuum_freeze_table_age"}],
+ "vacuum_toast": [{"name": "autovacuum_freeze_max_age"},
+ {"name": "autovacuum_vacuum_cost_delay"},
+ {"name": "autovacuum_vacuum_cost_limit"},
+ {"name": "autovacuum_vacuum_scale_factor"},
+ {"name": "autovacuum_vacuum_threshold"},
+ {"name": "autovacuum_freeze_min_age"},
+ {"name": "autovacuum_freeze_table_age"}],
+ "datacl": [],
+ "seclabels": [],
+ "name": m_view_name,
+ "definition": "SELECT 'test_pgadmin';"}
+ scenarios = [
+ ('Add view under schema node', dict(url='/browser/view/obj/',
+ data=v_data)),
+ ('Add materialized view under schema node',
+ dict(url='/browser/mview/obj/', data=m_view_data))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to add view.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to add the view.")
+
+ def runTest(self):
+ """This function will add view under schema node."""
+ db_user = self.server["username"]
+ self.data["schema"] = self.schema_name
+ self.data["owner"] = db_user
+ response = self.tester.post(
+ self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id)
+ + '/' + str(self.db_id) + '/' + str(self.schema_id) + '/',
+ data=json.dumps(self.data), content_type='html/json')
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_delete.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_delete.py
new file mode 100644
index 0000000..85c27e5
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_delete.py
@@ -0,0 +1,79 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as views_utils
+
+
+class ViewsDeleteTestCase(BaseTestGenerator):
+ """This class will delete the view/mview under schema node."""
+ view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
+ "ALTER TABLE %s.%s OWNER TO %s"
+ m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
+ "SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
+ " TO %s"
+ scenarios = [
+ ('Delete view under schema node', dict(url='/browser/view/obj/',
+ view_name=
+ "test_view_delete_%s" %
+ (str(uuid.uuid4())[1:6]),
+ sql_query=view_sql)),
+ ('Delete materialized view under schema node',
+ dict(url='/browser/mview/obj/',
+ view_name="test_mview_delete_%s" % (str(uuid.uuid4())[1:6]),
+ sql_query=m_view_sql))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to delete view.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to delete the view.")
+ self.view_id = views_utils.create_view(self.server,
+ self.db_name,
+ self.schema_name,
+ self.sql_query,
+ self.view_name)
+
+ def runTest(self):
+ """This function will delete the view/mview under schema node."""
+ view_response = views_utils.verify_view(self.server, self.db_name,
+ self.view_name)
+ if not view_response:
+ raise Exception("Could not find the view to delete.")
+ response = self.tester.delete(
+ "{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.view_id
+ ),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_get.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_get.py
new file mode 100644
index 0000000..cddba1b
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_get.py
@@ -0,0 +1,75 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as views_utils
+
+
+class ViewsGetTestCase(BaseTestGenerator):
+ """This class will fetch the view under schema node."""
+ view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
+ "ALTER TABLE %s.%s OWNER TO %s"
+ m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
+ "SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
+ " TO %s"
+ scenarios = [
+ ('Get view under schema node', dict(url='/browser/view/obj/',
+ view_name=
+ "test_view_get_%s" %
+ (str(uuid.uuid4())[1:6]),
+ sql_query=view_sql)),
+ ('Get materialized view under schema node',
+ dict(url='/browser/mview/obj/',
+ view_name="test_mview_get_%s" % (str(uuid.uuid4())[1:6]),
+ sql_query=m_view_sql))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to fetch the view.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to fetch the view.")
+ self.view_id = views_utils.create_view(self.server,
+ self.db_name,
+ self.schema_name,
+ self.sql_query,
+ self.view_name)
+
+ def runTest(self):
+ """This function will fetch the view/mview under schema node."""
+ response = self.tester.get(
+ "{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.view_id
+ ),
+ follow_redirects=True
+ )
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_put.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_put.py
new file mode 100644
index 0000000..e89bd91
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/test_views_put.py
@@ -0,0 +1,83 @@
+# #################################################################
+#
+# pgAdmin 4 - PostgreSQL Tools
+#
+# Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# This software is released under the PostgreSQL Licence
+#
+# ##################################################################
+import uuid
+import json
+
+from regression import test_utils as utils
+from regression import parent_node_dict
+from pgadmin.utils.route import BaseTestGenerator
+from pgadmin.browser.server_groups.servers.databases.tests import utils as \
+ database_utils
+from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
+ utils as schema_utils
+from . import utils as views_utils
+
+
+class ViewsUpdateTestCase(BaseTestGenerator):
+ """This class will update the view/mview under schema node."""
+ view_sql = "CREATE OR REPLACE VIEW %s.%s AS SELECT 'Hello World'; " \
+ "ALTER TABLE %s.%s OWNER TO %s"
+ m_view_sql = "CREATE MATERIALIZED VIEW %s.%s TABLESPACE pg_default AS " \
+ "SELECT 'test_pgadmin' WITH NO DATA;ALTER TABLE %s.%s OWNER" \
+ " TO %s"
+ scenarios = [
+ ('Update view under schema node', dict(url='/browser/view/obj/',
+ view_name=
+ "test_view_put_%s" %
+ (str(uuid.uuid4())[1:6]),
+ sql_query=view_sql)),
+ ('Update materialized view under schema node',
+ dict(url='/browser/mview/obj/',
+ view_name="test_mview_put_%s" % (str(uuid.uuid4())[1:6]),
+ sql_query=m_view_sql))
+ ]
+
+ def setUp(self):
+ self.db_name = parent_node_dict["database"][-1]["db_name"]
+ schema_info = parent_node_dict["schema"][-1]
+ self.server_id = schema_info["server_id"]
+ self.db_id = schema_info["db_id"]
+ db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
+ self.server_id, self.db_id)
+ if not db_con['data']["connected"]:
+ raise Exception("Could not connect to database to update a view.")
+ self.schema_id = schema_info["schema_id"]
+ self.schema_name = schema_info["schema_name"]
+ schema_response = schema_utils.verify_schemas(self.server,
+ self.db_name,
+ self.schema_name)
+ if not schema_response:
+ raise Exception("Could not find the schema to update a view.")
+ self.view_id = views_utils.create_view(self.server,
+ self.db_name,
+ self.schema_name,
+ self.sql_query,
+ self.view_name)
+
+ def runTest(self):
+ """This function will update the view/mview under schema node."""
+ view_response = views_utils.verify_view(self.server, self.db_name,
+ self.view_name)
+ if not view_response:
+ raise Exception("Could not find the view to update.")
+ data = {"id": self.view_id,
+ "comment": "This is test comment"
+ }
+ response = self.tester.put(
+ "{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
+ self.server_id, self.db_id,
+ self.schema_id, self.view_id
+ ),
+ data=json.dumps(data),
+ follow_redirects=True)
+ self.assertEquals(response.status_code, 200)
+
+ def tearDown(self):
+ # Disconnect the database
+ database_utils.disconnect_database(self, self.server_id, self.db_id)
diff --git a/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/utils.py
new file mode 100644
index 0000000..b981f8a
--- /dev/null
+++ b/web/pgadmin/browser/server_groups/servers/databases/schemas/views/tests/utils.py
@@ -0,0 +1,84 @@
+# ##########################################################################
+#
+# #pgAdmin 4 - PostgreSQL Tools
+#
+# #Copyright (C) 2013 - 2016, The pgAdmin Development Team
+# #This software is released under the PostgreSQL Licence
+#
+# ##########################################################################
+from __future__ import print_function
+import traceback
+import sys
+
+from regression import test_utils as utils
+
+
+def create_view(server, db_name, schema_name, sql_query, view_name):
+ """
+ This function creates a table under provided schema.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param schema_name: schema name
+ :type schema_name: str
+ :param sql_query: sql query to create view
+ :type sql_query: str
+ :param view_name: view name
+ :type view_name: str
+ :return view_id: view id
+ :rtype: int
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ old_isolation_level = connection.isolation_level
+ connection.set_isolation_level(0)
+ pg_cursor = connection.cursor()
+ query = sql_query % (schema_name, view_name, schema_name, view_name,
+ server['username'])
+ pg_cursor.execute(query)
+ connection.set_isolation_level(old_isolation_level)
+ connection.commit()
+ # Get 'oid' from newly created view
+ pg_cursor.execute("select oid from pg_class where relname='%s'" %
+ view_name)
+ view = pg_cursor.fetchone()
+ view_id = view[0]
+ connection.close()
+ return view_id
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
+
+
+def verify_view(server, db_name, view_name):
+ """
+ This function verifies view exist in database or not.
+ :param server: server details
+ :type server: dict
+ :param db_name: database name
+ :type db_name: str
+ :param view_name: view name
+ :type view_name: str
+ :return table: table record from database
+ :rtype: tuple
+ """
+ try:
+ connection = utils.get_db_connection(db_name,
+ server['username'],
+ server['db_password'],
+ server['host'],
+ server['port'])
+ pg_cursor = connection.cursor()
+ pg_cursor.execute("select oid from pg_class where relname='%s'" %
+ view_name)
+ view = pg_cursor.fetchone()
+ connection.close()
+ return view
+ except Exception:
+ traceback.print_exc(file=sys.stderr)
+ raise
diff --git a/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py b/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py
index f50bca0..1e0f66b 100644
--- a/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py
+++ b/web/pgadmin/browser/server_groups/servers/databases/tests/utils.py
@@ -136,7 +136,7 @@ def connect_database(self, server_group, server_id, db_id):
db_con = self.tester.post('{0}{1}/{2}/{3}'.format(
DATABASE_CONNECT_URL, server_group, server_id, db_id),
follow_redirects=True)
- self.assertEquals(db_con.status_code, 200)
+ assert db_con.status_code == 200
db_con = json.loads(db_con.data.decode('utf-8'))
return db_con
@@ -146,4 +146,4 @@ def disconnect_database(self, server_id, db_id):
db_con = self.tester.delete('{0}{1}/{2}/{3}'.format(
'browser/database/connect/', utils.SERVER_GROUP, server_id, db_id),
follow_redirects=True)
- self.assertEquals(db_con.status_code, 200)
+ assert db_con.status_code == 200
view thread (2+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected]
Subject: Re: pgAdmin IV: API test cases for Tables, Types and Views
In-Reply-To: <CAOAJCYqHQxKtmLgK==Y2o3r12sJj7_ykXuiVKf+6rGmDGkYbZQ@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox