# 单机锁及现有方案分布式锁介绍

单机锁: java 常见的有 juc 的 Lock 及 jvm 的 synchronized golang 的 Mutex

分布式锁:通常使用以下三种方案

基于数据库:利用数据库事务实现 for update 乐观锁

优点及缺点:不需要引入新依赖,但性能取决于数据库读写所以不适合高并发分布式锁

基于缓存:redis 分布式锁 etcd 分布式锁

redis:redis 性能极强 但单机 redis 不具备 HA 分布式 redis 选举时会出现 master-slave 数据不同步的情况 会出现锁失效的情况

etcd:性能较 redis 弱 但是远强于 zk 方案 适合云原生环境部署 并且满足高可用 强一致性

基于 Zookeeper 临时节点:zookeeper 分布式锁

zk 方案实现了 CP 保证强一致性 不会出现锁失效的情况 但是牺牲了可用性 不满足 HA 并且 zk 维护较为复杂不适合云原生方案

# 分布式锁实现原理及 etcd 方案

分布式实现主要依赖以下几个牵扯到锁的原理

1. 载体 例如 redis 的载体是 kv zookeeper 使用的是临时的文件系统

2. 租期 服务进程持有分布式锁之后需要有一个持有时间 即超时后自动释放 zk 临时节点可以被删除 redis 使用的是缓存 ttl 机制

3. 锁机制 例如可重入机制、公平机制、减少惊群效应(某个 lock 释放后会换新所有 lock 上的线程 争抢此 lock)等等

etcd 实现以上三点实现 具体是以下方案

载体:etcd 是一个分层的多级 kv(有点像 zk 临时节点的 path)

租期:通过 lease 对 kv 设置租期,租期到期 锁失效

自动续期:使用 KeepAlive 实现租约自动续期

公平锁:多个进程同时争抢锁时 可以根据 Revision 值 大小依次获得锁 有效避免了惊群效应

Watch 机制:当 Watch 检测到某个 key 及目录变化后,Client 端可以收到通知

# client 端代码

# Java 实现

引用 jetcd 依赖

<dependency>
            <groupId>io.etcd</groupId>
            <artifactId>jetcd-core</artifactId>
            <version>0.7.5</version>
        </dependency>

代码主体

package cn.streamingone.common.distribution.lock;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Lock;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lock.LockResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * @author JoyseeKing
 * @date 2023/04/07/18:03
 * @description 使用 etcd 实现分布式锁
 */
@Component
@Log4j2
public class EtcdLock {
    private Client client;
    private Lock lockClient;
    private Lease leaseClient;
    private LockStates lockStates;
    @Value("${etcd.endpoint:http://localhost:2379}")
    private String endpoint;
    public EtcdLock(String lockKey, Long leaseTTL, TimeUnit unit) {
        this.client = Client.builder().endpoints(endpoint).build();
        this.lockClient = client.getLockClient();
        this.leaseClient = client.getLeaseClient();
        this.lockStates = new LockStates(lockKey, unit.toSeconds(leaseTTL));
    }
    public void lock() {
        try {
            // 正常释放锁
            if (this.lockStates.getLockPath() != null) {
                lockClient.unlock(ByteSequence.from(lockStates.getLockPath().getBytes())).get();
            }
            // 如果是主动续约,则关闭续约的定时任务
            // 删除租约
            if (lockStates.getLeaseId() != 0L) {
                leaseClient.revoke(lockStates.getLeaseId());
            }
        } catch (InterruptedException | ExecutionException e) {
            //todo: 异常处理
        }
        log.info("线程:{} 释放锁", Thread.currentThread().getName());
    }
    // 创建一个租约
    private void createLease() throws ExecutionException, InterruptedException {
        log.debug("[etcd-lock]: start to createLease." + this.lockStates.getLockKey() + Thread.currentThread().getName());
        try {
            long leaseId = leaseClient.grant(this.lockStates.getLeaseTTL()).get().getID();
            lockStates.setLeaseId(leaseId);
            // 自动续约
            StreamObserver<LeaseKeepAliveResponse> observer = new StreamObserver<LeaseKeepAliveResponse>() {
                @Override
                public void onNext(LeaseKeepAliveResponse value) {
                    log.info("cluster node lease remaining ttl: {}, lease id: {}", value.getTTL(), value.getID());
                }
                @Override
                public void onError(Throwable t) {
                    log.error("cluster node lease keep alive failed. exception info: {}", t);
                }
                @Override
                public void onCompleted() {
                    log.info("cluster node lease completed");
                }
            };
            // 设置自动续约
            leaseClient.keepAlive(leaseId, observer);
        } catch (InterruptedException | ExecutionException e) {
            log.error("[etcd-lock] Create lease failed:" + e);
            lockStates.setErrorMsg("Create lease failed:" + e);
            throw e;
        }
    }
    private void createLock() throws ExecutionException, InterruptedException {
        String lockKey = this.lockStates.getLockKey();
        log.debug("[etcd-lock]: start to createLock." + lockKey + Thread.currentThread().getName());
        try {
            LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), lockStates.getLeaseId()).get();
            if (lockResponse != null) {
                String lockPath = lockResponse.getKey().toString(StandardCharsets.UTF_8);
                this.lockStates.setLockPath(lockPath);
                log.info("线程:{} 加锁成功,锁路径:{}", Thread.currentThread().getName(), lockPath);
                this.lockStates.setLocked(true);
            }
        } catch (InterruptedException | ExecutionException e) {
            log.error("[etcd-lock] lock failed:" + e);
            lockStates.setErrorMsg("[etcd-lock] lock failed:" + e);
            leaseClient.revoke(this.lockStates.getLeaseId());
            throw e;
        }
    }
}
@Data
class LockStates {
    private String lockKey;
    private String lockPath;
    private String errorMsg;
    private long leaseTTL;
    private long leaseId;
    private boolean isLocked;
    public LockStates(String lockKey, long leaseTTL) {
        this.lockKey = lockKey;
        this.leaseTTL = leaseTTL;
    }
}

# Go 实现

package main
import (
	"context"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"log"
	"time"
)
func main() {
	// 初始化客户端
	client, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}, DialTimeout: time.Second * 3})
	if err != nil {
		log.Fatalf("client connect failed :%v\n", err)
	}
	// 创建一个 session 并设置租期 30s (内部会自动续期 Etcd KeepAlive)
	session, err := concurrency.NewSession(client, concurrency.WithTTL(30))
	if err != nil {
		log.Fatalf("session init failes:%v\n", err)
		return
	}
	defer func(session *concurrency.Session) {
		err := session.Close()
		if err != nil {
			log.Fatalf("session close :%v\n", err)
		}
	}(session)
	// 获取指定前缀的锁对象
	mutex := concurrency.NewMutex(session, "biz-lock")
	// 加锁默认等待 3s
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
	defer cancel()
	err = mutex.TryLock(ctx)
	if err != nil {
		log.Fatalf("lock failed :%v\n", err)
		return
	}
	// 执行业务
	for i := 1; i <= 10; i++ {
		time.Sleep(time.Second)
		log.Printf("biz execed %%%d ...", i*10)
	}
	// 释放锁
	err = mutex.Unlock(context.TODO())
	if err != nil {
		log.Fatalf("cluster node lease failed :%v\n", err)
		return
	}
	log.Println("cluster node lease")
}
阅读次数

请我喝[茶]~( ̄▽ ̄)~*

JoyseeKing 微信支付

微信支付

JoyseeKing 支付宝

支付宝

JoyseeKing 贝宝

贝宝