文章目录
像put()、delete()、checkAndPut()这样的修改操作是独立执行的,这意味着在一个串行方式的执行中,对于每一行必须保证行级别的操作是原子性的。region服务器提供了一个行锁(row lock的特性,这个特性保证了只有一个客户端能获取一行数据相应的锁。同时对该行进行修改,在实践中,大部分客户端应用程序都没有提供显示的锁,而是使用这个机制来保障每个操作的独立性。用户应该尽可能的避免使用行锁,就像在RDBMS中,两个客户端很可能在拥有对方要请求的锁时,又同时请求对方已拥有的锁,这样便形成了一个死锁。 锁超时之前,两个被阻塞的客户端会占用一个服务器端的处理线程(handler),而这个线程是一种十分稀缺的资源。如果在一个频繁操作的行上发生了这种情况,那么很多其他的客户端会占用掉其所有的处理线程,阻塞所有其他客户端访问这台服务器,导致这个region服务器将不能为其负责region内的行提供服务。** 比如,当使用put()访问服务器时,Put实例可以通过以下构造函数生成: Put(byte[] row) 这个构造函数就没有RowLock实例参数,所以服务器会在调用期间创建一个锁。实际上,通过客户端的API,得不到这个生存期短暂的服务端的锁的实例。 除了服务器端隐式加锁之外,客户端也可以显示地对单行数据的多次操作进行加锁,通过以下调用便可以做到:
RowLock lockRow (byte[] row) throws IOException void unlockRow (RowLock r1) throws IOException
第一个调用lockRow()需要一个行键作为参数,返回一个RowLock的实例,这个实例可以供后续的Put或者Delete的构造函数使用。一旦不再需要锁时,必须通过unLockRow()调用来释放它。 每一个排它锁,无论是由服务器提供,还是通过客户端API传入的,都能保护这一行不被其他锁锁定。换句话说,锁必须针对整个行,并且指定其行键,一旦它获得锁定权就能防止其他并发修改。 当一个锁被服务器或客户端显示获取之后,其他所有想要对这行数据加锁的客户端将会等待,直到当前锁被释放,或者锁的租期超时。后者是为了确保进程不会被占用锁太长时间或无限期占用。 默认的锁超时时间是一分钟,但可以在hbase-site.xml文件中添加以下配置项来修改这个默认值,时间以毫秒为单位:
<property > <name > hbase.regionserver.lease.period</name > <value > 120000</value > </property >
通过添加以上代码,超时时间被设置为原来的两倍——120秒也就是2分钟。小心不要将这个值设得太大,因为每一个想获取被锁住的行的客户端都会被阻塞并等待锁的恢复。
显示使用行锁代码如下:
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.RowLock;import org.apache.hadoop.hbase.util.Bytes;public class HBaseRowLock { private final static byte [] ROW1 = Bytes.toBytes("row1" ); private final static byte [] ROW2 = Bytes.toBytes("row2" ); private final static byte [] COLFMA1 = Bytes.toBytes("colfam1" ); private final static byte [] COLFMA2 = Bytes.toBytes("colfam2" ); private final static byte [] QUAL1 = Bytes.toBytes("qual1" ); private final static byte [] QUAL2 = Bytes.toBytes("qual2" ); private static byte [] COLFAM1; private final static byte [] VAL1 = Bytes.toBytes("val1" ); private final static byte [] VAL2 = Bytes.toBytes("val2" ); private final static byte [] VAL3 = Bytes.toBytes("val3" ); static class UnlockedPut implements Runnable { @Override public void run() { try { Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "testtable" ); Put put = new Put(ROW1); put.add(COLFAM1, QUAL1, VAL3); table.put(put); long time = System.currentTimeMillis(); System.out.println ("Thread trying to put same rwo now" ); table.put(put); System.out.println ("Wait time:" + (System.currentTimeMillis() - time) + "ms" ); } catch (Exception e) { System.err.println ("Thread error :" + e); } try { Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "testtable" ); RowLock lock = table.lockRow(ROW1); System.out.println ("Lock ID:" + lock.getLockId()); Put put1 = new Put(ROW1, lock); put1.add(COLFAM1, QUAL1, VAL1); table.put(put1); Put put2 = new Put(ROW1, lock); put2.add(COLFAM1, QUAL1, VAL2); table.put(put2); } catch (IOException e) { } finally { } System.out.println ("Takin out lock..." ); Thread thread = new Thread(new UnlockedPut()); thread.start(); try { System.out.println ("Sleeping 5secs in main().." ); Thread.sleep(5000 ); } catch (Exception e) { } } } }
控制台输出如下:
Taking out lock...Lock ID : 4751274798057238718 Sleeping 5 secs in main()...Thread trying to put same row now...Releasing lock..Wait time: 5007 msAfter thread ended...KV :row1/colfam1 :qual1/ 1300775520118 /Put /vlen=4 ,Value :val2 KV :row1/colfam1 :qual1/ 1300775520113 /Put /vlen=4 ,Value :val1 KV :row1/colfam1 :qual1/ 1300775520116 /Put /vlen=4 ,Value :val3
主线程的锁一释放,阻塞线程的run()方法就继续执行并调用了第三个put。观察put操作在服务器端的执行情况,会觉得很有意思。KeyValue实例的时间戳显示了第三个put拥有的最小时间戳,虽然这个put表面上是最后执行的。这是因为线程中的put()调用是在两个主线程中的put()之前执行的,这之后主线程休眠了5秒。当put被发送到服务器时,如果它的时间戳没有被显式指定,服务器端会帮它设定时间戳,同时试图获得这一行的锁。但是示例代码中主线程已经获得了改行的锁,因此服务器端的处理一直等待了5秒多,锁被释放才得已继续。主线程中的两个put()调用的执行以及行的解锁只花费了7毫秒的时间。Get需要锁吗? 修改行时锁定行是有意义的,那么获取数据时是否需要加锁呢?Get类有一个构造器允许用户指定一个显式的锁: Get(byte[] row,RowLock rowLock) 这是遗留的方法,但服务器端根本用不着这种方法,因为在获取数据的过程中,服务器根本不需要任何锁,而是应用了一个多版本的并发控制机制来保证行级读操作。例如,get()调用永远不会返回写了一半的数据,比如当这些数据是另一个线程或者客户端写的。 这个就像是小规模的事务系统:只有当一个变动被应用到整个行之后,客户端才能读出这个改动。当改动在进行中时,所有的客户端读取操作得到的都将是所有列以前的状态。 当用户试图使用之前申请的显式锁,但锁的租约已经超时并恢复,用户将会从服务器得到一个UnknowRowLockException形式报告的错误。这个异常告诉用户服务器已经废弃了用户尝试使用的锁。用户应该在代码中丢弃这个锁,然后请求一个新的锁再试图恢复锁定状态。