SQL ranking functions
dense_rank() gives tied rows the same rank, with no gaps afterwards (1,2,2,3)
row_number() ignores ties: every row gets a distinct, consecutive number (1,2,3,4)
rank() gives tied rows the same rank, then skips over the tied positions (1,2,2,4)
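A quick comparison over a hypothetical t_score(name, score) table (table name and values are illustrative):
SELECT name, score,
row_number() OVER (ORDER BY score DESC) rn,
rank() OVER (ORDER BY score DESC) rk,
dense_rank() OVER (ORDER BY score DESC) drk
FROM t_score;
-- for scores 90, 85, 85, 80: rn = 1,2,3,4; rk = 1,2,2,4; drk = 1,2,2,3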
Reading a file with scala.io.Source (called from Java)
//the second argument is the character encoding; getLines() returns a scala.collection.Iterator
Iterator<String> lines = Source.fromFile(new File("input/click.txt"), "utf8").getLines();
while (lines.hasNext()){
String next = lines.next();
System.out.println(next);
}
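If the Scala library is not on the classpath, the plain JDK does the same job (a sketch; needs java.nio.file.Files, java.nio.file.Paths, java.nio.charset.StandardCharsets, and throws IOException):
Files.lines(Paths.get("input/click.txt"), StandardCharsets.UTF_8).forEach(System.out::println);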
Get the current timestamp in milliseconds
import java.util.Calendar;
long l = Calendar.getInstance().getTimeInMillis();
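The JDK one-liner is equivalent:
long l = System.currentTimeMillis();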
Sink to Redis
package p52;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import p33.Event;
/**
* @author jiasongfan
* @date 2022/7/25
* @apiNote
*/
public class SinkToRedis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> streamSource = env.fromElements(
new Event("bob", "./home", 1000L),
new Event("mary", "./home", 2000L),
new Event("bob", "./pord", 3000L)
);
streamSource.print();
//create a Jedis connection pool configuration
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("hdp1")
.build();
//sink to Redis
streamSource.addSink(new RedisSink<>(config,new MyRedisMapper()));
env.execute();
}
//custom class implementing the RedisMapper interface
public static class MyRedisMapper implements RedisMapper<Event>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"clicks");
}
@Override
public String getKeyFromData(Event event) {
return event.user;
}
@Override
public String getValueFromData(Event event) {
return event.url;
}
}
}
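With RedisCommand.HSET, "clicks" is the Redis hash key and each user becomes a field in that hash, so after the job runs the result can be inspected in redis-cli on hdp1:
HGETALL clicks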
Source: MySQL
package p33;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* @author jiasongfan
* @date 2022/7/20
* @apiNote
*/
public class RClickSource extends RichSourceFunction<Event> {
Connection conn =null;
PreparedStatement ps =null;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
conn=DriverManager.getConnection("jdbc:mysql://hdp1:3306/1912b", "root", "root");
ps=conn.prepareStatement("select * from t_event");
}
@Override
public void close() throws Exception {
ps.close();
conn.close();
}
@Override
public void run(SourceContext<Event> ctx) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()){
String user = resultSet.getString(1);
String url = resultSet.getString(2);
Long ts = resultSet.getLong(3);
ctx.collect(new Event(user, url, ts));
}
}
@Override
public void cancel() {
}
}
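Wiring the source into a job (a sketch; assumes the t_event(user, url, ts) table exists and the MySQL driver is on the classpath):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new RClickSource()).print();
env.execute();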
maxBy returns the whole record holding the maximum; its argument must be a field name (for POJOs) or a tuple position
KeyedStream<Event, String> keyedStream = eventSource.keyBy(data -> data.user);
SingleOutputStreamOperator<Event> max = keyedStream.maxBy("ts");
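For contrast, max("ts") only updates the ts field and keeps the other fields from the first record of the key, while maxBy("ts") returns the complete record that holds the maximum:
SingleOutputStreamOperator<Event> maxTs = keyedStream.max("ts"); //user and url stay from the first element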
Rolling aggregation with reduce
package p42;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import p33.Event;
/**
* @author jiasongfan
* @date 2022/7/21
* @apiNote
*/
public class TranformReduceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> eventSource = env.fromElements(new Event("bob", "./home", 1000L),
new Event("mary", "./home", 2000L),
new Event("bob", "./pord", 3000L)
);
//count the number of clicks per user
SingleOutputStreamOperator<Tuple2<String, Long>> map = eventSource.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event event) throws Exception {
return Tuple2.of(event.user, 1L);
}
});
KeyedStream<Tuple2<String, Long>, String> keyedStream = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> v1) throws Exception {
return v1.f0;
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> reduce = keyedStream.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> t1) throws Exception {
return Tuple2.of(v1.f0, v1.f1 + t1.f1);
}
});
//pick the currently most active user
//this key is arbitrary; it routes every record into one group so a single reduce sees all counts
SingleOutputStreamOperator<Tuple2<String, Long>> result = reduce.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> t1) throws Exception {
return v1.f1 > t1.f1 ? v1 : t1;
}
});
result.print();
env.execute();
}
}
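With the three sample events the printed result should evolve as (bob,1), then (mary,1), then (bob,2): on a tie the newer record wins because the comparison is a strict >.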
Sink to Redis: setup
Start Redis (from the Redis installation directory):
redis-server redis.conf
Maven dependency (the connector used by the SinkToRedis example above):
<!-- Redis connector -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
Computing a windowed average (pair an AggregateFunction with a ProcessWindowFunction to attach the window start/end)
package p73;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @author jiasongfan
* @date 2022/7/27
* @apiNote
*/
public class AggTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<ABean> streamSource = env.fromElements(
new ABean("bob", 3, 1000L),
new ABean("Mary", 4, 2000L),
new ABean("bob", 5, 3000L)
);
//compute the per-key average over 5s event-time windows
SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) -> event.ts));
KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);
WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<WData> avgDS = winDS.aggregate(new MyAvg(), new MyWin2());
avgDS.print();
env.execute();
}
//[IN, OUT, KEY, W <: Window]
public static class MyWin2 extends ProcessWindowFunction<Double,WData, String,TimeWindow> {
@Override
public void process(String s, ProcessWindowFunction<Double, WData, String, TimeWindow>.Context context, Iterable<Double> iterable, Collector<WData> collector) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
//paired with an AggregateFunction, the iterable holds exactly one element: the aggregate result
Double next = iterable.iterator().next();
collector.collect(new WData(start,end,s,next));
}
}
//alternative: a plain WindowFunction (same output, no access to the richer context)
public static class MyWin1 implements WindowFunction<Double, WData, String, TimeWindow> {
@Override
public void apply(String s, TimeWindow window, Iterable<Double> input, Collector<WData> out) throws Exception {
for (Double next : input) {
out.collect(new WData(window.getStart(), window.getEnd(), s, next));
}
}
}
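//accumulator ACC = Tuple2<sum, count>; IN = ABean, OUT = Double (the average)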
public static class MyAvg implements AggregateFunction<ABean, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L,0L);
}
@Override
public Tuple2<Long, Long> add(ABean aBean, Tuple2<Long, Long> v1) {
return Tuple2.of((aBean.num+v1.f0),(v1.f1+1L));
}
@Override
public Double getResult(Tuple2<Long, Long> v1) {
return ((double)v1.f0)/v1.f1 ;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> v1, Tuple2<Long, Long> acc1) {
return Tuple2.of(v1.f0+acc1.f0,v1.f1+acc1.f1);
}
}
}
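The ABean and WData POJOs (package p73) are not included in these notes; a minimal sketch consistent with how the code above uses them (field names and types are inferred, so treat them as assumptions):
public class ABean {
public String id;
public Integer num;
public Long ts;
public ABean() {}
public ABean(String id, Integer num, Long ts) { this.id = id; this.num = num; this.ts = ts; }
}
public class WData {
public long start;
public long end;
public String id;
public Double avg;
public WData() {}
public WData(long start, long end, String id, Double avg) { this.start = start; this.end = end; this.id = id; this.avg = avg; }
public Double getAvg() { return avg; }
@Override
public String toString() { return "WData(" + start + "," + end + "," + id + "," + avg + ")"; }
}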
Counting occurrences per window with an AggregateFunction
package p73;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* @author jiasongfan
* @date 2022/7/27
* @apiNote
*/
public class SumTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<ABean> streamSource = env.fromElements(
new ABean("bob", 3, 1000L),
new ABean("Mary", 4, 2000L),
new ABean("bob", 5, 3000L)
);
//count events per key in 5s event-time windows
SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) -> event.ts));
KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);
WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<String> sumDS = winDS.aggregate(new MySum(), new MyWin2());
sumDS.print();
env.execute();
}
//[IN, OUT, KEY, W <: Window]
public static class MyWin2 extends ProcessWindowFunction<Integer,String,String,TimeWindow> {
@Override
public void process(String s, ProcessWindowFunction<Integer, String, String, TimeWindow>.Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
Integer next = iterable.iterator().next();
collector.collect(start+","+end+","+next);
}
}
//<IN, ACC, OUT>
public static class MySum implements AggregateFunction<ABean,Integer,Integer>{
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(ABean aBean, Integer integer) {
return integer+1;
}
@Override
public Integer getResult(Integer integer) {
return integer;
}
@Override
public Integer merge(Integer integer, Integer acc1) {
return integer + acc1;
}
}
}
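Note on merge(): it is only called for merging window assigners such as session windows; tumbling windows never invoke it, but it still has to combine two partial counts correctly in case the assigner is swapped, e.g. for:
keyDS.window(EventTimeSessionWindows.withGap(Time.seconds(5)))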
Accumulators
/**
* @author jiasongfan
* @date 2022/7/28
* @apiNote
*/
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SumTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
//source
DataStreamSource<String> streamSource = env.fromElements("aaa", "bbb", "ccc", "ddd");
SingleOutputStreamOperator<String> result = streamSource.map(new RichMapFunction<String, String>() {
//1. create the accumulator
private IntCounter elementCounter = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
//2. register the accumulator
getRuntimeContext().addAccumulator("elementCounter", elementCounter);
}
@Override
public String map(String s) throws Exception {
this.elementCounter.add(1);
return s;
}
});
//3. fetch the accumulator result after the job finishes
JobExecutionResult jobExecutionResult = env.execute();
int nums = jobExecutionResult.getAccumulatorResult("elementCounter");
System.out.println("使用累加器统计的结果:"+nums);
}
}
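Accumulator results only become available after env.execute() returns, so this pattern suits bounded inputs such as fromElements; an unbounded job never reaches the line that reads the result.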
Spark SQL
Proportion of female rows:
sum(case when sex='女' then 1 else 0 end) cnt1,   -- female count
count(*) cnt,                                     -- total count
(sum(case when sex='女' then 1 else 0 end)*1.0/count(*)) cnt2   -- ratio; *1.0 forces decimal division
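A complete query, assuming a hypothetical student(name, sex) table:
SELECT
sum(case when sex='女' then 1 else 0 end) cnt1,
count(*) cnt,
(sum(case when sex='女' then 1 else 0 end)*1.0/count(*)) cnt2
FROM student;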
Table SQL (query with aggregation)
package p128;
/**
* @author jiasongfan
* @date 2022/7/30
* @apiNote
*/
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableTest02 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.useBlinkPlanner()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
//click.txt lines look like: Mary,./home,1000
tEnv.executeSql("CREATE TABLE t_table (\n"
+ " users String,\n"
+ " urls STRING,\n"
+ " ts Bigint) \n"
+ "WITH (\n"
+ " 'connector' = 'filesystem', -- required: specify the connector\n"
+ " 'path' = 'file:///D:\\E\\month9class\\zong\\input\\click.txt', -- required: path to a directory\n"
+ " 'format' = 'csv'"
+ ")");
tEnv.executeSql("select users,sum(ts) sums from t_table"
+ " group by users");
//.print();
//写入到kafka
Table table = tEnv.sqlQuery("select users,sum(ts) sums from t_table group by users");
//聚合转换
tEnv.toChangelogStream(table).print();
env.execute();
}
}
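Because the query aggregates, the printed stream is a changelog: the first row per user arrives as an insert (+I) and later rows retract and replace the running sum (-U/+U pairs); the exact values depend on click.txt.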
topN
package p86;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import p73.ABean;
import p73.AggTest;
import p73.WData;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
/**
* @author jiasongfan
* @date 2022/7/28
* @apiNote
*/
public class TopNTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<ABean> streamSource = env.fromElements(
new ABean("bob", 3, 1000L),
new ABean("Mary", 4, 2000L),
new ABean("bob", 5, 3000L),
new ABean("Kak", 5, 4000L),
new ABean("KaK", 7, 5000L),
new ABean("NoNO", 8, 6000L),
new ABean("ZZ", 8, 7000L),
new ABean("NoNO", 8, 9000L),
new ABean("NoNO", 8, 11000L)
);
//per-id average over 5s tumbling windows (reusing MyAvg and MyWin2 from AggTest)
SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
.<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner((event, timestamp) -> event.ts));
KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);
WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<WData> avgDS = winDS.aggregate(new AggTest.MyAvg(), new AggTest.MyWin2());
SingleOutputStreamOperator<String> process = avgDS.keyBy(data -> data.end).process(new MyPro2());
process.print("topN");
env.execute();
}
public static class MyPro2 extends KeyedProcessFunction<Long,WData,String>{
//state for buffering all window results that share the same window end
ListStateDescriptor<WData> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<WData>() {}));
ListState<WData> list=null;
@Override
public void open(Configuration parameters) throws Exception {
list = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(WData wData, KeyedProcessFunction<Long, WData, String>.Context context, Collector<String> collector) throws Exception {
list.add(wData);
//the key is the window end; fire 1 ms after it, once the watermark has passed the window
context.timerService().registerEventTimeTimer(context.getCurrentKey()+1);
}
@Override
public void onTimer(long timestamp, KeyedProcessFunction<Long, WData, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
ArrayList<WData> wData = new ArrayList<>();
for (WData wDatum : list.get()) {
wData.add(wDatum);
}
//sort by average, descending (compareTo avoids the precision loss of casting the doubles to int)
wData.sort(new Comparator<WData>() {
@Override
public int compare(WData o1, WData o2) {
return o2.getAvg().compareTo(o1.getAvg());
}
});
if(wData.size()>=3){
out.collect(wData.subList(0, 3).toString());
}else{
out.collect(wData.toString());
}
//each window end is its own key and never fires again, so release the buffered state
list.clear();
}
}
}
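Design note: keying the window results by data.end puts every per-user result for the same window under one key, and the event-time timer at end + 1 only fires once the watermark has passed the window end, by which point all results for that window have been buffered and can be sorted for the top 3.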
Custom source
package p128;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import p33.Event;
import java.util.Date;
import java.util.Random;
/**
* @author jiasongfan
* @date 2022/7/30
* @apiNote
*/
public class Clickhouse extends RichSourceFunction<Event> {
private boolean running=true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
//randomly generate data
Random random = new Random();
//candidate values for each field
String[] users= {"Mary","Alice","Bob","Cary"};
String[] urls={"./home","./cart","./fav","./prod?id=100"};
//emit one event per second until cancel() flips the flag
while (running){
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
long time = new Date().getTime();
sourceContext.collect(new Event(user,url,time));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running=false;
}
}
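Wiring the source into a job (a sketch; assumes the usual StreamExecutionEnvironment setup):
DataStreamSource<Event> clicks = env.addSource(new Clickhouse());
clicks.print();
env.execute();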
Getting the Windows host's IP address:
ipconfig
192.168.0.105
For a purely local setup, use localhost instead.