SparkContext.
newAPIHadoopRDD
Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for meth:SparkContext.sequenceFile.
New in version 1.1.0.
fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)
fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)
fully qualified name of a function returning key WritableConverter (None by default)
fully qualified name of a function returning value WritableConverter (None by default)
Hadoop configuration, passed in as a dict (None by default)
The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
RDD
RDD of tuples of key and corresponding value
See also
RDD.saveAsNewAPIHadoopDataset()
RDD.saveAsHadoopDataset()
SparkContext.hadoopRDD()
SparkContext.hadoopFile()
Examples
>>> import os >>> import tempfile
Set the related classes
>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d: ... path = os.path.join(d, "new_hadoop_file") ... ... # Create the conf for writing ... write_conf = { ... "mapreduce.job.outputformat.class": (output_format_class), ... "mapreduce.job.output.key.class": key_class, ... "mapreduce.job.output.value.class": value_class, ... "mapreduce.output.fileoutputformat.outputdir": path, ... } ... ... # Write a temporary Hadoop file ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) ... rdd.saveAsNewAPIHadoopDataset(conf=write_conf) ... ... # Create the conf for reading ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path} ... ... loaded = sc.newAPIHadoopRDD(input_format_class, ... key_class, value_class, conf=read_conf) ... collected = sorted(loaded.collect())
>>> collected [(1, ''), (1, 'a'), (3, 'x')]