Memcache的使用

Memcache

Memcache 是一种简单的内存缓存工具,通过将高频率出现的请求结果存储在内存中来减少读取数据库的次数或者较长地逻辑处理任务。最终提高整个应用服务的响应速度。

Memcache的服务器以Key-Value的形式存储数据,客户端使用TCP链接与服务器通讯。客户端有3类主要的命令,以下摘自Memcache协议:

  • 存储命令:客户端以<command name> <key> <flags> <exptime> <bytes>\r\n 的形式发送命令,其中<command name>是set、add、repalce这三种,add指明只有服务器没有该键时写入内存,repalce只在服务器有该键时写入内存。<key>是客户端指定的键。<flags>是客户端用来存储自定义信息的字段。<exptime>是设置的有效期,如果设为0,则永不过期。<bytes>是数据块。
  • 取回命令:客户端以get <key>*\r\n的形式发送命令。<key>*表示一个或多个键值。
  • 删除命令:客户端以delete <key> <time>\r\n的形式发送命令。<time>表示在该段时间内,数据被放入delete队列,无法通过get获得,也无法通过addreplace设置。

在项目上实践

Memcache的安装十分简单。我这里以ubuntu举例。首先是使用apt来安装Memcache的主程序memcached。

$ sudo apt-get update
$ sudo apt-get install memcached

安装完成后,接下来就是启动memcached。这样我们就在默认端口11211部署了Memcache。

$ sudo /etc/init.d/memcached start

在编写client程序前,我们可以先使用telnet来连接测试。连接成功后即可依据上文描述的命令来操作缓存。

$ telnet 127.0.0.1 11211

这里我使用pylibmc库编写客户端客户端程序。

首先是创建MemcacheClient类来封装pylibmc。在类的初始化方法__init__()中,除了进行实例化pylibmc.Client()和有效期外,最重要的是需要使用线程映射池(Thread-mapped pooling)来解决client线程安全的问题。因为我们的Server是多线程响应请求的。在单memcache client的模式下,Server会报出线程安全的警告。

在Python中存在全局解释器锁。在解释器解释执行任何Python代码时,都需要先获得这把锁才行,在遇到I/O 操作时会释放这把锁。[3]

线程映射池会保留每个发出请求的线程。当某线程再次使用client时会检查是否已经存在,如果不存在才新克隆master client。这样当多线程的Server访问memcache时就不会报thread safe的警告了。

这里我们还是连接本地默认端口上的Memcache,并且将有效期设置为半小时。

# *-./common/memcacheclient.py -*
import pylibmc


class MemcacheClient():

    def __init__(self, address='localhost:11211', timeout=1800):
        client = pylibmc.Client([address])
        self.timeout = timeout
        self.pool = pylibmc.ThreadMappedPool(client)

    def set(self, key, value, timeout=None):
        if self.pool is not None:
            with self.pool.reserve() as mc:
                mc.set(key, value, timeout or self.timeout)

    def get(self, key):
        if self.pool is not None:
            with self.pool.reserve() as mc:
                value = mc.get(key)
        return value

而在server.py这边,我们首先创建一个装饰器来应用缓存,我们将用户半小时内的查询缓存起来。首先将查询条件中的时间字段下取整至timeout的倍数,保证时间字段能够匹配Memcache的key。然后再过滤、排序关键字段,这些关键字段会组成查询的key。因为缓存不会永远有效,所以必须有针对Memcache失效时向后端存取数据的重试方案。

# *-./server.py -*
def check_cache(f):  
    # the function use to check and save request result in memcache
    def decorate(*args, **kwargs):
        # index page invoke aggregation function, we need expire time
        if 'to_time' in kwargs and 'from_time' in kwargs:
            kwargs['to_time'] = str(kwargs['to_time'] - (kwargs['to_time'] % (mc_client.timeout * 1000)))
            kwargs['from_time'] = str(kwargs['from_time'] - kwargs['from_time'] % (mc_client.timeout * 1000))
        key = dict(filter(lambda x: not x[1] in ['', 'xls'], kwargs.items()))
        key = collections.OrderedDict(sorted(key.items()))
        key = '_'.join([request.method, request.path] + key.values())
        try:
            result = mc_client.get(key)
        except:
            logging.error('request cache get failed.')
            result = None
        if result is None:
            result = f(*args, **kwargs)
        # store ad-hoc or search result to memcache
        try:
            mc_client.set(key, result)
        except:
            logging.error('store cache set failed.')
            # logging.error(traceback.print_exc())

        return result
    return decorate

当我们需要应用缓存时,将装饰器写在接口上面就能利用Memcache缓存。比如/aggregation/(category)接口,一般的后端程序会向Hive发起group by等操作,这个API的处理时间也会随着查询时间范围的增大而增大。

# *-./server.py -*
@app.route('/aggregation/(category)', method=['GET']) 
@check_params(category=str, selections=str, filters=str, groups=str, from_time=int, to_time=int, order_by=str, format=str)
@check_cache
def aggregation(category, selections='', filters='', groups='', from_time=0, to_time=0, order_by='', format=''):  
    try:
        if selections == '':
            return error(400, Exception('must give at least one selection'))
        _selections = map(lambda i: 'count(*) AS count_' if i == 'count' else i, trunk(selections.split(',')))
        _filters = dict(map(lambda i: tuple(i.split(':')), trunk(filters.split(','))))
        _groups = trunk(groups.split(','))
        if order_by == '' and selections.find('count') >= 0:
            order_by = 'count(*)'
        _order_by = map(lambda i: 'count_' if i == 'count(*)' else i, trunk(order_by.split(',')))
        result = SqlAggregationTask().aggregate(category, _selections, _filters, _groups, from_time, to_time, _order_by)
        result = map(lambda i: dict(zip(trunk(selections.split(',')), i)), result)
        # if format == 'xls':
        #     return xls(result)
        # else:
        return json.dumps(result)
    except Exception, e:
        return error(500, e)

这样执行服务器程序后,在第一次查询时还是会直接向Hive发出请求。而当半小时内再次发出同样的查询时,就会直接将内存数据返回给客户端。

References

  1. memcache协议翻译
  2. pylibmc线程的讨论
  3. GIL