NIO----JAVA

在 Java 编程领域,NIO(New I/O)以其高效的 I/O 处理能力成为高并发、大数据量场景下的重要技术。当 NIO 的网络编程与文件处理能力相结合,能实现诸如文件传输系统等实用功能。下面将深入探讨 Java NIO 知识,并展示如何将网络编程与文件处理结合的具体实践。

Java NIO 核心组件解析

Java NIO 主要由 Buffer(缓冲区)、Channel(通道)和 Selector(选择器)三大核心组件构成。

Buffer:数据存储的容器

Buffer 用于存储特定基本数据类型的数据,其具备 capacity(容量)、position(位置)、limit(上限)和 mark(标记)等关键属性。以 ByteBuffer 为例,创建容量为 10 的 ByteBuffer 并写入数据,再切换到读模式读取,使用完毕后清空缓冲区,示例代码如下:

import java.nio.ByteBuffer;

public class BufferExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 'H');
        buffer.put((byte) 'e');
        buffer.put((byte) 'l');
        buffer.put((byte) 'l');
        buffer.put((byte) 'o');
        buffer.flip();
        while(buffer.hasRemaining()) {
            System.out.print((char) buffer.get());
        }
        buffer.clear();
    }
}

Channel:双向数据通道

Channel 是对传统 I/O 流的升级,支持双向数据传输,可与 Buffer 直接交互,常见实现有 FileChannel、SocketChannel 等。利用 FileChannel 进行文件复制,能直接在通道之间传输数据,提高效率:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class FileChannelExample {
    public static void main(String[] args) {
        try (FileInputStream fis = new FileInputStream("source.txt");
             FileOutputStream fos = new FileOutputStream("target.txt");
             FileChannel inChannel = fis.getChannel();
             FileChannel outChannel = fos.getChannel()) {
            inChannel.transferTo(0, inChannel.size(), outChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Selector:多路复用的关键

Selector 可检测多个 Channel 上的事件,实现单个线程管理多个 Channel。在使用时,先创建 Selector 实例,将 Channel 注册到 Selector 并指定感兴趣的事件,再通过 select () 方法等待事件发生并处理。如下是使用 Selector 实现的简单 TCP 服务器示例:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器启动,监听端口: " + PORT);
            while (true) {
                int readyChannels = selector.select();
                if (readyChannels == 0) continue;
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = ssc.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("新客户端连接: " + socketChannel.getRemoteAddress());
                    } 
                    else if (key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                        int bytesRead = socketChannel.read(buffer);
                        if (bytesRead > 0) {
                            buffer.flip();
                            byte[] data = new byte[buffer.remaining()];
                            buffer.get(data);
                            String message = new String(data);
                            System.out.println("收到客户端消息: " + message);
                            ByteBuffer response = ByteBuffer.wrap(("服务器已收到: " + message).getBytes());
                            socketChannel.write(response);
                        } else if (bytesRead == -1) {
                            System.out.println("客户端断开连接: " + socketChannel.getRemoteAddress());
                            socketChannel.close();
                        }
                    }
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Java NIO 在文件处理中的应用

文件的基本读写

NIO 提供了强大的文件处理能力。在文件读取方面,通过 FileChannel 和 ByteBuffer,利用直接缓冲区减少内存拷贝,实现高效读取:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileReadExample {
    public static void main(String[] args) {
        Path path = Paths.get("example.txt");
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            while (channel.read(buffer) != -1) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    System.out.print((char) buffer.get());
                }
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

文件写入时,可指定文件的打开方式,如创建、写入和截断原有内容等:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileWriteExample {
    public static void main(String[] args) {
        Path path = Paths.get("output.txt");
        String content = "Hello, NIO File Channel!";
        try (FileChannel channel = FileChannel.open(
                path, 
                StandardOpenOption.CREATE, 
                StandardOpenOption.WRITE, 
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes());
            channel.write(buffer);
            System.out.println("文件写入成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

大文件处理与其他操作

对于大文件复制,可使用内存映射文件技术,将文件部分直接映射到内存,减少系统调用和数据拷贝。此外,还能进行文件追加写入、属性操作以及目录遍历等操作 :

// 大文件复制(内存映射)
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LargeFileCopyExample {
    private static final int BUFFER_SIZE = 1024 * 1024;
    public static void main(String[] args) {
        Path source = Paths.get("source_large_file.dat");
        Path target = Paths.get("target_large_file.dat");
        try (FileChannel inChannel = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(
                     target, 
                     StandardOpenOption.CREATE, 
                     StandardOpenOption.WRITE)) {
            long size = inChannel.size();
            long position = 0;
            while (position < size) {
                long chunkSize = Math.min(size - position, BUFFER_SIZE);
                MappedByteBuffer inBuffer = inChannel.map(
                        FileChannel.MapMode.READ_ONLY, 
                        position, 
                        chunkSize);
                outChannel.write(inBuffer);
                position += chunkSize;
            }
            System.out.println("大文件复制完成");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// 文件追加写入
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class FileAppendExample {
    public static void main(String[] args) {
        Path path = Paths.get("append.txt");
        String appendContent = "\nThis is appended content.";
        try (FileChannel channel = FileChannel.open(
                path, 
                StandardOpenOption.CREATE, 
                StandardOpenOption.WRITE, 
                StandardOpenOption.APPEND)) {
            ByteBuffer buffer = ByteBuffer.wrap(appendContent.getBytes());
            channel.write(buffer);
            System.out.println("内容追加成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// 文件属性操作
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

public class FileAttributeExample {
    public static void main(String[] args) {
        Path path = Paths.get("example.txt");
        try {
            if (Files.exists(path)) {
                BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
                System.out.println("创建时间: " + attrs.creationTime());
                System.out.println("最后修改时间: " + attrs.lastModifiedTime());
                System.out.println("是否为目录: " + attrs.isDirectory());
                System.out.println("文件大小: " + attrs.size() + " 字节");
                Files.setLastModifiedTime(path, FileTime.from(Instant.now()));
                System.out.println("已更新文件最后修改时间");
                Path newPath = Paths.get("example_renamed.txt");
                Files.move(path, newPath, StandardCopyOption.REPLACE_EXISTING);
                System.out.println("文件已重命名");
            } else {
                System.out.println("文件不存在");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// 目录遍历
import java.io.IOException;
import java.nio.file.*;

public class DirectoryTraversalExample {
    public static void main(String[] args) {
        Path dir = Paths.get(".");
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            System.out.println("目录内容:");
            for (Path path : stream) {
                if (Files.isDirectory(path)) {
                    System.out.println("[目录] " + path.getFileName());
                } else {
                    System.out.println("[文件] " + path.getFileName() + 
                            " (大小: " + Files.size(path) + " 字节)");
                }
            }
        } catch (IOException | DirectoryIteratorException e) {
            e.printStackTrace();
        }
    }
}

Java NIO 网络编程与文件处理的结合实践

将 NIO 的网络编程与文件处理功能相结合,可实现文件传输系统。以下为一个简单的文件传输服务器和客户端示例。

服务器端实现

服务器端使用 Selector 实现非阻塞 I/O,能同时处理多个客户端连接,支持文件上传和下载,文件存储在server_files目录下,并通过状态机管理客户端连接的不同阶段。核心代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.file.*;
import java.util.Iterator;
import java.util.Set;

public class FileTransferServer {
    private static final int PORT = 8080;
    private static final int BUFFER_SIZE = 8192;
    private static final Path FILE_DIR = Paths.get("server_files");

    public static void main(String[] args) {
        // 创建服务器文件目录
        try {
            Files.createDirectories(FILE_DIR);
        } catch (IOException e) {
            System.err.println("无法创建文件目录: " + e.getMessage());
            return;
        }

        try (Selector selector = Selector.open();
             ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            
            // 配置服务器通道
            serverChannel.bind(new InetSocketAddress(PORT));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            
            System.out.println("文件传输服务器启动,监听端口: " + PORT);
            
            // 事件循环
            while (true) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    
                    if (key.isAcceptable()) {
                        handleAccept(key, selector);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }
                    
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 处理新连接
    private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);
        
        // 注册读事件,等待客户端命令
        clientChannel.register(selector, SelectionKey.OP_READ, new ClientState());
        
        System.out.println("新客户端连接: " + clientChannel.getRemoteAddress());
    }

    // 处理读事件
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ClientState state = (ClientState) key.attachment();
        ByteBuffer buffer = state.buffer;
        
        buffer.clear();
        int bytesRead = clientChannel.read(buffer);
        
        if (bytesRead == -1) {
            // 客户端关闭连接
            System.out.println("客户端断开连接: " + clientChannel.getRemoteAddress());
            clientChannel.close();
            return;
        }
        
        buffer.flip();
        
        if (!state.commandProcessed) {
            // 处理命令
            processCommand(key, buffer, state);
        } else if (state.operation == Operation.UPLOAD) {
            // 处理文件上传
            processFileUpload(key, buffer, state);
        } else if (state.operation == Operation.DOWNLOAD) {
            // 准备文件下载
            prepareFileDownload(key, state);
        }
    }

    // 处理写事件
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ClientState state = (ClientState) key.attachment();
        
        if (state.operation == Operation.DOWNLOAD && state.fileChannel != null) {
            // 执行文件下载
            long bytesWritten = state.fileChannel.transferTo(state.position, BUFFER_SIZE, clientChannel);
            state.position += bytesWritten;
            
            if (state.position >= state.fileSize) {
                // 文件传输完成
                System.out.println("文件下载完成: " + state.fileName);
                cleanupFileTransfer(state);
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    // 处理客户端命令
    private static void processCommand(SelectionKey key, ByteBuffer buffer, ClientState state) throws IOException {
        // 读取命令类型
        if (buffer.remaining() < 1) return;
        byte command = buffer.get();
        
        // 读取文件名长度
        if (buffer.remaining() < 4) return;
        int fileNameLength = buffer.getInt();
        
        // 读取文件名
        if (buffer.remaining() < fileNameLength) return;
        byte[] fileNameBytes = new byte[fileNameLength];
        buffer.get(fileNameBytes);
        String fileName = new String(fileNameBytes);
        
        state.fileName = fileName;
        Path filePath = FILE_DIR.resolve(fileName);
        
        if (command == 'U') {
            // 上传命令
            state.operation = Operation.UPLOAD;
            state.fileChannel = FileChannel.open(
                    filePath, 
                    StandardOpenOption.CREATE, 
                    StandardOpenOption.WRITE);
            state.commandProcessed = true;
            System.out.println("准备接收文件: " + fileName);
        } else if (command == 'D') {
            // 下载命令
            if (Files.exists(filePath)) {
                state.operation = Operation.DOWNLOAD;
                state.fileSize = Files.size(filePath);
                state.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
                state.commandProcessed = true;
                
                // 发送文件大小给客户端
                ByteBuffer sizeBuffer = ByteBuffer.allocate(8);
                sizeBuffer.putLong(state.fileSize);
                sizeBuffer.flip();
                ((SocketChannel) key.channel()).write(sizeBuffer);
                
                key.interestOps(SelectionKey.OP_WRITE);
                System.out.println("准备发送文件: " + fileName + " (大小: " + state.fileSize + " 字节)");
            } else {
                // 文件不存在
                ByteBuffer response = ByteBuffer.wrap("FILE_NOT_FOUND".getBytes());
                ((SocketChannel) key.channel()).write(response);
                state.reset();
            }
        }
    }

    // 处理文件上传
    private static void processFileUpload(SelectionKey key, ByteBuffer buffer, ClientState state) throws IOException {
        if (buffer.hasRemaining()) {
            state.fileChannel.write(buffer);
        }
        
        // 检查文件是否传输完成
        if (state.expectedFileSize > 0 && state.fileChannel.size() >= state.expectedFileSize) {
            System.out.println("文件上传完成: " + state.fileName);
            cleanupFileTransfer(state);
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    // 准备文件下载
    private static void prepareFileDownload(SelectionKey key, ClientState state) {
        key.interestOps(SelectionKey.OP_WRITE);
    }

    // 清理文件传输资源
    private static void cleanupFileTransfer(ClientState state) {
        try {
            if (state.fileChannel != null) {
                state.fileChannel.close();
                state.fileChannel = null;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        state.reset();
    }

    // 客户端状态
    private static class ClientState {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        Operation operation = null;
        String fileName = null;
        FileChannel fileChannel = null;
        long fileSize = 0;
        long position = 0;
        long expectedFileSize = 0;
        boolean commandProcessed = false;

        void reset() {
            operation = null;
            fileName = null;
            fileSize = 0;
            position = 0;
            expectedFileSize = 0;
            commandProcessed = false;
        }
    }

    // 操作类型
    private enum Operation {
        UPLOAD, DOWNLOAD
    }
}

客户端完整代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.*;
import java.util.Scanner;

public class FileTransferClient {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 8080;
    private static final int BUFFER_SIZE = 8192;
    private static final Path DOWNLOAD_DIR = Paths.get("client_downloads");

    public static void main(String[] args) {
        // 创建下载目录
        try {
            Files.createDirectories(DOWNLOAD_DIR);
        } catch (IOException e) {
            System.err.println("无法创建下载目录: " + e.getMessage());
            return;
        }

        try (SocketChannel socketChannel = SocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {
            
            // 连接服务器
            socketChannel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT));
            System.out.println("已连接到服务器: " + SERVER_HOST + ":" + SERVER_PORT);
            
            while (true) {
                System.out.println("\n请选择操作:");
                System.out.println("1. 上传文件");
                System.out.println("2. 下载文件");
                System.out.println("3. 退出");
                System.out.print("输入选项: ");
                
                int choice = scanner.nextInt();
                scanner.nextLine(); // 消耗换行符
                
                switch (choice) {
                    case 1:
                        uploadFile(socketChannel, scanner);
                        break;
                    case 2:
                        downloadFile(socketChannel, scanner);
                        break;
                    case 3:
                        System.out.println("退出客户端");
                        return;
                    default:
                        System.out.println("无效选项");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 上传文件
    private static void uploadFile(SocketChannel socketChannel, Scanner scanner) throws IOException {
        System.out.print("请输入要上传的文件路径: ");
        String filePath = scanner.nextLine();
        Path path = Paths.get(filePath);
        
        if (!Files.exists(path) || Files.isDirectory(path)) {
            System.out.println("文件不存在或为目录");
            return;
        }
        
        String fileName = path.getFileName().toString();
        long fileSize = Files.size(path);
        
        // 发送上传命令
        ByteBuffer commandBuffer = ByteBuffer.allocate(5 + fileName.getBytes().length);
        commandBuffer.put((byte) 'U'); // U 表示上传
        commandBuffer.putInt(fileName.getBytes().length);
        commandBuffer.put(fileName.getBytes());
        commandBuffer.flip();
        
        socketChannel.write(commandBuffer);
        
        // 发送文件内容
        try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
            long bytesTransferred = 0;
            long totalBytes = fileSize;
            
            System.out.println("开始上传文件: " + fileName + " (大小: " + fileSize + " 字节)");
            
            while (bytesTransferred < totalBytes) {
                bytesTransferred += fileChannel.transferTo(0, totalBytes, socketChannel);
                System.out.printf("上传进度: %.2f%%\r", (100.0 * bytesTransferred / totalBytes));
            }
            
            System.out.println("\n文件上传完成");
        }
    }

    // 下载文件
    private static void downloadFile(SocketChannel socketChannel, Scanner scanner) throws IOException {
        System.out.print("请输入要下载的文件名: ");
        String fileName = scanner.nextLine();
        
        // 发送下载命令
        ByteBuffer commandBuffer = ByteBuffer.allocate(5 + fileName.getBytes().length);
        commandBuffer.put((byte) 'D'); // D 表示下载
        commandBuffer.putInt(fileName.getBytes().length);
        commandBuffer.put(fileName.getBytes());
        commandBuffer.flip();
        
        socketChannel.write(commandBuffer);
        
        // 接收文件大小
        ByteBuffer sizeBuffer = ByteBuffer.allocate(8);
        int bytesRead = socketChannel.read(sizeBuffer);
        
        if (bytesRead == -1) {
            System.out.println("服务器断开连接");
            return;
        }
        
        sizeBuffer.flip();
        
        if (sizeBuffer.remaining() < 8) {
            // 读取错误消息
            byte[] errorBytes = new byte[sizeBuffer.remaining()];
            sizeBuffer.get(errorBytes);
            String errorMessage = new String(errorBytes);
            System.out.println("下载失败: " + errorMessage);
            return;
        }
        
        long fileSize = sizeBuffer.getLong();
        Path downloadPath = DOWNLOAD_DIR.resolve(fileName);
        
        // 下载文件
        try (FileChannel fileChannel = FileChannel.open(
                downloadPath, 
                StandardOpenOption.CREATE, 
                StandardOpenOption.WRITE)) {
            
            System.out.println("开始下载文件: " + fileName + " (大小: " + fileSize + " 字节)");
            
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            long bytesDownloaded = 0;
            
            while (bytesDownloaded < fileSize) {
                buffer.clear();
                int bytesReadThisTime = socketChannel.read(buffer);
                
                if (bytesReadThisTime == -1) {
                    System.out.println("下载中断,文件可能不完整");
                    break;
                }
                
                buffer.flip();
                fileChannel.write(buffer);
                bytesDownloaded += bytesReadThisTime;
                
                System.out.printf("下载进度: %.2f%%\r", (100.0 * bytesDownloaded / fileSize));
            }
            
            System.out.println("\n文件下载完成,保存至: " + downloadPath);
        }
    }
}

通信协议说明

这个文件传输系统使用了简单的自定义协议:

  1. 命令格式

    • [命令类型 (1 字节)][文件名长度 (4 字节)][文件名 (n 字节)]
    • 上传命令:'U' + 文件名
    • 下载命令:'D' + 文件名
  2. 数据传输

    • 文件数据直接通过 Channel 传输
    • 下载前先发送文件大小信息
    • 上传时服务器根据接收数据自动创建文件
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值