位置: 编程技术 - 正文

浅谈python 线程池threadpool之实现(python如何进行线程操作)

编辑:rootadmin

推荐整理分享浅谈python 线程池threadpool之实现(python如何进行线程操作),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:python线程原理,python线程threading,python线程threading,python线程threading,python的线程,python线程daemon,python线程daemon,python中线程可以用在哪些方面,内容如对您有帮助,希望把文章链接给更多的朋友!

首先介绍一下自己使用到的名词:

工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务;

任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过 makeRequests来创建

任务队列(request_queue):存放任务的队列,使用了queue实现的。工作线程从任务队列中get任务进行处理;

任务处理函数(callable):工作线程get到任务后,通过调用任务的任务处理函数即(request.callable_)具体 的 处理任务,并返回处理结果;

任务结果队列(result_queue):任务处理完成后,将返回的处理结果,放入到任务结果队列中(包括异常);

任务异常处理函数或回调(exc_callback):从任务结果队列中get结果,如果设置了异常,则需要调用异常回调处理异常;

任务结果回调(callback):从任务结果队列中get结果,对result进行进一步处理;

上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:

(1)线程池的创建(2)工作线程的启动(3)任务的创建(4)任务的推送到线程池(5)线程处理任务(6)任务结束处理(7)工作线程的退出

下面是threadpool的定义:

