Tomorrow is another day

偶然看见一个好玩的异步代码提供了神奇的装饰器的Python第三方库Tomorrow,查了下他的代码,短小精悍,却包含了很多有意思的特性,这篇文章就是从这个库说开去的。

来个使用的例子:

不使用tomorrow的
import time
import requests

urls = [
    'http://sina.com.cn',
    'http://163.com',
    'http://oschina.net',
    'http://baidu.com',
    'http://csdn.net',
]

def download(url):
    return requests.get(url)

if __name__ == "__main__":

    start = time.time()
    responses = [download(url) for url in urls]
    html = [response.text for response in responses]
    end = time.time()
    print ("Time: %f seconds" % (end - start))

# Time: 2.059337 seconds
tomorrow一下
import time
import requests

from tomorrow import threads

urls = [
    'http://sina.com.cn',
    'http://163.com',
    'http://oschina.net',
    'http://baidu.com',
    'http://csdn.net',
]

@threads(5)
def download(url):
    return requests.get(url)

if __name__ == "__main__":
    start = time.time()
    responses = [download(url) for url in urls]
    html = [response.text for response in responses]
    end = time.time()
    print ("Time: %f seconds" % (end - start))
# Time: 0.303633 seconds

** 2.059337 seconds VS 0.303633 seconds ** 你看的没错,就是这么神奇

为了探明究竟,自古华山一条路,查代码,OMG,代码只有42行,全贴:

from functools import wraps

from concurrent.futures import ThreadPoolExecutor


class Tomorrow():

    def __init__(self, future, timeout):
        self._future = future
        self._timeout = timeout

    def __getattr__(self, name):
        result = self._wait()
        return result.__getattribute__(name)

    def _wait(self):
        return self._future.result(self._timeout)


def async(n, base_type, timeout=None):
    def decorator(f):
        if isinstance(n, int):
            pool = base_type(n)
        elif isinstance(n, base_type):
            pool = n
        else:
            raise TypeError(
                "Invalid type: %s"
                % type(base_type)
            )
        @wraps(f)
        def wrapped(*args, **kwargs):
            return Tomorrow(
                pool.submit(f, *args, **kwargs),
                timeout=timeout
            )
        return wrapped
    return decorator


def threads(n, timeout=None):
    return async(n, ThreadPoolExecutor, timeout)

这里面主要包含了几个方面

  • 第三方库Requests
  • Python装饰器
  • Python的几个基本的Metaclasses() __getattr__ __getattribute__
  • 异步杀器 concurrent.futures

Requests 是使用 Apache2 Licensed 许可证的 HTTP 库。用 Python 编写,真正的为人类着想。在大部分第三方库的推荐中名列前茅。这里不做过多的说明。

Python装饰器是Python编程中的重要组成部分,也是函数式编程的重要概念,我相信我不会比coolshellPython修饰器的函数式编程写的更好,所以想了解的去看这篇文章吧。

Python的Metaclasses是Python编程的重要概念,推荐你去看Python Pocket Reference的Operator Overloading Methods章节,另外说一下,作为手册,就应该像这本书这样写。

我们今天主要说的是concurrent.futures

在Python2里面,concurrent.futures是一个第三方库,在Python3里面已经作为标准库提供了,Python3相关的文档看这里

准备

在Python2环境里,需要安装的依赖库

pip2 install futures
pip2 install requests

Python3环境

pip3 install requests

对于python来说,作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题,解释器要留意的是避免在不同的线程操作内部共享的数据,同时它还要保证在管理用户线程时保证总是有最大化的计算资源。而python是通过使用全局解释器锁来保护数据的安全性: python代码的执行由python虚拟机来控制,即Python先把代码(.py文件)编译成字节码(字节码在Python虚拟机程序里对应的是PyCodeObject对象,.pyc文件是字节码在磁盘上的表现形式),交给字节码虚拟机,然后虚拟机一条一条执行字节码指令,从而完成程序的执行。python在设计的时候在虚拟机中,同时只能有一个线程执行。同样地,虽然python解释器中可以运行多个线程,但在任意时刻,只有一个线程在解释器中运行。而对python虚拟机的访问由全局解释器锁来控制,正是这个锁能保证同一时刻只有一个线程在运行。在多线程的环境中,python虚拟机按一下方式执行:

  • 1,设置GIL(global interpreter lock).
  • 2,切换到一个线程执行。
  • 3,运行:
    • a,指定数量的字节码指令。
    • b,线程主动让出控制(可以调用time.sleep(0))。
  • 4,把线程设置为睡眠状态。
  • 5,解锁GIL.
  • 6,再次重复以上步骤。

