PySpark Template

With PySpark, you write the processing logic in a Python script and then simply submit it.

1. Python computation script: calc.py

# -*- coding:utf-8 -*-
import time
import json
import math
import logging
import subprocess
from collections import Counter, defaultdict
from pyspark import StorageLevel
from pyspark import SparkContext
import base64
import hashlib
import numpy as np
import datetime
 

print "Start time: ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

# save data
def save_data(rdd_data, output_path, is_gz=False):
    if not is_gz:
        rdd_data.saveAsTextFile(output_path)
    else:
        codec = "org.apache.hadoop.io.compress.GzipCodec"
        rdd_data.saveAsTextFile(output_path, codec)

# load data
def load_data(context, load_file_names):
    input_data = context.textFile(load_file_names)
    return input_data

def process_line(x):
    # Key each record by its third tab-separated field; records with fewer than
    # 3 fields (e.g. bare keys from input1) are keyed by the whole line and get
    # the sentinel value "00000000".
    line = x.split('\t')
    if len(line) < 3:
        return (x, "00000000")
    return (line[2], x)

def datask_run(datask):
    context = datask
    ### log ###
    logger = logging.getLogger("custom")
    logger.setLevel(logging.INFO)
    logformat = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s','%a, %d %b %Y %H:%M:%S')
    ch = logging.StreamHandler()
    ch.setFormatter(logformat)
    logger.addHandler(ch)
    logger.info("pyspark demo test")
    
    ### input and output ###
    sk_input1 = "xxx"
    sk_input2 = "xxx2"
    sk_input = sk_input1 + "," + sk_input2
    sk_output = "xxxoutput"
    
    print ">>>>>>>>>input_path:", sk_input
    print ">>>>>>>>>output_path:", sk_output
    
    ### process ### keep samples from input2 whose key does not appear in input1
    print("start to process...")
    rdd_sum = load_data(context, sk_input) \
                .map(process_line) \
                .groupByKey().mapValues(list) \
                .filter(lambda x: "00000000" not in x[1]) \
                .flatMap(lambda x: x[1])
    
    # clear any previous output first: saveAsTextFile fails if the directory exists
    subprocess.call("hadoop fs -rm -r " + sk_output, shell=True)
    save_data(rdd_sum, sk_output)
    
    print "End time: ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print "done."

if __name__ == "__main__":
    time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    app_name = "test@[name=ericoliang,frequence=single,type=offline,time=" + time_str + "]" 
    sc = SparkContext(appName=app_name)
    datask_run(datask=sc)
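
Before submitting to a cluster, the anti-join logic can be smoke-tested locally. The sketch below is illustrative only: it assumes calc.py sits in the working directory (so process_line is importable) and uses made-up sample records.

# smoke_test.py -- illustrative local test; the sample records and local[2] master are assumptions
from pyspark import SparkContext
from calc import process_line  # assumes calc.py is importable from the working directory

sc = SparkContext("local[2]", appName="calc-smoke-test")

sample = [
    "key_a",                    # input1-style bare key -> sentinel value
    "f0\tf1\tkey_a\tpayload1",  # input2 record keyed by field 3 = key_a (dropped)
    "f0\tf1\tkey_b\tpayload2",  # key_b never appears in input1 (kept)
]

result = sc.parallelize(sample) \
           .map(process_line) \
           .groupByKey().mapValues(list) \
           .filter(lambda x: "00000000" not in x[1]) \
           .flatMap(lambda x: x[1])

print(result.collect())  # expect only the key_b record
sc.stop()

With a pip-installed pyspark this runs as a plain "python smoke_test.py"; no spark-submit is needed.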

2. Invocation

Script notes: calc.py is the application passed to spark-submit below; utils.py is imported by calc.py.
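
utils.py itself is not shown in this post. Purely as a hypothetical illustration of the pattern, it might hold small shared helpers that calc.py imports:

# utils.py -- hypothetical contents, for illustration only (the real file is not shown)
import hashlib

def md5_hex(s):
    # Stable hex digest of a string key, e.g. for bucketing or sampling.
    return hashlib.md5(s.encode("utf-8")).hexdigest()

calc.py can then import utils as usual; --py-files makes the module importable on the executors as well.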

#!/bin/bash
source /data9/ericoliang/kbemr_env.sh

spark-submit \
    --master yarn \
    --deploy-mode client \
    --driver-memory 6G \
    --executor-memory 6G \
    --executor-cores 2 \
    --num-executors 200 \
    --queue low \
    --conf spark.default.parallelism=200 \
    --conf spark.executor.memoryOverhead=6G \
    --conf spark.driver.maxResultSize=0 \
    --py-files utils.py \
    calc.py
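
Note: --py-files takes a comma-separated list of .py/.zip/.egg files, distributes them to every executor, and places them on the Python search path, so "import utils" works inside functions that run on the cluster, not just on the driver.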
