Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

try: 

    from io import StringIO 

except ImportError: 

    from StringIO import StringIO 

import posixpath 

import os 

 

from django.core.exceptions import ValidationError 

from django.forms.fields import MultiValueField, FileField, URLField 

from django.forms.util import ErrorList 

from django.core.validators import EMPTY_VALUES 

from django.core.files.uploadedfile import UploadedFile, InMemoryUploadedFile 

from django.core.files.storage import default_storage 

from django.core.files.base import ContentFile 

 

import requests 

 

from .widgets import FileOrURLWidget 

 

 

__all__ = ['MutuallyExclusiveValueField', 'FileOrURLField'] 

 

 

class MutuallyExclusiveValueField(MultiValueField): 

    too_many_values_error = 'Exactly One field is required, no more' 

    empty_values = EMPTY_VALUES 

 

    def clean(self, value): 

        """ 

        Validates every value in the given list. A value is validated against 

        the corresponding Field in self.fields. 

 

        Only allows for exactly 1 valid value to be submitted, this is what 

        gets returned by compress. 

 

        example to use directy (instead of using FileOrURLField): 

            MutuallyExclusiveValueField( 

                fields=(forms.IntegerField(), forms.IntegerField()), 

                widget=MutuallyExclusiveRadioWidget(widgets=[ 

                        forms.Select(choices=[(1,1), (2,2)]), 

                        forms.TextInput(attrs={'placeholder': 

                                               'Enter a number'}), 

                    ])) 

        """ 

        clean_data = [] 

        errors = ErrorList() 

        if not value or isinstance(value, (list, tuple)): 

            if not value or not [ 

                    v for v in value if v not in self.empty_values]: 

                if self.required: 

                    raise ValidationError( 

                        self.error_messages['required'], code='required') 

                else: 

                    return self.compress([]) 

        else: 

            raise ValidationError( 

                self.error_messages['invalid'], code='invalid') 

        for i, field in enumerate(self.fields): 

            try: 

                field_value = value[i] 

            except IndexError: 

                field_value = None 

            try: 

                clean_data.append(field.clean(field_value)) 

            except ValidationError as e: 

                # Collect all validation errors in a single list, which we'll 

                # raise at the end of clean(), rather than raising a single 

                # exception for the first error we encounter. 

                errors.extend(e.messages) 

        if errors: 

            raise ValidationError(errors) 

 

        out = self.compress(clean_data) 

        self.validate(out) 

        self.run_validators(out) 

        return out 

 

    def compress(self, data_list): 

        """ 

        Returns a single value for the given list of values. The values can be 

        assumed to be valid. 

 

        For example, if this MultiValueField was instantiated with 

        fields=(DateField(), TimeField()), this might return a datetime 

        object created by combining the date and time in data_list. 

        """ 

 

        non_empty_list = [d for d in data_list if d not in self.empty_values] 

 

        if len(non_empty_list) == 0 and not self.required: 

            return None 

        elif len(non_empty_list) > 1: 

            raise ValidationError(self.too_many_values_error) 

 

        return non_empty_list[0] 

 

 

class FileOrURLField(MutuallyExclusiveValueField): 

    widget = FileOrURLWidget 

    url_fetch_error = 'Failed to fetch URL specified' 

 

    def __init__(self, to=None, *args, **kwargs): 

        """ 

        Accepts EITHER a file or an URL. 

        The `to` parameter accepts 3 values: 

            None: default to_python, returns either url or file 

            'file': if an url is submited, download it into an inmemory object 

            'url': uploads the file to default storage and returns the URL 

        The`upload_to` param must be set when to='url' 

        if using AWS, set no_aws_qs to disable querystring auth 

        """ 

        self.to = to 

        self.no_aws_qs = kwargs.pop('no_aws_qs', False) 

        if 'upload_to' in kwargs: 

            self.upload_to = kwargs.pop('upload_to') 

        elif self.to == 'url': 

            raise RuntimeError('If normalizing to an URL `upload_to` ' 

                               'must be set') 

        fields = (FileField(), URLField()) 

        super(FileOrURLField, self).__init__(fields, *args, **kwargs) 

 

    def compress(self, data_list): 

        """ override just cause we want a to_python """ 

        value = super(FileOrURLField, self).compress(data_list) 

        return self.to_python(value) 

 

    def to_python(self, value): 

        value = super(FileOrURLField, self).to_python(value) 

 

        if self.to == None: 

            return value 

        elif self.to == 'file' and not isinstance(value, UploadedFile): 

            try: 

                resp = requests.get(value) 

            except: 

                raise ValidationError(self.url_fetch_error) 

            if not (200 <= resp.status_code < 400): 

                raise ValidationError(self.url_fetch_error) 

            io = StringIO(unicode(resp.content)) 

            io.seek(0) 

            io.seek(os.SEEK_END) 

            size = io.tell() 

            io.seek(0) 

            return InMemoryUploadedFile( 

                io, None, 

                posixpath.basename(value), 

                resp.headers['content-type'], 

                size, None) 

        elif self.to == 'url' and isinstance(value, UploadedFile): 

            path = default_storage.save( 

                posixpath.join(self.upload_to, value.name), 

                ContentFile(value.read())) 

            if self.no_aws_qs: 

                default_storage.querystring_auth = False 

            return default_storage.url(path) 

 

        return value