-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathFileProcessingWorkflow.php
More file actions
56 lines (45 loc) · 1.78 KB
/
FileProcessingWorkflow.php
File metadata and controls
56 lines (45 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?php
/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Temporal\Samples\FileProcessing;
use Carbon\CarbonInterval;
use Temporal\Activity\ActivityOptions;
use Temporal\Common\RetryOptions;
use Temporal\Internal\Workflow\ActivityProxy;
use Temporal\Workflow;
class FileProcessingWorkflow implements FileProcessingWorkflowInterface
{
public const DEFAULT_TASK_QUEUE = 'default';
/** @var ActivityProxy|StoreActivitiesInterface */
private $defaultStoreActivities;
public function __construct()
{
$this->defaultStoreActivities = Workflow::newActivityStub(
StoreActivitiesInterface::class,
ActivityOptions::new()
->withScheduleToCloseTimeout(CarbonInterval::minute(5))
->withTaskQueue(self::DEFAULT_TASK_QUEUE)
);
}
public function processFile(string $sourceURL, string $destinationURL)
{
/** @var TaskQueueFilenamePair $downloaded */
$downloaded = yield $this->defaultStoreActivities->download($sourceURL);
$hostSpecificStore = Workflow::newActivityStub(
StoreActivitiesInterface::class,
ActivityOptions::new()
->withScheduleToCloseTimeout(CarbonInterval::minute(5))
->withTaskQueue($downloaded->hostTaskQueue)
);
// Call processFile activity to zip the file.
// Call the activity to process the file using worker-specific task queue.
$processed = yield $hostSpecificStore->process($downloaded->filename);
// Call upload activity to upload the zipped file.
yield $hostSpecificStore->upload($processed, $destinationURL);
return 'OK';
}
}