Python的伪多线程

TODO

相关讨论:

相关的模块

Queue

Queue模块主要有三种队列实现

Queue是线程同步的

threading

PyMOTW Manege concurrent threads

在看上面链接的第一个代码时,遇到了一个printsys.stdout.write的问题

具体见 TODO

最简单的threading配合Queue的Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# wutq@2013-02-18
# Python参考手册 P363

import threading
from Queue import Queue

class WorkThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.input_queue = Queue()

    def send(self, item):
        self.input_queue.put(item)

    def close(self):
        #在队列上设置一个标志,使线程在处理完毕后关闭
        self.input_queue.put(None)
        self.input_queue.join()

    def run(self):
        while True:
            item = self.input_queue.get()
            if item is None:
                break
            print item
            self.input_queue.task_done()
        self.input_queue.task_done()
        return

w = WorkThread()
w.start()
w.send("hello")
w.send("world")
w.close()

在StackOverflow上也找到一篇帖子: python multithreading for dummies
里面也提到了

CPython can use threads only for *I\O waits* due to *GIL*. 
If you want to benefit from multiple cores for CPU-bound tasks, use *mutliprocessing*

线程池

线程池概念

所谓的线程池就是完成一种任务的一组线程
一般情况下是首先初始化一定数量的工作线程,并把任务提交给空闲的线程
当线程都处于忙的状态的时候,则重新生成新的工作线程
当空闲线程较多的时候则停止一部分线程
这些要看具体的调度算法。
但是线程不能滥用,因为并不是线程越多就会带来更好的性能

参考:

一个不错的Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import Queue
import threading
import time

class WorkManager(object):
    def __init__(self, work_num=1000,thread_num=2):
        self.work_queue = Queue.Queue()
        self.threads = []
        self.__init_work_queue(work_num)
        self.__init_thread_pool(thread_num)

    def __init_work_queue(self, jobs_num):
        """初始化工作队列"""
        for i in range(jobs_num):
            self.add_job(do_job, i)

    def __init_thread_pool(self,thread_num):
        """初始化线程"""
        for i in range(thread_num):
            self.threads.append(Work(self.work_queue))

    def add_job(self, func, *args):
        """添加一项工作入队"""
        #任务入队,Queue内部实现了同步机制
        self.work_queue.put((func, list(args)))

    def check_queue(self):
        """检查剩余队列任务"""
        return self.work_queue.qsize()

    def wait_allcomplete(self):
        """等待所有线程运行完毕"""
        for item in self.threads:
            if item.isAlive():item.join()

class Work(threading.Thread):
    def __init__(self, work_queue):
        threading.Thread.__init__(self)
        self.work_queue = work_queue
        self.start()

    def run(self):
        while True:
            try:
                #任务异步出队,Queue内部实现了同步机制
                do, args = self.work_queue.get(block=False)
                do(args)
                self.work_queue.task_done()#通知系统任务完成
            except Exception,e:
                print str(e)
                break

#具体要做的任务
def do_job(args):
    #print args
    time.sleep(0.1)#模拟处理时间
    #print threading.current_thread(), list(args)

if __name__ == '__main__':
    for thread_num in (10, 20):
        print "Thread Num: %d" % thread_num
        start = time.time()
        work_manager =  WorkManager(500, thread_num)
        work_manager.wait_allcomplete()
        end = time.time()
        print "cost all time: %s" % (end-start)

来源: http://www.open-open.com/home/space-5679-do-blog-id-3247.html

略做了一点修改

网上关于线程池不错的文章:
python线程池
python线程池的实现
* python threadpool第三方模块