spark streaming demo

spark streaming:

package sparkstreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

public class SparkStreamingDemo2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 2121);
//        JavaDStream<String> dStream = jssc.textFileStream(dir);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String x) {
//                        System.out.println("========"+x);
                        return Arrays.asList(x.split(","));
                    }
                });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        //Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        System.out.println("jssc.started");
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}
开启SocketServer:

package socket.demo;

import java.net.ServerSocket;
import java.net.Socket;

/**
 * 服务器端Socket
 *
 * @author Administrator
 *
 */
public class SocketServer {
	/**
	 * 服务器端Socket构�?方法
	 */
	public SocketServer() {
		try {
			int clientcount = 0; // 统计客户端�?�?
			boolean listening = true; // 是否对客户端进行监听
			ServerSocket server = null; // 服务器端Socket对象
			try {
				// 创建ServerSocket在端2121监听客户请求
				server = new ServerSocket(2121);
				System.out.println("Server starts...");
			} catch (Exception e) {
				System.out.println("Can not listen to. " + e);
			}
			while (listening) {
				clientcount++;
				Socket accept = server.accept();
				// 监听到客户请�?根据得到的Socket对象和客户计数创建服务线�?并启动之
				new ServerThread(accept, clientcount, "socketServerThread"+clientcount).start();
			}
		} catch (Exception e) {
			System.out.println("Error. " + e);
		}
	}

	public static void print(String info) {
		System.out.println("***************" + info);
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		new SocketServer();
	}
}
Socket处理线程:

package socket.demo;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class ServerThread extends Thread {
	private static int number = 0; // 保存本进程的客户计数
	Socket socket = null; // 保存与本线程相关的Socket对象

	public ServerThread(Socket socket, int clientnum,String threadName) {
		this.socket = socket;
		number = clientnum;
		System.out.println("当前在线的用户数: " + number);
		this.setName(threadName);
	}

	public void run() {
		try {
			print("thread run...");
			// 由Socket对象得到输入并构造相应的BufferedReader对象
			BufferedReader in = new BufferedReader(new InputStreamReader(
					socket.getInputStream()));
			// 由Socket对象得到输出并构造PrintWriter对象
			PrintWriter out = new PrintWriter(socket.getOutputStream());
			// 由系统标准输入设备构造BufferedReader对象
			BufferedReader sysin = new BufferedReader(new InputStreamReader(
					System.in));
			String line; 
			// 从标准输入读入一字符串
			line = sysin.readLine();
			print("thread 3");
			while (!line.equals("bye")) {
				print("thread 4");
				// 向客户端输出该字符串
				out.println(line);
				// 刷新输出�?使Client马上收到该字符串
				out.flush();
				// 在系统标准输出上打印读入的字符串
				System.out.println("[Server send]: " + line);
				// 从系统标准输入读入一字符串
				line = sysin.readLine();
				print("thread 8");
			}
			out.close();
			in.close(); 
			socket.close(); 
		} catch (Exception e) {
			System.out.println("Error. " + e);
		}
	}

	public static void print(String info) {
		System.out.println("***************" + info);
	}
}




评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值