系统城装机大师 - 固镇县祥瑞电脑科技销售部宣传站!

当前位置:首页 > 数据库 > Redis > 详细页面

异步redis队列实现 数据入库的方法

时间:2019-12-04来源:系统城作者:电脑系统城

业务需求

app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送

出问题之前业务逻辑:

php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据

该用户已经存在 和新增的id

入另一种详情表

问题所在:

当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致CPU 暴增到88% 并且居高不下

优化思路:

1、异步队列处理

2、redis 过滤(就是只处理当天第一次请求)

3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)

4、拼接插入的以及新增的如详细表中

解决办法:

1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中

首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值


 
  1. <?php
  2. /**
  3. * Created by haiyong.
  4. * User: jia
  5. * Date: 2017/9/18
  6. * Time: 20:06
  7. */
  8. namespace App\Http\Controllers\App;
  9.  
  10.  
  11. use App\Http\Controllers\Controller;
  12. use Illuminate\Http\Request;
  13. use Illuminate\Support\Facades\DB;
  14. use Illuminate\Support\Facades\Redis;
  15.  
  16. class OtherAppController extends Controller{
  17.  
  18. /**
  19. * app应用统计接口
  20. * @param Request $request
  21. * @return string
  22. */
  23. public function appTotal(Request $request)
  24. {
  25. // //历史数据入库
  26. //$redis = Redis::connection('web_active');
  27. // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
  28. // $str = '';
  29. // foreach ($app_name as $key => $val) {
  30. // $str.= "{$val} {$key} ";
  31. // }
  32. // $redis->hmset('app_name', $app_name);
  33. // echo $str;exit;
  34. $result = $request->input('res');
  35. $list = json_decode($result, true);
  36. if (empty ($list) || !is_array($list)) {
  37. return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);
  38. }
  39. $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
  40. $data['time'] = date('Y-m-d');
  41. $redis_key = 'log_app:'.$data['time'];
  42. //redis 过滤
  43. $redis = Redis::connection('web_active');
  44. //redis 键值过期设置
  45. if (empty($redis->exists($redis_key))) {
  46. $redis->hset($redis_key, 1, 'start');
  47. $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));
  48. }
  49. //值确定
  50. if ($redis->hexists($redis_key, $data['uid'])) {
  51. return json_encode(['result' => 'SUCCESS']);
  52. } else {
  53. //推入队列
  54. $redis->hset($redis_key, $data['uid'], $result);
  55. $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);
  56. return json_encode(['result' => 'SUCCESS']);
  57. }
  58. }
  59. }

2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出

mget 获取该用户的app id 不存在就会返回null

通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表


 
  1. <?php
  2.  
  3. namespace App\Console\Commands;
  4.  
  5. use Illuminate\Console\Command;
  6. use Illuminate\Support\Facades\Redis;
  7. use Illuminate\Support\Facades\DB;
  8. use Illuminate\Support\Facades\Storage;
  9.  
  10. class AppTotal extends Command
  11. {
  12. /**
  13. * The name and signature of the console command.
  14. *
  15. * @var string
  16. */
  17. protected $signature = 'AppTotal:run';
  18.  
  19. /**
  20. * The console command description.
  21. *
  22. * @var string
  23. */
  24. protected $description = 'Command description';
  25.  
  26. /**
  27. * Create a new command instance.
  28. *
  29. * @return void
  30. */
  31. public function __construct()
  32. {
  33. parent::__construct();
  34. }
  35.  
  36. /**
  37. * Execute the console command.
  38. *
  39. * @return mixed
  40. */
  41. public function handle()
  42. {
  43. //历史数据入库
  44. // $redis = Redis::connection('web_active');
  45. // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
  46. // $redis->hmset('app_name', $app_name);
  47. // exit;
  48. while(1) {
  49. $redis = Redis::connection('web_active');
  50. //队列名称
  51. $res = $redis->lpop('log_app_list');
  52. //开关按钮
  53. $lock = $redis->get('log_app_lock');
  54. if (!empty($res)) {
  55. list($date,$uid) = explode(':',$res);
  56. $result = $redis->hget('log_app:'.$date, $uid);
  57. if (!empty($result)) {
  58. $table_name = 'app_total'.date('Ym');
  59. $list = json_decode($result, true);
  60. $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
  61. $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
  62. $data['device'] = isset($list['device']) ? $list['device'] : '' ;
  63. $data['appList'] = isset($list['list']) ? $list['list'] : '' ;
  64. //数据去重 flip比unique更节约性能
  65. $data['appList'] = array_flip($data['appList']);
  66. $data['appList'] = array_flip($data['appList']);
  67. $data['time'] = date('Y-m-d');
  68. //app应用过滤
  69. $app_res = $redis->hmget('app_name', $data['appList']);
  70. //新增加app数组
  71. $new_app = [];
  72. //mysql 入库数组
  73. $mysql_new_app = [];
  74. //获取当前redis 自增指针
  75. $total = $redis->get('app_name_total');
  76. foreach ($app_res as $key =>& $val) {
  77. if (is_null($val)) {
  78. $total += 1;
  79. $new_app[$data['appList'][$key]] = $total;
  80. $val = $total;
  81. array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);
  82. }
  83. }
  84. if (count($new_app)){
  85. $str = "INSERT IGNORE INTO app_set_name (id,appName) values";
  86. foreach ($new_app as $key => $val) {
  87. $str.= "(".$val.",'".$key."'),";
  88. }
  89. $str = trim($str, ',');
  90. //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
  91. $mysql_res = DB::connection('phpLog')->statement($str);
  92. if ($mysql_res) {
  93. // 设置redis 指针
  94. $redis->set('app_name_total', $total);
  95. // redis 数据入库
  96. $redis->hmset('app_name', $new_app);
  97. }
  98. }
  99. // 详情数据入库
  100. $data['appList'] = implode(',', $app_res);
  101. //app统计入库
  102. DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList)
  103. values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
  104. //log 记录 当文件达到123MB的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志
  105. Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\n");
  106. } else {
  107. Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\n");
  108. }
  109. }
  110. //执行间隔
  111. sleep(1);
  112. //结束按钮
  113. if ($lock == 2) {
  114. exit;
  115. }
  116. //内存检测
  117. if(memory_get_usage()>1000*1024*1024){
  118. exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端
  119. }
  120. }
  121. }
  122. }

3、执定 定时任务监控脚本执行情况


 
  1. crontab -e
  2.  
  3. /2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1

test.sh 内容 (查看执行命令返回的进程id 如果没有就执行命令开启)


 
  1. #!/bin/bash
  2. alive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`
  3. if [ ! $alive ]
  4. then
  5. /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
  6. fi

记得授权哦 chmod +x test.sh

笔者用的laravel 框架 将命令激活丢入后台

执行命令


 
  1. /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &

完事直接 ctrl -c 结束就行 命令以在后台运行 可以用shell 中的命令查看进程id

这样就实现队列异步入库

还有很多问题需要优化!!大致功能已经实现!!!!!!

优化完成后cpu

以上这篇异步redis队列实现 数据入库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

分享到:

相关信息

  • redis分布式ID解决方案示例详解

    常用的分布式ID解决方案 UUID Snowflake Snowflake算法的Java代码: Leaf Leaf算法的Java代码: 基于数据库自增ID生成 基于UUID生成 基于Redis生成 基于ZooKeeper生成...

    2023-03-09

  • Redis并发访问问题详细讲解

    什么场景需要控制并发访问 并发访问的控制方法 1、加入锁机制 2、操作原子化...

    2022-12-06

系统教程栏目

栏目热门教程

人气教程排行

站长推荐

热门系统下载