登录
首页精彩阅读Python多进程分块读取超大文件的方法
Python多进程分块读取超大文件的方法
2018-02-18
收藏

Python多进程分块读取超大文件的方法

本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:

读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件

# -*- coding: GBK -*-
importurlparse
importdatetime
importos
frommultiprocessingimportProcess,Queue,Array,RLock
"""
多进程分块读取文件
"""
WORKERS=4
BLOCKSIZE=100000000
FILE_SIZE=0
defgetFilesize(file):
  """
    获取要读取文件的大小
  """
  globalFILE_SIZE
  fstream=open(file,'r')
  fstream.seek(0,os.SEEK_END)
  FILE_SIZE=fstream.tell()
  fstream.close()
defprocess_found(pid,array,file,rlock):
  globalFILE_SIZE
  globalJOB
  globalPREFIX
  """
    进程处理
    Args:
      pid:进程编号
      array:进程间共享队列,用于标记各进程所读的文件块结束位置
      file:所读文件名称
    各个进程先从array中获取当前最大的值为起始位置startpossition
    结束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
    if startpossition==FILE_SIZE则进程结束
    if startpossition==0则从0开始读取
    if startpossition!=0为防止行被block截断的情况,先读一行不处理,从下一行开始正式处理
    if 当前位置 <=endpossition 就readline
    否则越过边界,就从新查找array中的最大值
  """
  fstream=open(file,'r')
  whileTrue:
    rlock.acquire()
    print'pid%s'%pid,','.join([str(v)forvinarray])
    startpossition=max(array)     
    endpossition=array[pid]=(startpossition+BLOCKSIZE)if(startpossition+BLOCKSIZE)<FILE_SIZEelseFILE_SIZE
    rlock.release()
    ifstartpossition==FILE_SIZE:#end of the file
      print'pid%s end'%(pid)
      break
    elifstartpossition !=0:
      fstream.seek(startpossition)
      fstream.readline()
    pos=ss=fstream.tell()
    ostream=open('/data/download/tmp_pid'+str(pid)+'_jobs'+str(endpossition),'w')
    whilepos<endpossition:
      #处理line
      line=fstream.readline()
      ostream.write(line)
      pos=fstream.tell()
    print'pid:%s,startposition:%s,endposition:%s,pos:%s'%(pid,ss,pos,pos)
    ostream.flush()
    ostream.close()
    ee=fstream.tell()
  fstream.close()
defmain():
  globalFILE_SIZE
  printdatetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")
  file="/data/pds/download/scmcc_log/tmp_format_2011004.log"
  getFilesize(file)
  printFILE_SIZE
  rlock=RLock()
  array=Array('l',WORKERS,lock=rlock)
  threads=[]
  foriinrange(WORKERS):
    p=Process(target=process_found, args=[i,array,file,rlock])
    threads.append(p)
  foriinrange(WORKERS):
    threads[i].start()
  foriinrange(WORKERS):
    threads[i].join()
  printdatetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S")
if__name__=='__main__':
  main()

数据分析咨询请扫描二维码

客服在线
立即咨询