登录
首页精彩阅读Python常用技巧
Python常用技巧
2016-05-19
收藏

Python常用技巧

1. 获得当前文件路径

import os

# Absolute path of the directory that contains this script file.
script_dir = os.path.dirname(os.path.abspath(__file__))

在hadoop中获得输入文件的地址:
# Path of the input file, read from the "map_input_file" environment variable.
input_file = os.environ["map_input_file"]
# Directory that contains the input file.
input_dir = os.path.dirname(input_file)
# Last path component of that directory, i.e. the directory's own name.
input_dir_name = os.path.basename(input_dir)

hadoop 2.5.0版本中该属性名发生改变,变为mapreduce.map.input.file;在streaming中属性名里的.会被替换为_,因此对应的环境变量名为mapreduce_map_input_file

2. 时间戳处理
从时间戳变为格式化字符串
from datetime import *

# Epoch seconds -> datetime in the local timezone -> formatted text.
time_stamp_instance = 1381419600
tmp_datetime = datetime.fromtimestamp(time_stamp_instance)
# format() on a datetime applies the given strftime pattern.
tmp_date = format(tmp_datetime, '%Y%m%d')
tmp_hourmin = format(tmp_datetime, '%H%M')

格式化字符串变为时间戳
import time

a = "2013-10-10 23:40:00"
# Parse the text into a struct_time; mktime then interprets it as local time.
timeArray = time.strptime(a, "%Y-%m-%d %H:%M:%S")
timeStamp = int(time.mktime(timeArray))
# Illustrative comparison only (result is discarded); it is True when the
# local timezone is UTC+8.
timeStamp == 1381419600

3. 命令行参数解析
from optparse import OptionParser
    try:
        usage_string = "usage: python %prog [options] arg"
        parser = OptionParser(usage=usage_string)
        parser.add_option('-c', dest='conf', default=None, help='read conf here')
        parser.add_option('-v', dest='version', action='store_true', default=None, help='print version')
        (options, args) = parser.parse_args()
        if options.version:
            print_version()
            sys.exit(0)

    except Exception,e :
        print >> sys.sderr, e
    if None == options.conf:
        print '-c argument is necessary'
        sys.exit(1)

3. 配置文件
自己写一个conf.py
#-*-coding:gbk-*-
"""""""""""""""
input:
                fname: conf file name
                kname: 
"""""""""""""""
import sys
import logging

class Conf:
    """Load a fixed set of ``key:value`` settings from a plain-text conf file.

    Lines starting with ``#`` and lines without a ``:`` are ignored.  Keys
    that were not requested are ignored as well.  The parsed values are
    available in ``self.conf_name`` (a dict mapping key -> stripped value).
    """

    def __init__(self, fname, kname):
        """Conf.__init__(fname, kname)

        fname (string): conf path/name.
        kname (list): keys which you want.

        Raises Exception when the file cannot be opened, or when any
        requested key is missing from the file or has an empty value.
        """
        # Every requested key starts out empty; an empty value after parsing
        # means the key was never supplied.
        self.conf_name = dict((cname, '') for cname in kname)

        try:
            f = open(fname)
        except IOError:
            raise Exception('fail to open file [%s]' % (fname))

        # The context manager closes the file even if parsing raises.
        with f:
            for line in f:
                if line.startswith('#'):
                    continue
                # Split on the FIRST ':' only, so values that themselves
                # contain a colon (e.g. hdfs://host:9000/path) are kept
                # instead of the whole line being dropped.
                key, sep, value = line.partition(':')
                if not sep:
                    continue
                key = key.strip()
                if key in self.conf_name:
                    self.conf_name[key] = value.strip()

        for key, value in self.conf_name.items():
            if value == '':
                raise Exception('fail in conf[key=%s, val=%s]' % (key, value))


if __name__ == '__main__':
    # Usage: python conf.py <conf file>
    if len(sys.argv) != 2:
        logging.warning('input error')
        sys.exit(1)

    kname = ['HADOOP_HOME', 'INPUT_PATH_1', 'INPUT_SOURCE_1', 'INPUT_PATH_2', 'INPUT_SOURCE_2']
    try:
        myconf = Conf(sys.argv[1], kname)
    except Exception as e:
        logging.warning('failed to create conf [%s]' % (e))
        # Report the failure through the exit status as well; previously the
        # script logged the warning and still exited with status 0.
        sys.exit(1)

    # Echo every requested setting as key:value.
    for name in kname:
        print('%s:%s' % (name, myconf.conf_name[name]))

