SyncBatchTask.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Exceptions\ErrorException;
  4. use App\Exceptions\Exception;
  5. use App\Jobs\BatchTaskJob;
  6. use App\Order;
  7. use App\OrderBin;
  8. use App\Services\BatchService;
  9. use App\Services\common\BatchUpdateService;
  10. use App\Services\DocWaveHeaderService;
  11. use App\Services\LogService;
  12. use App\ValueStore;
  13. use Carbon\Carbon;
  14. use Illuminate\Console\Command;
  15. use Illuminate\Support\Facades\DB;
  16. class SyncBatchTask extends Command
  17. {
  18. protected $signature = 'sync:batch';
  19. protected $description = 'sync wms batch task';
  20. /** @var DocWaveHeaderService $service */
  21. private $service;
  22. /** @var BatchService $batchService */
  23. private $batchService;
  24. public function __construct()
  25. {
  26. parent::__construct();
  27. $this->service = app(DocWaveHeaderService::class);
  28. $this->batchService = app(BatchService::class);
  29. }
  30. public function handle()
  31. {
  32. sleep(10+rand(0,10));
  33. $this->disposeHeader();
  34. sleep(rand(0,10));
  35. $this->disposeDetail();
  36. }
  37. private function disposeHeader($date = null)
  38. {
  39. DB::transaction(function ()use($date){
  40. //获取更新时间与WMS数据
  41. if (!$date){
  42. $valueStore = ValueStore::query()->where("name","wave_last_sync_date")->lockForUpdate()->first();
  43. $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
  44. }
  45. $count = DB::connection("oracle")->selectOne(DB::raw("SELECT COUNT(*) count FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),[$date]);
  46. if ($count->count > 500){
  47. $query = DB::raw(<<<sql
  48. SELECT * FROM (SELECT header.*, ROWNUM AS rowno FROM (
  49. SELECT * FROM DOC_WAVE_HEADER WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')
  50. ORDER BY EDITTIME)header WHERE ROWNUM <= 500)wave WHERE wave.rowno >= 0
  51. sql
  52. );
  53. $waves = DB::connection("oracle")->select($query,[$date]);
  54. $this->headerExe($waves);
  55. $this->disposeHeader($waves[count($waves)-1]->edittime);
  56. }else{
  57. $waves = $this->service->get(["edittime"=>$date],["edittime"=>"gtOrEqual"]);
  58. $this->headerExe($waves);
  59. }
  60. });
  61. }
  62. private function headerExe($waves)
  63. {
  64. if (count($waves) < 1)return;
  65. //获取本地数据对比差异
  66. $codes = [];
  67. foreach ($waves as $wave){
  68. $codes[] = $wave->waveno;
  69. }
  70. $map = [];
  71. $batches = $this->batchService->get(["code"=>$codes]);
  72. if ($batches){
  73. foreach ($batches as $index=>$batch)$map[$batch->code] = $index;
  74. }
  75. $update = [["id","wms_status","wms_type","updated_at"]];
  76. $insert = [];
  77. foreach ($waves as $wave){
  78. if (isset($map[$wave->waveno])){
  79. $bat = $batches[$map[$wave->waveno]];
  80. $wms_status = $this->wms_status($wave);
  81. if ($bat->wms_status != $wms_status || $bat->wms_type != $wave->descr){
  82. $update[] = [
  83. "id" => $bat->id,
  84. "wms_status" => $this->wms_status($wave),
  85. "wms_type"=>$wave->descr,
  86. "updated_at"=>$wave->edittime,
  87. ];
  88. }
  89. continue;
  90. }
  91. $owner = app("OwnerService")->codeGetOwner($wave->customerid);
  92. $insert[] = [
  93. "code" => $wave->waveno,
  94. "wms_type"=>$wave->descr,
  95. "status" => '未处理',
  96. "wms_status" => $this->wms_status($wave),
  97. "created_at"=>$wave->addtime,
  98. "updated_at"=>$wave->edittime,
  99. "owner_id"=>$owner->id,
  100. ];
  101. }
  102. //存在则更新
  103. if (count($update)>1){
  104. $bool = app(BatchUpdateService::class)->batchUpdate("batches",$update);
  105. if ($bool!==false)LogService::log(__METHOD__,"SUCCESS-同步更新波次成功",json_encode($update));
  106. else LogService::log(__METHOD__,"ERROR-同步更新波次失败",json_encode($update));
  107. }
  108. //不存在则录入
  109. if ($insert){
  110. $this->batchService->insert($insert);
  111. LogService::log(__METHOD__,"SUCCESS-同步插入波次成功",json_encode($insert));
  112. }
  113. ValueStore::query()->where("name","wave_last_sync_date")->update(["value"=>$waves[count($waves)-1]->edittime]);
  114. }
  115. public function disposeDetail($date = null)
  116. {
  117. DB::transaction(function ()use($date){
  118. if (!$date){
  119. $valueStore = ValueStore::query()->where("name","wave_detail_last_sync_date")->lockForUpdate()->first();
  120. $date = $valueStore->value ?? Carbon::now()->subSeconds(65)->toDateTimeString();
  121. }
  122. $count = DB::connection("oracle")->selectOne(DB::raw("SELECT count(*) count FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')"),[$date]);
  123. if ($count->count > 1000){
  124. $sql = <<<sql
  125. SELECT * FROM (SELECT ORDERNO,WAVENO,SEQNO,EDITTIME, ROWNUM AS rowno FROM (
  126. SELECT * FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')
  127. ORDER BY EDITTIME) WHERE ROWNUM <= 1000)wave WHERE wave.rowno >= 0
  128. sql;
  129. $details = DB::connection("oracle")->select(DB::raw($sql),[$date]);
  130. $this->detailExe($details);
  131. $this->disposeDetail($details[count($details)-1]->edittime);
  132. }else{
  133. $sql = "SELECT ORDERNO,WAVENO,SEQNO,EDITTIME FROM DOC_WAVE_DETAILS WHERE EDITTIME >= TO_DATE(?,'yyyy-mm-dd hh24:mi:ss')";
  134. $details = DB::connection("oracle")->select(DB::raw($sql),[$date]);
  135. $this->detailExe($details);
  136. }
  137. });
  138. }
  139. private function detailExe($details)
  140. {
  141. if (count($details) < 1)return;
  142. $map = [];
  143. $nos = [];
  144. $ods = [];
  145. $orderCodes = [];
  146. $seqnos = [];
  147. $batchMapping = [];
  148. foreach ($details as $detail){
  149. if (isset($map[$detail->waveno]))$map[$detail->waveno][] = $detail->orderno;
  150. else {
  151. $map[$detail->waveno] = [$detail->orderno];
  152. $nos[] = $detail->waveno;
  153. $ods[] = $detail->orderno;
  154. }
  155. $orderCodes[] = $detail->orderno;
  156. $seqnos[$detail->orderno] = $detail->seqno;
  157. $batchMapping[$detail->orderno] = $detail->waveno;
  158. }
  159. $orders = Order::query()->select("id","batch_id","code")->whereIn("code",$orderCodes)->get();
  160. if (count($orderCodes) != count($orders))LogService::log(__CLASS__,"格口号-订单存在差异",json_encode($orderCodes));
  161. if ($orders){
  162. $orderIds = [];
  163. $orderMap = [];
  164. foreach ($orders as $order){
  165. $orderIds[] = $order->id;
  166. $orderMap[$order->id] = $seqnos[$order->code];
  167. }
  168. $updateBin = [["id","number"]];
  169. $insertBin = [];
  170. $deleteBin = [];
  171. $orderBins = OrderBin::query()->select("id","order_id","number")->whereIn("order_id",$orderIds)->get();
  172. foreach ($orderBins as $orderBin){
  173. if (!isset($orderMap[$orderBin->order_id])){
  174. $deleteBin[] = $orderBin->id;
  175. continue;
  176. }
  177. if ($orderBin->number != $orderMap[$orderBin->order_id])$updateBin[]=["id"=>$orderBin->id,"number"=>$orderMap[$orderBin->order_id]];
  178. unset($orderMap[$orderBin->order_id]);
  179. }
  180. $d = date('Y-m-d H:i:s');
  181. foreach ($orderMap as $orderId=>$binNumber){
  182. $insertBin[]=[
  183. "order_id"=>$orderId,
  184. "number"=>$binNumber,
  185. 'created_at' => $d
  186. ];
  187. }
  188. if (count($updateBin)>1){
  189. app(BatchUpdateService::class)->batchUpdate("order_bins",$updateBin);
  190. LogService::log(__METHOD__,"波次同步-更新订单格口号",json_encode($updateBin));
  191. }
  192. if ($insertBin){
  193. OrderBin::query()->insert($insertBin);
  194. LogService::log(__METHOD__,"波次同步-录入订单格口号",json_encode($insertBin));
  195. }
  196. if ($deleteBin){
  197. OrderBin::destroy($deleteBin);
  198. LogService::log(__METHOD__,"波次同步-删除订单格口号",json_encode($deleteBin));
  199. }
  200. }
  201. $batches = $this->batchService->get(["code"=>$nos]);
  202. $batchDiff = array_keys(array_flip(array_diff($nos,array_column($batches->toArray(),"code"))));
  203. if (count($batchDiff)>0){
  204. $sql = <<<sql
  205. SELECT * FROM DOC_WAVE_HEADER WHERE WAVENO IN (''
  206. sql;
  207. foreach ($batchDiff as $bd)$sql .= ",'".$bd."'";
  208. $sql .= ')';
  209. $wmsBatches = DB::connection("oracle")->select(DB::raw($sql));
  210. $this->headerExe($wmsBatches);
  211. $batches = $this->batchService->get(["code"=>$nos]);
  212. }
  213. $updateOrder = [["code","batch_id"]];
  214. $existOrder = [];
  215. $updatingBatches = [];
  216. foreach (Order::query()->select("code","batch_id")->whereNotNull("batch_id")->whereIn("code",$ods)->get() as $item){
  217. $existOrder[$item->code] = $item->batch_id;
  218. }
  219. foreach ($batches as $batch){
  220. $mark = false;
  221. foreach ($map[$batch->code] as $on){
  222. if (!isset($existOrder[$on]) || $existOrder[$on]!=$batch->id){
  223. $updateOrder[] = [
  224. "code"=>$on,
  225. "batch_id"=>$batch->id
  226. ];
  227. $mark = true;
  228. }
  229. }
  230. if ($mark)$updatingBatches[] = $batch;
  231. unset($map[$batch->code]);
  232. }
  233. if (count($updateOrder)>1){
  234. app("OrderService")->batchUpdate($updateOrder);//反向修改订单
  235. LogService::log(__METHOD__,"波次同步-修改订单波次号",json_encode($updateOrder));
  236. app("BatchService")->checkBatchOrderInfo($updatingBatches);
  237. LogService::log(__METHOD__,"修改过的波次_",json_encode($updatingBatches));
  238. BatchTaskJob::dispatch($updatingBatches); //在这里为波次注册队列任务!
  239. }
  240. if ($map){
  241. $waveCodes = array_keys($map);
  242. $waves = $this->service->get(["waveno"=>$waveCodes],["waveno"=>"in"]);
  243. $insert = [];
  244. foreach ($waves as $wave){
  245. $owner = app("OwnerService")->codeGetOwner($wave->customerid);
  246. $insert[] = [
  247. "code" => $wave->waveno,
  248. "status" => '未处理',
  249. "wms_status" => $this->wms_status($wave),
  250. "wms_type"=>$wave->descr,
  251. "created_at"=>$wave->addtime,
  252. "updated_at"=>$wave->edittime,
  253. "owner_id"=>$owner->id,
  254. ];
  255. }
  256. if ($insert){
  257. $this->batchService->insert($insert);
  258. LogService::log(__METHOD__,"SUCCESS-同步插入波次成功",json_encode($insert));
  259. $batches = $this->batchService->get(["code"=>$waveCodes]);
  260. foreach ($batches as $batch){
  261. app("OrderService")->update(["code"=>$map[$batch->code]],["batch_id"=>$batch->id]);
  262. }
  263. app("BatchService")->checkBatchOrderInfo($batches);
  264. BatchTaskJob::dispatch($batches); //在这里为波次注册队列任务!
  265. }
  266. }
  267. ValueStore::query()->where("name","wave_detail_last_sync_date")->update(["value"=>$details[count($details)-1]->edittime]);
  268. }
  269. /**
  270. * @param $wave
  271. * @return string
  272. */
  273. public function wms_status($wave): string
  274. {
  275. switch ($wave->wavestatus) {
  276. case 00:
  277. $wms_status = '创建';
  278. break;
  279. case 40:
  280. $wms_status = '部分收货';
  281. break;
  282. case 90:
  283. $wms_status = '取消';
  284. break;
  285. case 99:
  286. $wms_status = '完成';
  287. break;
  288. case 62:
  289. $wms_status = '部分装箱';
  290. break;
  291. default:
  292. $wms_status = (string)$wave->wavestatus;
  293. }
  294. return $wms_status;
  295. }
  296. }