# # Copyright (c) 2016, 2024, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, # as published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, # as designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an additional # permission to link the program and your derivative works with the # separately licensed software that they have either included with # the program or referenced in the documentation. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See # the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # """ This module contains the methods to support common operations over the ip address or hostnames and parsing of connection strings. """ import re import os import logging # Use backported OrderedDict if not available (for Python 2.6) try: from collections import OrderedDict except ImportError: from ordered_dict_backport import OrderedDict from mysql_gadgets.common.logger import CustomLevelLogger from mysql_gadgets.exceptions import GadgetCnxFormatError logging.setLoggerClass(CustomLevelLogger) _LOGGER = logging.getLogger(__name__) _BAD_CONN_FORMAT = (u"Connection '{0}' cannot be parsed. Please review the " u"used connection string (accepted formats: " u"@[:][:]") _BAD_QUOTED_HOST = u"Connection '{0}' has a malformed quoted host" _INVALID_PORT = (u"Invalid port '{0}' for connection string '{1}'. " u"Port must be >= 0 and <= 65535.") _UNPARSED_CONN_FORMAT = ("Connection '{0}' not parsed completely. Parsed " "elements '{1}', unparsed elements '{2}'") _CONN_QUOTEDHOST = re.compile( r"((?:^[\'].*[\'])|(?:^[\"].*[\"]))" # quoted host name r"(?:\:(\d+))?" # Optional port number r"(?:\:([\/\\w+.\w+.\-]+))?" # Optional path to socket ) _CONN_LOGINPATH = re.compile( r"((?:\\\"|[^:])+|(?:\\\'|[^:])+)" # login-path r"(?:\:(\d+))?" # Optional port number r"(?:\:([\/\\w+.\w+.\-]+))?" # Optional path to socket ) _CONN_CONFIGPATH = re.compile( r"([\w\:]+(?:\\\"|[^[])+|(?:\\\'|[^[])+)" # config-path r"(?:\[([^]]+))?", # group re.U ) _CONN_ANY_HOST = re.compile( r"""( (?![.-]) # Match cannot start by '.' or '-' [\w._-]* # start with 0 or more: alphanum or '%' % # must have a wildcard '%' at least [\w.%_-]* # then 0 or more: alphanum, '%', '.' or '-' (? 65535: raise GadgetCnxFormatError(_INVALID_PORT.format(port, connection_str)) connection["port"] = port elif socket is not None and os.name == "posix": connection['unix_socket'] = socket # Port is mandatory to create Server instance. connection["port"] = None else: connection["port"] = _DEFAULT_PORT return connection def parse_server_address(connection_str): """Parses host, port and socket from the given connection string. :param connection_str: Connection string in the form: user@host[:port][:socket]. :type connection_str: string :return: a tuple of (host, port, socket, add_type) where add_type is the name of the parser that successfully parsed the hostname from the connection string. :rtype: tuple :raises GadgetCnxFormatError: if a parsing error occurs. """ # Default values to return. host = None port = None socket = None address_type = None unparsed = None # From the matchers look the one that match a host. for ip_matcher in IP_MATCHERS: try: group = _match(IP_MATCHERS[ip_matcher], connection_str) if group: host = group[0] if ip_matcher == IPV6: host = "[%s]" % host if group[1]: part2_port_socket = _match(_CONN_PORT_ONLY, group[1], throw_error=False) if not part2_port_socket: unparsed = group[1] else: port = part2_port_socket[0] if part2_port_socket[1]: part4 = _match(_CONN_SOCKET_ONLY, part2_port_socket[1], throw_error=False) if not part4: unparsed = part2_port_socket[1] else: socket = part4[0] unparsed = part4[1] # If host is match we stop looking as is the most significant. if host: address_type = ip_matcher break # ignore the error trying to match. except GadgetCnxFormatError: pass # we must alert, that the connection could not be parsed. if host is None: raise GadgetCnxFormatError(_BAD_CONN_FORMAT.format(connection_str)) _verify_parsing(connection_str, host, port, socket, address_type, unparsed) _LOGGER.debug("->parse_server_address \n host: %s \n address_type: %s", host, address_type) return host, port, socket, address_type def _verify_parsing(connection_str, host, port, socket, address_type, unparsed): """Verify connection string parsing. Verify that the connection string was totally parsed and not parts of it where not matched, otherwise raise an error. :param connection_str: Connection string in the form: user@host[:port][:socket]. :type connection_str: string :param host: the parsed host :type host: string :param port: the parsed port :type port: string :param socket: the parsed socket :type socket: string :param address_type: Type of the parsed host, one of the following value: "IPv4", "IPv6" or "hostname" :type address_type: string :param unparsed: unparsed string (not identified part) :type unparsed: string :raises GadgetCnxFormatError: if a part of the connection string was not identified while parsed. """ exp_connection_str = connection_str # _LOGGER.debug("exp_connection_str %s", exp_connection_str) parsed_connection_list = [] if host: # _LOGGER.debug("host %s", host) if address_type == IPV6 and "[" not in connection_str: host = host.replace("[", "") host = host.replace("]", "") parsed_connection_list.append(host) if port: # _LOGGER.debug("port %s", port) parsed_connection_list.append(port) if socket: # _LOGGER.debug("socket %s", socket) parsed_connection_list.append(socket) parsed_connection = ":".join(parsed_connection_list) # _LOGGER.debug('parsed_connection %s', parsed_connection) diff = None if not unparsed: # _LOGGER.debug('not unparsed found, creating diff') diff = connection_str.replace(host, "") if port: diff = diff.replace(port, "") if socket: diff = diff.replace(socket, "") # _LOGGER.debug("diff %s", diff) # _LOGGER.debug("unparsed %s", unparsed) if unparsed or (exp_connection_str != parsed_connection and (diff and diff != ":")): # _LOGGER.debug("raising exception") parsed_args = "host:%s, port:%s, socket:%s" % (host, port, socket) err_msg = _UNPARSED_CONN_FORMAT.format(connection_str, parsed_args, unparsed) # _LOGGER.warning(err_msg) raise GadgetCnxFormatError(err_msg) def _match(pattern, connection_str, throw_error=True): """Check pattern match with connection string. Tries to match a pattern with the connection string and returns the groups. :param pattern: Regular expression object used to parse the connection string. :type pattern: re.RegexObject :param connection_str: Connection string in the form: user@host[:port][:socket]. :type connection_str: string :param throw_error: Indicate if an exception is raised (True) if the connection string does not match the pattern, or False is returned. By default: True. :type throw_error: boolean :return: tuple containing all the subgroups of the matched pattern. If no match is found False is returned if 'throw_error' is set to False. :rtype: tuple or boolean :raises GadgetCnxFormatError: if connection string does not match the given pattern and throw_error is set to True. """ grp = pattern.match(connection_str) if not grp: if throw_error: raise GadgetCnxFormatError(_BAD_CONN_FORMAT.format(connection_str)) return False return grp.groups() def clean_IPv6(host_address): """Clean IPv6 host address. :param host_address: host address (IPv6) :type host_address: string :return: the given host address without '[' and ']' characters (removed). :rtype: string """ if host_address: host_address = host_address.replace("[", "") host_address = host_address.replace("]", "") return host_address def parse_user_host(user_host): """Parse user, passwd, host, port from user:passwd@host :param user_host: MySQL user string (user:passwd@host) :type user_host: string :return: Tuple with the user and host. :rtype: tuple :raises: GadgetError if the user and host could not be parsed """ no_ticks = user_host.replace("'", "") try: conn_values = parse_connection(no_ticks) except GadgetCnxFormatError as err: raise GadgetCnxFormatError("Cannot parse user@host from: {0}." "".format(no_ticks), cause=err) return (conn_values['user'], conn_values['host'])