4. 写log
#-*-coding:gbk-*-
"""
实现log相关功能,分级和输出形式模仿了ullog样式

******************使用方法*********华丽的分割线***********
import logger

# 初始化,输出DEBUG及以上级别的日志
# DEBUG, TRACE和NOTICE结果打在/log/a.log文件中
# WARNING和FATAL结果打在/log/a.log.wf文件中
test_logger = logger.Logger('/log/a', 'DEBUG')

# 打DEBUG日志
test_logger.debug_log('sdlksdlks')

# 打FATAL日志
test_logger.fatal_log('ksldsll')

# 其他级别log类似
"""

import os
import sys 
import inspect
import logging
from logging import handlers

# Extra level constants attached to the logging module.  TRACE is a new level
# between DEBUG (10) and INFO (20); NOTICE and FATAL reuse the numeric values
# of INFO and ERROR.
logging.TRACE, logging.NOTICE, logging.FATAL = 15, logging.INFO, logging.ERROR

# Register the display names used in formatted records for those values.
logging.addLevelName(logging.TRACE, 'TRACE')
logging.addLevelName(logging.NOTICE, "NOTICE")
logging.addLevelName(logging.FATAL, 'FATAL')

# Level name accepted by Logger -> numeric logging level.
log_level_dict = {
    'DEBUG': logging.DEBUG,
    'TRACE': logging.TRACE,
    'NOTICE': logging.NOTICE,
    'WARNING': logging.WARNING,
    'FATAL': logging.FATAL,
}

class Logger:
    """File logger that splits records between two files by severity.

    debug_log / trace_log / notice_log write to ``<log_file>.log`` and
    warning_log / fatal_log write to ``<log_file>.log.wf``.  Every message is
    prefixed with the caller's ``[filename][lineno][function]``.

    The class uses the process-wide loggers named ``'normal'`` and ``'wf'``
    and the ``logging.TRACE`` / ``logging.NOTICE`` / ``logging.FATAL``
    constants and ``log_level_dict`` defined at module level.
    """

    def __init__(self, log_file, log_level):
        """Open both log files and attach them to the shared loggers.

        log_file (string): path prefix; ``.log`` and ``.log.wf`` are appended.
        log_level (string): key of ``log_level_dict``; an unknown name falls
            back to level 20.

        Raises IOError when the directory part of ``log_file`` is an existing
        regular file.
        """
        # Mark the instance unusable first, so close()/__del__ stay safe if
        # anything below raises.
        self.valid = False

        formatter = logging.Formatter('%(levelname)s: %(asctime)s: %(message)s', datefmt='%m-%d %H:%M:%S')

        real_log_level = log_level_dict.get(log_level, 20)

        log_dir = os.path.dirname(log_file)
        # An empty log_dir means the files live in the current directory;
        # os.makedirs('') would raise, so there is nothing to create.
        if log_dir:
            if os.path.isfile(log_dir):
                # IOError replaces the undefined name `IOException`.
                raise IOError('the path [%s] is regular file but not a dir'%(log_dir))
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)

        self.normal_handler = logging.handlers.WatchedFileHandler('%s.log'%(log_file))
        self.normal_handler.setFormatter(formatter)
        self.normal_logger = logging.getLogger('normal')
        self.normal_logger.addHandler(self.normal_handler)
        self.normal_logger.setLevel(real_log_level)

        self.wf_handler = logging.handlers.WatchedFileHandler('%s.log.wf'%(log_file))
        self.wf_handler.setFormatter(formatter)
        self.wf_logger = logging.getLogger('wf')
        self.wf_logger.addHandler(self.wf_handler)
        self.wf_logger.setLevel(logging.WARNING)

        self.valid = True

    def __del__(self):
        self.close()

    def close(self):
        """Flush and close both handlers; safe to call more than once."""
        # getattr: __init__ may never have run or may have failed early.
        if not getattr(self, 'valid', False):
            return
        # Flip the flag first so a nested call (e.g. from __del__) is a no-op.
        self.valid = False

        for logger, handler in ((self.normal_logger, self.normal_handler),
                                (self.wf_logger, self.wf_handler)):
            handler.flush()
            handler.close()
            # Detach the handler: the named loggers outlive this object, and
            # a stale handler would keep receiving records from later
            # Logger instances.
            logger.removeHandler(handler)

        logging.shutdown()

    def __get_call_func_frame_info(self):
        """Return ``[filename][lineno][function]`` for the caller of the
        ``*_log`` method that invoked this helper."""
        # Frame chain: this helper -> *_log method -> its caller.  Following
        # f_back twice reaches the caller without inspecting the whole stack.
        caller_frame = inspect.currentframe().f_back.f_back
        # context=0: no source lines are needed, only file/line/function.
        frame_info = inspect.getframeinfo(caller_frame, 0)
        info = '[%s][%d][%s]'%(frame_info.filename, frame_info.lineno, frame_info.function)
        return info

    def warning_log(self, info):
        """Write ``info`` at WARNING level to the ``.log.wf`` logger."""
        stack_info = self.__get_call_func_frame_info()
        self.wf_logger.warning('%s %s'%(stack_info, info))

    def fatal_log(self, info):
        """Write ``info`` at ``logging.FATAL`` level to the ``.log.wf`` logger."""
        stack_info = self.__get_call_func_frame_info()
        self.wf_logger.log(logging.FATAL, '%s %s'%(stack_info, info))

    def notice_log(self, info):
        """Write ``info`` at ``logging.NOTICE`` level to the ``.log`` logger."""
        stack_info = self.__get_call_func_frame_info()
        self.normal_logger.log(logging.NOTICE, '%s %s'%(stack_info, info))

    def trace_log(self, info):
        """Write ``info`` at ``logging.TRACE`` level to the ``.log`` logger."""
        stack_info = self.__get_call_func_frame_info()
        self.normal_logger.log(logging.TRACE, '%s %s'%(stack_info, info))

    def debug_log(self, info):
        """Write ``info`` at DEBUG level to the ``.log`` logger."""
        stack_info = self.__get_call_func_frame_info()
        self.normal_logger.debug('%s %s'%(stack_info, info))

  使用示例:
  import logger
    home= os.path.dirname(sys.path[0])
    log_path = '%(home)s/log/traffic_link_compress'%{'home': home}

    try:
        my_logger = logger.Logger(log_path, 'NOTICE')
    except Exception, e:
        print "invalid info =%s " %e         
        pass

 my_logger.fatal_log('failed to read conf')
