准备文件users.json
{"userId":"1", "userName":"Join", "age":23}
{"userId":"3", "userName":"Katy", "age":23}
{"userId":"4", "userName":"Mike", "age":21}
{"userId":"2", "userName":"Jeffy", "age":45}
{"userId":"5", "userName":"Av", "age":43}
准备文件orders.json
{"id":"1", "userId":"1", "userName":"Join", "totalPrice":80.0,"qty":3.0}
{"id":"2", "userId":"1", "userName":"Join", "totalPrice":50.0,"qty":3.0}
{"id":"3", "userId":"2", "userName":"Jeffy", "totalPrice":200.0,"qty":3.0}
{"id":"4", "userId":"99999", "userName":"zombie", "totalPrice":222.0,"qty":3.0}
准备文件order_items.json
{"id":"1", "orderId":"1", "name":"apple", "amount":4, "price":20.0, "userId":"1"}
{"id":"2", "orderId":"2", "name":"book", "amount":5, "price":10.0, "userId":"1"}
{"id":"3", "orderId":"3", "name":"cake", "amount":1, "price":200.0, "userId":"2"}
inner join
inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrame时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。
left outer join
left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrame时,一般让大表在左边,小表在右边。
right outer join
right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrame时,一般让大表在右边,小表在左边。
full outer join
full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既要完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左表和右表既要作为streamIter,又要作为buildIter。
left semi join
left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左表的记录,否则返回null。
left anti join
left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左表的记录。
join的原理图,可查看文章
https://cloud.tencent.com/developer/article/1005502
代码示例
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class test_29_1 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().config("spark.driver.host", "localhost").appName("JoinApiTest").master("local").getOrCreate();spark.sparkContext().setLogLevel("ERROR");Dataset<Row> usersDataSet = spark.read().json(Utils.BASE_PATH + "/join/users.json");usersDataSet.show();/*+---+------+--------+|age|userId|userName|+---+------+--------+| 23| 1| Join|| 23| 3| Katy|| 21| 4| Mike|| 45| 2| Jeffy|| 43| 5| Av|+---+------+--------+*/Dataset<Row> ordersDataSet = spark.read().json(Utils.BASE_PATH + "/join/orders.json");ordersDataSet.show();/*+---+---+----------+------+--------+| id|qty|totalPrice|userId|userName|+---+---+----------+------+--------+| 1|3.0| 80.0| 1| Join|| 2|3.0| 50.0| 1| Join|| 3|3.0| 200.0| 2| Jeffy|| 4|3.0| 222.0| 99999| zombie|+---+---+----------+------+--------+*/Dataset<Row> orderItemsDataSet = spark.read().json(Utils.BASE_PATH + "/join/order_items.json");orderItemsDataSet.show();/*+------+---+-----+-------+-----+------+|amount| id| name|orderId|price|userId|+------+---+-----+-------+-----+------+| 4| 1|apple| 1| 20.0| 1|| 5| 2| book| 2| 10.0| 1|| 1| 3| cake| 3|200.0| 2|+------+---+-----+-------+-----+------+*///1:两张表有相同字段名的join//两张表有相同的join字段名的inner joinusersDataSet.join(ordersDataSet, "userId").show();/*+------+---+--------+---+---+----------+--------+|userId|age|userName| id|qty|totalPrice|userName|+------+---+--------+---+---+----------+--------+| 1| 23| Join| 2|3.0| 50.0| Join|| 1| 23| Join| 1|3.0| 80.0| Join|| 2| 45| Jeffy| 3|3.0| 200.0| Jeffy|+------+---+--------+---+---+----------+--------+*///joinType: inner、outer、left_outer、right_outer、leftsemi、leftanti//两张表有相同的join字段名的outer join,使的两个字段都只出现一次usersDataSet.join(ordersDataSet, usersDataSet.col("userId").equalTo(ordersDataSet.col("userId")), "outer").show();/*+----+------+--------+----+----+----------+------+--------+| age|userId|userName| id| 
qty|totalPrice|userId|userName|+----+------+--------+----+----+----------+------+--------+| 23| 3| Katy|null|null| null| null| null|| 43| 5| Av|null|null| null| null| null|| 23| 1| Join| 1| 3.0| 80.0| 1| Join|| 23| 1| Join| 2| 3.0| 50.0| 1| Join|| 21| 4| Mike|null|null| null| null| null||null| null| null| 4| 3.0| 222.0| 99999| zombie|| 45| 2| Jeffy| 3| 3.0| 200.0| 2| Jeffy|+----+------+--------+----+----+----------+------+--------+*/usersDataSet.join(ordersDataSet, usersDataSet.col("userId").equalTo(ordersDataSet.col("userId")), "left_outer").show();/*+---+------+--------+----+----+----------+------+--------+|age|userId|userName| id| qty|totalPrice|userId|userName|+---+------+--------+----+----+----------+------+--------+| 23| 1| Join| 2| 3.0| 50.0| 1| Join|| 23| 1| Join| 1| 3.0| 80.0| 1| Join|| 23| 3| Katy|null|null| null| null| null|| 21| 4| Mike|null|null| null| null| null|| 45| 2| Jeffy| 3| 3.0| 200.0| 2| Jeffy|| 43| 5| Av|null|null| null| null| null|+---+------+--------+----+----+----------+------+--------+*/usersDataSet.join(ordersDataSet, usersDataSet.col("userId").equalTo(ordersDataSet.col("userId")), "right_outer").show();/*+----+------+--------+---+---+----------+------+--------+| age|userId|userName| id|qty|totalPrice|userId|userName|+----+------+--------+---+---+----------+------+--------+| 23| 1| Join| 1|3.0| 80.0| 1| Join|| 23| 1| Join| 2|3.0| 50.0| 1| Join|| 45| 2| Jeffy| 3|3.0| 200.0| 2| Jeffy||null| null| null| 4|3.0| 222.0| 99999| zombie|+----+------+--------+---+---+----------+------+--------+*///查询出users中的userId在orders存在的usersusersDataSet.join(ordersDataSet, usersDataSet.col("userId").equalTo(ordersDataSet.col("userId")), "leftsemi").show();/*+---+------+--------+|age|userId|userName|+---+------+--------+| 23| 1| Join|| 45| 2| Jeffy|+---+------+--------+*///和leftsemi相反,查询出users中的userId不在orders存在的usersusersDataSet.join(ordersDataSet, usersDataSet.col("userId").equalTo(ordersDataSet.col("userId")), 
"leftanti").show();/*+---+------+--------+|age|userId|userName|+---+------+--------+| 23| 3| Katy|| 21| 4| Mike|| 43| 5| Av|+---+------+--------+*///2: 两张表中不是根据相同字段名的join,即根据条件来join的//两张表没有相同的join字段名的inner joinDataset<Row> joinResult =ordersDataSet.join(orderItemsDataSet, ordersDataSet.col("id").equalTo(orderItemsDataSet.col("orderId")));joinResult.show();/*+---+---+----------+------+--------+------+---+-----+-------+-----+------+| id|qty|totalPrice|userId|userName|amount| id| name|orderId|price|userId|+---+---+----------+------+--------+------+---+-----+-------+-----+------+| 1|3.0| 80.0| 1| Join| 4| 1|apple| 1| 20.0| 1|| 2|3.0| 50.0| 1| Join| 5| 2| book| 2| 10.0| 1|| 3|3.0| 200.0| 2| Jeffy| 1| 3| cake| 3|200.0| 2|+---+---+----------+------+--------+------+---+-----+-------+-----+------+*/ordersDataSet.join(orderItemsDataSet, ordersDataSet.col("id").equalTo(orderItemsDataSet.col("orderId")), "outer").show();/*+---+---+----------+------+--------+------+----+-----+-------+-----+------+| id|qty|totalPrice|userId|userName|amount| id| name|orderId|price|userId|+---+---+----------+------+--------+------+----+-----+-------+-----+------+| 3|3.0| 200.0| 2| Jeffy| 1| 3| cake| 3|200.0| 2|| 1|3.0| 80.0| 1| Join| 4| 1|apple| 1| 20.0| 1|| 4|3.0| 222.0| 99999| zombie| null|null| null| null| null| null|| 2|3.0| 50.0| 1| Join| 5| 2| book| 2| 10.0| 1|+---+---+----------+------+--------+------+----+-----+-------+-----+------+*/}
}