• 1
  • 2
  • 3
  • 4
  • 5
mysql数据库问题 首 页  »  帮助中心  »  数据库  »  mysql数据库问题
零基础一:Python Redis Client
发布日期:2016-4-28 17:4:7

 一、什么是AIO

  Asynchronous Input/Output的简写是AIO,也就是异步IO。不过在谈什么是AIO之前,我们要先介绍一下BIO。那么什么是BIO呢?简单说来,BIO是Blocking Input/Output,也就是阻塞IO,他实现的通常是在线程池中找出一个线程处理IO,在IO过程中,其他线程都需要等待IO完成后才可以从中选取一个线程占用IO。这样导致的最大的问题是,当线程数量较多,且需要大量的IO操作时,就会造成一个大量的阻塞,因为事实上每次只有一个线程在处理IO。

  那么怎样解决这个时候的问题呢?这时候就提出了AIO的概念。通常在IO处理过程中也会伴有一些其他的处理操作,假设把所有的操作都浪费在了等待IO释放上,线程池中的线程利用率也太低了,所以我们需要一种方式,在申请IO处理之后,就去继续做其他的事情,等IO操作完成了,然后通知我们已经OK,我们可以继续处理了。这也就是我们所说的AIO的原型。

  AIO的情况也说明了它适用的场景:

    *长连接场景

    *重度的IO操作

  若找软件来做案例,我们可以找一个可能大家熟知的:NGINX。正如我们所知道的,NGINX采用了异步、事件驱动的方法来处理连接。这种处理方式无需(像使用传统架构的服务器一样)为每个请求创建额外的专用进程或线程,而是在一个工作进程中处理多个连接和请求。所以,NGINX工作在非阻塞的socket模式下,并使用了epoll 与 kqueue这样有效的方法。

  这部分的内容,在NGINX引入线程池 性能提升9倍中进行了详细的介绍,包含了NGINX的异步应用经验,同时介绍了NGINX中引入了阻塞的线程池用于解决某些特定场景问题下的效率。

  二、如何实现Python的异步IO

  这篇文章以最新的Python 3.5版本为基础来介绍实现一个异步的Python Redis Client。在此之前,我们先来看一下,如何实现Python的aio。

  Python的aio官方封装了一个比较合适的基础库asyncio。

  让我们从一个例子开始简单认识一下如何实现一个异步的aio client。以官方文档中的例子为例,参考代码如下所示:

  import asyncio

  async def tcp_echo_client(message, loop):

  reader, writer = await asyncio.open_connection('127.0.0.1', 8888,

  loop=loop)

  print('Send: %r' % message)

  writer.write(message.encode())

  data = await reader.read(100)

  print('Received: %r' % data.decode())

  print('Close the socket')

  writer.close()

  message = 'Hello World!'

  loop = asyncio.get_event_loop()

  loop.run_until_complete(tcp_echo_client(message, loop))

  loop.close()

  上面的例子用到的Python 3.5中引入的async/await关键字,还有asyncio库。这里面asyncio.open_connection会返回一个coroutine,这个可以使用await进行一个aio的调用,也就是在收到返回信号之前,程序可以继续去处理其他的任务。这里面真正核心的就是EventLoop,它负责监视发送这些信号,并且返回数据,它可通过asyncio.get_event_loop获取到。然后他会真正返回的数据是一个读取StreamReader与写入StreamWriter的对象。

  接下来就可以通过这个reader和writer进行数据的读取与写入。writer是可以直接写入的,若为reader的话,就需要aio的方式等待受到数据后返回。这样看起来更接近于普通的socket编程。不过关闭连接时,仅仅需要关闭writer就足够了。

  三、从socket通讯到redis通讯

  所有的网络请求本质上来说都可以看成是SocketIO的请求,所以,我们可以把Redis的请求当做是一个socket的通讯来进行,这样就很方便了。

  但通讯的数据格式怎么办?没关系,这里我们使用hiredis-py来解决协议解析的问题。从库设计的角度来说,我们需要封装一个RedisConnection的类出来解决Redis的通讯协议。它可能传入的参数包含:

  1.一个StreamReader、

  2.一个StreamWriter

  3.一个EventLoop

  4.编码encoding

  5.其他的我们就用一个*来表示

   参考代码如下所示:

  class RedisConnection(object):

  '''Redis Connection'''

  def __init__(self, reader, writer, *, encoding=None, loop=None):

  if loop is None:

  loop = asyncio.get_event_loop()

  self._reader = reader

  self._writer = writer

  self._encoding = encoding

  self._loop = loop

  self._db = 0

  def __repr__(self):

  return ''.format(self._db)

  记得加上__repr__用来描述这个对象,这是一个好习惯。接下来就需要完善这个类了,例如,我们需要添加一个关闭连接的方法,这需要至少一个参数用于标记连接是否关闭,一个用于执行关闭操作,比如我们需要这样子的,如下所示:

  def close(self):

  """Close connection."""

  self._do_close(None)

  def _do_close(self, exc):

  if self._closed:

  return

  self._closed = True

  self._closing = False

  # 关闭写入

  self._writer.transport.close()

  # 取消读取任务

  self._reader_task.cancel()

  self._reader_task = None

  self._writer = None

  self._reader = None

  @property

  def closed(self):

  """True if connection is closed."""

  closed = self._closing or self._closed

  if not closed and self._reader and self._reader.at_eof():

  self._closing = closed = True

  self._loop.call_soon(self._do_close, None)

  return closed

  连接这类的方法已经处理完了,那么接下来就应该是执行Redis命令了,我们可以叫它execute。那他需要几个东西,一个是执行的指令command,一个是指令参数*args,还有一些其他的,比如编码encoding。为了节省时间,只是考虑一些Set与Get的基本操作。那么Redis和mysql的数据结构是什么样子的呢?我们这里说redis,我们还需要先把它编译成Redis-server可以识别的形式,那么需要一个encode_command方法。如下所示:

  _converters = {

  bytes: lambda val: val,

  bytearray: lambda val: val,

  str: lambda val: val.encode('utf-8'),

  int: lambda val: str(val).encode('utf-8'),

  float: lambda val: str(val).encode('utf-8'),

  }

  def encode_command(*args):

  """Encodes arguments into redis bulk-strings array.

  Raises TypeError if any of args not of bytes, str, int or float type.

  """

  buf = bytearray()

  def add(data):

  return buf.extend(data + b'\r\n')

  add(b'*' + _bytes_len(args))

  for arg in args:

  if type(arg) in _converters:

  barg = _converters[type(arg)](arg)

  add(b'$' + _bytes_len(barg))

  add(barg)

  else:

  raise TypeError("Argument {!r} expected to be of bytes,"

  " str, int or float type".format(arg))

  return buf

  这样可以转化为可以识别的形式了,接下来还有一个问题,那么怎么让程序可以等待信号的生效呢?在这里我们介绍一下asyncio.Future。这个asyncio.Future类是用于封装回调函数的类,包含了一些更加方便使用的方法。通过这个类,可以实现aio的通知机制,也就是回调。这个类实例可以通过await返回我们需要的结果。不过这样就还需要在项目中添加一些更多的变量,比如所有等待返回的self._waiters。如下所示:

  def execute(self, command, *args, encoding=None):

  """Executes redis command and returns Future waiting for the answer.

  Raises:

  * TypeError if any of args can not be encoded as bytes.

  * ReplyError on redis '-ERR' resonses.

  * ProtocolError when response can not be decoded meaning connection

  is broken.

  """

  assert self._reader and not self._reader.at_eof(), (

  "Connection closed or corrupted")

  if command is None:

  raise TypeError("command must not be None")

  if None in set(args):

  raise TypeError("args must not contain None")

  # 这样小写也没有问题了

  command = command.upper().strip()

  if encoding is None:

  encoding = self._encoding

  fut = asyncio.Future(loop=self._loop)

  self._writer.write(encode_command(command, *args))

  self._waiters.append((fut, encoding, cb))

  return fut

  现在所有的命令都已经发送到了redis-server,接下来就需要读取对应的结果了。如下所示:

  async def _read_data(self):

  """Response reader task."""

  while not self._reader.at_eof():

  try:

  data = await self._reader.read(65536)

  except asyncio.CancelledError:

  break

  except Exception as exc:

  # XXX: for QUIT command connection error can be received

  # before response

  logger.error("Exception on data read %r", exc, exc_info=True)

  break

  self._parser.feed(data)

  while True:

  try:

  obj = self._parser.gets()

  except ProtocolError as exc:

  # ProtocolError is fatal

  # so connection must be closed

  self._closing = True

  self._loop.call_soon(self._do_close, exc)

  if self._in_transaction:

  self._transaction_error = exc

  return

  else:

  if obj is False:

  break

  else:

  self._process_data(obj)

  self._closing = True

  self._loop.call_soon(self._do_close, None)

  def _process_data(self, obj):

  """Processes command results."""

  waiter, encoding, cb = self._waiters.popleft()

  if waiter.done():

  logger.debug("Waiter future is already done %r", waiter)

  assert waiter.cancelled(), (

  "waiting future is in wrong state", waiter, obj)

  return

  if isinstance(obj, RedisError):

  waiter.set_exception(obj)

  if self._in_transaction:

  self._transaction_error = obj

  else:

  if encoding is not None:

  try:

  obj = decode(obj, encoding)

  except Exception as exc:

  waiter.set_exception(exc)

  return

  waiter.set_result(obj)

  if cb is not None:

  cb(obj)

  有了这些之后,我们就可以简单创建一个连接了,如下所示:

  async def create_connection(address, *, db=None, password=None,

  encoding=None, loop=None):

  """Creates redis connection.

  Opens connection to Redis server specified by address argument.

  Address argument is similar to socket address argument, ie:

  * when address is a tuple it represents (host, port) pair;

  * when address is a str it represents unix domain socket path.

  (no other address formats supported)

  Encoding argument can be used to decode byte-replies to strings.

  By default no decoding is done.

  Return value is RedisConnection instance.

  This function is a coroutine.

  """

  assert isinstance(address, (tuple, list, str)), "tuple or str expected"

  if isinstance(address, (list, tuple)):

  host, port = address

  reader, writer = await asyncio.open_connection(

  host, port, loop=loop)

  else:

  reader, writer = await asyncio.open_unix_connection(

  address, loop=loop)

  conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)

  try:

  if password is not None:

  yield from conn.auth(password)

  if db is not None:

  yield from conn.select(db)

  except Exception:

  conn.close()

  return conn

  这样,连接部分的代码基本上已经处理完成了,接下来要做的就是实现基于这个连接的命令执行了,下面的内容会下一个文章中继续介绍,我们也会更新关于mysql的相关知识,敬请期待。