| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- <?php
- namespace App\Console\Commands;
- use App\Jobs\BatchTaskJob;
- use App\Jobs\BroadcastBatchToZhengCangJob;
- use App\Order;
- use App\OrderBin;
- use App\Services\BatchService;
- use App\Services\common\BatchUpdateService;
- use App\Services\DocWaveHeaderService;
- use App\Services\LogService;
- use App\ValueStore;
- use Carbon\Carbon;
- use Illuminate\Console\Command;
- use Illuminate\Support\Facades\DB;
/**
 * Artisan command `sync:batch`.
 *
 * Reads rows from the Oracle tables DOC_WAVE_HEADER and DOC_WAVE_DETAILS whose
 * EDITTIME is at or after a stored checkpoint and mirrors them into the local
 * `batches`, `orders` and `order_bins` data. Two checkpoints are kept in
 * ValueStore: `wave_last_sync_date` (headers) and `wave_detail_last_sync_date`
 * (details).
 */
class SyncBatchTask extends Command
{
    protected $signature = 'sync:batch';

    protected $description = 'sync wms batch task';

    /** @var DocWaveHeaderService $service */
    private $service;

    /** @var BatchService $batchService */
    private $batchService;

    public function __construct()
    {
        parent::__construct();
        $this->service = app(DocWaveHeaderService::class);
        $this->batchService = app(BatchService::class);
    }

    /**
     * Entry point: sync wave headers, then wave details.
     *
     * Each phase is preceded by a random sleep (10-20s, then 0-10s).
     * NOTE(review): presumably jitter to stagger overlapping schedulers - confirm.
     */
    public function handle()
    {
        sleep(10 + rand(0, 10));
        $this->disposeHeader();
        sleep(rand(0, 10));
        $this->disposeDetail();
    }

