Source code for airflow.contrib.operators.gcp_text_to_speech_operator

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from tempfile import NamedTemporaryFile

from airflow import AirflowException
from airflow.contrib.hooks.gcp_text_to_speech_hook import GCPTextToSpeechHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


[docs]class GcpTextToSpeechSynthesizeOperator(BaseOperator): """ Synthesizes text to speech and stores it in Google Cloud Storage .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:GcpTextToSpeechSynthesizeOperator` :param input_data: text input to be synthesized. See more: https://googleapis.github.io/google-cloud-python/latest/texttospeech/gapic/v1/types.html#google.cloud.texttospeech_v1.types.SynthesisInput :type input_data: dict or google.cloud.texttospeech_v1.types.SynthesisInput :param voice: configuration of voice to be used in synthesis. See more: https://googleapis.github.io/google-cloud-python/latest/texttospeech/gapic/v1/types.html#google.cloud.texttospeech_v1.types.VoiceSelectionParams :type voice: dict or google.cloud.texttospeech_v1.types.VoiceSelectionParams :param audio_config: configuration of the synthesized audio. See more: https://googleapis.github.io/google-cloud-python/latest/texttospeech/gapic/v1/types.html#google.cloud.texttospeech_v1.types.AudioConfig :type audio_config: dict or google.cloud.texttospeech_v1.types.AudioConfig :param target_bucket_name: name of the GCS bucket in which output file should be stored :type target_bucket_name: str :param target_filename: filename of the output file. :type target_filename: str :param project_id: Optional, Google Cloud Platform Project ID where the Compute Engine Instance exists. If set to None or missing, the default project_id from the GCP connection is used. :type project_id: str :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud Platform. Defaults to 'google_cloud_default'. :type gcp_conn_id: str :param retry: (Optional) A retry object used to retry requests. If None is specified, requests will not be retried. :type retry: google.api_core.retry.Retry :param timeout: (Optional) The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt. :type timeout: float """ # [START gcp_text_to_speech_synthesize_template_fields]
[docs] template_fields = ( "input_data", "voice", "audio_config", "project_id", "gcp_conn_id", "target_bucket_name", "target_filename",
) # [END gcp_text_to_speech_synthesize_template_fields] @apply_defaults def __init__( self, input_data, voice, audio_config, target_bucket_name, target_filename, project_id=None, gcp_conn_id="google_cloud_default", retry=None, timeout=None, *args, **kwargs ): self.input_data = input_data self.voice = voice self.audio_config = audio_config self.target_bucket_name = target_bucket_name self.target_filename = target_filename self.project_id = project_id self.gcp_conn_id = gcp_conn_id self.retry = retry self.timeout = timeout self._validate_inputs() super(GcpTextToSpeechSynthesizeOperator, self).__init__(*args, **kwargs)
[docs] def _validate_inputs(self): for parameter in [ "input_data", "voice", "audio_config", "target_bucket_name", "target_filename", ]: if getattr(self, parameter) == "": raise AirflowException("The required parameter '{}' is empty".format(parameter))
[docs] def execute(self, context): gcp_text_to_speech_hook = GCPTextToSpeechHook(gcp_conn_id=self.gcp_conn_id) result = gcp_text_to_speech_hook.synthesize_speech( input_data=self.input_data, voice=self.voice, audio_config=self.audio_config, retry=self.retry, timeout=self.timeout, ) with NamedTemporaryFile() as temp_file: temp_file.write(result.audio_content) cloud_storage_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.gcp_conn_id) cloud_storage_hook.upload( bucket=self.target_bucket_name, object=self.target_filename, filename=temp_file.name
)

Was this entry helpful?