位置: 编程技术 - 正文

浅谈python 线程池threadpool之实现(python如何进行线程操作)

编辑:rootadmin

推荐整理分享浅谈python 线程池threadpool之实现(python如何进行线程操作),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:python线程原理,python线程threading,python线程threading,python线程threading,python的线程,python线程daemon,python线程daemon,python中线程可以用在哪些方面,内容如对您有帮助,希望把文章链接给更多的朋友!

首先介绍一下自己使用到的名词:

工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务;

任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过 makeRequests来创建

任务队列(request_queue):存放任务的队列,使用了queue实现的。工作线程从任务队列中get任务进行处理;

任务处理函数(callable):工作线程get到任务后,通过调用任务的任务处理函数即(request.callable_)具体 的 处理任务,并返回处理结果;

任务结果队列(result_queue):任务处理完成后,将返回的处理结果,放入到任务结果队列中(包括异常);

任务异常处理函数或回调(exc_callback):从任务结果队列中get结果,如果设置了异常,则需要调用异常回调处理异常;

任务结果回调(callback):从任务结果队列中get结果,对result进行进一步处理;

上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:

(1)线程池的创建(2)工作线程的启动(3)任务的创建(4)任务的推送到线程池(5)线程处理任务(6)任务结束处理(7)工作线程的退出

下面是threadpool的定义:

