service = app(DocWaveHeaderService::class);
        $this->batchService = app(BatchService::class);
    }

    /**
     * Job entry point: syncs WMS wave headers first, then wave details.
     */
    public function handle()
    {
        // Random 0-10s delay before starting.
        // NOTE(review): presumably jitter so parallel workers do not start together — confirm.
        sleep(rand(0,10));
        $this->disposeHeader();
        $this->disposeDetail();
    }

    /**
     * Pulls wave headers edited at or after the sync cursor from the Oracle
     * DOC_WAVE_HEADER table and hands them to headerExe(), 500 rows per page.
     *
     * @param string|null $date cursor ('yyyy-mm-dd hh24:mi:ss'); when null it is read
     *                          from the "wave_last_sync_date" ValueStore row, falling
     *                          back to 65 seconds ago if that row has no value
     */
    private function disposeHeader($date = null)
    {
        DB::transaction(function ()use($date){
            // Resolve the cursor; the ValueStore row is selected FOR UPDATE inside this transaction.
            if (!$date){
                $valueStore = ValueStore::query()->where("name","wave_last_sync_date")->lockForUpdate()->first();
                $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
            }
            $count = DB::connection("oracle")->selectOne(DB::raw("SELECT COUNT(*) count FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),[$date]);
            if ($count->count > 500){
                // More than one page pending: take the 500 oldest rows, then recurse from the last one.
                // NOTE(review): the head of this statement was not readable in the reviewed copy;
                // "SELECT *" on the innermost query is a reconstruction — confirm the column list.
                $query = DB::raw(<<<sql
SELECT wave.* FROM (SELECT header.*,ROWNUM rowno FROM (SELECT * FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss') ORDER BY EDITTIME)header WHERE ROWNUM <= 500)wave WHERE wave.rowno >= 0
sql
                );
                $waves = DB::connection("oracle")->select($query,[$date]);
                $this->headerExe($waves);
                $next = $waves[count($waves)-1]->edittime;
                // The cursor is inclusive (>=). If a whole page shares one EDITTIME the next
                // page would be identical, so recursing again would never terminate.
                if ((string)$next === (string)$date){
                    LogService::log(__METHOD__,"ERROR-同步波次分页游标未推进",json_encode(["date"=>$date,"count"=>$count->count]));
                    return;
                }
                $this->disposeHeader($next);
            }else{
                $waves = $this->service->get(["edittime"=>$date],["edittime"=>"gtOrEqual"]);
                $this->headerExe($waves);
            }
        });
    }

    /**
     * Upserts the given WMS wave headers into the local batches table.
     *
     * Waves whose waveno already exists locally are batch-updated (wms_status,
     * remark, updated_at); the rest are inserted as new batches.
     *
     * @param iterable $waves         WMS header rows exposing waveno, descr, customerid,
     *                                addtime, edittime and wavestatus
     * @param bool     $advanceCursor when true (default) "wave_last_sync_date" is moved to
     *                                the newest edittime processed; pass false for ad-hoc
     *                                backfills so they cannot rewind or skip the cursor
     */
    private function headerExe($waves, $advanceCursor = true)
    {
        if (count($waves) < 1)return;

        // Collect wave numbers and track the newest edit time. The non-paginated
        // callers give no ordering guarantee, so the last row is not necessarily the newest.
        $codes = [];
        $latest = null;
        foreach ($waves as $wave){
            $codes[] = $wave->waveno;
            if ($latest === null || strcmp((string)$wave->edittime,(string)$latest) > 0)$latest = $wave->edittime;
        }

        // Map local batch code => id to tell updates from inserts.
        $map = [];
        $batches = $this->batchService->get(["code"=>$codes]);
        if ($batches){
            foreach ($batches as $batch)$map[$batch->code] = $batch->id;
        }

        // First element is the column header row expected by BatchUpdateService::batchUpdate.
        $update = [["id","wms_status","remark","updated_at"]];
        $insert = [];
        foreach ($waves as $wave){
            if (isset($map[$wave->waveno])){
                $update[] = [
                    "id" => $map[$wave->waveno],
                    "wms_status" => $this->wms_status($wave),
                    "remark"=>$wave->descr,
                    "updated_at"=>$wave->edittime,
                ];
                continue;
            }
            $insert[] = $this->buildBatchRow($wave);
        }

        // Update batches that already exist locally.
        if (count($update)>1){
            $bool = app(BatchUpdateService::class)->batchUpdate("batches",$update);
            if ($bool)LogService::log(__METHOD__,"SUCCESS-同步更新波次成功",json_encode($update));
            else LogService::log(__METHOD__,"ERROR-同步更新波次失败",json_encode($update));
        }
        // Insert batches that do not exist yet.
        if ($insert){
            $this->batchService->insert($insert);
            LogService::log(__METHOD__,"SUCCESS-同步插入波次成功",json_encode($insert));
        }

        if ($advanceCursor){
            ValueStore::query()->where("name","wave_last_sync_date")->update(["value"=>$latest]);
        }
    }

    /**
     * Builds the batches-table insert row for a WMS wave header.
     *
     * NOTE(review): assumes OwnerService::codeGetOwner() always returns an owner for
     * the wave's customerid; a missing owner fails on ->id — confirm.
     *
     * @param object $wave WMS header row
     * @return array
     */
    private function buildBatchRow($wave)
    {
        $owner = app("OwnerService")->codeGetOwner($wave->customerid);
        return [
            "code" => $wave->waveno,
            "remark"=>$wave->descr,
            "status" => '未处理',
            "wms_status" => $this->wms_status($wave),
            "created_at"=>$wave->addtime,
            "updated_at"=>$wave->edittime,
            "owner_id"=>$owner->id,
        ];
    }

    /**
     * Pulls wave detail rows edited at or after the sync cursor from the Oracle
     * DOC_WAVE_DETAILS table and hands them to detailExe(), 1000 rows per page.
     *
     * @param string|null $date cursor ('yyyy-mm-dd hh24:mi:ss'); when null it is read
     *                          from the "wave_detail_last_sync_date" ValueStore row,
     *                          falling back to 65 seconds ago if that row has no value
     */
    public function disposeDetail($date = null)
    {
        DB::transaction(function ()use($date){
            if (!$date){
                $valueStore = ValueStore::query()->where("name","wave_detail_last_sync_date")->lockForUpdate()->first();
                $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
            }
            $count = DB::connection("oracle")->selectOne(DB::raw("SELECT count(*) count FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),[$date]);
            if ($count->count > 1000){
                // NOTE(review): the head of this statement was not readable in the reviewed copy;
                // the column list mirrors the non-paginated query below — confirm.
                $sql = <<<sql
SELECT wave.* FROM (SELECT ORDERNO,WAVENO,SEQNO,EDITTIME,ROWNUM rowno FROM (SELECT ORDERNO,WAVENO,SEQNO,EDITTIME FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss') ORDER BY EDITTIME) WHERE ROWNUM <= 1000)wave WHERE wave.rowno >= 0
sql;
                $details = DB::connection("oracle")->select(DB::raw($sql),[$date]);
                $this->detailExe($details);
                $next = $details[count($details)-1]->edittime;
                // Same inclusive-cursor hazard as the header sync: stop when a page makes no progress.
                if ((string)$next === (string)$date){
                    LogService::log(__METHOD__,"ERROR-同步波次明细分页游标未推进",json_encode(["date"=>$date,"count"=>$count->count]));
                    return;
                }
                $this->disposeDetail($next);
            }else{
                $sql = "SELECT ORDERNO,WAVENO,SEQNO,EDITTIME FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')";
                $details = DB::connection("oracle")->select(DB::raw($sql),[$date]);
                $this->detailExe($details);
            }
        });
    }

    /**
     * Applies a page of WMS wave detail rows locally:
     *  1. reconciles order_bins (number = detail seqno) for the orders involved,
     *  2. backfills batches that are referenced by details but missing locally,
     *  3. points the orders at their batch,
     *  4. moves "wave_detail_last_sync_date" to the newest edittime processed.
     *
     * @param array $details rows exposing orderno, waveno, seqno and edittime
     */
    private function detailExe($details)
    {
        if (count($details) < 1)return;

        $map = [];        // waveno => list of order numbers in that wave
        $nos = [];        // distinct wave numbers, in first-seen order
        $orderCodes = []; // every order number in the page
        $seqnos = [];     // orderno => seqno (a later row for the same order wins)
        $latest = null;   // newest edittime seen; the non-paginated query is unordered
        foreach ($details as $detail){
            if (isset($map[$detail->waveno]))$map[$detail->waveno][] = $detail->orderno;
            else {
                $map[$detail->waveno] = [$detail->orderno];
                $nos[] = $detail->waveno;
            }
            $orderCodes[] = $detail->orderno;
            $seqnos[$detail->orderno] = $detail->seqno;
            if ($latest === null || strcmp((string)$detail->edittime,(string)$latest) > 0)$latest = $detail->edittime;
        }

        // --- 1. order bins -------------------------------------------------
        $orders = Order::query()->select("id","batch_id","code")->whereIn("code",$orderCodes)->get();
        $orderIds = [];
        $orderMap = []; // order id => bin number still to be matched against an existing bin
        foreach ($orders as $order){
            $orderIds[] = $order->id;
            $orderMap[$order->id] = $seqnos[$order->code];
        }
        // First element is the column header row expected by BatchUpdateService::batchUpdate.
        $updateBin = [["id","number"]];
        $insertBin = [];
        $deleteBin = [];
        $orderBins = OrderBin::query()->select("id","order_id","number")->whereIn("order_id",$orderIds)->get();
        foreach ($orderBins as $orderBin){
            // Not (or no longer) in the map: the order was already matched by an earlier
            // bin, or its seqno is null (isset() is false for null) — the bin is deleted.
            if (!isset($orderMap[$orderBin->order_id])){
                $deleteBin[] = $orderBin->id;
                continue;
            }
            if ($orderBin->number != $orderMap[$orderBin->order_id])$updateBin[]=["id"=>$orderBin->id,"number"=>$orderMap[$orderBin->order_id]];
            unset($orderMap[$orderBin->order_id]);
        }
        // Whatever is left in the map has no bin yet.
        $d = date('Y-m-d H:i:s');
        foreach ($orderMap as $orderId=>$binNumber){
            $insertBin[]=[
                "order_id"=>$orderId,
                "number"=>$binNumber,
                'created_at' => $d
            ];
        }
        if (count($updateBin)>1){
            app(BatchUpdateService::class)->batchUpdate("order_bins",$updateBin);
            LogService::log(__METHOD__,"波次同步-更新订单格口号",json_encode($updateBin));
        }
        if ($insertBin){
            OrderBin::query()->insert($insertBin);
            LogService::log(__METHOD__,"波次同步-录入订单格口号",json_encode($insertBin));
        }
        if ($deleteBin){
            OrderBin::destroy($deleteBin);
            LogService::log(__METHOD__,"波次同步-删除订单格口号",json_encode($deleteBin));
        }

        // --- 2. backfill batches missing locally ---------------------------
        $batches = $this->batchService->get(["code"=>$nos]);
        // $nos is already distinct, so the diff only needs re-indexing.
        $batchDiff = array_values(array_diff($nos,array_column($batches->toArray(),"code")));
        if (count($batchDiff)>0){
            // Same service lookup this method uses further down, instead of string-built SQL.
            $wmsBatches = $this->service->get(["waveno"=>$batchDiff],["waveno"=>"in"]);
            // Backfill only: must not move the header sync cursor.
            $this->headerExe($wmsBatches,false);
            $batches = $this->batchService->get(["code"=>$nos]);
        }

        // --- 3. attach orders to their batch -------------------------------
        $updateOrder = [["code","batch_id"]];
        foreach ($batches as $batch){
            $updateOrder[] = [
                "code"=>$map[$batch->code],
                "batch_id"=>$batch->id
            ];
            unset($map[$batch->code]);
        }
        if (count($updateOrder)>1){
            // Write the batch id back onto the orders.
            app("OrderService")->batchUpdate($updateOrder);
            LogService::log(__METHOD__,"波次同步-修改订单波次号",json_encode($updateOrder));
        }

        // Waves that still have no local batch: fetch from WMS, insert, then link their orders.
        if ($map){
            $waveCodes = array_keys($map);
            $waves = $this->service->get(["waveno"=>$waveCodes],["waveno"=>"in"]);
            $insert = [];
            foreach ($waves as $wave){
                $insert[] = $this->buildBatchRow($wave);
            }
            if ($insert){
                $this->batchService->insert($insert);
                LogService::log(__METHOD__,"SUCCESS-同步插入波次成功",json_encode($insert));
                $batches = $this->batchService->get(["code"=>$waveCodes]);
                foreach ($batches as $batch){
                    app("OrderService")->update(["code"=>$map[$batch->code]],["batch_id"=>$batch->id]);
                }
            }
        }

        // --- 4. advance the detail cursor ----------------------------------
        ValueStore::query()->where("name","wave_detail_last_sync_date")->update(["value"=>$latest]);
    }

    /**
     * Maps a WMS wave status code to its local label.
     *
     * Known codes are 0, 40, 62, 90 and 99. Any other value — including null and
     * non-numeric strings — is returned as its string form instead of being
     * loosely matched against 0.
     *
     * @param object $wave row exposing wavestatus
     * @return string
     */
    public function wms_status($wave): string
    {
        $status = $wave->wavestatus;
        $labels = [
            0  => '创建',
            40 => '部分收货',
            62 => '部分装箱',
            90 => '取消',
            99 => '完成',
        ];
        // Numeric guard first: without it null (and, on PHP 7, any non-numeric string) == 0.
        if (is_numeric($status)){
            foreach ($labels as $code => $label){
                if ($status == $code)return $label;
            }
        }
        return (string)$status;
    }
}