博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python 多进程和多线程
阅读量:2346 次
发布时间:2019-05-10

本文共 11429 字,大约阅读时间需要 38 分钟。

一.多进程

进程是程序的一次动态执行过程,它对应了从代码加载、执行到执行完毕的一个完整过程。

进程是系统进行资源分配和调度的一个独立单位。进程是由代码(堆栈段)、数据(数据段)、内核状态和一组寄存器组成。

在多任务操作系统中,通过运行多个进程来并发地执行多个任务。由于每个线程都是一个能独立执行自身指令的不同控制流,因此一个包含多个线程的进程也能够实现进程内多任务的并发执行。

进程是一个内核级的实体,进程结构的所有成分都在内核空间中,一个用户程序不能直接访问这些数据。

1.1 进程的状态:

创建、准备、运行、阻塞、结束。

1.2 进程间的通信方式

  1. 文件
  2. 管道
  3. socket
  4. 信号
  5. 信号量
  6. 共享内存

1.3 进程的创建函数

  • os.fork()
  • subprocess
  • processing
  • multiprocessing

fork()语法:

功能:为当前进程创建一个子进程

参数:无

返回值:0 和 子进程PID(在父进程中)

< 0 子进程创建失败

= 0 在子进程中的返回值

> 0 在父进程中的返回值

特点:

(1)子进程会继承父进程几乎全部代码段(包括fork()前所定义的所有内容)

(2)子进程拥有自己独立的信息标识,如PID

(3)父、子进程独立存在,在各自存储空间上运行,互不影响

(4)创建父子进程执行不同的内容是多任务中固定方法

#!/usr/bin/env python# -*- coding: utf-8 -*-# 多进程import osimport randomimport subprocessimport timefrom multiprocessing import Process, Queuefrom multiprocessing.dummy import Pool# 1.使用fork() 复制子进程# 每次fork一个子进程处理服务端任务,仅在linux /unix系统上# Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。# 普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。# 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的IDdef test1():    pid = os.fork()    if pid == 0:        #        print("copy progress (%s)" % (os.getpid()))    else:        print("child progress (%s) create (%s)" % (os.getpid(), pid))def run_progress(name):    print("run child progress %s (%s)"%(name,os.getpid()))# multiprocessing模块就是跨平台版本的多进程模块# multiprocessing模块提供了一个Process类来代表一个进程对象def test2():    print("parent progress %s"%(os.getpid()))    p = Process(target=run_progress,args=('test',))    print('child process will start')    # 启动    p.start()    print('child process start')    # join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。    p.join()    print('child process end')def longtime_task(name):    print("run task (%s) %s ..."%(name,os.getpid()))    start_time = time.time()    time.sleep(random.random()*3)    end_time = time.time()    print("task%s %s runs %0.2f seconds"%(name,os.getpid(),(end_time-start_time)))# 如果要启动大量的子进程,可以用进程池的方式批量创建子进程# 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了def test3():    print("")    # Pool(n) n 同时运行的进程个数,默认 n大小是 cpu核数    pool = Pool(4)    for i in range(5):        pool.apply_async(longtime_task,args=(i,))    print("waiting all progress done")    pool.close()    pool.join()    print("all sub progress done")#子进程 subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出def test4():    ret = subprocess.call(['ping',' www.baidu.com'])#     如果子进程还需要输入,则可以通过communicate()方法输入:    print("code:",ret)def writeQueue(q):    print("Progess to write:%s"%(os.getpid()))    for value in ["A","B","C","D","E"]:        print("put %s to queue ..."%(value))        q.put(value)        time.sleep(random.random())def readQueue(q):    print("progress to read :%s"%(os.getpid()))    while True:        value = q.get(True)        print("get %s from queue"%(value))# Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。# 类似生产者消费者def test5():    queue = Queue()    pw = Process(target=writeQueue,args=(queue,))    pr = Process(target=readQueue,args=(queue,))    pw.start()    pr.start()    pw.join()    pr.join()    pr.terminate()if __name__ == '__main__':    test5()

二.多线程

1 线程threading

1.1 基本概述

  • 也被称为轻量级的进程。
  • 线程是计算机多任务编程的一种方式,可以使用计算机的多核资源。
  • 线程死应用程序中工作的最小单元

1.2 线程特点

(1)进程的创建开销较大,线程创建开销较小

(2)一个进程中可以包含多个线程

(3)线程依附于进程的存在,多个线程共享进程的资源

(4)每个线程也拥有自己独特的特征,比如ID、指令集

注意:

(1)进程有独立的空间,数据安全性较高

(2)线程使用进程的资源,即一般使用全局变量的方式进行线程间通信,所以需要较复杂的同步互斥

(3)多个不相关的线程功能最好不要凑到一起形成线程

1.3 线程与进程

(1)进程是线程的容器,一个程序的执行实例就是一个进程,线程是进程的实际运作单位。

(2)进程是系统进行资源分配调度的基本单元,是操作系统结构的基础;线程是操作系统能够运算调度的最小单位。

