博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
网络编程之进程3
阅读量:4628 次
发布时间:2019-06-09

本文共 4840 字,大约阅读时间需要 16 分钟。

 什么叫做水平扩展:增加计算机的数量,并没有提高计算机的性能

 什么叫开源:开放源代码

 什么叫做虚拟化:同时跑多个系统

 什么是分布式和集中式:集中式就是在一台机器上执行任务;分布式就是将任务分散到多台机器上。

一 JoinableQueue模块

 JoinableQueue模块介绍:比Queue多了两个函数,一个是task_done,另一个是join,都是专用于全球进行编程的,大多数用于生产者和消费者之间的。

  task_done:是用在get后面的 ,告诉os已经处理完了内容。

  join:是说Queue里面的生产数据全部处理完了。

使用方法如下:

# import multiprocessing# import time# import random# def sheng(name,q,wuping):#     for i in range(5):#         time.sleep(random.random())#         ret='%s%s'%(wuping,i)#         q.put(ret)#         print('厨师%s创建了%s'%(name,ret))#     q.join()# def xiao(name,q):#     while True:#         time.sleep(random.random())#         ret=q.get()#         print('%s吃了%s'%(name,ret))#         q.task_done()# if __name__=='__main__':#     q=multiprocessing.JoinableQueue()#     s1=multiprocessing.Process(target=sheng,args=('egon1',q,'包子'))#     s2=multiprocessing.Process(target=sheng,args=('egon2',q,'rou'))#     s3=multiprocessing.Process(target=sheng,args=('egon3',q,'骨头'))#     x1=multiprocessing.Process(target=xiao,args=('alex1',q))#     x2=multiprocessing.Process(target=xiao,args=('alex2',q))#     x1.daemon=True#     x2.daemon=True#     s1.start()#     s2.start()#     s3.start()#     x1.start()#     x2.start()#     s1.join()#     s2.join()#     s3.join() 

二 Manager:共享内存空间

 dict:共享的以恶搞字典,能够被多个共享

如下1:

# import multiprocessing# def walk(d):#     d['count']-=1# if __name__=='__main__':#     m=multiprocessing.Manager()#     d=m.dict({'count':100})#     for i in range(100):#         w=multiprocessing.Process(target=walk,args=(d,))#         w.start()#     w.join()#     print(d)

 如下2:

# import multiprocessing# def walk(d,loak):#     with loak:#         d['count']-=1# if __name__=='__main__':#     m=multiprocessing.Manager()#     d=m.dict({'count':100})#     lock=multiprocessing.Lock()#     li=[]#     for i in range(100):#         w=multiprocessing.Process(target=walk,args=(d,lock))#         li.append(w)#         w.start()#     for j in li:#         j.join()#     print(j)#     print(d)

 三 进程池

 什么是进程池:创建一定数量的进程个数

 同步和异步:提交任务的两种方式。

 Pool:创建进程池和控制进程的数目,默认的个数是根据CPU的核数

 apply:传入两个参数,第一个是指定任务。向进程池提交一个任务,实现了串行和同步调用。结束任务后,立马会拿到结果。

  开启的进程数目有几个,就会有几个pid。

 什么是同步调用:提交一个任务,等到任务结束后才能执行下一个任务。

# import multiprocessing# import time# import random# import os# def walk(n):#     print('%s is walking'%os.getpid())#     time.sleep(random.random())#     return n## if __name__=='__main__':#     p=multiprocessing.Pool(4)#     for i in range(10):#         q=p.apply(walk,args=(i,))#         print(q)

 apply_async:向进程池提交任务,提交完任务后就不管了,只管提交任务,不能直接执行任务。实现了一个并发和恶异步调用

  执行方法:先close:关闭任务,好让任务结束

       然后join:等待一个进程池不在提交任务,并且任务结束和计算任务个数

       最后get:获取返回值

 什么是异步调用:提交完一个任务过后不会在原地等待,而是继续提交下一个任务。等待所有的任务结束后在用get获取任务

如下:

# import multiprocessing# import time# import random# import os# def walk(n):#     print('%s is walking'%os.getpid())#     time.sleep(random.random())#     return n## if __name__=='__main__':#     p=multiprocessing.Pool(4)#     li=[]#     for i in range(10):#         q=p.apply_async(walk,args=(i,))#         li.append(q)#     p.close()#     p.join()#     for i in li:#         print(i.get())#

同步和异步详情:https://www.zhihu.com/question/19732473

 进程池没有任务了,进程就会被回收调。

如下:

from  multiprocessing import Poolimport os,time,randomdef work(n):    print('%s is working' %os.getpid())if __name__ == '__main__':    p=Pool(4)    for i in range(50):        # res=p.apply(work,args=(i,))        obj=p.apply_async(work,args=(i,))    p.close()    p.join()

 如果在执行和提交任务的时间非常快,有时会提交任务后,第一个任务没有执行结束,就会开启一个新的进程执行这个任务;有时会在提交任务后,某进程刚好执行完,并且没有被回收掉,那么这次提交的任务就会被这个进程执行。

 为什么要用进程池:为了实现并发,然后在并发的基础上对进程的数目进行一个限制。

四  回调函数

 什么是回调函数:通过一个函数内存调用的函数,如果将这个函数的地址当作参数传给另一个函数,当这个函数的地址用来调用其所只想的函数是,这个所指向的函数就是回调函数。详情访问:http://www.cnblogs.com/hainan-zhang/p/6222552.html

  callback:后面加上的是回调函数

  回调函数的进程其实就是主进程。

 用回调函数实现一个网络爬虫;需要用到requests模块

  get:获取网址

  status_code:返回状态码

  text:查看下载网址的内容

 

from multiprocessing import Pool,Processimport requestsimport osimport time,randomdef get(url):    print('%s GET %s' %(os.getpid(),url))    response=requests.get(url)    time.sleep(random.randint(1,3))    if response.status_code == 200:        print('%s DONE %s' % (os.getpid(), url))        return {'url':url,'text':response.text}def parse(dic):    print('%s PARSE %s' %(os.getpid(),dic['url']))    time.sleep(1)    res='%s:%s\n' %(dic['url'],len(dic['text']))    with open('db.txt','a') as f:        f.write(res)if __name__ == '__main__':    urls=[        'https://www.baidu.com',        'https://www.python.org',        'https://www.openstack.org',        'https://help.github.com/',        'http://www.sina.com.cn/'    ]    p=Pool(2)    start_time=time.time()    objs=[]    for url in urls:        obj=p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活        objs.append(obj)    p.close()    p.join()    print('主',(time.time()-start_time))

 

  

 

转载于:https://www.cnblogs.com/fangjie0410/p/7657150.html

你可能感兴趣的文章
go1.8之安装配置
查看>>
JavaScript闭包
查看>>
App调用safar
查看>>
IOC和DI(转)
查看>>
java web 开发应用 ----过滤器
查看>>
背单词:3年,34150分钟!
查看>>
安卓版文字扫描识别软件
查看>>
Process's address space and heap
查看>>
LVS 介绍 原理
查看>>
欧拉函数
查看>>
工厂模式
查看>>
Ajax 的优势和不足
查看>>
vi 环境,跳转函数定义
查看>>
mac os x 查看网络端口情况
查看>>
springMVC入门截图
查看>>
jquery学习(3)--高级选择器
查看>>
C#获取本机IP
查看>>
从 StarCraft 2 Installer.exe 中提取种子文件
查看>>
关系连接查询
查看>>
[转]买土豆的故事
查看>>