erShouldBeQueued($class)) {
            return $this->createQueuedHandlerCallable($class, $method);
        }

        $listener = $this->container->make($class);

        return $this->handlerShouldBeDispatchedAfterDatabaseTransactions($listener)
                    ? $this->createCallbackForListenerRunningAfterCommits($listener, $method)
                    : [$listener, $method];
    }

    /**
     * Split a class-based listener string into its class and method parts.
     *
     * When the string names no method, "handle" is used as the default.
     *
     * @param  string  $listener
     * @return array
     */
    protected function parseClassCallable($listener)
    {
        $segments = Str::parseCallback($listener, 'handle');

        return $segments;
    }

    /**
     * Check whether the given handler class implements the ShouldQueue contract.
     *
     * Any exception raised while reflecting on the class is treated as "not queued".
     *
     * @param  string  $class
     * @return bool
     */
    protected function handlerShouldBeQueued($class)
    {
        try {
            $reflector = new ReflectionClass($class);

            return $reflector->implementsInterface(ShouldQueue::class);
        } catch (Exception $e) {
            return false;
        }
    }

    /**
     * Build a closure that places the event handler on the queue when invoked.
     *
     * @param  string  $class
     * @param  string  $method
     * @return \Closure
     */
    protected function createQueuedHandlerCallable($class, $method)
    {
        return function () use ($class, $method) {
            // Objects are cloned so the queued payload is detached from the
            // instances the caller passed in.
            $arguments = [];

            foreach (func_get_args() as $argument) {
                $arguments[] = is_object($argument) ? clone $argument : $argument;
            }

            if (! $this->handlerWantsToBeQueued($class, $arguments)) {
                return;
            }

            $this->queueHandler($class, $method, $arguments);
        };
    }

    /**
     * Decide whether the listener must wait until all database transactions have committed.
     *
     * True only when the listener has a truthy "afterCommit" value and the
     * container has a "db.transactions" binding.
     *
     * @param  object|mixed  $listener
     * @return bool
     */
    protected function handlerShouldBeDispatchedAfterDatabaseTransactions($listener)
    {
        if (! ($listener->afterCommit ?? null)) {
            return false;
        }

        return (bool) $this->container->bound('db.transactions');
    }

    /**
     * Create a callable for dispatching a listener after database transactions.
*
     * @param  mixed  $listener
     * @param  string  $method
     * @return \Closure
     */
    protected function createCallbackForListenerRunningAfterCommits($listener, $method)
    {
        return function () use ($listener, $method) {
            $payload = func_get_args();

            $transactions = $this->container->make('db.transactions');

            // The listener call is deferred: it is registered as a callback on
            // the "db.transactions" service rather than run immediately.
            $invokeListener = function () use ($listener, $method, $payload) {
                $listener->$method(...$payload);
            };

            $transactions->addCallback($invokeListener);
        };
    }

    /**
     * Ask the handler whether it wants to be queued.
     *
     * A handler without a "shouldQueue" method is always queued; otherwise its
     * "shouldQueue" method is called with the first argument and decides.
     *
     * @param  string  $class
     * @param  array  $arguments
     * @return bool
     */
    protected function handlerWantsToBeQueued($class, $arguments)
    {
        $instance = $this->container->make($class);

        return method_exists($instance, 'shouldQueue')
                    ? $instance->shouldQueue($arguments[0])
                    : true;
    }

    /**
     * Push the handler class onto the queue.
     *
     * The connection and queue names come from the listener's "viaConnection" /
     * "viaQueue" methods when present, falling back to its "connection" /
     * "queue" properties. A set "delay" property selects a delayed push.
     *
     * @param  string  $class
     * @param  string  $method
     * @param  array  $arguments
     * @return void
     */
    protected function queueHandler($class, $method, $arguments)
    {
        [$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);

        $manager = $this->resolveQueue();

        if (method_exists($listener, 'viaConnection')) {
            $connectionName = $listener->viaConnection();
        } else {
            $connectionName = $listener->connection ?? null;
        }

        $connection = $manager->connection($connectionName);

        if (method_exists($listener, 'viaQueue')) {
            $queue = $listener->viaQueue();
        } else {
            $queue = $listener->queue ?? null;
        }

        if (isset($listener->delay)) {
            $connection->laterOn($queue, $listener->delay, $job);
        } else {
            $connection->pushOn($queue, $job);
        }
    }

    /**
     * Build the listener instance and the job that will run it on the queue.
     *
     * The listener is instantiated without calling its constructor; it is used
     * as the source of the options copied onto the job.
     *
     * @param  string  $class
     * @param  string  $method
     * @param  array  $arguments
     * @return array
     */
    protected function createListenerAndJob($class, $method, $arguments)
    {
        $reflector = new ReflectionClass($class);

        $listener = $reflector->newInstanceWithoutConstructor();

        $job = $this->propagateListenerOptions(
            $listener, new CallQueuedListener($class, $method, $arguments)
        );

        return [$listener, $job];
    }

    /**
     * Propagate listener options to the job.
*
     * @param  mixed  $listener
     * @param  mixed  $job
     * @return mixed
     */
    protected function propagateListenerOptions($listener, $job)
    {
        return tap($job, function ($job) use ($listener) {
            if (property_exists($listener, 'afterCommit')) {
                $job->afterCommit = $listener->afterCommit;
            } else {
                $job->afterCommit = null;
            }

            // A "backoff" method takes precedence over a "backoff" property.
            if (method_exists($listener, 'backoff')) {
                $job->backoff = $listener->backoff();
            } else {
                $job->backoff = $listener->backoff ?? null;
            }

            $job->maxExceptions = $listener->maxExceptions ?? null;

            if (method_exists($listener, 'retryUntil')) {
                $job->retryUntil = $listener->retryUntil();
            } else {
                $job->retryUntil = null;
            }

            $job->shouldBeEncrypted = $listener instanceof ShouldBeEncrypted;
            $job->timeout = $listener->timeout ?? null;
            $job->tries = $listener->tries ?? null;

            // Middleware from the method (if any) comes first, followed by
            // middleware declared on the property.
            $methodMiddleware = method_exists($listener, 'middleware') ? $listener->middleware() : [];
            $propertyMiddleware = $listener->middleware ?? [];

            $job->through(array_merge($methodMiddleware, $propertyMiddleware));
        });
    }

    /**
     * Drop every listener registered for the given event.
     *
     * Events containing "*" are removed from the wildcard listeners; all others
     * from the regular listeners. Cached wildcard matches whose key matches the
     * event pattern are dropped as well.
     *
     * @param  string  $event
     * @return void
     */
    public function forget($event)
    {
        $isWildcard = Str::contains($event, '*');

        if ($isWildcard) {
            unset($this->wildcards[$event]);
        } else {
            unset($this->listeners[$event]);
        }

        foreach ($this->wildcardsCache as $cachedEvent => $cachedListeners) {
            if (! Str::is($event, $cachedEvent)) {
                continue;
            }

            unset($this->wildcardsCache[$cachedEvent]);
        }
    }

    /**
     * Forget the listeners of every event whose name ends in "_pushed".
     *
     * @return void
     */
    public function forgetPushed()
    {
        foreach ($this->listeners as $eventName => $registered) {
            if (! Str::endsWith($eventName, '_pushed')) {
                continue;
            }

            $this->forget($eventName);
        }
    }

    /**
     * Invoke the queue resolver and return whatever it produces.
     *
     * @return \Illuminate\Contracts\Queue\Queue
     */
    protected function resolveQueue()
    {
        $resolver = $this->queueResolver;

        return $resolver();
    }

    /**
     * Store the callable used to resolve the queue implementation.
     *
     * @param  callable  $resolver
     * @return $this
     */
    public function setQueueResolver(callable $resolver)
    {
        $this->queueResolver = $resolver;

        return $this;
    }
}