python多线程&多进程&多协程学习笔记

正文索引 [隐藏]

Python 多线程学习笔记

一、简述

主要使用模块:threading

Python 中的多线程:Python为数据安全设有GIL (Global Interpreter Lock),一个线程的执行过程为:获取GIL、执行代码直到挂起、释放GIL,而在一个Python进程中,GIL只有一个,没有GIL的线程不允许进入CPU。每次释放GIL时,线程之间会进行竞争,而切换线程会消耗资源,所以在多核CPU上Python的多线程效率不高。但是在IO密集的情况下使用多线程可显著提高程序效率,因为IO操作会进行IO等待,别的线程可以充分利用这个等待过程

二、实例的创建

1. 建立实例、传函数的使用

t = threading.Thread(target=function, 
                    name=treadname(str), 
                     args=args_to_function(tuple)
                    )
t.start() # 开始这个线程
t.join() # 等这个线程完成后再进行主线程

2. 建立实例、传类实例的使用

# 下面部分测试 threading 模块
class Function():
    def __init__(self, a):
        self.a = a
    def __call__(self, secs):
        print("Awaking NO %d..."%(self.a,))
        time.sleep(secs)
        print("Hello world!")
t = threading.Tread(target=Funtion(1),
                   name=treadname(str),
                   args=2)

3. 派生Thread的子类,创建子类的实例

class ThreadFunc(threading.Thread):
    def __init__(self, func, args):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args

    def run(self):
        self.func(*self.args)

# 之后实例的创建就和threading.Tread一样啦

三、同步

有些特定的函数或者代码块不希望被多个线程同时执行,这时候需要用到同步,常用到信号量

锁,Lock

锁只有两种状态,锁定和解锁。即支持两种操作:获得锁和释放锁。当多线程争夺锁的时候,允许第一个获得锁的线程进入临界区,并执行代码,其他后到达的线程将被阻塞,直到获得锁的线程执行完毕,退出临界区,并释放锁。其他等待的线程去争夺锁并进行临界区。

(待续)

信号量

(待续)

四、使用线程安全的Queue模块

应用背景:

​ Python内置GIL(Global Interperator Lock) 在代码执行IO操作(调用操作系统内建C代码)的时候GIL自动释放,也就是说此时会有别的线程开始工作,那么如果新入线程同样进行IO操作的话将会是一场灾难,虽然可以手动使用Lock进行控制将IO锁死,但是有更加方便的处理,就是queue模块,已经帮我们做好了着方面的工作

官方文档

常用class:

Queue(maxsize=0):提供FIFLO队列

LifoQueue(maxsize=0):提供LIFO队列

PriorityQueue(maxsize=0):提供优先级队列,官方推荐数据输入格式为(priority_number, data),将根据元组第一个元素进行优先级排序,每次弹出优先度最低的元组(注意弹出的是元组,务必加上获取数据的代码)

常用objects:

  • qsize()
  • empty()
  • full()
  • put(item, block=True, timeout=None):block为真时,该线程在进行中时,别的线程将被阻塞,如果timeout设置,那么将在timeout秒后将阻塞解开
  • put_nowait(item):无阻塞线程,立即执行
  • get(block=True, timeout=None)
  • get_nowait()
  • task_done():这个很重要,当一个线程完成的时候务必加上用以告诉线程队列这个线程已经进行完毕,不然会无限挂死
  • join(): 对主线程进行阻塞直到队列中的所有线程都已结束

Python多进程学习笔记

一、简述

执行方式:并行

主要模块:multiprocessing

简述:不同于多线程受制于 GIL 的限制,多进程爬虫可以充分利用 CPU 的多核,主要使用的方法是 Procee + Queue 或者 Pool + Queue

二、实例的创建

由于multiprocessing和threading使用起来非常相近,前面两种直接传target函数的方法就不再赘述,直接写继承的常用代码:

import multiprocessing
import time

class Process(multiprocessing.process):
    def __init__(self):
        multiprocessing.process(self)
    def run(self):
        print("{} is running".format(self.name))
        time.sleep(2)
        print("{} is finnished".format(self.name))

if __name__ == '__main__':   # 多进程里必须有这句
    p1 = Process()
    p2 = Process()
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('All done')

