位置: 编程技术 - 正文

浅谈python 线程池threadpool之实现(python如何进行线程操作)

编辑:rootadmin

推荐整理分享浅谈python 线程池threadpool之实现(python如何进行线程操作),希望有所帮助,仅作参考,欢迎阅读内容。

文章相关热门搜索词:python线程原理,python线程threading,python线程threading,python线程threading,python的线程,python线程daemon,python线程daemon,python中线程可以用在哪些方面,内容如对您有帮助,希望把文章链接给更多的朋友!

首先介绍一下自己使用到的名词:

工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务;

任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过 makeRequests来创建

任务队列(request_queue):存放任务的队列,使用了queue实现的。工作线程从任务队列中get任务进行处理;

任务处理函数(callable):工作线程get到任务后,通过调用任务的任务处理函数即(request.callable_)具体 的 处理任务,并返回处理结果;

任务结果队列(result_queue):任务处理完成后,将返回的处理结果,放入到任务结果队列中(包括异常);

任务异常处理函数或回调(exc_callback):从任务结果队列中get结果,如果设置了异常,则需要调用异常回调处理异常;

任务结果回调(callback):从任务结果队列中get结果,对result进行进一步处理;

上一节介绍了线程池threadpool的安装和使用,本节将主要介绍线程池工作的主要流程:

(1)线程池的创建(2)工作线程的启动(3)任务的创建(4)任务的推送到线程池(5)线程处理任务(6)任务结束处理(7)工作线程的退出

下面是threadpool的定义:

