标签:Python

1277 ℉

16.02.18

Python发送邮件处理

今天要说的是Python发送邮件,其实发送邮件的代码都差不多,但是使用其他的邮件服务器,还是有很多的限制,这里就简要说明一下使用其他的邮件服务器发送邮件和自己建立邮件服务器发送邮件,主要是贴代码。

系统 Ubuntu 14.04.3

用163服务器发送邮件

优点:少量邮件需要发送的时候还是可以的,不用自建服务器,减少了服务器的开销

缺点:频繁发送会有问题,网络延迟严重,异步IO回调慢等

不要吐槽我用start_new_thread

# -*- coding: UTF-8 -*-
'''
jack.zh sen mail test.
'''
import thread        
import smtplib
from email.mime.text import MIMEText

from log import logger

From_email = {
    "mail_host" :"smtp.163.com",  #设置服务器
    "mail_user" :"user_name",    #用户名
    "mail_pass" :"xxxxxxxx",   #口令 
    "mail_postfix" :"163.com"  #发件箱的后缀 
}

def send_mail(to_list, sub, content):
    me="my_name"+"<"+From_email["mail_user"]+"@"+From_email["mail_postfix"]+">"
    msg = MIMEText(content,_subtype='html',_charset='utf-8')
    msg['Subject'] = sub
    msg['From'] = me
    msg['To'] = ";".join(to_list)
    try:
        server = smtplib.SMTP()
        server.connect(From_email["mail_host"])
        server.login(From_email["mail_user"],From_email["mail_pass"])
        server.sendmail(me, to_list, msg.as_string())
        server.close()
        return True
    except Exception, e:
        logger.error(str(e))
        logger.error(str(to_list))
        logger.error(str(sub))
        logger.error(str(content))


def send_mail_thread(mailto_list, subject, msg):
    if send_mail(mailto_list, subject, msg):
        logger.info("send mail success.") 
    else:
        logger.error("send mail fail.")
        logger.error(subject)
        logger.error(msg)


def util_send_email(msg, subject, emails):
    thread.start_new_thread(send_mail_thread, (emails, subject, msg))


if __name__ == '__main__':
    msg = "msg"

    mailto_list=["zzh.coder@qq.com"]
    if send_mail(mailto_list,"hello", msg):
        logger.error("yes")
    else:
        logger.error("no")

使用自建邮箱服务器发送sendmail

安装邮件服务:

apt-get install sendmail sendmail-cf squirrelmail spamassassin mailman mailutils sharutils

贴发送邮件代码:

#!/usr/bin/env python
#@author : zzh.coder@qq.com
#@desc: for mail sending.

import smtplib
import getopt
import sys
import os

from email.MIMEMultipart import MIMEMultipart
from email.MIMEBase import MIMEBase

from email.MIMEText import MIMEText
import email.Encoders as encoders


def send_mail(mail_from, mail_to, subject, msg_txt, files=[]):
    # Create message container - the correct MIME type is multipart/alternative.
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = mail_from
    msg['To'] = mail_to

    # Create the body of the message (a plain-text and an HTML version).
    #text = msg
    html = msg_txt

    # Record the MIME types of both parts - text/plain and text/html.
    #part1 = MIMEText(text, 'plain')
    part2 = MIMEText(html, 'html')

    # Attach parts into message container.
    # According to RFC 2046, the last part of a multipart message, in this case
    # the HTML message, is best and preferred.
    #msg.attach(part1)
    msg.attach(part2)

    #attachment
    for f in files:
        #octet-stream:binary data
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(open(f, 'rb').read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(f))
        msg.attach(part)

    # Send the message via local SMTP server.
    s = smtplib.SMTP('localhost')
    # sendmail function takes 3 arguments: sender's address, recipient's address
    # and message to send - here it is sent as one string.

    mailto_list = mail_to.strip().split(",")
    if len(mailto_list) > 1:
        for mailtoi in mailto_list:
            s.sendmail(mail_from, mailtoi.strip(), msg.as_string())
    else:
        s.sendmail(mail_from, mail_to, msg.as_string())

    s.quit()
    return True


