ORM: Peewee 实践

Peewee是Python的RDBMS ORM库。与node的mongoose之于MongoDB一样,在数据库上面封装一层来抽象数据库操作。在编写业务逻辑代码时,利用Peewee可以采取OO的风格,避免编写复杂的SQL语句代码来提高写代码的效率。

Peewee与其他ORM相比,有着更简洁轻量的功能模块,在5000多行代码内完成基本功能,这包括了数据库的连接和操作、映射模型、类型定义、查询操作符等。在学习成本方面,比功能强大的sqlalchemy要简单易用得多。在兼容方面,peewee直接调用pysqlite2、psycopg2、MySQLdb库来操作SQLite、Postgresql、MySQL这三类数据库(支持扩展更多的数据库类型),并且还实现了一层连接池。

在我们的项目中,本地数据库只需存储处理结果并提供给API和二级缓存(一级缓存使用memcache)实用,使用Peewee这种小锤子就可以满足。下面,我先介绍Peewee的使用流程,再将其应用在项目上。

基本概念

使用方法还是和其他ORM一致,首先时定义数据模型Model。Model等相关概念与RDB概念的对应关系如下表。数据模型除了定义表的schema外还可以定义模型的方法。我们在模型内部嵌套一个meta类来指定Mode连接的数据库、约束条件等信息。

| ORM            | RDB                     |
|:--------------:|:-----------------------:|
| Model class    | Database table          |
| Field instance | Column on a table       |
| Model instance | Row in a database table |

连接数据库后,如果Model的表没有创建的话,还需要先使用db.create_tables([Model])创建表。

Peewee提供多种方法来操作数据库的CRUD。而Model的方法有继承自Peewee Model类的方法,这些方法时Peewee预定义的常用操作,比如.insertmany()、.getor_create()等常用方法。也可以自定义与业务自身相关的方法。在下一节中我将介绍与项目相关的自定义方法。

实践

这里我用的例子是项目中Bottle框架来与Peewee组合完成基本的API。在获得相关资源的HTTP请求时可以将数据处理交给Peewee的Model来完成。

首先我们创建两个表来存储应用数据。一个是Schema存储数据仓库中的表名和字段名。另一个是MetaColumn,存储数据字段的组合。首先我们要定义一个Model基类,其除了继承Model的通用方法并且拥有自定义的方法:.models_to_array()方法是将查询结果转换为数组。另一个.models_to_object_array()方法是将结果转换为对象数组。

# *-./models/base.py -*
from peewee import *  
from common.db import db_client  
import logging

class BaseModel(Model):

    @staticmethod
    def models_to_array(models, field):
        result = []
        for model in models:
            # hurry code need reduce the object
            result.append(model._data[field])
        logging.info('query %d record.' % len(result))
        return result

    @staticmethod
    def models_to_object_array(models, fields):
        result = []
        for model in models:
            result.append({ field: model._data[field] for field in fields})
        logging.info('query %d record.' % len(result))
        return result

    class Meta:
        database = db_client  # should be set by user

这样,接下来的两个表就可以直接继承并调用这两个方法,而不必重复定义。

Schema用来储存平台的存储数据表的信息,起结构比较简单,包括表名和字段名。

# *-./models/schema.py -*
from peewee import *  
from base import BaseModel

class Schema(BaseModel):  
    columns = CharField()
    category = CharField()

    class Meta:
        db_table = 'schemas'

这样,直接调用Schema.select().where(Schema.category == 'alerts').get()就能获得alerts表的结构信息。

而MetaColumn类处理的事情相对复杂,既需要完成数据导入,也需要对数据进行查询。

数据导入分为初始导入和增量导入,初始导入就是无脑的分批次调用.insert_many()。而增量导入对于已存在数据则不进行处理('IGNORE')。

而查询则是通过自定义的.query_by_option()方法。.query_by_option()可以通过option参数选择查询的字段,而对于条件的选择是通过category、customer、env、componen、item这五个字段来确定,再调用私用函数.__generate_clauses()可以将条件字段转换为满足Peewee查询的参数格式(其实就是Field instance)。

# *-./models/meta_column.py -*
from peewee import *  
from base import BaseModel  
from common.db import db_client  
import logging  
import operator

class MetaColumn(BaseModel):  
    category = CharField(null=True)
    customer = CharField(null=True)
    env = CharField(null=True)
    component = CharField(null=True)
    item = CharField(null=False)

    @staticmethod
    def query_by_option(option='', category='', customer='', env='', component='', item=''):
        if option.strip() != '':
            clauses = MetaColumn.__generate_clauses(category, customer, env, component, item)
            meta_columns = MetaColumn.select(getattr(MetaColumn, option)) \
                                     .distinct() \
                                     .where(clauses) \
                                     .order_by(getattr(MetaColumn, option))
            return BaseModel.models_to_array(meta_columns, option)
        else:
            return []

    @staticmethod
    def __generate_clauses(category='', customer='', env='', component='', item=''):
        clauses = []
        if category.strip() != '':
            clauses.append((MetaColumn.category == category))
        if customer.strip() != '':
            clauses.append((MetaColumn.customer == customer))
        if env.strip() != '':
            clauses.append((MetaColumn.env == env))
        if component.strip() != '':
            clauses.append((MetaColumn.component == component))
        if item.strip() != '':
            clauses.append((MetaColumn.item == item))
        if len(clauses) == 0:
            return None
        else:
            return reduce(operator.and_, clauses)

    @staticmethod
    def insert_one_by_one(data):
        for one_line in data:
            obj_inserted = {
                'category': one_line[0],
                'customer': one_line[1],
                'env': one_line[2],
                'component': one_line[3],
                'item': one_line[4]
            }

            try:
                return MetaColumn.insert(**obj_inserted).on_conflict('IGNORE').execute()  # return model instance
            except:
                logging.error('save one meta_column record failed.')

    @staticmethod
    def insert_from_array(data, bulk_size=256):
        """insert from a 2-d array of strings"""
        with db_client.transaction():
            for idx in range(0, len(data), bulk_size):
                MetaColumn.insert_many(map(lambda i: {
                    'category': i[0],
                    'customer': i[1],
                    'env': i[2],
                    'component': i[3],
                    'item': i[4],
                }, data[idx:idx + bulk_size])).execute()

    @staticmethod
    def check_empty():
        nums = MetaColumn.select().count()
        return nums == 0

    class Meta:
        db_table = 'meta_columns'
        indexes = (
            # create a unique on category/customer/env/component/item
            (('category', 'customer', 'env', 'component', 'item'), True),
        )