1、线程池的创建(ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

其中,初始化参数为:

num_works:线程池中线程个数

q_size :任务队列的长度限制,如果限制了队列的长度,那么当调用putRequest()添加任务时,到达限制长度后,那么putRequest将会不断尝试添加任务,除非在putRequest()设置了超时或者阻塞;

esq_size: 任务结果队列的长度;

pool_timeout:工作线程如果从request队列中,读取不到request,则会阻塞pool_timeout,如果仍没request则直接返回;

其中,成员变量:

self._requests_queue: 任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中;self._results_queue: 字典,任务对应的任务执行 self.workers: 工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中;self.dismisssedWorkers: 被设置线程事件,并且没有被join的工作线程self.workRequests: 字典,记录推送到线程池的任务,结构为requestID:request。其中requestID是任务的唯一标识,会在后面作介绍。

2、工作线程的启动(self.createWorks(args))

函数定义:

其中WorkerThread()继承自thread,即python内置的线程类,将创建的WorkerThread对象放入到self.workers队列中。下面看一下WorkerThread类的定义:

从self.__init__(args)可看出:

初始化中变量:

self._request_queue:任务队列;self._resutls_queuqe,:任务结果队列 ;self._pool_timeout:run函数中从任务队列中get任务时的超时时间,如果超时则继续while(true);self._dismissed:线程事件,如果set线程事件则run会执行break,直接退出工作线程;

最后调用self.start()启动线程,run函数定义见上面:

从上面run函数while执行步骤如下:

(1)如果设置了self._dismissed则退出工作线程,否则执行第2步(2)尝从任务 队列self._requests_queue 中get任务,如果队列为空,则continue 执行下一次while循环,否则执行第3步(3)检测此工作线程事件是否被set,如果被设置,意味着要结束此工作线程,那么就需要将取到的任务返回到任务队列中,并且退出线程。如果线程事件没有被设置,那么执行任务处理函数request.callable,并将返回的result,压入到任务结果队列中,如果任务处理函数出现异常,则将异常压入到队列中。最后跳转第4步(4)继续循环,返回1

到此工作线程创建完毕,根据设置的线程池线程数量,创建工作线程,工作线程从任务队列中get任务,进行任务处理,并将任务处理结果压入到任务结果队列中。

3、任务的创建(makeRequests)

任务的创建函数为threadpool.makeRequests(callable_,args_list,callback=None):

其中创建任务的函数参数具体意义为下:

callable_:注册的任务处理函数,当任务被放到任务队列后,工作线程中获取到该任务的线程,会执行此 callable_

浅谈python 线程池threadpool之实现(python如何进行线程操作)

args_list:首先args_list是列表,列表元素类型为元组,元组中有两个元素item[0],item[1],item[0]为位置参 数,item[1]为字典类型关键字参数。列表中元组的个数,代表启动的任务个数,在使用的时候一般都为单个元组,即一个makerequest()创建一个任务。

callback:回调函数,在poll函数中调用(后面讲解此函数),callable_调用结束后,会就任务结果放入到任务结果队列中(self._resutls_queue),在poll函数中,当从self._resutls_queue队列中get某个结果后,会执行此callback(request,result),其中result是request任务返回的结果。

exc_callback:异常回调函数,在poll函数中,如果某个request对应有执行异常,那么会调用此异常回调。

创建完成任务后,返回创建的任务。

外层记录此任务,放入到任务列表中。

上面是创建任务的函数,下面讲解任务对象的结构:

上面self.callback 以及self.exc_callback,和self.callable_ ,args,dwds都已经讲解,就不在??铝恕?br />其中有一个任务的全局唯一标识,即self.requestID,通过获取自身内存首地址作为自己的唯一标识id(self)self.exception 初始化为False,如果执行self.callable()过程中出现异常,那么此变量会标设置为True。

至此,任务创建完毕,调用makeRequests()的上层记录有任务列表request_list.

4、任务的推送到线程池(putRequest)

上面小节中介绍了任务的创建,任务的个数可以成千上百,但是处理任务的线程数量只有我们在创建线程池的时候制定的线程数量来处理,指定的线程数量往往比任务的数量小得多,因此,每个线程必须处理多个任务。

本节介绍如何将创建的任务推送的线程池中,以让线程池由阻塞状态,获取任务,然后去处理任务。

任务的推送使用ThreadPool线程池类中的putRequest(self,request,block,timeout)来创建:

函数的主要作用就是将request任务,也就是上一小节中创建的任务,put到线程池的任务队列中(self._request_queue)。然后记录已经推送到线程池的任务,通过线程池的self.workReuests 字典来存储,结构为request.requestID:request。

至此,任务创建完成,并且已经将任务推送到线程池中。

5、线程处理任务

通过上一小节,任务已经推送到了线程中。在任务没有被推送到线程池中时,线程池中的线程都处在处在阻塞状态中,即在线程的self.run()函数中,一直处于一下状态:

现在任务已经推送到线程池中,那么get任务将会正常返回,会执行下面的步骤:

获取任务--->调用任务的处理函数callable()处理任务--->将任务request以及任务返回的结果压入到self.results_queue队列中------>如果任务处理函数异常,那么将任务异常标识设置为True,并将任务request以及任务异常压入到self.results_queue队列中---->再次返回获取任务

如果,在while循环过程中,外部设置了线程事件,即self._dismissed.isSet为True,那么意味着此线程将会结束处理任务,那么会将get到的任务返回的任务队列中,并且退出线程。

6、任务结束处理

上面小节中,介绍了线程池不断的get任务,并且不断的处理任务。那么每个任务结束之后我们该怎么处理呢,线程池提供了wait()以及poll()函数。

当我们把任务提交个线程池之后,我们会调用wait()来等待任务处理结束,结束后wait()将会返回,返回后我们可以进行下一步操作,例如重新创建任务,将任务继续推送到线程池中,或者结束线程池。结束线程池会在下一小节介绍,这一小节主要介绍wait()和poll()操作。

先来看看wait()操作:

等待任务处理结束,在所有任务处理完成之前一直处于block阶段,如果self.poll()返回异常NoResultsPending异常,然后wait返回,任务处理结束。

下面看看poll函数:

(1)首先,检测任务字典({request.requestID:request})是否为空,如果为空则抛出异常NoResultPending结束,否则到第2步;

(2)检测工作线程是否为空(如果某个线程的线程事件被设置,那么工作线程退出,并从self.workers中pop出),如果为空则抛出NoWorkerAvailable异常结束,否则进入第3步;

(3)从任务结果队列中get任务结果,如果抛出队列为空,那么break,返回,否则进入第4步;

(4)如果任务处理过程中出现异常,即设置了request.exception,并且设置了异常处理回调即request.exc_callback则执行异常回调,再回调中处理异常,返回后将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。否则进入第5步;

(5)如果设置了任务结果回调即request.callback不为空,则执行任务结果回调即request.callbacl(request,result),并将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。

(6)重复进行上面的步骤直到抛出异常,或者任务队列为空,则poll返会;

至此抛出NoResultPending wait操作接受此异常后,至此wait()返回。

7、工作线程的退出

threadpool提供的工作线程退出的的操作有dismissWorkers()和joinAllDismissedWorker()操作:

从dismissWorkers可看出,主要工作是从self.workers 工作线程中pop出指定的线程数量,并且设置此线程的线程事件,设置线程事件后,此线程self.run()函数,则会检测到此设置,并结束线程。

如果设置了在do_join,即设置了在此函数中join退出的线程,那么对退出的线程执行join操作。否则将pop出的线程放入到self.dismissedWorkers中,以等待joinAllDismissedWorkers操作去处理join线程。

8、总结

到此为止,threadpool线程池中所有的操作介绍完毕,其实现也做了具体的介绍。从上面可看出,线程池并没有那么复杂,只有几个简单的操作,主要是了解整个处理流程即可。

标签: python如何进行线程操作

本文链接地址:https://www.jiuchutong.com/biancheng/372285.html 转载请保留说明!

上一篇:python线程池(threadpool)模块使用笔记详解(python线程池最大数量)

下一篇:Python实现读取json文件到excel表(python读取grib)

  • 二手房交易需缴纳哪些费用
  • 每月工资不一样怎么交党费
  • 垃圾袋发票类别是什么
  • 高速路费电子发票怎么打印
  • 员工垫付公司费用法规
  • 每个月计提工资没有发怎么办
  • 没有真实业务的票据贴现
  • 普通发票需要做分录吗?
  • 企业交的社保是什么
  • 企业电子印章申请流程海口
  • 税务局备案的企业是什么
  • 装饰公司收到客户工程款做分录
  • 有哪些凭证
  • 个人转让股权的合理费用都有哪些
  • 蓝屏代码0xc000000f
  • Secure Boot什么意思?BIOS中Secure Boot灰色无法更改解决方法详解
  • 预付加油充值卡可以报销吗
  • ecshop怎么用
  • linux操作系统为用户提供的接口为
  • 怎么绕开中介找到房主
  • 股权转让要交什么税举例
  • 公司搬家费用怎么入账
  • 整体租赁合同
  • 房地产企业增值税扣除土地成本
  • 非上市公司股票期权个人所得税
  • 研发费用如何加计扣除何时申报
  • 完美解决在ThinkPHP控制器中命名空间的问题
  • php如何继承多个类
  • php实现的功能
  • typescript instanceof
  • 资金平衡表的编制方法
  • 融资租赁设备所有权归谁
  • 二手固定资产怎么折旧
  • 全年一次性奖金税率表2023
  • 终止劳动合同赔偿N+1
  • sqlgun
  • 招待客户的交通费可以算招待费吗
  • 公司购烟酒怎么入账
  • 小规模出售固定资产账务处理
  • 本月开发票下月收款
  • 企业给员工的福利体检报告
  • 去年城建税多计提了怎么办
  • 出口佣金账务处理
  • 原始凭证收集
  • 会计库存商品属于什么科目
  • 个体户查账征收个人所得税税率
  • 计提工会经费如何做账
  • 企业项目的特点
  • 向其他公司投资专利账务处理
  • 税收返还会计核算
  • 公司注册取消验资
  • 生产车间生产工人的薪酬计入什么
  • 对公账户有资金往来必须入账吗
  • 设备融资租赁会上征信吗
  • sql语句错误提示
  • ef使用sql语句
  • navicat不能创建string类型
  • windows安装mysql8.0
  • fedora改密码
  • mac更改复制快捷键
  • 借用自带功能给win8.1系统做开始菜单(不仅仅是一个开始按钮)
  • centos获取权限
  • Windows 8.1 RT Update 3更新将于9月发布
  • linux device driver
  • 使用jquery实现的项目
  • css样式表三种方式
  • shell脚本详解
  • framelayout布局
  • linux系统启动进程命令
  • 批处理rd
  • unity3d入门视频教程
  • 删除的照片怎么还原
  • input checked选择
  • javascript 对象
  • 安卓权限管理app
  • 狠抓组织收入工作
  • 监察室主任岗位职责
  • 新开公司交印花税
  • 海宁到许村公交车路线
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设