def main():
    files = []
    try:
        opts, args = getopt.getopt(sys.argv[1:], "f:t:s:m:a:")
        for op, value in opts:
            if op == "-f":
                mail_from = value
            elif op == "-t":
                mail_to = value
            elif op == "-s":
                subject = value
            elif op == "-m":
                msg_txt = value
            elif op == "-a":
                files = value.split(",")
    except getopt.GetoptError:
        print(sys.argv[0] + " : params are not defined well!")

    print mail_from, mail_to, subject, msg_txt
    if files:
        send_mail(mail_from, mail_to, subject, msg_txt, files)
    else:
        send_mail(mail_from, mail_to, subject, msg_txt)

if __name__ == "__main__":
    main()

Demo over。

后面的说明:

  • 服务器需要接收邮件并处理等操作此处不说,那是一个比较庞大的问题了
  • 自建服务器也有失败的可能,在配置senmail的时候有很多配置需要了解,这里不做详解,只提供思路
  • 方法2完全可以指定发件箱的地址,在此发件箱去接收那边的回复邮件,手动处理
  • 方法一和二都可以完整的使用邮件基本的所有功能,比如模版,附件等等

jack.zh 标签:Python 继续阅读

1234 ℉

16.01.22

Tomorrow is another day

偶然看见一个好玩的异步代码提供了神奇的装饰器的Python第三方库Tomorrow,查了下他的代码,短小精悍,却包含了很多有意思的特性,这篇文章就是从这个库说开去的。

来个使用的例子:

不使用tomorrow的
import time
import requests

urls = [
    'http://sina.com.cn',
    'http://163.com',
    'http://oschina.net',
    'http://baidu.com',
    'http://csdn.net',
]

def download(url):
    return requests.get(url)

if __name__ == "__main__":

    start = time.time()
    responses = [download(url) for url in urls]
    html = [response.text for response in responses]
    end = time.time()
    print ("Time: %f seconds" % (end - start))

# Time: 2.059337 seconds
tomorrow一下
import time
import requests

from tomorrow import threads

urls = [
    'http://sina.com.cn',
    'http://163.com',
    'http://oschina.net',
    'http://baidu.com',
    'http://csdn.net',
]

@threads(5)
def download(url):
    return requests.get(url)

if __name__ == "__main__":
    start = time.time()
    responses = [download(url) for url in urls]
    html = [response.text for response in responses]
    end = time.time()
    print ("Time: %f seconds" % (end - start))
# Time: 0.303633 seconds

** 2.059337 seconds VS 0.303633 seconds ** 你看的没错,就是这么神奇

为了探明究竟,自古华山一条路,查代码,OMG,代码只有42行,全贴:

from functools import wraps

from concurrent.futures import ThreadPoolExecutor


class Tomorrow():

    def __init__(self, future, timeout):
        self._future = future
        self._timeout = timeout

    def __getattr__(self, name):
        result = self._wait()
        return result.__getattribute__(name)

    def _wait(self):
        return self._future.result(self._timeout)


def async(n, base_type, timeout=None):
    def decorator(f):
        if isinstance(n, int):
            pool = base_type(n)
        elif isinstance(n, base_type):
            pool = n
        else:
            raise TypeError(
                "Invalid type: %s"
                % type(base_type)
            )
        @wraps(f)
        def wrapped(*args, **kwargs):
            return Tomorrow(
                pool.submit(f, *args, **kwargs),
                timeout=timeout
            )
        return wrapped
    return decorator


def threads(n, timeout=None):
    return async(n, ThreadPoolExecutor, timeout)

这里面主要包含了几个方面

  • 第三方库Requests
  • Python装饰器
  • Python的几个基本的Metaclasses() __getattr__ __getattribute__
  • 异步杀器 concurrent.futures

Requests 是使用 Apache2 Licensed 许可证的 HTTP 库。用 Python 编写,真正的为人类着想。在大部分第三方库的推荐中名列前茅。这里不做过多的说明。

Python装饰器是Python编程中的重要组成部分,也是函数式编程的重要概念,我相信我不会比coolshellPython修饰器的函数式编程写的更好,所以想了解的去看这篇文章吧。

