defthread_function(age): for i in age: i += 1 q.put( { 'age': i } ) defrun_threading(target, args, count): """ :param target: 目标函数 :param args: 函数参数 :param count: 线程数量 """ ts = [] for i inrange(count): t = Thread(target=target, args=args) ts.append(t) [i.start() for i in ts] [i.join() for i in ts]
defrun_thread_pool_sub(target, args, max_work_count=3): with ThreadPoolExecutor(max_workers=max_work_count) as t: res = [t.submit(target, i) for i in args] return res if __name__ == '__main__': ages = [1, 3, 4] res = run_thread_pool_sub(thread_function, ages) for future in as_completed(res): data = future.result() print data
defsubmit(self, fn, *args, **kwargs): withself._shutdown_lock: ifself._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f submit.__doc__ = _base.Executor.submit.__doc__
注意的是,它就是执行一个单独的函数,并且返回的是future对象(具体请看官方文档)。
方法三:使用ThreadPoolExecutor的map:
defthread_function(age): for i in age: yield i+1
defrun_thread_pool(target, args, max_work_count=6): with ThreadPoolExecutor(max_workers=max_work_count) as t: res = t.map(target, args) return res
if __name__ == '__main__': ages = [1, 3, 4] # 2222 res = run_thread_pool(target=thread_function, args=(ages,)) for j in res: for i in j: print(i)
输出:
2 4 5
Process finished with exit code 0
这里看出map的输出是有序的
这里看下map的源码:
defmap(self, fn, *iterables, **kwargs): """Returns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. """ timeout = kwargs.get('timeout') if timeout isnotNone: end_time = timeout + time.time() fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. defresult_iterator(): try: # reverse to keep finishing order fs.reverse() while fs: # Careful not to keep a reference to the popped future if timeout isNone: yield fs.pop().result() else: yield fs.pop().result(end_time - time.time()) finally: for future in fs: future.cancel() return result_iterator()