python 分布式计算

python中的分布式计算实现相当方便。

实现基于multiprocessing.managers中的BaseManager类。

话不多说,贴代码,注释写的很详细了。

#!/usr/bin/env python3
# -*- coding : utf-8 -*-
# master.py for windows

import time,queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

#任务个数
task_number = 10;

#定义收发队列
task_queue = queue.Queue(task_number);
result_queue = queue.Queue(task_number);

def gettask():
    return task_queue;
def getresult():
     return result_queue;

def test():
    #windows下绑定调用接口不能使用lambda,所以只能先定义函数再绑定
    BaseManager.register('get_task',callable = gettask);
    BaseManager.register('get_result',callable = getresult);
    #绑定端口并设置验证码,windows下需要填写ip地址,linux下不填默认为本地
    manager = BaseManager(address = ('127.0.0.1',5002),authkey = b'123');
    #启动
    manager.start();
    try:
        #通过网络获取任务队列和结果队列
        task = manager.get_task();
        result = manager.get_result();

        #添加任务
        for i in range(task_number):
            print('Put task %d...' % i)
            task.put(i);

        #每秒检测一次是否所有任务都被执行完
        while not result.full():
            time.sleep(1);

        for i in range(result.qsize()):
            ans = result.get();
            print('task %d is finish , runtime:%d s' % ans);
    
    except:
        print('Manager error');
    finally:
        #一定要关闭,否则会爆管道未关闭的错误
        manager.shutdown();
        
if __name__ == '__main__':
    #windows下多进程可能会炸,添加这句可以缓解
    freeze_support()
    test();
#!/usr/bin/env python3
# -*- coding : utf-8 -*-
# task.py for windows

import time, sys, queue, random
from multiprocessing.managers import BaseManager

BaseManager.register('get_task')
BaseManager.register('get_result')

conn = BaseManager(address = ('127.0.0.1',5002), authkey = b'123');

try:
    conn.connect();
except:
    print('连接失败');
    sys.exit();

task = conn.get_task();
result = conn.get_result();

while not task.empty():
    n = task.get(timeout = 1);
    print('run task %d' % n);
    sleeptime = random.randint(0,3);
    time.sleep(sleeptime);
    rt = (n, sleeptime);
    result.put(rt);

if __name__ == '__main__':
    pass;
    


看一下效果:

先打开master.py,再在同一网络中的机器中打开task.py

=================== RESTART: E:\code\python\test\master.py ===================
Put task 0...
Put task 1...
Put task 2...
Put task 3...
Put task 4...
Put task 5...
Put task 6...
Put task 7...
Put task 8...
Put task 9...
task 0 is finish , runtime:1 s
task 2 is finish , runtime:1 s
task 4 is finish , runtime:0 s
task 1 is finish , runtime:2 s
task 5 is finish , runtime:1 s
task 7 is finish , runtime:0 s
task 3 is finish , runtime:3 s
task 6 is finish , runtime:3 s
task 8 is finish , runtime:3 s
task 9 is finish , runtime:3 s
>>>

实际上这个分布式计算的原理就是通过BaseManager来使多个进程共享某个资源,在样例中就是共享task_queue和result_queue,使多个进程共同处理任务。

当然实现多机共享的时候也遇到了问题:

首先是个人对于IP概念认识不足,导致不知该填什么IP地址才对。

如果是局域网内共同计算,填内网IP即可,如果是公网,由于电信会drop input流量,所以比较麻烦。

第二是内网也会有ping不通的情况发生。

解决方案:http://jingyan.baidu.com/article/a65957f4f557cb24e67f9ba6.html

感谢ntzyz的支持。

另外,Life Is Strange炒鸡好玩,吃我安利。

《python 分布式计算》上有7条评论

    1. 这个是给master的回调。
      因为这次的例子中是使用了sleep来模拟程序运行,所以需要告诉master这次的执行sleep了多久,这样master才能知道某个任务执行了多久,也就是最终输出的 task 8 is finish , runtime:3 s。

发表评论

电子邮件地址不会被公开。

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据