    /**
     * Sync every DOC_WAVE_HEADER row with EDITTIME >= $date into local batches.
     *
     * When more than 500 rows are pending they are consumed in pages of 500
     * ordered by EDITTIME; the last page (<= 500 rows) is read through
     * DocWaveHeaderService. All pages run inside one local DB transaction.
     *
     * @param string|null $date 'Y-m-d H:i:s' lower bound; when empty the stored
     *                          `wave_last_sync_date` is used, falling back to
     *                          "now minus 65 seconds".
     */
    private function disposeHeader($date = null)
    {
        DB::transaction(function () use ($date) {
            // Resolve the lower bound for this run.
            if (!$date) {
                $valueStore = ValueStore::query()->where("name", "wave_last_sync_date")->first();
                $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
            }

            $oracle = DB::connection("oracle");
            $pagedSql = "SELECT * FROM (SELECT header.*, ROWNUM AS rowno FROM ("
                . "SELECT * FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss') "
                . "ORDER BY EDITTIME)header WHERE ROWNUM <= 500)wave WHERE wave.rowno >= 0";

            // Iterative paging: the previous recursive form nested one
            // transaction per page and never terminated when a whole page
            // shared a single EDITTIME value.
            while (true) {
                $count = $oracle->selectOne(
                    DB::raw("SELECT COUNT(*) count FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),
                    [$date]
                );

                if ($count->count > 500) {
                    $waves = $oracle->select(DB::raw($pagedSql), [$date]);
                    if (count($waves) < 1) {
                        break;
                    }
                    $this->headerExe($waves);

                    $next = $waves[count($waves) - 1]->edittime;
                    if ((string)$next !== (string)$date) {
                        $date = $next;
                        continue;
                    }
                    // The cursor did not move (every row of the page carries
                    // the same EDITTIME): fall through and read the remainder
                    // in one unpaged pass so the loop is guaranteed to end.
                }

                $waves = $this->service->get(["edittime" => $date], ["edittime" => "gtOrEqual"]);
                $this->headerExe($waves);
                break;
            }
        });
    }

    /**
     * Upsert the given WMS wave headers into the local `batches` table.
     *
     * Existing batches (matched by code == WAVENO) are updated only when
     * wms_status or wms_type differ; unknown waves are inserted.
     *
     * @param iterable $waves             rows exposing waveno, descr, wavestatus,
     *                                    customerid, addtime and edittime
     * @param bool     $advanceCheckpoint when true (default) `wave_last_sync_date`
     *                                    is set to the edittime of the last row;
     *                                    pass false for ad-hoc lookups that must
     *                                    not move the header cursor
     */
    private function headerExe($waves, $advanceCheckpoint = true)
    {
        if (count($waves) < 1) {
            return;
        }

        // Load the local batches for these wave numbers to diff against.
        $codes = [];
        foreach ($waves as $wave) {
            $codes[] = $wave->waveno;
        }
        $existing = [];
        $batches = $this->batchService->get(["code" => $codes]);
        if ($batches) {
            foreach ($batches as $batch) {
                $existing[$batch->code] = $batch;
            }
        }

        // First row is the column list expected by BatchUpdateService.
        $update = [["id", "wms_status", "wms_type", "updated_at"]];
        $insert = [];
        foreach ($waves as $wave) {
            if (isset($existing[$wave->waveno])) {
                $bat = $existing[$wave->waveno];
                $wmsStatus = $this->wms_status($wave);
                if ($bat->wms_status != $wmsStatus || $bat->wms_type != $wave->descr) {
                    $update[] = [
                        "id" => $bat->id,
                        "wms_status" => $wmsStatus,
                        "wms_type" => $wave->descr,
                        "updated_at" => $wave->edittime,
                    ];
                }
                continue;
            }
            // Keyed by code so a wave listed twice is inserted only once.
            $insert[$wave->waveno] = $this->buildBatchRow($wave);
        }
        $insert = array_values($insert);

        // Update the batches that already exist.
        if (count($update) > 1) {
            $bool = app(BatchUpdateService::class)->batchUpdate("batches", $update);
            if ($bool !== false) {
                LogService::log(__METHOD__, "SUCCESS-同步更新波次成功", json_encode($update));
            } else {
                LogService::log(__METHOD__, "ERROR-同步更新波次失败", json_encode($update));
            }
        }

        // Insert the batches that do not exist yet.
        if ($insert) {
            $this->batchService->insert($insert);
            LogService::log(__METHOD__, "SUCCESS-同步插入波次成功", json_encode($insert));
        }

        if ($advanceCheckpoint) {
            ValueStore::query()->where("name", "wave_last_sync_date")
                ->update(["value" => $waves[count($waves) - 1]->edittime]);
        }
    }

    /**
     * Build the `batches` insert row for one WMS wave header.
     *
     * NOTE(review): assumes OwnerService::codeGetOwner() always returns an
     * owner object for the wave's customerid - confirm.
     *
     * @param object $wave
     * @return array
     */
    private function buildBatchRow($wave)
    {
        $owner = app("OwnerService")->codeGetOwner($wave->customerid);

        return [
            "code" => $wave->waveno,
            "wms_type" => $wave->descr,
            "status" => '未处理',
            "wms_status" => $this->wms_status($wave),
            "created_at" => $wave->addtime,
            "updated_at" => $wave->edittime,
            "owner_id" => $owner->id,
        ];
    }

    /**
     * Sync every DOC_WAVE_DETAILS row with EDITTIME >= $date.
     *
     * Pages of 1000 rows ordered by EDITTIME are consumed while more than 1000
     * rows are pending; the remainder is read in one query. Everything runs
     * inside one local DB transaction, with the checkpoint row locked for
     * update when it is read.
     *
     * @param string|null $date 'Y-m-d H:i:s' lower bound; when empty the stored
     *                          `wave_detail_last_sync_date` is used, falling
     *                          back to "now minus 65 seconds".
     */
    public function disposeDetail($date = null)
    {
        DB::transaction(function () use ($date) {
            if (!$date) {
                $valueStore = ValueStore::query()->where("name", "wave_detail_last_sync_date")->lockForUpdate()->first();
                $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
            }

            $oracle = DB::connection("oracle");
            $pagedSql = "SELECT * FROM (SELECT ORDERNO,WAVENO,SEQNO,EDITTIME, ROWNUM AS rowno FROM ("
                . "SELECT * FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss') "
                . "ORDER BY EDITTIME) WHERE ROWNUM <= 1000)wave WHERE wave.rowno >= 0";
            $tailSql = "SELECT ORDERNO,WAVENO,SEQNO,EDITTIME FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')";

            // Iterative paging; see disposeHeader() for the rationale.
            while (true) {
                $count = $oracle->selectOne(
                    DB::raw("SELECT count(*) count FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),
                    [$date]
                );

                if ($count->count > 1000) {
                    $details = $oracle->select(DB::raw($pagedSql), [$date]);
                    if (count($details) < 1) {
                        break;
                    }
                    $this->detailExe($details);

                    $next = $details[count($details) - 1]->edittime;
                    if ((string)$next !== (string)$date) {
                        $date = $next;
                        continue;
                    }
                    // Cursor stuck on one EDITTIME: finish with the unpaged read.
                }

                $details = $oracle->select(DB::raw($tailSql), [$date]);
                $this->detailExe($details);
                break;
            }
        });
    }

    /**
     * Apply a set of WMS wave detail rows locally.
     *
     * Steps: (1) reconcile `order_bins` numbers with each order's SEQNO,
     * (2) make sure every referenced wave has a local batch, (3) point orders
     * at their batch and dispatch the batch jobs, (4) advance
     * `wave_detail_last_sync_date` to the edittime of the last row.
     *
     * NOTE(review): jobs are dispatched while the caller's DB transaction is
     * still open; on a non-sync queue a worker may start before the commit -
     * confirm this is acceptable.
     *
     * @param array $details rows exposing orderno, waveno, seqno and edittime
     */
    private function detailExe($details)
    {
        if (count($details) < 1) {
            return;
        }

        $map = [];        // waveno => list of order numbers
        $nos = [];        // distinct wave numbers, first-seen order
        $orderCodes = [];
        $seqnos = [];     // orderno => seqno (last row wins)
        foreach ($details as $detail) {
            if (isset($map[$detail->waveno])) {
                $map[$detail->waveno][] = $detail->orderno;
            } else {
                $map[$detail->waveno] = [$detail->orderno];
                $nos[] = $detail->waveno;
            }
            $orderCodes[] = $detail->orderno;
            $seqnos[$detail->orderno] = $detail->seqno;
        }

        $orders = Order::query()->select("id", "batch_id", "code")->whereIn("code", $orderCodes)->get();
        if (count($orderCodes) != count($orders)) {
            LogService::log(__CLASS__, "格口号-订单存在差异", json_encode($orderCodes));
        }

        $existOrder = []; // order code => current batch_id
        if (count($orders) > 0) {
            $orderIds = [];
            $orderMap = []; // order id => bin number (SEQNO)
            foreach ($orders as $order) {
                $orderIds[] = $order->id;
                $orderMap[$order->id] = $seqnos[$order->code];
                $existOrder[$order->code] = $order->batch_id;
            }

            $updateBin = [["id", "number"]];
            $insertBin = [];
            $deleteBin = [];
            $orderBins = OrderBin::query()->select("id", "order_id", "number")->whereIn("order_id", $orderIds)->get();
            foreach ($orderBins as $orderBin) {
                // No pending number left for this order (already consumed by an
                // earlier bin row, or the number is null): delete the bin row.
                if (!isset($orderMap[$orderBin->order_id])) {
                    $deleteBin[] = $orderBin->id;
                    continue;
                }
                if ($orderBin->number != $orderMap[$orderBin->order_id]) {
                    $updateBin[] = ["id" => $orderBin->id, "number" => $orderMap[$orderBin->order_id]];
                }
                unset($orderMap[$orderBin->order_id]);
            }

            // Whatever is still in $orderMap has no bin row yet.
            $now = date('Y-m-d H:i:s');
            foreach ($orderMap as $orderId => $binNumber) {
                $insertBin[] = [
                    "order_id" => $orderId,
                    "number" => $binNumber,
                    'created_at' => $now,
                ];
            }

            if (count($updateBin) > 1) {
                app(BatchUpdateService::class)->batchUpdate("order_bins", $updateBin);
                LogService::log(__METHOD__, "波次同步-更新订单格口号", json_encode($updateBin));
            }
            if ($insertBin) {
                OrderBin::query()->insert($insertBin);
                LogService::log(__METHOD__, "波次同步-录入订单格口号", json_encode($insertBin));
            }
            if ($deleteBin) {
                OrderBin::destroy($deleteBin);
                LogService::log(__METHOD__, "波次同步-删除订单格口号", json_encode($deleteBin));
            }
        }

        // Pull in headers for waves that have no local batch yet.
        $batches = $this->batchService->get(["code" => $nos]);
        $batchDiff = array_values(array_diff($nos, array_column($batches->toArray(), "code")));
        if (count($batchDiff) > 0) {
            $wmsBatches = [];
            // Bound parameters instead of string-built SQL; chunked because
            // Oracle rejects IN lists with more than 1000 expressions.
            foreach (array_chunk($batchDiff, 1000) as $chunk) {
                $placeholders = implode(",", array_fill(0, count($chunk), "?"));
                $rows = DB::connection("oracle")->select(
                    DB::raw("SELECT * FROM DOC_WAVE_HEADER WHERE WAVENO IN (" . $placeholders . ")"),
                    $chunk
                );
                foreach ($rows as $row) {
                    $wmsBatches[] = $row;
                }
            }
            // Ad-hoc header lookup: must not move the header sync checkpoint,
            // otherwise header edits older than these rows could be skipped.
            $this->headerExe($wmsBatches, false);
            $batches = $this->batchService->get(["code" => $nos]);
        }

        $updateOrder = [["code", "batch_id"]];
        $updatingBatches = [];
        foreach ($batches as $batch) {
            // A second local batch with the same code has nothing left to claim.
            if (!isset($map[$batch->code])) {
                continue;
            }
            $mark = false;
            foreach ($map[$batch->code] as $on) {
                if ((!isset($existOrder[$on])) || $existOrder[$on] != $batch->id) {
                    $updateOrder[] = [
                        "code" => $on,
                        "batch_id" => $batch->id,
                    ];
                    $mark = true;
                }
            }
            if ($mark) {
                $updatingBatches[] = $batch;
            }
            unset($map[$batch->code]);
        }

        if (count($updateOrder) > 1) {
            app("OrderService")->batchUpdate($updateOrder); // back-fill batch_id on the orders
            LogService::log(__METHOD__, "波次同步-修改订单波次号", json_encode($updateOrder));
            app("BatchService")->checkBatchOrderInfo($updatingBatches);
            LogService::log(__METHOD__, "波次注册一入口", json_encode($updatingBatches));
            BatchTaskJob::dispatch($updatingBatches); // register the queue job for these batches
            BroadcastBatchToZhengCangJob::dispatch($updatingBatches); // register the ZhengCang push job
        }

        // Waves that still have no local batch: fetch them through the header
        // service, insert them, then attach their orders.
        if ($map) {
            $waveCodes = array_keys($map);
            $waves = $this->service->get(["waveno" => $waveCodes], ["waveno" => "in"]);
            $insert = [];
            foreach ($waves as $wave) {
                $insert[] = $this->buildBatchRow($wave);
            }
            if ($insert) {
                $this->batchService->insert($insert);
                LogService::log(__METHOD__, "SUCCESS-同步插入波次成功", json_encode($insert));
                $batches = $this->batchService->get(["code" => $waveCodes]);
                foreach ($batches as $batch) {
                    app("OrderService")->update(["code" => $map[$batch->code]], ["batch_id" => $batch->id]);
                }
                app("BatchService")->checkBatchOrderInfo($batches);
                LogService::log(__METHOD__, "波次注册二入口", json_encode($batches));
                BatchTaskJob::dispatch($batches); // register the queue job for these batches
                BroadcastBatchToZhengCangJob::dispatch($batches); // register the ZhengCang push job
            }
        }

        ValueStore::query()->where("name", "wave_detail_last_sync_date")
            ->update(["value" => $details[count($details) - 1]->edittime]);
    }

    /**
     * Translate a wave's numeric WAVESTATUS into its display label.
     *
     * Known codes: 0/"00", 40, 62, 90, 99. Any other value - including null,
     * empty and non-numeric strings, which the previous loose `switch` mapped
     * to '创建' via `== 0` - is returned as its string form.
     *
     * @param object $wave row exposing `wavestatus`
     * @return string
     */
    public function wms_status($wave): string
    {
        $labels = [
            0 => '创建',
            40 => '部分收货',
            62 => '部分装箱',
            90 => '取消',
            99 => '完成',
        ];

        $raw = $wave->wavestatus;
        // Only whole numeric values may match a code ("00" -> 0, "40" -> 40).
        if (is_numeric($raw) && (float)$raw == (int)$raw && isset($labels[(int)$raw])) {
            return $labels[(int)$raw];
        }

        return (string)$raw;
    }
}
|