1、线程池的创建(ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

其中,初始化参数为:

num_works:线程池中线程个数

q_size :任务队列的长度限制,如果限制了队列的长度,那么当调用putRequest()添加任务时,到达限制长度后,那么putRequest将会不断尝试添加任务,除非在putRequest()设置了超时或者阻塞;

esq_size: 任务结果队列的长度;

pool_timeout:工作线程如果从request队列中,读取不到request,则会阻塞pool_timeout,如果仍没request则直接返回;

其中,成员变量:

self._requests_queue: 任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中;self._results_queue: 字典,任务对应的任务执行 self.workers: 工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中;self.dismisssedWorkers: 被设置线程事件,并且没有被join的工作线程self.workRequests: 字典,记录推送到线程池的任务,结构为requestID:request。其中requestID是任务的唯一标识,会在后面作介绍。

2、工作线程的启动(self.createWorks(args))

函数定义:

其中WorkerThread()继承自thread,即python内置的线程类,将创建的WorkerThread对象放入到self.workers队列中。下面看一下WorkerThread类的定义:

从self.__init__(args)可看出:

初始化中变量:

self._request_queue:任务队列;self._resutls_queuqe,:任务结果队列 ;self._pool_timeout:run函数中从任务队列中get任务时的超时时间,如果超时则继续while(true);self._dismissed:线程事件,如果set线程事件则run会执行break,直接退出工作线程;

最后调用self.start()启动线程,run函数定义见上面:

从上面run函数while执行步骤如下:

(1)如果设置了self._dismissed则退出工作线程,否则执行第2步(2)尝从任务 队列self._requests_queue 中get任务,如果队列为空,则continue 执行下一次while循环,否则执行第3步(3)检测此工作线程事件是否被set,如果被设置,意味着要结束此工作线程,那么就需要将取到的任务返回到任务队列中,并且退出线程。如果线程事件没有被设置,那么执行任务处理函数request.callable,并将返回的result,压入到任务结果队列中,如果任务处理函数出现异常,则将异常压入到队列中。最后跳转第4步(4)继续循环,返回1

到此工作线程创建完毕,根据设置的线程池线程数量,创建工作线程,工作线程从任务队列中get任务,进行任务处理,并将任务处理结果压入到任务结果队列中。

3、任务的创建(makeRequests)

任务的创建函数为threadpool.makeRequests(callable_,args_list,callback=None):

其中创建任务的函数参数具体意义为下:

callable_:注册的任务处理函数,当任务被放到任务队列后,工作线程中获取到该任务的线程,会执行此 callable_

浅谈python 线程池threadpool之实现(python如何进行线程操作)

args_list:首先args_list是列表,列表元素类型为元组,元组中有两个元素item[0],item[1],item[0]为位置参 数,item[1]为字典类型关键字参数。列表中元组的个数,代表启动的任务个数,在使用的时候一般都为单个元组,即一个makerequest()创建一个任务。

callback:回调函数,在poll函数中调用(后面讲解此函数),callable_调用结束后,会就任务结果放入到任务结果队列中(self._resutls_queue),在poll函数中,当从self._resutls_queue队列中get某个结果后,会执行此callback(request,result),其中result是request任务返回的结果。

exc_callback:异常回调函数,在poll函数中,如果某个request对应有执行异常,那么会调用此异常回调。

创建完成任务后,返回创建的任务。

外层记录此任务,放入到任务列表中。

上面是创建任务的函数,下面讲解任务对象的结构:

上面self.callback 以及self.exc_callback,和self.callable_ ,args,dwds都已经讲解,就不在??铝恕?br />其中有一个任务的全局唯一标识,即self.requestID,通过获取自身内存首地址作为自己的唯一标识id(self)self.exception 初始化为False,如果执行self.callable()过程中出现异常,那么此变量会标设置为True。

至此,任务创建完毕,调用makeRequests()的上层记录有任务列表request_list.

4、任务的推送到线程池(putRequest)

上面小节中介绍了任务的创建,任务的个数可以成千上百,但是处理任务的线程数量只有我们在创建线程池的时候制定的线程数量来处理,指定的线程数量往往比任务的数量小得多,因此,每个线程必须处理多个任务。

本节介绍如何将创建的任务推送的线程池中,以让线程池由阻塞状态,获取任务,然后去处理任务。

任务的推送使用ThreadPool线程池类中的putRequest(self,request,block,timeout)来创建:

函数的主要作用就是将request任务,也就是上一小节中创建的任务,put到线程池的任务队列中(self._request_queue)。然后记录已经推送到线程池的任务,通过线程池的self.workReuests 字典来存储,结构为request.requestID:request。

至此,任务创建完成,并且已经将任务推送到线程池中。

5、线程处理任务

通过上一小节,任务已经推送到了线程中。在任务没有被推送到线程池中时,线程池中的线程都处在处在阻塞状态中,即在线程的self.run()函数中,一直处于一下状态:

现在任务已经推送到线程池中,那么get任务将会正常返回,会执行下面的步骤:

获取任务--->调用任务的处理函数callable()处理任务--->将任务request以及任务返回的结果压入到self.results_queue队列中------>如果任务处理函数异常,那么将任务异常标识设置为True,并将任务request以及任务异常压入到self.results_queue队列中---->再次返回获取任务

如果,在while循环过程中,外部设置了线程事件,即self._dismissed.isSet为True,那么意味着此线程将会结束处理任务,那么会将get到的任务返回的任务队列中,并且退出线程。

6、任务结束处理

上面小节中,介绍了线程池不断的get任务,并且不断的处理任务。那么每个任务结束之后我们该怎么处理呢,线程池提供了wait()以及poll()函数。

当我们把任务提交个线程池之后,我们会调用wait()来等待任务处理结束,结束后wait()将会返回,返回后我们可以进行下一步操作,例如重新创建任务,将任务继续推送到线程池中,或者结束线程池。结束线程池会在下一小节介绍,这一小节主要介绍wait()和poll()操作。

先来看看wait()操作:

等待任务处理结束,在所有任务处理完成之前一直处于block阶段,如果self.poll()返回异常NoResultsPending异常,然后wait返回,任务处理结束。

下面看看poll函数:

(1)首先,检测任务字典({request.requestID:request})是否为空,如果为空则抛出异常NoResultPending结束,否则到第2步;

(2)检测工作线程是否为空(如果某个线程的线程事件被设置,那么工作线程退出,并从self.workers中pop出),如果为空则抛出NoWorkerAvailable异常结束,否则进入第3步;

(3)从任务结果队列中get任务结果,如果抛出队列为空,那么break,返回,否则进入第4步;

(4)如果任务处理过程中出现异常,即设置了request.exception,并且设置了异常处理回调即request.exc_callback则执行异常回调,再回调中处理异常,返回后将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。否则进入第5步;

(5)如果设置了任务结果回调即request.callback不为空,则执行任务结果回调即request.callbacl(request,result),并将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。

(6)重复进行上面的步骤直到抛出异常,或者任务队列为空,则poll返会;

至此抛出NoResultPending wait操作接受此异常后,至此wait()返回。

7、工作线程的退出

threadpool提供的工作线程退出的的操作有dismissWorkers()和joinAllDismissedWorker()操作:

从dismissWorkers可看出,主要工作是从self.workers 工作线程中pop出指定的线程数量,并且设置此线程的线程事件,设置线程事件后,此线程self.run()函数,则会检测到此设置,并结束线程。

如果设置了在do_join,即设置了在此函数中join退出的线程,那么对退出的线程执行join操作。否则将pop出的线程放入到self.dismissedWorkers中,以等待joinAllDismissedWorkers操作去处理join线程。

8、总结

到此为止,threadpool线程池中所有的操作介绍完毕,其实现也做了具体的介绍。从上面可看出,线程池并没有那么复杂,只有几个简单的操作,主要是了解整个处理流程即可。

标签: python如何进行线程操作

本文链接地址:https://www.jiuchutong.com/biancheng/372285.html 转载请保留说明!

上一篇:python线程池(threadpool)模块使用笔记详解(python线程池最大数量)

下一篇:Python实现读取json文件到excel表(python读取grib)

  • 税盘服务费抵扣增值税
  • 应税劳务销售额怎么算
  • 计提附加税的会计分录
  • 村委会什么会计准则
  • 住房公积金的会计处理
  • 服装租赁开票税目
  • 银行贷款利息支出汇算清缴需要调整吗
  • 分公司收到的总部拨款会计如何处理
  • 小微企业所得税优惠政策最新2023
  • 实缴年月正常补收
  • 出口用的增值税税率
  • 资产处置税务风险有哪些
  • 企业被列为风险纳税人税控开票会显示什么
  • 环卫公司增值税税率
  • 不锈钢材料开票
  • 土地租赁摊销年限按合同年限
  • 计提代扣代缴个税
  • 生产成本包括哪三类
  • 实习生短期意外保险能企业所得税前扣除吗?
  • php类和对象
  • 游泳耳朵进水怎么办
  • php递归实现1到100的和
  • 车辆购买保险发票未回怎么做会计分录
  • 房产税的计税依据的形式包括
  • 增值税专用发票有几联?
  • 免征个人所得税33种情形
  • 大学网页制作作业dw
  • thinkphp框架搭建
  • php怎么样
  • yolov5目标检测流程图
  • 资产负债表应收账款怎么计算
  • ICLR‘23 UnderReview | LightGCL: 简单而有效的图对比学习推荐系统
  • win+e快捷键在哪
  • 如何验证工具坐标系
  • 购买二手设备
  • 利润分配的基本原则是( )
  • 研发支出的主要内容
  • 发现以前年度的账错误
  • 财务咨询公司能出尽调报告吗
  • discuzcms
  • 暂缓收缴工会经费申请
  • 固定资产一次性扣除政策
  • 小企业外币业务包括
  • 公司临时工的车可以买吗
  • 4s店销售返利
  • 资产减值损失什么科目
  • 免费样品要交税么
  • 分支机构企业所得税计提和缴纳数额不一致怎么办?
  • 原始凭证和记账凭证的区别
  • 工程施工借款如何做会计分录
  • 咨询服务费开票税率
  • 公司购药品入什么科目
  • 收入成本的确认条件
  • 商业折扣入账
  • 短期薪酬主要包括
  • 如何彻底关闭mac电脑app
  • 更新win8
  • win8 设置
  • linux系统管理命令有哪些
  • win10天气系统怎么弄掉
  • 同一个文件夹中的文件可以同名吗
  • dos 判断
  • python中print函数的end参数
  • js如何创建类
  • js闭包作用问题解决应用
  • jquery mobile
  • javascript初级教程
  • java script教程
  • 用javascript
  • 河南城乡居民医疗保险缴费怎么交
  • 国际货运怎么代理
  • 浙江国税局电子税务局
  • 湖北税务查询电话
  • 北京市国家税务网
  • 包头市到青山区多少公里
  • 河南助学金申请表
  • 个体户是否需要缴纳企业所得税
  • 硅矿的合法开采方法
  • 国税局和地税局分别征收什么税种
  • 2022年太原医保缴费时间
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设