Python的Metaclasses是Python编程的重要概念,推荐你去看Python Pocket Reference的Operator Overloading Methods章节,另外说一下,作为手册,就应该像这本书这样写。

我们今天主要说的是concurrent.futures

在Python2里面,concurrent.futures是一个第三方库,在Python3里面已经作为标准库提供了,Python3相关的文档看这里

准备

在Python2环境里,需要安装的依赖库

pip2 install futures
pip2 install requests

Python3环境

pip3 install requests

对于python来说,作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题,解释器要留意的是避免在不同的线程操作内部共享的数据,同时它还要保证在管理用户线程时保证总是有最大化的计算资源。而python是通过使用全局解释器锁来保护数据的安全性: python代码的执行由python虚拟机来控制,即Python先把代码(.py文件)编译成字节码(字节码在Python虚拟机程序里对应的是PyCodeObject对象,.pyc文件是字节码在磁盘上的表现形式),交给字节码虚拟机,然后虚拟机一条一条执行字节码指令,从而完成程序的执行。python在设计的时候在虚拟机中,同时只能有一个线程执行。同样地,虽然python解释器中可以运行多个线程,但在任意时刻,只有一个线程在解释器中运行。而对python虚拟机的访问由全局解释器锁来控制,正是这个锁能保证同一时刻只有一个线程在运行。在多线程的环境中,python虚拟机按一下方式执行:

  • 1,设置GIL(global interpreter lock).
  • 2,切换到一个线程执行。
  • 3,运行:
    • a,指定数量的字节码指令。
    • b,线程主动让出控制(可以调用time.sleep(0))。
  • 4,把线程设置为睡眠状态。
  • 5,解锁GIL.
  • 6,再次重复以上步骤。

