在pyspark中操作hdfs文件

Raven's Blog

Home Page View on GitHub Send Email

在pyspark中操作hdfs文件

背景

这段时间的工作主要是跟spark打交道,最近遇到类似这样的需求,统计一些数据(统计结果很小),然后直接把文本数据写入到hdfs文件中去。

之前一直使用的是scala语言,实现起来非常简单:


import org.apache.spark.SparkContext
import org.apache.hadoop.fs.{FileSystem, Path}

def path(filepath:String) =
  new Path(filepath)

def getFileSystem(sc:SparkContex) =
  FileSystem.get(sc.hadoopConfiguration)

def write(sc:SparkContext, filepath:String, content:String, overwrite:Boolean = true):Unit = {
  try{
    val fileSystem = getFileSystem(sc)
    val fileName = path(filepath)
    val out = fileSystem.create(fileName, overwrite)
    out.write(content.getBytes)
    out.flush
    out.close
  }catch{
    case e:Exception => System.err.println(e.toString)
  }
}

但是scala那陡峭的学习曲线,并不适合整个团队的发展,和业务的快速迭代,所以我们统一改成了spark的python接口:pyspark

那么pyspark中该如何直接操作hdfs上的文件呢? 找了一圈,并没有在python的SparkContext中找到hadoopConfiguration()方法,开始用了一些比较ugly的方法,比如在python中直接调用hadoop命令去操作文件。

后来通过看pyspark的代码,发现使用的是 py4j 来连接python与java,根据py4j的原理,可以通过以下代码在python中调用java对象来操作hdfs文件:


#!/usr/bin/python
# -*- coding:utf-8 -*-

def path(sc, filepath):
  """
  创建hadoop path对象
  :param sc sparkContext对象
  :param filename 文件绝对路径
  :return org.apache.hadoop.fs.Path对象
  """
  path_class = sc._gateway.jvm.org.apache.hadoop.fs.path
  return path_class(filepath)

def get_file_system(sc):
  """
  创建FileSystem
  :param sc SparkContext
  :return FileSystem对象
  """
  filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FilFileSystem
  hadoop_configuration = sc._jsc.hadoopConfiguration()
  return filesystem_class.get(hadoop_configuration)

def write(sc, filepath, content, overwite=True):
  """
  写内容到hdfs文件
  :param sc SparkContext
  :param filepath 绝对路径
  :param content 文件内容
  :param overwrite 是否覆盖
  """
  try:
    filesystem = get_file_system(sc)
    out = filesystem.create(path(sc, filepath), overwrite)
    out.write(bytearray(content, "utf-8"))
    out.flush()
    out.close()
  except Error, e:
    print e

那么这个神奇的py4j是什么呢?

可以看下官网的介绍:http://www.py4j.org/

这里大概说下个人理解: 要想在java和python中使用这个东西,需要在java代码中开一个本地socket监听端口,并暴露一个entrypoint,python代码中通过JavaGateway来跟java进程交互,也就是说,这个是一个跨语言的RPC调用

联系作者:aducode@126.com
更多精彩文章请点击:http://aducode.github.io