三、if __name__ == ' __main__ ' 的理解

__name__:

​ 内置变量,用于表示当前模块的名字,同时反应当前包的结构

假设有一个包如下:
a
├── b
│   ├── c.py
│   └── __init__.py
└── __init__.py

import a.b.c
print(c.__name__)
>>> a.b.c

__main__:

​ 如果某个模块是直接运行的,而不是import进来的,那么它的__name__就为'__mian__',说白了就是一个程序入口,相当于C/C++中的main()。

为什么在多进程中必须加这一句if __name__ == '__main__'

​ 这个是在windows上实现时的特有问题,windows上子进程会自动import启动它的这个文件,如果没有这一句,呃,无限递归。。。

​ 其实为了程序的可调用性,还是习惯性地加上这一句吧。

四、功能详解

Process 类

用于创建新的进程,和 threading.Thread 的 API 非常类似

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • deamon: True or False,用于创建守护进程(没有任何存在的父进程,脱离终端运行于背景,即使终端关闭该进程仍然存在)

Methods

set_start_method()

​ multiprocessing 模块支持三种模式启动一个进程,使用这个方法进行设置

import multiprocessing as mp
def function():
    print('hello')

if __name__ == '__main__':
    mp.set_start_method('spwan') # 启动缓慢,但是支持 Unix 和 Windows
    mp.set_start_method('fork') # 子进程将全部继承主进程的资源,而且要安全的进行资源共享会神烦,仅支持 Unix
    mp.set_start_method('forkserver') # 这个开启进程的模式是向 forkserver 申请新的子进程,子进程是单一线程的,所以可以安全地使用 os.fork(),没有任何不必要地资源会被继承,仅支持 Unix
    p = mp.Process(target=fuction)
    p.start()
    p.join()
    print('End')

run()

start()

join(timeout=None)

is_alive()

kill():结束进程(on Unix)

close():关闭这个进程,释放所有资源,如果有子进程还在运行会收到ValueError,而如果该方法返回成功,该进程下所有方法都会收到ValueError

terminate():关闭这个进程,注意这个方法不会关闭这个进程的子进程,注意这个进程如果使用 lock 或者 semaphore 可能导致其他进程死锁


Members

name

daemon

pid

exitcode:子进程的退出代码

authkey

sentinel


Exceptions

ProcessError:这个模块所有异常的基类

BufferTooShort

AuthenticationError:权限错误

TimeoutError:失效错误



Queue 类 & Pipe

用于进程间的互相通信

**Pipe(duplex=True)**:返回 (conn1, conn2)(connection 实例),如果 duplex 为 True 则为双向链接,反之为单向链接

**class multiprocessing.Queue(maxsize)**:和queue.Queue的用法基本相同,但是注意有两个异常(queue.Empty 和queue.Full)需要从 queue 这个模块导入


Methods

qsize()

empty()

full()

put(obj, block=False, timeout=None)

put_nowait(obj)

get(obj, block=False, timeout=None)

get_nowait()

以下三个自动调用,具体为啥可能得研究下操作系统原理,目前用不上

close():表明这个队列不再进行数据交换操作(队列销毁时自动执行)

join_thread():只有在close()调用后才能调用,此时这个队列被加入到一个后台线程中,queue会一直阻塞直到这个后台线程退出,默认情况下,如果一个进程不是创建queue的进程,退出它时会尝试加入queue的后台线程中(进程退出时自动执行)

cancel_join_thread():(改变默认退出格式,不明白的时候慎用)

备注:这个类和 queue.Queue 的区别是没有 join() 和 task_done() ,习惯使用这种方法的请调用 multiprocessing.JoinableQueue



Connection类

class multiprocessing.connection.Connection()

一般由 Pipe() 创建,用于进程间传输数据


Methods

send(obj):向连接的另一端发送数据(需要调用 recv() 接收)

recv():接受数据

fileno(): 返回文件句柄

close():关闭连接(销毁时自动调用)

poll(timeout=0):返回连接中是否有可读数据,如果设为 None ,阻塞时间将为无穷大

send_bytes(buffer[,offset[,size]])

recv_bytes(maxlength)

recv_bytes_into(buffer[, offset])

Pool 类

用于创建进程池,进行进程的批量处理