Hadoop入门案例-实现WordCount


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {


    /**
     * 默认MapReduce是通过TextInputFormat进行切片并交给Mapper类进行处理
     * TextInputFormat:
     *      key->当前前行的首字母的索引
     *      value->当前行的数据
     * Mapper类参数:
     *  进行了包装:
     *      输入key类型:Long类型
     *      输入value类型:String
     *      输出key类型:String(单词)
     *      输出value类型:Long(个数)
     *MapReduce为了在传输时需要进行序列化,通过包装,对基本类型进行包装,实现自己的序列化方法
     */
    public static class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable>{

        LongWritable one = new LongWritable(1);

        /**
         *将每行数据拆分
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String words = value.toString();
            //将每行数据拆分成各个单词
            String[] wordArr = words.split("\\s+");
            for (String word : wordArr) {
                //将单词输出 每个单词作为一个
                context.write(new Text(word),one);

            }


        }
    }


    /**
     *
     * 进行全局聚合
     * Reducer参数:
     * 同样对参数进行了包装
     *      输入:
     *          key:String(单词),value:Long(个数)
     *      输出:
     *          key:String,value:Long
     *
     */
    public static class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable>{


        /**
         *将map输出结果进行全局聚合
         * key:单词,values:单词个数集合[1,1,1],context:上下文
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

            Long sum = 0L;
            for (LongWritable value:values) {
                //累加单词的个数
                sum += value.get();
            }
            //以 单词,个数输出最终结果
            context.write(key,new LongWritable(sum));
        }
    }





    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //创建一个Job
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.43.20:9000");
        Job job = Job.getInstance(conf,"word-count");
        //需要打成jar包才能运行
        job.setJarByClass(WordCount.class);

        //1.指定输入文件
            //通过参数传递一个path进来
        FileInputFormat.addInputPath(job,new Path(args[0]));

        //2.编写mapper处理逻辑
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //3.shuffle流程

        //  todo

        //4.reducer处理逻辑
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //5.输出文件 并指定输出目录
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //6.运行Job
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? 1 : 0);
    }



}

使用maven打成jar包,上传到服务器

执行:bin/hadoop jar share/hadoop/mapreduce/jar包名称.jar 包名xxx.xxx.xxx.WordCount  输入文件目录   输出文件目录

03-28
### MCP API 的文档与使用教程 MCP 是一种用于增强大型语言模型 (LLM) 功能的技术框架,它通过提示(Prompts)、资源(Resources)以及工具(Tools)这三种核心原语来扩展 LLM 能力[^2]。Apifox 平台也认识到 MCP 技术在 API 开发领域的重要作用,并将其应用于实际场景中[^1]。 为了实现将 `/Users/syw/project/wechatAr` 文件夹下的所有文件上传至远程服务器 `47.93.xx.xx` 用户名 `root` 下的 `/opt/ll` 目录的操作,可以基于 MCP 工具功能构建一个自定义的服务逻辑。以下是具体实现方法: #### 实现方案 利用 SCP 命令完成文件传输任务,并结合 MCP 的 Tool 功能封装此操作以便于后续调用。当关键词为“上传微信目录”时,触发该工具执行相应动作。 ```python import subprocess def upload_wechat_directory(): source_dir = "/Users/syw/project/wechatAr/*" target_server = "root@47.93.xx.xx:/opt/ll/" try: result = subprocess.run(["scp", "-r", source_dir, target_server], check=True) return {"status": "success", "message": f"All files from {source_dir} have been uploaded to {target_server}"} except Exception as e: return {"status": "error", "message": str(e)} # 将上述函数注册为 MCP 中的一个 tool tools = { "upload_wechat_directory_tool": upload_wechat_directory, } # 定义 prompt 和 resource 配置部分省略... ``` 以上代码片段展示了如何创建一个名为 `upload_wechat_directory_tool` 的工具并将其集成到 MCP 系统里去[^3]。每当接收到匹配条件的消息比如含有特定关键字的时候就会激活对应的行为即启动SCP进程从而达成目标需求。 #### 进一步学习资料推荐 对于希望深入研究或者实践更多关于 MCP 应用案例的人士来说,《MCP 教程进阶篇》提供了丰富的实例分析和技术细节值得参考阅读;另外《MCP 极简入门:超快速上手运行简单的 MCP 服务和 MCP 客户端》同样是非常好的起点材料之一可以帮助初学者迅速掌握基础概念及其运作机制。
评论 5
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

wonder4work

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值