简单高性能的分布式锁实现Redis

Owen Jia 2019年10月21日 1,961次浏览

背景介绍

本着活到老,学到老的原则;
推荐理由:Redis单线程运行且高性能,关键是实现code很简单易懂
基于原子性方法getset,结合setnx\get实现

现在的系统基本多节点方式部署,至少2台吧也可以当做灾备处理和负载均衡,大的平台节点数量就更多了。
既然是多节点就会面临同步锁问题。单节点可以通过java同步锁来处理并发问题;多节点就没有办法同步锁(synchronized)了,也有方案是通过数据表来控制:如数据库的排它锁、或者数据记录;也有zookeeper实现分布式锁的;
我这边推荐redis的SETNX方法,redis的知识就不介绍了,推荐直接去看官网:https://redis.io/

分布式锁是啥

就是保证同一个方法在同一操作只能被一个线程执行;
在并发编程中,经常遇到多个线程访问同一个共享资源,这时候必须考虑如何维护数据一致性,在java中synchronized关键字被常用于维护数据一致性,但是必须是同一个JVM内;
在分布式部署的应用集群中,同一个方法在同一操作也要只能被一台机器上的一个线程执行;服务是部署在不同服务器上面是多个JVM,分布式锁就应用而生;
比如:电商中的下单操作,需要检查库存和锁库存;电商系统一般都是集群环境;

redis分布式锁的原理

主要基于redis的setnx getset方法来实现分布式锁,且getset是原子性方法。官网有详细介绍:
setnx,https://redis.io/commands/setnx
getset,https://redis.io/commands/getset

setnx value key:
若key不存在,返回1;若key存在,返回0;
setget key value:
指定key的值value,并返回key的旧值。
以下分布锁流程图涵盖了实现的精华,需要认真阅读并弄懂掌握:

alt

code干货分享

代码建议直接copy复用,有不足的地方欢迎提意见或把方案直接贴给我
这里使用的RedisUtil是自己封装的,spring-boot提供的redisTemplate做是很方便

本地测试需要去除同步锁:synchronized,实际项目中要加上。

package com.ts.rent.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.util.Random;

/**
 * 分布锁:Redis
 * @author: Owen Jia
 * @time: 2019/8/22 13:46
 */
public class JedisNew {

    protected Logger LOGGER = LoggerFactory.getLogger(getClass());
    //获取锁最大等待时间,5分钟
    int timeoutMsecs = 2 * 60 * 1000;
    //锁逻辑块执行时间,5秒
    int expireMsecs  = 5 * 1000;

    boolean locked = false; //默认未被使用
    String lockKey; //锁名,标记
    Long lockValue = 0L; //锁值,时间毫秒值
    final String lockKeyPre = "rent:lock:";

    public JedisNew(String lockKey) {
        this.lockKey = this.lockKeyPre + lockKey;
    }

    private Jedis init(){
        return new Jedis("172.20.30.14",19000);
    }

    /**
     * 取锁
     */
    public boolean acquire(){
        Jedis jedis = null;
        try {
            jedis = this.init();
            int timeout = timeoutMsecs;
            while (timeout >= 0) {
                String expiresStr = String.valueOf(System.currentTimeMillis() + expireMsecs + 1);
                if (jedis.setnx(lockKey, expiresStr) == 1) {//锁空闲,获取锁成功
                    System.out.println(this.hashCode()+": 第一个节点获得锁: "+expiresStr);
                    lockValue = Long.valueOf(expiresStr);
                    locked = true;
                    return true;
                }
                LOGGER.debug("锁被获取,检测是否key失效");
                //锁被占用
                String currentValueStr = jedis.get(lockKey);
                if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                    String oldValueStr = jedis.getSet(lockKey, expiresStr);
                    if (oldValueStr != null && oldValueStr.equals(currentValueStr)){
                        System.out.println(this.hashCode()+": 旧锁超时,重置获得锁:" + expiresStr);
                        this.lockValue = Long.valueOf(expiresStr);
                        this.locked = true;
                        return true;
                    }
                }
                LOGGER.debug("未能获得锁,等等");
                // 随机时间等待,防止扎堆
                int waitMilSecs = new Random().nextInt(100) + 50;
                timeout -= waitMilSecs;
                Thread.sleep(waitMilSecs);
            }
        } catch (Exception err) {
            LOGGER.error("[acquire]获取分布式锁失败", err.getMessage());
            throw new RuntimeException("获取分布式锁失败", err);
        } finally {
            jedis.close();
        }
        LOGGER.error("[acquire] 取锁等待<{}>超时,请检查该类型锁<{}>业务吞吐量设计",this.timeoutMsecs,this.lockKey);
        return false;
    }

    /**
     * 放锁
     */
    public void release() {
        if (this.locked && this.lockValue > System.currentTimeMillis()) {
            Jedis jedis = null;
            try{
                jedis = this.init();
                jedis.del(this.lockKey);
                this.locked = false;
                System.out.println(this.hashCode()+": 释放锁: " + this.lockValue);
            } catch (Exception err) {
                LOGGER.error("[release]释放分布式锁<"+this.lockKey+">失败", err.getMessage());
                throw new RuntimeException("释放分布式锁失败", err);
            } finally {
                jedis.close();
            }
        }
    }

}

并发测试用例

package com.ts.rent.utils;

/**
 * 并发测试用例
 * @author: Owen Jia
 * @time: 2019/8/22 13:26
 */
public class JedisLockTest implements Runnable{

    @Override
    public void run() {
        JedisNew lock = new JedisNew("test2019");

        if(lock.acquire()){
            test1();

            lock.release();
        } else {
            System.err.println("分布锁错误,errorThreadCount");
        }
    }

    public void test1(){
        try {
            int i = 1;
            while (i <= 4) {
                System.out.println(this.hashCode() +": 当前毫秒"+i+": " + System.currentTimeMillis());

                Thread.sleep(1000);

                i++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){
        for(int i = 0; i< 50; i++){
            Thread thread = new Thread(new JedisLockTest());
            thread.start();
        }
    }
}

总结

对于分布锁实现类建议单例的形式使用;

绝对的干活分享,很实用;
redis的安装可以参考:http://www.runoob.com/redis/redis-install.html

厚着脸要个赞!
有不足地方,欢迎指正