my_logger.warning_log('input_path not exist'                                                               

5. 根据经纬度算距离
import math
def GetPToPLength(X1,Y1,X2,Y2):
    """Approximate the distance between two longitude/latitude points.

    X1, X2 are longitudes and Y1, Y2 are latitudes, in degrees.  The
    longitude difference is scaled by the cosine of the first point's
    latitude, and the planar length in degrees is converted with a fixed
    factor of 111195.0 per degree (presumably metres - confirm with callers).

    Returns the distance as a float, or -1 if the computation fails (for
    example when an argument is not numeric).
    """
    try:
        dx = X1 - X2
        dy = Y1 - Y2
        # 0.01745329252 ~= pi / 180: converts the latitude Y1 to radians.
        sx = math.cos( Y1 * 0.01745329252)
        return (math.sqrt(dx * dx * sx * sx + dy * dy) * 111195.0)
    except Exception:
        # The -1 sentinel is kept for callers that rely on "never raises".
        return -1

6. 简单logger
def log_warn(s):
    """Write ``s`` to standard error as a timestamped ``[WARN]`` line."""
    # Imported locally so the helper does not depend on imports elsewhere.
    import sys
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("[%s][WARN] %s\n" % (timestamp, s))
https://www.cda.cn/kecheng/4.html
def log_info(s):
    """Write ``s`` to standard error as a timestamped ``[INFO]`` line."""
    # Imported locally so the helper does not depend on imports elsewhere.
    import sys
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write("[%s][INFO] %s\n" % (timestamp, s))

7. 读取文件
# Collect every line of XXX.txt with trailing whitespace (including the
# line ending) removed.
XXX_list = []
with open(r'XXX.txt') as file_list:
    XXX_list.extend(line.rstrip() for line in file_list)


数据分析咨询请扫描二维码

客服在线
立即咨询