public classes of Spark SQL:
- SQLContext Main entry point for SQL functionality.
- DataFrame A Resilient Distributed Dataset (RDD) with schema information about the data it contains. In addition to normal RDD operations, DataFrames also support SQL.
- GroupedData Aggregation methods, returned by DataFrame.groupBy().
- Column A column in a DataFrame.
- Row A Row of data returned by a Spark SQL query.
- HiveContext Main entry point for accessing data stored in Apache Hive.
Main entry point for Spark SQL functionality.
A SQLContext can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files.
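The examples throughout this page use a SQLContext named sqlCtx; a minimal sketch of how it might be created, assuming an existing SparkContext named sc:
>>> from pyspark.sql import SQLContext
>>> sqlCtx = SQLContext(sc)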
Applies the given schema to the given RDD of tuple or list.
These tuples or lists can contain complex nested structures like lists, maps or nested rows.
The schema should be a StructType.
It is important that the schema matches the types of the objects in each row or exceptions could be thrown at runtime.
>>> from pyspark.sql.types import *
>>> rdd2 = sc.parallelize([(1, "row1"), (2, "row2"), (3, "row3")])
>>> schema = StructType([StructField("field1", IntegerType(), False),
... StructField("field2", StringType(), False)])
>>> df = sqlCtx.applySchema(rdd2, schema)
>>> df.collect()
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
Caches the specified table in-memory.
Removes all cached tables from the in-memory cache.
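An illustrative sketch of how cacheTable and clearCache might be used together, assuming a table named "table1" has already been registered (see registerDataFrameAsTable below):
>>> sqlCtx.cacheTable("table1")
>>> sqlCtx.clearCache()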
Create a DataFrame from an RDD of tuple/list, list or pandas.DataFrame.
schema could be StructType or a list of column names.
When schema is a list of column names, the type of each column will be inferred from rdd.
When schema is None, it will try to infer the column name and type from rdd, which should be an RDD of Row, or namedtuple, or dict.
If schema inference is needed, samplingRatio is used to determine how many rows will be sampled for inference. If samplingRatio is None, only the first row is used.
Parameters:
- data – an RDD of tuple/list, a list, or a pandas.DataFrame
- schema – a StructType or a list of column names (optional)
- samplingRatio – the ratio of rows sampled for schema inference (optional)
Returns: a DataFrame
>>> l = [('Alice', 1)]
>>> sqlCtx.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlCtx.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
>>> d = [{'name': 'Alice', 'age': 1}]
>>> sqlCtx.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(l)
>>> sqlCtx.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = sqlCtx.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = sqlCtx.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("name", StringType(), True),
... StructField("age", IntegerType(), True)])
>>> df3 = sqlCtx.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]
>>> sqlCtx.createDataFrame(df.toPandas()).collect()
[Row(name=u'Alice', age=1)]
Creates an external table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame and created external table.
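A hedged sketch of a createExternalTable call; the table name, path, and "json" source below are illustrative assumptions, not values from this page:
>>> people = sqlCtx.createExternalTable("people_ext", path="/some/path", source="json")  # hypothetical table and path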
Returns the value of Spark SQL configuration property for the given key.
If the key is not set, returns defaultValue.
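A small sketch of setConf and getConf used together; the property is a real Spark SQL setting, the values are illustrative:
>>> sqlCtx.setConf("spark.sql.shuffle.partitions", "10")
>>> sqlCtx.getConf("spark.sql.shuffle.partitions", "200")
u'10'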
Infer and apply a schema to an RDD of Row.
When samplingRatio is specified, the schema is inferred by looking at the types of each row in the sampled dataset. Otherwise, the first 100 rows of the RDD are inspected. Nested collections are supported, which can include array, dict, list, Row, tuple, namedtuple, or object.
Each row can be a pyspark.sql.Row object, a namedtuple, or a plain object. Using top level dicts is deprecated, as dict is used to represent Maps.
If a single column has multiple distinct inferred types, it may cause runtime exceptions.
>>> rdd = sc.parallelize(
... [Row(field1=1, field2="row1"),
... Row(field1=2, field2="row2"),
... Row(field1=3, field2="row3")])
>>> df = sqlCtx.inferSchema(rdd)
>>> df.collect()[0]
Row(field1=1, field2=u'row1')
Loads a text file storing one JSON object per line as a DataFrame.
If the schema is provided, applies the given schema to this JSON dataset.
Otherwise, it samples the dataset with ratio samplingRatio to determine the schema.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> with open(jsonFile, 'w') as f:
... f.writelines(jsonStrings)
>>> df1 = sqlCtx.jsonFile(jsonFile)
>>> df1.printSchema()
root
|-- field1: long (nullable = true)
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field4: long (nullable = true)
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType()),
... StructField("field3",
... StructType([StructField("field5", ArrayType(IntegerType()))]))])
>>> df2 = sqlCtx.jsonFile(jsonFile, schema)
>>> df2.printSchema()
root
|-- field2: string (nullable = true)
|-- field3: struct (nullable = true)
| |-- field5: array (nullable = true)
| | |-- element: integer (containsNull = true)
Loads an RDD storing one JSON object per string as a DataFrame.
If the schema is provided, applies the given schema to this JSON dataset.
Otherwise, it samples the dataset with ratio samplingRatio to determine the schema.
>>> df1 = sqlCtx.jsonRDD(json)
>>> df1.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> df2 = sqlCtx.jsonRDD(json, df1.schema)
>>> df2.first()
Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("field2", StringType()),
... StructField("field3",
... StructType([StructField("field5", ArrayType(IntegerType()))]))
... ])
>>> df3 = sqlCtx.jsonRDD(json, schema)
>>> df3.first()
Row(field2=u'row1', field3=Row(field5=None))
Returns the dataset in a data source as a DataFrame.
The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame.
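A hedged sketch of a load call; the path and "json" source are illustrative assumptions:
>>> df_loaded = sqlCtx.load(path="/some/path", source="json")  # hypothetical path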
Loads a Parquet file, returning the result as a DataFrame.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(df.collect()) == sorted(df2.collect())
True
Registers the given RDD as a temporary table in the catalog.
Temporary tables exist only during the lifetime of this instance of SQLContext.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
Registers a lambda function as a UDF so it can be used in SQL statements.
In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given, it defaults to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type.
>>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
[Row(c0=u'4')]
>>> from pyspark.sql.types import IntegerType
>>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
[Row(c0=4)]
Sets the given Spark SQL configuration property.
Return a DataFrame representing the result of the given query.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
Returns the specified table as a DataFrame.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
Returns a list of names of tables in the database dbName.
If dbName is not specified, the current database will be used.
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlCtx.tableNames()
True
>>> "table1" in sqlCtx.tableNames("db")
True
Returns a DataFrame containing names of tables in the given database.
If dbName is not specified, the current database will be used.
The returned DataFrame has two columns, tableName and isTemporary (a column with BooleanType indicating if a table is a temporary one or not).
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
Removes the specified table from the in-memory cache.
A variant of Spark SQL that integrates with data stored in Hive.
Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL and HiveQL commands.
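A minimal sketch of creating a HiveContext and running a HiveQL query; the "src" table is a hypothetical example:
>>> from pyspark.sql import HiveContext
>>> hiveCtx = HiveContext(sc)
>>> rows = hiveCtx.sql("SELECT key, value FROM src").collect()  # 'src' is a hypothetical Hive table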
A collection of rows that have the same columns.
A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various functions in SQLContext:
people = sqlContext.parquetFile("...")
Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: DataFrame, Column.
To select a column from the data frame, use the apply method:
ageCol = people.age
Note that the Column type can also be manipulated through its various functions:
# The following creates a new column that increases everybody's age by 10.
people.age + 10
A more concrete example:
# To create DataFrame using SQLContext
people = sqlContext.parquetFile("...")
department = sqlContext.parquetFile("...")
people.filter(people.age > 30).join(department, people.deptId == department.id) \
    .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).
>>> df.agg({"age": "max"}).collect()
[Row(MAX(age#0)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(MIN(age#0)=2)]
Persist with the default storage level (MEMORY_ONLY_SER).
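An illustrative sketch: cache() returns the DataFrame itself, and the data is materialized on the first action.
>>> df.cache()
DataFrame[age: int, name: string]
>>> df.count()
2L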
Return a list that contains all of the rows.
Each object in the list is a Row, the fields can be accessed as attributes.
>>> df.collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
Return all column names as a list.
>>> df.columns
[u'age', u'name']
Return the number of elements in this RDD.
Unlike the base RDD implementation of count, this implementation leverages the query optimizer to compute the count on the DataFrame, which supports features such as filter pushdown.
>>> df.count()
2L
Return a new DataFrame containing the distinct rows in this DataFrame.
>>> df.distinct().count()
2L
Return all column names and their data types as a list.
>>> df.dtypes
[('age', 'int'), ('name', 'string')]
Prints the plans (logical and physical) to the console for debugging purpose.
If extended is False, only prints the physical plan.
>>> df.explain()
PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
>>> df.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
== RDD ==
Filters rows using the given condition, which can be a Column expression or a string of SQL expression.
where() is an alias for filter().
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
Return the first row.
>>> df.first()
Row(age=2, name=u'Alice')
Return a new RDD by first applying a function to all elements of this, and then flattening the results.
It’s a shorthand for df.rdd.flatMap()
>>> df.flatMap(lambda p: p.name).collect()
[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
Applies a function to all rows of this DataFrame.
It’s a shorthand for df.rdd.foreach()
>>> def f(person):
... print person.name
>>> df.foreach(f)
Applies a function to each partition of this DataFrame.
It’s a shorthand for df.rdd.foreachPartition()
>>> def f(people):
... for person in people:
... print person.name
>>> df.foreachPartition(f)
Group the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
>>> df.groupBy().avg().collect()
[Row(AVG(age#0)=3.5)]
>>> df.groupBy('name').agg({'age': 'mean'}).collect()
[Row(name=u'Bob', AVG(age#0)=5.0), Row(name=u'Alice', AVG(age#0)=2.0)]
>>> df.groupBy(df.name).avg().collect()
[Row(name=u'Bob', AVG(age#0)=5.0), Row(name=u'Alice', AVG(age#0)=2.0)]
Return the first n rows or the first row if n is None.
>>> df.head()
Row(age=2, name=u'Alice')
>>> df.head(1)
[Row(age=2, name=u'Alice')]
Inserts the contents of this DataFrame into the specified table, optionally overwriting any existing data.
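A hedged sketch of an insertInto call; the target table name is a hypothetical example, and the table must already exist with a compatible schema:
>>> df.insertInto("people_table", overwrite=True)  # 'people_table' is hypothetical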
Return a new DataFrame containing rows only in both this frame and another frame.
This is equivalent to INTERSECT in SQL.
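A small illustrative check using the df from the other examples: intersecting a frame with itself yields its distinct rows.
>>> sorted(df.intersect(df).collect()) == sorted(df.distinct().collect())
True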
Returns True if the collect and take methods can be run locally (without any Spark executors).
Join with another DataFrame, using the given join expression. The following performs a full outer join between df1 and df2.
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
Limit the result count to the number specified.
>>> df.limit(1).collect()
[Row(age=2, name=u'Alice')]
>>> df.limit(0).collect()
[]
Return a new RDD by applying a function to each Row
It’s a shorthand for df.rdd.map()
>>> df.map(lambda p: p.name).collect()
[u'Alice', u'Bob']
Return a new RDD by applying a function to each partition.
It’s a shorthand for df.rdd.mapPartitions()
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
>>> rdd.mapPartitions(f).sum()
4
Return a new DataFrame sorted by the specified column(s).
Parameters: cols – the columns or expressions used for sorting
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
Set the storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified, it defaults to MEMORY_ONLY_SER.
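An illustrative sketch of persisting with an explicit storage level:
>>> from pyspark import StorageLevel
>>> df.persist(StorageLevel.MEMORY_AND_DISK)
DataFrame[age: int, name: string]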
Prints out the schema in the tree format.
>>> df.printSchema()
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
DEPRECATED: use registerTempTable() instead
Registers this RDD as a temporary table using the given name.
The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame.
>>> df.registerTempTable("people")
>>> df2 = sqlCtx.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
Return a new DataFrame that has exactly numPartitions partitions.
>>> df.repartition(10).rdd.getNumPartitions()
10
Return a sampled subset of this DataFrame.
>>> df.sample(False, 0.5, 97).count()
1L
Saves the contents of the DataFrame to a data source.
The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.
Additionally, mode is used to specify the behavior of the save operation when data already exists in the data source. There are four modes:
- append: contents of this DataFrame are expected to be appended to existing data.
- overwrite: existing data is expected to be overwritten by the contents of this DataFrame.
- error: an exception is expected to be thrown.
- ignore: the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
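A hedged sketch of a save call; the path and "json" source are illustrative assumptions:
>>> df.save(path="/some/path", source="json", mode="overwrite")  # hypothetical path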
Save the contents as a Parquet file, preserving the schema.
Files that are written out using this method can be read back in as a DataFrame using the SQLContext.parquetFile method.
>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> df.saveAsParquetFile(parquetFile)
>>> df2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(df2.collect()) == sorted(df.collect())
True
Saves the contents of the DataFrame to a data source as a table.
The data source is specified by the source and a set of options. If source is not specified, the default data source configured by spark.sql.sources.default will be used.
Additionally, mode is used to specify the behavior of the saveAsTable operation when the table already exists in the data source. There are four modes:
- append: contents of this DataFrame are expected to be appended to the existing table.
- overwrite: the existing table is expected to be overwritten by the contents of this DataFrame.
- error: an exception is expected to be thrown.
- ignore: the save operation is expected to not save the contents of the DataFrame and to not change the existing table.
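A hedged sketch of a saveAsTable call; the table name is a hypothetical example:
>>> df.saveAsTable("people_saved", source="parquet", mode="overwrite")  # 'people_saved' is hypothetical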
Returns the schema of this DataFrame (represented by a StructType).
>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
Selects a set of expressions.
>>> df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
Selects a set of SQL expressions. This is a variant of select that accepts SQL expressions.
>>> df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
Print the first n rows.
>>> df
DataFrame[age: int, name: string]
>>> df.show()
age name
2 Alice
5 Bob
Return a new DataFrame sorted by the specified column(s).
Parameters: cols – the columns or expressions used for sorting
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
Return a new DataFrame containing rows in this frame but not in another frame.
This is equivalent to EXCEPT in SQL.
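A small illustrative example using the df from the other examples:
>>> df.subtract(df.filter(df.age > 3)).collect()
[Row(age=2, name=u'Alice')]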
Take the first num rows of the RDD.
Each object in the list is a Row, the fields can be accessed as attributes.
>>> df.take(2)
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
Convert a DataFrame into a MappedRDD of JSON documents; one document per row.
>>> df.toJSON().first()
'{"age":2,"name":"Alice"}'
Collect all the rows and return a pandas.DataFrame.
>>> df.toPandas()
age name
0 2 Alice
1 5 Bob
Return a new DataFrame containing union of rows in this frame and another frame.
This is equivalent to UNION ALL in SQL.
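A small illustrative example: unioning the frame with itself doubles the row count.
>>> df.unionAll(df).count()
4L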
Mark it as non-persistent, and remove all blocks for it from memory and disk.
Filters rows using the given condition, which can be a Column expression or a string of SQL expression.
where() is an alias for filter().
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
Return a new DataFrame by adding a column.
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
Rename an existing column to a new name
>>> df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy().
Compute aggregates by specifying a map from column name to aggregate methods.
The available aggregate methods are avg, max, min, sum, count.
Parameters: exprs – a list of aggregate columns, or a map from column name to aggregate method
>>> gdf = df.groupBy(df.name)
>>> gdf.agg({"*": "count"}).collect()
[Row(name=u'Bob', COUNT(1)=1), Row(name=u'Alice', COUNT(1)=1)]
>>> from pyspark.sql import functions as F
>>> gdf.agg(F.min(df.age)).collect()
[Row(MIN(age#0)=5), Row(MIN(age#0)=2)]
Compute the average value for each numeric column for each group.
>>> df.groupBy().avg('age').collect()
[Row(AVG(age#0)=3.5)]
>>> df3.groupBy().avg('age', 'height').collect()
[Row(AVG(age#4L)=3.5, AVG(height#5L)=82.5)]
Count the number of rows for each group.
>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)]
Compute the max value for each numeric column for each group.
>>> df.groupBy().max('age').collect()
[Row(MAX(age#0)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(MAX(age#4L)=5, MAX(height#5L)=85)]
Compute the average value for each numeric column for each group. This is an alias for avg.
>>> df.groupBy().mean('age').collect()
[Row(AVG(age#0)=3.5)]
>>> df3.groupBy().mean('age', 'height').collect()
[Row(AVG(age#4L)=3.5, AVG(height#5L)=82.5)]
Compute the min value for each numeric column for each group.
>>> df.groupBy().min('age').collect()
[Row(MIN(age#0)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(MIN(age#4L)=2, MIN(height#5L)=80)]
Compute the sum for each numeric column for each group.
>>> df.groupBy().sum('age').collect()
[Row(SUM(age#0)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(SUM(age#4L)=7, SUM(height#5L)=165)]
A column in a DataFrame.
Column instances can be created by:
# 1. Select a column out of a DataFrame
df.colName
df["colName"]
# 2. Create from an expression
df.colName + 1
1 / df.colName
Return an alias for this column.
>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]
Returns a sort expression based on the ascending order of the given column name.
Convert the column into type dataType
>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
Returns a sort expression based on the descending order of the given column name.
binary operator
An expression that gets a field by name in a StructField.
True if the current expression is not null.
True if the current expression is null.
binary operator
binary operator
binary operator
A row in DataFrame. The fields in it can be accessed like attributes.
Row can be used to create a row object by using named arguments; the fields will be sorted by name.
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row.name, row.age
('Alice', 11)
Row can also be used to create another Row-like class, which can then be used to create Row objects, for example:
>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> Person("Alice", 11)
Row(name='Alice', age=11)
Return as a dict.
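A small illustrative check:
>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True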
Spark SQL NullType
The data type representing None, used for types that cannot be inferred.
Spark SQL StringType
The data type representing string values.
Spark SQL BinaryType
The data type representing bytearray values.
Spark SQL BooleanType
The data type representing bool values.
Spark SQL DateType
The data type representing datetime.date values.
Spark SQL TimestampType
The data type representing datetime.datetime values.
Spark SQL DecimalType
The data type representing decimal.Decimal values.
Spark SQL DoubleType
The data type representing float values.
Spark SQL FloatType
The data type representing single precision floating-point values.
Spark SQL ByteType
The data type representing int values with a single signed byte.
Spark SQL IntegerType
The data type representing int values.
Spark SQL LongType
The data type representing long values. If any value is beyond the range of [-9223372036854775808, 9223372036854775807], please use DecimalType.
Spark SQL ShortType
The data type representing int values with 2 signed bytes.
Spark SQL ArrayType
The data type representing list values. An ArrayType object comprises two fields, elementType (a DataType) and containsNull (a bool). The field of elementType is used to specify the type of array elements. The field of containsNull is used to specify if the array has None values.
Spark SQL MapType
The data type representing dict values. A MapType object comprises three fields, keyType (a DataType), valueType (a DataType) and valueContainsNull (a bool).
The field of keyType is used to specify the type of keys in the map. The field of valueType is used to specify the type of values in the map. The field of valueContainsNull is used to specify if values of this map can have None values.
For values of a MapType column, keys are not allowed to have None values.
Spark SQL StructField
Represents a field in a StructType. A StructField object comprises three fields, name (a string), dataType (a DataType) and nullable (a bool). The field of name is the name of a StructField. The field of dataType specifies the data type of a StructField.
The field of nullable specifies if values of a StructField can contain None values.
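An illustrative sketch that pulls the types above together into a single schema; the field names are made up for the example:
>>> from pyspark.sql.types import *
>>> example_schema = StructType([
...     StructField("name", StringType(), False),
...     StructField("scores", ArrayType(IntegerType(), True), True),
...     StructField("attrs", MapType(StringType(), StringType(), True), True)])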
A collection of builtin functions.
Computes the absolute value.
Return a new Column for approximate distinct count of col
>>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
[Row(c=2)]
Returns a sort expression based on the ascending order of the given column name.
Aggregate function: returns the average of the values in a group.
Returns a Column based on the given column name.
Returns a Column based on the given column name.
Aggregate function: returns the number of items in a group.
Return a new Column for distinct count of col or cols
>>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
[Row(c=2)]
>>> df.agg(countDistinct("age", "name").alias('c')).collect()
[Row(c=2)]
Returns a sort expression based on the descending order of the given column name.
Aggregate function: returns the first value in a group.
Aggregate function: returns the last value in a group.
Creates a Column of literal value.
Converts a string expression to lower case.
Aggregate function: returns the maximum value of the expression in a group.
Aggregate function: returns the average of the values in a group.
Aggregate function: returns the minimum value of the expression in a group.
Computes the square root of the specified float value.
Aggregate function: returns the sum of all values in the expression.
Aggregate function: returns the sum of distinct values in the expression.
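An illustrative sketch combining a few of these functions on the df used throughout this page; the result column names may differ between versions, so the output is omitted:
>>> from pyspark.sql import functions as F
>>> df.select(F.upper(df.name).alias('name'), F.sqrt(df.age).alias('root')).collect()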
Create a user defined function (UDF)
>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df.select(slen(df.name).alias('slen')).collect()
[Row(slen=5), Row(slen=3)]
Converts a string expression to upper case.