使用 PySpark 时,主要是在 Python 脚本里编写计算逻辑,然后通过 spark-submit 提交该脚本即可运行。
1. python计算脚本:calc.py
# -*- coding:utf-8 -*-
import time
import json
import math
import logging
import subprocess
from collections import Counter,defaultdict
from pyspark import StorageLevel
from pyspark import SparkContext
import base64
import hashlib
import numpy as np
import datetime
# Record when the script starts. Written as a single-argument print() call so
# the line is valid on both Python 2 and Python 3; the "%s %s" join reproduces
# the separator space that the Python 2 print statement inserted.
print("%s %s" % ("Start time: ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
# save data
def save_data(rdd_data, output_path, is_gz=False):
    """Write an RDD to ``output_path`` as text files.

    Args:
        rdd_data: Object exposing ``saveAsTextFile`` (an RDD in this script).
        output_path: Destination path handed to ``saveAsTextFile``.
        is_gz: When true, the Hadoop Gzip codec class name is passed as the
            second argument so the output is gzip-compressed.

    Returns:
        None.
    """
    if is_gz:
        rdd_data.saveAsTextFile(
            output_path, "org.apache.hadoop.io.compress.GzipCodec")
        return
    rdd_data.saveAsTextFile(output_path)
# load data
def load_data(context, load_file_names):
    """Open one or more text inputs through ``context.textFile``.

    Args:
        context: Object exposing ``textFile`` (the SparkContext in this script).
        load_file_names: Path string passed unchanged to ``textFile``; the
            caller in this file joins several paths with commas.

    Returns:
        Whatever ``context.textFile`` returns.
    """
    return context.textFile(load_file_names)
def process_line(x):
    """Turn one tab-separated input line into a ``(key, value)`` pair.

    Lines with at least three fields are keyed by their third field and carry
    the full line as the value. Shorter lines are keyed by the whole line and
    carry the marker value ``"00000000"``.

    Args:
        x: One raw input line (string).

    Returns:
        A 2-tuple ``(key, value)``.
    """
    fields = x.split('\t')
    # The key is read from index 2, so at least three fields are required.
    # The previous `< 2` check let two-field lines through and raised
    # IndexError on them.
    if len(fields) < 3:
        return (x, "00000000")
    return (fields[2], x)
def datask_run(datask):
    """Run the demo job: load both inputs, drop marked keys, save the rest.

    Every input line is converted to a ``(key, value)`` pair by
    ``process_line`` and the pairs are grouped by key. Any group that
    contains the marker value ``"00000000"`` is dropped, and the values of
    the remaining groups are written to the output path.

    Args:
        datask: The SparkContext used to read the input files.

    Returns:
        None.
    """
    ### log ###
    logger = logging.getLogger("custom")
    logger.setLevel(logging.INFO)
    # getLogger returns the same logger object on every call, so attach the
    # stream handler only once; otherwise repeated calls duplicate each line.
    if not logger.handlers:
        logformat = logging.Formatter(
            '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
            '%a, %d %b %Y %H:%M:%S')
        ch = logging.StreamHandler()
        ch.setFormatter(logformat)
        logger.addHandler(ch)
    logger.info("pyspark demo test")

    ### input and output ###
    sk_input1 = "xxx"
    sk_input2 = "xxx2"
    sk_input = sk_input1 + "," + sk_input2
    sk_output = "xxxoutput"
    # Single-argument print() calls are valid on Python 2 and Python 3; the
    # "%s %s" join keeps the separator space of the old print statement.
    print("%s %s" % (">>>>>>>>>input_path:", sk_input))
    print("%s %s" % (">>>>>>>>>output_path:", sk_output))

    ### process ### find the samples in input2 that do not appear in input1
    print("start to process...")
    rdd_sum = load_data(datask, sk_input) \
        .map(process_line) \
        .groupByKey().mapValues(list) \
        .filter(lambda kv: "00000000" not in kv[1]) \
        .flatMap(lambda kv: kv[1])

    # Remove any previous output directory before saving. The command is
    # passed as an argument list so the path is never interpreted by a shell.
    subprocess.call(["hadoop", "fs", "-rm", "-r", sk_output])
    save_data(rdd_sum, sk_output)

    print("%s %s" % ("End time: ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
    print("done.")
if __name__ == "__main__":
    # Build an application name that embeds the launch timestamp.
    time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    app_name = "test@[name=ericoliang,frequence=single,type=offline,time=" + time_str + "]"
    sc = SparkContext(appName=app_name)
    try:
        datask_run(datask=sc)
    finally:
        # Always stop the SparkContext, even when the job raises.
        sc.stop()
2. 调用
脚本说明:calc.py 作为主脚本传给 spark-submit;utils.py 通过 --py-files 一并分发,供 calc.py 导入调用。
#!/bin/bash
# Submit calc.py to YARN in client mode; utils.py is shipped with it through
# --py-files.

# Load the environment settings before calling spark-submit.
source /data9/ericoliang/kbemr_env.sh

# No --class option: it names the main class of a Java/Scala application and
# does not apply when the primary resource is a Python script.
# spark.driver.maxResultSize=0 removes the limit on results collected to the
# driver.
spark-submit \
    --master yarn \
    --deploy-mode client \
    --driver-memory 6G \
    --executor-memory 6G \
    --executor-cores 2 \
    --num-executors 200 \
    --queue low \
    --conf spark.default.parallelism=200 \
    --conf spark.executor.memoryOverhead=6G \
    --conf spark.driver.maxResultSize=0 \
    --py-files utils.py \
    calc.py