在本手册中,我们介绍了为应用程序提供事务保护所需的概念和机制。 在本章中,我们总结了这些机制,并提供了一个多线程事务JE应用程序的完整示例。
6.1 Anatomy of a Transactional Application 事务申请的剖析
事务应用程序的特点是执行以下活动:
- 创建环境句柄。
- 打开您的环境,指定要使用事务子系统。
- 如果您使用的是基本API,请打开数据库句柄,指示它们是否支持事务。否则,打开您的商店,使其配置为进行事务。
产生工人线程。您需要多少这些以及如何分割其JE工作负载完全取决于您的应用程序的要求。但是,执行写入操作的任何工作线程都将执行以下操作:
a. 开始事务。 b.执行一个或多个读写操作。 c.如果一切顺利,请提交交易。 d.如果检测到死锁,则中止并重试该操作。 e.中止大多数其他错误的交易。
5.应用程序关闭时:
a. 确保没有打开的游标。
b. 确保没有活动的交易。在关闭之前中止或提交所有事务。
c.关闭数据库。
d.关闭你的环境。
注意
强大的JE应用程序应监视其工作线程,以确保它们没有意外死亡。
如果线程异常终止,则必须关闭所有工作线程,然后运行正常恢复(您必须重新打开环境才能执行此操作)。
这是清除异常退出的工作线程在死亡时可能保留的任何资源(例如锁或互斥锁)的唯一方法。
无法执行此恢复可能会导致仍在运行的工作线程最终在等待永远不会释放的锁定时永远阻塞。
除了这些活动(完全由应用程序中的代码处理)之外,您还需要定期备份日志文件。 这是获得JE的交易ACID支持所提供的耐久性保证所必需的。 有关更多信息,请参阅备份和恢复Berkeley DB,Java版应用程序。
6.2 Base API Transaction Example 基础API事务范例
以下Java代码提供了多线程事务JE应用程序的完整功能示例。该示例打开一个环境和数据库,然后创建5个线程,每个线程将500条记录写入数据库。用于这些写入的键是预定义的字符串,而数据是包含随机生成的数据的类。这意味着实际数据是任意的,因此无趣;我们之所以选择它只是因为它需要最少的代码才能实现,因此它将不会影响本示例的主要内容。
每个线程在提交和写入另外10个之前在单个事务下写入10个记录(这重复50次)。在每个事务结束时,但在提交之前,每个线程都调用一个函数,该函数使用游标来读取数据库中的每个记录。我们这样做是为了在事务环境中提出有关数据库读取的一些观点。
当然,每个编写器线程都执行本手册中描述的死锁检测。此外,在打开环境时执行正常恢复。
要实现此示例,我们需要三个类:
- TxnGuide.java
这是应用程序的主要类。 它执行环境和数据库管理,生成线程,并创建放置在数据库中的数据。
- DBWriter.java
这个类扩展了java.lang.Thread,因此它是我们的线程实现。 它负责实际读取和写入数据库。 它还执行我们的所有事务管理。
- PayloadData.java
这是一个用于封装多个数据字段的数据类。 这是相当无趣的,除了使用类意味着我们必须使用绑定API来序列化它以便在数据库中存储。
6.2.1 TxnGuide.java
我们的示例应用程序中的主类用于打开和关闭我们的环境和数据库。 它还产生了我们需要的所有线程。 我们从正常的Java包和import语句开始,然后是我们的类声明:
// File TxnGuide.java
package je.txn;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import java.io.File;
import java.io.FileNotFoundException;
public class TxnGuide { 接下来,我们声明我们的类’私有数据成员。 大多数情况下,它们用于常量,例如我们正在打开的数据库的名称以及我们正在生成的线程数。 但是,我们还在此处声明了我们的环境和数据库句柄。
private static String myEnvPath = "./";
private static String dbName = "mydb.db";
private static String cdbName = "myclassdb.db";
// DB handles
private static Database myDb = null;
private static Database myClassDb = null;
private static Environment myEnv = null;
private static final int NUMTHREADS = 5; 接下来,我们实现usage()方法。 此应用程序可选地接受单个命令行参数,该参数用于标识环境主目录。
private static void usage() {
System.out.println("TxnGuide [-h <env directory>]");
System.exit(-1);
} 现在我们实现main()方法。 此方法只调用方法来解析命令行参数并打开环境和数据库。 它还创建了我们用于序列化我们想要存储在数据库中的数据的存储类目录。 最后,它创建然后加入数据库编写器线程。
public static void main(String args[]) {
try {
// Parse the arguments list
parseArgs(args);
// Open the environment and databases
openEnv();
// Get our class catalog (used to serialize objects)
StoredClassCatalog classCatalog =
new StoredClassCatalog(myClassDb);
// Start the threads
DBWriter[] threadArray;
threadArray = new DBWriter[NUMTHREADS];
for (int i = 0; i < NUMTHREADS; i++) {
threadArray[i] = new DBWriter(myEnv, myDb, classCatalog);
threadArray[i].start();
}
// Join the threads. That is, wait for each thread to
// complete before exiting the application.
for (int i = 0; i < NUMTHREADS; i++) {
threadArray[i].join();
}
} catch (Exception e) {
System.err.println("TxnGuide: " + e.toString());
e.printStackTrace();
} finally {
closeEnv();
}
System.out.println("All done.");
} 接下来我们实现openEnv()。 此方法用于打开环境,然后在该环境中打开数据库。 在此过程中,我们确保事务子系统已正确初始化。
对于打开的数据库,请注意我们打开数据库以使其支持重复记录。 这完全是由我们写入数据库的数据所必需的,只有在没有首先删除环境的情况下多次运行应用程序时才需要这样做。
private static void openEnv() throws DatabaseException {
System.out.println("opening env");
// Set up the environment.
EnvironmentConfig myEnvConfig = new EnvironmentConfig();
myEnvConfig.setAllowCreate(true);
myEnvConfig.setTransactional(true);
// Environment handles are free-threaded by default in JE,
// so we do not have to do anything to cause the
// environment handle to be free-threaded.
// Set up the database
DatabaseConfig myDbConfig = new DatabaseConfig();
myDbConfig.setAllowCreate(true);
myDbConfig.setTransactional(true);
myDbConfig.setSortedDuplicates(true);
// Open the environment
myEnv = new Environment(new File(myEnvPath), // Env home
myEnvConfig);
// Open the database. Do not provide a txn handle. This open
// is auto committed because DatabaseConfig.setTransactional()
// is true.
myDb = myEnv.openDatabase(null, // txn handle
dbName, // Database file name
myDbConfig);
// Used by the bind API for serializing objects
// Class database must not support duplicates
myDbConfig.setSortedDuplicates(false);
myClassDb = myEnv.openDatabase(null, // txn handle
cdbName, // Database file name
myDbConfig);
} 最后,我们实现用于关闭环境和数据库的方法,解析命令行参数,并提供我们的类构造函数。 这是相当标准的代码,从本手册的角度来看,它几乎无趣。 我们在这里仅仅为了完整性而包含它。
private static void closeEnv() {
System.out.println("Closing env and databases");
if (myDb != null ) {
try {
myDb.close();
} catch (DatabaseException e) {
System.err.println("closeEnv: myDb: " +
e.toString());
e.printStackTrace();
}
}
if (myClassDb != null ) {
try {
myClassDb.close();
} catch (DatabaseException e) {
System.err.println("closeEnv: myClassDb: " +
e.toString());
e.printStackTrace();
}
}
if (myEnv != null ) {
try {
myEnv.close();
} catch (DatabaseException e) {
System.err.println("closeEnv: " + e.toString());
e.printStackTrace();
}
}
}
private TxnGuide() {}
private static void parseArgs(String args[]) {
for(int i = 0; i < args.length; ++i) {
if (args[i].startsWith("-")) {
switch(args[i].charAt(1)) {
case 'h':
myEnvPath = new String(args[++i]);
break;
default:
usage();
}
}
}
}
} 6.2.2 PayloadData.java
在我们展示数据库编写器线程的实现之前,我们需要显示我们将放入数据库的类。 这个课程相当简单。 它只是允许您存储和检索int,String和double。 我们将在编写器线程中使用JE绑定API来序列化此类的实例并将它们放入我们的数据库中。
import java.io.Serializable;
public class PayloadData implements Serializable {
private int oID;
private String threadName;
private double doubleData;
PayloadData(int id, String name, double data) {
oID = id;
threadName = name;
doubleData = data;
}
public double getDoubleData() { return doubleData; }
public int getID() { return oID; }
public String getThreadName() { return threadName; }
} 6.2.3 DBWriter.java
DBWriter.java为我们的数据库编写器线程提供了实现。 它负责:
- 所有交易管理。
- 响应死锁异常。
- 提供要存储到数据库中的数据。
- 序列化然后将数据写入数据库。
为了展示JE的事务支持提供的一些ACID属性,DBWriter.java以一种效率低于您可能决定在真实生产应用程序中使用的方式执行某些操作。首先,它可以在一个事务中将10个数据库写入组合在一起,就像您可以轻松地为每个事务执行一次写入一样。如果您这样做,您可以使用自动提交进行单个数据库写入,这意味着您的代码会稍微简单一些,并且您遇到阻塞和死锁操作的可能性要小得多。但是,通过这种方式,我们可以显示事务原子性以及死锁处理。
在每个事务结束时,DBWriter.java通过计算数据库中当前存在的记录数来在整个数据库上运行游标。有更好的方法来发现这些信息,但在这种情况下,我们想要对游标,事务应用程序和死锁提出一些观点(我们将在本节后面详细介绍)。
首先,我们提供通常的包和import语句,然后我们声明我们的类:
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import java.io.UnsupportedEncodingException;
import java.util.Random;
public class DBWriter extends Thread
{ 接下来我们声明我们的私有数据成员。 请注意,我们获取了环境和数据库的句柄。 我们还获得了EntryBinding的句柄。 我们将使用它来序列化PayloadData类实例(请参阅PayloadData.java)以存储在数据库中。 我们实例化的随机数生成器用于生成用于存储在数据库中的唯一数据。 MAX_RETRY变量用于定义在遇到死锁时我们将重试事务的次数。 最后,keys是一个String数组,用于存放用于数据库条目的键。
private Database myDb = null;
private Environment myEnv = null;
private EntryBinding dataBinding = null;
private Random generator = new Random();
private static final int MAX_RETRY = 20;
private static String[] keys = {"key 1", "key 2", "key 3",
"key 4", "key 5", "key 6",
"key 7", "key 8", "key 9",
"key 10"}; 接下来我们实现我们的类构造函数。 我们在这里做的最有趣的事情是实例化串行绑定以序列化PayloadData实例。
//构造函数 从这里获取我们的数据库句柄
DBWriter(Environment env,Database db,StoredClassCatalog scc)
throws DatabaseException {
myDb = db;
myEnv = env;
dataBinding = new SerialBinding(scc,PayloadData.class); 现在我们实现我们的线程的run()方法。 这是在主程序中启动DBWriter线程时运行的方法(请参阅TxnGuide.java)。
// Thread method that writes a series of records
// to the database using transaction protection.
// Deadlock handling is demonstrated here.
public void run () { 我们做的第一件事是在进入主循环之前获取一个空事务句柄。 我们还在这里开始顶级事务循环,导致我们的应用程序执行50个事务。
Transaction txn = null;
// Perform 50 transactions
for (int i=0; i<50; i++) { 接下来我们声明一个重试变量。 这用于确定死锁是否应该导致我们重试操作。 我们还声明了一个retry_count变量,该变量用于确保我们不会在线程无法获得必要锁定的情况下永远重试事务。 (唯一可能导致这种情况的是,如果某个其他线程在持有重要锁定时死亡。这是我们必须防范的唯一代码,因为此应用程序的简单性使其不太可能发生。)
boolean retry = true;
int retry_count = 0;
// while loop is used for deadlock retries
while (retry) { 现在我们进入我们用于死锁检测的try块。 我们也在这里开始交易。
// try block used for deadlock detection and
// general db exception handling
try {
// Get a transaction
txn = myEnv.beginTransaction(null, null); 现在我们在刚刚开始的交易下写下10条记录。通过在单个事务下将多个写入组合在一起,我们增加了发生死锁的可能性。通常,您希望减少死锁的可能性,在这种情况下,执行此操作的方法是对每个事务执行单次写入。换句话说,我们应该使用自动提交来写入此工作负载的数据库。
但是,我们希望显示死锁处理,并且通过对每个事务执行多次写入,我们实际上可以观察到发生的死锁。我们还想强调您可以在单个原子工作单元中组合多个数据库操作的想法。所以对于我们的例子,我们做了(略微)错误的事情。
此外,请注意我们使用com.sleepycat.bind.tuple.StringBinding将我们的密钥存储到DatabaseEntry中以执行序列化。此外,当我们实例化PayloadData对象时,我们调用getName(),它给我们这个线程名称的字符串表示,以及Random.nextDouble(),它给我们一个随机的double值。使用后一个值以避免数据库中的重复记录。
// Write 10 records to the db
// for each transaction
for (int j = 0; j < 10; j++) {
// Get the key
DatabaseEntry key = new DatabaseEntry();
StringBinding.stringToEntry(keys[j], key);
// Get the data
PayloadData pd = new PayloadData(i+j, getName(),
generator.nextDouble());
DatabaseEntry data = new DatabaseEntry();
dataBinding.objectToEntry(pd, data);
// Do the put
myDb.put(txn, key, data);
} 完成内部数据库写入循环后,我们可以简单地提交事务并继续下一个10次写入块。但是,我们首先要说明有关事务处理的几点,所以我们在调用事务提交之前调用countRecords()方法。 countRecords()使用游标读取数据库中的每条记录,并返回它找到的记录数。
因为countRecords()读取数据库中的每条记录,如果使用不正确,线程将自我死锁。编写器线程刚刚向数据库写入了500条记录,但由于用于该写入的事务尚未提交,因此这500条记录中的每条记录仍然被线程的事务锁定。然后,如果我们在锁定了这500条记录的同一线程内简单地在数据库上运行非事务性游标,则游标将在尝试读取其中一条事务受保护记录时阻塞。当光标等待它请求的读锁定时,线程立即停止该点的操作。因为永远不会释放读锁(线程永远不能进行任何前进),这代表了线程的自死锁。
有三种方法可以防止这种自我死锁:
- 我们可以将调用countRecords()移动到线程事务提交后的某个点。
- 我们可以允许countRecords()在执行所有写操作的同一事务下运行。
- 我们可以通过允许未提交的读取来减少应用程序的隔离保证。
对于此示例,我们选择使用选项3(未提交的读取)来避免死锁。 这意味着我们必须打开我们的游标句柄,以便它知道执行未提交的读取。
// commit
System.out.println(getName() + " : committing txn : "
+ i);
// Using uncommitted reads to avoid the deadlock, so
// null is passed for the transaction here.
System.out.println(getName() + " : Found " +
countRecords(null) + " records in the database."); 在对数据库中的记录进行了这种有点不优雅的计数后,我们现在可以提交事务。
try {
txn.commit();
txn = null;
} catch (DatabaseException e) {
System.err.println("Error on txn commit: " +
e.toString());
}
retry = false; 如果一切顺利,我们就完成了,我们可以继续下一批10条记录添加到数据库中。 但是,如果发生错误,我们必须正确处理异常。 第一个是死锁异常。 如果发生死锁,我们希望中止并重试该事务,前提是我们尚未超过此事务的重试限制。
} catch (LockConflictException le) {
System.out.println("################# " + getName() +
" : caught deadlock");
// retry if necessary
if (retry_count < MAX_RETRY) {
System.err.println(getName() +
" : Retrying operation.");
retry = true;
retry_count++;
} else {
System.err.println(getName() +
" : out of retries. Giving up.");
retry = false;
} 如果出现标准的非特定数据库异常,我们只需记录异常然后放弃(不重试事务)。
} catch (DatabaseException e) {
// abort and don't retry
retry = false;
System.err.println(getName() +
" : caught exception: " + e.toString());
e.printStackTrace(); 最后,如果事务句柄不为null,我们总是中止事务。 请注意,在提交事务后,我们立即将事务句柄设置为null,以防止中止已提交的事务。
} finally {
if (txn != null) {
try {
txn.abort();
} catch (Exception e) {
System.err.println("Error aborting txn: " +
e.toString());
e.printStackTrace();
}
}
}
}
}
} 我们的DBWriter类的最后一部分是countRecords()实现。 请注意在此示例中我们如何打开游标以执行未提交的读取:
// A method that counts every record in the database.
// Note that this method exists only for illustrative purposes.
// A more straight-forward way to count the number of records in
// a database is to use the Database.getStats() method.
private int countRecords(Transaction txn) throws DatabaseException {
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry data = new DatabaseEntry();
int count = 0;
Cursor cursor = null;
try {
// Get the cursor
CursorConfig cc = new CursorConfig();
cc.setReadUncommitted(true);
cursor = myDb.openCursor(txn, cc);
while (cursor.getNext(key, data, LockMode.DEFAULT) ==
OperationStatus.SUCCESS) {
count++;
}
} finally {
if (cursor != null) {
cursor.close();
}
}
return count;
}
} 这完成了我们的事务示例。 如果您想试验此代码,可以在JE发行版的以下位置找到该示例:
JE_HOME/examples/je/txn 6.3 DPL Transaction Example DPL事务例子
以下Java代码提供了使用DPL的多线程事务JE应用程序的完整功能示例。此示例几乎与上一节中提供的示例相同,只是它使用实体类和实体存储来管理其数据。
与前面的示例一样,此示例打开一个环境,然后打开一个实体存储。然后它创建5个线程,每个线程将500条记录写入数据库。这些写入的主键基于预定义的整数,而数据是随机生成的数据。这意味着实际数据是任意的,因此无趣;我们之所以选择它只是因为它需要最少的代码才能实现,因此它将不会影响本示例的主要内容。
每个线程在提交和写入另外10个之前在单个事务下写入10个记录(这重复50次)。在每个事务结束时,但在提交之前,每个线程都调用一个函数,该函数使用游标来读取数据库中的每个记录。我们这样做是为了在事务环境中提出有关数据库读取的一些观点。
当然,每个编写器线程都执行本手册中描述的死锁检测。此外,在打开环境时执行正常恢复。
要实现此示例,我们需要三个类:
TxnGuide.java
这是应用程序的主要类。 它执行环境和存储管理,生成线程,并创建放置在数据库中的数据。 有关实现的详细信息,请参阅TxnGuide.java。
StoreWriter.java
这个类扩展了java.lang.Thread,因此它是我们的线程实现。 它负责实际读写商店。 它还执行我们的所有事务管理。 有关实现的详细信息,请参阅StoreWriter.java。
PayloadDataEntity.java
这是一个用于封装多个数据字段的实体类。 有关实现的详细信息,请参阅PayloadDataEntity.java。
6.3.1 TxnGuide.java
我们的示例应用程序中的主类用于打开和关闭我们的环境和存储。 它还产生了我们需要的所有线程。 我们从正常的Java包和import语句开始,然后是我们的类声明:
// File TxnGuideDPL.java
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.StoreConfig;
import java.io.File;
public class TxnGuideDPL { 接下来,我们声明我们的类’私有数据成员。 大多数情况下,它们用于常量,例如我们正在打开的数据库的名称以及我们正在生成的线程数。 但是,我们还在此处声明了我们的环境和数据库句柄。
private static String myEnvPath = "./";
private static String storeName = "exampleStore";
// Handles
private static EntityStore myStore = null;
private static Environment myEnv = null;
private static final int NUMTHREADS = 5; 接下来,我们实现usage()方法。 此应用程序可选地接受单个命令行参数,该参数用于标识环境主目录。
private static void usage() {
System.out.println("TxnGuideDPL [-h <env directory>]");
System.exit(-1);
} 现在我们实现main()方法。 此方法只调用方法来解析命令行参数并打开环境和存储。 它还创建然后加入商店编写器线程。
public static void main(String args[]) {
try {
// Parse the arguments list
parseArgs(args);
// Open the environment and store
openEnv();
// Start the threads
StoreWriter[] threadArray;
threadArray = new StoreWriter[NUMTHREADS];
for (int i = 0; i < NUMTHREADS; i++) {
threadArray[i] = new StoreWriter(myEnv, myStore);
threadArray[i].start();
}
for (int i = 0; i < NUMTHREADS; i++) {
threadArray[i].join();
}
} catch (Exception e) {
System.err.println("TxnGuideDPL: " + e.toString());
e.printStackTrace();
} finally {
closeEnv();
}
System.out.println("All done.");
} 接下来我们实现openEnv()。 此方法用于打开环境,然后在该环境中打开实体存储。 在此过程中,我们确保事务子系统已正确初始化。
private static void openEnv() throws DatabaseException {
System.out.println("opening env and store");
// Set up the environment.
EnvironmentConfig myEnvConfig = new EnvironmentConfig();
myEnvConfig.setAllowCreate(true);
myEnvConfig.setTransactional(true);
// Environment handles are free-threaded by default in JE,
// so we do not have to do anything to cause the
// environment handle to be free-threaded.
// Set up the entity store
StoreConfig myStoreConfig = new StoreConfig();
myStoreConfig.setAllowCreate(true);
myStoreConfig.setTransactional(true);
// Open the environment
myEnv = new Environment(new File(myEnvPath), // Env home
myEnvConfig);
// Open the store
myStore = new EntityStore(myEnv, storeName, myStoreConfig);
} 最后,我们实现用于关闭环境和数据库的方法,解析命令行参数,并提供我们的类构造函数。 这是相当标准的代码,从本手册的角度来看,它几乎无趣。 我们在这里仅仅为了完整性而包含它。
private static void closeEnv() {
System.out.println("Closing env and store");
if (myStore != null ) {
try {
myStore.close();
} catch (DatabaseException e) {
System.err.println("closeEnv: myStore: " +
e.toString());
e.printStackTrace();
}
}
if (myEnv != null ) {
try {
myEnv.close();
} catch (DatabaseException e) {
System.err.println("closeEnv: " + e.toString());
e.printStackTrace();
}
}
}
private TxnGuideDPL() {}
private static void parseArgs(String args[]) {
int nArgs = args.length;
for(int i = 0; i < args.length; ++i) {
if (args[i].startsWith("-")) {
switch(args[i].charAt(1)) {
case 'h':
if (i < nArgs - 1) {
myEnvPath = new String(args[++i]);
}
break;
default:
usage();
}
}
}
}
} 6.3.2 PayloadDataEntity.java
在我们展示商店编写器线程的实现之前,我们需要显示我们将要放入商店的类。 这个课程相当简单。 它只是允许您存储和检索int,String和double。 int是我们的主键。
import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;
import static com.sleepycat.persist.model.Relationship.*;
@Entity
public class PayloadDataEntity {
@PrimaryKey
private int oID;
private String threadName;
private double doubleData;
PayloadDataEntity() {}
public double getDoubleData() { return doubleData; }
public int getID() { return oID; }
public String getThreadName() { return threadName; }
public void setDoubleData(double dd) { doubleData = dd; }
public void setID(int id) { oID = id; }
public void setThreadName(String tn) { threadName = tn; }
} 6.3.3 StoreWriter.java
StoreWriter.java为我们的实体商店编写器线程提供实现。它负责:
- 所有交易管理。
- 响应死锁异常。
- 提供要存储在实体商店中的数据。
- 将数据写入商店。
为了展示JE的事务支持提供的一些ACID属性,StoreWriter.java以一种效率低于您可能决定在真实生产应用程序中使用的方式执行某些操作。首先,它可以在一个事务中将10个数据库写入组合在一起,就像您可以轻松地为每个事务执行一次写入一样。如果您这样做,您可以使用自动提交进行单个数据库写入,这意味着您的代码会稍微简单一些,并且您遇到阻塞和死锁操作的可能性要小得多。但是,通过这种方式,我们可以显示事务原子性以及死锁处理。
首先,我们提供通常的包和import语句,然后我们声明我们的类:
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import java.util.Iterator;
import java.util.Random;
import java.io.UnsupportedEncodingException;
public class StoreWriter extends Thread
{ 接下来我们声明我们的私有数据成员。 请注意,我们获取环境和实体存储的句柄。 我们实例化的随机数生成器用于生成用于存储在数据库中的唯一数据。 最后,MAX_RETRY变量用于定义在遇到死锁时我们将重试事务的次数。
private EntityStore myStore = null;
private Environment myEnv = null;
private PrimaryIndex<Integer,PayloadDataEntity> pdKey;
private Random generator = new Random();
private boolean passTxn = false;
private static final int MAX_RETRY = 20; 接下来我们实现我们的类构造函数。 关于构造函数最有趣的事情是我们使用它来获取实体类的主索引。
// Constructor. Get our handles from here
StoreWriter(Environment env, EntityStore store)
throws DatabaseException {
myStore = store;
myEnv = env;
// Open the data accessor. This is used to store persistent
// objects.
pdKey = myStore.getPrimaryIndex(Integer.class,
PayloadDataEntity.class);
} 现在我们实现我们的线程的run()方法。 这是在主程序中启动StoreWriter线程时运行的方法(请参阅TxnGuide.java)。
// Thread method that writes a series of records
// to the database using transaction protection.
// Deadlock handling is demonstrated here.
public void run () { 我们做的第一件事是在进入主循环之前获取一个空事务句柄。 我们还在这里开始顶级事务循环,导致我们的应用程序执行50个事务。
Transaction txn = null;
// Perform 50 transactions
for (int i=0; i<50; i++) { 接下来我们声明一个重试变量。 这用于确定死锁是否应该导致我们重试操作。 我们还声明了一个retry_count变量,该变量用于确保我们不会在线程无法获得必要锁定的情况下永远重试事务。 (唯一可能导致这种情况的是,如果某个其他线程在持有重要锁定时死亡。这是我们必须防范的唯一代码,因为此应用程序的简单性使其不太可能发生。)
boolean retry = true;
int retry_count = 0;
// while loop is used for deadlock retries
while (retry) { 现在我们进入我们用于死锁检测的try块。 我们也在这里开始交易。
// try block used for deadlock detection and
// general exception handling
try {
// Get a transaction
txn = myEnv.beginTransaction(null, null); 现在我们在刚刚开始的事务中编写10个对象。 通过在单个事务下将多个写入组合在一起,我们增加了发生死锁的可能性。 通常,您希望减少死锁的可能性,在这种情况下,执行此操作的方法是对每个事务执行单次写入。 换句话说,我们应该使用自动提交来写入此工作负载的数据库。
但是,我们希望显示死锁处理,并且通过对每个事务执行多次写入,我们实际上可以观察到发生的死锁。 我们还想强调您可以在单个原子工作单元中组合多个数据库操作的想法。 所以对于我们的例子,我们做了(略微)错误的事情。
// Write 10 PayloadDataEntity objects to the
// store for each transaction
for (int j = 0; j < 10; j++) {
// Instantiate an object
PayloadDataEntity pd = new PayloadDataEntity();
// Set the Object ID. This is used as the
// primary key.
pd.setID(i + j);
// The thread name is used as a secondary key, and
// it is retrieved by this class's getName()
// method.
pd.setThreadName(getName());
// The last bit of data that we use is a double
// that we generate randomly. This data is not
// indexed.
pd.setDoubleData(generator.nextDouble());
// Do the put
pdKey.put(txn, pd);
} 完成内部数据库写入循环后,我们可以简单地提交事务并继续下一个10次写入块。但是,我们首先要说明有关事务处理的几点,所以我们在调用事务提交之前调用countObjects()方法。 countObjects()使用游标读取实体存储中的每个对象,并返回它找到的对象数量的计数。
因为countObjects()读取存储中的每个对象,如果使用不正确,线程将自我死锁。编写器线程刚刚向数据库写入了500个对象,但由于用于该写入的事务尚未提交,因此这500个对象中的每一个仍然被线程的事务锁定。然后,如果我们在锁定了这500个对象的同一个线程中简单地在存储上运行非事务性游标,则当光标尝试读取其中一个事务受保护的记录时,它将阻塞。当光标等待它请求的读锁定时,线程立即停止该点的操作。因为永远不会释放读锁(线程永远不能进行任何前进),这代表了线程的自死锁。
有三种方法可以防止这种自我死锁:
- 我们可以将调用countObjects()移动到线程事务提交后的某个点。
- 我们可以允许countObjects()在执行所有写入的同一事务下运行。
- 我们可以通过允许未提交的读取来减少应用程序的隔离保证。
对于此示例,我们选择使用选项3(未提交的读取)来避免死锁。 这意味着我们必须打开我们的游标句柄,以便它知道执行未提交的读取。
// commit
System.out.println(getName() + " : committing txn : "
+ i);
System.out.println(getName() + " : Found " +
countObjects(txn) + " objects in the store."); 在对数据库中的对象进行了这种有点不优雅的计数之后,我们现在可以提交事务。
try {
txn.commit();
txn = null;
} catch (DatabaseException e) {
System.err.println("Error on txn commit: " +
e.toString());
}
retry = false; 如果一切顺利,我们就完成了,我们可以继续下一批10个对象添加到商店。 但是,如果发生错误,我们必须正确处理异常。 第一个是死锁异常。 如果发生死锁,我们希望中止并重试该事务,前提是我们尚未超过此事务的重试限制。
} catch (LockConflictException lce) {
System.out.println("################# " + getName() +
" : caught deadlock");
// retry if necessary
if (retry_count < MAX_RETRY) {
System.err.println(getName() +
" : Retrying operation.");
retry = true;
retry_count++;
} else {
System.err.println(getName() +
" : out of retries. Giving up.");
retry = false;
} 如果出现标准的非特定数据库异常,我们只需记录异常然后放弃(不重试事务)。
} catch (DatabaseException e) {
// abort and don't retry
retry = false;
System.err.println(getName() +
" : caught exception: " + e.toString());
e.printStackTrace(); 最后,如果事务句柄不为null,我们总是中止事务。 请注意,在提交事务后,我们立即将事务句柄设置为null,以防止中止已提交的事务。
} finally {
if (txn != null) {
try {
txn.abort();
} catch (Exception e) {
System.err.println("Error aborting txn: " +
e.toString());
e.printStackTrace();
}
}
}
}
}
} StoreWriter类的最后一部分是countObjects()实现。 请注意在此示例中我们如何打开游标以执行未提交的读取:
// A method that counts every object in the store.
private int countObjects(Transaction txn) throws DatabaseException {
int count = 0;
CursorConfig cc = new CursorConfig();
// This is ignored if the store is not opened with uncommitted read
// support.
cc.setReadUncommitted(true);
EntityCursor<PayloadDataEntity> cursor = pdKey.entities(txn, cc);
try {
for (PayloadDataEntity pdi : cursor) {
count++;
}
} finally {
if (cursor != null) {
cursor.close();
}
}
return count;
}
} 这完成了我们的事务示例。 如果您想试验此代码,可以在JE发行版的以下位置找到该示例:
JE_HOME/examples/persist/txn




还没有评论,来说两句吧...