使用 MongoDB 构建弹性应用程序
要编写能够利用 MongoDB 功能并妥善处理副本集选举的应用程序代码,您应该:
安装最新的驱动程序。
使用指定所有主机的连接字符串。
使用可重试写入和可重试读取。
使用对您的应用程序有意义的
majority
写关注和读关注。处理应用程序中的错误。
安装最新驱动程序
首先,从 MongoDB 驱动程序安装适合您的语言的最新驱动程序。驱动程序将查询从应用程序连接并中继到数据库。使用最新的驱动程序可以使用最新的 MongoDB 功能。
然后,在应用程序中,导入依赖项:
// Latest 'mongodb' version installed with npm const MongoClient = require('mongodb').MongoClient;
连接字符串(Connection Strings)
使用指定部署中所有主机的连接string将应用程序连接到数据库。 如果您的部署执行副本集选举并选举了新的主节点,则指定部署中所有主机的连接string会在没有应用程序逻辑的情况下发现新的主节点。
您可以使用以下任一方法指定部署中的所有主机:
连接string还可以指定选项,特别是retryWrites和writeConcern。
另请参阅:
如需格式化连接string的帮助,请参阅 使用MongoDB驱动程序连接到部署。
使用连接字符串在应用程序中实例化 MongoDB 客户端:
// Create a variable for your connection string String uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]"; // Instantiate the MongoDB client with the URI MongoClient client = MongoClients.create(uri);
// Create a variable for your connection string const uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]"; // Instantiate the MongoDB client with the URI const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
可重试写入和读取
注意
从 MongoDB 3.6版本开始以及4.2兼容驱动程序,MongoDB 默认会重试一次写入和读取操作。
可重试写入 (Retryable Writes)
如果某些写入操作失败,则使用可重试写入重试一次这些操作。
重试写入一次是处理暂时性网络错误和副本集选举(在此类错误中,应用程序暂时无法找到正常主节点)的最佳策略。 如果重试成功,则整个操作成功,并且不会返回错误。 如果操作失败,原因可能是:
持续的网络错误,或
无效命令。
另请参阅:
有关启用可重试写入的更多信息,请参阅启用可重试写入。
当操作失败时,应用程序需要自行处理错误。
可重试读取
如果读取操作在 MongoDB 3.6版本中和使用4.2兼容驱动程序中启动失败,则会自动重试一次。 您无需将应用程序配置为重试读取。
写关注和读关注
可以使用写关注和读关注来调整应用程序的一致性和可用性。更严格的关注意味着数据库操作需要等待更强的数据一致性保证,而宽松的一致性要求则提供更高的可用性。
例子
如果您的应用程序处理货币余额,则一致性极为重要。 您可以使用majority
写关注和读关注(read concern)来确保您永远不会读取过时的数据或可能回滚的数据。
或者,如果您的应用程序每秒记录来自数百个传感器的温度数据,您可能不会担心读取的数据是否不包括最新读数。 您可以放宽一致性要求,以更快地访问该数据。
写关注
您可以通过连接string URI 设置副本集的 写关注级别 。使用majority
写关注确保您的数据成功写入数据库并持久保存。 这是推荐的默认值,对于大多数使用案例来说,这是足够的。
当您使用需要确认的写关注(例如 majority
)时,您还可以指定写入达到该确认级别的最大时间限制:
用于所有写入的 wtimeoutMS 连接字符串参数,或
用于单次写入操作的 wtimeout 选项。
是否使用时间限制以及使用的值取决于应用程序上下文。
另请参阅:
有关设置写关注(write concern)级别的更多信息,请参阅写关注选项。
重要
如果没有指定写入时间限制,并且写关注(write concern)级别为无法实现,则写入操作将永远无法完成。
读关注 (read concern)
您可以通过连接string URI副本集的 读关注(read concern)级别 。理想的读关注(read concern)取决于应用程序要求,但默认对于大多数使用案例来说就足够了。 使用默认读关注不需要连接string参数。
指定读关注(read concern)可以提高对应用程序从数据库接收的数据的保证。
另请参阅:
有关设置读关注级别的更多信息,请参阅读关注选项。
注意
应用程序使用的写入关注和读关注(read concern)的特定组合会影响操作顺序ACID 一致性保证。 这称为因果一致性。 有关因果一致性ACID 一致性保证的更多信息,请参阅因果一致性和读写关注。
Error Handling
可重试写入未处理的无效命令、网络中断和网络错误都会返回错误。 有关错误详细信息,请参阅驱动程序的 API文档。
例如,如果应用程序尝试插入具有重复_id
的文档,您的驱动程序将返回错误,其中包括:
Unable to insert due to an error: com.mongodb.MongoWriteException: E11000 duplicate key error collection: <db>.<collection> ...
{ "name": : "MongoError", "message": "E11000 duplicate key error collection on: <db>.<collection> ... ", ... }
如果没有正确的错误处理,错误可能会阻止应用程序处理请求,直到重新启动为止。
您的应用程序应处理错误,而不会崩溃或产生副作用。 在前面的应用程序插入重复_id
的示例中,该应用程序可以按如下方式处理错误:
// Declare a logger instance from java.util.logging.Logger private static final Logger LOGGER = ... ... try { InsertOneResult result = collection.insertOne(new Document() .append("_id", 1) .append("body", "I'm a goofball trying to insert a duplicate _id")); // Everything is OK LOGGER.info("Inserted document id: " + result.getInsertedId()); // Refer to the API documentation for specific exceptions to catch } catch (MongoException me) { // Report the error LOGGER.severe("Failed due to an error: " + me); }
... collection.insertOne({ _id: 1, body: "I'm a goofball trying to insert a duplicate _id" }) .then(result => { response.sendStatus(200) // send "OK" message to the client }, err => { response.sendStatus(400); // send "Bad Request" message to the client });
此示例中的插入操作在第二次调用时会引发“重复键”错误,因为_id
字段必须是唯一的。 应用程序捕获错误,通知客户端,然后应用继续运行。 但是,插入操作失败,您可以决定是否向用户显示消息、重试操作或执行其他操作。
您应该始终记录错误。进一步处理错误的常见策略包括:
将错误返回给客户端,并显示错误消息。 当您无法解决错误并且需要通知用户操作无法完成时,这是一个很好的策略。
写入备份数据库。 当您无法解决错误但又不想冒丢失请求数据的风险时,这是一个很好的策略。
在单次默认重试之后重试该操作。如果您可以通过编程方式解决错误原因,请重试,这是一个很好的策略。
您必须为应用程序上下文选择最佳策略。
例子
在重复键错误的示例中,您应该记录错误但不要重试操作,因为它永远不会成功。相反,您可以写入回退数据库并稍后查看该数据库的内容,以确保不会丢失任何信息。用户无需执行任何其他操作,数据就会被记录下来,因此您可以选择不向客户端发送错误消息。
规划网络错误
当操作永远无法完成并阻止应用程序执行新操作时,返回错误可能是理想行为。 您可以使用maxTimeMS方法对单个操作设置时间限制,如果超过该时间限制,则返回错误供应用程序进行处理。
对每个操作设置的时间限制取决于该操作的上下文。
例子
如果您的应用程序读取并显示 inventory
集合中的简单产品信息,您可以确信这些读取操作只需要一点时间。查询长时间运行表明一直存在网络问题。将该操作的 maxTimeMS
设置为 5000(即 5 秒)意味着,一旦您确信存在网络问题,应用程序就会收到反馈。
弹性示例应用程序
以下示例应用程序汇集了构建弹性应用程序的建议。
该应用程序是一个简单的用户记录 API ,它在 http://localhost: 上公开两个端点:3000
方法 | 端点 | 说明 |
---|---|---|
|
| 从 |
|
| 要求在请求正文中加入 |
1 // File: App.java 2 3 import java.util.Map; 4 import java.util.logging.Logger; 5 6 import org.bson.Document; 7 import org.json.JSONArray; 8 9 import com.mongodb.MongoException; 10 import com.mongodb.client.MongoClient; 11 import com.mongodb.client.MongoClients; 12 import com.mongodb.client.MongoCollection; 13 import com.mongodb.client.MongoDatabase; 14 15 import fi.iki.elonen.NanoHTTPD; 16 17 public class App extends NanoHTTPD { 18 private static final Logger LOGGER = Logger.getLogger(App.class.getName()); 19 20 static int port = 3000; 21 static MongoClient client = null; 22 23 public App() throws Exception { 24 super(port); 25 26 // Replace the uri string with your MongoDB deployment's connection string 27 String uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority"; 28 client = MongoClients.create(uri); 29 30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); 31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n"); 32 } 33 34 public static void main(String[] args) { 35 try { 36 new App(); 37 } catch (Exception e) { 38 LOGGER.severe("Couldn't start server:\n" + e); 39 } 40 } 41 42 43 public Response serve(IHTTPSession session) { 44 StringBuilder msg = new StringBuilder(); 45 Map<String, String> params = session.getParms(); 46 47 Method reqMethod = session.getMethod(); 48 String uri = session.getUri(); 49 50 if (Method.GET == reqMethod) { 51 if (uri.equals("/")) { 52 msg.append("Welcome to my API!"); 53 } else if (uri.equals("/users")) { 54 msg.append(listUsers(client)); 55 } else { 56 msg.append("Unrecognized URI: ").append(uri); 57 } 58 } else if (Method.POST == reqMethod) { 59 try { 60 String name = params.get("name"); 61 if (name == null) { 62 throw new Exception("Unable to process POST request: 'name' parameter required"); 63 } else { 64 insertUser(client, name); 65 msg.append("User successfully added!"); 66 } 67 } catch (Exception e) { 68 msg.append(e); 69 } 70 } 71 72 return newFixedLengthResponse(msg.toString()); 73 } 74 75 static String listUsers(MongoClient client) { 76 MongoDatabase database = client.getDatabase("test"); 77 MongoCollection<Document> collection = database.getCollection("users"); 78 79 final JSONArray jsonResults = new JSONArray(); 80 collection.find().forEach((result) -> jsonResults.put(result.toJson())); 81 82 return jsonResults.toString(); 83 } 84 85 static String insertUser(MongoClient client, String name) throws MongoException { 86 MongoDatabase database = client.getDatabase("test"); 87 MongoCollection<Document> collection = database.getCollection("users"); 88 89 collection.insertOne(new Document().append("name", name)); 90 return "Successfully inserted user: " + name; 91 } 92 }
注意
以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。
1 const express = require('express'); 2 const bodyParser = require('body-parser'); 3 4 // Use the latest drivers by installing & importing them 5 const MongoClient = require('mongodb').MongoClient; 6 7 const app = express(); 8 app.use(bodyParser.json()); 9 app.use(bodyParser.urlencoded({ extended: true })); 10 11 // Use a connection string that lists all hosts 12 // with retryable writes & majority write concern 13 const uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority"; 14 15 const client = new MongoClient(uri, { 16 useNewUrlParser: true, 17 useUnifiedTopology: true 18 }); 19 20 // ----- API routes ----- // 21 app.get('/', (req, res) => res.send('Welcome to my API!')); 22 23 app.get('/users', (req, res) => { 24 const collection = client.db("test").collection("users"); 25 26 collection 27 .find({}) 28 // In this example, 'maxTimeMS' throws an error after 5 seconds, 29 // alerting the application to a lasting network outage 30 .maxTimeMS(5000) 31 .toArray((err, data) => { 32 if (err) { 33 // Handle errors in your application 34 // In this example, by sending the client a message 35 res.send("The request has timed out. Please check your connection and try again."); 36 } 37 return res.json(data); 38 }); 39 }); 40 41 app.post('/users', (req, res) => { 42 const collection = client.db("test").collection("users"); 43 collection.insertOne({ name: req.body.name }) 44 .then(result => { 45 res.send("User successfully added!"); 46 }, err => { 47 // Handle errors in your application 48 // In this example, by sending the client a message 49 res.send("An application error has occurred. Please try again."); 50 }) 51 }); 52 // ----- End of API routes ----- // 53 54 app.listen(3000, () => { 55 console.log(`Listening on port 3000.`); 56 client.connect(err => { 57 if (err) { 58 console.log("Not connected: ", err); 59 process.exit(0); 60 } 61 console.log('Connected.'); 62 }); 63 });