public inbox for [email protected]  
help / color / mirror / Atom feed
[pgAdmin][SonarQube] Reduce cognitive complexity
9+ messages / 2 participants
[nested] [flat]

* [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-01 10:04  Aditya Toshniwal <[email protected]>
  0 siblings, 1 reply; 9+ messages in thread

From: Aditya Toshniwal @ 2020-09-01 10:04 UTC (permalink / raw)
  To: pgadmin-hackers

Hi Hackers,

Attached patch reduces cognitive complexity as below:

web/.../server_groups/servers/pgagent/__init__.py - 23 to 15
web/pgadmin/utils/driver/psycopg2/__init__.py - 23 to 15, 17 to 15
web/setup.py 77 to 15

Please review.

-- 
Thanks,
Aditya Toshniwal
pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
<http://edbpostgres.com>
"Don't Complain about Heat, Plant a TREE"


Attachments:

  [application/octet-stream] sonarqube.pgagent.patch (15.3K, 3-sonarqube.pgagent.patch)
  download | inline diff:
diff --git a/web/pgadmin/browser/server_groups/servers/pgagent/__init__.py b/web/pgadmin/browser/server_groups/servers/pgagent/__init__.py
index be175cda3..c30852c37 100644
--- a/web/pgadmin/browser/server_groups/servers/pgagent/__init__.py
+++ b/web/pgadmin/browser/server_groups/servers/pgagent/__init__.py
@@ -25,7 +25,7 @@ from pgadmin.utils.ajax import make_json_response, internal_server_error, \
 from pgadmin.utils.driver import get_driver
 from pgadmin.utils.preferences import Preferences
 from pgadmin.browser.server_groups.servers.pgagent.utils \
-    import format_schedule_data
+    import format_schedule_data, format_step_data
 
 
 class JobModule(CollectionNodeModule):
@@ -566,39 +566,22 @@ SELECT EXISTS(
         :return:
         """
         # Format the schedule data. Convert the boolean array
-        for key in ['added', 'changed']:
-            jschedules = data.get('jschedules', {})
-            if key in jschedules:
-                for schedule in jschedules.get(key, []):
-                    format_schedule_data(schedule)
+        jschedules = data.get('jschedules', {})
+        if type(jschedules) == dict:
+            for schedule in jschedules.get('added', []):
+                format_schedule_data(schedule)
+            for schedule in jschedules.get('changed', []):
+                format_schedule_data(schedule)
 
         has_connection_str = self.manager.db_info['pgAgent']['has_connstr']
         jssteps = data.get('jsteps', {})
-        if 'changed' in jschedules:
+        if type(jssteps) == dict:
             for changed_step in jssteps.get('changed', []):
-                if 'jstconntype' not in changed_step and \
-                    ('jstdbname' in changed_step or
-                     'jstconnstr' in changed_step) and has_connection_str:
-                    status, rset = self.conn.execute_dict(
-                        render_template(
-                            "/".join([self.template_path, 'steps.sql']),
-                            jid=data['jobid'],
-                            jstid=changed_step['jstid'],
-                            conn=self.conn,
-                            has_connstr=has_connection_str
-                        )
-                    )
-                    if not status:
-                        return internal_server_error(errormsg=rset)
-
-                    row = rset['rows'][0]
-                    changed_step['jstconntype'] = row['jstconntype']
-                    if row['jstconntype']:
-                        changed_step['jstdbname'] = changed_step.get(
-                            'jstdbname', row['jstdbname'])
-                    else:
-                        changed_step['jstconnstr'] = changed_step.get(
-                            'jstconnstr', row['jstconnstr'])
+                status, res = format_step_data(
+                    data['jobid'], changed_step, has_connection_str,
+                    self.conn, self.template_path)
+                if not status:
+                    internal_server_error(errormsg=res)
 
 
 JobView.register_node_view(blueprint)
diff --git a/web/pgadmin/browser/server_groups/servers/pgagent/utils.py b/web/pgadmin/browser/server_groups/servers/pgagent/utils.py
index 72701ef3c..2c6fcdc37 100644
--- a/web/pgadmin/browser/server_groups/servers/pgagent/utils.py
+++ b/web/pgadmin/browser/server_groups/servers/pgagent/utils.py
@@ -8,6 +8,7 @@
 ##########################################################################
 
 """pgagent helper utilities"""
+from flask import render_template
 
 
 def format_boolean_array(value):
@@ -44,3 +45,40 @@ def format_schedule_data(data):
         data['jscmonths'] = format_boolean_array(data['jscmonths'])
 
     return data
+
+
+def format_step_data(job_id, data, has_connection_str, conn, template_path):
+    """
+    This function is used to format the step data. If data is not an
+    instance of list then format
+    :param job_id: Job ID
+    :param data: a step data
+    :param has_connection_str: has pgagent connection str
+    :param conn: Connection obj
+    :param conn: SQL template path
+    """
+    if 'jstconntype' not in data and \
+        ('jstdbname' in data or
+         'jstconnstr' in data) and has_connection_str:
+        status, rset = conn.execute_dict(
+            render_template(
+                "/".join([template_path, 'steps.sql']),
+                jid=job_id,
+                jstid=data['jstid'],
+                conn=conn,
+                has_connstr=has_connection_str
+            )
+        )
+        if not status:
+            return False, rset
+
+        row = rset['rows'][0]
+        data['jstconntype'] = row['jstconntype']
+        if row['jstconntype']:
+            data['jstdbname'] = data.get(
+                'jstdbname', row['jstdbname'])
+        else:
+            data['jstconnstr'] = data.get(
+                'jstconnstr', row['jstconnstr'])
+
+    return True, None
diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py
index 2edbd64cd..6f61b81fe 100644
--- a/web/pgadmin/utils/driver/psycopg2/__init__.py
+++ b/web/pgadmin/utils/driver/psycopg2/__init__.py
@@ -14,6 +14,7 @@ object.
 
 """
 import datetime
+import re
 from flask import session
 from flask_login import current_user
 from werkzeug.exceptions import InternalServerError
@@ -64,6 +65,28 @@ class Driver(BaseDriver):
 
         super(Driver, self).__init__()
 
+    def _restore_connections_from_session(self):
+        """
+        Used internally by connection_manager to restore connections
+        from sessions.
+        """
+        if session.sid not in self.managers:
+            self.managers[session.sid] = managers = dict()
+            if '__pgsql_server_managers' in session:
+                session_managers = \
+                    session['__pgsql_server_managers'].copy()
+                for server in \
+                    Server.query.filter_by(
+                        user_id=current_user.id):
+                    manager = managers[str(server.id)] = \
+                        ServerManager(server)
+                    if server.id in session_managers:
+                        manager._restore(session_managers[server.id])
+                        manager.update_session()
+            return managers
+
+        return {}
+
     def connection_manager(self, sid=None):
         """
         connection_manager(...)
@@ -86,20 +109,7 @@ class Driver(BaseDriver):
             with connection_restore_lock:
                 # The wait is over but the object might have been loaded
                 # by some other thread check again
-                if session.sid not in self.managers:
-                    self.managers[session.sid] = managers = dict()
-                    if '__pgsql_server_managers' in session:
-                        session_managers =\
-                            session['__pgsql_server_managers'].copy()
-                        for server in \
-                                Server.query.filter_by(
-                                    user_id=current_user.id):
-                            manager = managers[str(server.id)] =\
-                                ServerManager(server)
-                            if server.id in session_managers:
-                                manager._restore(session_managers[server.id])
-                                manager.update_session()
-
+                managers = self._restore_connections_from_session()
         else:
             managers = self.managers[session.sid]
             if str(sid) in managers:
@@ -331,10 +341,8 @@ class Driver(BaseDriver):
         if '0' <= val_noarray[0] <= '9':
             return True
 
-        for c in val_noarray:
-            if (not ('a' <= c <= 'z') and c != '_' and
-                    not ('0' <= c <= '9')):
-                return True
+        if re.search('[^a-z_0-9]+', val_noarray):
+            return True
 
         # check string is keywaord or not
         category = Driver.ScanKeywordExtraLookup(value)
diff --git a/web/setup.py b/web/setup.py
index c88c4e1fa..e9c42decd 100644
--- a/web/setup.py
+++ b/web/setup.py
@@ -154,6 +154,45 @@ def dump_servers(args):
               (servers_dumped, args.dump_servers))
 
 
+def _validate_servers_data(data):
+    """
+    Used internally by load_servers to validate servers data.
+    :param data: servers data
+    :return: error message if any
+    """
+    # Loop through the servers...
+    if "Servers" not in data:
+        return ("'Servers' attribute not found in file '%s'" %
+                args.load_servers)
+
+    for server in data["Servers"]:
+        obj = data["Servers"][server]
+
+        def check_attrib(attrib):
+            if attrib not in obj:
+                return ("'%s' attribute not found for server '%s'" %
+                        (attrib, server))
+
+        check_attrib("Name")
+        check_attrib("Group")
+
+        is_service_attrib_available = obj.get("Service", None) is not None
+
+        if not is_service_attrib_available:
+            check_attrib("Port")
+            check_attrib("Username")
+
+        check_attrib("SSLMode")
+        check_attrib("MaintenanceDB")
+
+        if "Host" not in obj and "HostAddr" not in obj and not \
+                is_service_attrib_available:
+            return ("'Host', 'HostAddr' or 'Service' attribute "
+                    "not found for server '%s'" % server)
+
+    return None
+
+
 def load_servers(args):
     """Load server groups and servers.
 
@@ -162,10 +201,7 @@ def load_servers(args):
     """
 
     # What user?
-    if args.user is not None:
-        load_user = args.user
-    else:
-        load_user = config.DESKTOP_USER
+    load_user = args.user if args.user is not None else config.DESKTOP_USER
 
     # And the sqlite path
     if args.sqlite_path is not None:
@@ -212,48 +248,19 @@ def load_servers(args):
         def print_summary():
             print("Added %d Server Group(s) and %d Server(s)." %
                   (groups_added, servers_added))
-        # Loop through the servers...
-        if "Servers" not in data:
-            print("'Servers' attribute not found in file '%s'" %
-                  args.load_servers)
+
+        err_msg = _validate_servers_data(data)
+        if err_msg is not None:
+            print(err_msg)
             print_summary()
             sys.exit(1)
 
         for server in data["Servers"]:
             obj = data["Servers"][server]
 
-            def check_attrib(attrib):
-                if attrib not in obj:
-                    print("'%s' attribute not found for server '%s'" %
-                          (attrib, server))
-                    print_summary()
-                    sys.exit(1)
-
-            check_attrib("Name")
-            check_attrib("Group")
-
-            is_service_attrib_available = True if "Service" in obj else False
-
-            if not is_service_attrib_available:
-                check_attrib("Port")
-                check_attrib("Username")
-
-            check_attrib("SSLMode")
-            check_attrib("MaintenanceDB")
-
-            if "Host" not in obj and "HostAddr" not in obj:
-                if is_service_attrib_available is False:
-                    print("'Host', 'HostAddr' or 'Service' attribute "
-                          "not found for server '%s'" % server)
-                    print_summary()
-                    sys.exit(1)
-
             # Get the group. Create if necessary
-            group_id = -1
-            for g in groups:
-                if g.name == obj["Group"]:
-                    group_id = g.id
-                    break
+            group_id = next(
+                (g.id for g in groups if g.name == obj["Group"]), -1)
 
             if group_id == -1:
                 new_group = ServerGroup()
@@ -281,71 +288,52 @@ def load_servers(args):
             new_server.ssl_mode = obj["SSLMode"]
             new_server.maintenance_db = obj["MaintenanceDB"]
 
-            if "Host" in obj:
-                new_server.host = obj["Host"]
+            new_server.host = obj.get("Host", None)
+
+            new_server.hostaddr = obj.get("HostAddr", None)
 
-            if "HostAddr" in obj:
-                new_server.hostaddr = obj["HostAddr"]
+            new_server.port = obj.get("Port", None)
 
-            if "Port" in obj:
-                new_server.port = obj["Port"]
+            new_server.username = obj.get("Username", None)
 
-            if "Username" in obj:
-                new_server.username = obj["Username"]
+            new_server.role = obj.get("Role", None)
 
-            if "Role" in obj:
-                new_server.role = obj["Role"]
+            new_server.ssl_mode = obj["SSLMode"]
 
-            if "Comment" in obj:
-                new_server.comment = obj["Comment"]
+            new_server.comment = obj.get("Comment", None)
 
-            if "DBRestriction" in obj:
-                new_server.db_res = obj["DBRestriction"]
+            new_server.db_res = obj.get("DBRestriction", None)
 
-            if "PassFile" in obj:
-                new_server.passfile = obj["PassFile"]
+            new_server.passfile = obj.get("PassFile", None)
 
-            if "SSLCert" in obj:
-                new_server.sslcert = obj["SSLCert"]
+            new_server.sslcert = obj.get("SSLCert", None)
 
-            if "SSLKey" in obj:
-                new_server.sslkey = obj["SSLKey"]
+            new_server.sslkey = obj.get("SSLKey", None)
 
-            if "SSLRootCert" in obj:
-                new_server.sslrootcert = obj["SSLRootCert"]
+            new_server.sslrootcert = obj.get("SSLRootCert", None)
 
-            if "SSLCrl" in obj:
-                new_server.sslcrl = obj["SSLCrl"]
+            new_server.sslcrl = obj.get("SSLCrl", None)
 
-            if "SSLCompression" in obj:
-                new_server.sslcompression = obj["SSLCompression"]
+            new_server.sslcompression = obj.get("SSLCompression", None)
 
-            if "BGColor" in obj:
-                new_server.bgcolor = obj["BGColor"]
+            new_server.bgcolor = obj.get("BGColor", None)
 
-            if "FGColor" in obj:
-                new_server.fgcolor = obj["FGColor"]
+            new_server.fgcolor = obj.get("FGColor", None)
 
-            if is_service_attrib_available:
-                new_server.service = obj["Service"]
+            new_server.service = obj.get("Service", None)
 
-            if "Timeout" in obj:
-                new_server.connect_timeout = obj["Timeout"]
+            new_server.connect_timeout = obj.get("Timeout", None)
 
-            if "UseSSHTunnel" in obj:
-                new_server.use_ssh_tunnel = obj["UseSSHTunnel"]
+            new_server.use_ssh_tunnel = obj.get("UseSSHTunnel", None)
 
-            if "TunnelHost" in obj:
-                new_server.tunnel_host = obj["TunnelHost"]
+            new_server.tunnel_host = obj.get("TunnelHost", None)
 
-            if "TunnelPort" in obj:
-                new_server.tunnel_port = obj["TunnelPort"]
+            new_server.tunnel_port = obj.get("TunnelPort", None)
 
-            if "TunnelUsername" in obj:
-                new_server.tunnel_username = obj["TunnelUsername"]
+            new_server.tunnel_username = obj.get("TunnelUsername", None)
 
-            if "TunnelAuthentication" in obj:
-                new_server.tunnel_authentication = obj["TunnelAuthentication"]
+            new_server.tunnel_authentication = \
+                obj.get("TunnelAuthentication", None)
 
             db.session.add(new_server)
 


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* Re: [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-02 12:00  Akshay Joshi <[email protected]>
  parent: Aditya Toshniwal <[email protected]>
  0 siblings, 0 replies; 9+ messages in thread

From: Akshay Joshi @ 2020-09-02 12:00 UTC (permalink / raw)
  To: Aditya Toshniwal <[email protected]>; +Cc: pgadmin-hackers

Thanks, patch applied.

On Tue, Sep 1, 2020 at 3:34 PM Aditya Toshniwal <
[email protected]> wrote:

> Hi Hackers,
>
> Attached patch reduces cognitive complexity as below:
>
> web/.../server_groups/servers/pgagent/__init__.py - 23 to 15
> web/pgadmin/utils/driver/psycopg2/__init__.py - 23 to 15, 17 to 15
> web/setup.py 77 to 15
>
> Please review.
>
> --
> Thanks,
> Aditya Toshniwal
> pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
> <http://edbpostgres.com>
> "Don't Complain about Heat, Plant a TREE"
>


-- 
*Thanks & Regards*
*Akshay Joshi*
*pgAdmin Hacker | Sr. Software Architect*
*EDB Postgres <http://edbpostgres.com>*

*Mobile: +91 976-788-8246*


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-03 10:08  Aditya Toshniwal <[email protected]>
  0 siblings, 1 reply; 9+ messages in thread

From: Aditya Toshniwal @ 2020-09-03 10:08 UTC (permalink / raw)
  To: pgadmin-hackers

Hi Hackers,

Attached is the patch to reduce cognitive complexity as below:
web/pgadmin/misc/file_manager/__init__.py 20 to 15
web/pgadmin/tools/grant_wizard/__init__.py 55 to 15
web/pgadmin/tools/sqleditor/__init__.py 33 to 15

Please review.

-- 
Thanks,
Aditya Toshniwal
pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
<http://edbpostgres.com>
"Don't Complain about Heat, Plant a TREE"


Attachments:

  [application/octet-stream] sonarqube.grant.patch (17.0K, 3-sonarqube.grant.patch)
  download | inline diff:
diff --git a/web/pgadmin/misc/file_manager/__init__.py b/web/pgadmin/misc/file_manager/__init__.py
index 955f98073..a66880d2c 100644
--- a/web/pgadmin/misc/file_manager/__init__.py
+++ b/web/pgadmin/misc/file_manager/__init__.py
@@ -672,30 +672,29 @@ class Filemanager(object):
 
     @staticmethod
     def check_access_permission(in_dir, path):
+        if not config.SERVER_MODE:
+            return
 
-        if config.SERVER_MODE:
-            if in_dir is None:
-                in_dir = ""
-            orig_path = Filemanager.get_abs_path(in_dir, path)
+        in_dir = '' if in_dir is None else in_dir
+        orig_path = Filemanager.get_abs_path(in_dir, path)
 
-            # This translates path with relative path notations
-            # like ./ and ../ to absolute path.
-            orig_path = os.path.abspath(orig_path)
+        # This translates path with relative path notations
+        # like ./ and ../ to absolute path.
+        orig_path = os.path.abspath(orig_path)
 
-            if in_dir:
-                if _platform == 'win32':
-                    if in_dir[-1] == '\\' or in_dir[-1] == '/':
-                        in_dir = in_dir[:-1]
-                else:
-                    if in_dir[-1] == '/':
-                        in_dir = in_dir[:-1]
-
-            # Do not allow user to access outside his storage dir
-            # in server mode.
-            if not orig_path.startswith(in_dir):
-                raise InternalServerError(
-                    gettext("Access denied ({0})").format(path))
-        return True
+        if in_dir:
+            if _platform == 'win32':
+                if in_dir[-1] == '\\' or in_dir[-1] == '/':
+                    in_dir = in_dir[:-1]
+            else:
+                if in_dir[-1] == '/':
+                    in_dir = in_dir[:-1]
+
+        # Do not allow user to access outside his storage dir
+        # in server mode.
+        if not orig_path.startswith(in_dir):
+            raise InternalServerError(
+                gettext("Access denied ({0})").format(path))
 
     @staticmethod
     def get_abs_path(in_dir, path):
diff --git a/web/pgadmin/tools/grant_wizard/__init__.py b/web/pgadmin/tools/grant_wizard/__init__.py
index ee62c4c6a..5c8687b2a 100644
--- a/web/pgadmin/tools/grant_wizard/__init__.py
+++ b/web/pgadmin/tools/grant_wizard/__init__.py
@@ -174,6 +174,79 @@ def acl_list(sid, did):
         mimetype="application/json")
 
 
+def _get_rows_for_type(conn, ntype, server_prop, node_id):
+    """
+    Used internally by properties to get rows for an object type
+    :param conn: connection object
+    :param ntype: object type
+    :param server_prop: server properties
+    :param node_id: oid
+    :return: status, execute response
+    """
+    function_sql_url = '/sql/function.sql'
+    status, res = True, []
+
+    if ntype in ['function']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], function_sql_url]),
+            node_id=node_id, type='function')
+
+        status, res = conn.execute_dict(sql)
+    # Fetch procedures only if server type is EPAS or PG >= 11
+    elif len(server_prop) > 0 and (
+        server_prop['server_type'] == 'ppas' or (
+            server_prop['server_type'] == 'pg' and
+            server_prop['version'] >= 11000
+        )
+    ) and ntype in ['procedure']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], function_sql_url]),
+            node_id=node_id, type='procedure')
+
+        status, res = conn.execute_dict(sql)
+
+    # Fetch trigger functions
+    elif ntype in ['trigger_function']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], function_sql_url]),
+            node_id=node_id, type='trigger_function')
+        status, res = conn.execute_dict(sql)
+
+    # Fetch Sequences against schema
+    elif ntype in ['sequence']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], '/sql/sequence.sql']),
+            node_id=node_id)
+
+        status, res = conn.execute_dict(sql)
+
+    # Fetch Tables against schema
+    elif ntype in ['table']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], '/sql/table.sql']),
+            node_id=node_id)
+
+        status, res = conn.execute_dict(sql)
+
+    # Fetch Views against schema
+    elif ntype in ['view']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], '/sql/view.sql']),
+            node_id=node_id, node_type='v')
+
+        status, res = conn.execute_dict(sql)
+
+    # Fetch Materialzed Views against schema
+    elif ntype in ['mview']:
+        sql = render_template("/".join(
+            [server_prop['template_path'], '/sql/view.sql']),
+            node_id=node_id, node_type='m')
+
+        status, res = conn.execute_dict(sql)
+
+    return status, res
+
+
 @blueprint.route(
     '/<int:sid>/<int:did>/<int:node_id>/<node_type>/',
     methods=['GET'], endpoint='objects'
@@ -185,7 +258,6 @@ def properties(sid, did, node_id, node_type):
        and render into selection page of wizard
     """
 
-    function_sql_url = '/sql/function.sql'
     get_schema_sql_url = '/sql/get_schemas.sql'
 
     # unquote encoded url parameter
@@ -198,130 +270,66 @@ def properties(sid, did, node_id, node_type):
     manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
     conn = manager.connection(did=did)
 
-    node_types = []
     show_sysobj = blueprint.show_system_objects().get()
     if node_type == 'database':
-        # Fetch list of schemas
-        # Get sys_obj_values and get list of schemas
-        ntype = 'schema'
         sql = render_template("/".join(
             [server_prop['template_path'], get_schema_sql_url]),
             show_sysobj=show_sysobj)
-        status, res = conn.execute_dict(sql)
-
-        if not status:
-            return internal_server_error(errormsg=res)
-        node_types = res['rows']
+        ntype = 'schema'
     else:
         sql = render_template("/".join(
             [server_prop['template_path'], get_schema_sql_url]),
-            nspid=node_id, show_sysobj=False)
-        status, res = conn.execute_dict(sql)
+            show_sysobj=show_sysobj, nspid=node_id)
+        ntype = node_type
+
+    status, res = conn.execute_dict(sql)
+
+    if not status:
+        return internal_server_error(errormsg=res)
+    node_types = res['rows']
 
+    def _append_rows(status, res, disp_type):
         if not status:
-            return internal_server_error(errormsg=res)
-        node_types = res['rows']
-        ntype = node_type
+            current_app.logger.error(res)
+            failed_objects.append(disp_type)
+        else:
+            res_data.extend(res['rows'])
 
     for row in node_types:
         if 'oid' in row:
             node_id = row['oid']
 
-        # Fetch functions against schema
-        if ntype in ['schema', 'function']:
-            sql = render_template("/".join(
-                [server_prop['template_path'], function_sql_url]),
-                node_id=node_id, type='function')
-
-            status, res = conn.execute_dict(sql)
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('function')
-            else:
-                res_data.extend(res['rows'])
-
-        # Fetch procedures only if server type is EPAS or PG >= 11
-        if (len(server_prop) > 0 and
-            (server_prop['server_type'] == 'ppas' or
-             (server_prop['server_type'] == 'pg' and
-              server_prop['version'] >= 11000)) and
-                ntype in ['schema', 'procedure']):
-            sql = render_template("/".join(
-                [server_prop['template_path'], function_sql_url]),
-                node_id=node_id, type='procedure')
-
-            status, res = conn.execute_dict(sql)
-
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('procedure')
-            else:
-                res_data.extend(res['rows'])
-
-        # Fetch trigger functions
-        if ntype in ['schema', 'trigger_function']:
-            sql = render_template("/".join(
-                [server_prop['template_path'], function_sql_url]),
-                node_id=node_id, type='trigger_function')
-            status, res = conn.execute_dict(sql)
-
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('trigger function')
-            else:
-                res_data.extend(res['rows'])
-
-        # Fetch Sequences against schema
-        if ntype in ['schema', 'sequence']:
-            sql = render_template("/".join(
-                [server_prop['template_path'], '/sql/sequence.sql']),
-                node_id=node_id)
-
-            status, res = conn.execute_dict(sql)
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('sequence')
-            else:
-                res_data.extend(res['rows'])
-
-        # Fetch Tables against schema
-        if ntype in ['schema', 'table']:
-            sql = render_template("/".join(
-                [server_prop['template_path'], '/sql/table.sql']),
-                node_id=node_id)
-
-            status, res = conn.execute_dict(sql)
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('table')
-            else:
-                res_data.extend(res['rows'])
-
-        # Fetch Views against schema
-        if ntype in ['schema', 'view']:
-            sql = render_template("/".join(
-                [server_prop['template_path'], '/sql/view.sql']),
-                node_id=node_id, node_type='v')
-
-            status, res = conn.execute_dict(sql)
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('view')
-            else:
-                res_data.extend(res['rows'])
-
-        # Fetch Materialzed Views against schema
-        if ntype in ['schema', 'mview']:
-            sql = render_template("/".join(
-                [server_prop['template_path'], '/sql/view.sql']),
-                node_id=node_id, node_type='m')
-
-            status, res = conn.execute_dict(sql)
-            if not status:
-                current_app.logger.error(res)
-                failed_objects.append('materialized view')
-            else:
-                res_data.extend(res['rows'])
+        if ntype == 'schema':
+            status, res = _get_rows_for_type(
+                conn, 'function', server_prop, node_id)
+            _append_rows(status, res, 'function')
+
+            status, res = _get_rows_for_type(
+                conn, 'procedure', server_prop, node_id)
+            _append_rows(status, res, 'procedure')
+
+            status, res = _get_rows_for_type(
+                conn, 'trigger_function', server_prop, node_id)
+            _append_rows(status, res, 'trigger function')
+
+            status, res = _get_rows_for_type(
+                conn, 'sequence', server_prop, node_id)
+            _append_rows(status, res, 'sequence')
+
+            status, res = _get_rows_for_type(
+                conn, 'table', server_prop, node_id)
+            _append_rows(status, res, 'table')
+
+            status, res = _get_rows_for_type(
+                conn, 'view', server_prop, node_id)
+            _append_rows(status, res, 'view')
+
+            status, res = _get_rows_for_type(
+                conn, 'mview', server_prop, node_id)
+            _append_rows(status, res, 'materialized view')
+        else:
+            status, res = _get_rows_for_type(conn, ntype, server_prop, node_id)
+            _append_rows(status, res, 'function')
 
     msg = None
     if len(failed_objects) > 0:
diff --git a/web/pgadmin/tools/sqleditor/__init__.py b/web/pgadmin/tools/sqleditor/__init__.py
index 014192763..b16c72e0c 100644
--- a/web/pgadmin/tools/sqleditor/__init__.py
+++ b/web/pgadmin/tools/sqleditor/__init__.py
@@ -1316,76 +1316,79 @@ def save_file():
 )
 @login_required
 def start_query_download_tool(trans_id):