(3)进程之间是相互隔离的,即一个进程既无法访问其他进程的内容,也无法操作其他进程,而操作系统会跟踪所有正在运行的进程,给每个进程一小段运行时间,然后切换到其他进程。

(4)python的线程没有优先级,没有线程组的概念,也不能被销毁、停止、挂起,自然也没有恢复、中断。

2 线程模块

Thread module emulating a subset of Java’s threading model

2.1 线程模块之类

Thread,Event, Lock, Rlock, Condition, [Bounded]Semaphore, Timer, local。

2.2 线程模块之方法属性

方法:

threading.current_thread():返回当前的线程变量。

threading.active_count():
threading.stack_size()

threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

threading.setprofile(func)

threading.Lock()
threading.Rlock()

属性:

t = threading.Thread(target=function),t为线程实例

:线程的名称
t.getName:获得线程名字
t.is_alive:获取线程的状态
2.3 线程模块之常量
threading.TIMEOUT_MAX 设置threading全局超时时间。
回到顶部
3 Thread类
3.1 Thread类

*class Thread(builtins.object)的Methods defined here:

Thread(self, group=None, target=None, name=None, args=(), kwargs=None, , daemon=None)
group:线程组,目前没有实现,为了将来实现ThreadGroup类的扩展而设置的。
target:要执行的方法,run()方法调用的对象
name:线程名,在默认情况下,命名为Thread-N(N为最小的十进制数字)
args:元组参数
kwargs:字典参数

3.2 Thread类的实例方法:

*start():启动线程

join(self, timeout=None):等待线程结束,阻塞线程直到调用此方法的线程终止或到达指定的的timeout
is_alive():返回线程是否在运行,运行返回True;
isAlive:等价于is_alive()
getName():获取线程名字
setName():设置线程名字,在threading.Thread()的帮助文档中没有该方法,其他资料上有。

is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)*

如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止

如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

3.3 线程的创建方法

线程的创建方法有两种

一种是利用threading.Thread()直接创建线程

import threading
import time

def music(sec):    print('播放音乐')    time.sleep(sec)t = threading.Thread(name='createthread',target=music,args=(2,))t.start()print("结束!")

另外是利用 threading.Thread() 类的继承、并重写 run() 来创建线程

import threadingimport timeclass MyThread(threading.Thread):    def __init__(self,arg):        # 注意:一定要显式的调用父类的初始化函数。        super(MyThread, self).__init__()        self.arg=arg    # 定义每个线程要运行的函数    def run(self):        time.sleep(1)        print('the arg is:%s\r' % self.arg)for i in range(3):    t =MyThread(i)    t.start()print('main thread end!')

事实上:线程传参与函数传参没有明显区别,本质上就是函数传参

import threadingdef add(x, y):    print('{} + {} = {}'.format(x, y, x + y, threading.current_thread()))# 创建线程对象thread1 = threading.Thread(target=add, name='add', args=(4, 5))thread2 = threading.Thread(target=add, name='add', args=(4,), kwargs={'y': 5})thread3 = threading.Thread(target=add, name='add', kwargs={'x': 4, 'y': 5})# 启动线程thread1.start()thread2.start()thread3.start()

关于format函数参看 python的format函数、python中强大的format函数

3.5 线程中的变量

import threadingimport timea = 10def music(sec):    print('播放音乐')    print(a)    time.sleep(sec)    print(a)t = threading.Thread(name='mythread',target=music,args=(2,))t.start()print("------------main--------------")time.sleep(2)a = 100

说明:

(1)同一个程序,运行后的结果不相同,因为在同样sleep(2)后,全局变量重新绑定了100,而进程最后打印出变量a,若变量a先绑定100,则会打印出100,若先打印则为10

(2)将args=(2,)变为args=(3,),则会稳定打印出100

(3)将全局变量的前的sleep(2)变为sleep(3),则结果稳定打印出10

示例2

import threadingimport timea = 10def music1(sec):    a = 1000    print('m11',a)    time.sleep(sec)    print('m12',a)def music2(sec):    print('m21',a)    time.sleep(sec)    print('m22',a)t1 = threading.Thread(name='mythread1',target=music1,args=(2,))t2 = threading.Thread(name='mythread2',target=music2,args=(3,))t1.start()t2.start()print("------------main--------------")

3.6 线程中的属性方法

import threadingimport timedef music(sec):    print('播放音乐')    time.sleep(sec)t = threading.Thread(name='mythread',target=music,args=(2,))t.start()print('name:',t.name)#线程名称t.setName("yourthread")#设置线程的名称print('name:',t.name)#线程名称print('getName:',t.getName())# 获得线程名称print('is_alive:',t.is_alive()) #获取线程状态t.join()#阻塞等待进程退出print("------------main--------------")

3.7 在本地计算机上可以创建线程的最大数量

# 查看可以创建多少线程# 可以用top查看CPU运行情况import threadingimport timea = 0def music(sec):    global a    a += 1    time.sleep(sec)    print(a)    # 阻塞创建的线程退出    while True:        passthread_list = []while True:    t = threading.Thread(target=music,args=(2,))    t.start() #启动线程    thread_list.append(t)for i in thread_list:    i.join()#阻塞等待线程退出