1、线程池的创建(ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

其中,初始化参数为:

num_works:线程池中线程个数

q_size :任务队列的长度限制,如果限制了队列的长度,那么当调用putRequest()添加任务时,到达限制长度后,那么putRequest将会不断尝试添加任务,除非在putRequest()设置了超时或者阻塞;

esq_size: 任务结果队列的长度;

pool_timeout:工作线程如果从request队列中,读取不到request,则会阻塞pool_timeout,如果仍没request则直接返回;

其中,成员变量:

self._requests_queue: 任务队列,通过threadpool.makeReuests(args)创建的任务都会放到此队列中;self._results_queue: 字典,任务对应的任务执行 self.workers: 工作线程list,通过self.createWorkers()函数内创建的工作线程会放到此工作线程list中;self.dismisssedWorkers: 被设置线程事件,并且没有被join的工作线程self.workRequests: 字典,记录推送到线程池的任务,结构为requestID:request。其中requestID是任务的唯一标识,会在后面作介绍。

2、工作线程的启动(self.createWorks(args))

函数定义:

其中WorkerThread()继承自thread,即python内置的线程类,将创建的WorkerThread对象放入到self.workers队列中。下面看一下WorkerThread类的定义:

从self.__init__(args)可看出:

初始化中变量:

self._request_queue:任务队列;self._resutls_queuqe,:任务结果队列 ;self._pool_timeout:run函数中从任务队列中get任务时的超时时间,如果超时则继续while(true);self._dismissed:线程事件,如果set线程事件则run会执行break,直接退出工作线程;

最后调用self.start()启动线程,run函数定义见上面:

从上面run函数while执行步骤如下:

(1)如果设置了self._dismissed则退出工作线程,否则执行第2步(2)尝从任务 队列self._requests_queue 中get任务,如果队列为空,则continue 执行下一次while循环,否则执行第3步(3)检测此工作线程事件是否被set,如果被设置,意味着要结束此工作线程,那么就需要将取到的任务返回到任务队列中,并且退出线程。如果线程事件没有被设置,那么执行任务处理函数request.callable,并将返回的result,压入到任务结果队列中,如果任务处理函数出现异常,则将异常压入到队列中。最后跳转第4步(4)继续循环,返回1

到此工作线程创建完毕,根据设置的线程池线程数量,创建工作线程,工作线程从任务队列中get任务,进行任务处理,并将任务处理结果压入到任务结果队列中。

3、任务的创建(makeRequests)

任务的创建函数为threadpool.makeRequests(callable_,args_list,callback=None):

其中创建任务的函数参数具体意义为下:

callable_:注册的任务处理函数,当任务被放到任务队列后,工作线程中获取到该任务的线程,会执行此 callable_

浅谈python 线程池threadpool之实现(python如何进行线程操作)

args_list:首先args_list是列表,列表元素类型为元组,元组中有两个元素item[0],item[1],item[0]为位置参 数,item[1]为字典类型关键字参数。列表中元组的个数,代表启动的任务个数,在使用的时候一般都为单个元组,即一个makerequest()创建一个任务。

callback:回调函数,在poll函数中调用(后面讲解此函数),callable_调用结束后,会就任务结果放入到任务结果队列中(self._resutls_queue),在poll函数中,当从self._resutls_queue队列中get某个结果后,会执行此callback(request,result),其中result是request任务返回的结果。

exc_callback:异常回调函数,在poll函数中,如果某个request对应有执行异常,那么会调用此异常回调。

创建完成任务后,返回创建的任务。

外层记录此任务,放入到任务列表中。

上面是创建任务的函数,下面讲解任务对象的结构:

上面self.callback 以及self.exc_callback,和self.callable_ ,args,dwds都已经讲解,就不在??铝恕?br />其中有一个任务的全局唯一标识,即self.requestID,通过获取自身内存首地址作为自己的唯一标识id(self)self.exception 初始化为False,如果执行self.callable()过程中出现异常,那么此变量会标设置为True。

至此,任务创建完毕,调用makeRequests()的上层记录有任务列表request_list.

4、任务的推送到线程池(putRequest)

上面小节中介绍了任务的创建,任务的个数可以成千上百,但是处理任务的线程数量只有我们在创建线程池的时候制定的线程数量来处理,指定的线程数量往往比任务的数量小得多,因此,每个线程必须处理多个任务。

本节介绍如何将创建的任务推送的线程池中,以让线程池由阻塞状态,获取任务,然后去处理任务。

任务的推送使用ThreadPool线程池类中的putRequest(self,request,block,timeout)来创建:

函数的主要作用就是将request任务,也就是上一小节中创建的任务,put到线程池的任务队列中(self._request_queue)。然后记录已经推送到线程池的任务,通过线程池的self.workReuests 字典来存储,结构为request.requestID:request。

至此,任务创建完成,并且已经将任务推送到线程池中。

5、线程处理任务

通过上一小节,任务已经推送到了线程中。在任务没有被推送到线程池中时,线程池中的线程都处在处在阻塞状态中,即在线程的self.run()函数中,一直处于一下状态:

现在任务已经推送到线程池中,那么get任务将会正常返回,会执行下面的步骤:

获取任务--->调用任务的处理函数callable()处理任务--->将任务request以及任务返回的结果压入到self.results_queue队列中------>如果任务处理函数异常,那么将任务异常标识设置为True,并将任务request以及任务异常压入到self.results_queue队列中---->再次返回获取任务

如果,在while循环过程中,外部设置了线程事件,即self._dismissed.isSet为True,那么意味着此线程将会结束处理任务,那么会将get到的任务返回的任务队列中,并且退出线程。

6、任务结束处理

上面小节中,介绍了线程池不断的get任务,并且不断的处理任务。那么每个任务结束之后我们该怎么处理呢,线程池提供了wait()以及poll()函数。

当我们把任务提交个线程池之后,我们会调用wait()来等待任务处理结束,结束后wait()将会返回,返回后我们可以进行下一步操作,例如重新创建任务,将任务继续推送到线程池中,或者结束线程池。结束线程池会在下一小节介绍,这一小节主要介绍wait()和poll()操作。

先来看看wait()操作:

等待任务处理结束,在所有任务处理完成之前一直处于block阶段,如果self.poll()返回异常NoResultsPending异常,然后wait返回,任务处理结束。

下面看看poll函数:

(1)首先,检测任务字典({request.requestID:request})是否为空,如果为空则抛出异常NoResultPending结束,否则到第2步;

(2)检测工作线程是否为空(如果某个线程的线程事件被设置,那么工作线程退出,并从self.workers中pop出),如果为空则抛出NoWorkerAvailable异常结束,否则进入第3步;

(3)从任务结果队列中get任务结果,如果抛出队列为空,那么break,返回,否则进入第4步;

(4)如果任务处理过程中出现异常,即设置了request.exception,并且设置了异常处理回调即request.exc_callback则执行异常回调,再回调中处理异常,返回后将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。否则进入第5步;

(5)如果设置了任务结果回调即request.callback不为空,则执行任务结果回调即request.callbacl(request,result),并将任务从任务列表self.workRequests中移除,继续get任务,返回第1步。

(6)重复进行上面的步骤直到抛出异常,或者任务队列为空,则poll返会;

至此抛出NoResultPending wait操作接受此异常后,至此wait()返回。

7、工作线程的退出

threadpool提供的工作线程退出的的操作有dismissWorkers()和joinAllDismissedWorker()操作:

从dismissWorkers可看出,主要工作是从self.workers 工作线程中pop出指定的线程数量,并且设置此线程的线程事件,设置线程事件后,此线程self.run()函数,则会检测到此设置,并结束线程。

如果设置了在do_join,即设置了在此函数中join退出的线程,那么对退出的线程执行join操作。否则将pop出的线程放入到self.dismissedWorkers中,以等待joinAllDismissedWorkers操作去处理join线程。

8、总结

到此为止,threadpool线程池中所有的操作介绍完毕,其实现也做了具体的介绍。从上面可看出,线程池并没有那么复杂,只有几个简单的操作,主要是了解整个处理流程即可。

标签: python如何进行线程操作

本文链接地址:https://www.jiuchutong.com/biancheng/372285.html 转载请保留说明!

上一篇:python线程池(threadpool)模块使用笔记详解(python线程池最大数量)

下一篇:Python实现读取json文件到excel表(python读取grib)

  • 销售无形资产增值税纳税义务发生时间
  • 预借差旅费附什么原始凭证
  • 支付招标代理费需要哪些资料
  • 其他债权投资减值
  • 不含税劳务报酬怎么交税的
  • 应收账款已收回但是账面还有余额怎么处理
  • 公司给客户退款会计分录
  • 购入办公楼的入账价值
  • 营改增后租金收入交什么税
  • 即征即退增值税需要缴纳所得税吗
  • 税控盘的购买流程
  • 附加税记入什么科目
  • 应计入增值税一般纳税人认定标准的“年应税销售额”
  • 开的技术服务费可以做研发支出吗?
  • 发票进项抵扣有什么用
  • 一般纳税人专用发票怎么做账
  • 剑灵怎么双开2023
  • 政府收储土地补偿款账务处理
  • 会计如果做假账
  • 坏账损失的确定原则
  • 如何查询苹果手机充电次数
  • 会计分录的书写规范
  • dotnetfx2.0
  • PHP:imagepsfreefont()的用法_GD库图像处理函数
  • 腾达路由器桥接的方法
  • postman汉化包
  • Bàu Cá Cái的红树林,越南广义 (© Robert Harding World Imagery/Offset)
  • php登录注册页面完整代码
  • web期末大作业源代码
  • thinkphp where or
  • 摊销期在一年以内的费用
  • php日期计算器
  • 前端发起请求怎么设置
  • 残保金计税基数是什么
  • 支付应付账款的文章怎么写的
  • mac安装mysql8.0
  • 财务软件属于什么设备
  • 代开发票可能遇到的问题是?
  • 劳务报酬所得税可以退税吗
  • 广告公司的广告制作费计入什么科目
  • 汽车运输企业交什么税
  • 账务核对包括哪些内容
  • 上月的普票下月可以记账么
  • 工资发多了可以追回吗
  • 土地增值税的预缴
  • 小规模纳税人缴税的分录
  • 稀释股权违法吗
  • 购置一项设备,有两种可供选择
  • 总公司费用能分摊到分公司吗
  • 合资注册公司应该注意什么
  • 应收账款周转率范围多少合适
  • 建筑工程拨款申请模板
  • 公司刚成立如何做账
  • 监控系统文件
  • windows iis配置
  • 苹果mac 最新系统
  • 宏基笔记本电脑质量怎么样
  • thinkpadx230i5
  • hkcmd.exe损坏文件
  • 电脑上view是什么意思
  • win7怎么不显示桌面图标
  • win8删除所有内容并重新安装windows
  • centos安装v2
  • win10系统开机后任务栏无响应怎么解决
  • 关于linux操作系统的描述中
  • win10mobile最新版本
  • 比较好的jquery教程
  • 使用Jasmine和Karma对AngularJS页面程序进行测试
  • 激发灵感的三种方式是什么
  • JS实现的ajax和同源策略(实例讲解)
  • 在下次启动计算机时会自动关机。重起后恢复正常(“绿色版”)
  • jquery遍历li
  • nodejs来一打c++扩展
  • Jquery中巧用Ajax的beforeSend方法
  • jquery 弹出页面
  • 基于Android的游戏陪玩APP设计
  • 国家税务总局全国增值税发票官网
  • 税务局副局长分管业务
  • 河南省地税网上营业厅
  • 税法上接受捐赠是什么
  • 免责声明:网站部分图片文字素材来源于网络,如有侵权,请及时告知,我们会第一时间删除,谢谢! 邮箱:opceo@qq.com

    鄂ICP备2023003026号

    网站地图: 企业信息 工商信息 财税知识 网络常识 编程技术

    友情链接: 武汉网站建设