service = app('OrderService'); } public function init() { $this->last_start_key = config('sync.order_sync.cache_prefix.last_start_at'); $this->last_end_key = config('sync.order_sync.cache_prefix.last_end_at'); $this->last_err_key = config('sync.order_sync.cache_prefix.last_err_at'); $this->restart = config('sync.order_sync.cache_prefix.restart'); $this->is_enabled= config('sync.order_sync.enabled'); } public function handle() { $this->init(); if($this->is_enabled===false)return; sleep(rand(2,3)); $start_time = Cache::remember($this->last_start_key,null,function (){ return ValueStore::query()->firstOrCreate(['name'=>$this->last_start_key])->first()->value; }); $end_time = Cache::remember($this->last_end_key,null,function (){ return ValueStore::query()->firstOrCreate(['name'=>$this->last_end_key])->first()->value; }); $start = Carbon::now(); // 判断上一次任务异常了 // 第一次出现异常没有记录上任务结束时间 if(isset($start_time) && empty($end_time) && $start->diffInMinutes(Carbon::parse($start_time)) < $this->restart)return; // 这次任务启动时间 距离上一次任务的一个启动时间 小于10 if(isset($start_time) && isset($end_time) && Carbon::parse($end_time)->lt(Carbon::parse($start_time)) && $start->diffInMinutes(Carbon::parse($start_time)) < $this->restart) return; $start = (string)$start; Cache::put($this->last_start_key,$start); ValueStore::query()->where('name',$this->last_start_key)->update(['value'=>$start]); $this->syncCreatedOrder(); $this->syncUpdatedOrder(); $end = (string)Carbon::now(); Cache::put($this->last_end_key,$end); ValueStore::query()->where('name',$this->last_end_key)->update(['value'=>$end]); } public function syncCreatedOrder() { /** * @var OracleDocOrderHeaderService $oracleDOCOrderHeaderService */ $oracleDOCOrderHeaderService = app('OracleDocOrderHeaderService'); $newest_key = config('sync.order_sync.cache_prefix.created_at'); $newest_list_key = config('sync.order_sync.cache_prefix.newest_list'); $hasKey = config('sync.order_sync.cache_prefix.newest_has'); $prefixKey = config('sync.order_sync.cache_prefix.newest'); ini_set('memory_limit', '1024M'); $last_date = $this->service->getOrderSyncAt($newest_key,'newest'); // 获取创建时间点 $orderHeaders = OracleDOCOrderHeader::query() ->selectRaw(implode(',',OracleDOCOrderHeaderService::$columns)) ->whereColumn('DOC_Order_Header.editTime','=','DOC_Order_Header.addTime') ->where('addTime','>=',$last_date) ->orderBy('DOC_Order_Header.addTime')->get(); if($orderHeaders->count()==0)return; $orderHeaderList = $orderHeaders->chunk(100); foreach ($orderHeaderList as $item) { $item = $oracleDOCOrderHeaderService->loadMissing($item); $last_order = $item->last(); // 时间点靠后的 $newest_orders = $item->where('addtime',$last_order->addtime); $orderHeaders = $this->service->filterOrderByCache($item,$newest_list_key); // 对比缓存 if(count($newest_orders)>0 && count($orderHeaders) >0){ $this->service->syncOrder($orderHeaders); // 同步订单 $this->service->cancelOrderCache($newest_list_key,$prefixKey); // 清除缓存 $this->service->pushOrderCache($newest_orders,$prefixKey,$hasKey,$newest_list_key); // 添加缓存 $this->service->setOrderSyncAt($newest_key,$last_order->addtime,count($orderHeaders)>0); // 更新时间 } } unset($orderHeaders,$newest_orders,$last_order); } public function syncUpdatedOrder() { /** * @var OracleDOCOrderHeaderService $oracleDOCOrderHeaderService */ $oracleDOCOrderHeaderService = app('OracleDocOrderHeaderService'); $renewal_key = config('sync.order_sync.cache_prefix.updated_at'); $renewal_list_key = config('sync.order_sync.cache_prefix.renewal_list'); $hasKey = config('sync.order_sync.cache_prefix.renewal_has'); $prefixKey = config('sync.order_sync.cache_prefix.renewal'); ini_set('memory_limit', '1024M'); $last_date = $this->service->getOrderSyncAt($renewal_key,'renewal'); // 获取更新时间点 $orderHeaders = OracleDOCOrderHeader::query() ->selectRaw(implode(',',OracleDOCOrderHeaderService::$columns)) ->whereColumn('DOC_Order_Header.editTime','!=','DOC_Order_Header.addTime') ->where('editTime',">=",$last_date) ->orderBy('editTime')->get(); if($orderHeaders->count()==0)return; $orderHeaderList = $orderHeaders->chunk(100); foreach ($orderHeaderList as $item) { $item = $oracleDOCOrderHeaderService->loadMissing($item); $renewal_order = $item->last(); // 时间点靠后的 $renewal_orders = $item->where('edittime',$renewal_order->edittime); $orderHeaders = $this->service->filterOrderByCache($orderHeaders,$renewal_list_key); // 对比缓存 if(count($renewal_orders)>0 && count($orderHeaders)>0){ $this->service->syncOrder($item); // 同步订单 $this->service->cancelOrderCache($renewal_list_key,$prefixKey); // 清除缓存 $this->service->pushOrderCache($renewal_orders,$prefixKey,$hasKey,$renewal_list_key); // 添加缓存 $this->service->setOrderSyncAt($renewal_key,$renewal_order->edittime,count($orderHeaders)>0); // 更新时间 } } unset($orderHeaders,$renewal_orders,$renewal_order); } }