三,异步实现网络请求

requestServer= "https://www.baidu.com"#装饰器def async_call(fn):    def wrapper(*args, **kwargs):        threading.Thread(target=fn, args=args, kwargs=kwargs).start()    return wrapperclass JhServerRequest(object):	#锁    lock = threading.Lock()    # def __init__(self):    #     super.__init__()        # pass    # 接口地址:http://192.168.1.225:8090/sdk/Cltpg/game    #    @async_call    def getGameList(self,callback_fun):        requesturl = requestServer+"/sdk/Cltpg/game"        # con = http.client.HTTPConnection("http://192.168.1.225")        # con.request(method="GET",url=requesturl)        # resu = con.getresponse()        # print(resu.status,resu.reason,resu.info())        # print(resu.read())        #证书验证错误,遇到这种错误有可能是机器无外网权限造成的        ssl._create_default_https_context = ssl._create_unverified_context        req = urllib.request.urlopen(url=requesturl, data=b'data')        jsondatas = req.read().decode('utf-8')        # jsondatas = asyncio.get_event_loop().run_until_complete(self.asyncRequestFromUrl(url=requesturl)).decode(        #     'utf-8')        print(jsondatas)        jsonModel = json.loads(jsondatas)        # print(jsonModel)        # for key in jsonModel:        #     print(key)        # f = urllib.request.urlopen(req)        # print(f.read().decode('utf-8'))        # 回调函数        callback_fun(jsonModel)        return jsonModel    # :http://192.168.1.225:8090/sdk/Cltpg/platform?tp=2    #    @async_call    def getChannaelList(self,callback_fun):        requesturl = requestServer+"/sdk/Cltpg/platform?tp=1"        # con = http.client.HTTPConnection(requesturl)        # resu = con.getresponse()        # print(resu.status, resu.reason, resu.info())        # print(resu.read())        ssl._create_default_https_context = ssl._create_unverified_context        req = urllib.request.urlopen(url=requesturl,data=b'data')        jsondatas = req.read().decode('utf-8')        print(jsondatas)        # jsondatas = asyncio.get_event_loop().run_until_complete(self.asyncRequestFromUrl(url=requesturl)).decode(        #     'utf-8')        jsonModel = json.loads(jsondatas)        # print(jsonModel)        # for key in jsonModel:        #     print(key)        # f = urllib.request.urlopen(req)        # print(f.read().decode('utf-8'))        callback_fun(jsonModel)        return jsonModel    #gid gameId    # pid 渠道ID    # @async_call    def getChannelDetail(self,gid,pid):        # http://192.168.1.225:8090/sdk/Cltpg/gpconfig?gid=1&pid=71&tp=2        ssl._create_default_https_context = ssl._create_unverified_context        requesturl = requestServer + "/sdk/Cltpg/gpconfig?gid=%s&pid=%s&tp=1"%(gid,pid)        print(requesturl)        # con = http.client.HTTPConnection(requesturl)        # resu = con.getresponse()        # print(resu.status, resu.reason, resu.info())        # print(resu.read())        req = urllib.request.urlopen(url=requesturl, data=b'data')        jsondatas = req.read().decode('utf-8')        # jsondatas = asyncio.get_event_loop().run_until_complete(self.asyncRequestFromUrl(url=requesturl)).decode('utf-8')        jsonModel = json.loads(jsondatas)        # print(jsonModel)        # for key in jsonModel:        #     print(key)        # f = urllib.request.urlopen(req)        # print(f.read().decode('utf-8'))        return jsonModel    # # 异步请求    # async def asyncRequestFromUrl(self,url):    #     async with ClientSession() as session:    #         async with session.get(url) as response:    #             response = await response.read()    #             return response    @classmethod    def getinstance(cls):        if not hasattr(cls,"instance"):            JhServerRequest.instance = JhServerRequest()        return JhServerRequest.instance

调用详情

# 回调函数def channelListCallBackFun(datas):    # 处理数据    handleDatas(datas)    # 更新数据,由于我这是个工具,就更新界面    updateUi()    if __name__ == '__main__':    # 异步网络请求    JhServerRequest.getinstance().getGameList(gameListCallBackFun)

参考文章

转载地址:http://uenvb.baihongyu.com/

你可能感兴趣的文章
旋转关节绳子
查看>>
射箭box2d
查看>>
cocos2d iphone-wax cocowax
查看>>
angribird level editor
查看>>
love2d 苹果运行
查看>>
GridBagLayout 的注意
查看>>
ajax 跨域iis6 设置
查看>>
4.0版本改动
查看>>
IE8 9 ajax no-transport ajax 问题
查看>>
oracle 启动dbconsole
查看>>
entity-framework 6解决方案中多个项目使用
查看>>
ios基础
查看>>
unity3d
查看>>
metronic 1.5
查看>>
unity3d 4 assert store
查看>>
tab bar control 注意事项
查看>>
sql优化部分总结
查看>>
IDEA运行时动态加载页面
查看>>
UML总结(对九种图的认识和如何使用Rational Rose 画图)
查看>>
js遍历输出map
查看>>