diff --git a/web/pgadmin/feature_tests/test_data.json b/web/pgadmin/feature_tests/test_data.json new file mode 100644 index 0000000..7100048 --- /dev/null +++ b/web/pgadmin/feature_tests/test_data.json @@ -0,0 +1,23 @@ +{ + "table_insert_update_cases": { + "insert": { + "insert_with_defaults": { + "id": "1", + "data_default_nulls": "abc123", + "data_default_no_nulls": "def456" + } + }, + "update": { + "update_with_null_value": { + "id": "3", + "data_default_nulls": "", + "data_nulls": "" + }, + "update_with_empty_string": { + "id": "2", + "data_default_nulls": "''", + "data_nulls": "''" + } + } + } +} \ No newline at end of file diff --git a/web/pgadmin/feature_tests/view_data_dml_queries.py b/web/pgadmin/feature_tests/view_data_dml_queries.py new file mode 100644 index 0000000..5c45071 --- /dev/null +++ b/web/pgadmin/feature_tests/view_data_dml_queries.py @@ -0,0 +1,301 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2017, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +import json +import os +from selenium.webdriver import ActionChains +from regression.python_test_utils import test_utils +from regression.feature_utils.base_feature_test import BaseFeatureTest +import time +from selenium.webdriver.common.keys import Keys + + +CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) + +try: + with open(CURRENT_PATH + '/test_data.json') as data_file: + config_data = json.load(data_file)['table_insert_update_cases'] +except Exception as e: + print(str(e)) + + +class CheckForViewDataTest(BaseFeatureTest): + """ + Test cases to validate insert, update operations in table + with input test data + + First of all, the test data is inserted/updated into table and then + inserted data is compared with original data to check if expected data + is returned from table or not. + + We will cover test cases for, + 1) Insert with default values + 2) Update with null values + 3) Update with blank string + 4) Delete table row + """ + + scenarios = [ + ("Validate Insert, Update operations in View data with given test " + "data", + dict()) + ] + + # To create column id with nextval, first a sequence must be created. + create_sequence = """ +CREATE SEQUENCE public.defaults_id_seq + INCREMENT 1 + START 9 + MINVALUE 1 + MAXVALUE 9223372036854775807 + CACHE 1; + +ALTER SEQUENCE public.defaults_id_seq + OWNER TO postgres; + """ + + # query for creating 'defaults' table + create_table_query = """ +CREATE TABLE public.defaults +( + id bigint NOT NULL DEFAULT nextval('defaults_id_seq'::regclass), + data_default_nulls text COLLATE pg_catalog."default" DEFAULT 'abc123'::text, + data_default_no_nulls text COLLATE pg_catalog."default" NOT NULL +DEFAULT 'def456'::text, + data_nulls text COLLATE pg_catalog."default", + data_no_nulls text COLLATE pg_catalog."default" NOT NULL, + CONSTRAINT defaults_pkey PRIMARY KEY (id) +) + """ + + def before(self): + connection = test_utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + test_utils.drop_database(connection, "acceptance_test_db") + test_utils.create_database(self.server, "acceptance_test_db") + test_utils.create_table_with_query( + self.server, + "acceptance_test_db", + CheckForViewDataTest.create_sequence) + + test_utils.create_table_with_query( + self.server, + "acceptance_test_db", + CheckForViewDataTest.create_table_query) + + def runTest(self): + self.page.wait_for_spinner_to_disappear() + self._connects_to_server() + self._tables_node_expandable() + + # Open Object -> View data + self._check_xss_in_view_data() + + # Run test to insert a new row in table with default values + self._insert_row_in_table() + + # Run test to update a row in table with null values + self._update_row_in_table() + + # Run test case to remove existed row + self._remove_row() + + def after(self): + time.sleep(1) + self.page.remove_server(self.server) + connection = test_utils.get_db_connection(self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port']) + test_utils.drop_database(connection, "acceptance_test_db") + + def _connects_to_server(self): + self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click() + self.page.driver.find_element_by_link_text("Object").click() + ActionChains(self.page.driver) \ + .move_to_element(self.page.driver.find_element_by_link_text("Create")) \ + .perform() + self.page.find_by_partial_link_text("Server...").click() + + server_config = self.server + self.page.fill_input_by_field_name("name", server_config['name']) + self.page.find_by_partial_link_text("Connection").click() + self.page.fill_input_by_field_name("host", server_config['host']) + self.page.fill_input_by_field_name("port", server_config['port']) + self.page.fill_input_by_field_name("username", server_config['username']) + self.page.fill_input_by_field_name("password", server_config['db_password']) + self.page.find_by_xpath("//button[contains(.,'Save')]").click() + + def _tables_node_expandable(self): + self.page.toggle_open_tree_item(self.server['name']) + self.page.toggle_open_tree_item('Databases') + self.page.toggle_open_tree_item('acceptance_test_db') + self.page.toggle_open_tree_item('Schemas') + self.page.toggle_open_tree_item('public') + self.page.toggle_open_tree_item('Tables') + self.page.select_tree_item("defaults") + + def _check_xss_in_view_data(self): + self.page.driver.find_element_by_link_text("Object").click() + ActionChains(self.page.driver) \ + .move_to_element( + self.page.driver.find_element_by_link_text("View Data")) \ + .perform() + self.page.find_by_partial_link_text("View All Rows").click() + time.sleep(3) + self.page.driver.switch_to.frame( + self.page.driver.find_element_by_tag_name('iframe') + ) + + def _insert_row_in_table(self): + xpath_col1 = "//div[contains(@class, 'new-row')]//div[" \ + "contains(@class, 'r1')]" + time.sleep(1) + new_row = self.page.find_by_xpath(xpath_col1) + new_row.click() + field = new_row.find_element_by_xpath(".//input") + field.click() + ActionChains(self.driver).send_keys( + config_data['insert']['insert_with_defaults']['id'] + ).perform() + field.send_keys(Keys.TAB) + self.page.find_by_id("btn-save").click() + self._verify_insert_data() + + def _verify_insert_data(self): + time.sleep(0.5) + self.page.find_by_id("btn-flash").click() + time.sleep(1) + main_el = self.page.find_by_xpath('//*[@id="datagrid"]') + cell1 = main_el.find_element_by_xpath( + './/div[contains(@class, "r1")]//span' + ).get_attribute('innerHTML') + cell2 = main_el.find_element_by_xpath( + './/div[contains(@class, "r2")]' + ).get_attribute('innerHTML') + cell3 = main_el.find_element_by_xpath( + './/div[contains(@class, "r3")]' + ).get_attribute('innerHTML') + + test_verify_data = config_data['insert']['insert_with_defaults'] + + # compare updated cell values with original values + self.assertEquals(cell1, test_verify_data['id']) + self.assertEquals(cell2, test_verify_data['data_default_nulls']) + self.assertEquals(cell3, test_verify_data['data_default_no_nulls']) + + def _update_row_in_table(self): + xpath_cell1 = "//div[contains(@class, 'even')]//div[" \ + "contains(@class, 'r1')]" + xpath_cell2 = "//div[contains(@class, 'even')]//div[" \ + "contains(@class, 'r2')]" + xpath_cell3 = "//div[contains(@class, 'even')]//div[" \ + "contains(@class, 'r4')]" + for value in config_data['update']: + cell1 = config_data['update'][value]['id'] + cell2 = config_data['update'][value]['data_default_nulls'] + cell3 = config_data['update'][value]['data_nulls'] + + # Search cell 1 and update with given data + time.sleep(2) + cell1_el = self.page.find_by_xpath(xpath_cell1) + ActionChains(self.driver).move_to_element(cell1_el).double_click( + cell1_el + ).perform() + field = cell1_el.find_element_by_xpath(".//input") + field.clear() + field.click() + ActionChains(self.driver).send_keys(cell1).perform() + field.send_keys(Keys.TAB) # Press tab key + + # Search cell 2 and update with given data + cell2_el = self.page.find_by_xpath(xpath_cell2) + ActionChains(self.driver).move_to_element(cell2_el).double_click( + cell2_el).perform() + field = self.page.driver.find_element_by_css_selector( + "div[style*='z-index: 1000'] textarea" + ) + field.clear() + field.click() + time.sleep(1) + ActionChains(self.driver).send_keys(cell2).perform() + time.sleep(1) + self.page.driver.find_element_by_css_selector( + "div[style*='z-index: 1000'] div button:first-child" + ).click() # Click on editor's Save button + + # Search cell 3 and update with given data + cell3_el = self.page.find_by_xpath(xpath_cell3) + ActionChains(self.driver).move_to_element(cell3_el).double_click( + cell3_el).perform() + field = self.page.driver.find_element_by_css_selector( + "div[style*='z-index: 1000'] textarea" + ) + field.clear() + field.click() + time.sleep(0.5) + ActionChains(self.driver).send_keys(cell3).perform() + time.sleep(0.5) + self.page.driver.find_element_by_css_selector( + "div[style*='z-index: 1000'] div button:first-child" + ).click() # Click on editor's Save button + self.page.find_by_id("btn-save").click() # Save data + self._verify_update_data(value) # Verify updated data with original + + def _verify_update_data(self, test_case): + time.sleep(0.5) + self.page.find_by_id("btn-flash").click() + time.sleep(1) + main_el = self.page.find_by_xpath('//*[@id="datagrid"]') + cell1 = main_el.find_element_by_xpath( + './/div[contains(@class, "r1")]//span' + ).get_attribute('innerHTML') + + from selenium.common.exceptions import NoSuchElementException + try: + cell2 = main_el.find_element_by_xpath( + './/div[contains(@class, "r2")]//span' + ).get_attribute('innerHTML') + except NoSuchElementException: + # if [null] not found, then it is an empty string + cell2 = "''" + + try: + cell4 = main_el.find_element_by_xpath( + './/div[contains(@class, "r4")]//span' + ).get_attribute('innerHTML') + except NoSuchElementException: + # if [null] not found, then it is an empty string + cell4 = "''" + + # Handle cells having [null] values + # If cell has [null] replace it with '' single quotes + cell1 = '' if cell1 == '[null]' else cell1 + cell2 = '' if cell2 == '[null]' else cell2 + cell4 = '' if cell4 == '[null]' else cell4 + + test_verify_data = config_data['update'][test_case] + + # compare updated cell values with original values + self.assertEquals(cell1, test_verify_data['id']) + self.assertEquals(cell2, test_verify_data['data_default_nulls']) + self.assertEquals(cell4, test_verify_data['data_nulls']) + + def _remove_row(self): + xpath_col0 = "//div[contains(@class, 'ui-widget')]//div[" \ + "contains(@class, 'r0')]" + self.page.find_by_xpath(xpath_col0).click() + time.sleep(1) + self.page.find_by_xpath('//*[@id="btn-toolbar"]//button[' + '@id="btn-delete-row"]').click() + self.page.click_modal_ok('Yes') diff --git a/web/regression/feature_utils/pgadmin_page.py b/web/regression/feature_utils/pgadmin_page.py index f5d0ac7..4bb7d0f 100644 --- a/web/regression/feature_utils/pgadmin_page.py +++ b/web/regression/feature_utils/pgadmin_page.py @@ -33,11 +33,11 @@ class PgadminPage: self.click_modal_ok() self.wait_for_reloading_indicator_to_disappear() - def click_modal_ok(self): + def click_modal_ok(self, btn_label='OK'): time.sleep(0.5) # Find active alertify dialog in case of multiple alertify dialog & click on that dialog self.click_element( - self.find_by_xpath("//div[contains(@class, 'alertify') and not(contains(@class, 'ajs-hidden'))]//button[.='OK']") + self.find_by_xpath("//*//div[contains(@class, 'alertify') and not(contains(@class, 'ajs-hidden'))]//button[.='{0}']".format(btn_label)) ) def add_server(self, server_config): diff --git a/web/regression/python_test_utils/test_utils.py b/web/regression/python_test_utils/test_utils.py index 2b7c695..c50dd31 100644 --- a/web/regression/python_test_utils/test_utils.py +++ b/web/regression/python_test_utils/test_utils.py @@ -172,6 +172,35 @@ def create_table(server, db_name, table_name): except Exception: traceback.print_exc(file=sys.stderr) + +def create_table_with_query(server, db_name, query): + """ + This function create the table in given database name + :param server: server details + :type server: dict + :param db_name: database name + :type db_name: str + :param query: create table query + :type query: str + :return: None + """ + try: + connection = get_db_connection(db_name, + server['username'], + server['db_password'], + server['host'], + server['port']) + old_isolation_level = connection.isolation_level + connection.set_isolation_level(0) + pg_cursor = connection.cursor() + pg_cursor.execute(query) + connection.set_isolation_level(old_isolation_level) + connection.commit() + + except Exception: + traceback.print_exc(file=sys.stderr) + + def create_constraint( server, db_name, table_name, constraint_type="unique", constraint_name="test_unique"): @@ -274,7 +303,6 @@ def drop_database(connection, database_name): connection.commit() connection.close() - def drop_tablespace(connection): """This function used to drop the tablespace""" pg_cursor = connection.cursor()