BUJΖddlZddlZddlmZddlmZddlmZddlm Z ej e Z dZ Gdd eZd Zd Zd Zd ZejdZejdZejdZejdZejdZejdZejdZdZejdZejdZdS)N)config)engines)url)compatc6eZdZdZedZdZdZdS)registerci|_dSNfns)selfs /srv/buildsys-work-dir/castor/build_node/builder-2/WGSG1/unpkd_srcs/cloudlinux-venv-1.0.6/venv/lib/python3.11/site-packages/sqlalchemy/testing/provision.py__init__zregister.__init__s cVtd|SN*)r for_db)clsfns rinitz register.inits$%xzz  %%b)))rcfd}|S)Nc|j<Sr r )rdbnamers rdecoratez!register.for_db..decorates!DHV Kr)rrrs`` rrzregister.for_dbs)      rc<t|tjrtj|}n)t|tjr|}n |jj}|}||j vr|j ||g|RS|j d|g|RSr) isinstancer string_typessa_urlmake_urlURLdbrget_backend_namer)rcfgargrbackends r__call__zregister.__call__s c6. / / /#&&CC VZ ( ( CC&*C&&(( dh  $48G$S/3/// / 48C=+s+++ +rN)__name__ __module__ __qualname__r classmethodrrr*rrrr r s\**[* , , , , ,rr ctD]>}td||jjt ||j|?dS)NzCREATE database %s, URI %r)_configs_for_db_operationloginfor%r create_dbfollower_identr's rcreate_follower_dbr6+sS(**// -~svzJJJ#sv~....//rctj|}||rt ||}i}t ||t j||}t|||| tj ||||}|rt|||Sr )r"r# get_dialectload_provisioningfollower_url_from_mainupdate_db_optsrtesting_enginepost_configure_engineconnectcloserConfigr configure_follower)db_urloptions file_configr5dialectdb_optsengr's r setup_configrH1sof%%1133G @'??G67###   1 1C&#~666KKMM - gw D DC03/// JrctD]>}td||jjt ||j|?dS)NzDROP database %s, URI %r)r0r1r2r%rdrop_dbr4s rdrop_follower_dbrKFsS(**-- +^SVZHHHSV^,,,,--rc#Kt}tjD]}|jtjD]T}|jj}|}||j|j |j f}||vr|V| |UtjD]}|jdSr ) setrr@ all_configsr%disposerr&usernamehostdatabaseadd)hostsr'rr) host_confs rr0r0Ls EEE}((** }((**!!fj&&((clCHclC E ! !III IIi }((** rc0td|jz)zDynamically create a database for testing. Used when a test run will employ multiple processes, e.g., when run via `tox` or `pytest -n4`. z"no DB creation routine for cfg: %sNotImplementedErrorrr'rGidents rr3r3_s BSWL M MMrc0td|jz)z8Drop a database that we dynamically created for testing.zno DB drop routine for cfg: %srWrYs rrJrJis >H I IIrcdS)zCSet database options (db_opts) for a test database that we created.Nr)rBrFs rr;r;o  DrcdS)zPerform extra steps after configuring an engine for testing. (For the internal dialects, currently only used by sqlite.) Nr)renginer5s rr=r=us  Drc<tj|}||_|S)zCreate a connection URL for a dynamically-created test database. :param url: the connection URL specified when the test run was invoked :param ident: the pytest-xdist "worker identifier" to be used as the database name )r"r#rRrrZs rr:r:~s /#  CCL JrcdS)z@Create dialect-specific config settings for a follower database.Nr)r'rZs rrArAr]rcdS)a[Remove databases that were created during the test process, after the process has ended. This is an optional step that is invoked for certain backends that do not reliably release locks on the database as long as a process is still in use. For the internal dialects, this is currently only necessary for mssql and oracle. Nrras r run_reap_dbsrds  Drctdtjt}tjt}i}t |5}|D]}|}|d\}}tj |}||vr1| ||<|| | |j f} || ||| | dddn #1swxYwY|D]5} t|| d} || } t!| | 6dS)NzReaping databases... r)r1r2 collections defaultdictrMopenstripsplitr"r#r8r9r&rQrSlistrd) idents_fileurlsidentsdialectsfile_linedb_namerBurl_objurl_keyrrZs rreap_dbsrvsHH #$$$  "3 ' 'D  $S ) )FH k   )e ) )D::<D;c0td|jz)aOSpecify keyword arguments for creating a temporary Table. Dialect-specific implementations of this method will return the kwargs that are passed to the Table method when creating a temporary table for testing, e.g., in the define_temp_tables method of the ComponentReflectionTest class in suite/test_reflection.py z.no temp table keyword args routine for cfg: %srW)r'rGs rtemp_table_keyword_argsrxs  837B  rcdSr r)rr%testclss rstop_test_classr{sDr)rgloggingrrr_rr"utilr getLoggerr+r1FOLLOWER_IDENTobjectr r6rHrKr0rr3rJr;r=r:rArdrvrxr{rrrrs""""""g!!,,,,,v,,,:/// *--- & NNN JJJ                       !!!2          r