Source code for feast.infra.materialization.contrib.bytewax.bytewax_materialization_job

from typing import Optional

from kubernetes import client

from feast.infra.materialization.batch_materialization_engine import (
    MaterializationJob,
    MaterializationJobStatus,
)


[docs]class BytewaxMaterializationJob(MaterializationJob): def __init__( self, job_id, namespace, error: Optional[BaseException] = None, ): super().__init__() self._job_id = job_id self.namespace = namespace self._error: Optional[BaseException] = error self.batch_v1 = client.BatchV1Api()
[docs] def error(self): return self._error
[docs] def status(self): if self._error is not None: return MaterializationJobStatus.ERROR else: # TODO: Find a better way to parse status? job_status = self.batch_v1.read_namespaced_job_status( self.job_id(), self.namespace ).status if job_status.active is not None: if job_status.completion_time is None: return MaterializationJobStatus.RUNNING elif job_status.failed is not None: return MaterializationJobStatus.ERROR elif job_status.active is None and job_status.succeeded is not None: if job_status.conditions[0].type == "Complete": return MaterializationJobStatus.SUCCEEDED
[docs] def should_be_retried(self): return False
[docs] def job_id(self): return f"dataflow-{self._job_id}"
[docs] def url(self): return None