Skip to content

Commit a4b227c

Browse files
authored
Update Thread and Worker!
1 parent 83684da commit a4b227c

File tree

6 files changed

+210
-52
lines changed

6 files changed

+210
-52
lines changed

src/vennv/vapm/ClosureThread.php

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
namespace vennv\vapm;
2525

26+
use Generator;
2627
use function call_user_func;
2728
use function is_array;
2829
use function is_bool;
@@ -49,31 +50,42 @@ final class ClosureThread extends Thread implements ClosureThreadInterface
4950

5051
private mixed $callback;
5152

52-
public function __construct(callable $callback)
53+
/**
54+
* @var array<int|float|array|object|null, mixed>
55+
* @phpstan-var array<int|float|array|object|null, mixed>
56+
*/
57+
private array $argsCallback = [];
58+
59+
/**
60+
* @param callable $callback
61+
* @param array<int|float|array|object|null, mixed> $args
62+
*/
63+
public function __construct(callable $callback, array $args = [])
5364
{
5465
$this->callback = $callback;
55-
parent::__construct($callback);
66+
$this->argsCallback = $args;
67+
parent::__construct($callback, $args);
5668
}
5769

5870
public function onRun(): void
5971
{
6072
if (is_callable($this->callback)) {
61-
$callback = call_user_func($this->callback);
62-
if ($callback instanceof \Generator) {
63-
$callback = function () use ($callback): \Generator {
73+
$callback = call_user_func($this->callback, ...$this->argsCallback);
74+
if ($callback instanceof Generator) {
75+
$callback = function () use ($callback): Generator {
6476
yield from $callback;
6577
};
66-
$callback = call_user_func($callback);
78+
$callback = call_user_func($callback, ...$this->argsCallback);
6779
}
6880
if (is_array($callback)) {
6981
$callback = json_encode($callback);
70-
} elseif (is_object($callback) && !$callback instanceof \Generator) {
82+
} elseif (is_object($callback) && !$callback instanceof Generator) {
7183
$callback = json_encode($callback);
7284
} elseif (is_bool($callback)) {
7385
$callback = $callback ? 'true' : 'false';
7486
} elseif (is_null($callback)) {
7587
$callback = 'null';
76-
} elseif ($callback instanceof \Generator) {
88+
} elseif ($callback instanceof Generator) {
7789
$callback = json_encode(iterator_to_array($callback));
7890
} else {
7991
$callback = (string)$callback;

src/vennv/vapm/CoroutineGen.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ public static function runNonBlocking(mixed ...$coroutines): void
117117
public static function runBlocking(mixed ...$coroutines): void
118118
{
119119
self::runNonBlocking(...$coroutines);
120-
121120
$gc = new GarbageCollection();
122121
while (!self::$taskQueue?->isEmpty()) {
123122
self::run();

src/vennv/vapm/Thread.php

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
use ReflectionClass;
2929
use ReflectionException;
3030
use RuntimeException;
31+
use InvalidArgumentException;
3132
use Throwable;
32-
use function explode;
33+
use function strlen;
34+
use function rtrim;
3335
use function fclose;
3436
use function fwrite;
3537
use function get_called_class;
@@ -207,19 +209,36 @@ abstract class Thread implements ThreadInterface, ThreadedInterface
207209
private static array $threads = [];
208210

209211
/**
210-
* @var array<string, mixed>
211-
* @phpstan-var array<string, mixed>
212+
* @var array<int|string, mixed>
213+
* @phpstan-var array<int|string, mixed>
212214
*/
213215
private static array $inputs = [];
214216

215-
public function __construct(mixed $input = '')
217+
/**
218+
* @var array<int, mixed>
219+
* @phpstan-var array<int, mixed>
220+
*/
221+
private static array $args = [];
222+
223+
/**
224+
* @param mixed $input
225+
* @param array<int, mixed> $args
226+
* @phpstan-param array<int, mixed> $args
227+
*/
228+
public function __construct(mixed $input = '', array $args = [])
216229
{
217-
self::$inputs[get_called_class()] = $input;
230+
self::$inputs[$this->getCalledClassId()] = $input;
231+
self::$args[$this->getCalledClassId()] = $args;
232+
}
233+
234+
private function getCalledClassId(): int
235+
{
236+
return spl_object_id($this);
218237
}
219238

220239
public function getInput(): mixed
221240
{
222-
return self::$inputs[get_called_class()];
241+
return self::$inputs[$this->getCalledClassId()];
223242
}
224243

225244
public function getPid(): int
@@ -295,6 +314,10 @@ public static function addShared(string $key, mixed $value): void
295314
self::$shared[$key] = $value;
296315
}
297316

317+
/**
318+
* @return array<int|string, mixed>
319+
* @phpstan-return array<int|string, mixed>
320+
*/
298321
public static function getSharedData(): array
299322
{
300323
$data = fgets(STDIN);
@@ -371,6 +394,8 @@ abstract public function onRun(): void;
371394
public function start(array $mode = DescriptorSpec::BASIC): Promise
372395
{
373396
return new Promise(function ($resolve, $reject) use ($mode): mixed {
397+
$idCall = $this->getCalledClassId();
398+
374399
$className = get_called_class();
375400

376401
$reflection = new ReflectionClass($className);
@@ -384,44 +409,64 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
384409
$pathAutoLoad
385410
);
386411

387-
$input = self::$inputs[get_called_class()];
412+
$input = self::$inputs[$idCall];
388413

389-
if (is_string($input)) $input = '\'' . self::$inputs[get_called_class()] . '\'';
414+
if (is_string($input)) $input = '\'' . self::$inputs[$idCall] . '\'';
390415

391416
if (is_callable($input) && $input instanceof Closure) {
392-
$input = Utils::closureToString($input);
393-
$input = Utils::removeComments($input);
394-
395-
if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
396-
397-
$input = Utils::outlineToInline($input);
417+
try {
418+
$input = Utils::closureToStringSafe($input);
419+
} catch (Throwable $e) {
420+
return $reject(new ThreadException($e->getMessage()));
421+
}
422+
}
398423

399-
if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
424+
if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
400425

401-
$input = Utils::fixInputCommand($input);
426+
$args = self::$args[$idCall];
402427

403-
if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
428+
if (is_array($args)) {
429+
foreach ($args as $key => $arg) {
430+
$tryToString = Utils::toStringAny($arg);
431+
$args[$key] = array_values($tryToString)[0];
432+
FiberManager::wait();
433+
}
434+
} else {
435+
throw new InvalidArgumentException('Expected $args to be an array or Traversable.');
404436
}
405437

406-
if (!is_string($input)) return $reject(new RuntimeException(Error::INPUT_MUST_BE_STRING_OR_CALLABLE));
438+
$args = '[' . implode(', ', array_map(function($item) { return '' . $item . ''; }, $args)) . ']';
439+
$args = str_replace('"', '\'', $args);
407440

408-
$command = PHP_BINARY . ' -r "require_once \'' . $pathAutoLoad . '\'; include \'' . $class . '\'; $input = ' . $input . '; $class = new ' . static::class . '($input); $class->onRun();"';
409-
410-
unset(self::$inputs[get_called_class()]);
441+
$command = PHP_BINARY . ' -r "require_once \'' . $pathAutoLoad . '\'; include \'' . $class . '\'; $input = ' . $input . '; $args = ' . $args . '; $class = new ' . static::class . '($input, $args); $class->onRun();"';
442+
443+
unset(self::$inputs[$idCall]);
444+
unset(self::$args[$idCall]);
411445

412446
$process = proc_open($command, $mode, $pipes);
413447

448+
$timeStart = microtime(true);
449+
while (microtime(true) - $timeStart <= 1) {
450+
FiberManager::wait(); // wait for 1 second
451+
}
452+
414453
$output = '';
415454
$error = '';
416-
if (is_resource($process)) {
417-
$data = json_encode(self::getDataMainThread());
418-
419-
if (is_string($data)) fwrite($pipes[0], $data);
420-
fclose($pipes[0]);
421455

456+
if (is_resource($process)) {
457+
stream_set_blocking($pipes[0], false);
422458
stream_set_blocking($pipes[1], false);
423459
stream_set_blocking($pipes[2], false);
424460

461+
stream_set_write_buffer($pipes[0], 0);
462+
stream_set_write_buffer($pipes[1], 0);
463+
stream_set_write_buffer($pipes[2], 0);
464+
465+
$data = json_encode(self::getDataMainThread());
466+
467+
if (is_string($data)) @fwrite($pipes[0], $data);
468+
@fclose($pipes[0]);
469+
425470
$status = proc_get_status($process);
426471
$pid = $status['pid'];
427472
if (!isset(self::$threads[$pid])) {
@@ -446,16 +491,15 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
446491
$write = null;
447492
$except = null;
448493
$timeout = 0;
449-
450494
$n = stream_select($read, $write, $except, $timeout);
451495
if ($n === false) break;
452496
if ($n > 0) {
453497
foreach ($read as $stream) {
454498
if (!feof($stream)) {
455-
stream_set_blocking($stream, false);
456-
$data = stream_get_contents($stream, 1024);
457-
if ($data === false || $data === '') continue;
458-
$stream === $pipes[1] ? $output .= $data : $error .= $data;
499+
$data = stream_get_contents($stream);
500+
if ($data !== '') {
501+
$stream === $pipes[1] ? $output .= $data : $error .= $data;
502+
}
459503
}
460504
FiberManager::wait();
461505
}
@@ -483,7 +527,7 @@ public function start(array $mode = DescriptorSpec::BASIC): Promise
483527
} else {
484528
if ($output !== '' && self::isPostMainThread($output)) self::loadSharedData($output);
485529
elseif ($output !== '' && self::isPostThread($output)) {
486-
$output = Utils::getStringAfterSign($output, self::POST_THREAD . '=>');
530+
$output = rtrim(Utils::getStringAfterSign($output, self::POST_THREAD . '=>'));
487531
}
488532
}
489533
} else {

src/vennv/vapm/Work.php

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ interface WorkInterface
3131

3232
/**
3333
* @param callable $work
34+
* @param array<int|float|array|object|null, mixed> $args
3435
* @return void
3536
*
3637
* The work is a function that will be executed when the work is run.
3738
*/
38-
public function add(callable $work): void;
39+
public function add(callable $work, array $args = []): void;
3940

4041
/**
4142
* @param int $index
@@ -107,9 +108,16 @@ public function __construct()
107108
$this->queue = new SplQueue();
108109
}
109110

110-
public function add(callable $work): void
111+
/**
112+
* @param callable $work
113+
* @param array<int|float|array|object|null, mixed> $args
114+
* @return void
115+
*
116+
* Add a work to the work list.
117+
*/
118+
public function add(callable $work, array $args = []): void
111119
{
112-
$this->queue->enqueue($work);
120+
$this->queue->enqueue(new ClosureThread($work, $args));
113121
}
114122

115123
public function remove(int $index): void
@@ -151,8 +159,9 @@ public function run(): void
151159
{
152160
$gc = new GarbageCollection();
153161
while (!$this->queue->isEmpty()) {
162+
/** @var ClosureThread $work */
154163
$work = $this->queue->dequeue();
155-
if (is_callable($work)) $work();
164+
$work->start();
156165
$gc->collectWL();
157166
}
158167
}

src/vennv/vapm/Worker.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ public function __construct(Work $work, array $options = ["threads" => 4])
151151
$this->work = $work;
152152
$this->options = $options;
153153
$this->id = $this->generateId();
154-
155154
self::$workers[$this->id] = [];
156155
}
157156

@@ -230,12 +229,9 @@ public function run(callable $callback): Async
230229
while ($this->isLocked() || $totalCountWorks > 0) {
231230
if (!$this->isLocked()) {
232231
if (count($promises) < $threads && $work->count() > 0) {
232+
/** @var ClosureThread $callbackQueue */
233233
$callbackQueue = $work->dequeue();
234-
235-
if (!is_callable($callbackQueue)) continue;
236-
237-
$thread = new ClosureThread($callbackQueue);
238-
$promises[] = $thread->start();
234+
$promises[] = $callbackQueue->start();
239235
} else {
240236
/** @var Promise $promise */
241237
foreach ($promises as $index => $promise) {

0 commit comments

Comments
 (0)