GIL的特性,也就导致了python不能充分利用多核cpu。而对面向I/O的(会调用内建操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果线程并为使用很多I/O操作,它会在自己的时间片一直占用处理器和GIL。这也就是所说的:I/O密集型python程序比计算密集型的程序更能充分利用多线程的好处。

总之,不要使用python多线程,使用python多进程进行并发编程,就不会有GIL这种问题存在,并且也能充分利用多核cpu。

GIL的特性,也就导致了python不能充分利用多核cpu。而对面向I/O的(会调用内建操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果线程并为使用很多I/O操作,它会在自己的时间片一直占用处理器和GIL。这也就是所说的:I/O密集型python程序比计算密集型的程序更能充分利用多线程的好处。 总之,不要使用python多线程,使用python多进程进行并发编程,就不会有GIL这种问题存在,并且也能充分利用多核cpu。

concurrent.futures提供的功能:

提供了多线程(ThreadPoolExecutor)和多进程(ProcessPoolExecutor)的并发功能

concurrent.futures基本方法:

class   concurrent.futures.Executor
#Executor为ThreadPoolExecutor或者ProcessPoolExecutor
concurrent.futures提供的方法如下:

1. submit(fn, *args, **kwargs)

fn:为需要异步执行的函数

args,kwargs:为给函数传递的参数

例:

#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures

def wait_on_b():
    print 5
    time.sleep(2)

def wait_on_a():
    print 6
    time.sleep(2)

ex = futures.ThreadPoolExecutor(max_workers=2)
ex.submit(wait_on_b)
ex.submit(wait_on_a)
#wait_on_a和wait_on_b函数会同时执行,因为使用了2个worker

2. map(func, *iterables, timeout=None)

此map函数和python自带的map函数功能类似,只不过concurrent模块的map函数从迭代器获得参数后异步执行。并且,每一个异步操作,能用timeout参数来设置超时时间,timeout的值可以是int或float型,如果操作timeout的话,会raisesTimeoutError。如果timeout参数不指定的话,则不设置超时间。

func:为需要异步执行的函数

iterables:可以是一个能迭代的对象,例如列表等。每一次func执行,会从iterables中取参数。

timeout:设置每次异步操作的超时时间

例:

#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures
data = ['1','2']

def wait_on(argument):
    print argument
    time.sleep(2)
    return 'ok'

ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on,data):
    print i

map函数异步执行完成之后,结果也是list,数据需要从list中取出

submit函数和map函数,根据需要,选一个使用即可。

3. shutdown(wait=True)

此函数用于释放异步执行操作后的系统资源。

一个完整的concurrent例子

#!/bin/env python
#coding:utf-8

import time,re,fcntl
import os,datetime

from concurrent import futures

count_list = list()
minute_num = 1
start_time = datetime.datetime(2016, 1, 22, 13, 30, 0, 484870)
now = datetime.datetime.now()
os.system(':>new.txt')
f_new = open('new.txt','a')

def test(count_time_format):
    f = open('push_slave.stdout','r')
    for line in f.readlines():
        if re.search(count_time_format,line):
            #获得文件专用锁
            fcntl.flock(f_new, fcntl.LOCK_EX)
            f_new.writelines(line)
            f_new.flush()

            #释放文件锁
            fcntl.flock(f_new, fcntl.LOCK_UN)
            break

while 1:
    after_one_minute = datetime.timedelta(minutes=minute_num)
    count_time = after_one_minute + start_time
    count_time_format = count_time.strftime('%Y-%m-%d %H:%M')
    minute_num = minute_num+1
    count_list.append(count_time_format)
    if count_time_format == "2014-04-23 16:00":
        break

def exec_cmd():
    with futures.ProcessPoolExecutor(max_workers=24) as executor:
        dict((executor.submit(test, times), times) for times in count_list)

if __name__ == '__main__':
    exec_cmd()
    f_new.close()

收工,有时间比较一下 futuresmultiprocessing

jack.zh 01.22 Python 阅读  1235  次

Fork me on GitHub