流畅的Python笔记(待完成)
by wqpw
TODO: 看第二版然后重新整理
第一章
# python docs <Data Model>
# magic/dunder method
# example 1-1 牌
from collections import namedtuple
from random import choice
Card = namedtuple('Card', ['rank', 'suit'])
class FrenchDeck:
ranks = [str(n) for n in range(2, 11)] + list('JQKA')
suits = 'spades diamonds clubs hearts'.split()
def __init__(self) -> None:
self._cards = [Card(rank, suit) for suit in self.suits for rank in self.ranks]
def __len__(self):
return len(self._cards)
def __getitem__(self, position):
return self._cards[position]
deck = FrenchDeck()
print(f'总共有{len(deck)}张牌.')
for i in range(3): print(choice(deck))
print(deck[12::13]) # A
print(Card('Q', 'hearts') in deck) # 没有实现__contains__默认顺序查找
# python内置类型调用len()实际上返回PyVarObject的ob_size属性
# 特殊方法调用一般是隐式的 for i in x: 和 x.__iter__()
# example 1-2 二维向量
from math import hypot
class Vector:
def __init__(self, x=0, y=0) -> None:
self.x = x
self.y = y
def __repr__(self) -> str:
return f'Vector({self.x:.2f}, {self.y:.2f})'
def __abs__(self):
return hypot(self.x, self.y)
def __bool__(self):
return bool(self.x or self.y)
def __add__(self, other): # 中缀运算符原则:不改变操作对象
x = self.x + other.x
y = self.y + other.y
return Vector(x, y)
def __mul__(self, scalar): #交换律: __rmul__
return Vector(self.x * scalar, self.y * scalar)
v1 = Vector(2, 4)
v2 = Vector(2, 1)
print(f'v1+v2={v1+v2}', abs(v1), v2*3)
# __str__在str()或print时被使用,__repr__可以代替__str__
# __repr__ 调试、日志 __str__ 终端用户
第二章
# 列表推导(listcomps)和生成器表达式(genexps)
# 列表推导只用来创建新的列表并保持简短;python会忽略[], {}, () 中的换行
print([ord(s) for s in '测试abcd' if ord(s) > 127])
print([(a, b) for a in '123' for b in 'abc'])
# 生成器表达式逐个产出元素,节省内存
print(tuple(ord(s) for s in 'abcd'))
import array
from unicodedata import name
print(array.array('I', (ord(s) for s in 'qwe')))
# 元组 拆包应用
city, year, pop = ('a', '1919', 114514)
ids = [('a', 'b'), ('c', 'd')]
print('%s/%s' % ids[0])
for q, _ in ids: print(q)
#交换 b, a = a, b
t = (20, 8)
print(divmod(*t))
# 平行赋值
a, b, *rest = range(5)
print((a, b, rest))
a, *b, c = range(5)
print((a, b, c))
# 具名元组
from collections import namedtuple
City = namedtuple('City', 'n c p o')
t = City('t', 'j', 3, (3, 3))
print((t.n, t[2]))
print(City._fields)
d = City._make(('a', 'b', 4, (1, 1)))
for k, v in d._asdict().items(): print(k + ':', v)
# 切片 seq[start:stop:step] == seq.__getitem__(slice(start, stop, step))
l = list(range(10))
print(l)
l[2:5] = [20, 30]; print(l)
del l[5:7]; print(l)
l[3::2] = [11, 22]; print(l)
l[2:5] = [100]; print(l)
# * + +=
print([1] * 5); q = [[]] * 3; q[0].append(1); print(q)
print([['_'] * 3 for _ in range(3)])
l = [1, 2]; print(id(l)); l *= 2; print(id(l))
l = (1, 2); print(id(l)); l *= 2; print(id(l)) # 不可变类型重复拼接效率低
try:
t = (1, 2, [30, 40])
__import__('dis').dis('t[2] += [50, 60]')
t[2] += [50, 60]
except Exception as e:
print(e)
print(t)
# 排序 list.sort会就地排序列表,返回None; sorted接受任何形式的可迭代对象返回list
# python的timsort算法是稳定的
print(sorted(('a', 'aa', 'bbb', 'aaaa'), key=len, reverse=True))
# bisect; bisect_left/right insort保持顺序插入
import bisect
def grade(score, bps=[60, 70, 80, 90], grades='FDCBA'):
i = bisect.bisect(bps, score)
return grades[i]
print([grade(score) for score in [33, 99, 77, 70, 89, 90, 100]])
# 只包含数字的列表用array.array更高效,且array.tofile/fromfile速度比文本读取转换更快,pickle.dump和array.tofile差不多快
a = array.array('d', (__import__('random').random() for _ in range(4)))
a= array.array(a.typecode, sorted(a))
print(a)
# memoryview 让用户在不复制内存的情况下操作同一个数组的不同切片
n = array.array('h', [-2, -1, 0, 1, 2])
memv = memoryview(n)
memv_o = memv.cast('B')
print(memv_o.tolist())
memv_o[5] = 4 #00000000 00000000->00000100 00000000
print(n)
# 双端队列 collections.deque 线程安全
# queue提供同步(线程安全)类Queue, LifoQueue, PriorityQueue
# multiprocessing, asyncio, heapq 提供类似队列类
第三章
'''
标准库的所有映射类型都是利用`dict`实现的, 只有可散列的数据类型才能作为这些映射的键.
如果一个对象是可散列的, 则这个对象的生命周期中它的散列值不变, 且它需要实现__hash__()方法和__eq__()方法
'''
# 字典
a = dict(one=1, two=2, three=3)
b = {'one':1, 'two':2, 'three':3}
c = dict(zip(['one', 'two', 'three'], [1, 2, 3]))
d = dict([('two', 2), ('one', 1), ('three', 3)])
print(a==b==c==d)
# 字典推导
dc = [(86, 'China'), (55, 'Brazil')]
a = {c: n for n, c in dc}
b = {n: c.upper() for c, n in a.items()}
print(a, b)
# setdefault
my_d = {}
my_d.setdefault('a', []).append(123)
if 'b' not in my_d:
my_d['b'] = []
my_d['b'].append(456)
print(my_d)
# defaultdict更新字典里存放的可变值 update可以批量更新
from collections import defaultdict
my_d = defaultdict(list)
my_d['a'].append(111) #list在__getitem__里调用,my_d.get('c')会返回None
my_d['b'].append(222)
print(my_d)
# __missing__方法,在__getitem__找不到键时调用
# example 3-7 把非字符串的键转换成字符串
class StrKeyDict0(dict):
def __missing__(self, key):
if isinstance(key, str): # 防止递归调用
raise KeyError(key)
return self[str(key)]
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
def __contains__(self, key):
return key in self.keys() or str(key) in self.keys() # 防止递归调用
my_d = StrKeyDict0()
my_d['123'] = '456'
print(my_d[123])
'''
from collections import *。
OrderedDict在添加键时会保持顺序, ChainMap可以容纳数个不同的映射对象并当作整体查键, Counter给每个键一个计数器
UserDict就是把dict用纯python实现了一遍
'''
ct = __import__('collections').Counter('abracadabra')
print(ct)
ct.update('abcdefg'); ct += {'q':1}
print(ct, ct.most_common(3))
import collections
class StrKeyDict(collections.UserDict):
def __missing__(self, key):
if isinstance(key, str):
raise KeyError(key)
return self[str(key)]
def __contains__(self, key):
return str(key) in self.data
def __setitem__(self, key, item):
self.data[str(key)] = item # self.data是dict,真正保存数据的地方
my_d = StrKeyDict()
my_d['1'] = 'aaa'
print(my_d[1])
# 获得字典的只读实例
from types import MappingProxyType
d = {1:'A'}
d_proxy = MappingProxyType(d)
print(d_proxy[1])
#集合
S1 = set(range(3))
S2 = set(range(6))
print(S1 | S2, S1 & S2, S2 - S1, S1 ^ S2) # 并交差对称差 其他:> < >= <= ......
print(len(S1 & S2), len(S1.intersection(S2))) # 省去不必要的循环和判断
print(type({1, 2, 3}))
print(frozenset(range(3))) # 不可变集合
# 集合推导
print({i for i in range(100, 1000) if (i%10)**3+(i//10%10)**3+(i//100)**3==i})
'''
一个可散列的对象必须: (1) __hash__()值不变 (2) 支持__eq__() (3) a == b 则 hash(a) == hash(b)
因为使用散列表实现所以dict内存开销巨大, 存放大量记录考虑元组和元组构成列表; 键查询很快;
键的次序取决于添加顺序, 加新键可能会改变原顺序(解决冲突), 所以不要同时进行迭代和修改.
set与dict同理
'''
第四章
# 把码位转换成字节序列的过程是编码,把字节序列转换成码位的过程是解码
s = '鐢蹭箼涓欎竵'
print(s, len(s))
b = s.encode('gbk')
print(b, b.decode('utf-8'))
print((u'\uFFFD'.encode('utf-8')*2).decode('gbk'))
# bytes, bytearray各个元素为0-255之间整数
s = bytes('啊', encoding='utf-8')
print(s, s[0], s[:1], end=' ') # s[0] == s[:1] 只对str成立
sa = bytearray(s)
print(sa[-1:])
print(b'select'.hex(':'))
print(b''.fromhex('1111'))
# struct, memoryview, mmap 等处理二进制文件
fmt = '<3s3sHH'
header = memoryview(b'GIF89a+\x02\xe6\x00')
print(__import__('struct').unpack(fmt, header))
del header
# UnicodeEncodeError, UnicodeDecodeError 处理
c = b'S\xc3\xa3o Paulo'.decode('utf8')
print(c, c.encode('cp437', errors='xmlcharrefreplace'), c.encode('cp437', errors='replace'))
# 可通过codecs.register_error拓展
# BOM(byte-order mark) \xff\xfe指明编码使用intel cpu小字节序
print('啊'.encode('utf_16')) # U+FEFF ZERO WIDTH NO-BREAK SPACE
# 显式使用utf-16le, utf-16be不加bom
# 文本处理 bytes->str->bytes
# 在多台设备或多种场合下运行的代码,一定不能依赖默认编码,打开文件要始终明确encoding=
import sys, locale
# 打开文件 和 sys.stdout/in/err重定向到文件 时的默认编码 locale.getpreferredencodin()
# 编解码文件名 sys.getfilesystemencoding()
exprs = '''
locale.getpreferredencoding()
type(my_file)
my_file.encoding
sys.stdout.isatty()
sys.stdout.encoding
sys.stdin.isatty()
sys.stdin.encoding
sys.stderr.isatty()
sys.stderr.encoding
sys.getdefaultencoding()
sys.getfilesystemencoding()
'''
my_file = open('dummy', 'w')
for e in exprs.split():
value = eval(e)
print(e.rjust(30), '->', repr(value))
# 规范化 大小写折叠
from unicodedata import normalize
s1 = 'café'
s2 = 'cafe\u0301'
print((s1, s2), (len(s1), len(s2)), s1 == s2)
print(normalize('NFC', s1) == normalize('NFC', s2))
def nfc_equal(s1, s2):
return normalize('NFC', s1) == normalize('NFC', s2)
def fold_equal(s1, s2):
return (normalize('NFC', s1).casefold() == normalize('NFC', s2).casefold())
# 排序 有专用库PyUCA
locale.setlocale(locale.LC_COLLATE, 'zh_CN.UTF-8')
q = ['不', '啊', '嗯', '和', '则']
print(sorted(q, key=locale.strxfrm))
# re和os模块中一些函数对于b''和''参数行为不同,双模式API
import os
print(os.listdir('.')[0], os.listdir(b'.')[0])
第五章
'''
函数是一等对象. (1)在运行时创建 (2)能赋值给变量或数据结构中的元素 (3)能作为参数传给函数 (4)能作为函数的返回结果
'''
def foo():
'''bar'''
return '?'
a = foo
print(foo.__doc__, foo(), type(foo), a())
# 接受函数为参数,或者把函数作为结果返回的函数是`高阶函数`
# map, filter有生成器表达式代替; reduce在functools模块
print(all([1,0]), any([0,0,0,1]))
# 语法限制lambda函数定义体只能使用纯表达式
# 可调用对象: 用户定义函数, 内置函数, 内置方法, 方法, 类(运行__new__->__init__返回实例), 类的实例(定义了__call__方法), 生成器函数
# 可用callable()检测
import random
class BingoCage:
def __init__(self, items):
self._items = list(items)
random.shuffle(self._items)
def pick(self):
try:
return self._items.pop()
except IndexError:
raise LookupError('pick from empty BingoCage')
def __call__(self):
return self.pick()
bingo = BingoCage(range(10))
print(bingo(), bingo(), bingo())
def f() : f.test = '123'
f()
print(f.__dict__)
class C: pass
obj = C()
def func(): pass
print(sorted(set(dir(func)) - set(dir(C))))
'''
__annotations__ dict 参数和返回值注解
__call__ method-wrapper 实现(), 可调用对象协议
__closure__ tuple 函数闭包, 即自由变量的绑定, 通常是None
__code__ code 编译成字节码的函数元数据和定义体
__defaults__ tuple 形参默认值
__get__ method-wrapper 实现只读描述符协议
__globals__ dict 所在模块中的全局变量
__kwdefaults__ dict 仅限关键字形式参数的默认值
__name__, __qualname__ str 函数名/函数限定名称
'''
# keyword-only argument. 放在*后面, cls只能通过关键字指定
def tag(name:str, *content:'参数content', cls=None, **attrs) -> str:
if cls is not None:
attrs['class'] = cls
if attrs:
attr_str = ''.join(' %s="%s"' % (attr, value)
for attr, value in sorted(attrs.items()))
else:
attr_str = ''
if content:
return '\n'.join('<%s%s>%s</%s>' % (name, attr_str, c, name) for c in content)
else:
return '<%s%s />' % (name, attr_str)
print(tag('br'), tag('p', 'hello', 'world', id=1, cls='p-test'))
my_tag = {'name':'img', 'title':'t', 'src':'/', 'cls':'im'}
# dict中所有元素作为单个参数传入,同名键会绑定到对应的具名参数上,余下的被**attrs捕获
print(tag(**my_tag))
from inspect import signature
sig = signature(tag)
print(sig)
print(sig.return_annotation)
print(tag.__annotations__)
# operator
from functools import reduce, partial
from operator import mul, itemgetter, attrgetter, methodcaller
def fact(n):
return reduce(mul, range(1, n + 1)) #避免写 lambda a, b: a*b
print(fact(5))
a = [(1, 2), (2, 1), (3, 4), (4,3)]
print(sorted(a, key=itemgetter(1))) #lambda x: x[1]
b = itemgetter(1, 0)
c = attrgetter('__dict__')
print(b('12'), c(f))
deal1 = methodcaller('replace', ' ', '-') # `冻结`参数
print(deal1('hello world'))
triple = partial(mul, 3)
print(triple(7), list(map(triple, range(1, 10))))
s1 = 'café'
s2 = 'cafe\u0301'
nfc = partial(__import__('unicodedata').normalize, 'NFC')
print((s1, s2), s1 == s2, nfc(s1) == nfc(s2))
picture = partial(tag, 'img', cls='pic-frame')
print(picture(src='1.jpg'), picture)
print(picture.func, picture.args, picture.keywords)
# functools.partialmethod用于处理方法
第六章
# 策略模式
from abc import ABC, abstractmethod
from collections import namedtuple
Customer = namedtuple('Customer', 'name fidelity')
class Promotion(ABC): # 策略 抽象基类
@abstractmethod
def discount(self, order):
"""返回折扣额"""
class FidelityPromo(Promotion): # 策略一
def discount(self, order):
return order.total() * .05 if order.customer.fidelity >= 1000 else 0
class BulkItemPromo(Promotion): # 策略二
def discount(self, order):
discount = 0
for item in order.cart:
if item.quanlity >= 20:
discount += item.total() * .1
return discount
class LargeOrderPromo(Promotion): # 策略三
def discount(self, order):
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0
class LineItem:
def __init__(self, product, quanlity, price):
self.product = product
self.quanlity = quanlity
self.price = price
def total(self):
return self.price * self.quanlity
class Order: # 上下文
def __init__(self, customer, cart, promotion: Promotion=None):
self.customer = customer
self.cart = list(cart)
self.promotion = promotion
def total(self):
if not hasattr(self, '__total'):
self.__total = sum(item.total() for item in self.cart)
return self.__total
def due(self):
if self.promotion is None:
discount = 0
else:
discount = self.promotion.discount(self)
return self.total() - discount
def __repr__(self):
fmt = '<Order total: {:.2f} due: {:.2f}>'
return fmt.format(self.total(), self.due())
ann = Customer('Ann', 1100)
cart = [LineItem('banana', 40, .5), LineItem('apple', 20, 1.5), LineItem('watermellon', 5, 5.0)]
cart2 = [LineItem(str(i), 1, 1.0) for i in range(20)]
print(Order(ann, cart, FidelityPromo()))
print(Order(ann, cart, BulkItemPromo()))
print(Order(ann, cart2, FidelityPromo()))
print(Order(ann, cart2, BulkItemPromo()))
print(Order(ann, cart2, LargeOrderPromo()))
# 使用函数实现策略模式
class Order: # 上下文
def __init__(self, customer, cart, promotion=None):
self.customer = customer
self.cart = list(cart)
self.promotion = promotion
def total(self):
if not hasattr(self, '__total'):
self.__total = sum(item.total() for item in self.cart)
return self.__total
def due(self):
if self.promotion is None:
discount = 0
else:
discount = self.promotion(self) #
return self.total() - discount
def __repr__(self):
fmt = '<Order total: {:.2f} due: {:.2f}>'
return fmt.format(self.total(), self.due())
# 策略一般没有内部状态
def fidelity_promo(order):
return order.total() * .05 if order.customer.fidelity >= 1000 else 0
def bulk_item_promo(order): # 策略二
discount = 0
for item in order.cart:
if item.quanlity >= 20:
discount += item.total() * .1
return discount
def large_order_promo(order): # 策略三
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0
print('')
print(Order(ann, cart, fidelity_promo))
print(Order(ann, cart, bulk_item_promo))
print(Order(ann, cart2, fidelity_promo))
print(Order(ann, cart2, bulk_item_promo))
print(Order(ann, cart2, large_order_promo))
# 最佳策略 添加新策略时注意加入promos
promos = [fidelity_promo, bulk_item_promo, large_order_promo]
def best_promo(order):
return max(promo(order) for promo in promos)
print(Order(ann, cart2, best_promo))
# 自动加入promos
promos = [globals()[name] for name in globals() if name.endswith('_promo') and name != 'best_promo']
'''
用promotions模块单独保存所有策略函数
promos = [func for name, func in inspect.getmembers(promotions, inspect.isfunction)]
'''
#使用函数或可调用对象实现回调更自然
class MacroCommand: #命令模式
def __init__(self, commands):
self.commands = list(commands)
def __call__(self):
for command in self.commands:
command()
第七章
"""装饰器和闭包
# 装饰器是可调用的对象, 其参数是另一个函数(被装饰的函数). 装饰器可能会处理被装饰的函数, 然后把它返回, 或者
# 将其替换成另一个函数或可调用对象.
# 装饰器在被装饰函数定义之后立即运行, 通常是在导入(加载模块)时; 被装饰函数只在明确调用时运行
"""
from hashlib import new
def deco(func):
def inner():
print('running inner()')
return inner
@deco
def target():
print('running target()')
# @deco 等价于 target = deco(target)
target()
# 改进策略模式
promos = []
def promotion(promo_func):
promos.append(promo_func)
return promo_func
@promotion
def fidelity_promo(order):
return order.total() * .05 if order.customer.fidelity >= 1000 else 0
@promotion
def bulk_item_promo(order): # 策略二
discount = 0
for item in order.cart:
if item.quanlity >= 20:
discount += item.total() * .1
return discount
@promotion
def large_order_promo(order): # 策略三
distinct_items = {item.product for item in order.cart}
if len(distinct_items) >= 10:
return order.total() * .07
return 0
def best_promo(order):
return max(promo(order) for promo in promos)
# 变量作用域规则
b = 6
def f1(a):
print(a)
print(b)
def f2(a):
try:
print(a)
print(b)
b = 9 # python假定在函数定义体中赋值的变量是局部变量
except UnboundLocalError as e:
print(e)
def f3(a):
global b # 要求把b当成全局变量
print(a)
print(b)
b = 9
_ = [f1(1), f2(2), f3(3)]
# 闭包指延伸了作用域的函数,其中包含函数定义体中引用、但不在定义体中定义的非全局变量
def make_avg1():
series = []
def avg(new_value):
series.append(new_value) # 自由变量
return sum(series)/len(series)
return avg
avg = make_avg1()
print(avg(10), avg(11), avg(12))
print(avg.__code__.co_freevars)
print(avg.__closure__[0].cell_contents)
# 闭包会保留定义函数时存在的自由变量的绑定
# nonlocal声明
def make_avg():
count = 0
total = 0
def avg(new_value):
nonlocal count, total # 标记为自由变量
count += 1 # 因为不可变类型重新绑定会隐式创建局部变量
total += new_value
return total/count
return avg
avg = make_avg()
print(avg(10), avg(11), avg(12))
print(avg.__code__.co_freevars)
print(avg.__closure__[0].cell_contents, avg.__closure__[1].cell_contents)
# 输出函数运行时间的装饰器
import time
import functools
def clock(func):
@functools.wraps(func) # 将__name__, __doc__等复制到clocked
def clocked(*args, **kwargs):
t0 = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - t0
name = func.__name__
arg_lst = []
if args:
arg_lst.append(', '.join(repr(arg) for arg in args))
if kwargs:
pairs = ['%s=%r' % (k, w) for k, w in sorted(kwargs.items())]
arg_lst.append(', '.join(pairs))
arg_str = ', '.join(arg_lst)
print(f'[{elapsed:0.8f}s] {name}({arg_str}) -> {result}')
return result
return clocked
@clock
def snooze(seconds):
time.sleep(seconds)
@clock
def fact(n):
return 1 if n < 2 else n*fact(n-1)
snooze(.123)
fact(n=5)
# 标准库中的装饰器.
# 使用functools.lru_cache做备忘(memoization)
@functools.lru_cache(maxsize=16, typed=False) # 保存16个调用结果, 是否按参数类型分开保存结果
@clock
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-2) + fibonacci(n-1)
print(fibonacci(30))
# lru_cache使用字典存储结果,被装饰函数所有参数需可散列
# 单分派泛函数
# @singledispatch装饰的函数变成泛函数(generic function): 根据第一个参数的类型以不同的方式执行相同操作的一组函数
# https://www.python.org/dev/peps/pep-0443/
from functools import singledispatch
from collections import abc
import numbers
import html
@singledispatch
def htmlize(obj):
content = html.escape(repr(obj))
return f'<pre>{content}</pre>'
@htmlize.register(str)
def _(text):
content = html.escape(text).replace('\n', '<br>\n')
return f'<p>{content}</p>'
@htmlize.register(tuple)
@htmlize.register(abc.MutableSequence)
def _(seq):
inner = '</li>\n<li>'.join(htmlize(item) for item in seq)
return '<ul>\n<li>' + inner + '</li>\n</ul>'
@htmlize.register(numbers.Integral) # int 的虚拟超类
def _(n):
return f'<pre>{n} (0x{n:x})</pre>'
print(htmlize({1,2,3}))
print(htmlize(abs))
print(htmlize('Hello \n world.'))
print(htmlize(42))
print(htmlize(['test', 123, {3,2,1}]))
# 装饰器最好通过实现__call__方法的类实现
# 参数化的注册装饰器
registry = set()
def register(active=True): # 装饰器工厂函数
def decorate(func):
print('running register(active=%s)->decorate(%s)' % (active, func))
if active:
registry.add(func)
else:
registry.discard(func)
return func
return decorate
@register(active=False)
def f1(): print('running f1()')
@register()
def f2(): print('running f2()')
def f3(): print('running f3()')
print(registry)
register(active=False)(f2)
print(registry)
# 参数化clock装饰器
import time
DEFAULT_FMT = '[{elapsed:0.8f}s] {name}({args}) -> {result}'
def clock(fmt=DEFAULT_FMT):
def decorate(func):
def clocked(*_args):
t0 = time.time()
_result = func(*_args)
elapsed = time.time() - t0
name = func.__name__
args = ', '.join(repr(arg) for arg in _args)
result = repr(_result)
print(fmt.format(**locals()))
return _result
return clocked
return decorate
@clock('{name}: {elapsed}s')
def snooze(s): time.sleep(s)
for i in range(3): snooze(.123)
第八章
a = {'a':1}
b = {'a':1}
c = a # 别名
print((a == b, a is b, a is c))
# == 比较两个对象的值(__eq__) is 比较对象的标识
# 在变量和单例值之间比较时应使用is,is不能重载,比==速度快
# 元组的不可变性指tuple数据结构的物理内容(保存的引用)不可变,与引用的对象无关
t = (1, [1,2]); t[1].append(3); print(t)
# 复制列表 list [:] 浅复制:复制了最外层的容器,副本中的元素是源容器中元素的引用
l1 = [3, [1, 2], (4, 5)]
l2 = list(l1)
l3 = l1[:]
print(l1 == l2, l1 is l2, l3)
# copy, deepcopy
class Bus:
def __init__(self, passengers=None):
if passengers is None:
self.passengers = []
else:
self.passengers = list(passengers) # 复制, 否则为别名
def pick(self, name):
self.passengers.append(name)
def drop(self, name):
self.passengers.remove(name)
import copy
bus1 = Bus(['Alice', 'Bill', 'Claire', 'David'])
bus2 = copy.copy(bus1)
bus3 = copy.deepcopy(bus1)
bus1.drop('Bill')
print(bus2.passengers, (id(bus1.passengers), id(bus2.passengers), id(bus3.passengers)))
print(bus3.passengers)
# 循环引用
a = [10, 20]
b = [a, 30]
a.append(b)
print(a)
c = copy.deepcopy(a) # deepcopy会记住已经复制的对象
print(c)
'''
python唯一支持的参数传递模式是`共享传参`(call by sharing) java的引用类型也是这样
函数的各个形式参数获得实参中各个引用的副本, 即函数内部形参是实参的别名.
'''
# 不要使用可变类型作为参数的默认值
class HauntedBus:
def __init__(self, passengers = []):
self.passengers = passengers # self.passengers 变成了 passengers 的别名
def pick(self, name):
self.passengers.append(name)
def drop(self, name):
self.passengers.remove(name)
bus1 = HauntedBus()
bus1.pick('Alice')
bus3 = HauntedBus()
print(bus3.passengers)
print(HauntedBus.__init__.__defaults__[0] is bus3.passengers)
# 通常使用None作为接受可变值的参数的默认值.
# 慎重考虑函数是否要修改传入的可变参数
# del和垃圾回收
'''
del语句删除名称, 可能会导致对象被当作垃圾回收, 但仅当删除的变量保存的是对象的最后一个引用, 或者无法得到对象时.
重新绑定也可能会导致对象的引用计数归零, 导致对象被销毁.
在cpython中垃圾回收使用的主要算法是引用计数, 每个对象都会统计有多少引用指向自己, 当引用计数归零时对象就立即被销毁.
cpython会在对象上调用__del__方法(如果定义了), 然后释放分配给对象的内存.
'''
import weakref
s1 = {1, 2, 3}
s2 = s1
def bye():
print('Gone with the wind...')
ender = weakref.finalize(s1, bye)
print(ender.alive)
del s1
print(ender.alive)
s2 = 'spam'
print(ender.alive)
# 弱引用不会增加对象的引用数量,引用的目标对象称为所指对象(referent)
# WeakValueDictionary类实现一种可变映射, 里面的值是对象的弱引用, 被引用对象在其他地方被回收后对应的键会自动在WeakValueDictionary中删除
# 常用于缓存
class Cheese:
def __init__(self, kind):
self.kind = kind
def __repr__(self) -> str:
return 'Cheese(%r)' % self.kind
stock = weakref.WeakValueDictionary()
catalog = [Cheese('Red Leicester'), Cheese('Tilsit'), Cheese('Brie'), Cheese('Parmesan')]
for cheese in catalog:
stock[cheese.kind] = cheese
print(sorted(stock.keys()))
del catalog
print(sorted(stock.keys())) # for循环里的cheese是全局变量
del cheese
print(sorted(stock.keys()))
# CPython的限制. 基本的list和dict实例不能作为所指对象, 但它们的子类可以
t1 = (1, 2, 3)
t2 = tuple(t1) # 如果参数是一个元组,则返回值是同一个对象. str, bytes, frozenset实例也有这种行为
print(t1 is t2)
# 驻留. 小整数, 字符串字面量
s1 = 'ABC'
s2 = 'ABC'
print(s1 is s2)
第九章
from array import array
import math
class Vector2d:
# __slots__ = ('__x', '__y') 表示只有两个属性,避免使用__dict__从而节省空间
typecode = 'd'
# 类属性可以为实例属性提供默认值
def __init__(self, x, y):
self.__x = float(x)
self.__y = float(y)
@property #@property装饰器把读值方法标记为特性,方法名与公开属性同名
def x(self): return self.__x
@property
def y(self): return self.__y
def __iter__(self): # 可迭代对象才能拆包
return (i for i in (self.x, self.y))
def __repr__(self):
class_name = type(self).__name__
return '{}({!r}, {!r})'.format(class_name, *self)
def __str__(self):
return str(tuple(self))
def __bytes__(self):
return (bytes([ord(self.typecode)])) + bytes(array(self.typecode, self))
def __eq__(self, other):
return tuple(self) == tuple(other)
def __abs__(self):
return math.hypot(self.x, self.y)
def __bool__(self):
return bool(abs(self))
@classmethod # 定义类方法,第一个参数是类本身
def frombytes(cls, octets):
typecode = chr(octets[0])
memv = memoryview(octets[1:]).cast(typecode)
return cls(*memv)
def __format__(self, fmt_spec=''): # Format Specification Mini-Language
if fmt_spec.endswith('p'):
fmt_spec = fmt_spec[:-1]
coords = (abs(self), self.angel())
outer_fmt = '<{}, {}>'
else:
coords = self
outer_fmt = '({}, {})'
components = (format(c, fmt_spec) for c in coords)
return outer_fmt.format(*components)
def angel(self):
return math.atan2(self.y, self.x)
def __hash__(self):
return hash(self.x) ^ hash(self.y) # 实例的散列值不应该变化
class Demo:
@classmethod
def a(*args): return args
@staticmethod # 静态方法就是普通的函数
def b(*args): return args
print(Demo.a(), Demo.b())
v1 = Vector2d(3, 4)
print(v1 == eval(repr(v1)) == Vector2d.frombytes(bytes(v1)))
print(format(v1, '.5fp'))
print(v1.__dict__) # 名称改写 (name mangling)
v1._Vector2d__x = 6
print(v1)
'''
定义__slots__之后实例不能再有__slots__中没列出来的属性, 除非把__dict__加入__slots__.
每个子类都要定义__slots__, 因为解释器会忽略继承的__slots__
需要弱引用要把__weakref__加入__slots
'''
v1 = Vector2d(1.1, 2.2)
v1.typecode = 'f' # 定义实例属性, 类属性不变
print(bytes(v1))
# 修改类属性 Vector2d.typecode = 'f'
# 一般创建子类来定制类属性
class ShortVector2d(Vector2d):
typecode = 'f'
v2 = ShortVector2d(1.1, 2.2)
print(bytes(v2))
第10章
from array import array
import reprlib
import math
import numbers
import functools
import operator
import itertools
# 切片原理
class MySeq:
def __getitem__(self, index):
return index
s = MySeq()
print(s[1], s[1:4], s[1:4:2, 9], s[1:4:2, 7:9])
print(slice(None, 10, 2).indices(5)) # S.indices(len) -> (start, stop, stride)
class Vector:
typecode = 'd'
shortcut_names = 'xyzt'
def __init__(self, components):
self._components = array(self.typecode, components)
def __iter__(self):
return iter(self._components)
def __repr__(self):
components = reprlib.repr(self._components) # 限制长度
components = components[components.find('['):-1]
return 'Vector({})'.format(components)
def __str__(self):
return str(tuple(self))
def __bytes__(self):
return (bytes([ord(self.typecode)]) + bytes(self._components))
def __eq__(self, other):
return tuple(self) == tuple(other)
def __abs__(self):
return math.sqrt(sum(x*x for x in self))
def __bool__(self):
return bool(abs(self))
@classmethod
def frombytes(cls, octects):
typecode = chr(octects[0])
memv = memoryview(octects[1:]).cast(typecode)
return cls(memv)
def __len__(self):
return len(self._components)
def __getitem__(self, index):
cls = type(self)
if isinstance(index, slice):
return cls(self._components[index])
elif isinstance(index, numbers.Integral):
return self._components[index]
else:
raise TypeError(f'{cls.__name__} indices must be integers')
def __getattr__(self, name):
cls = type(self)
if len(name) == 1:
pos = cls.shortcut_names.find(name)
if 0 <= pos < len(self._components):
return self._components[pos]
raise AttributeError(f'{cls.__name__!r} object has no attribute {name!r}')
def __setattr__(self, name, value):
cls = type(self)
if len(name) == 1:
if name in cls.shortcut_names:
msg = f'readonly attribute {name!r}'
elif name.islower():
msg = f"can't set attribute 'a' to 'z' in {name!r}"
else: msg = ''
if msg: raise AttributeError(msg)
super().__setattr__(name, value)
# __setitem__ 支持v[0]=1
def __eq__(self, other):
return len(self) == len(other) and all(a == b for a, b in zip(self, other))
def __hash__(self):
hashes = (hash(x) for x in self._components)
return functools.reduce(operator.xor, hashes, 0) # 序列为空返回0否则以0为归约中第一个参数
def angle(self, n):
r = math.sqrt(sum(x*x for x in self[n:]))
a = math.atan2(r, self[n-1])
if (n == len(self) - 1) and (self[-1] < 0):
return math.pi*2 - a
else: return a
def angles(self):
return (self.angle(n) for n in range(1, len(self)))
def __format__(self, fmt_spec=''):
if fmt_spec.endswith('h'):
fmt_spec = fmt_spec[:-1]
coords = itertools.chain([abs(self)], self.angles())
outer_fmt = '<{}>'
else:
coords = self
outer_fmt = '({})'
components = (format(c, fmt_spec) for c in coords)
return outer_fmt.format(', '.join(components))
v1 = Vector(range(7))
print(v1[-1], v1[1:4], type(v1[1:4]))
print(v1.x, v1.y)
print(f'{v1:h}')
第11章
import collections
from collections import abc
import numbers
import random
def test(a, b):
return range(0, 30, 10)[b]
class Foo:
def __getitem__(self, pos):
return range(5)[pos]
Foo.__getitem__ = test # 猴子补丁. 不改变源代码而对功能进行追加和变更
f = Foo()
print(f[1], list(i for i in f), 20 in f)
# 没有__iter__, __contains__, python会调用__getitem__设法让in和迭代可用
class A:
def __len__(self): return 233
print(isinstance(A(), abc.Sized)) # Sized的__subclasshook__在A.__dict__检查到了‘__len__’
Card = collections.namedtuple('Card', ['rank', 'suit'])
class FrenchDeck2(abc.MutableSequence):
ranks = [str(n) for n in range(2, 11)] + list('JQKA')
suits = 'spades diamonds clubs hearts'.split()
def __init__(self):
self._cards = [Card(rank, suit) for suit in self.suits for rank in self.ranks]
def __len__(self):
return len(self._cards)
def __getitem__(self, pos):
return self._cards[pos]
def __setitem__(self, pos, val): # 之后可以使用random.shuffle
self._cards[pos] = val
def __delitem__(self, pos):
del self._cards[pos]
def insert(self, pos, val):
self._cards.insert(pos, val)
# collections.abc 16个抽象基类; numbers:Number, Complex, Real, Rational, Integral
print(isinstance(1.234, numbers.Real))
import abc
class Tombola(abc.ABC):
@abc.abstractmethod
def load(self, iterable):
'''从可迭代对象中添加元素'''
@abc.abstractmethod
def pick(self):
'''随机删除元素并将其返回'''
def loaded(self):
return bool(self.inspect())
def inspect(self):
items = []
while True:
try:
items.append(self.pick())
except LookupError:
break
self.load(items)
return tuple(sorted(items))
# 抽象方法可以有实现代码,子类必须覆盖抽象方法,但子类中可以用super()来调用抽象方法
# 在@abstractmethodhedef语句之间不能有其他装饰器
class BingoCage(Tombola):
def __init__(self, items):
self._randomizer = random.SystemRandom()
self._items = []
self.load(items)
def load(self, items):
self._items.extend(items)
self._randomizer.shuffle(self._items)
def pick(self):
try:
return self._items.pop()
except IndexError:
raise LookupError('pick from empty BingoCage')
def __call__(self):
self.pick()
class LotteryBlower(Tombola):
def __init__(self, iterable):
self._balls = list(iterable)
def load(self, iterable):
self._balls.extend(iterable)
def pick(self):
try:
pos = random.randrange(len(self._balls))
except ValueError:
raise LookupError('pick from empty LotteryBlower')
return self._balls.pop(pos)
def loaded(self):
return bool(self._balls)
def inspect(self):
return tuple(sorted(self._balls))
@Tombola.register #注册虚拟子类. 不会有检查. 相当于 Tombola.register(TomboList) 宣称实现了该有的接口
class TomboList(list):
def pick(self):
if self:
pos = random.randrange(len(self))
return self.pop(pos)
else:
raise LookupError('pop from empty TomboList')
load = list.extend
def loaded(self):
return bool(self)
def inspect(self):
return tuple(sorted(self))
print(issubclass(TomboList, Tombola))
print(TomboList.__mro__)
print(Tombola.__subclasses__(), Tombola._dump_registry())
第12章
class ADict(dict):
def __getitem__(self, key):
return 42
a = ADict(q=1)
print(a['q'])
d = {}
d.update(a) # 内置类型的方法通常会忽略用户覆盖的方法.
print(d)
import collections
class BDict(collections.UserDict):
def __getitem__(self, key):
return 42
a = BDict(q=1)
print(a['q'])
d = {}
d.update(a)
print(d)
# 多重继承
class A:
def ping(self): print('pingA', self)
class B(A):
def pong(self): print('pongB', self)
class C(A):
def pong(self): print('pongC', self)
class D(B, C):
def ping(self):
super().ping()
print('pingD:', self)
def pingpong(self):
self.ping()
super().ping() # A.ping(self)
self.pong()
super().pong()
C.pong(self)
d = D()
d.pong()
C.pong(d)
# 方法解析顺序 Method Resolution Order MRO
print(D.__mro__)
d.ping()
d.pingpong()
第13章
from array import array
from ast import Add, NotIn
import reprlib
import math
import numbers
import functools
import operator
import itertools
class Vector:
typecode = 'd'
shortcut_names = 'xyzt'
def __init__(self, components):
self._components = array(self.typecode, components)
def __iter__(self):
return iter(self._components)
def __repr__(self):
components = reprlib.repr(self._components)
components = components[components.find('['):-1]
return 'Vector({})'.format(components)
def __str__(self):
return str(tuple(self))
def __bytes__(self):
return (bytes([ord(self.typecode)]) + bytes(self._components))
def __eq__(self, other):
return tuple(self) == tuple(other)
# 一元运算符和中缀运算符结果应该是新对象, 并且绝不能修改操作数
def __abs__(self):
return math.sqrt(sum(x*x for x in self))
def __neg__(self):
return Vector(-x for x in self)
def __pos__(self):
return Vector(self)
def __add__(self, other):
try: # 鸭子类型
pairs = itertools.zip_longest(self, other, fillvalue=0.0)
return Vector(a + b for a, b in pairs)
except TypeError:
return NotImplemented # 让编译器尝试对调操作数
def __radd__(self, other):
return self + other
def __mul__(self, scalar):
if isinstance(scalar, numbers.Real): #显式测试
return Vector(n * scalar for n in self)
else:
return NotImplemented
def __rmul__(self, scalar):
return self * scalar
def __matmul__(self, other):
try:
return sum(a * b for a, b in zip(self, other))
except TypeError:
return NotImplemented
def __rmatmul__(self, other):
return self @ other
def __bool__(self):
return bool(abs(self))
@classmethod
def frombytes(cls, octects):
typecode = chr(octects[0])
memv = memoryview(octects[1:]).cast(typecode)
return cls(memv)
def __len__(self):
return len(self._components)
def __getitem__(self, index):
cls = type(self)
if isinstance(index, slice):
return cls(self._components[index])
elif isinstance(index, numbers.Integral):
return self._components[index]
else:
raise TypeError(f'{cls.__name__} indices must be integers')
def __getattr__(self, name):
cls = type(self)
if len(name) == 1:
pos = cls.shortcut_names.find(name)
if 0 <= pos < len(self._components):
return self._components[pos]
raise AttributeError(f'{cls.__name__!r} object has no attribute {name!r}')
def __setattr__(self, name, value):
cls = type(self)
if len(name) == 1:
if name in cls.shortcut_names:
msg = f'readonly attribute {name!r}'
elif name.islower():
msg = f"can't set attribute 'a' to 'z' in {name!r}"
else: msg = ''
if msg: raise AttributeError(msg)
super().__setattr__(name, value)
def __eq__(self, other):
if isinstance(other, Vector):
return len(self) == len(other) and all(a == b for a, b in zip(self, other))
else:
return NotImplemented
def __hash__(self):
hashes = (hash(x) for x in self._components)
return functools.reduce(operator.xor, hashes, 0)
def angle(self, n):
r = math.sqrt(sum(x*x for x in self[n:]))
a = math.atan2(r, self[n-1])
if (n == len(self) - 1) and (self[-1] < 0):
return math.pi*2 - a
else: return a
def angles(self):
return (self.angle(n) for n in range(1, len(self)))
def __format__(self, fmt_spec=''):
if fmt_spec.endswith('h'):
fmt_spec = fmt_spec[:-1]
coords = itertools.chain([abs(self)], self.angles())
outer_fmt = '<{}>'
else:
coords = self
outer_fmt = '({})'
components = (format(c, fmt_spec) for c in coords)
return outer_fmt.format(', '.join(components))
from c11 import Tombola, BingoCage
class AddableBingoCage(BingoCage):
def __add__(self, other):
if isinstance(other, Tombola):
return AddableBingoCage(self.inspect() + other.inspect())
else:
return NotImplemented
def __iadd__(self, other):
if isinstance(other, Tombola):
other_iterable = other.inspect()
else:
try:
other_iterable = iter(other)
except TypeError:
self_cls = type(self).__name__
raise TypeError(f'right operand in += must be {self_cls:r} or an iterable')
self.load(other_iterable)
return self # 增量赋值特殊方法必须返回self
globe = AddableBingoCage('aeiou')
globe += AddableBingoCage('1234')
print(globe.inspect())
# list.extend +=
# 对序列类型+一般要求左右类型相同, += 右操作数往往可以是任何可迭代对象
第14章
import re
import reprlib
class Foo:
def __iter__(self):
pass
from collections import abc
print(issubclass(Foo, abc.Iterable))
# 可迭代:有__iter__或者有__getitem__从0开始. 迭代器还要有__next__
'''
迭代器: 实现了无参数的__next__方法, 返回序列中的下一个元素; 没有元素了则抛出StopIteration异常.
Python中的迭代器还实现了__iter__方法, 因此迭代器也可以迭代
'''
RE_WORD = re.compile('\w+')
class Sentence1:
def __init__(self, text):
self. text = text
self.words = RE_WORD.findall(text)
def __getitem__(self, index):
return self.words[index]
def __len__(self):
return len(self.words)
def __repr__(self):
return f'Sentence({reprlib.repr(self.text)})'
#迭代器模式. 迭代器可以迭代(__iter__返回自己),但可迭代对象不是迭代器(不能实现__next__)
class Sentence2:
def __init__(self, text):
self. text = text
self.words = RE_WORD.findall(text)
def __repr__(self):
return f'Sentence({reprlib.repr(self.text)})'
def __iter__(self):
return SentenceIterator(self.words)
class SentenceIterator:
def __init__(self, words):
self.words = words
self.index = 0
def __next__(self):
try:
word = self.words[self.index]
except IndexError:
raise StopIteration()
self.index += 1
return word
def __iter__(self):
return self
# 符合python习惯 生成器函数
class Sentence3:
def __init__(self, text):
self. text = text
self.words = RE_WORD.findall(text)
def __repr__(self):
return f'Sentence({reprlib.repr(self.text)})'
def __iter__(self):
for word in self.words:
yield word
# 或者 return iter(self.words)
# 只要函数定义中有yield关键字,该函数就是生成器函数, 调用生成器函数会返回一个生成器对象
def gen_123():
print('One')
yield 1
yield 2
yield 3
print('Three')
for i in gen_123(): print(i)
g = gen_123()
print(next(g), '---')
print([i for i in gen_123()])
g = (x for x in gen_123())
for i in g: print(i)
# 惰性实现
class Sentence4:
def __init__(self, text):
self.text = text
def __repr__(self):
return f'Sentence({reprlib.repr(self.text)})'
def __iter__(self):
for match in RE_WORD.finditer(self.text):
yield match.group()
# 生成器表达式.
# return (match.group() for match in RE_WORD.finditer(self.text))
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
# 等差数列
class ArithmeticProgression:
def __init__(self, begin, step, end=None):
self.begin = begin
self.step = step
self.end = end
def __iter__(self):
result = type(self.begin + self.step)(self.begin)
forever = self.end is None
index = 0
while forever or result < self.end:
yield result
index += 1
result = self.begin + self.step * index # 降低浮点数累积效应风险
# 简化
def artiprog_gen(begin, step, end=None):
result = type(begin + step)(begin)
forever = end is None
index = 0
while forever or result < end:
yield result
index += 1
result = begin + step * index
# 利用标准库再简化. 也是生成器工厂函数
from itertools import count, takewhile
def artiprog_gen2(begin, step, end=None):
first = type(begin + step)(begin)
ap_gen = count(first, step)
if end is not None:
ap_gen = takewhile(lambda n: n < end, ap_gen)
return ap_gen
def my_chain(*iterables):
for i in iterables:
yield from i
print(list(my_chain('ABC', '123')))
# filter, enumerate, map, zip, reversed(序列), all, any, max, min, sum
# itertools: compress, dropwhile, filterfalse, islice, takewhile, accumulate, starmap, chain, chain.from_iterable
# zip_longest, count, cycle, repeat,
# 组合学生成器 product, permutations, combinations, combinations_with_replacement
# groupby, tee
# functools: reduce
'''
iter(func, sentinel)
逐行读取直到文件末尾或遇到空行. '\n'称哨符
with open('mydata.txt') as fp:
for line in iter(fp.readline, '\n'):
process_line
'''
第15章
'''
else
for/else 仅当for循环运行完毕(没有被break中止)才运行else块
while/else 仅当while循环因为条件为假退出时才运行else块
try/else 仅当try块没有异常抛出时才运行else块
with语句开始运行时会在上下文管理器对象上调用__enter__, 结束后调用__exit__
'''
class LookingGlass:
def __enter__(self):
import sys
self.original_write = sys.stdout.write
sys.stdout.write = self.reverse_write
return 'JABBERWOCKY'
def reverse_write(self, text):
self.original_write(text[::-1])
def __exit__(self, exc_type, exc_value, traceback):
import sys
sys.stdout.write = self.original_write
if exc_type is ZeroDivisionError:
print('Please DO NOT divide by zero!')
return True # 告诉解释器已经处理异常
with LookingGlass() as what:
print(what)
print('Alice - MajoKoiNiki')
print('Normal.')
import contextlib
@contextlib.contextmanager # 把函数包装成实现了__enter__, __exit__的类
def looking_glass():
import sys
original_write = sys.stdout.write
def reverse_write(text):
original_write(text[::-1])
sys.stdout.write = reverse_write
msg = ''
try: # 我们无法知道用户会在with块中做什么
yield 'JABBERWOCKY'
except ZeroDivisionError:
msg = 'Please DO NOT divide by zero!'
finally:
sys.stdout.write = original_write
if msg: print(msg)
with looking_glass() as what:
# print(1/0)
print(what)
print('Alice - MajoKoiNiki')
print('Normal.')
第16章
def simple_coroutine():
print('-> coroutine started')
x = yield
print('-> coroutine received:', x)
my_coro = simple_coroutine()
next(my_coro) # 预激协程. 向前执行到第一个yield表达式
try:
my_coro.send(42)
except StopIteration:
pass
from inspect import getgeneratorstate
from typing import NamedTuple
from unicodedata import name
def simple_coroutine2(a):
print('-> Started: a =', a)
b = yield a
print('-> Received: b =', b)
c = yield a + b
print('-> Received: c =', c)
my_coro = simple_coroutine2(1)
print(getgeneratorstate(my_coro))
next(my_coro)
print(getgeneratorstate(my_coro))
print(my_coro.send(2))
print(getgeneratorstate(my_coro))
try:
print(my_coro.send(4))
except StopIteration:
pass
print(getgeneratorstate(my_coro))
# 预激协程的装饰器
from functools import wraps
def coroutine(func):
@wraps(func)
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer
@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
coro_avg = averager()
print(coro_avg.send(10))
print(coro_avg.send(30))
print(coro_avg.send(5))
coro_avg.close()
class DemoException(Exception):
"""demo"""
def demo_exc_handling():
print('-> coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print(f'-> coroutine received: {x!r}')
finally:
print('-> coroutine ending')
raise RuntimeError('This line should never run.')
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(123)
exc_coro.throw(DemoException)
exc_coro.send(456)
# exc_coro.throw(ZeroDivisionError) 会停止
from collections import namedtuple
Result = namedtuple('Result', 'count average')
@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total/count
return Result(count, average)
coro_avg = averager()
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(5)
try:
coro_avg.send(None)
except StopIteration as exc:
result = exc.value
print(result)