Java Flink

SQL ranking

dense_rank(): handles ties; tied rows share a rank and no ranks are skipped afterwards

row_number(): ignores ties; every row gets a distinct, consecutive number

rank(): handles ties; tied rows share a rank, but the ranks after a tie are skipped
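
A quick illustration of the difference, assuming a hypothetical table scores(name, score) holding the scores 90, 80, 80, 70:

select name, score,
       row_number() over (order by score desc) as rn,   -- 1, 2, 3, 4
       rank()       over (order by score desc) as rk,   -- 1, 2, 2, 4
       dense_rank() over (order by score desc) as drk   -- 1, 2, 2, 3
from scores;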

Reading a file as a source

//the second argument is the character encoding; this uses scala.io.Source, so scala-library must be on the classpath
Iterator<String> lines = Source.fromFile(new File("input/click.txt"), "utf8").getLines();
while (lines.hasNext()) {
    String next = lines.next();
    System.out.println(next);
}
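
For reference, a plain-Java way to do the same without the Scala dependency (a minimal sketch using java.nio; the path is the same as above):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

//read and print every line of the file with an explicit encoding
try (Stream<String> lines = Files.lines(Paths.get("input/click.txt"), StandardCharsets.UTF_8)) {
    lines.forEach(System.out::println);
} catch (IOException e) {
    e.printStackTrace();
}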

Getting the current timestamp

import java.util.Calendar;

long l = Calendar.getInstance().getTimeInMillis();
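
System.currentTimeMillis() gives the same value without going through Calendar:

//equivalent, and avoids the Calendar allocation
long now = System.currentTimeMillis();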

Sink to Redis

package p52;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import p33.Event;

/**
 * @author jiasongfan
 * @date 2022/7/25
 * @apiNote
 */
public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> streamSource = env.fromElements(
                new Event("bob", "./home", 1000L),
                new Event("mary", "./home", 2000L),
                new Event("bob", "./pord", 3000L)
        );

        streamSource.print();

        //create a connection config
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hdp1")
                .build();


        //sink to Redis
        streamSource.addSink(new RedisSink<>(config,new MyRedisMapper()));
        env.execute();
    }

    //custom class implementing the RedisMapper interface
    public static class MyRedisMapper implements RedisMapper<Event>{

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET,"clicks");
        }

        @Override
        public String getKeyFromData(Event event) {
            return event.user;
        }

        @Override
        public String getValueFromData(Event event) {
            return event.url;
        }
    }

}
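
To verify the sink, inspect the hash in redis-cli (host as configured above); since the key is the user, the last url written per user wins:

HGETALL clicks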

MySQL source

package p33;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author jiasongfan
 * @date 2022/7/20
 * @apiNote
 */
public class RClickSOurce extends RichSourceFunction<Event> {
    Connection conn = null;
    PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://hdp1:3306/1912b", "root", "root");
        ps = conn.prepareStatement("select * from t_event");
    }

    @Override
    public void close() throws Exception {
        //close the statement before the connection
        ps.close();
        conn.close();
    }

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            String user = resultSet.getString(1);
            String url = resultSet.getString(2);
            Long ts = resultSet.getLong(3);
            ctx.collect(new Event(user, url, ts));
        }
    }

    @Override
    public void cancel() {

    }
}
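
A minimal job wiring this source up (assuming the Event class and t_event table above):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//emits one Event per row of t_event, then the source finishes
env.addSource(new RClickSOurce()).print();
env.execute();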

maxBy finds the maximum; its argument must be a field name

KeyedStream<Event, String> keyedStream = eventSource.keyBy(data -> data.user);
SingleOutputStreamOperator<Event> max = keyedStream.maxBy("ts");
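
The difference from max: maxBy returns the whole record that contains the maximum, while max only takes the maximum of the given field and keeps the other fields from the first record seen. With the sample events used elsewhere in these notes:

//maxBy("ts") for key bob emits the full latest record: Event(bob, ./prod, 3000)
//max("ts")   for key bob would emit bob's first url with the max ts: Event(bob, ./home, 3000)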

Reduce aggregation

package p42;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import p33.Event;

/**
 * @author jiasongfan
 * @date 2022/7/21
 * @apiNote
 */
public class TranformReduceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> eventSource = env.fromElements(new Event("bob", "./home", 1000L),
                new Event("mary", "./home", 2000L),
                new Event("bob", "./pord", 3000L)
        );
        //count clicks per user (map each event to (user, 1))
        SingleOutputStreamOperator<Tuple2<String, Long>> map = eventSource.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event event) throws Exception {
                return Tuple2.of(event.user, 1L);
            }
        });
        KeyedStream<Tuple2<String, Long>, String> keyedStream = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> v1) throws Exception {
                return v1.f0;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> reduce = keyedStream.reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> t1) throws Exception {
                return Tuple2.of(v1.f0, v1.f1 + t1.f1);
            }
        });
        //select the currently most active user
        //the key here is arbitrary; it sends every record to a single group
        SingleOutputStreamOperator<Tuple2<String, Long>> result = reduce.keyBy(data -> "key").reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> t1) throws Exception {
                return v1.f1 > t1.f1 ? v1 : t1;
            }
        });
        result.print();
        env.execute();
    }
}
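
For reference, with parallelism 1 this prints the running result after each input record:

(bob,1)
(mary,1)
(bob,2)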

Sink to Redis (setup and dependency)

Starting Redis

In the Redis directory, run:

redis-server redis.conf

Add the dependency

 <!-- Redis connector -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

package p52;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import p33.Event;

/**
 * @author jiasongfan
 * @date 2022/7/25
 * @apiNote
 */
public class SinkToRedis1 {
    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> streamSource = env.fromElements(
                new Event("bob", "./home", 1000L),
                new Event("mary", "./home", 2000L),
                new Event("bob", "./pord", 3000L)
        );
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hdp1")
                .build();
        streamSource.addSink(new RedisSink<>(config,new RedisMapper1()));
        
        
        env.execute();
    }
    
    public static class  RedisMapper1 implements RedisMapper<Event>{

        @Override
        public RedisCommandDescription getCommandDescription() {
            //use a Redis hash: HSET clicks <user> <url>
            return new RedisCommandDescription(RedisCommand.HSET,"clicks");
        }

        @Override
        public String getKeyFromData(Event event) {
            return event.user;
        }

        @Override
        public String getValueFromData(Event event) {
            return event.url;
        }
    }
}

Computing the average (with a window, pair the AggregateFunction with a ProcessWindowFunction to get window metadata)

package p73;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author jiasongfan
 * @date 2022/7/27
 * @apiNote
 */
public class AggTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<ABean> streamSource = env.fromElements(
                new ABean("bob", 3, 1000L),
                new ABean("Mary", 4, 2000L),
                new ABean("bob", 5, 3000L)
        );
        //compute the per-user average in 5s event-time windows
        SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.ts));
        KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);

        WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<WData> avgDS = winDS.aggregate(new MyAvg(), new MyWin2());
        avgDS.print();

        env.execute();
    }
    // ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    public static class MyWin2 extends ProcessWindowFunction<Double, WData, String, TimeWindow> {

        @Override
        public void process(String s, Context context, Iterable<Double> iterable, Collector<WData> collector) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            //the pre-aggregated average arrives as a single element
            Double next = iterable.iterator().next();
            collector.collect(new WData(start, end, s, next));
        }
    }

    //an alternative: the (Java) WindowFunction interface exposes the same window metadata
    public static class MyWin1 implements WindowFunction<Double, WData, String, TimeWindow> {
        @Override
        public void apply(String s, TimeWindow window, Iterable<Double> input, Collector<WData> out) {
            for (Double next : input) {
                out.collect(new WData(window.getStart(), window.getEnd(), s, next));
            }
        }
    }
    public static class  MyAvg implements AggregateFunction<ABean, Tuple2<Long, Long>, Double> {

        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return Tuple2.of(0L,0L);
        }

        @Override
        public Tuple2<Long, Long> add(ABean aBean, Tuple2<Long, Long> acc) {
            //accumulate (sum, count)
            return Tuple2.of(aBean.num + acc.f0, acc.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> acc) {
            //sum / count
            return ((double) acc.f0) / acc.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> v1, Tuple2<Long, Long> acc1) {
            return Tuple2.of(v1.f0+acc1.f0,v1.f1+acc1.f1);
        }
    }
}
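
Note: in aggregate(aggFn, windowFn) the ProcessWindowFunction never sees the raw elements; its Iterable holds exactly one value, the final result of the AggregateFunction, which is why MyWin2 just reads iterable.iterator().next().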

Counting occurrences

package p73;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author jiasongfan
 * @date 2022/7/27
 * @apiNote
 */
public class SumTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<ABean> streamSource = env.fromElements(
                new ABean("bob", 3, 1000L),
                new ABean("Mary", 4, 2000L),
                new ABean("bob", 5, 3000L)
        );
        //assign event-time timestamps and watermarks
        SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.ts));
        KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);


        WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));

        SingleOutputStreamOperator<String> sumDS = winDS.aggregate(new MySum(), new MyWin2());
        sumDS.print();

        env.execute();
    }
    // ProcessWindowFunction<IN, OUT, KEY, W extends Window>
    public static class MyWin2 extends ProcessWindowFunction<Integer,String,String,TimeWindow> {


        @Override
        public void process(String s, Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            //single pre-aggregated count per window
            Integer next = iterable.iterator().next();
            collector.collect(start + "," + end + "," + next);
        }
    }


    //<IN, ACC, OUT>
    public static class MySum implements AggregateFunction<ABean,Integer,Integer>{

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(ABean aBean, Integer integer) {
            return integer+1;
        }

        @Override
        public Integer getResult(Integer integer) {
            return integer;
        }

        @Override
        public Integer merge(Integer integer, Integer acc1) {
            //required when windows merge (e.g. session windows); returning null would break them
            return integer + acc1;
        }
    }

}
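
Note that MySum ignores the input value and just counts elements; returning integer + aBean.num in add() would turn it into a true sum instead of a count.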

Accumulators

/**
 * @author jiasongfan
 * @date 2022/7/28
 * @apiNote
 */

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SumTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        //2.Source
        DataStreamSource<String> streamSource = env.fromElements("aaa", "bbb", "ccc", "ddd");
        SingleOutputStreamOperator<String> result = streamSource.map(new RichMapFunction<String, String>() {
            //1. create the accumulator
            private IntCounter elementCounter = new IntCounter();
            @Override
            public void open(Configuration parameters) throws Exception {
                //2. register the accumulator
                getRuntimeContext().addAccumulator("elementCounter", elementCounter);
            }

            @Override
            public String map(String s) throws Exception {
                this.elementCounter.add(1);
                return s;
            }
        });
        result.print();

        //3. run the job, then read the accumulator from the execution result
        JobExecutionResult jobExecutionResult = env.execute();
        int nums = jobExecutionResult.getAccumulatorResult("elementCounter");
        System.out.println("count from the accumulator: " + nums);

    }
}

Spark SQL

Computing the proportion of female students

sum(case when sex = '女' then 1 else 0 end) cnt1,

count(*) cnt,

(sum(case when sex = '女' then 1 else 0 end) * 1.0 / count(*)) cnt2
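
Put together, a minimal sketch assuming a hypothetical table student(name, sex):

select
  sum(case when sex = '女' then 1 else 0 end) as cnt1,                   -- '女' = female; count of female students
  count(*) as cnt,                                                       -- total students
  sum(case when sex = '女' then 1 else 0 end) * 1.0 / count(*) as cnt2   -- * 1.0 forces decimal division
from student;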

Table SQL (with aggregation)

package p128;

/**
 * @author jiasongfan
 * @date 2022/7/30
 * @apiNote
 */
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TableTest02 {
    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .useBlinkPlanner()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        //sample row in click.txt: Mary,./home,1000
        tEnv.executeSql("CREATE TABLE t_table (\n"
                + "  users String,\n"
                + "  urls STRING,\n"
                + "  ts Bigint) \n"

                + "WITH (\n"
                + "  'connector' = 'filesystem',           -- required: specify the connector\n"
                + "  'path' = 'file:///D:\\E\\month9class\\zong\\input\\click.txt',  -- required: path to a directory\n"
                + "  'format' = 'csv'"
                + ")");

        tEnv.executeSql("select users,sum(ts) sums from t_table"
                + " group by users");
                //.print();

        //写入到kafka
        Table table = tEnv.sqlQuery("select users,sum(ts) sums from t_table group by users");

        //聚合转换
        tEnv.toChangelogStream(table).print();
        env.execute();
    }
}
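
Calling toDataStream on this table would fail: a group by produces an updating result, where each new row for a user retracts that user's previous sum. The changelog stream prints rows flagged +I (insert) and -U/+U (retract and update) accordingly.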

Top N

package p86;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import p73.ABean;
import p73.AggTest;
import p73.WData;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * @author jiasongfan
 * @date 2022/7/28
 * @apiNote
 */
public class TopNTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<ABean> streamSource = env.fromElements(
                new ABean("bob", 3, 1000L),
                new ABean("Mary", 4, 2000L),
                new ABean("bob", 5, 3000L),
                new ABean("Kak", 5, 4000L),
                new ABean("KaK", 7, 5000L),
                new ABean("NoNO", 8, 6000L),
                new ABean("ZZ", 8, 7000L),
                new ABean("NoNO", 8, 9000L),
                new ABean("NoNO", 8, 11000L)
        );

        //reuse MyAvg/MyWin2 to get per-user 5s-window averages, then rank per window end
        SingleOutputStreamOperator<ABean> timeDS = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<ABean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> event.ts));
        KeyedStream<ABean, String> keyDS = timeDS.keyBy(data -> data.id);

        WindowedStream<ABean, String, TimeWindow> winDS = keyDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<WData> avgDS = winDS.aggregate(new AggTest.MyAvg(), new AggTest.MyWin2());
        SingleOutputStreamOperator<String> process = avgDS.keyBy(data -> data.end).process(new MyPro2());
        process.print("topN");

        env.execute();
    }
    public static class MyPro2 extends KeyedProcessFunction<Long,WData,String>{
        //state: buffers every window result that shares this window end
        ListStateDescriptor<WData> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<WData>() {}));
        ListState<WData> list = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            list = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement(WData wData, KeyedProcessFunction<Long, WData, String>.Context context, Collector<String> collector) throws Exception {
            list.add(wData);
            //register an event-time timer 1 ms after the window end (the key is the window end)
            context.timerService().registerEventTimeTimer(context.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, WData, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<WData> wData = new ArrayList<>();
            for (WData wDatum : list.get()) {
                wData.add(wDatum);
            }

            //sort descending by average
            wData.sort(new Comparator<WData>() {
                @Override
                public int compare(WData o1, WData o2) {
                    return Double.compare(o2.getAvg(), o1.getAvg());
                }
            });
            //emit the top 3, or everything if there are fewer
            if (wData.size() >= 3) {
                List<WData> wData1 = wData.subList(0, 3);
                out.collect(wData1.toString());
            } else {
                out.collect(wData.toString());
            }
            //this window's ranking is final; release the buffered state
            list.clear();
        }
    }
}

Custom source

package p128;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import p33.Event;

import java.util.Date;
import java.util.Random;

/**
 * @author jiasongfan
 * @date 2022/7/30
 * @apiNote
 */
public class  Clickhouse extends RichSourceFunction<Event> {
    private boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        //generate random data
        Random random = new Random();
        //candidate values for each field
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100"};
        //emit one record per second until cancel() flips the flag
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long time = new Date().getTime();

            sourceContext.collect(new Event(user, url, time));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running=false;
    }
}
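
A minimal job using this generator source (the class name is kept from the original; it is unrelated to the ClickHouse database):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//prints one random Event per second until the job is cancelled
env.addSource(new Clickhouse()).print();
env.execute();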

Getting the Windows IP address

ipconfig

e.g. 192.168.0.105

(when running locally, use localhost instead)