这样在Bottle server上响应请求的函数内调用对应表的Model的方法就可以对数据进行查询了。具体内容如下。

# *-./server.py -*
@app.route('/options/<option_type>', method=['GET'])
@check_params(option_type=str, category=str, customer=int, env=str, component=str, item =str)
def options(option_type='', category='', customer='', env='', component='', item =''):  
    try:
        result = MetaColumn.query_by_option(option_type, category, customer, env, component, item)
        return json.dumps(result)
    except Exception, e:
        return error(500, e)

迁移

迁移是在工程上的概念。当我们处在开发阶段需要常规性的修改数据表的结构时,必须要保证原始数据库能够平滑地迁移到最新的结构上。这里采用的方法是按日期编写数据库迁移代码,迁移代码放在项目的./migrations目录下,以时间为前缀+_migration命名文件,比如./migrations/20160802_migration.py

# *-./migrations/20160802_migration.py -*
import sys  
import os  
import datetime  
from playhouse.migrate import *  
PROJECT_ROOT = os.path.join(os.path.dirname(__file__), '..')  
sys.path.append(PROJECT_ROOT)  # fix directory issue  
from common.db import connect_db_client  
from common.config import load_config  
PROJECT_ROOT = os.path.dirname(__file__) + '/../'  
os.chdir(PROJECT_ROOT)


def main():  
    conf = load_config()
    db_client = connect_db_client(conf['db']['type'], os.path.join(PROJECT_ROOT, conf['db']['url']))
    migrator = SqliteMigrator(db_client)
    table_name = 'user_logs'
    col_name = 'created_on'

    # check column name exist
    col_exist = False
    cursor = db_client.execute_sql('PRAGMA table_info(user_logs);')
    for row in cursor.fetchall():
        if col_name == row[1]:
            col_exist = True

    if col_exist:
        print('Column exist.')
    else:
        # modify table schema
        created_on_field = DateTimeField(default=datetime.datetime.now)
        try:
            with db_client.transaction():
                migrate(
                    migrator.add_column(table_name, col_name, created_on_field),
                )
        except Exception, e:
            raise e
        else:
            print('Add Column to table.')
    db_client.close()
    return 0

if __name__ == "__main__":  
    main()

这个脚本向userlogs表添加createdon字段。处于安全的考虑,首先检查字段是否存在。如果存在则不需要在添加字段。当然,这样只是修改数据库的结构,还需要修改原来的Model文件。这么做无疑很麻烦,但是Peewee的反射 (能够从已有数据库上反射Model的成员变量)功能比较薄弱。

我在install_local_db.py中创建了一个数据库表结构安装器对象。DbSchemasInstaller主要完成两个功能。第一个是创建表结构,第二个依次运行migration下的文件来更新表结构。

# *-./install_local_db.py -*
class DbSchemasInstaller:  
    def __init__(self, schemas, migrations_dir='./migrations', error_file='./error_file'):
        self.schemas = schemas
        self.migrations_dir = migrations_dir
        self.error_file = error_file

    def create_tables(self):
        try:
            db.create_tables(self.schemas,
                             safe=True)  # use safe=True to pass table already exists error
        except Exception, e:  # other error occur
            logging.fatal(e)
        else:
            logging.info('finish create tables')

    def migrate_one_by_one(self):
        files = self.__get_files_by_date()
        for f in files:
            p = subprocess.Popen([sys.executable, f], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output, error = p.communicate()
            # try:
            if p.returncode != 0:
                logging.error("migrate %s failed %d %s %s" % (f, p.returncode, output, error))
                # write error_file
                self.__write_error_file(f)
                return  # TODO considering many migration situation
            else:
                logging.info("migrate %s step output: %s" % (f[-21:-3], "".join(output.strip().split('\n'))))
            # finally:
            #     p.terminate()
            #     p.wait()
        logging.info('finish migrations')

    def __get_files_by_date(self):
        error_pyfile_name = self.__read_error_file()
        files = glob.glob(self.migrations_dir + "/*.py")
        files.sort()  # sort by first name date string
        if error_pyfile_name is None:
            return files
        else:
            os.system('rm ' + self.error_file)
            return files[files.index(error_pyfile_name):]

    def __read_error_file(self):
        if os.path.isfile(self.error_file):
            try:
                f = open(self.error_file, 'r')
                error_file = f.readline().strip()
            except Exception, e:
                logging.error('read file failed.')
                f.close()
                return None
            else:
                f.close()
                return error_file
        else:
            return None

    def __write_error_file(self, pyfile):
        try:
            with open(self.error_file, 'w') as f:
                f.write(pyfile)
        except:
            logging.error('write file failed.')
        f.close()

.migrate_one_by_one()首先读取迁移文件,然后再起一个进程来修改数据库结构。当迁移过程中出现错误时,将错误输出并记录错误文件。这样在修复问题后,可以接着执行这个错误的文件的更新内容。

References

  1. Python orm
  2. Peewee docs