GIL的特性,也就导致了python不能充分利用多核cpu。而对面向I/O的(会调用内建操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果线程并为使用很多I/O操作,它会在自己的时间片一直占用处理器和GIL。这也就是所说的:I/O密集型python程序比计算密集型的程序更能充分利用多线程的好处。

总之,不要使用python多线程,使用python多进程进行并发编程,就不会有GIL这种问题存在,并且也能充分利用多核cpu。

GIL的特性,也就导致了python不能充分利用多核cpu。而对面向I/O的(会调用内建操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果线程并为使用很多I/O操作,它会在自己的时间片一直占用处理器和GIL。这也就是所说的:I/O密集型python程序比计算密集型的程序更能充分利用多线程的好处。 总之,不要使用python多线程,使用python多进程进行并发编程,就不会有GIL这种问题存在,并且也能充分利用多核cpu。

concurrent.futures提供的功能:

提供了多线程(ThreadPoolExecutor)和多进程(ProcessPoolExecutor)的并发功能

concurrent.futures基本方法:

class   concurrent.futures.Executor
#Executor为ThreadPoolExecutor或者ProcessPoolExecutor
concurrent.futures提供的方法如下:

1. submit(fn, *args, **kwargs)

fn:为需要异步执行的函数

args,kwargs:为给函数传递的参数

例:

#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures

def wait_on_b():
    print 5
    time.sleep(2)

def wait_on_a():
    print 6
    time.sleep(2)

ex = futures.ThreadPoolExecutor(max_workers=2)
ex.submit(wait_on_b)
ex.submit(wait_on_a)
#wait_on_a和wait_on_b函数会同时执行,因为使用了2个worker

2. map(func, *iterables, timeout=None)

此map函数和python自带的map函数功能类似,只不过concurrent模块的map函数从迭代器获得参数后异步执行。并且,每一个异步操作,能用timeout参数来设置超时时间,timeout的值可以是int或float型,如果操作timeout的话,会raisesTimeoutError。如果timeout参数不指定的话,则不设置超时间。

func:为需要异步执行的函数

iterables:可以是一个能迭代的对象,例如列表等。每一次func执行,会从iterables中取参数。

timeout:设置每次异步操作的超时时间

例:

#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures
data = ['1','2']

def wait_on(argument):
    print argument
    time.sleep(2)
    return 'ok'

ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on,data):
    print i

map函数异步执行完成之后,结果也是list,数据需要从list中取出

submit函数和map函数,根据需要,选一个使用即可。

3. shutdown(wait=True)

此函数用于释放异步执行操作后的系统资源。

一个完整的concurrent例子

#!/bin/env python
#coding:utf-8

import time,re,fcntl
import os,datetime

from concurrent import futures

count_list = list()
minute_num = 1
start_time = datetime.datetime(2016, 1, 22, 13, 30, 0, 484870)
now = datetime.datetime.now()
os.system(':>new.txt')
f_new = open('new.txt','a')

def test(count_time_format):
    f = open('push_slave.stdout','r')
    for line in f.readlines():
        if re.search(count_time_format,line):
            #获得文件专用锁
            fcntl.flock(f_new, fcntl.LOCK_EX)
            f_new.writelines(line)
            f_new.flush()

            #释放文件锁
            fcntl.flock(f_new, fcntl.LOCK_UN)
            break

while 1:
    after_one_minute = datetime.timedelta(minutes=minute_num)
    count_time = after_one_minute + start_time
    count_time_format = count_time.strftime('%Y-%m-%d %H:%M')
    minute_num = minute_num+1
    count_list.append(count_time_format)
    if count_time_format == "2014-04-23 16:00":
        break

def exec_cmd():
    with futures.ProcessPoolExecutor(max_workers=24) as executor:
        dict((executor.submit(test, times), times) for times in count_list)

if __name__ == '__main__':
    exec_cmd()
    f_new.close()

收工,有时间比较一下 futuresmultiprocessing

jack.zh 标签:Python 继续阅读

4 ℃

2443 ℉

15.10.30

Python的优雅技巧(不断添加)

枚举

不要这么做:

i = 0 
for item in iterable: 
    print i, item 
    i += 1

而是这样:

for i, item in enumerate(iterable):    
    print i, item

Enumerate可以接受第二个参数,例如:

>>> list(enumerate('abc')) 
[(0, 'a'), (1, 'b'), (2, 'c')]
>>> list(enumerate('abc', 1)) 
[(1, 'a'), (2, 'b'), (3, 'c')]

字典/集合 解析

你可能知道列表解析,但不知道字典/集合解析。字典/集合解析简单而且高效,例如:

my_dict = {i: i * i for i in xrange(100)} 
my_set = {i * 15 for i in xrange(100)}

#There is only a difference of ':' in both

浮点数除法

如果我们除以一个整数,即使结果是一个浮点数,Python(2) 依旧会给我们一个整数。为了规避这个问题,我们需要这样做:

result = 1.0/2

但是现在有一种别的方法可以解决这个问题,甚至在之前我都没有意识到有这种方法存在。你可以进行如下操作:

from __future__ import division 
result = 1/2

#print(result)
#0.5

需要注意的是这个窍门只适用于Python 2。在Python 3 中就不需要进行import 操作了,因为它已经默认进行import了。

简单的服务器

你想快速简单的分享目录下的文件吗?可以这样做:

#Python2
python -m SimpleHTTPServer

#Python 3
python3 -m http.server

这回启动一个服务器

Python表达式求值

我们都知道eval,但也许并不是所有人都知道literal_eval.可以这么做:

import ast 
my_list = ast.literal_eval(expr)

而不是这样:

expr = "[1, 2, 3]" 
my_list = eval(expr)

我相信对于大多数人来说这种形式是第一次看见,但是实际上这个在Python中已经存在很长时间了。

分析脚本

按下面的方式运行脚本,可以很简单的对其进行分析:

python -m cProfile my_script.py

对象自检

在Python中,可以通过dir()来检查对象,例如:

>>> foo = [1, 2, 3, 4]
>>> dir(foo) 
['__add__', '__class__', '__contains__', 
'__delattr__', '__delitem__', '__delslice__', ... , 
'extend', 'index', 'insert', 'pop', 'remove', 
'reverse', 'sort']

调试脚本

你可以使用pdb模块在脚本中设置断点来调试脚本,就像这样:

import pdb
pdb.set_trace()

你可以在脚本的任何地方加入pdb.set_trace(),该函数会在那个位置设置一个断点。超级方便。你应该多阅读pdb 函数的相关内容,因为在它里面还有很多鲜为人知的功能。

简化if结构

如果必须检查一些值,可以用

if n in [1,4,5,6]:

而不是用复杂的if结构:

if n==1 or n==4 or n==5 or n==6:

字符串/数列 逆序

下面的方式可以快速反转一个列表:

>>> a = [1,2,3,4]
>>> a[::-1]
[4, 3, 2, 1]

#This creates a new reversed list. 
#If you want to reverse a list in place you can do:

a.reverse()

这种方式同样适用于字符串:

>>> foo = "yasoob"
>>> foo[::-1]
'boosay'

优雅地打印

下面的方式可以用优雅的方式打印字典和列表:

from pprint import pprint 
pprint(my_dict)

这用于字典打印是非常高效的,如果你想从文件中快速优雅的打印出json,可以这样做:

cat file.json | python -m json.tools

三元运算

三元运算是if-else 语句的快捷操作,也被称为条件运算。这里有几个例子可以供你参考:

[on_true] if [expression] else [on_false]
x, y = 50, 25
small = x if x < y else y

优化算法时间复杂度

算法的时间复杂度对程序的执行效率影响最大,在Python中可以通过选择合适的数据结构来优化时间复杂度,如list和set查找某一个元素的时间复杂度分别是O(n)和O(1)。不同的场景有不同的优化方式,总得来说,一般有分治,分支界限,贪心,动态规划等思想。

减少冗余数据

如用上三角或下三角的方式去保存一个大的对称矩阵。在0元素占大多数的矩阵里使用稀疏矩阵表示。

合理使用copy与deepcopy

对于dict和list等数据结构的对象,直接赋值使用的是引用的方式。而有些情况下需要复制整个对象,这时可以使用copy包里的copy和deepcopy,这两个函数的不同之处在于后者是递归复制的。效率也不一样:(以下程序在ipython中运行)

import copy
a = range(100000)
%timeit -n 10 copy.copy(a) # 运行10次 copy.copy(a)
%timeit -n 10 copy.deepcopy(a)
10 loops, best of 3: 1.55 ms per loop
10 loops, best of 3: 151 ms per loop

timeit后面的-n表示运行的次数,后两行对应的是两个timeit的输出,下同。由此可见后者慢一个数量级。

使用dict或set查找元素

python dict和set都是使用hash表来实现(类似c++11标准库中unordered_map),查找元素的时间复杂度是O(1)

a = range(1000)
s = set(a)
d = dict((i,1) for i in a)
%timeit -n 10000 100 in d
%timeit -n 10000 100 in s
10000 loops, best of 3: 43.5 ns per loop
10000 loops, best of 3: 49.6 ns per loop

dict`的效率略高(占用的空间也多一些)。

合理使用生成器(generator)和yield

%timeit -n 100 a = (i for i in range(100000))
%timeit -n 100 b = [i for i in range(100000)]
100 loops, best of 3: 1.54 ms per loop
100 loops, best of 3: 4.56 ms per loop

使用()得到的是一个generator对象,所需要的内存空间与列表的大小无关,所以效率会高一些。在具体应用上,比如set(i for i in range(100000))会比set([i for i in range(100000)])快。

但是对于需要循环遍历的情况:

%timeit -n 10 for x in (i for i in range(100000)): pass
%timeit -n 10 for x in [i for i in range(100000)]: pass
10 loops, best of 3: 6.51 ms per loop
10 loops, best of 3: 5.54 ms per loop

后者的效率反而更高,但是如果循环里有break,用generator的好处是显而易见的。yield也是用于创建generator:

def yield_func(ls):
    for i in ls:
        yield i+1

def not_yield_func(ls):
    return [i+1 for i in ls]

ls = range(1000000)
%timeit -n 10 for i in yield_func(ls):pass
%timeit -n 10 for i in not_yield_func(ls):pass
10 loops, best of 3: 63.8 ms per loop
10 loops, best of 3: 62.9 ms per loop

对于内存不是非常大的list,可以直接返回一个list,但是可读性yield更佳(人个喜好)。

python2.x内置generator功能的有xrange函数、itertools包等。

优化循环

循环之外能做的事不要放在循环内,比如下面的优化可以快一倍:

a = range(10000)
size_a = len(a)
%timeit -n 1000 for i in a: k = len(a)
%timeit -n 1000 for i in a: k = size_a
1000 loops, best of 3: 569 µs per loop
1000 loops, best of 3: 256 µs per loop

优化包含多个判断表达式的顺序

对于and,应该把满足条件少的放在前面,对于or,把满足条件多的放在前面。如:

a = range(2000)  
%timeit -n 100 [i for i in a if 10 &lt; i &lt; 20 or 1000 &lt; i &lt; 2000]
%timeit -n 100 [i for i in a if 1000 &lt; i &lt; 2000 or 100 &lt; i &lt; 20]     
%timeit -n 100 [i for i in a if i % 2 == 0 and i &gt; 1900]
%timeit -n 100 [i for i in a if i &gt; 1900 and i % 2 == 0]
100 loops, best of 3: 287 µs per loop
100 loops, best of 3: 214 µs per loop
100 loops, best of 3: 128 µs per loop
100 loops, best of 3: 56.1 µs per loop

使用join合并迭代器中的字符串

In [1]: %%timeit
   ...: s = ''
   ...: for i in a:
   ...:         s += i
   ...:
10000 loops, best of 3: 59.8 µs per loop

In [2]: %%timeit
s = ''.join(a)
   ...:
100000 loops, best of 3: 11.8 µs per loop

join对于累加的方式,有大约5倍的提升。

选择合适的格式化字符方式

s1, s2 = 'ax', 'bx'
%timeit -n 100000 'abc%s%s' % (s1, s2)
%timeit -n 100000 'abc{0}{1}'.format(s1, s2)
%timeit -n 100000 'abc' + s1 + s2
100000 loops, best of 3: 183 ns per loop
100000 loops, best of 3: 169 ns per loop
100000 loops, best of 3: 103 ns per loop

三种情况中,%的方式是最慢的,但是三者的差距并不大(都非常快)。(个人觉得%的可读性最好)

不借助中间变量交换两个变量的值

In [3]: %%timeit -n 10000
    a,b=1,2
   ....: c=a;a=b;b=c;
   ....:
10000 loops, best of 3: 172 ns per loop

In [4]: %%timeit -n 10000
a,b=1,2
a,b=b,a
   ....:
10000 loops, best of 3: 86 ns per loop

使用a,b=b,a而不是c=a;a=b;b=c;来交换a,b的值,可以快1倍以上。

使用if is

a = range(10000)
%timeit -n 100 [i for i in a if i == True]
%timeit -n 100 [i for i in a if i is True]
100 loops, best of 3: 531 µs per loop
100 loops, best of 3: 362 µs per loop

使用 if is Trueif == True 将近快一倍。

使用级联比较x &lt; y &lt; z

x, y, z = 1,2,3
%timeit -n 1000000 if x &lt; y &lt; z:pass
%timeit -n 1000000 if x &lt; y and y &lt; z:pass
1000000 loops, best of 3: 101 ns per loop
1000000 loops, best of 3: 121 ns per loop

x > y > z效率略高,而且可读性更好。

while 1while True 更快

def while_1():
    n = 100000
    while 1:
        n -= 1
        if n &lt;= 0: break
def while_true():
    n = 100000
    while True:
        n -= 1
        if n &lt;= 0: break    

m, n = 1000000, 1000000 
%timeit -n 100 while_1()
%timeit -n 100 while_true()
100 loops, best of 3: 3.69 ms per loop
100 loops, best of 3: 5.61 ms per loop

while 1 比 while true快很多,原因是在python2.x中,True是一个全局变量,而非关键字。

使用**而不是pow

%timeit -n 10000 c = pow(2,20)
%timeit -n 10000 c = 2**20
10000 loops, best of 3: 284 ns per loop
10000 loops, best of 3: 16.9 ns per loop

**就是快10倍以上!

## 使用 cProfile, cStringIO 和 cPickle等用c实现相同功能(分别对应profile, StringIO, pickle)的包

import cPickle
import pickle
a = range(10000)
%timeit -n 100 x = cPickle.dumps(a)
%timeit -n 100 x = pickle.dumps(a)
100 loops, best of 3: 1.58 ms per loop
100 loops, best of 3: 17 ms per loop

由c实现的包,速度快10倍以上!

使用最佳的反序列化方式

下面比较了eval, cPickle, json方式三种对相应字符串反序列化的效率:

import json
import cPickle
a = range(10000)
s1 = str(a)
s2 = cPickle.dumps(a)
s3 = json.dumps(a)
%timeit -n 100 x = eval(s1)
%timeit -n 100 x = cPickle.loads(s2)
%timeit -n 100 x = json.loads(s3)
100 loops, best of 3: 16.8 ms per loop
100 loops, best of 3: 2.02 ms per loop
100 loops, best of 3: 798 µs per loop

可见json比cPickle快近3倍,比eval快20多倍。

使用C扩展(Extension)

目前主要有CPython(python最常见的实现的方式)原生API, ctypes,Cython,cffi三种方式,它们的作用是使得Python程序可以调用由C编译成的动态链接库,其特点分别是:

CPython原生API: 通过引入Python.h头文件,对应的C程序中可以直接使用Python的数据结构。实现过程相对繁琐,但是有比较大的适用范围。

ctypes: 通常用于封装(wrap)C程序,让纯Python程序调用动态链接库(Windows中的dll或Unix中的so文件)中的函数。如果想要在python中使用已经有C类库,使用ctypes是很好的选择,有一些基准测试下,python2+ctypes是性能最好的方式。

Cython: Cython是CPython的超集,用于简化编写C扩展的过程。Cython的优点是语法简洁,可以很好地兼容numpy等包含大量C扩展的库。Cython的使得场景一般是针对项目中某个算法或过程的优化。在某些测试中,可以有几百倍的性能提升。

cffi: cffi的就是ctypes在pypy(详见下文)中的实现,同进也兼容CPython。cffi提供了在python使用C类库的方式,可以直接在python代码中编写C代码,同时支持链接到已有的C类库。

使用这些优化方式一般是针对已有项目性能瓶颈模块的优化,可以在少量改动原有项目的情况下大幅度地提高整个程序的运行效率。

并行编程

因为GIL的存在,Python很难充分利用多核CPU的优势。但是,可以通过内置的模块multiprocessing实现下面几种并行模式:

多进程:对于CPU密集型的程序,可以使用multiprocessing的Process,Pool等封装好的类,通过多进程的方式实现并行计算。但是因为进程中的通信成本比较大,对于进程之间需要大量数据交互的程序效率未必有大的提高。

多线程:对于IO密集型的程序,multiprocessing.dummy模块使用multiprocessing的接口封装threading,使得多线程编程也变得非常轻松(比如可以使用Pool的map接口,简洁高效)。

分布式:multiprocessing中的Managers类提供了可以在不同进程之共享数据的方式,可以在此基础上开发出分布式的程序。

不同的业务场景可以选择其中的一种或几种的组合实现程序性能的优化。

终级大杀器:PyPy

PyPy是用RPython(CPython的子集)实现的Python,根据官网的基准测试数据,它比CPython实现的Python要快6倍以上。快的原因是使用了Just-in-Time(JIT)编译器,即动态编译器,与静态编译器(如gcc,javac等)不同,它是利用程序运行的过程的数据进行优化。由于历史原因,目前pypy中还保留着GIL,不过正在进行的STM项目试图将PyPy变成没有GIL的Python。

如果python程序中含有C扩展(非cffi的方式),JIT的优化效果会大打折扣,甚至比CPython慢(比Numpy)。所以在PyPy中最好用纯Python或使用cffi扩展。

随着STM,Numpy等项目的完善,相信PyPy将会替代CPython。

使用性能分析工具

除了上面在ipython使用到的timeit模块,还有cProfile。cProfile的使用方式也非常简单: python -m cProfile filename.pyfilename.py 是要运行程序的文件名,可以在标准输出中看到每一个函数被调用的次数和运行的时间,从而找到程序的性能瓶颈,然后可以有针对性地优化。

jack.zh 标签:Python 继续阅读

1
Fork me on GitHub