-    sync_conn = None
     (status, error_msg, sync_conn, trans_obj,
      session_obj) = check_transaction_status(trans_id)
 
-    if status and sync_conn is not None and \
-       trans_obj is not None and session_obj is not None:
+    if not status or sync_conn is None or trans_obj is None or \
+            session_obj is None:
+        return internal_server_error(
+            errormsg=gettext("Transaction status check failed.")
+        )
 
-        data = request.values if request.values else None
-        try:
-            if data and 'query' in data:
-                sql = data['query']
+    data = request.values if request.values else None
+    if data is None or (data and 'query' not in data):
+        return make_json_response(
+            status=410,
+            success=0,
+            errormsg=gettext(
+                "Could not find the required parameter (query)."
+            )
+        )
 
-                # This returns generator of records.
-                status, gen = sync_conn.execute_on_server_as_csv(
-                    sql, records=2000
-                )
+    try:
+        sql = data['query']
 
-                if not status:
-                    return make_json_response(
-                        data={
-                            'status': status, 'result': gen
-                        }
-                    )
-
-                r = Response(
-                    gen(
-                        quote=blueprint.csv_quoting.get(),
-                        quote_char=blueprint.csv_quote_char.get(),
-                        field_separator=blueprint.csv_field_separator.get(),
-                        replace_nulls_with=blueprint.replace_nulls_with.get()
-                    ),
-                    mimetype='text/csv' if
-                    blueprint.csv_field_separator.get() == ','
-                    else 'text/plain'
-                )
+        # This returns generator of records.
+        status, gen = sync_conn.execute_on_server_as_csv(
+            sql, records=2000
+        )
 
