RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
retryPolicy);
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(
client, "/semaphores/semaphore_01", 3);
Lease lease = semaphore.acquire();
Thread.sleep(3000);
semaphore.returnLease(lease); 锁初始化
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count)
{
this.client = client;
//注意这里,发现底层依赖于了InterProcessMutex
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
//设置maxLeases 就是信号量 数量
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
leasesPath = ZKPaths.makePath(path, LEASE_PARENT);
if ( count != null )
{
count.addListener
(
new SharedCountListener()
{
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
{
InterProcessSemaphoreV2.this.maxLeases = newCount;
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
// no need to handle this here - clients should set their own connection state listener
}
}
);
}
}
尝试获取锁
public Lease acquire() throws Exception
{
Collection<Lease> leases = acquire(1, 0, null);
return leases.iterator().next();
} public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
{
long startMs = System.currentTimeMillis();
boolean hasWait = (unit != null);
long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
Preconditions.checkArgument(qty > 0, "qty cannot be 0");
ImmutableList.Builder<Lease> builder = ImmutableList.builder();
boolean success = false;
try
{
while ( qty-- > 0 )
{
int retryCount = 0;
long startMillis = System.currentTimeMillis();
boolean isDone = false;
while ( !isDone )
{
switch ( internalAcquire1Lease(builder, startMs, hasWait, waitMs) )
{
case CONTINUE:
{
isDone = true;
break;
}
case RETURN_NULL:
{
return null;
}
case RETRY_DUE_TO_MISSING_NODE:
{
// gets thrown by internalAcquire1Lease when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( !client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");
}
// try again
break;
}
}
}
}
success = true;
}
finally
{
if ( !success )
{
returnAll(builder.build());
}
}
return builder.build();
} private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception
{
if ( client.getState() != CuratorFrameworkState.STARTED )
{
return InternalAcquireResult.RETURN_NULL;
}
if ( hasWait )
{
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( !lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS) )
{
return InternalAcquireResult.RETURN_NULL;
}
}
else
{
//private final InterProcessMutex lock;
//底层还是基于一般锁来实现的
//逻辑是什么 就是底层去判断创建的node节点的排序是否< maxLeases ,否则添加一个watcher 然后wait()
lock.acquire();
}
try
{
PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
//然后又去创建了一个 /semaphores/semaphore_01/leases/sdasdadasdasdas-lease-00000 节点
String nodeName = ZKPaths.getNodeFromPath(path);
builder.add(makeLease(path));
synchronized(this)
{
for(;;)
{
//获取/semaphores/semaphore_01/leases 下的所有节点
List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
if ( !children.contains(nodeName) )
{
log.error("Sequential path not found: " + path);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
//如果超过了 maxLease 数量
if ( children.size() <= maxLeases )
{
break;
}
if ( hasWait )
{
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if ( thisWaitMs <= 0 )
{
return InternalAcquireResult.RETURN_NULL;
}
wait(thisWaitMs);
}
else
{
wait();
}
}
}
}
finally
{
//如果小于等于3 就会将之前加的锁释放掉
lock.release();
}
return InternalAcquireResult.CONTINUE;
} 大家思考一下,为什么要先去获取/semaphores/semaphore_01/lock 锁之后,才能继续去/semaphores/semaphore_01/leases/ 节点下去操作,去进入信号量获取?????
为什么不直接通过/locks/lock_01 中允许前3个序号的元素去直接操作?
因为如果第4个线程进来,或发现size > maxLeases数量,然后添加一个watcher然后进入阻塞,不会释放lock 的锁,剩下的 5 6 7...后续线程都会被阻塞在lock锁那边,而不是尝试获取lease 信号线这边,这样的好处是什么?
(1)能够保证 公平的顺序的获取信号量,
(2)信号量释放的时候不必通知过多的watcher,进行回调,只需要回调一个就ok
(3)使得程序的实现简单清晰,这充分体现了框架编写者的水平真的很优秀
释放信号量,其实很简单,直接将连接关闭就ok了,zk会自动的删除节点信息,然后触发watcher 去通知lease中的阻塞node,然后尝试获取信号量
/**
* Convenience method. Closes the lease
*
* @param lease lease to close
*/
public void returnLease(Lease lease)
{
Closeables.closeQuietly(lease);
}
本文标题:Curator 源码初探(二) Semaphore
本文链接:https://blog.quwenai.cn/post/2894.html
版权声明:本文不使用任何协议授权,您可以任何形式自由转载或使用。






还没有评论,来说两句吧...