diff --git a/web/pgadmin/utils/driver/abstract.py b/web/pgadmin/utils/driver/abstract.py index 8a23ccb..5369251 100644 --- a/web/pgadmin/utils/driver/abstract.py +++ b/web/pgadmin/utils/driver/abstract.py @@ -81,21 +81,21 @@ class BaseConnection(object): - Define this method to connect the server using that particular driver implementation. - * execute_scalar(query, params) + * execute_scalar(query, params, formatted_exception_msg) - Implement this method to execute the given query and returns single datum result. - * execute_async(query, params) + * execute_async(query, params, formatted_exception_msg) - Implement this method to execute the given query asynchronously and returns result. - * execute_void(query, params) + * execute_void(query, params, formatted_exception_msg) - Implement this method to execute the given query with no result. - * execute_2darray(query, params) + * execute_2darray(query, params, formatted_exception_msg) - Implement this method to execute the given query and returns the result as a 2 dimensional array. - * execute_dict(query, params) + * execute_dict(query, params, formatted_exception_msg) - Implement this method to execute the given query and returns the result as an array of dict (column name -> value) format. @@ -131,7 +131,7 @@ class BaseConnection(object): - Implement this method to wait for asynchronous connection with timeout. This must be a non blocking call. - * poll() + * poll(formatted_exception_msg) - Implement this method to poll the data of query running on asynchronous connection. @@ -147,29 +147,30 @@ class BaseConnection(object): ASYNC_READ_TIMEOUT = 2 ASYNC_WRITE_TIMEOUT = 3 ASYNC_NOT_CONNECTED = 4 + ASYNC_EXECUTION_ABORTED = 5 @abstractmethod def connect(self, **kwargs): pass @abstractmethod - def execute_scalar(self, query, params=None): + def execute_scalar(self, query, params=None, formatted_exception_msg=False): pass @abstractmethod - def execute_async(self, query, params=None): + def execute_async(self, query, params=None, formatted_exception_msg=True): pass @abstractmethod - def execute_void(self, query, params=None): + def execute_void(self, query, params=None, formatted_exception_msg=False): pass @abstractmethod - def execute_2darray(self, query, params=None): + def execute_2darray(self, query, params=None, formatted_exception_msg=False): pass @abstractmethod - def execute_dict(self, query, params=None): + def execute_dict(self, query, params=None, formatted_exception_msg=False): pass @abstractmethod @@ -201,7 +202,7 @@ class BaseConnection(object): pass @abstractmethod - def poll(self): + def poll(self, formatted_exception_msg=True): pass @abstractmethod diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py index db938bc..87b5f53 100644 --- a/web/pgadmin/utils/driver/psycopg2/__init__.py +++ b/web/pgadmin/utils/driver/psycopg2/__init__.py @@ -35,7 +35,7 @@ from .keywords import ScanKeyword _ = gettext -ASYNC_WAIT_TIMEOUT = 0.1 # in seconds or 100 milliseconds +ASYNC_WAIT_TIMEOUT = 0.01 # in seconds or 10 milliseconds class Connection(BaseConnection): @@ -50,20 +50,20 @@ class Connection(BaseConnection): * connect(**kwargs) - Connect the PostgreSQL/Postgres Plus servers using the psycopg2 driver - * execute_scalar(query, params) + * execute_scalar(query, params, formatted_exception_msg) - Execute the given query and returns single datum result - * execute_async(query, params) + * execute_async(query, params, formatted_exception_msg) - Execute the given query asynchronously and returns result. - * execute_void(query, params) + * execute_void(query, params, formatted_exception_msg) - Execute the given query with no result. - * execute_2darray(query, params) + * execute_2darray(query, params, formatted_exception_msg) - Execute the given query and returns the result as a 2 dimensional array. - * execute_dict(query, params) + * execute_dict(query, params, formatted_exception_msg) - Execute the given query and returns the result as an array of dict (column name -> value) format. @@ -91,7 +91,7 @@ class Connection(BaseConnection): - This method is used to wait for asynchronous connection with timeout. This is a non blocking call. - * poll() + * poll(formatted_exception_msg) - This method is used to poll the data of query running on asynchronous connection. @@ -102,6 +102,12 @@ class Connection(BaseConnection): * messages() - Returns the list of messages/notices sends from the PostgreSQL database server. + + * _formatted_exception_msg(exception_obj, formatted_msg) + - This method is used to parse the psycopg2.Error object and returns the + formatted error message if flag is set to true else return + normal error message. + """ def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0): assert(manager is not None) @@ -116,6 +122,7 @@ class Connection(BaseConnection): self.__async_cursor = None self.__async_query_id = None self.__backend_pid = None + self.execution_aborted = False super(Connection, self).__init__() @@ -224,6 +231,7 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id} self.conn = pg_conn self.__backend_pid = pg_conn.get_backend_pid() + self.execution_aborted = False # autocommit flag does not work with asynchronous connections. # By default asynchronous connection runs in autocommit mode. @@ -409,7 +417,7 @@ Attempt to reconnect it failed with the below error: if self.async == 1: self._wait(cur.connection) - def execute_scalar(self, query, params=None): + def execute_scalar(self, query, params=None, formatted_exception_msg=False): status, cur = self.__cursor() if not status: @@ -428,7 +436,7 @@ Attempt to reconnect it failed with the below error: self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() - errmsg = str(pe) + errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error( "Failed to execute query (execute_scalar) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( server_id=self.manager.sid, @@ -447,13 +455,14 @@ Attempt to reconnect it failed with the below error: return True, None - def execute_async(self, query, params=None): + def execute_async(self, query, params=None, formatted_exception_msg=True): """ This function executes the given query asynchronously and returns result. Args: query: SQL query to run. params: extra parameters to the function + formatted_exception_msg: if True then function return the formatted exception message """ status, cur = self.__cursor() @@ -472,10 +481,11 @@ Execute (async) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{qu ) try: + self.execution_aborted = False cur.execute(query, params) res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT) except psycopg2.Error as pe: - errmsg = str(pe) + errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error(""" Failed to execute query (execute_async) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg} @@ -494,13 +504,14 @@ Failed to execute query (execute_async) for the server #{server_id} - {conn_id} return True, res - def execute_void(self, query, params=None): + def execute_void(self, query, params=None, formatted_exception_msg=False): """ This function executes the given query with no result. Args: query: SQL query to run. params: extra parameters to the function + formatted_exception_msg: if True then function return the formatted exception message """ status, cur = self.__cursor() @@ -522,7 +533,7 @@ Execute (void) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{que self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() - errmsg = str(pe) + errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error(""" Failed to execute query (execute_void) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg} @@ -538,7 +549,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} return True, None - def execute_2darray(self, query, params=None): + def execute_2darray(self, query, params=None, formatted_exception_msg=False): status, cur = self.__cursor() if not status: @@ -557,7 +568,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() - errmsg = str(pe) + errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error( "Failed to execute query (execute_2darray) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( server_id=self.manager.sid, @@ -582,7 +593,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} return True, {'columns': columns, 'rows': rows} - def execute_dict(self, query, params=None): + def execute_dict(self, query, params=None, formatted_exception_msg=False): status, cur = self.__cursor() if not status: @@ -600,7 +611,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id} self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() - errmsg = str(pe) + errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) current_app.logger.error( "Failed to execute query (execute_dict) for the server #{server_id}- {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( server_id=self.manager.sid, @@ -728,20 +739,26 @@ Failed to reset the connection of the server due to following error: if state == psycopg2.extensions.POLL_OK: return self.ASYNC_OK elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [conn.fileno()], [], time) - return self.ASYNC_WRITE_TIMEOUT + if select.select([], [conn.fileno()], [], time) == ([], [], []): + return self.ASYNC_WRITE_TIMEOUT + # Call recursively if no timeout + self._wait_timeout(conn, time) elif state == psycopg2.extensions.POLL_READ: - select.select([conn.fileno()], [], [], time) - return self.ASYNC_READ_TIMEOUT + if select.select([conn.fileno()], [], [], time) == ([], [], []): + return self.ASYNC_READ_TIMEOUT + self._wait_timeout(conn, time) else: raise psycopg2.OperationalError("poll() returned %s from _wait_timeout function" % state) - def poll(self): + def poll(self, formatted_exception_msg=False): """ This function is a wrapper around connection's poll function. It internally uses the _wait_timeout method to poll the result on the connection object. In case of success it returns the result of the query. + + Args: + formatted_exception_msg: if True then function return the formatted exception message """ cur = self.__async_cursor @@ -751,19 +768,37 @@ Failed to reset the connection of the server due to following error: current_app.logger.log(25, """ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_id)) - status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT) + try: + status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT) + except psycopg2.Error as pe: + errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) + return False, errmsg, None + colinfo = None if status == self.ASYNC_OK: + + # if user has cancelled the transaction then changed the status + if self.execution_aborted: + status = self.ASYNC_EXECUTION_ABORTED + self.execution_aborted = False + return status, None, colinfo + # Fetch the column information - colinfo = [desc for desc in cur.description] + if cur.description is not None: + colinfo = [desc for desc in cur.description] + result = cur.statusmessage if cur.rowcount > 0: result = [] - # Fetch the data rows. - for row in cur: - result.append(dict(row)) - self.__async_cursor = None - return status, result, colinfo + + try: + for row in cur: + result.append(dict(row)) + except psycopg2.ProgrammingError as e: + result = cur.statusmessage + self.__async_cursor = None + return status, result, colinfo + return status, None, colinfo def cancel_transaction(self, conn_id, did=None): @@ -823,6 +858,9 @@ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_ else: if self.connected(): status, msg = self.execute_void(query) + + if status: + cancel_conn.execution_aborted = True else: status = False msg = gettext("Not connected to the database server.") @@ -835,6 +873,69 @@ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_ """ return self.conn.notices if self.conn else [] + def _formatted_exception_msg(self, exception_obj, formatted_msg): + """ + This method is used to parse the psycopg2.Error object and returns the + formatted error message if flag is set to true else return + normal error message. + + Args: + exception_obj: exception object + formatted_msg: if True then function return the formatted exception message + + """ + + if exception_obj.pgerror: + errmsg = exception_obj.pgerror + elif exception_obj.diag.message_detail: + errmsg = exception_obj.diag.message_detail + else: + errmsg = str(exception_obj) + + # if formatted_msg is false then return from the function + if not formatted_msg: + return errmsg + + errmsg += '********** Error **********\n\n' + + if exception_obj.diag.severity is not None \ + and exception_obj.diag.message_primary is not None: + errmsg += exception_obj.diag.severity + ": " + exception_obj.diag.message_primary + elif exception_obj.diag.message_primary is not None: + errmsg += exception_obj.diag.message_primary + + if exception_obj.diag.sqlstate is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('SQL state: ') + errmsg += exception_obj.diag.sqlstate + + if exception_obj.diag.message_detail is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Detail: ') + errmsg += exception_obj.diag.message_detail + + if exception_obj.diag.message_hint is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Hint: ') + errmsg += exception_obj.diag.message_hint + + if exception_obj.diag.statement_position is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Character: ') + errmsg += exception_obj.diag.statement_position + + if exception_obj.diag.context is not None: + if not errmsg[:-1].endswith('\n'): + errmsg += '\n' + errmsg += gettext('Context: ') + errmsg += exception_obj.diag.context + + return errmsg + class ServerManager(object): """