SyncWMSOrderTask.php 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\OracleDOCOrderHeader;
  4. use App\Services\OracleDOCOrderHeaderService;
  5. use App\Services\OrderService;
  6. use App\ValueStore;
  7. use Carbon\Carbon;
  8. use Illuminate\Console\Command;
  9. use Illuminate\Support\Facades\Cache;
  10. use Illuminate\Support\Facades\Log;
  11. class SyncWMSOrderTask extends Command
  12. {
  13. /**
  14. * @var OrderService $service
  15. */
  16. private $service;
  17. protected $signature = 'sync:order';
  18. private $last_start_key ; // 上一次任务开始时间
  19. private $last_end_key ; // 上一次任务结束时间
  20. private $last_err_key ; // 上一次任务异常时间
  21. private $restart; // 重启时间
  22. private $is_enabled; // 是否开启
  23. protected $description = 'Command description';
  24. public function __construct()
  25. {
  26. parent::__construct();
  27. $this->service = app('OrderService');
  28. }
  29. public function init()
  30. {
  31. $this->last_start_key = config('sync.order_sync.cache_prefix.last_start_at');
  32. $this->last_end_key = config('sync.order_sync.cache_prefix.last_end_at');
  33. $this->last_err_key = config('sync.order_sync.cache_prefix.last_err_at');
  34. $this->restart = config('sync.order_sync.cache_prefix.restart');
  35. $this->is_enabled= config('sync.order_sync.enabled');
  36. }
  37. public function handle()
  38. {
  39. $this->init();
  40. if($this->is_enabled===false)return;
  41. sleep(rand(2,3));
  42. $start_time = Cache::remember($this->last_start_key,null,function (){
  43. return ValueStore::query()->firstOrCreate(['name'=>$this->last_start_key])->first()->value;
  44. });
  45. $end_time = Cache::remember($this->last_end_key,null,function (){
  46. return ValueStore::query()->firstOrCreate(['name'=>$this->last_end_key])->first()->value;
  47. });
  48. $start = Carbon::now();
  49. // 判断上一次任务异常了
  50. // 第一次出现异常没有记录上任务结束时间
  51. if(isset($start_time) && empty($end_time) && $start->diffInMinutes(Carbon::parse($start_time)) < $this->restart)return;
  52. // 这次任务启动时间 距离上一次任务的一个启动时间 小于10
  53. if(isset($start_time) && isset($end_time)
  54. && Carbon::parse($end_time)->lt(Carbon::parse($start_time))
  55. && $start->diffInMinutes(Carbon::parse($start_time)) < $this->restart)
  56. return;
  57. $start = (string)$start;
  58. Cache::put($this->last_start_key,$start);
  59. ValueStore::query()->where('name',$this->last_start_key)->update(['value'=>$start]);
  60. Log::info("订单同步开始时间",['date' => $start]);
  61. $this->syncCreatedOrder();
  62. $this->syncUpdatedOrder();
  63. $end = (string)Carbon::now();
  64. Cache::put($this->last_end_key,$end);
  65. ValueStore::query()->where('name',$this->last_end_key)->update(['value'=>$end]);
  66. Log::info("订单同步结束时间",['date' => $end]);
  67. }
  68. public function syncCreatedOrder()
  69. {
  70. /**
  71. * @var OracleDocOrderHeaderService $oracleDOCOrderHeaderService
  72. */
  73. $oracleDOCOrderHeaderService = app('OracleDocOrderHeaderService');
  74. $newest_key = config('sync.order_sync.cache_prefix.created_at');
  75. $newest_list_key = config('sync.order_sync.cache_prefix.newest_list');
  76. $hasKey = config('sync.order_sync.cache_prefix.newest_has');
  77. $prefixKey = config('sync.order_sync.cache_prefix.newest');
  78. ini_set('memory_limit', '1024M');
  79. $last_date = $this->service->getOrderSyncAt($newest_key,'newest'); // 获取创建时间点
  80. $orderHeaders = OracleDOCOrderHeader::query()
  81. ->selectRaw(implode(',',OracleDOCOrderHeaderService::$columns))
  82. ->whereColumn('DOC_Order_Header.editTime','=','DOC_Order_Header.addTime')
  83. ->where('addTime','>=',$last_date)
  84. ->orderBy('DOC_Order_Header.addTime')->get();
  85. if($orderHeaders->count()==0)return;
  86. $orderHeaderList = $orderHeaders->chunk(100);
  87. foreach ($orderHeaderList as $item) {
  88. $item = $oracleDOCOrderHeaderService->loadMissing($item);
  89. $last_order = $item->last(); // 时间点靠后的
  90. $newest_orders = $item->where('addtime',$last_order->addtime);
  91. $orderHeaders = $this->service->filterOrderByCache($item,$newest_list_key); // 对比缓存
  92. if(count($newest_orders)>0 && count($orderHeaders) >0){
  93. $this->service->syncOrder($orderHeaders); // 同步订单
  94. $this->service->cancelOrderCache($newest_list_key,$prefixKey); // 清除缓存
  95. $this->service->pushOrderCache($newest_orders,$prefixKey,$hasKey,$newest_list_key); // 添加缓存
  96. $this->service->setOrderSyncAt($newest_key,$last_order->addtime,count($orderHeaders)>0); // 更新时间
  97. }
  98. }
  99. unset($orderHeaders,$newest_orders,$last_order);
  100. }
  101. public function syncUpdatedOrder()
  102. {
  103. /**
  104. * @var OracleDOCOrderHeaderService $oracleDOCOrderHeaderService
  105. */
  106. $oracleDOCOrderHeaderService = app('OracleDocOrderHeaderService');
  107. $renewal_key = config('sync.order_sync.cache_prefix.updated_at');
  108. $renewal_list_key = config('sync.order_sync.cache_prefix.renewal_list');
  109. $hasKey = config('sync.order_sync.cache_prefix.renewal_has');
  110. $prefixKey = config('sync.order_sync.cache_prefix.renewal');
  111. ini_set('memory_limit', '1024M');
  112. $last_date = $this->service->getOrderSyncAt($renewal_key,'renewal'); // 获取更新时间点
  113. $orderHeaders = OracleDOCOrderHeader::query()
  114. ->selectRaw(implode(',',OracleDOCOrderHeaderService::$columns))
  115. ->whereColumn('DOC_Order_Header.editTime','!=','DOC_Order_Header.addTime')
  116. ->where('editTime',">=",$last_date)
  117. ->orderBy('editTime')->get();
  118. if($orderHeaders->count()==0)return;
  119. $orderHeaderList = $orderHeaders->chunk(100);
  120. foreach ($orderHeaderList as $item) {
  121. $item = $oracleDOCOrderHeaderService->loadMissing($item);
  122. $renewal_order = $item->last(); // 时间点靠后的
  123. $renewal_orders = $item->where('edittime',$renewal_order->edittime);
  124. $orderHeaders = $this->service->filterOrderByCache($orderHeaders,$renewal_list_key); // 对比缓存
  125. if(count($renewal_orders)>0 && count($orderHeaders)>0){
  126. $this->service->syncOrder($item); // 同步订单
  127. $this->service->cancelOrderCache($renewal_list_key,$prefixKey); // 清除缓存
  128. $this->service->pushOrderCache($renewal_orders,$prefixKey,$hasKey,$renewal_list_key); // 添加缓存
  129. $this->service->setOrderSyncAt($renewal_key,$renewal_order->edittime,count($orderHeaders)>0); // 更新时间
  130. }
  131. }
  132. unset($orderHeaders,$renewal_orders,$renewal_order);
  133. }
  134. }