• 1
  • 2
  • 3
  • 4
  • 5
mysql数据库问题 首 页  »  帮助中心  »  数据库  »  mysql数据库问题
实现Redis客户端
发布日期:2016-4-28 18:4:25

  一、Call到命令端

  在上一篇文章中,我们介绍了怎样实现一个Call的客户端基本模型,但只有Call怎么能满足需求呢?例如在redis-py中,一个完整的客户端应该是这样的,如下所示:

  client = redis.StrictRedis()

  client.setex("key", 10, "value")

  接下来作为一个程序的客户端,需要去做的就是封装出一个Redis Client。比如下面所说的setex方法:

  def setex(self, key, seconds, value):

  """Set the value and expiration of a key.

  :raises TypeError: if seconds is neither int

  """

  if not isinstance(seconds, int):

  raise TypeError("milliseconds argument must be int")

  fut = self._conn.execute(b'SETEX', key, seconds, value)

  return wait_ok(fut)

  剩下的就是一个个方法逐个完善。

  二、什么是连接池

  我们会看到,无论那个数据库客户端,都会有连接池机制。那么连接池到底是什么呢?我们又为什么需要连接池呢?

  我们都知道,对连接而言,创建是必要重型的操作。例如,TCP连接,接下来之后是登录认证等等过程,最后才会执行命令。这就是我们通常计算库性能的时候,很多时候会把建立连接的时候去掉的原因。但是这就出现了一个问题,当一个连接被占用时,其他的操作仍旧是不能够完成操作了,只能等待前一个操作完成。假如我们一次性创建一堆连接呢?从一堆连接中找到空闲的连接,使用完成后释放成空闲的状态,这就是线程池的本质。由于减少了每次创建连接的过程,因此对性能提升也非常有帮助。

  三、从单连接到连接池

  首先,我们还是创建一个RedisPool类,用于管理Redis的连接池。如下所示:

  class RedisPool:

  """Redis connections pool.

  """

  def __init__(self, address, db=0, password=None, encoding=None,

  *, minsize, maxsize, commands_factory, loop=None):

  if loop is None:

  loop = asyncio.get_event_loop()

  self._address = address

  self._db = db

  self._password = password

  self._encoding = encoding

  self._minsize = minsize

  self._factory = commands_factory

  self._loop = loop # 连接池数组

  self._pool = collections.deque(maxlen=maxsize)

  self._used = set()

  self._acquiring = 0

  self._cond = asyncio.Condition(loop=loop)

  def _create_new_connection(self):

  return create_redis(self._address,

  db=self._db,

  password=self._password,

  encoding=self._encoding,

  commands_factory=self._factory,

  loop=self._loop)

  接下来,就需要创建大量的连接了,如下所示:

  async def create_pool(self, *, override_min):

  # todo: drop closed connections first

  # 判断是否达到了连接池数量限制

  while not self._pool and self.size < self.maxsize:

  self._acquiring += 1

  try:

  conn = await self._create_new_connection()

  self._pool.append(conn)

  finally:

  self._acquiring -= 1

  # connection may be closed at yeild point

  self._drop_closed()

  那么怎么从这些连接中抽取连接并且进行连接呢,如下所示:

  @asyncio.coroutine

  def acquire(self):

  """Acquires a connection from free pool.

  Creates new connection if needed.

  """

  with await self._cond:

  while True:

  await self._fill_free(override_min=True)

  if self.freesize:

  conn = self._pool.popleft()

  assert not conn.closed, conn

  assert conn not in self._used, (conn, self._used)

  self._used.add(conn)

  return conn

  else:

  await self._cond.wait()

  接下来就是使用完成后进行释放,如下所示:

  def release(self, conn):

  """Returns used connection back into pool.

  When returned connection has db index that differs from one in pool

  the connection will be closed and dropped.

  When queue of free connections is full the connection will be dropped.

  """

  assert conn in self._used, "Invalid connection, maybe from other pool"

  self._used.remove(conn)

  if not conn.closed:

  if conn.in_transaction:

  logger.warning("Connection %r in transaction, closing it.",

  conn)

  conn.close()

  elif conn.db == self.db:

  if self.maxsize and self.freesize < self.maxsize:

  self._pool.append(conn)

  else:

  # consider this connection as old and close it.

  conn.close()

  else:

  conn.close()

  # FIXME: check event loop is not closed

  asyncio.async(self._wakeup(), loop=self._loop)

  到这里,你就已经可以实现一个基本的Redis客户端了,还在犹豫什么?快自己动手吧!

  注: 文中Redis库参考了aio-lib/aioredis库。

   后面我们会更新mysql的相关操作知识,关注mysql的朋友敬请期待。