跳转至

开发者指南

数据库引擎

为了兼容多种数据库资源, 我们引入了数据库引擎(Engine)的概念, 具体代码在 sql/engines/目录下, 具体实现类似javaInterface, 基本思路如下:

  1. 定义一个EngineBase https://github.com/hhyo/Archery/blob/master/sql/engines/__init__.py
    class EngineBase:
        """enginebase 只定义了init函数和若干方法的名字, 具体实现用mysql.py pg.py等实现"""
    
        def __init__(self, instance=None):
            self.conn = None
            if instance:
                self.instance = instance
                self.instance_name = instance.instance_name
                self.host = instance.host
                self.port = int(instance.port)
                self.user = instance.user
                self.password = instance.raw_password
    
        def get_connection(self, db_name=None):
            """返回一个conn实例"""
    
        @property
        def name(self):
            """返回engine名称"""
            return 'base'
    
        @property
        def info(self):
            """返回引擎简介"""
            return 'Base engine'
    
        def get_all_databases(self):
            """获取数据库列表, 返回一个ResultSet,rows=list"""
            return ResultSet()
    
        def get_all_tables(self, db_name):
            """获取table 列表, 返回一个ResultSet,rows=list"""
            return ResultSet()
    
        def get_all_columns_by_tb(self, db_name, tb_name):
            """获取所有字段, 返回一个ResultSet,rows=list"""
            return ResultSet()
    
        def describe_table(self, db_name, tb_name):
            """获取表结构, 返回一个 ResultSet,rows=list"""
            return ResultSet()
    
        def query_check(self, db_name=None, sql=''):
            """查询语句的检查、注释去除、切分, 返回一个字典 {'bad_query': bool, 'filtered_sql': str}"""
    
        def filter_sql(self, sql='', limit_num=0):
            """给查询语句增加结果级限制或者改写语句, 返回修改后的语句"""
    
        def query(self, db_name=None, sql='', limit_num=0, close_conn=True):
            """实际查询 返回一个ResultSet"""
    
        def query_masking(self, db_name=None, sql='', resultset=None):
            """传入 sql语句, db名, 结果集,
            返回一个脱敏后的结果集"""
            return resultset
    
        def execute_check(self, db_name=None, sql=''):
            """执行语句的检查 返回一个ReviewSet"""
    
        def execute(self):
            """执行语句 返回一个ReviewSet"""
    
        def get_execute_percentage(self):
            """获取执行进度"""
    
        def get_rollback(self, workflow):
            """获取工单回滚语句"""
    
  2. 继承EngineBase来具体实现这些方法, 为了保证所有的方法返回的数据类型一致, 我们定义了 ResultSetReverSet 对象, 分别对应查询的结果集和审核/执行工单的结果集, 各方法要求的返回参见方法下方的小说明

具体对象的定义不做详细叙述, 参见代码 https://github.com/hhyo/Archery/blob/master/sql/engines/models.py

  1. get_engine处增加入口 https://github.com/hhyo/Archery/blob/master/sql/engines/__init__.py#L74

如何接入不支持的数据库

  1. model层加入新数据库的名字 https://github.com/hhyo/Archery/blob/master/sql/models.py#L72

  2. engines目录下新增一个 python 文件, 如oracle.py , 文件内部定义一个Engine, 并继承EngineBase:

    from sql.engines import EngineBase
    
    
    class OracleEngine(EngineBase):
        def get_connection(self, db_name=None):
    ...
    
    get_connection ,query_check, filter_sql, query, get_all_databases, get_all_tables, get_all_columns_by_tb, describe_table 方法实现后, 这种数据库的查询方法就可用了

将其他方法实现后, 工单执行就可用了, 所有实现都可以暂时使用伪实现, 如脱敏, 语句检查等, 只要返回值和文档中要求一致即可

  1. get_engine函数中加入新类型数据库的入口 https://github.com/hhyo/Archery/blob/master/sql/engines/__init__.py#L74 在下方加入
        elif instance.db_type == 'oracle':
            from .oracle import OracleEngine
        return OracleEngine(instance=instance)
    

一个接入范例 Redis的接入: https://github.com/hhyo/Archery/pull/101

Back to top