-                if 'filename' in data and data['filename'] != "":
-                    filename = data['filename']
-                else:
-                    import time
-                    filename = '{0}.{1}'. \
-                        format(int(time.time()), 'csv' if blueprint.
-                               csv_field_separator.get() == ',' else 'txt')
-
-                # We will try to encode report file name with latin-1
-                # If it fails then we will fallback to default ascii file name
-                # werkzeug only supports latin-1 encoding supported values
-                try:
-                    tmp_file_name = filename
-                    tmp_file_name.encode('latin-1', 'strict')
-                except UnicodeEncodeError:
-                    filename = "download.csv"
-
-                r.headers[
-                    "Content-Disposition"
-                ] = "attachment;filename={0}".format(filename)
-
-                return r
-        except (ConnectionLost, SSHTunnelConnectionLost):
-            raise
-        except Exception as e:
-            current_app.logger.error(e)
-            err_msg = "Error: {0}".format(
-                e.strerror if hasattr(e, 'strerror') else str(e))
-            return internal_server_error(errormsg=err_msg)
-    else:
-        return internal_server_error(
-            errormsg=gettext("Transaction status check failed.")
+        if not status:
+            return make_json_response(
+                data={
+                    'status': status, 'result': gen
+                }
+            )
+
+        r = Response(
+            gen(
+                quote=blueprint.csv_quoting.get(),
+                quote_char=blueprint.csv_quote_char.get(),
+                field_separator=blueprint.csv_field_separator.get(),
+                replace_nulls_with=blueprint.replace_nulls_with.get()
+            ),
+            mimetype='text/csv' if
+            blueprint.csv_field_separator.get() == ','
+            else 'text/plain'
         )
 
+        import time
+        extn = 'csv' if blueprint.csv_field_separator.get() == ',' else 'txt'
+        filename = data['filename'] if data.get('filename', '') != "" else \
+            '{0}.{1}'.format(int(time.time()), extn)
+
+        # We will try to encode report file name with latin-1
+        # If it fails then we will fallback to default ascii file name
+        # werkzeug only supports latin-1 encoding supported values
+        try:
+            tmp_file_name = filename
+            tmp_file_name.encode('latin-1', 'strict')
+        except UnicodeEncodeError:
+            filename = "download.csv"
+
+        r.headers[
+            "Content-Disposition"
+        ] = "attachment;filename={0}".format(filename)
+
+        return r
+    except (ConnectionLost, SSHTunnelConnectionLost):
+        raise
+    except Exception as e:
+        current_app.logger.error(e)
+        err_msg = "Error: {0}".format(
+            e.strerror if hasattr(e, 'strerror') else str(e))
+        return internal_server_error(errormsg=err_msg)
+
 
 @blueprint.route(
     '/status/<int:trans_id>',


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* Re: [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-03 13:26  Akshay Joshi <[email protected]>
  parent: Aditya Toshniwal <[email protected]>
  0 siblings, 0 replies; 9+ messages in thread

From: Akshay Joshi @ 2020-09-03 13:26 UTC (permalink / raw)
  To: Aditya Toshniwal <[email protected]>; +Cc: pgadmin-hackers

Thanks, patch applied.

On Thu, Sep 3, 2020 at 3:38 PM Aditya Toshniwal <
[email protected]> wrote:

> Hi Hackers,
>
> Attached is the patch to reduce cognitive complexity as below:
> web/pgadmin/misc/file_manager/__init__.py 20 to 15
> web/pgadmin/tools/grant_wizard/__init__.py 55 to 15
> web/pgadmin/tools/sqleditor/__init__.py 33 to 15
>
> Please review.
>
> --
> Thanks,
> Aditya Toshniwal
> pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
> <http://edbpostgres.com;
> "Don't Complain about Heat, Plant a TREE"
>


-- 
*Thanks & Regards*
*Akshay Joshi*
*pgAdmin Hacker | Sr. Software Architect*
*EDB Postgres <http://edbpostgres.com>*

*Mobile: +91 976-788-8246*


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-08 12:42  Aditya Toshniwal <[email protected]>
  0 siblings, 1 reply; 9+ messages in thread

From: Aditya Toshniwal @ 2020-09-08 12:42 UTC (permalink / raw)
  To: pgadmin-hackers

Attached patch reduces the cognitive complexity as below:
web/pgadmin/tools/sqleditor/utils/is_begin_required.py - 89 to 15
web/pgadmin/utils/driver/psycopg2/connection.py - 16 to 15
web/setup.py - 17 to 15

Please review.

-- 
Thanks,
Aditya Toshniwal
pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
<http://edbpostgres.com>
"Don't Complain about Heat, Plant a TREE"


Attachments:

  [application/octet-stream] sonarqube.begin.patch (10.9K, 3-sonarqube.begin.patch)
  download | inline diff:
diff --git a/web/pgadmin/tools/sqleditor/utils/is_begin_required.py b/web/pgadmin/tools/sqleditor/utils/is_begin_required.py
index 52a20df7f..b118a2815 100644
--- a/web/pgadmin/tools/sqleditor/utils/is_begin_required.py
+++ b/web/pgadmin/tools/sqleditor/utils/is_begin_required.py
@@ -7,18 +7,49 @@
 #
 ##########################################################################
 
-"""Check if requires BEGIN in the current query."""
+
+def _get_keyword(query):
+    """
+    Calculate word len, used internally by is_begin_required
+    :param query: query
+    :return: keyword len, keyword
+    """
+    query_len = len(query)
+    word_len = 0
+    while (word_len < query_len) and query[word_len].isalpha():
+        word_len += 1
+
+    keyword = query[0:word_len]
+    return word_len, keyword
+
+
+def _check_next_keyword(query, word_len, keyword_list):
+    """
+    Check if the next keyword is from the keyword list
+    :param query: query
+    :param word_len: current keyword len
+    :param keyword_list: next keyword list
+    :return: boolean
+    """
+    if keyword_list is None:
+        return True
+    query_len = len(query)
+    query = query[word_len:query_len]
+    query = query.strip()
+    word_len, keyword = _get_keyword(query)
+
+    if keyword.lower() in keyword_list:
+        return False
+    return True
 
 
 def is_begin_required(query):
-    word_len = 0
+    """Check if requires BEGIN in the current query."""
     query = query.strip()
     query_len = len(query)
 
     # Check word length (since "beginx" is not "begin").
-    while (word_len < query_len) and query[word_len].isalpha():
-        word_len += 1
-
+    word_len, keyword = _get_keyword(query)
     # Transaction control commands.  These should include every keyword that
     #  gives rise to a TransactionStmt in the backend grammar, except for the
     #  savepoint-related commands.
@@ -26,42 +57,14 @@ def is_begin_required(query):
     #  (We assume that START must be START TRANSACTION, since there is
     #  presently no other "START foo" command.)
 
-    keyword = query[0:word_len]
-
-    if word_len == 5 and keyword.lower() == "abort":
-        return False
-    if word_len == 5 and keyword.lower() == "begin":
-        return False
-    if word_len == 5 and keyword.lower() == "start":
-        return False
-    if word_len == 6 and keyword.lower() == "commit":
-        return False
-    if word_len == 3 and keyword.lower() == "end":
-        return False
-    if word_len == 8 and keyword.lower() == "rollback":
-        return False
-    if word_len == 7 and keyword.lower() == "prepare":
-        # PREPARE TRANSACTION is a TC command, PREPARE foo is not
-        query = query[word_len:query_len]
-        query = query.strip()
-        query_len = len(query)
-        word_len = 0
-
-        while (word_len < query_len) and query[word_len].isalpha():
-            word_len += 1
-
-        keyword = query[0:word_len]
-        if word_len == 11 and keyword.lower() == "transaction":
-            return False
-        return True
-
     # Commands not allowed within transactions. The statements checked for
     # here should be exactly those that call PreventTransactionChain() in the
     # backend.
-    if word_len == 6 and keyword.lower() == "vacuum":
+    if keyword.lower() in ["abort", "begin", "start", "commit", "vacuum",
+                           "end", "rollback"]:
         return False
 
-    if word_len == 7 and keyword.lower() == "cluster":
+    if keyword.lower() == "cluster":
         # CLUSTER with any arguments is allowed in transactions
         query = query[word_len:query_len]
         query = query.strip()
@@ -70,98 +73,44 @@ def is_begin_required(query):
             return True  # has additional words
         return False  # it's CLUSTER without arguments
 
-    if word_len == 6 and keyword.lower() == "create":
+    if keyword.lower() == "create":
         query = query[word_len:query_len]
         query = query.strip()
         query_len = len(query)
-        word_len = 0
-
-        while (word_len < query_len) and query[word_len].isalpha():
-            word_len += 1
+        word_len, keyword = _get_keyword(query)
 
-        keyword = query[0:word_len]
-        if word_len == 8 and keyword.lower() == "database":
-            return False
-        if word_len == 10 and keyword.lower() == "tablespace":
+        if keyword.lower() in ["database", "tablespace"]:
             return False
 
         # CREATE [UNIQUE] INDEX CONCURRENTLY isn't allowed in xacts
-        if word_len == 7 and keyword.lower() == "cluster":
+        if keyword.lower() == "cluster":
             query = query[word_len:query_len]
             query = query.strip()
             query_len = len(query)
-            word_len = 0
-
-            while (word_len < query_len) and query[word_len].isalpha():
-                word_len += 1
-
-            keyword = query[0:word_len]
+            word_len, keyword = _get_keyword(query)
 
-        if word_len == 5 and keyword.lower() == "index":
+        if keyword.lower() == "index":
             query = query[word_len:query_len]
             query = query.strip()
-            query_len = len(query)
-            word_len = 0
-
-            while (word_len < query_len) and query[word_len].isalpha():
-                word_len += 1
+            word_len, keyword = _get_keyword(query)
 
-            keyword = query[0:word_len]
-            if word_len == 12 and keyword.lower() == "concurrently":
+            if keyword.lower() == "concurrently":
                 return False
         return True
 
-    if word_len == 5 and keyword.lower() == "alter":
-        query = query[word_len:query_len]
-        query = query.strip()
-        query_len = len(query)
-        word_len = 0
-
-        while (word_len < query_len) and query[word_len].isalpha():
-            word_len += 1
-
-        keyword = query[0:word_len]
-
+    next_keyword_map = {
+        # PREPARE TRANSACTION is a TC command, PREPARE foo is not
+        "prepare": ["transaction"],
         # ALTER SYSTEM isn't allowed in xacts
-        if word_len == 6 and keyword.lower() == "system":
-            return False
-        return True
-
-    # Note: these tests will match DROP SYSTEM and REINDEX TABLESPACE, which
-    # aren't really valid commands so we don't care much. The other four
-    # possible matches are correct.
-    if word_len == 4 and keyword.lower() == "drop" \
-            or word_len == 7 and keyword.lower() == "reindex":
-        query = query[word_len:query_len]
-        query = query.strip()
-        query_len = len(query)
-        word_len = 0
-
-        while (word_len < query_len) and query[word_len].isalpha():
-            word_len += 1
-
-        keyword = query[0:word_len]
-        if word_len == 8 and keyword.lower() == "database":
-            return False
-        if word_len == 6 and keyword.lower() == "system":
-            return False
-        if word_len == 10 and keyword.lower() == "tablespace":
-            return False
-        return True
-
-    # DISCARD ALL isn't allowed in xacts, but other variants are allowed.
-    if word_len == 7 and keyword.lower() == "discard":
-        query = query[word_len:query_len]
-        query = query.strip()
-        query_len = len(query)
-        word_len = 0
-
-        while (word_len < query_len) and query[word_len].isalpha():
-            word_len += 1
-
-        keyword = query[0:word_len]
-        if word_len == 3 and keyword.lower() == "all":
-            return False
-        return True
-
-    return True
+        "alter": ["system"],
+        # Note: these tests will match DROP SYSTEM and REINDEX TABLESPACE, which
+        # aren't really valid commands so we don't care much. The other four
+        # possible matches are correct.
+        "drop": ["database", "system", "tablespace"],
+        "reindex": ["database", "system", "tablespace"],
+        # DISCARD ALL isn't allowed in xacts, but other variants are allowed.
+        "discard": ["all"],
+    }
+
+    return _check_next_keyword(
+        query, word_len, next_keyword_map.get(keyword.lower(), None))
diff --git a/web/pgadmin/utils/driver/psycopg2/connection.py b/web/pgadmin/utils/driver/psycopg2/connection.py
index 9e9992563..749d91a6c 100644
--- a/web/pgadmin/utils/driver/psycopg2/connection.py
+++ b/web/pgadmin/utils/driver/psycopg2/connection.py
@@ -1125,9 +1125,8 @@ WHERE
 
         rows = []
         self.row_count = cur.rowcount
-        if cur.rowcount > 0:
-            for row in cur:
-                rows.append(dict(row))
+        for row in cur:
+            rows.append(dict(row))
 
         return True, {'columns': columns, 'rows': rows}
 
diff --git a/web/setup.py b/web/setup.py
index cfdf2045a..4cd090d15 100644
--- a/web/setup.py
+++ b/web/setup.py
@@ -155,12 +155,13 @@ def dump_servers(args):
               (servers_dumped, args.dump_servers))
 
 
-def _validate_servers_data(data):
+def _validate_servers_data(data, is_admin):
     """
     Used internally by load_servers to validate servers data.
     :param data: servers data
     :return: error message if any
     """
+    skip_servers = []
     # Loop through the servers...
     if "Servers" not in data:
         return ("'Servers' attribute not found in file '%s'" %
@@ -169,6 +170,13 @@ def _validate_servers_data(data):
     for server in data["Servers"]:
         obj = data["Servers"][server]
 
+        # Check if server is shared.Won't import if user is non-admin
+        if obj.get('Shared', None) and not is_admin:
+            print("Won't import the server '%s' as it is shared " %
+                  obj["Name"])
+            skip_servers.append(server)
+            continue
+
         def check_attrib(attrib):
             if attrib not in obj:
                 return ("'%s' attribute not found for server '%s'" %
@@ -191,6 +199,8 @@ def _validate_servers_data(data):
             return ("'Host', 'HostAddr' or 'Service' attribute "
                     "not found for server '%s'" % server)
 
+    for server in skip_servers:
+        del data["Servers"][server]
     return None
 
 
@@ -250,7 +260,7 @@ def load_servers(args):
             print("Added %d Server Group(s) and %d Server(s)." %
                   (groups_added, servers_added))
 
-        err_msg = _validate_servers_data(data)
+        err_msg = _validate_servers_data(data, user.has_role("Administrator"))
         if err_msg is not None:
             print(err_msg)
             print_summary()
@@ -259,14 +269,6 @@ def load_servers(args):
         for server in data["Servers"]:
             obj = data["Servers"][server]
 
-            # Check if server is shared.Won't import if user is non-admin
-            if 'Shared' in obj \
-                and obj['Shared'] and \
-                    not user.has_role("Administrator"):
-                print("Can't import the server '%s' as it is shared " %
-                      obj["Name"])
-                continue
-
             # Get the group. Create if necessary
             group_id = next(
                 (g.id for g in groups if g.name == obj["Group"]), -1)


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* Re: [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-09 05:56  Akshay Joshi <[email protected]>
  parent: Aditya Toshniwal <[email protected]>
  0 siblings, 0 replies; 9+ messages in thread

From: Akshay Joshi @ 2020-09-09 05:56 UTC (permalink / raw)
  To: Aditya Toshniwal <[email protected]>; +Cc: pgadmin-hackers

Thanks, patch applied.

On Tue, Sep 8, 2020 at 6:13 PM Aditya Toshniwal <
[email protected]> wrote:

> Attached patch reduces the cognitive complexity as below:
> web/pgadmin/tools/sqleditor/utils/is_begin_required.py - 89 to 15
> web/pgadmin/utils/driver/psycopg2/connection.py - 16 to 15
> web/setup.py - 17 to 15
>
> Please review.
>
> --
> Thanks,
> Aditya Toshniwal
> pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
> <http://edbpostgres.com;
> "Don't Complain about Heat, Plant a TREE"
>


-- 
*Thanks & Regards*
*Akshay Joshi*
*pgAdmin Hacker | Sr. Software Architect*
*EDB Postgres <http://edbpostgres.com>*

*Mobile: +91 976-788-8246*


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-22 05:33  Aditya Toshniwal <[email protected]>
  0 siblings, 1 reply; 9+ messages in thread

From: Aditya Toshniwal @ 2020-09-22 05:33 UTC (permalink / raw)
  To: pgadmin-hackers

Hi Hackers,

Attached patch will reduce the cognitive complexity as below:
web/pgadmin/misc/file_manager/__init__.py - 40 to 15; 43 to 15

Please review.


-- 
Thanks,
Aditya Toshniwal
pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
<http://edbpostgres.com>
"Don't Complain about Heat, Plant a TREE"


Attachments:

  [application/x-patch] sonarqube.filemanager.patch (14.1K, 3-sonarqube.filemanager.patch)
  download | inline diff:
diff --git a/web/pgadmin/misc/file_manager/__init__.py b/web/pgadmin/misc/file_manager/__init__.py
index 9ededc1b2..2ee5a989a 100644
--- a/web/pgadmin/misc/file_manager/__init__.py
+++ b/web/pgadmin/misc/file_manager/__init__.py
@@ -338,6 +338,35 @@ class Filemanager(object):
         if self.dir is not None and isinstance(self.dir, list):
             self.dir = ""
 
+    @staticmethod
+    def get_closest_parent(storage_dir, last_dir):
+        """
+        Check if path exists and if not then get closest parent which exists
+        :param storage_dir: Base dir
+        :param last_dir: check dir
+        :return: exist dir
+        """
+        if len(last_dir) > 1 and \
+                (last_dir.endswith('/') or last_dir.endswith('\\')):
+            last_dir = last_dir[:-1]
+        while last_dir:
+            if os.path.exists(storage_dir or '' + last_dir):
+                break
+            index = max(last_dir.rfind('\\'), last_dir.rfind('/')) \
+                if _platform == 'win32' else last_dir.rfind('/')
+            last_dir = last_dir[0:index]
+
+        if _platform == 'win32':
+            if not last_dir.endswith('\\'):
+                last_dir += "\\"
+
+            return last_dir
+
+        if not last_dir.endswith('/'):
+            last_dir += "/"
+
+        return last_dir
+
     @staticmethod
     def create_new_transaction(params):
         """
@@ -357,29 +386,38 @@ class Filemanager(object):
         # It is used in utitlity js to decide to
         # show or hide select file type options
         show_volumes = isinstance(storage_dir, list) or not storage_dir
-        supp_types = allow_upload_files = params['supported_types'] \
-            if 'supported_types' in params else []
-        if fm_type == 'select_file':
-            capabilities = ['select_file', 'rename', 'upload', 'create']
-            files_only = True
-            folders_only = False
-            title = gettext("Select File")
-        elif fm_type == 'select_folder':
-            capabilities = ['select_folder', 'rename', 'create']
-            files_only = False
-            folders_only = True
-            title = gettext("Select Folder")
-        elif fm_type == 'create_file':
-            capabilities = ['select_file', 'rename', 'create']
-            files_only = True
-            folders_only = False
-            title = gettext("Create File")
-        elif fm_type == 'storage_dialog':
-            capabilities = ['select_folder', 'select_file', 'download',
-                            'rename', 'delete', 'upload', 'create']
-            files_only = True
-            folders_only = False
-            title = gettext("Storage Manager")
+        supp_types = allow_upload_files = params.get('supported_types', [])
+
+        # tuples with (capabilities, files_only, folders_only, title)
+        capability_map = {
+            'select_file': (
+                ['select_file', 'rename', 'upload', 'create'],
+                True,
+                False,
+                gettext("Select File")
+            ),
+            'select_folder': (
+                ['select_folder', 'rename', 'create'],
+                False,
+                True,
+                gettext("Select Folder")
+            ),
+            'create_file': (
+                ['select_file', 'rename', 'create'],
+                True,
+                False,
+                gettext("Create File")
+            ),
+            'storage_dialog': (
+                ['select_folder', 'select_file', 'download',
+                 'rename', 'delete', 'upload', 'create'],
+                True,
+                False,
+                gettext("Storage Manager")
+            ),
+        }
+
+        capabilities, files_only, folders_only, title = capability_map[fm_type]
 
         # Using os.path.join to make sure we have trailing '/' or '\'
         homedir = '/' if (config.SERVER_MODE) \
@@ -389,43 +427,16 @@ class Filemanager(object):
         # order to find closest parent directory
         last_dir = blueprint.last_directory_visited.get()
         check_dir_exists = False
-        if storage_dir is None:
-            if last_dir is None:
-                last_dir = "/"
-            else:
-                check_dir_exists = True
+        if last_dir is None:
+            last_dir = "/"
         else:
-            if last_dir is not None:
-                check_dir_exists = True
-            else:
-                last_dir = "/"
+            check_dir_exists = True
 
         if not config.SERVER_MODE and last_dir == "/" or last_dir == "/":
             last_dir = homedir
 
         if check_dir_exists:
-            if len(last_dir) > 1 and \
-                    (last_dir.endswith('/') or last_dir.endswith('\\')):
-                last_dir = last_dir[:-1]
-            while last_dir:
-                if os.path.exists(
-                        storage_dir
-                        if storage_dir is not None else '' + last_dir):
-                    break
-                if _platform == 'win32':
-                    index = max(last_dir.rfind('\\'), last_dir.rfind('/'))
-                else:
-                    index = last_dir.rfind('/')
-                last_dir = last_dir[0:index]
-            if not last_dir:
-                last_dir = "/"
-
-            if _platform == 'win32':
-                if not (last_dir.endswith('\\') or last_dir.endswith('/')):
-                    last_dir += "\\"
-            else:
-                if not last_dir.endswith('/'):
-                    last_dir += "/"
+            last_dir = Filemanager.get_closest_parent(storage_dir, last_dir)
 
         # create configs using above configs
         configs = {
@@ -502,7 +513,7 @@ class Filemanager(object):
         return make_json_response(data={'status': True})
 
     @staticmethod
-    def _get_drives(drive_name=None):
+    def _get_drives_with_size(drive_name=None):
         """
         This is a generic function which returns the default path for storage
         manager dialog irrespective of any Platform type to list all
@@ -512,21 +523,29 @@ class Filemanager(object):
         Platform unix:
         it returns path to root directory if no path is specified.
         """
+        def _get_drive_size(path):
+            try:
+                drive_size = getdrivesize(path)
+                return sizeof_fmt(drive_size)
+            except Exception:
+                return 0
+
         if _platform == "win32":
             try:
                 drives = []
                 bitmask = ctypes.windll.kernel32.GetLogicalDrives()
                 for letter in string.ascii_uppercase:
                     if bitmask & 1:
-                        drives.append(letter)
+                        drives.append((letter, _get_drive_size(letter)))
                     bitmask >>= 1
                 if (drive_name != '' and drive_name is not None and
                         drive_name in drives):
-                    return "{0}{1}".format(drive_name, ':')
+                    letter = "{0}{1}".format(drive_name, ':')
+                    return (letter, _get_drive_size(letter))
                 else:
                     return drives  # return drives if no argument is passed
             except Exception:
-                return ['C:']
+                return [('C:', _get_drive_size('C:'))]
         else:
             return '/'
 
@@ -548,6 +567,58 @@ class Filemanager(object):
             # Resume windows error
             kernel32.SetThreadErrorMode(oldmode, ctypes.byref(oldmode))
 
+    @staticmethod
+    def get_files_in_path(
+        show_hidden_files, files_only, folders_only, supported_types,
+            file_type, user_dir, orig_path):
+        files = {}
+        for f in sorted(os.listdir(orig_path)):
+            system_path = os.path.join(os.path.join(orig_path, f))
+
+            # continue if file/folder is hidden (based on user preference)
+            if not show_hidden_files and \
+                    (is_folder_hidden(system_path) or f.startswith('.')):
+                continue
+
+            user_path = os.path.join(os.path.join(user_dir, f))
+            created = time.ctime(os.path.getctime(system_path))
+            modified = time.ctime(os.path.getmtime(system_path))
+            file_extension = str(splitext(system_path))
+
+            # set protected to 1 if no write or read permission
+            protected = 0
+            if (not os.access(system_path, os.R_OK) or
+                    not os.access(system_path, os.W_OK)):
+                protected = 1
+
+            # list files only or folders only
+            if os.path.isdir(system_path):
+                if files_only == 'true':
+                    continue
+                file_extension = "dir"
+                user_path = "{0}/".format(user_path)
+            # filter files based on file_type
+            elif file_type is not None and file_type != "*" and \
+                (folders_only or len(supported_types) > 0 and
+                 file_extension not in supported_types or
+                    file_type != file_extension):
+                continue
+
+            # create a list of files and folders
+            files[f] = {
+                "Filename": f,
+                "Path": user_path,
+                "file_type": file_extension,
+                "Protected": protected,
+                "Properties": {
+                    "Date Created": created,
+                    "Date Modified": modified,
+                    "Size": sizeof_fmt(getsize(system_path))
+                }
+            }
+
+        return files
+
     @staticmethod
     def list_filesystem(in_dir, path, trans_data, file_type, show_hidden):
         """
@@ -572,25 +643,18 @@ class Filemanager(object):
         files = {}
         if (_platform == "win32" and (path == '/' or path == '\\'))\
                 and in_dir is None:
-            drives = Filemanager._get_drives()
-            for drive in drives:
-                protected = 0
+            drives = Filemanager._get_drives_with_size()
+            for drive, drive_size in drives:
                 path = file_name = "{0}:".format(drive)
-                try:
-                    drive_size = getdrivesize(path)
-                    drive_size_in_units = sizeof_fmt(drive_size)
-                except Exception:
-                    drive_size = 0
-                protected = 1 if drive_size == 0 else 0
                 files[file_name] = {
                     "Filename": file_name,
                     "Path": path,
                     "file_type": 'drive',
-                    "Protected": protected,
+                    "Protected": 1 if drive_size == 0 else 0,
                     "Properties": {
                         "Date Created": "",
                         "Date Modified": "",
-                        "Size": drive_size_in_units
+                        "Size": drive_size
                     }
                 }
             Filemanager.resume_windows_warning()
@@ -606,68 +670,23 @@ class Filemanager(object):
             }
 
         user_dir = path
-        folders_only = trans_data['folders_only'] \
-            if 'folders_only' in trans_data else ''
-        files_only = trans_data['files_only'] \
-            if 'files_only' in trans_data else ''
-        supported_types = trans_data['supported_types'] \
-            if 'supported_types' in trans_data else []
+        folders_only = trans_data.get('folders_only', '')
+        files_only = trans_data.get('files_only', '')
+        supported_types = trans_data.get('supported_types', [])
 
         orig_path = unquote(orig_path)
         try:
-            mylist = [x for x in sorted(os.listdir(orig_path))]
-            for f in mylist:
-                protected = 0
-                system_path = os.path.join(os.path.join(orig_path, f))
-
-                # continue if file/folder is hidden (based on user preference)
-                if not is_show_hidden_files and \
-                        (is_folder_hidden(system_path) or f.startswith('.')):
-                    continue
-
-                user_path = os.path.join(os.path.join(user_dir, f))
-                created = time.ctime(os.path.getctime(system_path))
-                modified = time.ctime(os.path.getmtime(system_path))
-                file_extension = str(splitext(system_path))
-
-                # set protected to 1 if no write or read permission
-                if (not os.access(system_path, os.R_OK) or
-                        not os.access(system_path, os.W_OK)):
-                    protected = 1
-
-                # list files only or folders only
-                if os.path.isdir(system_path):
-                    if files_only == 'true':
-                        continue
-                    file_extension = "dir"
-                    user_path = "{0}/".format(user_path)
-                else:
-                    # filter files based on file_type
-                    if file_type is not None and file_type != "*" and \
-                        (folders_only or len(supported_types) > 0 and
-                         file_extension not in supported_types or
-                            file_type != file_extension):
-                        continue
-
-                # create a list of files and folders
-                files[f] = {
-                    "Filename": f,
-                    "Path": user_path,
-                    "file_type": file_extension,
-                    "Protected": protected,
-                    "Properties": {
-                        "Date Created": created,
-                        "Date Modified": modified,
-                        "Size": sizeof_fmt(getsize(system_path))
-                    }
-                }
+            files = Filemanager.get_files_in_path(
+                is_show_hidden_files, files_only, folders_only,
+                supported_types, file_type, user_dir, orig_path
+            )
         except Exception as e:
             Filemanager.resume_windows_warning()
+            err_msg = str(e)
             if (hasattr(e, 'strerror') and
                     e.strerror == gettext('Permission denied')):
                 err_msg = str(e.strerror)
-            else:
-                err_msg = str(e)
+
             files = {
                 'Code': 0,
                 'Error': err_msg


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* Re: [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-22 09:26  Akshay Joshi <[email protected]>
  parent: Aditya Toshniwal <[email protected]>
  0 siblings, 1 reply; 9+ messages in thread

From: Akshay Joshi @ 2020-09-22 09:26 UTC (permalink / raw)
  To: Aditya Toshniwal <[email protected]>; +Cc: pgadmin-hackers

Hi Aditya

The patch fixes only 1 code smell and introduces 1 new one. Please fix those
and resend the patch.

On Tue, Sep 22, 2020 at 11:04 AM Aditya Toshniwal <
[email protected]> wrote:

> Hi Hackers,
>
> Attached patch will reduce the cognitive complexity as below:
> web/pgadmin/misc/file_manager/__init__.py - 40 to 15; 43 to 15
>
> Please review.
>
>
> --
> Thanks,
> Aditya Toshniwal
> pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
> <http://edbpostgres.com>
> "Don't Complain about Heat, Plant a TREE"
>


-- 
*Thanks & Regards*
*Akshay Joshi*
*pgAdmin Hacker | Sr. Software Architect*
*EDB Postgres <http://edbpostgres.com>*

*Mobile: +91 976-788-8246*


^ permalink  raw  reply  [nested|flat] 9+ messages in thread

* Re: [pgAdmin][SonarQube] Reduce cognitive complexity
@ 2020-09-22 11:18  Aditya Toshniwal <[email protected]>
  parent: Akshay Joshi <[email protected]>
  0 siblings, 0 replies; 9+ messages in thread

From: Aditya Toshniwal @ 2020-09-22 11:18 UTC (permalink / raw)
  To: Akshay Joshi <[email protected]>; +Cc: pgadmin-hackers

Hi,

Please find the updated patch.

On Tue, Sep 22, 2020 at 2:56 PM Akshay Joshi <[email protected]>
wrote:

> Hi Aditya
>
> The patch fixes only 1 code smell and introduces 1 new one. Please fix
> those and resend the patch.
>
> On Tue, Sep 22, 2020 at 11:04 AM Aditya Toshniwal <
> [email protected]> wrote:
>
>> Hi Hackers,
>>
>> Attached patch will reduce the cognitive complexity as below:
>> web/pgadmin/misc/file_manager/__init__.py - 40 to 15; 43 to 15
>>
>> Please review.
>>
>>
>> --
>> Thanks,
>> Aditya Toshniwal
>> pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
>> <http://edbpostgres.com>
>> "Don't Complain about Heat, Plant a TREE"
>>
>
>
> --
> *Thanks & Regards*
> *Akshay Joshi*
> *pgAdmin Hacker | Sr. Software Architect*
> *EDB Postgres <http://edbpostgres.com>*
>
> *Mobile: +91 976-788-8246*
>


-- 
Thanks,
Aditya Toshniwal
pgAdmin hacker | Sr. Software Engineer | *edbpostgres.com*
<http://edbpostgres.com>
"Don't Complain about Heat, Plant a TREE"


Attachments:

  [application/octet-stream] sonarqube.filemanagerv2.patch (15.0K, 3-sonarqube.filemanagerv2.patch)
  download | inline diff:
diff --git a/web/pgadmin/misc/file_manager/__init__.py b/web/pgadmin/misc/file_manager/__init__.py
index 9ededc1b2..6a4fdf816 100644
--- a/web/pgadmin/misc/file_manager/__init__.py
+++ b/web/pgadmin/misc/file_manager/__init__.py
@@ -95,7 +95,8 @@ def is_folder_hidden(filepath):
         except (AttributeError, AssertionError):
             result = False
         return result
-    return False
+    else:
+        return os.path.basename(filepath).startswith('.')
 
 
 class FileManagerModule(PgAdminModule):
@@ -338,6 +339,35 @@ class Filemanager(object):
         if self.dir is not None and isinstance(self.dir, list):
             self.dir = ""
 
+    @staticmethod
+    def get_closest_parent(storage_dir, last_dir):
+        """
+        Check if path exists and if not then get closest parent which exists
+        :param storage_dir: Base dir
+        :param last_dir: check dir
+        :return: exist dir
+        """
+        if len(last_dir) > 1 and \
+                (last_dir.endswith('/') or last_dir.endswith('\\')):
+            last_dir = last_dir[:-1]
+        while last_dir:
+            if os.path.exists(storage_dir or '' + last_dir):
+                break
+            index = max(last_dir.rfind('\\'), last_dir.rfind('/')) \
+                if _platform == 'win32' else last_dir.rfind('/')
+            last_dir = last_dir[0:index]
+
+        if _platform == 'win32':
+            if not last_dir.endswith('\\'):
+                last_dir += "\\"
+
+            return last_dir
+
+        if not last_dir.endswith('/'):
+            last_dir += "/"
+
+        return last_dir
+
     @staticmethod
     def create_new_transaction(params):
         """
@@ -357,29 +387,38 @@ class Filemanager(object):
         # It is used in utitlity js to decide to
         # show or hide select file type options
         show_volumes = isinstance(storage_dir, list) or not storage_dir
-        supp_types = allow_upload_files = params['supported_types'] \
-            if 'supported_types' in params else []
-        if fm_type == 'select_file':
-            capabilities = ['select_file', 'rename', 'upload', 'create']
-            files_only = True
-            folders_only = False
-            title = gettext("Select File")
-        elif fm_type == 'select_folder':
-            capabilities = ['select_folder', 'rename', 'create']
-            files_only = False
-            folders_only = True
-            title = gettext("Select Folder")
-        elif fm_type == 'create_file':
-            capabilities = ['select_file', 'rename', 'create']
-            files_only = True
-            folders_only = False
-            title = gettext("Create File")
-        elif fm_type == 'storage_dialog':
-            capabilities = ['select_folder', 'select_file', 'download',
-                            'rename', 'delete', 'upload', 'create']
-            files_only = True
-            folders_only = False
-            title = gettext("Storage Manager")
+        supp_types = allow_upload_files = params.get('supported_types', [])
+
+        # tuples with (capabilities, files_only, folders_only, title)
+        capability_map = {
+            'select_file': (
+                ['select_file', 'rename', 'upload', 'create'],
+                True,
+                False,
+                gettext("Select File")
+            ),
+            'select_folder': (
+                ['select_folder', 'rename', 'create'],
+                False,
+                True,
+                gettext("Select Folder")
+            ),
+            'create_file': (
+                ['select_file', 'rename', 'create'],
+                True,
+                False,
+                gettext("Create File")
+            ),
+            'storage_dialog': (
+                ['select_folder', 'select_file', 'download',
+                 'rename', 'delete', 'upload', 'create'],
+                True,
+                False,
+                gettext("Storage Manager")
+            ),
+        }
+
+        capabilities, files_only, folders_only, title = capability_map[fm_type]
 
         # Using os.path.join to make sure we have trailing '/' or '\'
         homedir = '/' if (config.SERVER_MODE) \
@@ -389,43 +428,16 @@ class Filemanager(object):
         # order to find closest parent directory
         last_dir = blueprint.last_directory_visited.get()
         check_dir_exists = False
-        if storage_dir is None:
-            if last_dir is None:
-                last_dir = "/"
-            else:
-                check_dir_exists = True
+        if last_dir is None:
+            last_dir = "/"
         else:
-            if last_dir is not None:
-                check_dir_exists = True
-            else:
-                last_dir = "/"
+            check_dir_exists = True
 
         if not config.SERVER_MODE and last_dir == "/" or last_dir == "/":
             last_dir = homedir
 
         if check_dir_exists:
-            if len(last_dir) > 1 and \
-                    (last_dir.endswith('/') or last_dir.endswith('\\')):
-                last_dir = last_dir[:-1]
-            while last_dir:
-                if os.path.exists(
-                        storage_dir
-                        if storage_dir is not None else '' + last_dir):
-                    break
-                if _platform == 'win32':
-                    index = max(last_dir.rfind('\\'), last_dir.rfind('/'))
-                else:
-                    index = last_dir.rfind('/')
-                last_dir = last_dir[0:index]
-            if not last_dir:
-                last_dir = "/"
-
-            if _platform == 'win32':
-                if not (last_dir.endswith('\\') or last_dir.endswith('/')):
-                    last_dir += "\\"
-            else:
-                if not last_dir.endswith('/'):
-                    last_dir += "/"
+            last_dir = Filemanager.get_closest_parent(storage_dir, last_dir)
 
         # create configs using above configs
         configs = {
@@ -502,7 +514,7 @@ class Filemanager(object):
         return make_json_response(data={'status': True})
 
     @staticmethod
-    def _get_drives(drive_name=None):
+    def _get_drives_with_size(drive_name=None):
         """
         This is a generic function which returns the default path for storage
         manager dialog irrespective of any Platform type to list all
@@ -512,21 +524,29 @@ class Filemanager(object):
         Platform unix:
         it returns path to root directory if no path is specified.
         """
+        def _get_drive_size(path):
+            try:
+                drive_size = getdrivesize(path)
+                return sizeof_fmt(drive_size)
+            except Exception:
+                return 0
+
         if _platform == "win32":
             try:
                 drives = []
                 bitmask = ctypes.windll.kernel32.GetLogicalDrives()
                 for letter in string.ascii_uppercase:
                     if bitmask & 1:
-                        drives.append(letter)
+                        drives.append((letter, _get_drive_size(letter)))
                     bitmask >>= 1
                 if (drive_name != '' and drive_name is not None and
                         drive_name in drives):
-                    return "{0}{1}".format(drive_name, ':')
+                    letter = "{0}{1}".format(drive_name, ':')
+                    return (letter, _get_drive_size(letter))
                 else:
                     return drives  # return drives if no argument is passed
             except Exception:
-                return ['C:']
+                return [('C:', _get_drive_size('C:'))]
         else:
             return '/'
 
@@ -548,6 +568,79 @@ class Filemanager(object):
             # Resume windows error
             kernel32.SetThreadErrorMode(oldmode, ctypes.byref(oldmode))
 
+    @staticmethod
+    def _skip_file_extension(
+            file_type, supported_types, folders_only, file_extension):
+        """
+        Used internally by get_files_in_path to check if
+        the file extn to be skipped
+        """
+        return file_type is not None and file_type != "*" and (
+            folders_only or len(supported_types) > 0 and
+            file_extension not in supported_types or
+            file_type != file_extension)
+
+    @staticmethod
+    def get_files_in_path(
+        show_hidden_files, files_only, folders_only, supported_types,
+            file_type, user_dir, orig_path):
+        """
+        Get list of files and dirs in the path
+        :param show_hidden_files: boolean
+        :param files_only: boolean
+        :param folders_only: boolean
+        :param supported_types: array of supported types
+        :param file_type: file type
+        :param user_dir: base user dir
+        :param orig_path: path after user dir
+        :return:
+        """
+        files = {}
+
+        for f in sorted(os.listdir(orig_path)):
+            system_path = os.path.join(os.path.join(orig_path, f))
+
+            # continue if file/folder is hidden (based on user preference)
+            if not show_hidden_files and is_folder_hidden(system_path):
+                continue
+
+            user_path = os.path.join(os.path.join(user_dir, f))
+            created = time.ctime(os.path.getctime(system_path))
+            modified = time.ctime(os.path.getmtime(system_path))
+            file_extension = str(splitext(system_path))
+
+            # set protected to 1 if no write or read permission
+            protected = 0
+            if (not os.access(system_path, os.R_OK) or
+                    not os.access(system_path, os.W_OK)):
+                protected = 1
+
+            # list files only or folders only
+            if os.path.isdir(system_path):
+                if files_only == 'true':
+                    continue
+                file_extension = "dir"
+                user_path = "{0}/".format(user_path)
+            # filter files based on file_type
+            elif Filemanager._skip_file_extension(
+                    file_type, supported_types, folders_only, file_extension):
+                continue
+
+            # create a list of files and folders
+            files[f] = {
+                "Filename": f,
+                "Path": user_path,
+                "file_type": file_extension,
+                "Protected": protected,
+                "Properties": {
+                    "Date Created": created,
+                    "Date Modified": modified,
+                    "Size": sizeof_fmt(getsize(system_path))
+                }
+            }
+
+        return files
+
     @staticmethod
     def list_filesystem(in_dir, path, trans_data, file_type, show_hidden):
         """
@@ -572,25 +665,18 @@ class Filemanager(object):
         files = {}
         if (_platform == "win32" and (path == '/' or path == '\\'))\
                 and in_dir is None:
-            drives = Filemanager._get_drives()
-            for drive in drives:
-                protected = 0
+            drives = Filemanager._get_drives_with_size()
+            for drive, drive_size in drives:
                 path = file_name = "{0}:".format(drive)
-                try:
-                    drive_size = getdrivesize(path)
-                    drive_size_in_units = sizeof_fmt(drive_size)
-                except Exception:
-                    drive_size = 0
-                protected = 1 if drive_size == 0 else 0
                 files[file_name] = {
                     "Filename": file_name,
                     "Path": path,
                     "file_type": 'drive',
-                    "Protected": protected,
+                    "Protected": 1 if drive_size == 0 else 0,
                     "Properties": {
                         "Date Created": "",
                         "Date Modified": "",
-                        "Size": drive_size_in_units
+                        "Size": drive_size
                     }
                 }
             Filemanager.resume_windows_warning()
@@ -606,68 +692,23 @@ class Filemanager(object):
             }
 
         user_dir = path
-        folders_only = trans_data['folders_only'] \
-            if 'folders_only' in trans_data else ''
-        files_only = trans_data['files_only'] \
-            if 'files_only' in trans_data else ''
-        supported_types = trans_data['supported_types'] \
-            if 'supported_types' in trans_data else []
+        folders_only = trans_data.get('folders_only', '')
+        files_only = trans_data.get('files_only', '')
+        supported_types = trans_data.get('supported_types', [])
 
         orig_path = unquote(orig_path)
         try:
-            mylist = [x for x in sorted(os.listdir(orig_path))]
-            for f in mylist:
-                protected = 0
-                system_path = os.path.join(os.path.join(orig_path, f))
-
-                # continue if file/folder is hidden (based on user preference)
-                if not is_show_hidden_files and \
-                        (is_folder_hidden(system_path) or f.startswith('.')):
-                    continue
-
-                user_path = os.path.join(os.path.join(user_dir, f))
-                created = time.ctime(os.path.getctime(system_path))
-                modified = time.ctime(os.path.getmtime(system_path))
-                file_extension = str(splitext(system_path))
-
-                # set protected to 1 if no write or read permission
-                if (not os.access(system_path, os.R_OK) or
-                        not os.access(system_path, os.W_OK)):
-                    protected = 1
-
-                # list files only or folders only
-                if os.path.isdir(system_path):
-                    if files_only == 'true':
-                        continue
-                    file_extension = "dir"
-                    user_path = "{0}/".format(user_path)
-                else:
-                    # filter files based on file_type
-                    if file_type is not None and file_type != "*" and \
-                        (folders_only or len(supported_types) > 0 and
-                         file_extension not in supported_types or
-                            file_type != file_extension):
-                        continue
-
-                # create a list of files and folders
-                files[f] = {
-                    "Filename": f,
-                    "Path": user_path,
-                    "file_type": file_extension,
-                    "Protected": protected,
-                    "Properties": {
-                        "Date Created": created,
-                        "Date Modified": modified,
-                        "Size": sizeof_fmt(getsize(system_path))
-                    }
-                }
+            files = Filemanager.get_files_in_path(
+                is_show_hidden_files, files_only, folders_only,
+                supported_types, file_type, user_dir, orig_path
+            )
         except Exception as e:
             Filemanager.resume_windows_warning()
+            err_msg = str(e)
             if (hasattr(e, 'strerror') and
                     e.strerror == gettext('Permission denied')):
                 err_msg = str(e.strerror)
-            else:
-                err_msg = str(e)
+
             files = {
                 'Code': 0,
                 'Error': err_msg


^ permalink  raw  reply  [nested|flat] 9+ messages in thread


end of thread, other threads:[~2020-09-22 11:18 UTC | newest]

Thread overview: 9+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2020-09-01 10:04 [pgAdmin][SonarQube] Reduce cognitive complexity Aditya Toshniwal <[email protected]>
2020-09-02 12:00 ` Akshay Joshi <[email protected]>
2020-09-03 10:08 [pgAdmin][SonarQube] Reduce cognitive complexity Aditya Toshniwal <[email protected]>
2020-09-03 13:26 ` Akshay Joshi <[email protected]>
2020-09-08 12:42 [pgAdmin][SonarQube] Reduce cognitive complexity Aditya Toshniwal <[email protected]>
2020-09-09 05:56 ` Akshay Joshi <[email protected]>
2020-09-22 05:33 [pgAdmin][SonarQube] Reduce cognitive complexity Aditya Toshniwal <[email protected]>
2020-09-22 09:26 ` Akshay Joshi <[email protected]>
2020-09-22 11:18   ` Aditya Toshniwal <[email protected]>

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox