服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 简单注解实现集群同步锁(spring+redis+注解)

简单注解实现集群同步锁(spring+redis+注解)

2020-07-29 14:55partner4java Java教程

本文主要介绍了简单注解实现集群同步锁的步骤与方法。具有一定的参考价值,下面跟着小编一起来看下吧

互联网面试的时候,是不是面试官常问一个问题如何保证集群环境下数据操作并发问题,常用的synchronized肯定是无法满足了,或许你可以借助for update对数据加锁。本文的最终解决方式你只要在方法上加一个@P4jSyn注解就能保证集群环境下同synchronized的效果,且锁的key可以任意指定。本注解还支持了锁的超时机制。

本文需要对Redis、spring和spring-data-redis有一定的了解。当然你可以借助本文的思路对通过注解对方法返回数据进行缓存,类似com.google.code.simple-spring-memcached的@ReadThroughSingleCache。

第一步:  介绍两个自定义注解P4jSyn、P4jSynKey

P4jSyn:必选项,标记在方法上,表示需要对该方法加集群同步锁;

P4jSynKey:可选项,加在方法参数上,表示以方法某个参数作为锁的key,用来保证更多的坑,P4jSynKey并不是强制要添加的,当没有P4jSynKey标记的情况下只会以P4jSyn的synKey作为锁key。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.yaoguoyin.redis.lock;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * <b>同步锁:</b><br/>
 * 主要作用是在服务器集群环境下保证方法的synchronize;<br/>
 * 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;<br/>
 * 如果原有“A任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“B任务”获取到,在“B任务”成功货物锁会并不会终止“A任务”的执行;<br/>
 * <br/>
 * <b>注意:</b><br/>
 * 使用过程中需要注意keepMills、toWait、sleepMills、maxSleepMills等参数的场景使用;<br/>
 * 需要安装redis,并使用spring和spring-data-redis等,借助redis NX等方法实现。
 *
 * @see com.yaoguoyin.redis.lock.P4jSynKey
 * @see com.yaoguoyin.redis.lock.RedisLockAspect
 *
 * @author partner4java
 *
 */
@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface P4jSyn {
 /**
 * 锁的key<br/>
 * 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/>
 * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/>
 *
 */
 String synKey();
 /**
 * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/>
 * 单位毫秒,默认20秒<br/>
 * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/>
 * 但是没有比较强的业务要求下,不建议设置为0
 */
 long keepMills() default 20 * 1000;
 /**
 * 当获取锁失败,是继续等待还是放弃<br/>
 * 默认为继续等待
 */
 boolean toWait() default true;
 /**
 * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/>
 * 默认为10毫秒
 *
 * @return
 */
 long sleepMills() default 10;
 /**
 * 锁获取超时时间:<br/>
 * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出
 * {@link java.util.concurrent.TimeoutException.TimeoutException}
 * ,可捕获此异常做相应业务处理;<br/>
 * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去;
 *
 * @return
 */
 long maxSleepMills() default 60 * 1000;
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.yaoguoyin.redis.lock;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * <b>同步锁 key</b><br/>
 * 加在方法的参数上,指定的参数会作为锁的key的一部分
 *
 * @author partner4java
 *
 */
@Target({ ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface P4jSynKey {
 /**
 * key的拼接顺序
 *
 * @return
 */
 int index() default 0;
}

这里就不再对两个注解进行使用上的解释了,因为注释已经说明的很详细了。

使用示例:

?
1
2
3
4
5
6
7
8
9
10
11
package com.yaoguoyin.redis.lock;
import org.springframework.stereotype.Component;
@Component
public class SysTest {
 private static int i = 0;
 @P4jSyn(synKey = "12345")
 public void add(@P4jSynKey(index = 1) String key, @P4jSynKey(index = 0) int key1) {
 i++;
 System.out.println("i=-===========" + i);
 }
}

第二步:切面编程

在不影响原有代码的前提下,保证执行同步,目前最直接的方式就是使用切面编程

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package com.yaoguoyin.redis.lock;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisTemplate;
/**
 * 锁的切面编程<br/>
 * 针对添加@RedisLock 注解的方法进行加锁
 *
 * @see com.yaoguoyin.redis.lock.P4jSyn
 *
 * @author partner4java
 *
 */
@Aspect
public class RedisLockAspect {
 @Autowired
 @Qualifier("redisTemplate")
 private RedisTemplate<String, Long> redisTemplate;
 @Around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.P4jSyn)")
 public Object lock(ProceedingJoinPoint pjp) throws Throwable {
 P4jSyn lockInfo = getLockInfo(pjp);
 if (lockInfo == null) {
  throw new IllegalArgumentException("配置参数错误");
 }
 String synKey = getSynKey(pjp, lockInfo.synKey());
 if (synKey == null || "".equals(synKey)) {
  throw new IllegalArgumentException("配置参数synKey错误");
 }
 boolean lock = false;
 Object obj = null;
 try {
  // 超时时间
  long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();
  while (!lock) {
  long keepMills = System.currentTimeMillis() + lockInfo.keepMills();
  lock = setIfAbsent(synKey, keepMills);
  // 得到锁,没有人加过相同的锁
  if (lock) {
   obj = pjp.proceed();
  }
  // 锁设置了没有超时时间
  else if (lockInfo.keepMills() <= 0) {
   // 继续等待获取锁
   if (lockInfo.toWait()) {
   // 如果超过最大等待时间抛出异常
   if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {
    throw new TimeoutException("获取锁资源等待超时");
   }
   TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
   } else {
   break;
   }
  }
  // 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁
  else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) {
   lock = true;
   obj = pjp.proceed();
  }
  // 没有得到任何锁
  else {
   // 继续等待获取锁
   if (lockInfo.toWait()) {
   // 如果超过最大等待时间抛出异常
   if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) {
    throw new TimeoutException("获取锁资源等待超时");
   }
   TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
   }
   // 放弃等待
   else {
   break;
   }
  }
  }
 } catch (Exception e) {
  e.printStackTrace();
  throw e;
 } finally {
  // 如果获取到了锁,释放锁
  if (lock) {
  releaseLock(synKey);
  }
 }
 return obj;
 }
 /**
 * 获取包括方法参数上的key<br/>
 * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey
 *
 */
 private String getSynKey(ProceedingJoinPoint pjp, String synKey) {
 try {
  synKey = "RedisSyn+" + synKey;
  Object[] args = pjp.getArgs();
  if (args != null && args.length > 0) {
  MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
  Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();
  SortedMap<Integer, String> keys = new TreeMap<Integer, String>();
 
  for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {
   P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);
   if (p4jSynKey != null) {
   Object arg = args[ix];
   if (arg != null) {
    keys.put(p4jSynKey.index(), arg.toString());
   }
   }
  }
  if (keys != null && keys.size() > 0) {
   for (String key : keys.values()) {
   synKey = synKey + key;
   }
  }
  }
  return synKey;
 } catch (Exception e) {
  e.printStackTrace();
 }
 return null;
 }
 @SuppressWarnings("unchecked")
 private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {
 if (annotations != null && annotations.length > 0) {
  for (final Annotation annotation : annotations) {
  if (annotationClass.equals(annotation.annotationType())) {
   return (T) annotation;
  }
  }
 }
 return null;
 }
 /**
 * 获取RedisLock注解信息
 */
 private P4jSyn getLockInfo(ProceedingJoinPoint pjp) {
 try {
  MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
  Method method = methodSignature.getMethod();
  P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);
  return lockInfo;
 } catch (Exception e) {
  e.printStackTrace();
 }
 return null;
 }
 public BoundValueOperations<String, Long> getOperations(String key) {
 return redisTemplate.boundValueOps(key);
 }
 /**
 * Set {@code value} for {@code key}, only if {@code key} does not exist.
 * <p>
 * See http://redis.io/commands/setnx
 *
 * @param key
 *  must not be {@literal null}.
 * @param value
 *  must not be {@literal null}.
 * @return
 */
 public boolean setIfAbsent(String key, Long value) {
 return getOperations(key).setIfAbsent(value);
 }
 public long getLock(String key) {
 Long time = getOperations(key).get();
 if (time == null) {
  return 0;
 }
 return time;
 }
 public long getSet(String key, Long value) {
 Long time = getOperations(key).getAndSet(value);
 if (time == null) {
  return 0;
 }
 return time;
 }
 public void releaseLock(String key) {
 redisTemplate.delete(key);
 }
}

RedisLockAspect会对添加注解的方法进行特殊处理,具体可看lock方法。

大致思路就是:

1、首选借助redis本身支持对应的setIfAbsent方法,该方法的特点是如果redis中已有该数据不保存返回false,不存该数据保存返回true;

2、如果setIfAbsent返回true标识拿到同步锁,可进行操作,操作后并释放锁;

3、如果没有通过setIfAbsent拿到数据,判断是否对锁设置了超时机制,没有设置判断是否需要继续等待;

4、判断是否锁已经过期,需要对(System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills)))进行细细的揣摩一下,getSet可能会改变了其他人拥有锁的超时时间,但是几乎可以忽略;

5、没有得到任何锁,判断继续等待还是退出。

第三步:spring的基本配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#*****************jedis连接参数设置*********************#
 
#redis服务器ip #
redis.hostName=127.0.0.1
 
#redis服务器端口号#
redis.port=6379
 
#redis服务器外部访问密码
redis.password=XXXXXXXXXX
 
#************************jedis池参数设置*******************#
 
#jedis的最大分配对象#
jedis.pool.maxActive=1000
 
jedis.pool.minIdle=100
 
#jedis最大保存idel状态对象数 #
jedis.pool.maxIdle=1000
 
#jedis池没有对象返回时,最大等待时间 #
jedis.pool.maxWait=5000
 
#jedis调用borrowObject方法时,是否进行有效检查#
jedis.pool.testOnBorrow=true
 
#jedis调用returnObject方法时,是否进行有效检查 #
jedis.pool.testOnReturn=true
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:redis="http://www.springframework.org/schema/redis" xmlns:cache="http://www.springframework.org/schema/cache" xsi:schemaLocation="http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  http://www.springframework.org/schema/aop  http://www.springframework.org/schema/aop/spring-aop-4.1.xsd  http://www.springframework.org/schema/redis  http://www.springframework.org/schema/redis/spring-redis.xsd
http://www.springframework.org/schema/cache  http://www.springframework.org/schema/cache/spring-cache.xsd">
 <!-- 开启注解 -->
 <aop:aspectj-autoproxy />
 <bean class="com.yaoguoyin.redis.lock.RedisLockAspect" />
 <!-- 扫描注解包范围 -->
 <context:component-scan base-package="com.yaoguoyin" />
 <!-- 引入redis配置 -->
 <context:property-placeholder location="classpath:config.properties" />
 <!-- 连接池 -->
 <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
 <property name="minIdle" value="${jedis.pool.minIdle}" />
 <property name="maxIdle" value="${jedis.pool.maxIdle}" />
 <property name="maxWaitMillis" value="${jedis.pool.maxWait}" />
 </bean>
 <!-- p:password="${redis.pass}" -->
 <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:host-name="${redis.hostName}" p:port="${redis.port}"
 p:password="${redis.password}" p:pool-config-ref="poolConfig" />
 <!-- 类似于jdbcTemplate -->
 <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" p:connection-factory-ref="redisConnectionFactory" />
</beans>

redis的安装本文就不再说明。

测试

?
1
2
3
4
5
6
7
8
9
package com.yaoguoyin.redis;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" })
public class BaseTest extends AbstractJUnit4SpringContextTests {
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.yaoguoyin.redis.lock;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.yaoguoyin.redis.BaseTest;
public class RedisTest extends BaseTest {
 @Autowired
 private SysTest sysTest;
 @Test
 public void testHello() throws InterruptedException {
 for (int i = 0; i < 100; i++) {
  new Thread(new Runnable() {
  @Override
  public void run() {
   try {
   TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
   e.printStackTrace();
   }
   sysTest.add("xxxxx", 111111);
  }
  }).start();
 }
 TimeUnit.SECONDS.sleep(20);
 }
 @Test
 public void testHello2() throws InterruptedException{
 sysTest.add("xxxxx", 111111);
 TimeUnit.SECONDS.sleep(10);
 }
}

你可以对

void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)

去除注解@P4jSyn进行测试对比。

ps:本demo的执行性能取决于redis和Java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持服务器之家!

原文链接:http://blog.csdn.net/partner4java/article/details/52198801

延伸 · 阅读

精彩推荐